Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
import plotly.express as px | |
from sklearn.ensemble import IsolationForest | |
import io | |
from fpdf import FPDF | |
import requests | |
import PyPDF2 | |
import tempfile | |
import os | |
# -------------------------------
# Page Configuration and Header
# -------------------------------
# Text shown in the page header, kept together so the calls below stay short.
_APP_NAME = "π WiFi Anomaly Detection"

# Two quotes rendered as a Markdown blockquote.
_QUOTES_MD = (
    "\n"
    '> "Innovation distinguishes between a leader and a follower." β *Steve Jobs*\n'
    '> "The future depends on what you do today." β *Mahatma Gandhi*\n'
)

# Introductory paragraph displayed under the quotes.
_WELCOME_MD = (
    "\n"
    "Welcome to the WiFi Anomaly Detection System. "
    "This application uses AI to proactively detect abnormal behavior in Public Wi-Fi systems, "
    "identifying suspicious spikes that may indicate hacking attempts. "
    "Letβs build a more secure network, one anomaly at a time!\n"
)

# Page configuration is issued first, then the title and the two text blocks.
st.set_page_config(page_title=_APP_NAME, layout="wide")
st.title(_APP_NAME + " System")
st.markdown(_QUOTES_MD)
st.markdown(_WELCOME_MD)
# ------------------------------- | |
# Define Helper Functions | |
# ------------------------------- | |
def _read_delimited_text(uploaded_file):
    """Read a TXT upload as comma-separated, else whitespace-separated, data.

    A comma parse of a whitespace-delimited file does not fail; it yields a
    single wide column. So the whitespace parse is attempted both when the
    comma parse raises and when it produces fewer than two columns. If that
    second attempt fails, a successful comma result is returned instead.
    """
    parse_errors = (
        pd.errors.ParserError,
        pd.errors.EmptyDataError,
        UnicodeDecodeError,
    )
    try:
        comma_df = pd.read_csv(uploaded_file, sep=",")
    except parse_errors:
        comma_df = None
    if comma_df is not None and comma_df.shape[1] > 1:
        return comma_df

    # The first attempt consumed the stream; rewind before parsing again.
    uploaded_file.seek(0)
    try:
        return pd.read_csv(uploaded_file, sep=r"\s+")
    except parse_errors:
        if comma_df is None:
            raise
        return comma_df


def load_data(uploaded_file):
    """Parse an uploaded CSV, TXT or PDF file into a DataFrame.

    The format is taken from the text after the last "." in
    ``uploaded_file.name``, compared case-insensitively.

    Args:
        uploaded_file: Seekable binary file-like object with a ``name``
            attribute (here, the value returned by ``st.file_uploader``).

    Returns:
        A ``(df, kind)`` tuple where ``kind`` is ``"csv"``, ``"txt"`` or
        ``"pdf"``. For a PDF, ``df`` has a single ``"text"`` column holding
        the concatenated text of all pages in one row. On an unsupported
        extension or any read failure the problem is reported with
        ``st.error`` and ``(None, None)`` is returned.
    """
    file_type = uploaded_file.name.split(".")[-1].lower()

    if file_type == "csv":
        try:
            return pd.read_csv(uploaded_file), "csv"
        except Exception:
            # UI boundary: report the failure instead of crashing the app.
            st.error("Error reading CSV file.")
            return None, None

    if file_type == "txt":
        try:
            return _read_delimited_text(uploaded_file), "txt"
        except Exception:
            st.error("Error reading TXT file.")
            return None, None

    if file_type == "pdf":
        try:
            pdf_reader = PyPDF2.PdfReader(uploaded_file)
            # Guard against a page yielding no text (None) so the join
            # cannot fail on it.
            text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
            return pd.DataFrame({"text": [text]}), "pdf"
        except Exception:
            st.error("Error reading PDF file.")
            return None, None

    st.error("Unsupported file type.")
    return None, None
def run_local_anomaly_detection(df):
    """Score the rows of ``df`` with an IsolationForest over its numeric columns.

    Args:
        df: DataFrame to score.

    Returns:
        When at least two numeric feature columns exist: a copy of ``df``
        with two added columns, ``anomaly`` (the model's prediction, where
        -1 marks an anomaly) and ``anomaly_flag`` (a display label derived
        from it). The input frame is not modified.
        Otherwise a warning is shown with ``st.warning`` and ``df`` itself is
        returned without those columns.
    """
    # An 'anomaly' column left by an earlier run is numeric but is a label,
    # not a measurement, so it must not be fed back into the model.
    feature_cols = [
        col
        for col in df.select_dtypes(include=[np.number]).columns
        if col != "anomaly"
    ]
    if len(feature_cols) < 2:
        st.warning("Not enough numeric columns for anomaly detection. (Need at least 2 numeric columns)")
        return df

    # Missing values are replaced with 0 before fitting.
    X = df[feature_cols].fillna(0)
    model = IsolationForest(contamination=0.1, random_state=42)
    model.fit(X)
    labels = model.predict(X)

    # Work on a copy so the caller's frame (e.g. the raw upload kept in
    # session state) is left exactly as it was.
    result = df.copy()
    result["anomaly"] = labels
    # NOTE(review): both labels look mis-encoded (probably emoji originally);
    # kept as found because the intended glyphs cannot be confirmed here.
    result["anomaly_flag"] = [
        "π¨ Anomaly" if label == -1 else "β Normal" for label in labels
    ]
    return result
def call_groq_api(df):
    """Stand-in for a Groq API call.

    No remote request is made: the frame is handed to
    ``run_local_anomaly_detection`` and that function's result is returned
    as-is.
    """
    # ----- Dummy Groq API integration -----
    # Swap this delegation for a real Groq API request when one is needed.
    return run_local_anomaly_detection(df)
def generate_plots(df):
    """Build 2D and 3D scatter plots from the first numeric feature columns.

    Args:
        df: DataFrame to plot, normally the output of
            ``run_local_anomaly_detection``.

    Returns:
        A ``(fig2d, fig3d)`` tuple. ``fig2d`` is ``None`` when fewer than two
        numeric feature columns exist and ``fig3d`` is ``None`` when fewer
        than three exist.
    """
    # 'anomaly' is the numeric model label added by the detection step, not a
    # measurement; without this filter it would be picked up as a plot axis
    # (the z axis whenever the data has exactly two real features).
    feature_cols = [
        col
        for col in df.select_dtypes(include=[np.number]).columns
        if col != "anomaly"
    ]
    # Colour by the flag only when the detection step has added it, so an
    # unscored frame can still be plotted.
    color = "anomaly_flag" if "anomaly_flag" in df.columns else None

    fig2d, fig3d = None, None
    if len(feature_cols) >= 2:
        fig2d = px.scatter(
            df,
            x=feature_cols[0],
            y=feature_cols[1],
            color=color,
            title="π 2D Anomaly Detection Plot",
        )
    if len(feature_cols) >= 3:
        fig3d = px.scatter_3d(
            df,
            x=feature_cols[0],
            y=feature_cols[1],
            z=feature_cols[2],
            color=color,
            title="π 3D Anomaly Detection Plot",
        )
    return fig2d, fig3d
def _write_figure_png(fig):
    """Export a Plotly figure to a new temporary PNG file and return its path."""
    fd, path = tempfile.mkstemp(suffix=".png")
    # Close our handle first; the figure is written through the path.
    os.close(fd)
    try:
        fig.write_image(path)
    except Exception:
        # Do not leave an empty temp file behind when the export fails.
        os.remove(path)
        raise
    return path


def generate_pdf_report(summary_text, fig2d, fig3d):
    """Render the summary text and the available plots into a PDF.

    Args:
        summary_text: Text placed under the report heading.
        fig2d: Plotly figure to embed, or ``None`` to skip it.
        fig3d: Plotly figure to embed, or ``None`` to skip it.

    Returns:
        The PDF document as ``bytes``.

    Raises:
        Whatever the figure export or the PDF library raises; temporary
        image files are removed in that case as well.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", 'B', 16)
    pdf.cell(0, 10, "WiFi Anomaly Detection Report", ln=True)
    pdf.ln(10)
    pdf.set_font("Arial", size=12)
    # The document is produced as Latin-1 (see the encode at the end), so
    # characters outside that range are replaced rather than failing the report.
    printable_text = summary_text.encode("latin-1", "replace").decode("latin-1")
    pdf.multi_cell(0, 10, printable_text)
    pdf.ln(10)

    # Figures go through temporary image files (Plotly's image export).
    image_files = []
    try:
        for fig in (fig2d, fig3d):
            if fig is not None:
                image_files.append(_write_figure_png(fig))
        # Add each image to the PDF
        for image in image_files:
            pdf.image(image, w=pdf.w - 40)
            pdf.ln(10)
    finally:
        # Clean up temporary image files, even if export or embedding failed.
        for image in image_files:
            try:
                os.remove(image)
            except OSError:
                pass

    raw = pdf.output(dest="S")
    # Depending on the installed FPDF implementation the result is either a
    # str (which needs encoding) or already a bytes-like object.
    if isinstance(raw, str):
        return raw.encode("latin1")
    return bytes(raw)
# -------------------------------
# Initialize Session State Variables
# -------------------------------
# Default value for every session key the app uses. Only keys that are
# missing get written, so values already present are left untouched.
_SESSION_DEFAULTS = {
    "step": "upload",
    "df": None,
    "df_processed": None,
    "fig2d": None,
    "fig3d": None,
    "summary_text": "",
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
# -------------------------------
# Sidebar: Step Buttons
# -------------------------------
st.sidebar.title("π§ Application Steps")

# (button label, step identifier) pairs, in the order they are displayed.
_STEP_BUTTONS = (
    ("π Upload File", "upload"),
    ("π Data Visualization", "viz"),
    ("π Statistic Analysis", "stats"),
    ("β¬οΈ Download Report", "download"),
)
# Each button is rendered in turn; a pressed button selects its step.
for _label, _step in _STEP_BUTTONS:
    if st.sidebar.button(_label):
        st.session_state.step = _step
# -------------------------------
# Main Workflow Based on Step
# -------------------------------
def _summarize_anomalies(df_result):
    """Return the record-count / anomaly-count summary text for a scored frame."""
    anomaly_count = (df_result['anomaly'] == -1).sum()
    total_count = df_result.shape[0]
    return f"Total records: {total_count}\nDetected anomalies: {anomaly_count}"


if st.session_state.step == "upload":
    st.subheader("Step 1: Upload Your Data File")
    st.markdown("Please upload a CSV, TXT, or PDF file with network data. The expected columns for CSV/TXT files are:")
    st.code("['traffic', 'latency', 'packet_loss']", language="python")
    uploaded_file = st.file_uploader("Choose a file", type=["csv", "txt", "pdf"])
    if uploaded_file is not None:
        df, file_type = load_data(uploaded_file)
        if df is not None:
            st.session_state.df = df
            # Everything derived from a previous upload is now stale; clear it
            # so the later steps recompute from this file instead of showing
            # results that belong to the old one.
            st.session_state.df_processed = None
            st.session_state.fig2d = None
            st.session_state.fig3d = None
            st.session_state.summary_text = ""
            st.success("File uploaded and processed successfully!")
            if file_type == "pdf":
                st.subheader("Extracted Text from PDF:")
                st.text_area("PDF Content", df["text"][0], height=300)
            else:
                st.subheader("Data Preview:")
                st.dataframe(df.head())
    else:
        st.info("Awaiting file upload. π")
elif st.session_state.step == "viz":
    st.subheader("Step 2: Data Visualization")
    if st.session_state.df is None:
        st.error("Please upload a file first in the 'Upload File' step.")
    else:
        # Process the data if not already done
        if st.session_state.df_processed is None:
            # Here, you can choose between the local model or Groq API; we use the local model for demo.
            st.session_state.df_processed = run_local_anomaly_detection(st.session_state.df)
        fig2d, fig3d = generate_plots(st.session_state.df_processed)
        # Keep the figures so the download step can embed them in the report.
        st.session_state.fig2d = fig2d
        st.session_state.fig3d = fig3d
        if fig2d is not None:
            st.plotly_chart(fig2d, use_container_width=True)
        if fig3d is not None:
            st.plotly_chart(fig3d, use_container_width=True)
elif st.session_state.step == "stats":
    st.subheader("Step 3: Statistic Analysis")
    if st.session_state.df_processed is None:
        st.error("Data has not been processed yet. Please complete the Data Visualization step first.")
    else:
        df_result = st.session_state.df_processed
        if 'anomaly' in df_result.columns:
            st.session_state.summary_text = _summarize_anomalies(df_result)
            st.markdown("**Anomaly Detection Summary:**")
            st.text(st.session_state.summary_text)
        else:
            # The detection step returns the frame unscored when it has fewer
            # than two numeric columns (e.g. text extracted from a PDF), so
            # there is no 'anomaly' column to summarise.
            st.warning("No anomaly results are available for this data, so the anomaly summary is skipped.")
        st.markdown("**Detailed Data:**")
        st.dataframe(df_result.head())
        st.markdown("**Descriptive Statistics:**")
        st.dataframe(df_result.describe())
elif st.session_state.step == "download":
    st.subheader("Step 4: Download PDF Report")
    if st.session_state.df_processed is None or (st.session_state.fig2d is None and st.session_state.fig3d is None):
        st.error("Please complete the previous steps (Upload, Visualization, Statistic Analysis) before downloading the report.")
    else:
        # The summary is normally written by the statistics step; build it here
        # when that step was skipped so the report is never missing its text.
        if not st.session_state.summary_text and 'anomaly' in st.session_state.df_processed.columns:
            st.session_state.summary_text = _summarize_anomalies(st.session_state.df_processed)
        pdf_data = generate_pdf_report(st.session_state.summary_text, st.session_state.fig2d, st.session_state.fig3d)
        st.download_button("β¬οΈ Download PDF Report", data=pdf_data,
                           file_name="wifi_anomaly_report.pdf",
                           mime="application/pdf")