# Source: JuanJoseMV — "add methods for each strategy" (commit 9485251)
import torch
import numpy as np
import pandas as pd
import torch.utils.data as data_utils
from sklearn.preprocessing import MinMaxScaler
from .lstm_model import LstmModel, testing
# Default architecture sizes handed to LstmModel(n_channels, z_size); they
# must match the checkpoint being loaded — presumably the shipped pretrained
# model was trained with these values (TODO confirm against the checkpoint).
PRETRAINED_MODEL_N_CHANNELS = 1
PRETRAINED_MODEL_Z_SIZE = 32
class LSTMforOutbreakDetection:
def __init__(
self,
checkpoint_path=None,
n_channels=PRETRAINED_MODEL_N_CHANNELS,
z_size=PRETRAINED_MODEL_Z_SIZE,
device='cpu',
window=7,
batch_size=32,
k=1.5,
percentile=95,
threshold_method=0
):
self.device = torch.device(device)
self.window = window
self.batch_size = batch_size
self.n_channels = n_channels
self.z_size = z_size
self.scaler = MinMaxScaler(feature_range=(0,1))
self.k = k
self.percentile = percentile
self.threshold_method = threshold_method
if checkpoint_path:
self.model = self._load_model(checkpoint_path)
def _load_model(self, checkpoint_path):
model = LstmModel(self.n_channels, self.z_size)
model = model.to(self.device)
model.load_state_dict(torch.load(checkpoint_path, map_location=self.device))
return model
def create_test_sequences(self, dataframe, time_steps, news_or_cases='news'):
if news_or_cases not in ['news', 'cases']:
raise ValueError("news_or_cases should be either 'news' or 'cases'")
output, output2 = [], []
dataframe[[news_or_cases]] = self.scaler.fit_transform(dataframe[[news_or_cases]])
norm = np.array(dataframe[[news_or_cases]]).astype(float)
for i in range(len(norm)):
end_ix = i + time_steps
if end_ix > len(norm)-1:
break
seq_x, seq_y = norm[i:end_ix, :], norm[end_ix, 0]
output.append(seq_x)
output2.append(seq_y)
return np.stack(output), np.stack(output2)
def prepare_input_dataframe(self, dataframe, news_column_name='news'):
X_test, y_test = self.create_test_sequences(dataframe, self.window, news_column_name)
test_loader = torch.utils.data.DataLoader(
data_utils.TensorDataset(
torch.from_numpy(X_test).float(),
torch.from_numpy(y_test).float()
),
batch_size=self.batch_size,
shuffle=False,
num_workers=0
)
return test_loader, y_test
def predict(self, dataframe, news_column_name='news'):
test_loader, y_test = self.prepare_input_dataframe(dataframe, news_column_name)
results, w = testing(self.model, test_loader, self.device)
forecast_test = np.concatenate([
torch.stack(w[:-1]).flatten().detach().cpu().numpy(),
w[-1].flatten().detach().cpu().numpy()
])
test_df = dataframe[self.window:].copy()
test_df['y_test'] = y_test
test_df['pred_forec'] = forecast_test
test_df['abs_loss'] = np.abs(test_df.y_test - test_df.pred_forec)
test_df['rel_loss'] = np.abs((test_df['pred_forec'] - test_df['y_test']) / (1 + test_df['pred_forec']))
test_df['diff'] = test_df['y_test'] - test_df['pred_forec']
return test_df
@staticmethod
def _iqr_rolling(timeseries, k):
q1, q3 = np.percentile(timeseries, [25, 75])
iqr = q3 - q1
return q3 + k * iqr
def windowed_iqr(self, df, k, type_of_loss='diff'):
peaks = {}
for i in range(len(df)):
end_ix = i + self.window
if end_ix > len(df)-1:
break
seq_x = df.iloc[i:end_ix, :]
ub = self._iqr_rolling(seq_x[type_of_loss], k)
for j in seq_x.index:
condition = int(seq_x.loc[j, type_of_loss] > ub)
peaks.setdefault(f'{j}', []).append(condition)
return {k: 1 if sum(v) > 0 else 0 for k, v in peaks.items()}
def get_perc_threshold(self, test_df, percentile, col='abs_loss'):
if col not in ['abs_loss', 'loss']:
raise ValueError("col should be either 'abs_loss' or 'loss'")
test1 = test_df[:-1].copy()
anom_perc_loss = {}
for i in range(len(test_df)):
end_ix = i + self.window
if end_ix > len(test_df)-1:
break
seq_x = test_df.iloc[i:end_ix, :].copy()
mae = seq_x['abs_loss'].values if col == 'abs_loss' else seq_x['y_test'] - seq_x['pred_forec']
threshold = np.percentile(mae, percentile)
seq_x['threshold'] = threshold
for j in seq_x.index:
condition = int(seq_x.loc[j, col] > seq_x.loc[j, 'threshold'])
anom_perc_loss.setdefault(f'{j}', []).append(condition)
final_anom = {k: 1 if sum(v) > 0 else 0 for k, v in anom_perc_loss.items()}
new_col = 'anom_perc_abs_loss' if col == 'abs_loss' else 'anom_perc_diff_gt_pred'
test1[new_col] = pd.Series(final_anom)
return test1
def postprocess_anomalies(self, test_df, new_col, old_col, news_or_cases):
test_df = test_df.copy()
test_df['derivative'] = test_df[news_or_cases].diff().fillna(0)
test_df[new_col] = [0 if v.derivative < 0 and v[old_col] == 1 else v[old_col]
for k, v in test_df.iterrows()]
return test_df
def detect_anomalies(self, test_df, news_or_cases='news'):
"""
Detect anomalies using different methods:
0: IQR on (ground truth - forecast)
1: IQR on |ground truth - forecast|
2: IQR on |ground truth - forecast|/forecast
3: Percentile threshold on absolute loss
4: Percentile threshold on raw loss
input parameters: k (1-3), threshold_method, percentile
"""
test_df = test_df.copy()
test = self.predict(test_df, news_column_name=news_or_cases)
if self.threshold_method in [0, 1, 2]:
loss_type = {0: 'diff', 1: 'abs_loss', 2: 'rel_loss'}[self.threshold_method]
iqr_suffix = {0: 'f_iqr', 1: 'abs_iqr', 2: 'rel_iqr'}[self.threshold_method]
new_label = {0: 'f_new_label', 1: 'abs_new_label', 2: 'rel_new_label'}[self.threshold_method]
peaks = self.windowed_iqr(test, self.k, loss_type)
peak_series = pd.Series(peaks)
peak_series.index = pd.to_datetime(peak_series.index)
test[iqr_suffix] = peak_series
test = self.postprocess_anomalies(test, new_label, iqr_suffix, news_or_cases)
return test, new_label
elif self.threshold_method in [3, 4]:
loss_type = 'abs_loss' if self.threshold_method == 3 else 'loss'
new_label = 'new_anom_absl' if self.threshold_method == 3 else 'new_anom_diff'
old_label = 'anom_perc_abs_loss' if self.threshold_method == 3 else 'anom_perc_diff_gt_pred'
test = self.get_perc_threshold(test, self.percentile, loss_type)
test = self.postprocess_anomalies(test, new_label, old_label, news_or_cases)
return test, new_label
raise ValueError("threshold_method must be between 0 and 4")