# Source code for pecos.monitoring.PerformanceMonitoring
import pandas as pd
import numpy as np
import re
import logging
# Values treated as "no value supplied", e.g. for an unset lower/upper bound.
none_list = [
    '', 'none', 'None', 'NONE',  # blank and textual spellings
    None, [], {},                # empty Python objects
]

# Module-level logger.
logger = logging.getLogger(__name__)
class PerformanceMonitoring(object):
    """
    Container for time series data and the quality control tests run on it.

    Attributes
    ----------
    df : pd.DataFrame
        Data, one column per ``'System:Variable'`` name
    trans : dict
        Translation dictionary mapping a key to a list of column names in df
    tfilter : pd.Series
        Boolean time filter; rows where it is False are excluded from tests
    test_results : pd.DataFrame
        One row per block of consecutive failures found by a test
    """

    # Column layout of the ``test_results`` table
    _RESULT_COLUMNS = ['System Name', 'Variable Name', 'Start Date',
                       'End Date', 'Timesteps', 'Error Flag']

    def __init__(self):
        """
        Performance Monitoring class
        """
        self.df = pd.DataFrame()
        self.trans = {}
        # Explicit dtype: a dtype-less empty Series is deprecated in pandas
        self.tfilter = pd.Series(dtype=bool)
        self.test_results = pd.DataFrame(columns=self._RESULT_COLUMNS)

    def add_dataframe(self, df, system_name, add_identity_translation_dictonary = False):
        """
        Add dataframe to the PerformanceMonitoring class

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to add to the Performance Monitoring class
        system_name : string
            System name
        add_identity_translation_dictonary : bool (default = False)
            Add a 1:1 translation dictionary to the Performance Monitoring
            class using all column names in df
        """
        temp = df.copy()
        # Combine variable name with system name (System:Variable)
        temp.columns = [system_name + ':' + s for s in temp.columns]

        nothing_stored = (self.df is None or
                          (len(self.df.index) == 0 and len(self.df.columns) == 0))
        if nothing_stored:
            # First dataframe: adopt it directly, nothing to merge with
            self.df = temp
        else:
            self.df = self.df.combine_first(temp)

        # Define identity translation
        if add_identity_translation_dictonary:
            trans = {col: [col] for col in df.columns}
            self.add_translation_dictonary(trans, system_name)

    def add_translation_dictonary(self, trans, system_name):
        """
        Add translation dictionary to the PerformanceMonitoring class

        An existing entry with the same key is replaced.

        Parameters
        ----------
        trans : dict
            Translation dictionary, key -> list of variable names
        system_name : string
            System name
        """
        # Combine variable name with system name (System:Variable).
        # dict.items() is used because iteritems() does not exist on Python 3.
        for key, values in trans.items():
            self.trans[key] = [system_name + ':' + value for value in values]

    def add_time_filter(self, time_filter):
        """
        Add a time filter to the PerformanceMonitoring class

        Parameters
        ----------
        time_filter : pd.Series or pd.DataFrame
            Time filter containing boolean values for each time index.
            If a DataFrame is given, its first column is used and it is
            re-indexed positionally onto the index of df.
        """
        if isinstance(time_filter, pd.DataFrame):
            self.tfilter = pd.Series(data=time_filter.values[:, 0],
                                     index=self.df.index)
        else:
            self.tfilter = time_filter

    def add_signal(self, col_name, df):
        """
        Add signal to the PerformanceMonitoring dataframe

        Nothing is added if col_name is already a translation key or if any
        column of df already exists in the stored dataframe.

        Parameters
        ----------
        col_name : string
            Column name to add to translation dictionary
        df : pd.DataFrame
            DataFrame to add to df
        """
        df = pd.DataFrame(df)
        if col_name in self.trans:
            logger.info(col_name + ' already exists in trans')
            return

        new_columns = df.columns.values.tolist()
        for col in new_columns:
            if col in self.df.columns:
                logger.info(str(col) + ' already exists in df')
                return

        try:
            for col in df.columns:
                self.df[col] = df[col]
        except Exception:
            # Best effort: report and leave the translation dictionary alone
            logger.warning("Add signal failed: " + col_name)
            return
        # Register the key only once the data is actually stored
        self.trans[col_name] = new_columns

    def append_test_results(self, mask, error_msg, min_failures=1, variable_name=True):
        """
        Append QC results to the PerformanceMonitoring class

        Each run of consecutive True values in a column of mask becomes one
        row of test_results, provided the run is at least min_failures long.

        Parameters
        ----------
        mask : pd.DataFrame
            Result from QC test, boolean values (True = failure).
            A mask with a single column named 0 refers to the whole dataframe.
        error_msg : string
            Error message to store with the QC results
        min_failures : int
            Minimum number of consecutive failures required for reporting
        variable_name : bool (default = True)
            Add variable name to QC results, set to False for timestamp tests
        """
        if mask.shape[0] == 0 or mask.shape[1] == 0:
            return

        if len(mask.columns) == 1 and mask.columns[0] == 0:
            sub_df = self.df
        else:
            sub_df = self.df[mask.columns]

        # One row per variable, one column per timestep
        flags = np.asarray(mask.values, dtype=bool).T
        # Pad with False on both sides so runs touching either end are closed
        edge = np.zeros((flags.shape[0], 1), dtype=bool)
        padded = np.hstack((edge, flags, edge))
        current = padded[:, 1:-1]
        run_starts = current & ~padded[:, :-2]
        run_stops = current & ~padded[:, 2:]
        # nonzero() scans row-major, so the i-th start pairs with the i-th stop
        start_col, start_row = np.nonzero(run_starts)
        _, stop_row = np.nonzero(run_stops)

        rows = []
        for col, first, last in zip(start_col, start_row, stop_row):
            length = int(last - first + 1)
            if length < min_failures:
                continue
            system_name = ''
            var_name = ''
            if variable_name:
                var_name = sub_df.columns[col]
                parts = str(var_name).split(':')
                if len(parts) == 2:
                    system_name, var_name = parts
            rows.append([system_name, var_name,
                         sub_df.index[first], sub_df.index[last],
                         length, error_msg])

        if not rows:
            return
        new_results = pd.DataFrame(rows, columns=self._RESULT_COLUMNS)
        if self.test_results.empty:
            self.test_results = new_results
        else:
            # DataFrame.append was removed in pandas 2.0
            self.test_results = pd.concat([self.test_results, new_results],
                                          ignore_index=True)

    def _timestamp_mask(self, flags):
        """Wrap per-row flags as the single-column (named 0) mask used by timestamp tests."""
        return pd.DataFrame({0: np.asarray(flags, dtype=bool)},
                            index=self.df.index)

    def check_timestamp(self, frequency, expected_start_time=None, expected_end_time=None, min_failures=1):
        """
        Check time series for non-monotonic, duplicate and missing timestamps.

        The stored dataframe is sorted, de-duplicated (first occurrence kept)
        and re-indexed onto the expected regular time range.

        Parameters
        ----------
        frequency : int
            Expected timeseries frequency, in seconds
        expected_start_time : Timestamp (default = None)
            Expected start time. If not specified, the minimum timestamp is used.
        expected_end_time : Timestamp (default = None)
            Expected end time. If not specified, the maximum timestamp is used.
        min_failures : int (default = 1)
            Minimum number of consecutive failures required for reporting
        """
        logger.info("Check timestamp")

        if self.df.empty:
            logger.info("Empty database")
            return
        if expected_start_time is None:
            expected_start_time = self.df.index.min()
        if expected_end_time is None:
            expected_end_time = self.df.index.max()

        rng = pd.date_range(start=expected_start_time, end=expected_end_time,
                            freq=str(frequency) + 's')
        zero = pd.Timedelta(0)

        # Non-monotonic timestamps. The first diff is NaT, which compares
        # False, so the first row never needs to be cleared explicitly.
        step = pd.Series(self.df.index).diff()
        self.append_test_results(self._timestamp_mask((step < zero).values),
                                 'Nonmonotonic timestamp',
                                 variable_name=False, min_failures=min_failures)

        # If not monotonic, sort df by timestamp. A stable sort keeps
        # duplicates in their original order so 'first' is well defined.
        if not self.df.index.is_monotonic_increasing:
            self.df = self.df.sort_index(kind='mergesort')

        # Duplicate timestamps
        step = pd.Series(self.df.index).diff()
        self.append_test_results(self._timestamp_mask((step == zero).values),
                                 'Duplicate timestamp',
                                 variable_name=False, min_failures=min_failures)

        # Drop duplicate timestamps, keeping the first occurrence
        self.df = self.df[~self.df.index.duplicated(keep='first')]

        # Reindex onto the expected range and report the gaps that appear
        missing = ~rng.isin(self.df.index)
        self.df = self.df.reindex(index=rng)
        self.append_test_results(self._timestamp_mask(missing),
                                 'Missing timestamp',
                                 variable_name=False, min_failures=min_failures)

    def _select_data(self, key, warning):
        """Return df (or the columns translated from key); None, after logging, if unavailable."""
        if self.df.empty:
            logger.info("Empty database")
            return None
        if key is None:
            return self.df
        try:
            return self.df[self.trans[key]]
        except (KeyError, TypeError):
            logger.warning(warning)
            return None

    def _apply_time_filter(self, mask):
        """Clear, in place, the rows of mask excluded by the time filter; return mask."""
        if not self.tfilter.empty:
            mask[~self.tfilter] = False
        return mask

    @staticmethod
    def _is_none_like(value):
        """Return True if value is one of the 'not specified' markers in none_list."""
        if value is None:
            return True
        # Only compare types that can appear in none_list; array-likes would
        # produce an ambiguous element-wise comparison.
        if isinstance(value, (str, list, dict)):
            return value in none_list
        return False

    def _resolve_bounds(self, bound, specs):
        """Return a new list of bounds with markers mapped to None and strings evaluated."""
        resolved = []
        for value in bound:
            if self._is_none_like(value):
                resolved.append(None)
            elif isinstance(value, str):
                resolved.append(self.evaluate_string('', value, specs))
            else:
                resolved.append(value)
        return resolved

    def _flag_out_of_bounds(self, df, lower, upper, label, min_failures):
        """Record the entries of df below lower / above upper as test results."""
        if lower is not None:
            mask = self._apply_time_filter(df < lower)
            if mask.values.any():
                self.append_test_results(mask, label + ' < lower bound, ' + str(lower),
                                         min_failures=min_failures)
        if upper is not None:
            mask = self._apply_time_filter(df > upper)
            if mask.values.any():
                self.append_test_results(mask, label + ' > upper bound, ' + str(upper),
                                         min_failures=min_failures)

    def check_range(self, bound, key=None, specs=None, rolling_mean=1, min_failures=1):
        """
        Check data range

        Parameters
        ----------
        bound : list
            [lower bound, upper bound], None can be used in place of a lower
            or upper bound. String bounds are passed to evaluate_string.
            The list is not modified.
        key : string (default = None)
            Translation dictionary key. If not specified, all columns are used in the test.
        specs : dict (default = None)
            Constants used in bound. None is treated as an empty dict.
        rolling_mean : int (default = 1)
            Rolling mean window in number of timesteps
        min_failures : int (default = 1)
            Minimum number of consecutive failures required for reporting
        """
        logger.info("Check variable ranges")

        df = self._select_data(key, "Insufficient data for Check Range: " + str(key))
        if df is None:
            return

        # Compute moving average (pd.rolling_mean was removed from pandas)
        if rolling_mean > 1:
            df = df.rolling(window=rolling_mean).mean()

        resolved = self._resolve_bounds(bound, specs)
        self._flag_out_of_bounds(df, resolved[0], resolved[1], 'Data', min_failures)

    def check_increment(self, bound, key=None, specs=None, increment=1, absolute_value=True, rolling_mean=1, min_failures=1):
        """
        Check range on data increments

        The rolling mean (if any) is applied first, then the difference over
        `increment` timesteps, then the absolute value (if requested).

        Parameters
        ----------
        bound : list
            [lower bound, upper bound], None can be used in place of a lower
            or upper bound. String bounds are passed to evaluate_string.
            The list is not modified.
        key : string (default = None)
            Translation dictionary key. If not specified, all columns are used in the test.
        specs : dict (default = None)
            Constants used in bound. None is treated as an empty dict.
        increment : int (default = 1)
            Timestep shift used to compute difference
        absolute_value : bool (default = True)
            Take the absolute value of the increment data
        rolling_mean : int (default = 1)
            Rolling mean window in number of timesteps
        min_failures : int (default = 1)
            Minimum number of consecutive failures required for reporting
        """
        logger.info("Check increment range")

        df = self._select_data(key, "Insufficient data for Check Increment: " + str(key))
        if df is None:
            return

        # Compute moving average (pd.rolling_mean was removed from pandas)
        if rolling_mean > 1:
            df = df.rolling(window=rolling_mean).mean()

        # Compute interval
        df = df.diff(periods=increment)
        if absolute_value:
            df = df.abs()

        resolved = self._resolve_bounds(bound, specs)
        self._flag_out_of_bounds(df, resolved[0], resolved[1], 'Increment', min_failures)

    def check_missing(self, key=None, min_failures=1):
        """
        Check for missing data

        Rows already reported as 'Missing timestamp' are not reported again.

        Parameters
        ----------
        key : string (default = None)
            Translation dictionary key. If not specified, all columns are used in the test.
        min_failures : int (default = 1)
            Minimum number of consecutive failures required for reporting
        """
        logger.info("Check for missing data")

        df = self._select_data(key, "Insufficient data for Check Missing: " + str(key))
        if df is None:
            return

        # isnull flags NaN/None/NaT; infinite values are NOT flagged
        mask = pd.isnull(df)

        is_gap = self.test_results['Error Flag'] == 'Missing timestamp'
        for _, row in self.test_results[is_gap].iterrows():
            mask.loc[row['Start Date']:row['End Date']] = False

        mask = self._apply_time_filter(mask)
        if mask.values.any():
            self.append_test_results(mask, 'Missing data', min_failures=min_failures)

    def check_corrupt(self, corrupt_values, key=None, min_failures=1):
        """
        Check for corrupt data

        Corrupt entries are replaced by NaN in the stored dataframe.

        Parameters
        ----------
        corrupt_values : list
            List of corrupt data values
        key : string (default = None)
            Translation dictionary key. If not specified, all columns are used in the test.
        min_failures : int (default = 1)
            Minimum number of consecutive failures required for reporting
        """
        logger.info("Check for corrupt data")

        df = self._select_data(key, "Insufficient data for Check Corrupt: " + str(key))
        if df is None:
            return

        mask = pd.DataFrame(False, index=df.index, columns=df.columns)
        for value in corrupt_values:
            mask = mask | (df == value)

        mask = self._apply_time_filter(mask)
        if mask.values.any():
            # Widen the mask to every stored column (False where the column
            # was not tested) and blank the corrupt entries without relying
            # on in-place boolean-frame assignment.
            full_mask = mask.reindex(columns=self.df.columns, fill_value=False)
            self.df = self.df.mask(full_mask)
            self.append_test_results(mask, 'Corrupt data', min_failures=min_failures)

    def evaluate_string(self, col_name, string_to_eval, specs=None):
        """
        Returns the evaluated python equation written as a string (BETA).

        For each {keyword} in string_to_eval, {keyword} is first expanded to
        self.df[self.trans[keyword]]; if that fails, {keyword} is expanded to
        specs[keyword]. {ELAPSED_TIME} and {CLOCK_TIME} expand to the results
        of get_elapsed_time and get_clock_time. Unknown keywords are left
        untouched.

        Parameters
        ----------
        col_name : string
            Column name for the new signal
        string_to_eval : string
            String to evaluate
        specs : dict (default = None)
            Constants used as keywords. None is treated as an empty dict.

        Returns
        -------
        signal : pd.DataFrame, number or None
            DataFrame with the results of the evaluated string, the bare
            number if the string evaluates to a numeric scalar, or None if
            the evaluation failed (a warning is logged).
        """
        if specs is None:
            specs = {}

        # Names visible to the evaluated expression besides module globals
        namespace = {'self': self, 'specs': specs}

        for m in set(re.findall(r"\{(.*?)\}", string_to_eval)):
            m = m.replace('[', '')  # check for list
            if m == 'ELAPSED_TIME':
                namespace[m] = self.get_elapsed_time()
                replacement = m
            elif m == 'CLOCK_TIME':
                namespace[m] = self.get_clock_time()
                replacement = m
            else:
                try:
                    self.df[self.trans[m]]
                    replacement = "self.df[self.trans['" + m + "']]"
                except (KeyError, TypeError):
                    if m in specs:
                        replacement = "specs['" + m + "']"
                    else:
                        continue
            string_to_eval = string_to_eval.replace("{" + m + "}", replacement)

        try:
            # SECURITY: eval executes arbitrary code; string_to_eval must
            # come from a trusted source (e.g. a local configuration file).
            signal = eval(string_to_eval, globals(), namespace)
            if isinstance(signal, tuple):
                signal = pd.concat(list(signal), axis=1)
                signal.columns = [col_name + " " + str(i + 1)
                                  for i in range(signal.shape[1])]
                signal.index = self.df.index
            elif isinstance(signal, (int, float, np.integer, np.floating)):
                # Numeric scalars (e.g. evaluated bounds) are returned as is
                pass
            else:
                signal = pd.DataFrame(signal)
                if signal.shape[1] == 1:
                    signal.columns = [col_name]
                else:
                    signal.columns = [col_name + " " + str(i + 1)
                                      for i in range(signal.shape[1])]
                signal.index = self.df.index
        except Exception:
            signal = None
            logger.warning("Insufficient data for Composite Signals: " + col_name + ' -- ' + string_to_eval)

        return signal

    def get_elapsed_time(self):
        """
        Returns elapsed time in seconds from the dataframe index

        Returns
        -------
        elapsed_time : pd.DataFrame
            Whole seconds elapsed since the first timestamp of the dataframe
            index, in a single column named 0
        """
        delta = self.df.index - self.df.index[0]
        seconds = np.asarray(delta.total_seconds()).astype(np.int64)
        return pd.DataFrame(data=seconds, index=self.df.index)

    def get_clock_time(self):
        """
        Returns clock time in seconds from the dataframe index

        Returns
        -------
        clock_time : pd.DataFrame
            Whole seconds since midnight for each timestamp of the dataframe
            index, in a single column named 0
        """
        index = self.df.index
        # Time of day comes from the timestamps themselves, so the result is
        # correct even when the data does not start at midnight.
        seconds = index.hour * 3600 + index.minute * 60 + index.second
        return pd.DataFrame(data=np.asarray(seconds, dtype=np.int64), index=index)

    def get_test_results_mask(self, key=None):
        """
        Return a mask of data-times that failed a quality control test

        Parameters
        ----------
        key : string (default = None)
            Translation dictionary key. If not specified, all columns are used

        Returns
        -------
        test_results_mask : pd.DataFrame
            DataFrame containing boolean values for each data point, True =
            data point passed all tests, False = data point is missing or
            did not pass at least one test. None is returned if the
            dataframe is empty or the key cannot be resolved.
        """
        df = self._select_data(key, "Key not in dataframe")
        if df is None:
            return

        test_results_mask = ~pd.isnull(df)
        for _, row in self.test_results.iterrows():
            # These two tests flag rows of the unsorted/duplicated index,
            # which no longer exist once check_timestamp has cleaned df
            if row['Error Flag'] in ['Nonmonotonic timestamp', 'Duplicate timestamp']:
                continue

            start_date = row['Start Date']
            end_date = row['End Date']
            variable = row['Variable Name']
            if variable == '':
                # No variable: the result applies to every column
                test_results_mask.loc[start_date:end_date] = False
                continue

            system = row['System Name']
            column_name = variable if system == '' else system + ':' + variable
            # Results for columns outside the requested subset are skipped;
            # assigning to them would add spurious columns to the mask
            if column_name in test_results_mask.columns:
                test_results_mask.loc[start_date:end_date, column_name] = False

        return test_results_mask