|
import gradio as gr |
|
import pandas as pd |
|
from sklearn.metrics import classification_report |
|
|
|
def timestamp_wise_evaluation(anomalies_cases, anomalies_news, threshold): |
|
print(f"Classification report for threshold {threshold} (timestamp-wise evaluation):") |
|
print(classification_report(anomalies_cases, anomalies_news)) |
|
|
|
def tolerance_based_evaluation(anomalies_cases, anomalies_news, cases_df, news_df, threshold): |
|
Tp = 0 |
|
Fp = 0 |
|
Fn = 0 |
|
Tn = 0 |
|
for i in range(len(news_df)): |
|
news_an = news_df.iloc[i][anomalies_news] |
|
if news_an == 1: |
|
if i == len(news_df) - 1: |
|
if cases_df.iloc[i][anomalies_cases] == 1: |
|
Tp += 1 |
|
else: |
|
Fp += 1 |
|
elif i == len(news_df) - 2: |
|
if cases_df.iloc[i][anomalies_cases] == 1 or cases_df.iloc[i+1][anomalies_cases] == 1: |
|
Tp += 1 |
|
else: |
|
Fp += 1 |
|
else: |
|
if cases_df.iloc[i][anomalies_cases] == 1 or cases_df.iloc[i+1][anomalies_cases] == 1 or cases_df.iloc[i+2][anomalies_cases] == 1: |
|
Tp += 1 |
|
else: |
|
Fp += 1 |
|
else: |
|
if i == len(news_df) - 1: |
|
if cases_df.iloc[i][anomalies_cases] == 1: |
|
Fn += 1 |
|
else: |
|
Tn += 1 |
|
elif i == len(news_df) - 2: |
|
if cases_df.iloc[i][anomalies_cases] == 1: |
|
Fn += 1 |
|
else: |
|
Tn += 1 |
|
else: |
|
if cases_df.iloc[i][anomalies_cases] == 1: |
|
Fn += 1 |
|
else: |
|
Tn += 1 |
|
print(f"Tolerance-based evaluation for method {threshold}:") |
|
print(f"True Positives: {Tp}, False Positives: {Fp}, False Negatives: {Fn}, True Negatives: {Tn}") |
|
precision = Tp / (Tp + Fp) |
|
recall = Tp / (Tp + Fn) |
|
f1 = 2 * (precision * recall) / (precision + recall) |
|
print(f"Precision: {precision}, Recall: {recall}, F1: {f1}") |
|
|
|
|
|
def prepare_time_series_dataframe(df): |
|
"""Prepare dataframe for time series analysis by setting datetime index and renaming columns""" |
|
df.set_index(df.columns[0], inplace=True) |
|
|
|
try: |
|
df.index = pd.to_datetime(df.index) |
|
except ValueError: |
|
raise ValueError("The first column of the CSV file must be a datetime column.") |
|
|
|
df.rename(columns={df.columns[0]: "news"}, inplace=True) |
|
return df |
|
|
|
def update_controls(method): |
|
""" |
|
Updates the interactivity of control elements based on the selected method. |
|
|
|
Args: |
|
method (str): The selected anomaly detection method |
|
|
|
Returns: |
|
dict: Update configuration for Gradio components |
|
""" |
|
is_lstm = method == "LSTM" |
|
return [ |
|
gr.update(interactive=is_lstm), |
|
gr.update(interactive=is_lstm) |
|
] |