import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.tsa.arima.model import ARIMA

# Column name returned alongside the result frame by
# ARIMAforOutbreakDetection.detect_anomalies.
# NOTE(review): _postprocess_anomalies writes its filtered flags to
# 'new_anomaly', yet this constant (despite its name) points at the raw
# 'anomaly' column — confirm which one callers are meant to read.
NEW_ANOMALY_COLUMN_NAME = 'anomaly'


class ARIMAforOutbreakDetection:
    """Flag anomalous points with rolling-window ARIMA one-step forecasts.

    The target column is differenced (up to a fixed number of times) until an
    augmented Dickey-Fuller test deems it stationary, ARIMA orders (p, q) are
    chosen from its PACF/ACF, and an ARIMA(p, 0, q) model is refit on every
    sliding window to forecast the next point. A point is flagged when its
    absolute forecast error lies outside ``mean(resid) +/- k * std(resid)``
    of that window's in-sample residuals.
    """

    def __init__(self, window_size=7, stride=1, k=1.5, significance=0.05, max_lag=30):
        """
        Args:
            window_size: number of observations each ARIMA model is fit on.
            stride: step between consecutive window starts.
            k: half-width of the anomaly band, in residual standard deviations.
            significance: p-value cutoff for the ADF test; also passed as
                ``alpha`` to ``acf``/``pacf``.
            max_lag: maximum lag examined when choosing p and q.
        """
        self.window_size = window_size
        self.stride = stride
        self.k = k
        self.significance = significance
        self.max_lag = max_lag

    def test_stationarity(self, ts_data, column=''):
        """Run an augmented Dickey-Fuller test (lag length chosen by AIC).

        Args:
            ts_data: a ``pd.Series``, or a frame whose ``column`` is tested.
            column: column name, used only when ``ts_data`` is not a Series.

        Returns:
            ``"Stationary"`` if the ADF p-value is <= ``self.significance``,
            otherwise ``"Non-Stationary"``.
        """
        series = ts_data if isinstance(ts_data, pd.Series) else ts_data[column]
        adf_test = adfuller(series, autolag='AIC')
        return "Stationary" if adf_test[1] <= self.significance else "Non-Stationary"

    def make_stationary(self, dataframe, column, max_diff=5):
        """Difference ``dataframe`` until ``column`` passes the ADF test.

        Every column of ``dataframe`` is differenced, but only ``column``
        decides when to stop. The leading NaN of each ``diff`` is replaced by
        0, so the result keeps the input's length and index.

        Args:
            dataframe: input frame.
            column: column whose stationarity is tested.
            max_diff: maximum number of differencing passes.

        Returns:
            ``dataframe`` itself (not a copy) if it is already stationary;
            otherwise the first stationary differenced frame, or the
            ``max_diff``-times differenced frame if none passed the test.
        """
        if self.test_stationarity(dataframe, column) == "Stationary":
            return dataframe
        diff_series = dataframe.copy()
        for _ in range(max_diff):
            diff_series = diff_series.diff().fillna(0)
            if self.test_stationarity(diff_series, column) == "Stationary":
                break
        return diff_series

    def create_windows(self, df):
        """Split ``df`` into sliding windows and their next-step targets.

        Args:
            df: a DataFrame or a Series, indexed positionally via ``iloc``.

        Returns:
            ``(windows, gts)`` where, for each start ``i`` in
            ``0, stride, 2*stride, ...`` with ``i + window_size < len(df)``,
            the window holds rows ``[i, i + window_size)`` and the target is
            row ``i + window_size``. For a DataFrame the shapes are
            ``(n, window_size, n_cols)`` and ``(n, n_cols)``; for a Series
            ``(n, window_size)`` and ``(n,)``.

        Raises:
            ValueError: if ``df`` has no more than ``window_size`` rows.
        """
        starts = range(0, len(df) - self.window_size, self.stride)
        if len(starts) == 0:
            raise ValueError(
                f"need more than window_size={self.window_size} rows to build "
                f"windows, got {len(df)}"
            )
        # iloc[a:b] / iloc[j] work for both Series and DataFrame (for a
        # DataFrame they select whole rows, same as iloc[a:b, :]).
        windows = [df.iloc[i:i + self.window_size] for i in starts]
        gts = [df.iloc[i + self.window_size] for i in starts]
        return np.stack(windows), np.stack(gts)

    def find_p_q(self, series):
        """Choose ARIMA orders from the PACF (for p) and ACF (for q).

        Args:
            series: 1-D numeric series (expected to be stationary).

        Returns:
            ``(p, q)`` as produced by ``find_last_consecutive_outlier`` below.
        """
        n_obs = len(series)
        # statsmodels' pacf only accepts lags below half the sample size, so
        # clamp instead of letting short series raise.
        pacf_lags = max(1, min(self.max_lag, n_obs // 2 - 1))
        acf_values, _ = acf(series, nlags=self.max_lag, alpha=self.significance, fft=False)
        pacf_values, _ = pacf(series, nlags=pacf_lags, alpha=self.significance)
        # Fixed ~95% white-noise bound; not derived from self.significance.
        threshold = 1.96 / np.sqrt(n_obs)

        def find_last_consecutive_outlier(values):
            # Returns the first lag >= 1 whose coefficient is below the bound
            # (any negative value, or a small non-negative one); if every lag
            # is above it, returns the last lag.
            # NOTE(review): despite the name this returns the first
            # non-significant lag (last significant lag + 1) except in the
            # fallback — confirm the intended order selection.
            for lag in range(1, len(values)):
                if values[lag] < threshold:
                    return lag
            return len(values) - 1

        return find_last_consecutive_outlier(pacf_values), find_last_consecutive_outlier(acf_values)

    def detect_anomalies(self, dataset, news_or_cases='news'):
        """Run the full detection pipeline on ``dataset[news_or_cases]``.

        Args:
            dataset: frame containing the target column.
            news_or_cases: name of the target column.

        Returns:
            ``(result_df, NEW_ANOMALY_COLUMN_NAME)``; ``result_df`` holds the
            rows of ``dataset`` that were forecast (positions ``window_size``,
            ``window_size + stride``, ...) plus the columns added by
            ``_prepare_resulting_dataframe`` and ``_postprocess_anomalies``.
        """
        # Only the target column is differenced and modelled, so other
        # (possibly non-numeric) columns neither break ``diff`` nor end up in
        # the ARIMA windows.
        stationary = self.make_stationary(dataset[[news_or_cases]], news_or_cases)[news_or_cases]
        p, q = self.find_p_q(stationary)
        anomalies, means, stdevs, residuals, predictions, gts = self._train_arima_model(stationary, p, q)
        # Targets sit at window_size, window_size + stride, ... (see
        # create_windows); slicing with the same stride keeps lengths equal.
        forecast_rows = dataset.iloc[self.window_size::self.stride]
        result_df = self._prepare_resulting_dataframe(
            residuals, means, stdevs, forecast_rows, anomalies, gts, predictions
        )
        return self._postprocess_anomalies(result_df, news_or_cases), NEW_ANOMALY_COLUMN_NAME

    def _train_arima_model(self, dataset, p, q):
        """Fit ARIMA(p, 0, q) on every window and score the following point.

        Args:
            dataset: the (differenced) target series, passed to create_windows.
            p, q: ARIMA orders.

        Returns:
            ``(anomalies, means, stdevs, residuals, predictions, gts)``: one
            entry per window, plus the array of targets from create_windows.
        """
        # NOTE(review): p and q can be as large as max_lag while each window
        # only has window_size points — confirm the fits are meaningful.
        predictions, residuals, means, stdevs, anomalies = [], [], [], [], []
        windows, gts = self.create_windows(dataset)
        for window, gt in zip(windows, gts):
            model = ARIMA(window, order=(p, 0, q))
            model.initialize_approximate_diffuse()
            fit = model.fit()
            pred = fit.forecast(steps=1)[0]
            residual = np.abs(gt - pred)
            mu, std = np.mean(fit.resid), np.std(fit.resid)
            # residual is an absolute error while mu/std describe the signed
            # in-sample residuals, so the lower bound only matters when
            # mu > k * std.
            anomalies.append(
                1 if residual > mu + self.k * std or residual < mu - self.k * std else 0
            )
            means.append(mu)
            stdevs.append(std)
            residuals.append(residual)
            predictions.append(pred)
        return anomalies, means, stdevs, residuals, predictions, gts

    def _prepare_resulting_dataframe(self, residuals, means, stdevs, original_dataset,
                                     anomalies, gts, predictions):
        """Return a copy of ``original_dataset`` with per-window results added.

        Adds columns ``residuals``, ``mu``, ``sigma``, ``anomaly``,
        ``gts_diff`` (targets) and ``pred_diff`` (forecasts); every sequence
        must have one entry per row of ``original_dataset``.
        """
        result_df = original_dataset.copy()
        result_df['residuals'] = residuals
        result_df['mu'] = means
        result_df['sigma'] = stdevs
        result_df['anomaly'] = anomalies
        result_df['gts_diff'] = gts
        result_df['pred_diff'] = predictions
        return result_df

    def _postprocess_anomalies(self, dataframe, col_name='news'):
        """Drop anomaly flags on rows where ``col_name`` decreased.

        Mutates ``dataframe`` in place, adding ``derivative`` (first
        difference of ``col_name``, 0 on the first row) and ``new_anomaly``
        (``anomaly`` with flags of 1 set to 0 where the derivative is
        negative), and returns it.
        """
        dataframe['derivative'] = dataframe[col_name].diff().fillna(0)
        flagged_while_falling = (dataframe['derivative'] < 0) & (dataframe['anomaly'] == 1)
        dataframe['new_anomaly'] = dataframe['anomaly'].mask(flagged_while_falling, 0)
        return dataframe