# Page-header residue from the hosting site, kept as a comment:
# author JuanJoseMV, commit 9485251, message "add methods for each strategy".
import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.tsa.arima.model import ARIMA
# Column name that ARIMAforOutbreakDetection.detect_anomalies returns next to
# its result frame so callers know where the anomaly flags live.
# NOTE(review): _postprocess_anomalies also writes a 'new_anomaly' column;
# confirm that 'anomaly' (the unfiltered flag) is the one callers should read.
NEW_ANOMALY_COLUMN_NAME = 'anomaly'
class ARIMAforOutbreakDetection:
    """Rolling-window ARIMA anomaly detector for a single time-series column.

    For every window of ``window_size`` consecutive rows an ARIMA(p, 0, q)
    model is fitted, the next row is forecast, and that row is flagged when
    its absolute forecast error leaves the ``mu +/- k * sigma`` band of the
    model's in-sample residuals.
    """

    # Upper bound on how many times make_stationary differences the data.
    _MAX_DIFFERENCING_ROUNDS = 5

    def __init__(self, window_size=7, stride=1, k=1.5, significance=0.05, max_lag=30):
        """Store the detector configuration.

        Args:
            window_size: Number of rows in each training window.
            stride: Step between the starts of consecutive windows.
            k: Width of the anomaly band, in residual standard deviations.
            significance: p-value cut-off for the ADF stationarity test; also
                passed as ``alpha`` to the ACF/PACF computations.
            max_lag: Largest lag inspected when choosing p and q.
        """
        self.window_size = window_size
        self.stride = stride
        self.k = k
        self.significance = significance
        self.max_lag = max_lag

    def test_stationarity(self, ts_data, column=''):
        """Run an Augmented Dickey-Fuller test and label the outcome.

        Args:
            ts_data: A ``pd.Series``, or a frame-like object indexed by
                ``column``.
            column: Column to test when ``ts_data`` is not a Series.

        Returns:
            ``"Stationary"`` when the ADF p-value is at most
            ``self.significance``, otherwise ``"Non-Stationary"``.
        """
        series = ts_data if isinstance(ts_data, pd.Series) else ts_data[column]
        adf_test = adfuller(series, autolag='AIC')
        # Index 1 of the adfuller result is the p-value.
        return "Stationary" if adf_test[1] <= self.significance else "Non-Stationary"

    def make_stationary(self, dataframe, column):
        """Difference the data until ``column`` passes the stationarity test.

        Args:
            dataframe: Input data; every column is differenced together.
            column: Column whose stationarity decides when to stop.

        Returns:
            ``dataframe`` itself when it is already stationary. Otherwise a
            differenced copy (leading NaNs replaced by 0). After
            ``_MAX_DIFFERENCING_ROUNDS`` attempts the last differenced frame
            is returned even if it is still non-stationary.
        """
        if self.test_stationarity(dataframe, column) == "Stationary":
            return dataframe

        diff_series = dataframe.copy()
        for _ in range(self._MAX_DIFFERENCING_ROUNDS):
            diff_series = diff_series.diff().fillna(0)
            if self.test_stationarity(diff_series, column) == "Stationary":
                break
        return diff_series

    def create_windows(self, df):
        """Cut ``df`` into training windows and their one-step-ahead targets.

        Args:
            df: Data to slice by position.

        Returns:
            ``(windows, gts)`` as stacked arrays: ``windows[j]`` holds rows
            ``[i, i + window_size)`` and ``gts[j]`` holds row
            ``i + window_size``, for ``i = 0, stride, 2 * stride, ...``.

        Raises:
            ValueError: If ``df`` has no more than ``window_size`` rows, so
                not even one window with a following target row exists.
        """
        starts = range(0, len(df) - self.window_size, self.stride)
        if not starts:
            raise ValueError(
                f"need more than window_size={self.window_size} rows to build "
                f"a window, got {len(df)}"
            )
        windows = [df.iloc[start:start + self.window_size, :] for start in starts]
        gts = [df.iloc[start + self.window_size, :] for start in starts]
        return np.stack(windows), np.stack(gts)

    def _target_rows(self, dataset):
        """Return the rows of ``dataset`` that create_windows uses as targets.

        Targets sit at positions ``window_size, window_size + stride, ...``;
        slicing with the stride keeps this frame the same length as the
        per-window results for any stride, not only ``stride == 1``.
        """
        return dataset.iloc[self.window_size::self.stride]

    @staticmethod
    def _first_insignificant_lag(values, threshold):
        """Return the first lag >= 1 whose correlation fails the cut-off.

        A lag fails when its value is negative, or positive but smaller than
        ``threshold``. If no lag fails, the last index is returned.
        """
        # NOTE(review): every negative value counts as insignificant, even a
        # large one, and an exact 0 never fails; confirm this is intended
        # rather than a plain ``abs(value) < threshold`` test.
        for lag in range(1, len(values)):
            value = values[lag]
            if value < 0 or (value > 0 and abs(value) < threshold):
                return lag
        return len(values) - 1

    def find_p_q(self, series):
        """Choose ARIMA orders from the PACF and ACF of ``series``.

        Args:
            series: The (ideally stationary) series to analyse.

        Returns:
            ``(p, q)`` where p comes from the PACF and q from the ACF.
        """
        n_obs = len(series)
        # The confidence intervals returned alongside the values are unused;
        # significance is judged with the fixed band below instead.
        acf_values, _ = acf(series, nlags=self.max_lag, alpha=self.significance, fft=False)
        pacf_values, _ = pacf(series, nlags=self.max_lag, alpha=self.significance)
        # Usual ~95% white-noise bound; it does not vary with self.significance.
        threshold = 1.96 / np.sqrt(n_obs)
        p = self._first_insignificant_lag(pacf_values, threshold)
        q = self._first_insignificant_lag(acf_values, threshold)
        return p, q

    def detect_anomalies(self, dataset, news_or_cases='news'):
        """Run the full pipeline on ``dataset``.

        Args:
            dataset: Frame containing the ``news_or_cases`` column.
            news_or_cases: Name of the column to analyse.

        Returns:
            ``(result_df, NEW_ANOMALY_COLUMN_NAME)`` where ``result_df`` holds
            the target rows of ``dataset`` plus the per-window diagnostics and
            anomaly flags.
        """
        stationary_data = self.make_stationary(dataset, news_or_cases)
        p, q = self.find_p_q(stationary_data[news_or_cases])
        anomalies, means, stdevs, residuals, predictions, gts = self._train_arima_model(
            stationary_data, p, q
        )
        result_df = self._prepare_resulting_dataframe(
            residuals, means, stdevs, self._target_rows(dataset),
            anomalies, gts, predictions
        )
        return self._postprocess_anomalies(result_df, news_or_cases), NEW_ANOMALY_COLUMN_NAME

    def _train_arima_model(self, dataset, p, q):
        """Fit one ARIMA(p, 0, q) per window and score the following row.

        Args:
            dataset: Data passed to create_windows.
            p: Autoregressive order.
            q: Moving-average order.

        Returns:
            ``(anomalies, means, stdevs, residuals, predictions, gts)``; the
            first five are lists with one entry per window and ``gts`` is the
            stacked target array from create_windows.
        """
        predictions, residuals, means, stdevs, anomalies = [], [], [], [], []
        windows, gts = self.create_windows(dataset)

        for window, gt in zip(windows, gts):
            model = ARIMA(window, order=(p, 0, q))
            model.initialize_approximate_diffuse()
            fit = model.fit()

            pred = fit.forecast(steps=1)[0]
            residual = np.abs(gt - pred)
            mu, std = np.mean(fit.resid), np.std(fit.resid)

            # NOTE(review): residual is an absolute value, so the lower-bound
            # test can only fire when mu - k * std is positive; confirm
            # whether a signed residual was intended.
            is_outlier = residual > mu + self.k * std or residual < mu - self.k * std
            anomalies.append(1 if is_outlier else 0)
            means.append(mu)
            stdevs.append(std)
            residuals.append(residual)
            predictions.append(pred)

        return anomalies, means, stdevs, residuals, predictions, gts

    def _prepare_resulting_dataframe(self, residuals, means, stdevs, original_dataset,
                                     anomalies, gts, predictions):
        """Attach the per-window results to a copy of ``original_dataset``.

        Every sequence must have one entry per row of ``original_dataset``.

        Returns:
            The copy with the added columns ``residuals``, ``mu``, ``sigma``,
            ``anomaly``, ``gts_diff`` and ``pred_diff``.
        """
        result_df = original_dataset.copy()
        result_df['residuals'] = residuals
        result_df['mu'] = means
        result_df['sigma'] = stdevs
        result_df['anomaly'] = anomalies
        result_df['gts_diff'] = gts
        result_df['pred_diff'] = predictions
        return result_df

    def _postprocess_anomalies(self, dataframe, col_name='news'):
        """Drop anomaly flags on rows where ``col_name`` is decreasing.

        Adds two columns to ``dataframe`` in place: ``derivative`` (first
        difference of ``col_name``, 0 for the first row) and ``new_anomaly``
        (``anomaly`` with flags cleared wherever ``derivative`` is negative).

        Returns:
            The same ``dataframe`` object.
        """
        dataframe['derivative'] = dataframe[col_name].diff().fillna(0)
        falling = dataframe['derivative'] < 0
        flagged = dataframe['anomaly'] == 1
        # Vectorised replacement for a per-row loop; keeps the flag dtype.
        dataframe['new_anomaly'] = np.where(falling & flagged, 0, dataframe['anomaly'])
        return dataframe