File size: 6,014 Bytes
06e3227 ed84b2f 06e3227 ed84b2f 06e3227 ed84b2f 06e3227 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
from io import BytesIO
import numpy as np
import pandas as pd
import plotly.graph_objs as go
import ruptures as rpt
import streamlit as st
# Page header and the two user inputs that drive the analysis below.
st.title("KPIs Anomaly Detection")
uploaded_file = st.file_uploader("Upload KPI file", type=["csv", "xlsx"])
# Change-point penalty chosen by the user (1.0 to 100.0, default 2.5).
penalty = st.number_input("Penalty", min_value=1.0, max_value=100.0, value=2.5)
@st.cache_data(show_spinner="Analyzing anomalies...")
def detect_anomalies(df: pd.DataFrame, penalty: float):
    """Find KPI/cell series whose level shifted and did not return to baseline.

    The first five columns of ``df`` are renamed positionally to ``date``,
    ``ctrl``, ``bts``, ``cell`` and ``DN``; every remaining column is treated
    as a KPI. For each KPI and each cell, PELT change-point detection (RBF
    cost by default) is run on the date-ordered, NaN-free series. A pair is
    reported when at least one change point is found and the mean of the
    last segment differs from the mean of the first segment by more than
    10 % of the first segment's absolute mean.

    Args:
        df: Raw KPI table. It must have at least five columns; otherwise the
            positional lookup ``df.columns[4]`` raises ``IndexError``.
        penalty: Penalty value passed to ``Pelt.predict(pen=...)``.

    Returns:
        A tuple ``(anomaly_dict, anomaly_data, kpi_columns)`` where
        ``anomaly_dict`` maps each KPI name to the list of affected cells,
        ``anomaly_data`` maps ``(kpi, cell)`` to the full per-cell history
        with the extra columns ``change_point``, ``cell``, ``initial_mean``
        and ``final_mean``, and ``kpi_columns`` lists every KPI column.
    """
    # Cleaning: rename returns a new frame, so the caller's object is untouched.
    df = df.rename(
        columns={
            df.columns[0]: "date",
            df.columns[1]: "ctrl",
            df.columns[2]: "bts",
            df.columns[3]: "cell",
            df.columns[4]: "DN",
        }
    )
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    # Rows whose date failed to parse (NaT) or that have no cell are unusable.
    df = df.dropna(subset=["date", "cell"])

    non_kpi_columns = ["date", "cell", "bts", "ctrl", "DN"]
    kpi_columns = [col for col in df.columns if col not in non_kpi_columns]

    anomaly_dict = {}
    anomaly_data = {}

    def detect_change_points(series, model="rbf", penalty=penalty):
        """Return PELT breakpoints for ``series`` without the trailing one."""
        algo = rpt.Pelt(model=model).fit(series)
        # The last breakpoint reported by ruptures is the end of the signal,
        # which is not a real change point.
        return algo.predict(pen=penalty)[:-1]

    def process_kpi_cell(df_cell, kpi, cell):
        """Return the annotated history of one cell/KPI, or None if not anomalous."""
        df_kpi = (
            df_cell[["date", kpi]].dropna().sort_values("date").reset_index(drop=True)
        )
        # Too few samples to segment meaningfully.
        if len(df_kpi) < 30:
            return None

        series = df_kpi[kpi].values.copy()
        try:
            change_indices = detect_change_points(series)
            if not change_indices:
                return None

            # The index is 0..n-1 after reset_index, so breakpoint positions
            # can be matched against it directly.
            df_kpi["change_point"] = df_kpi.index.isin(change_indices)
            df_kpi["cell"] = cell

            # Only the first and last segments matter: compare the level
            # before the first change with the level after the last one.
            initial_mean = np.mean(series[: change_indices[0]])
            final_mean = np.mean(series[change_indices[-1] :])

            if abs(final_mean - initial_mean) > 0.1 * abs(initial_mean):
                # Attach full history, not just final segment
                df_kpi["initial_mean"] = initial_mean
                df_kpi["final_mean"] = final_mean
                return df_kpi
            return None
        except Exception as e:
            # Best effort: one unprocessable series must not abort the run.
            print(f"Error {cell}-{kpi}: {e}")
            return None

    # Group once; the per-cell frames are the same for every KPI.
    cell_groups = list(df.groupby("cell"))

    for kpi in kpi_columns:
        anomalies = []
        for cell, group in cell_groups:
            result = process_kpi_cell(group, kpi, cell)
            if result is not None:
                anomalies.append(cell)
                anomaly_data[(kpi, cell)] = result
        if anomalies:
            anomaly_dict[kpi] = anomalies

    return anomaly_dict, anomaly_data, kpi_columns
def _excel_sheet_names(names):
    """Map each name to a unique worksheet title that Excel accepts.

    Titles are limited to 31 characters, must not contain any of
    ``[ ] : * ? / \\`` and are compared case-insensitively, so forbidden
    characters are replaced by ``_`` and clashes get a numeric suffix.
    Names that are already valid and unique map to their first 31 characters.
    """
    forbidden = str.maketrans({ch: "_" for ch in "[]:*?/\\"})
    taken = set()
    titles = {}
    for name in names:
        base = str(name).translate(forbidden)[:31].strip("'") or "Sheet"
        title = base
        attempt = 1
        while title.lower() in taken:
            suffix = f"_{attempt}"
            title = base[: 31 - len(suffix)] + suffix
            attempt += 1
        taken.add(title.lower())
        titles[name] = title
    return titles


if uploaded_file:
    # Detect if file is csv or excel (case-insensitively, so "DATA.CSV" is
    # not handed to the Excel reader).
    if uploaded_file.name.lower().endswith(".csv"):
        df = pd.read_csv(uploaded_file, delimiter=";")
    else:
        # skiprows=[1] drops the row directly below the header.
        # NOTE(review): presumably a units/description row - confirm.
        df = pd.read_excel(uploaded_file, sheet_name=0, skiprows=[1])

    anomaly_dict, anomaly_data, all_kpis = detect_anomalies(df, penalty)

    if not anomaly_dict:
        st.info("No anomalies detected.")
    else:
        st.success(f"{len(anomaly_dict)} KPI(s) have un-recovered anomalies detected.")

        @st.fragment
        def selection_and_plot():
            """Let the user pick a KPI and cell, then plot its annotated history."""
            selected_kpi = st.selectbox("KPI with anomalies", list(anomaly_dict.keys()))
            selected_cell = st.selectbox("Affected cell", anomaly_dict[selected_kpi])
            df_plot = anomaly_data[(selected_kpi, selected_cell)]
            change_rows = df_plot[df_plot["change_point"]]

            fig = go.Figure()
            fig.add_trace(
                go.Scatter(
                    x=df_plot["date"],
                    y=df_plot[selected_kpi],
                    mode="lines+markers",
                    name="KPI",
                )
            )
            fig.add_trace(
                go.Scatter(
                    x=change_rows["date"],
                    y=change_rows[selected_kpi],
                    mode="markers",
                    name="Change Point",
                    marker=dict(color="red", size=10),
                )
            )
            # The mean columns hold one constant value per frame, so the
            # first row is representative.
            fig.add_hline(
                y=df_plot["initial_mean"].iloc[0],
                line_dash="dot",
                line_color="gray",
                annotation_text="Initial Mean",
            )
            fig.add_hline(
                y=df_plot["final_mean"].iloc[0],
                line_dash="dash",
                line_color="black",
                annotation_text="Final Mean",
            )
            fig.update_layout(
                title=f"{selected_kpi} - {selected_cell}",
                xaxis_title="Date",
                yaxis_title=selected_kpi,
            )
            st.plotly_chart(fig, use_container_width=True)

        @st.fragment
        def export_button():
            """Build an Excel workbook (one sheet per KPI) and offer it for download."""
            if st.button("Generate Excel file with anomalies"):
                sheet_titles = _excel_sheet_names(anomaly_dict)
                buffer = BytesIO()
                with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
                    for kpi, cells in anomaly_dict.items():
                        results = [anomaly_data[(kpi, c)] for c in cells]
                        df_final = pd.concat(results)
                        df_final.to_excel(
                            writer, sheet_name=sheet_titles[kpi], index=False
                        )
                st.download_button(
                    label="Download Excel file",
                    data=buffer.getvalue(),
                    file_name="anomalies_kpi_2G.xlsx",
                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                    type="primary",
                )

        selection_and_plot()
        export_button()
|