Source code for pecos.monitoring.PerformanceMonitoring

import pandas as pd
import numpy as np
import re
import logging

none_list = ['','none','None','NONE', None, [], {}]

logger = logging.getLogger(__name__)
    
[docs]class PerformanceMonitoring(object): def __init__(self): """ Performance Monitoring class """ self.df = pd.DataFrame() self.trans = {} self.tfilter = pd.Series() self.test_results = pd.DataFrame(columns=['System Name', 'Variable Name', 'Start Date', 'End Date', 'Timesteps', 'Error Flag'])
[docs] def add_dataframe(self, df, system_name, add_identity_translation_dictonary = False): """ Add dataframe to the PerformanceMonitoring class Parameters ----------- df : pd.Dataframe Dataframe to add to the Performance Monitoring class system_name : string System name add_identity_translation_dictonary : bool (default = False) Add a 1:1 translation dictonary to the Performance Monitoring class using all column names in df """ temp = df.copy() # Combine variable name with system name (System: Variable) temp.columns = [system_name + ':' + s for s in temp.columns] if self.df is not None: self.df = self.df.combine_first(temp) else: self.df = temp.copy() # define identifty translation if add_identity_translation_dictonary: trans = {} for col in df.columns: trans[col] = [col] self.add_translation_dictonary(trans, system_name)
[docs] def add_translation_dictonary(self, trans, system_name): """ Add translation dictonary to the PerformanceMonitoring class Parameters ----------- trans : dictonary Translation dictonary system_name : string System name """ # Combine variable name with system name (System: Variable) for key, values in trans.iteritems(): self.trans[key] = [] for value in values: self.trans[key].append(system_name + ':' + value)
[docs] def add_time_filter(self, time_filter): """ Add a time filter to the PerformanceMonitoring class Parameters ---------- time_filter : pd.Series Time filter containing boolean values for each time index """ if isinstance(time_filter, pd.DataFrame): self.tfilter = pd.Series(data = time_filter.values[:,0], index = self.df.index) else: self.tfilter = time_filter
[docs] def add_signal(self, col_name, df): """ Add signal to the PerformanceMonitoring dataframe Parameters ----------- col_name : string Column name to add to translation dictonary df : pd.DataFarame DataFrame to add to df """ df = pd.DataFrame(df) if col_name in self.trans.keys(): logger.info(col_name + ' already exists in trans') return for col in df.columns.values.tolist(): if col in self.df.columns.values.tolist(): logger.info(col + ' already exists in df') return try: self.trans[col_name] = df.columns.values.tolist() #self.df[df.columns] = df for col in df.columns: self.df[col] = df[col] except: logger.warning("Add signal failed: " + col_name) return
[docs] def append_test_results(self, mask, error_msg, min_failures=1, variable_name=True): #, sub_df=None): """ Append QC results to the PerformanceMonitoring class Parameters ---------- mask : pd.Dataframe Result from QC test, boolean values. error_msg : string Error message to store with the QC results min_failures : int Minimum number of consecutive failures required for reporting variable_name : bool (default = True) Add variable name to QC results, set to False for timestamp tests """ if len(mask.columns) == 1 and mask.columns[0] == 0: sub_df = self.df else: sub_df = self.df[mask.columns] # Find blocks order = 'col' if order == 'col': mask = mask.T np_mask = mask.values start_nans_mask = np.hstack((np.resize(np_mask[:,0],(mask.shape[0],1)), np.logical_and(np.logical_not(np_mask[:,:-1]), np_mask[:,1:]) )) stop_nans_mask = np.hstack((np.logical_and(np_mask[:,:-1], np.logical_not(np_mask[:,1:])), np.resize(np_mask[:,-1], (mask.shape[0],1)) )) start_row_idx,start_col_idx = np.where(start_nans_mask) stop_row_idx,stop_col_idx = np.where(stop_nans_mask) if order == 'col': temp = start_row_idx; start_row_idx = start_col_idx; start_col_idx = temp temp = stop_row_idx; stop_row_idx = stop_col_idx; stop_col_idx = temp #mask = mask.T block = {'Start Row': list(start_row_idx), 'Start Col': list(start_col_idx), 'Stop Row': list(stop_row_idx), 'Stop Col': list(stop_col_idx)} #if sub_df is None: # sub_df = self.df for i in range(len(block['Start Col'])): length = block['Stop Row'][i] - block['Start Row'][i] + 1 if length >= min_failures: if variable_name: var_name = sub_df.iloc[:,block['Start Col'][i]].name #sub_df.icol(block['Start Col'][i]).name system_name = '' temp = var_name.split(':') if len(temp) == 2: var_name = temp[1] system_name = temp[0] else: var_name = '' system_name = '' s = pd.Series([system_name, \ var_name, \ sub_df.index[block['Start Row'][i]], \ sub_df.index[block['Stop Row'][i]], \ length, \ error_msg], \ index=['System Name', 'Variable Name', 'Start Date', 'End Date', 'Timesteps', 'Error Flag']) self.test_results = self.test_results.append(s, ignore_index=True)
[docs] def check_timestamp(self, frequency, expected_start_time=None, expected_end_time=None, min_failures=1): """ Check time series for non-monotonic and duplicate timestamps. Parameters ---------- frequency : int Expected timeseries frequency, in seconds expected_start_time : Timestamp (default = None) Expected start time. If not specified, the minimimum timestamp is used. expected_end_time : Timestamp (default = None) Expected end time. If not specified, the maximum timestamp is used. min_failures : int (default = 1) Minimum number of consecutive failures required for reporting """ logger.info("Check timestamp") if self.df.empty: logger.info("Empty database") return if expected_start_time is None: expected_start_time = min(self.df.index) if expected_end_time is None: expected_end_time = max(self.df.index) rng = pd.date_range(start=expected_start_time, end=expected_end_time, freq=str(frequency) + 's') # Check to see if timestamp is monotonic # mask = pd.TimeSeries(self.df.index).diff() < 0 mask = pd.Series(self.df.index).diff() < pd.Timedelta('0 days 00:00:00') mask.index = self.df.index mask[mask.index[0]] = False mask = pd.DataFrame(mask) mask.columns = [0] self.append_test_results(mask, 'Nonmonotonic timestamp', variable_name=False, min_failures=min_failures) # If not monotonic, sort df by timestamp if not self.df.index.is_monotonic: self.df = self.df.sort_index() # Check for duplicate timestamps # mask = pd.TimeSeries(self.df.index).diff() == 0 mask = pd.Series(self.df.index).diff() == pd.Timedelta('0 days 00:00:00') mask.index = self.df.index mask[mask.index[0]] = False mask = pd.DataFrame(mask) mask.columns = [0] self.append_test_results(mask, 'Duplicate timestamp', variable_name=False, min_failures=min_failures) # Drop duplicate timestamps self.df['TEMP'] = self.df.index #self.df.drop_duplicates(subset='TEMP', take_last=False, inplace=True) self.df.drop_duplicates(subset='TEMP', keep='first', inplace=True) # reindex timestamps missing = [] for i in rng: if i not in self.df.index: missing.append(i) self.df = self.df.reindex(index=rng) mask = pd.DataFrame(data=self.df.shape[0]*[False], index = self.df.index) mask.loc[missing] = True self.append_test_results(mask, 'Missing timestamp', variable_name=False, min_failures=min_failures) del self.df['TEMP']
[docs] def check_range(self, bound, key=None, specs={}, rolling_mean=1, min_failures=1): """ Check data range Parameters ---------- bound : list [lower bound, upper bound], None can be used in place of a lower or upper bound key : string (default = None) Translation dictonary key. If not specified, all columns are used in the test. specs : dict (default = {}) Constants used in bound rolling_mean : int (default = 1) Rolling mean window in number of timesteps min_failures : int (default = 1) Minimum number of consecutive failures required for reporting """ logger.info("Check variable ranges") if self.df.empty: logger.info("Empty database") return tfilter = self.tfilter # Isolate subset if key is not None if key is not None: try: df = self.df[self.trans[key]] except: logger.warning("Insufficient data for Check Range: " + key) return else: df = self.df # Compute moving average if rolling_mean > 1: df = pd.rolling_mean(df, rolling_mean) # Evaluate strings for bound values for i in range(len(bound)): if bound[i] in none_list: bound[i] = None elif type(bound[i]) is str: bound[i] = self.evaluate_string('', bound[i], specs) # Lower Bound if bound[0] is not None: mask = (df < bound[0]) if not tfilter.empty: mask[~tfilter] = False if mask.sum(axis=1).sum(axis=0) > 0: self.append_test_results(mask, 'Data < lower bound, '+str(bound[0]), min_failures=min_failures) # sub_df=df) # Upper Bound if bound[1] is not None: mask = (df > bound[1]) if not tfilter.empty: mask[~tfilter] = False if mask.sum(axis=1).sum(axis=0) > 0: self.append_test_results(mask, 'Data > upper bound, '+str(bound[1]), min_failures=min_failures) #sub_df=df)
[docs] def check_increment(self, bound, key=None, specs={}, increment=1, absolute_value=True, rolling_mean=1, min_failures=1): """ Check range on data increments Parameters ---------- bound : list [lower bound, upper bound], None can be used in place of a lower or upper bound key : string (default = None) Translation dictonary key. If not specified, all columns are used in the test. specs : dict (default = {}) Constants used in bound increment : int (default = 1) Timestep shift used to compute difference absolute_value : bool (default = True) Take the absolute value of the increment data rolling_mean : int (default = 1) Rolling mean window in number of timesteps min_failures : int (default = 1) Minimum number of consecutive failures required for reporting """ logger.info("Check increment range") if self.df.empty: logger.info("Empty database") return tfilter = self.tfilter # Isolate subset if key is not None if key is not None: try: df = self.df[self.trans[key]] except: logger.warning("Insufficient data for Check Increment: " + key) return else: df = self.df # Compute moving average if rolling_mean > 1: df = pd.rolling_mean(df, rolling_mean) # Compute interval if absolute_value: df = np.abs(df.diff(periods=increment)) else: df = df.diff(periods=increment) # Evaluate strings for bound values for i in range(len(bound)): if bound[i] in none_list: bound[i] = None elif type(bound[i]) is str: bound[i] = self.evaluate_string('', bound[i], specs) # Lower Bound if bound[0] is not None: mask = (df < bound[0]) if not tfilter.empty: mask[~tfilter] = False if mask.sum(axis=1).sum(axis=0) > 0: self.append_test_results(mask, 'Increment < lower bound, '+str(bound[0]), min_failures=min_failures) #sub_df=df) # Upper Bound if bound[1] is not None: mask = (df > bound[1]) if not tfilter.empty: mask[~tfilter] = False if mask.sum(axis=1).sum(axis=0) > 0: self.append_test_results(mask, 'Increment > upper bound, '+str(bound[1]), min_failures=min_failures) #sub_df=df)
[docs] def check_missing(self, key=None, min_failures=1): """ Check for missing data Parameters ---------- key : string (default = None) Translation dictonary key. If not specified, all columns are used in the test. min_failures : int (default = 1) Minimum number of consecutive failures required for reporting """ logger.info("Check for missing data") if self.df.empty: logger.info("Empty database") return tfilter = self.tfilter if key is not None: try: df = self.df[self.trans[key]] except: logger.warning("Insufficient data for Check Missing: " + key) return else: df = self.df mask = pd.isnull(df) # checks for np.nan, np.inf missing_timestamps = self.test_results[self.test_results['Error Flag'] == 'Missing timestamp'] for index, row in missing_timestamps.iterrows(): mask.loc[row['Start Date']:row['End Date']] = False if not tfilter.empty: mask[~tfilter] = False if mask.sum(axis=1).sum(axis=0) > 0: self.append_test_results(mask, 'Missing data', min_failures=min_failures)
[docs] def check_corrupt(self, corrupt_values, key=None, min_failures=1): """ Check for corrupt data Parameters ---------- corrupt_values : list List of corrupt data values key : string (default = None) Translation dictonary key. If not specified, all columns are used in the test. min_failures : int (default = 1) Minimum number of consecutive failures required for reporting """ logger.info("Check for corrupt data") if self.df.empty: logger.info("Empty database") return tfilter = self.tfilter if key is not None: try: df = self.df[self.trans[key]] except: logger.warning("Insufficient data for Check Corrupt: " + key) return else: df = self.df mask = pd.DataFrame(data = np.zeros(df.shape), index = df.index, columns = df.columns, dtype = bool) # all False for i in corrupt_values: mask = mask | (df == i) if not tfilter.empty: mask[~tfilter] = False if mask.sum(axis=1).sum(axis=0) > 0: self.df[mask] = np.nan self.append_test_results(mask, 'Corrupt data', min_failures=min_failures)
[docs] def evaluate_string(self, col_name, string_to_eval, specs={}): """ Returns the evaluated python equation written as a string (BETA). For each [keyword] in string_to_eval, [keyword] is first expanded to self.df[self.trans[keyword]], if that fails, then [keyword] is expanded to specs[keyword]. Parameters ---------- col_name : string Column name for the new signal string_to_eval : string String to evaluate specs : dict (default = {}) Constants used as keywords Returns -------- signal : pd.DataFrame or pd.Series DataFrame or Series with results of the evaluated string """ match = re.findall(r"\{(.*?)\}", string_to_eval) for m in set(match): m = m.replace('[','') # check for list if m == 'ELAPSED_TIME': ELAPSED_TIME = self.get_elapsed_time() string_to_eval = string_to_eval.replace("{"+m+"}",m) elif m == 'CLOCK_TIME': CLOCK_TIME = self.get_clock_time() string_to_eval = string_to_eval.replace("{"+m+"}",m) else: try: self.df[self.trans[m]] datastr = "self.df[self.trans['" + m + "']]" string_to_eval = string_to_eval.replace("{"+m+"}",datastr) except: try: specs[m] datastr = "specs['" + m + "']" string_to_eval = string_to_eval.replace("{"+m+"}",datastr) except: pass try: signal = eval(string_to_eval) if type(signal) is tuple: col_name = [col_name + " " + str(i+1) for i in range(len(signal))] signal = pd.concat(signal, axis=1) signal.columns = [col_name] signal.index = self.df.index elif type(signal) is float: signal = signal else: signal = pd.DataFrame(signal) if len(signal.columns) == 1: signal.columns = [col_name] else: signal.columns = [col_name + " " + str(i+1) for i in range(signal.shape[1])] signal.index = self.df.index except: signal = None logger.warning("Insufficient data for Composite Signals: " + col_name + ' -- ' + string_to_eval) return signal
[docs] def get_elapsed_time(self): """ Returns elapsed time in seconds from the dataframe index Returns -------- elapsed_time : pd.DataFrame Elapsed time of the dataframe index """ elapsed_time = ((self.df.index - self.df.index[0]).values)/1000000000 # convert ns to s elapsed_time = pd.DataFrame(data=elapsed_time, index=self.df.index, dtype=int) return elapsed_time
[docs] def get_clock_time(self): """ Returns clock time in seconds from the dataframe index Returns -------- clock_time : pd.DataFrame Clock time of the dataframe index """ clock_time = ((self.df.index - self.df.index[0]).values)/1000000000 # convert ns to s clock_time = pd.DataFrame(data=clock_time, index=self.df.index, dtype=int) clock_time = np.mod(clock_time, 86400) return clock_time
[docs] def get_test_results_mask(self, key=None): """ Return a mask of data-times that failed a quality control test Parameters ----------- key : string (default = None) Translation dictonary key. If not specified, all columns are used Returns -------- test_results_mask : pd.DataFrame DataFrame containing boolean values for each data point, True = data point pass all tests, False = data point did not pass at least one test. """ if self.df.empty: logger.info("Empty database") return if key is not None: try: df = self.df[self.trans[key]] except: logger.warning("Key not in dataframe") return else: df = self.df test_results_mask = ~pd.isnull(df) for i in self.test_results.index: system = self.test_results.loc[i, 'System Name'] variable = self.test_results.loc[i, 'Variable Name'] start_date = self.test_results.loc[i, 'Start Date'] end_date = self.test_results.loc[i, 'End Date'] error_flag = self.test_results.loc[i, 'Error Flag'] if error_flag in ['Nonmonotonic timestamp', 'Duplicate timestamp']: continue if variable == '': test_results_mask.loc[start_date:end_date] = False else: if system == '': column_name = variable else: column_name = system + ':'+ variable test_results_mask.loc[start_date:end_date,column_name] = False return test_results_mask