# Source code for pecos.monitoring
"""
The monitoring module contains the PerformanceMonitoring class used to run
quality control tests and store results.
"""
import pandas as pd
import numpy as np
import re
import logging
logger = logging.getLogger(__name__)
# Values treated as "not set"; the range/increment checks map bound entries
# equal to one of these to None (no bound).
none_list = [
    '',
    'none',
    'None',
    'NONE',
    None,
    [],
    {},
]
class PerformanceMonitoring(object):
    """
    Store time series data and run quality control (QC) tests on it.

    Attributes
    ----------
    df : pd.DataFrame
        Data under test; columns added with add_dataframe are named
        'System:Variable'
    trans : dict
        Translation dictionary, maps a key to a list of df column names
    tfilter : pd.Series
        Boolean time filter; when set, QC failures at time steps where it
        is False are ignored
    test_results : pd.DataFrame
        One row per block of consecutive QC failures
    """

    # Columns of test_results
    _RESULT_COLUMNS = ['System Name', 'Variable Name', 'Start Date',
                       'End Date', 'Timesteps', 'Error Flag']

    def __init__(self):
        """
        PerformanceMonitoring class
        """
        self.df = pd.DataFrame()
        self.trans = {}
        # An explicit dtype avoids pandas' warning for an empty Series
        self.tfilter = pd.Series(dtype=bool)
        self.test_results = pd.DataFrame(columns=self._RESULT_COLUMNS)

    def add_dataframe(self, df, system_name, add_identity_translation_dictionary=False):
        """
        Add DataFrame to the PerformanceMonitoring object.

        Parameters
        -----------
        df : pd.DataFrame
            DataFrame to add to the PerformanceMonitoring object
        system_name : string
            System name
        add_identity_translation_dictionary : boolean (optional)
            Add a 1:1 translation dictionary to the PerformanceMonitoring object
            using all column names in df, default = False
        """
        temp = df.copy()
        # Combine variable name with system name (System:Variable)
        temp.columns = [system_name + ':' + str(col) for col in temp.columns]
        if len(self.df.index) == 0 and len(self.df.columns) == 0:
            # Nothing stored yet
            self.df = temp
        else:
            # Union of indexes and columns; values already in self.df win
            self.df = self.df.combine_first(temp)
        if add_identity_translation_dictionary:
            self.add_translation_dictionary({col: [col] for col in df.columns}, system_name)

    def add_translation_dictionary(self, trans, system_name):
        """
        Add translation dictionary to the PerformanceMonitoring object.

        Parameters
        -----------
        trans : dictionary
            Translation dictionary, maps a key to a list of variable names
        system_name : string
            System name, prepended to each variable name (System:Variable)
        """
        for key, values in trans.items():
            self.trans[key] = [system_name + ':' + str(value) for value in values]

    def add_time_filter(self, time_filter):
        """
        Add a time filter to the PerformanceMonitoring object.

        Parameters
        ----------
        time_filter : pd.DataFrame with a single column or pd.Series
            Time filter containing boolean values for each time index.
            A DataFrame's first column is matched positionally to df's index.
        """
        if isinstance(time_filter, pd.DataFrame):
            self.tfilter = pd.Series(data=time_filter.values[:, 0], index=self.df.index)
        else:
            self.tfilter = time_filter

    def add_signal(self, col_name, data):
        """
        Add signal to the PerformanceMonitoring DataFrame.

        Nothing is added (and a message is logged) when data is not a
        Series/DataFrame, when col_name is already a translation key, or when
        any of its columns already exists in df.

        Parameters
        -----------
        col_name : string
            Column name to add to translation dictionary
        data : pd.DataFrame or pd.Series
            Data to add to df
        """
        if isinstance(data, pd.Series):
            data = data.to_frame(col_name)
        if not isinstance(data, pd.DataFrame):
            logger.warning("Add signal failed")
            return
        if col_name in self.trans:
            logger.info(col_name + ' already exists in trans')
            return
        for col in data.columns:
            if col in self.df.columns:
                logger.info(str(col) + ' already exists in df')
                return
        try:
            for col in data.columns:
                self.df[col] = data[col]
        except Exception:
            # Best effort: report the failure instead of raising
            logger.warning("Add signal failed: " + col_name)
            return
        # Register the key only once the data is in df
        self.trans[col_name] = data.columns.values.tolist()

    def append_test_results(self, mask, error_msg, min_failures=1, variable_name=True):
        """
        Append QC results to the PerformanceMonitoring object.

        Each run of consecutive True values in a column of mask becomes one
        row of test_results.

        Parameters
        ----------
        mask : pd.DataFrame
            Result from quality control test, boolean values. A single column
            labelled 0 is a timestamp test over all of df; otherwise columns
            must be df column names. Rows are matched to df positionally.
        error_msg : string
            Error message to store with the QC results
        min_failures : int (optional)
            Minimum number of consecutive failures required for reporting, default = 1
        variable_name : boolean (optional)
            Add variable name to QC results, set to False for timestamp tests, default = True
        """
        if mask.empty:
            return  # no rows or no columns: nothing can have failed
        if len(mask.columns) == 1 and mask.columns[0] == 0:
            sub_df = self.df
        else:
            sub_df = self.df[mask.columns]

        # Rows of flags are variables, columns are time steps
        flags = np.asarray(mask.values, dtype=bool).T
        # A block starts at a True that opens the series or follows a False,
        # and stops at a True that ends the series or precedes a False.
        starts = np.hstack((flags[:, :1], ~flags[:, :-1] & flags[:, 1:]))
        stops = np.hstack((flags[:, :-1] & ~flags[:, 1:], flags[:, -1:]))
        # np.where scans row by row, so the k-th start and the k-th stop
        # belong to the same block.
        start_var, start_time = np.where(starts)
        _, stop_time = np.where(stops)

        rows = []
        for col, start, stop in zip(start_var, start_time, stop_time):
            length = stop - start + 1
            if length < min_failures:
                continue
            if variable_name:
                var_name = sub_df.columns[col]
                system_name = ''
                parts = str(var_name).split(':')
                if len(parts) == 2:
                    system_name, var_name = parts
            else:
                system_name = ''
                var_name = ''
            rows.append([system_name, var_name, sub_df.index[start],
                         sub_df.index[stop], length, error_msg])
        if not rows:
            return

        # Build all rows at once (DataFrame.append was removed in pandas 2.0)
        new_results = pd.DataFrame(rows, columns=self._RESULT_COLUMNS)
        if self.test_results.empty:
            self.test_results = new_results
        else:
            self.test_results = pd.concat([self.test_results, new_results], ignore_index=True)

    @staticmethod
    def _timestamp_mask(flags, index):
        """Wrap per-timestamp flags in the single-column (label 0) mask used by timestamp tests."""
        return pd.DataFrame({0: np.array(flags, dtype=bool)}, index=index)

    def check_timestamp(self, frequency, expected_start_time=None, expected_end_time=None, min_failures=1):
        """
        Check time series for non-monotonic, duplicate, and missing timestamps.

        df is sorted by time, duplicate timestamps are dropped (the first
        occurrence is kept), and df is reindexed to the expected time range.

        Parameters
        ----------
        frequency : int
            Expected time series frequency, in seconds
        expected_start_time : Timestamp (optional)
            Expected start time. If not specified, the minimum timestamp is used
        expected_end_time : Timestamp (optional)
            Expected end time. If not specified, the maximum timestamp is used
        min_failures : int (optional)
            Minimum number of consecutive failures required for reporting, default = 1
        """
        logger.info("Check timestamp")
        if self.df.empty:
            logger.info("Empty database")
            return
        if expected_start_time is None:
            expected_start_time = self.df.index.min()
        if expected_end_time is None:
            expected_end_time = self.df.index.max()
        rng = pd.date_range(start=expected_start_time, end=expected_end_time,
                            freq=str(frequency) + 's')

        # Non-monotonic: a timestamp earlier than its predecessor. The first
        # difference is NaT, which compares False.
        steps = pd.Series(self.df.index).diff()
        mask = self._timestamp_mask(steps < pd.Timedelta(0), self.df.index)
        self.append_test_results(mask, 'Nonmonotonic timestamp', variable_name=False, min_failures=min_failures)

        # A stable sort keeps duplicates in input order, so 'first' below is
        # the first occurrence in the original data.
        if not self.df.index.is_monotonic_increasing:
            self.df = self.df.sort_index(kind='mergesort')

        # Duplicates: one mask row per unique timestamp, True if it repeated.
        repeated = self.df.index.duplicated(keep='first')
        mask = self._timestamp_mask(repeated, self.df.index)
        mask = mask[~mask.index.duplicated(keep='last')]
        # Drop duplicates before appending so mask rows line up with df rows
        self.df = self.df[~repeated]
        self.append_test_results(mask, 'Duplicate timestamp', variable_name=False, min_failures=min_failures)

        # Missing: expected timestamps absent from df (inserted as NaN rows)
        missing = ~rng.isin(self.df.index)
        self.df = self.df.reindex(index=rng)
        mask = self._timestamp_mask(missing, self.df.index)
        self.append_test_results(mask, 'Missing timestamp', variable_name=False, min_failures=min_failures)

    def _get_test_data(self, key, test_name):
        """Return the data a QC test runs on, or None (after logging) when it cannot run."""
        if self.df.empty:
            logger.info("Empty database")
            return None
        if key is None:
            return self.df
        try:
            return self.df[self.trans[key]]
        except KeyError:
            logger.warning("Insufficient data for %s: %s", test_name, key)
            return None

    def _apply_time_filter(self, mask):
        """Clear mask at time steps excluded by the time filter, if one is set."""
        if not self.tfilter.empty:
            mask[~self.tfilter] = False
        return mask

    def _evaluate_bound(self, bound, specs):
        """Return a new bound list: 'not set' markers become None, strings are evaluated."""
        evaluated = []
        for value in bound:
            # Only plain values are compared with none_list; == on arrays or
            # pandas objects is elementwise and has no single truth value.
            if (value is None or isinstance(value, (str, list, dict))) and value in none_list:
                evaluated.append(None)
            elif isinstance(value, str):
                evaluated.append(self.evaluate_string('', value, specs))
            else:
                evaluated.append(value)
        return evaluated

    def _check_bounds(self, df, bound, specs, label, min_failures):
        """Record values of df below bound[0] or above bound[1] as QC failures."""
        bound = self._evaluate_bound(bound, specs)  # caller's list is not modified
        if bound[0] is not None:
            mask = self._apply_time_filter(df < bound[0])
            if mask.values.any():
                self.append_test_results(mask, label + ' < lower bound, ' + str(bound[0]),
                                         min_failures=min_failures)
        if bound[1] is not None:
            mask = self._apply_time_filter(df > bound[1])
            if mask.values.any():
                self.append_test_results(mask, label + ' > upper bound, ' + str(bound[1]),
                                         min_failures=min_failures)

    def check_range(self, bound, key=None, specs=None, rolling_mean=1, min_failures=1):
        """
        Check data range.

        Parameters
        ----------
        bound : list of floats
            [lower bound, upper bound], None can be used in place of a lower or
            upper bound. Strings are evaluated with evaluate_string. The list
            is not modified.
        key : string (optional)
            Translation dictionary key. If not specified, all columns are used in the test.
        specs : dictionary (optional)
            Constants used in bound, default = None (no constants)
        rolling_mean : int (optional)
            Rolling mean window in number of time steps, default = 1
        min_failures : int (optional)
            Minimum number of consecutive failures required for reporting, default = 1
        """
        logger.info("Check variable ranges")
        df = self._get_test_data(key, 'Check Range')
        if df is None:
            return
        if rolling_mean > 1:
            df = df.rolling(rolling_mean).mean()
        self._check_bounds(df, bound, specs, 'Data', min_failures)

    def check_increment(self, bound, key=None, specs=None, increment=1, absolute_value=True, rolling_mean=1, min_failures=1):
        """
        Check range on data increments.

        Parameters
        ----------
        bound : list of floats
            [lower bound, upper bound], None can be used in place of a lower or
            upper bound. Strings are evaluated with evaluate_string. The list
            is not modified.
        key : string (optional)
            Translation dictionary key. If not specified, all columns are used in the test.
        specs : dictionary (optional)
            Constants used in bound, default = None (no constants)
        increment : int (optional)
            Time step shift used to compute difference, default = 1
        absolute_value : boolean (optional)
            Take the absolute value of the increment data, default = True
        rolling_mean : int (optional)
            Rolling mean window in number of time steps, default = 1
        min_failures : int (optional)
            Minimum number of consecutive failures required for reporting, default = 1
        """
        logger.info("Check increment range")
        df = self._get_test_data(key, 'Check Increment')
        if df is None:
            return
        if rolling_mean > 1:
            df = df.rolling(rolling_mean).mean()
        df = df.diff(periods=increment)
        if absolute_value:
            df = df.abs()
        self._check_bounds(df, bound, specs, 'Increment', min_failures)

    def check_missing(self, key=None, min_failures=1):
        """
        Check for missing data.

        Time steps already reported as 'Missing timestamp' are not reported
        again as missing data.

        Parameters
        ----------
        key : string (optional)
            Translation dictionary key. If not specified, all columns are used in the test.
        min_failures : int (optional)
            Minimum number of consecutive failures required for reporting, default = 1
        """
        logger.info("Check for missing data")
        df = self._get_test_data(key, 'Check Missing')
        if df is None:
            return
        mask = df.isnull()  # NaN/None/NaT are missing; +/-inf is not
        flagged = self.test_results[self.test_results['Error Flag'] == 'Missing timestamp']
        for start, end in zip(flagged['Start Date'], flagged['End Date']):
            mask.loc[start:end] = False
        mask = self._apply_time_filter(mask)
        if mask.values.any():
            self.append_test_results(mask, 'Missing data', min_failures=min_failures)

    def check_corrupt(self, corrupt_values, key=None, min_failures=1):
        """
        Check for corrupt data. Corrupt values are replaced by NaN in df.

        Parameters
        ----------
        corrupt_values : list of floats
            List of corrupt data values
        key : string (optional)
            Translation dictionary key. If not specified, all columns are used in the test.
        min_failures : int (optional)
            Minimum number of consecutive failures required for reporting, default = 1
        """
        logger.info("Check for corrupt data")
        df = self._get_test_data(key, 'Check Corrupt')
        if df is None:
            return
        mask = pd.DataFrame(False, index=df.index, columns=df.columns)
        for value in corrupt_values:
            mask = mask | (df == value)
        mask = self._apply_time_filter(mask)
        if mask.values.any():
            # Widen to all of df (False elsewhere) since key may select a subset
            full_mask = mask.reindex(index=self.df.index, columns=self.df.columns, fill_value=False)
            self.df = self.df.mask(full_mask)
            self.append_test_results(mask, 'Corrupt data', min_failures=min_failures)

    def evaluate_string(self, col_name, string_to_eval, specs=None):
        """
        Returns the evaluated python equation written as a string (BETA).

        For each {keyword} in string_to_eval,
        {keyword} is first expanded to self.df[self.trans[keyword]],
        if that fails, then {keyword} is expanded to specs[keyword].
        {ELAPSED_TIME} and {CLOCK_TIME} expand to the results of
        get_elapsed_time() and get_clock_time().

        Security: the expanded string is passed to eval(), which runs
        arbitrary Python code. Only evaluate strings from a trusted source.

        Parameters
        ----------
        col_name : string
            Column name for the new signal
        string_to_eval : string
            String to evaluate
        specs : dictionary (optional)
            Constants used as keywords, default = None (no constants)

        Returns
        --------
        signal : pd.DataFrame, number, or None
            DataFrame with the results (column col_name, or 'col_name 1',
            'col_name 2', ... for several columns), the result itself when it
            is a number, or None if evaluation failed
        """
        if specs is None:
            specs = {}
        match = re.findall(r"\{(.*?)\}", string_to_eval)
        for m in set(match):
            m = m.replace('[', '')  # check for list
            if m == 'ELAPSED_TIME':
                ELAPSED_TIME = self.get_elapsed_time()  # noqa: F841 (read by eval)
                string_to_eval = string_to_eval.replace("{" + m + "}", m)
            elif m == 'CLOCK_TIME':
                CLOCK_TIME = self.get_clock_time()  # noqa: F841 (read by eval)
                string_to_eval = string_to_eval.replace("{" + m + "}", m)
            else:
                try:
                    self.df[self.trans[m]]  # KeyError if m is not a usable key
                    datastr = "self.df[self.trans['" + m + "']]"
                except KeyError:
                    if m not in specs:
                        continue  # unknown keyword: leave it unexpanded
                    datastr = "specs['" + m + "']"
                string_to_eval = string_to_eval.replace("{" + m + "}", datastr)
        try:
            signal = eval(string_to_eval)
            if isinstance(signal, tuple):
                # One flat column per tuple element
                names = [col_name + " " + str(i + 1) for i in range(len(signal))]
                signal = pd.concat(signal, axis=1)
                signal.columns = names
                signal.index = self.df.index
            elif isinstance(signal, (int, float, np.number)):
                pass  # scalar result, e.g. a bound computed from specs
            else:
                signal = pd.DataFrame(signal)
                if len(signal.columns) == 1:
                    signal.columns = [col_name]
                else:
                    signal.columns = [col_name + " " + str(i + 1) for i in range(signal.shape[1])]
                signal.index = self.df.index
        except Exception:
            signal = None
            logger.warning("Insufficient data for Composite Signals: " + col_name + ' -- ' + string_to_eval)
        return signal

    def get_elapsed_time(self):
        """
        Returns the elapsed time in seconds for each Timestamp in the DataFrame index.

        Returns
        --------
        elapsed_time : pd.DataFrame
            Elapsed time of the DataFrame index, whole seconds (truncated)
            since the first timestamp, in column 0
        """
        index = self.df.index
        if len(index) == 0:
            seconds = np.array([], dtype=int)
        else:
            # total_seconds() is correct for any datetime resolution
            seconds = np.asarray((index - index[0]).total_seconds()).astype(int)
        return pd.DataFrame(data=seconds, index=index)

    def get_clock_time(self):
        """
        Returns the time of day in seconds past midnight for each Timestamp in the DataFrame index.

        Returns
        --------
        clock_time : pd.DataFrame
            Clock time of the DataFrame index, in column 0
        """
        index = self.df.index
        secofday = (index.hour * 3600 + index.minute * 60 + index.second
                    + index.microsecond / 1000000.0)
        return pd.DataFrame(secofday, index=index)

    def get_test_results_mask(self, key=None):
        """
        Return a mask of data-times that failed a quality control test.

        Parameters
        -----------
        key : string (optional)
            Translation dictionary key. If not specified, all columns are used

        Returns
        --------
        test_results_mask : pd.DataFrame
            DataFrame containing boolean values for each data point, True =
            data point is not missing and passed all tests, False = data point
            is missing or did not pass at least one test.
        """
        if self.df.empty:
            logger.info("Empty database")
            return
        if key is not None:
            try:
                df = self.df[self.trans[key]]
            except KeyError:
                logger.warning("Key not in DataFrame")
                return
        else:
            df = self.df
        test_results_mask = df.notnull()
        for _, row in self.test_results.iterrows():
            if row['Error Flag'] in ['Nonmonotonic timestamp', 'Duplicate timestamp']:
                continue
            start_date = row['Start Date']
            end_date = row['End Date']
            variable = row['Variable Name']
            if variable == '':  # timestamp test: applies to every column
                test_results_mask.loc[start_date:end_date] = False
                continue
            system = row['System Name']
            # An empty system name occurs when data is a composite signal
            column_name = variable if system == '' else system + ':' + variable
            # Skip results for columns outside the selection; assigning them
            # would add new columns to the mask.
            if column_name in test_results_mask.columns:
                test_results_mask.loc[start_date:end_date, column_name] = False
        return test_results_mask