import torch
import numpy as np
import pandas as pd
import torch.utils.data as data_utils
from sklearn.preprocessing import MinMaxScaler

from .lstm_model import LstmModel, testing

PRETRAINED_MODEL_N_CHANNELS = 1
PRETRAINED_MODEL_Z_SIZE = 32


class LSTMforOutbreakDetection:
    """Forecast a univariate series with an LSTM and flag anomalous rows.

    The chosen column is min-max scaled to [0, 1] and cut into sliding windows
    of ``window`` consecutive values. The model's output for each window is
    compared with the value that follows it. The resulting forecast errors are
    thresholded (IQR- or percentile-based, selected by ``threshold_method``)
    to label anomalies.
    """

    def __init__(
        self,
        checkpoint_path=None,
        n_channels=PRETRAINED_MODEL_N_CHANNELS,
        z_size=PRETRAINED_MODEL_Z_SIZE,
        device='cpu',
        window=7,
        batch_size=32,
        k=1.5,
        percentile=95,
        threshold_method=0
    ):
        """Store the configuration and optionally load model weights.

        Args:
            checkpoint_path: path to a state dict for ``LstmModel``. If falsy, no
                model is loaded and ``self.model`` must be assigned before
                calling ``predict``.
            n_channels, z_size: constructor arguments for ``LstmModel``.
            device: torch device string used for the model and loading.
            window: length of each input sequence and of the sliding windows
                used for thresholding.
            batch_size: DataLoader batch size used during prediction.
            k: IQR multiplier for threshold methods 0-2.
            percentile: percentile for threshold methods 3-4.
            threshold_method: integer 0-4 selecting the method in
                ``detect_anomalies``.
        """
        self.device = torch.device(device)
        self.window = window
        self.batch_size = batch_size
        self.n_channels = n_channels
        self.z_size = z_size
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.k = k
        self.percentile = percentile
        self.threshold_method = threshold_method
        # Always define the attribute so a missing model gives a clear error in
        # predict() instead of an AttributeError.
        self.model = self._load_model(checkpoint_path) if checkpoint_path else None

    def _load_model(self, checkpoint_path):
        """Build an ``LstmModel`` on ``self.device`` and load the state dict at *checkpoint_path*."""
        model = LstmModel(self.n_channels, self.z_size)
        model = model.to(self.device)
        # NOTE(review): torch.load unpickles the file, so only load trusted
        # checkpoints (newer torch versions accept weights_only=True for state dicts).
        state_dict = torch.load(checkpoint_path, map_location=self.device)
        model.load_state_dict(state_dict)
        return model

    def create_test_sequences(self, dataframe, time_steps, news_or_cases='news'):
        """Scale a column and cut it into (window, next value) pairs.

        The column in *dataframe* is overwritten in place with its min-max
        scaled values. Pass a copy if the caller's frame must be preserved.

        Args:
            dataframe: frame holding the series column.
            time_steps: number of consecutive values in each input window.
            news_or_cases: column to use, either 'news' or 'cases'.

        Returns:
            ``(X, y)``:
            - ``X`` has shape ``(n - time_steps, time_steps, 1)``.
            - ``y`` has shape ``(n - time_steps,)``.
            - ``n == len(dataframe)``.
            - ``y[i]`` is the scaled value right after window ``X[i]``.

        Raises:
            ValueError: if the column name is not allowed, or the frame has no
                more than ``time_steps`` rows (so no window has a target).
        """
        if news_or_cases not in ['news', 'cases']:
            raise ValueError("news_or_cases should be either 'news' or 'cases'")
        n_samples = len(dataframe) - time_steps
        if n_samples <= 0:
            raise ValueError(
                f"need more than {time_steps} rows to build a sequence, got {len(dataframe)}"
            )
        dataframe[[news_or_cases]] = self.scaler.fit_transform(dataframe[[news_or_cases]])
        norm = np.array(dataframe[[news_or_cases]]).astype(float)
        inputs = [norm[i:i + time_steps, :] for i in range(n_samples)]
        targets = [norm[i + time_steps, 0] for i in range(n_samples)]
        return np.stack(inputs), np.stack(targets)

    def prepare_input_dataframe(self, dataframe, news_column_name='news'):
        """Build an ordered DataLoader of (window, target) float tensors.

        Scales *dataframe*'s column in place (see ``create_test_sequences``).

        Returns:
            ``(loader, y_test)``, where ``y_test`` is the numpy array of targets.
        """
        X_test, y_test = self.create_test_sequences(dataframe, self.window, news_column_name)
        test_loader = torch.utils.data.DataLoader(
            data_utils.TensorDataset(
                torch.from_numpy(X_test).float(),
                torch.from_numpy(y_test).float()
            ),
            batch_size=self.batch_size,
            shuffle=False,  # order must match the rows of the output frame
            num_workers=0
        )
        return test_loader, y_test

    def predict(self, dataframe, news_column_name='news'):
        """Run the model over *dataframe* and return per-row forecast errors.

        The caller's frame is not modified.

        Returns:
            A copy of the rows from position ``self.window`` onward. In it,
            *news_column_name* holds the min-max scaled values, and these
            columns are added:
            - ``y_test``: scaled target.
            - ``pred_forec``: model output.
            - ``abs_loss``: ``|y_test - pred_forec|``.
            - ``rel_loss``: ``|pred_forec - y_test| / (1 + pred_forec)``.
            - ``diff``: ``y_test - pred_forec``.

        Raises:
            RuntimeError: if no model has been loaded or assigned.
        """
        if getattr(self, 'model', None) is None:
            raise RuntimeError(
                "no model loaded: pass checkpoint_path to the constructor or assign .model"
            )
        # Scaling happens in place, so work on a copy to leave the caller's frame intact.
        dataframe = dataframe.copy()
        test_loader, y_test = self.prepare_input_dataframe(dataframe, news_column_name)
        _, batch_outputs = testing(self.model, test_loader, self.device)
        # Flatten each batch's output and join them in loader order. Unlike
        # stacking all-but-the-last batch, this also works with a single batch.
        forecast_test = np.concatenate([
            batch.flatten().detach().cpu().numpy() for batch in batch_outputs
        ])
        test_df = dataframe[self.window:].copy()
        test_df['y_test'] = y_test
        test_df['pred_forec'] = forecast_test
        test_df['abs_loss'] = np.abs(test_df['y_test'] - test_df['pred_forec'])
        # NOTE(review): the "+1" presumably keeps the denominator away from zero
        # for near-zero forecasts; confirm intent.
        test_df['rel_loss'] = np.abs(
            (test_df['pred_forec'] - test_df['y_test']) / (1 + test_df['pred_forec'])
        )
        test_df['diff'] = test_df['y_test'] - test_df['pred_forec']
        return test_df

    @staticmethod
    def _iqr_rolling(timeseries, k):
        """Return the upper IQR fence ``Q3 + k * (Q3 - Q1)`` of *timeseries*."""
        q1, q3 = np.percentile(timeseries, [25, 75])
        iqr = q3 - q1
        return q3 + k * iqr

    def _window_flags(self, values, upper_bound):
        """OR together per-window exceedance flags over sliding windows of ``self.window`` values.

        Windows start at positions ``0 .. len(values) - self.window - 1``, the
        same stopping rule as ``create_test_sequences``. As a result, the final
        position is never inside a window.

        Args:
            values: 1-D sequence of losses.
            upper_bound: callable mapping one window's values to its threshold.

        Returns:
            ``(flags, n_covered)``. ``flags[:n_covered]`` holds 0/1 labels for
            every position that falls in at least one window. A position is 1
            if it exceeded the threshold of any window containing it.
        """
        values = np.asarray(values, dtype=float)
        n = len(values)
        flags = np.zeros(n, dtype=int)
        n_windows = max(n - self.window, 0)
        for start in range(n_windows):
            chunk = values[start:start + self.window]
            flags[start:start + self.window] |= chunk > upper_bound(chunk)
        n_covered = n - 1 if n_windows else 0
        return flags, n_covered

    def windowed_iqr(self, df, k, type_of_loss='diff'):
        """Flag rows whose *type_of_loss* exceeds the IQR upper fence of some window.

        Returns:
            A dict mapping ``str(index label)`` to 1 (flagged in at least one
            window) or 0. The last row is never inside a window and is therefore
            absent from the dict.
        """
        flags, n_covered = self._window_flags(
            df[type_of_loss].to_numpy(),
            lambda chunk: self._iqr_rolling(chunk, k),
        )
        return {
            f'{label}': int(flag)
            for label, flag in zip(df.index[:n_covered], flags[:n_covered])
        }

    def get_perc_threshold(self, test_df, percentile, col='abs_loss'):
        """Flag rows whose loss exceeds the *percentile*-th percentile of some window.

        Args:
            test_df: output of ``predict``.
            percentile: percentile (0-100) used as each window's threshold.
            col: 'abs_loss' for the absolute error, or 'loss' for the signed
                error ``y_test - pred_forec``.

        Returns:
            A copy of *test_df* without its last row, plus a 0/1 column. The
            column is named 'anom_perc_abs_loss' (col='abs_loss') or
            'anom_perc_diff_gt_pred' (col='loss').

        Raises:
            ValueError: if *col* is not one of the two supported values.
        """
        if col not in ['abs_loss', 'loss']:
            raise ValueError("col should be either 'abs_loss' or 'loss'")
        if col == 'abs_loss':
            losses = test_df['abs_loss'].to_numpy()
            new_col = 'anom_perc_abs_loss'
        else:
            # There is no 'loss' column, so compute the signed error directly;
            # looking it up by name raised KeyError.
            losses = (test_df['y_test'] - test_df['pred_forec']).to_numpy()
            new_col = 'anom_perc_diff_gt_pred'
        flags, n_covered = self._window_flags(
            losses, lambda chunk: np.percentile(chunk, percentile)
        )
        # The last row is never inside a window, so it is dropped from the result.
        test1 = test_df[:-1].copy()
        # Assign by position so the labels line up for any index type.
        test1[new_col] = flags[:n_covered] if n_covered else np.nan
        return test1

    def postprocess_anomalies(self, test_df, new_col, old_col, news_or_cases):
        """Copy *old_col* to *new_col*, clearing flags on rows where the series decreased.

        Adds a 'derivative' column, the first difference of *news_or_cases*
        with the first row set to 0. Rows with ``old_col == 1`` and a negative
        derivative get 0 in *new_col*. All other rows keep their *old_col*
        value, including NaN.

        Returns:
            A modified copy of *test_df*.
        """
        test_df = test_df.copy()
        test_df['derivative'] = test_df[news_or_cases].diff().fillna(0)
        falling_peak = (test_df['derivative'] < 0) & (test_df[old_col] == 1)
        test_df[new_col] = test_df[old_col].mask(falling_peak, 0)
        return test_df

    def detect_anomalies(self, test_df, news_or_cases='news'):
        """
        Detect anomalies with the method selected by ``self.threshold_method``:

            0: IQR on (ground truth - forecast)
            1: IQR on |ground truth - forecast|
            2: IQR on |ground truth - forecast| / (1 + forecast)
            3: Percentile threshold on |ground truth - forecast|
            4: Percentile threshold on (ground truth - forecast)

        Methods 0-2 use ``self.k``; methods 3-4 use ``self.percentile``.
        Methods 3-4 drop the last row.

        Returns:
            ``(frame, label_column)``, where *label_column* names the final 0/1
            anomaly column in *frame*.

        Raises:
            ValueError: if ``threshold_method`` is not in 0-4. This is checked
                before running the model.
        """
        if self.threshold_method not in (0, 1, 2, 3, 4):
            raise ValueError("threshold_method must be between 0 and 4")
        test = self.predict(test_df.copy(), news_column_name=news_or_cases)
        if self.threshold_method in (0, 1, 2):
            loss_type, iqr_col, new_label = {
                0: ('diff', 'f_iqr', 'f_new_label'),
                1: ('abs_loss', 'abs_iqr', 'abs_new_label'),
                2: ('rel_loss', 'rel_iqr', 'rel_new_label'),
            }[self.threshold_method]
            peaks = self.windowed_iqr(test, self.k, loss_type)
            # windowed_iqr keys rows by str(label). Look each row up the same way
            # so this works for any index type. Rows outside every window get NaN.
            test[iqr_col] = [peaks.get(f'{label}', np.nan) for label in test.index]
            test = self.postprocess_anomalies(test, new_label, iqr_col, news_or_cases)
            return test, new_label
        loss_type, new_label, old_label = {
            3: ('abs_loss', 'new_anom_absl', 'anom_perc_abs_loss'),
            4: ('loss', 'new_anom_diff', 'anom_perc_diff_gt_pred'),
        }[self.threshold_method]
        test = self.get_perc_threshold(test, self.percentile, loss_type)
        test = self.postprocess_anomalies(test, new_label, old_label, news_or_cases)
        return test, new_label