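# NOTE(review): "Spaces: Sleeping / Sleeping" appears to be page-status residue
# captured together with the source; it is not part of the program.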
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
import plotly.express as px | |
from sklearn.ensemble import IsolationForest | |
import io | |
from fpdf import FPDF | |
import requests | |
import PyPDF2 | |
import tempfile | |
import os | |
# -------------------------------
# Page configuration & overview text
# -------------------------------
# The page configuration is the first Streamlit call issued by this script.
st.set_page_config(layout="wide", page_title="π WiFi Anomaly Detection")

# Markdown bodies are kept in named constants so the rendering calls below
# read as a simple top-to-bottom page outline.
_OVERVIEW_MD = """
**Detect anomalies in Public Wi-Fi Systems**:
Identify suspicious spikes that may indicate hacking attempts, ensuring proactive maintenance and reliable network performance.
"""
_HOW_IT_WORKS_MD = """
- **Data Collection:** Upload network logs in CSV, TXT, or PDF format.
- **Anomaly Detection:** Use AI algorithms to automatically spot unusual patterns.
- **Visualization:** Review data in 2D and 3D interactive charts.
- **Report Generation:** Download a comprehensive PDF report with summaries and visuals.
"""

st.title("π WiFi Anomaly Detection Overview")
st.markdown(_OVERVIEW_MD)
st.markdown("### How it Works:")
st.markdown(_HOW_IT_WORKS_MD)

# -------------------------------
# Sidebar: file upload & model choice
# -------------------------------
_sidebar = st.sidebar
_sidebar.header("π Upload Data File")
# `uploaded_file` is None until the user picks a file; the main workflow
# further down branches on that.
uploaded_file = _sidebar.file_uploader("Choose a file", type=["csv", "txt", "pdf"])
_sidebar.markdown("---")
# `model_option` holds the selected label: "Local Model" or "Groq API".
model_option = _sidebar.radio(
    "Select Anomaly Detection Model",
    ("Local Model", "Groq API"),
)
# ------------------------------- | |
# Helper Functions | |
# ------------------------------- | |
def _read_txt_table(uploaded_file):
    """Parse a .txt upload as comma-separated, else as whitespace-separated.

    A comma parse of whitespace-delimited text usually does not raise; it
    just produces a single wide column. The whitespace parse is therefore
    attempted whenever the comma parse fails *or* yields at most one column,
    and it wins only if it finds more columns than the comma parse did.

    Raises whatever ``pd.read_csv`` raised when neither attempt produced a
    table.
    """
    try:
        comma_df = pd.read_csv(uploaded_file, sep=",")
    except Exception:
        comma_df = None
    if comma_df is not None and comma_df.shape[1] > 1:
        return comma_df

    # The first attempt consumed the stream; rewind before parsing again.
    uploaded_file.seek(0)
    try:
        space_df = pd.read_csv(uploaded_file, sep=r"\s+")
    except Exception:
        if comma_df is None:
            raise
        return comma_df
    if comma_df is None or space_df.shape[1] > comma_df.shape[1]:
        return space_df
    return comma_df


def load_data(uploaded_file):
    """Load an uploaded CSV, TXT or PDF file into a DataFrame.

    Parameters:
        uploaded_file: seekable binary file-like object with a ``name``
            attribute; the extension of ``name`` selects the parser.

    Returns:
        ``(df, kind)`` where ``kind`` is ``"csv"``, ``"txt"`` or ``"pdf"``.
        For PDFs the frame has a single ``"text"`` column with one row that
        holds the concatenated text of all pages. On an unsupported
        extension or a read failure an error is shown with ``st.error`` and
        ``(None, None)`` is returned.
    """
    file_type = uploaded_file.name.split('.')[-1].lower()

    # Rewind first so a buffer that was already read (for example when this
    # function is called again with the same object) is parsed from the start.
    uploaded_file.seek(0)

    if file_type == 'csv':
        try:
            return pd.read_csv(uploaded_file), "csv"
        except Exception:
            st.error("Error reading CSV file.")
            return None, None

    if file_type == 'txt':
        try:
            return _read_txt_table(uploaded_file), "txt"
        except Exception:
            st.error("Error reading TXT file.")
            return None, None

    if file_type == 'pdf':
        try:
            pdf_reader = PyPDF2.PdfReader(uploaded_file)
            # `or ""` guards against a page for which no text is returned.
            text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
            # For demonstration, the whole document becomes one text cell.
            return pd.DataFrame({"text": [text]}), "pdf"
        except Exception:
            st.error("Error reading PDF file.")
            return None, None

    st.error("Unsupported file type.")
    return None, None
def run_local_anomaly_detection(df):
    """Label each row of ``df`` as anomalous or normal with an IsolationForest.

    Parameters:
        df: DataFrame of records. Only its numeric columns are used as model
            features; missing values in them are replaced with 0.

    Returns:
        A copy of ``df`` with two added columns: ``anomaly`` (-1 for
        anomalies, 1 for normal records, as returned by the model) and
        ``anomaly_flag`` (a display label derived from ``anomaly``). The
        input frame itself is not modified. If fewer than two numeric
        feature columns are available, a warning is shown with
        ``st.warning`` and ``df`` is returned unchanged, without the two
        columns.
    """
    # A numeric 'anomaly' column already present in the input (e.g. labels
    # from an earlier run) must not be fed back into the model as a feature.
    feature_cols = [
        col
        for col in df.select_dtypes(include=[np.number]).columns
        if col != "anomaly"
    ]
    if len(feature_cols) < 2:
        st.warning("Not enough numeric columns for anomaly detection. (Need at least 2 numeric columns)")
        return df

    features = df[feature_cols].fillna(0)
    model = IsolationForest(contamination=0.1, random_state=42)
    model.fit(features)
    # Model returns -1 for anomalies, 1 for normal records.
    labels = model.predict(features)

    # Work on a copy so the caller's DataFrame is left untouched.
    result = df.copy()
    result['anomaly'] = labels
    result['anomaly_flag'] = np.where(labels == -1, "π¨ Anomaly", "β Normal")
    return result
def call_groq_api(df):
    """Stand-in for the "Groq API" model option.

    No network request is made. The records are handed to
    ``run_local_anomaly_detection`` and its result is returned as-is.
    """
    # ----- Dummy Groq API integration -----
    # A real implementation would POST the records and parse the JSON reply,
    # along the lines of:
    #   response = requests.post("https://api.groq.ai/detect", json=df.to_dict(orient="records"))
    # For demo purposes the local model is used instead.
    # ----------------------------------------
    return run_local_anomaly_detection(df)
def generate_plots(df):
    """Build 2D and 3D scatter plots from the first numeric feature columns.

    Parameters:
        df: DataFrame to plot. Points are coloured by the ``anomaly_flag``
            column when it is present.

    Returns:
        ``(fig2d, fig3d)``. ``fig2d`` is None when fewer than two numeric
        feature columns exist; ``fig3d`` is None when fewer than three exist.
    """
    # The integer 'anomaly' label column is a result, not a measurement, so it
    # is not allowed to become a plot axis (with two real features it would
    # otherwise turn into the z-axis of the 3D plot).
    axis_cols = [
        col
        for col in df.select_dtypes(include=[np.number]).columns
        if col != "anomaly"
    ]
    # Colour by the label only if it exists, so unlabelled frames still plot.
    color_col = "anomaly_flag" if "anomaly_flag" in df.columns else None

    fig2d, fig3d = None, None
    if len(axis_cols) >= 2:
        fig2d = px.scatter(df, x=axis_cols[0], y=axis_cols[1],
                           color=color_col,
                           title="π 2D Anomaly Detection Plot")
    if len(axis_cols) >= 3:
        fig3d = px.scatter_3d(df, x=axis_cols[0], y=axis_cols[1], z=axis_cols[2],
                              color=color_col,
                              title="π 3D Anomaly Detection Plot")
    return fig2d, fig3d
def generate_pdf_report(summary_text, fig2d, fig3d):
    """Render the summary text and the available figures into a PDF.

    Parameters:
        summary_text: text placed under the report title.
        fig2d: figure exposing ``write_image(path)``, or None to skip it.
        fig3d: figure exposing ``write_image(path)``, or None to skip it.

    Returns:
        The PDF document as ``bytes``.

    Each figure is exported to a temporary PNG file, embedded in the PDF and
    then deleted. The deletion also happens when the export or the embedding
    raises, so no temporary files are left behind.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", 'B', 16)
    pdf.cell(0, 10, "WiFi Anomaly Detection Report", ln=True)
    pdf.ln(10)
    pdf.set_font("Arial", size=12)
    pdf.multi_cell(0, 10, summary_text)
    pdf.ln(10)

    image_files = []
    try:
        # Export figures as temporary image files (the original comment notes
        # that write_image relies on Kaleido, Plotly's image export engine).
        for fig in (fig2d, fig3d):
            if fig is None:
                continue
            # Only reserve a unique path here; the handle is closed before
            # write_image opens the file by name.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmpfile:
                image_path = tmpfile.name
            # Register the path before writing so cleanup covers a failed export.
            image_files.append(image_path)
            fig.write_image(image_path)

        # Add each image to the PDF.
        for image_path in image_files:
            pdf.image(image_path, w=pdf.w - 40)
            pdf.ln(10)
    finally:
        # Clean up temporary image files, whether or not the steps above succeeded.
        for image_path in image_files:
            if os.path.exists(image_path):
                os.remove(image_path)

    with io.BytesIO() as pdf_output:
        pdf.output(pdf_output)
        return pdf_output.getvalue()
# ------------------------------- | |
# Main Workflow | |
# ------------------------------- | |
# Top-level page flow: preview the upload, then (on request) run detection,
# show the summary and plots, and offer the PDF report.
if uploaded_file is not None:
    df, file_type = load_data(uploaded_file)
    if df is not None:
        if file_type == "pdf":
            st.subheader("π Extracted Text from PDF:")
            st.text_area("PDF Content", df["text"][0], height=300)
        else:
            st.subheader("π Data Preview:")
            st.dataframe(df.head())

        # st.button is True only during the rerun caused by its own click.
        # The request is therefore remembered in session state; otherwise
        # clicking the report button below would rerun the script with this
        # button False and the whole analysis section (including the report
        # branch) would never execute.
        if st.button("βΆοΈ Check Data Visualization & Summary"):
            st.session_state["analysis_file"] = uploaded_file.name

        if st.session_state.get("analysis_file") == uploaded_file.name:
            if file_type in ["csv", "txt"]:
                # Run the selected anomaly detection method
                if model_option == "Local Model":
                    df_result = run_local_anomaly_detection(df)
                else:
                    df_result = call_groq_api(df)

                # Detection returns the frame without an 'anomaly' column when
                # it had too few numeric columns (it has already warned the
                # user), so the summary section is skipped in that case.
                if 'anomaly' in df_result.columns:
                    st.subheader("π Anomaly Detection Summary:")
                    anomaly_count = (df_result['anomaly'] == -1).sum()
                    total_count = df_result.shape[0]
                    summary_text = f"Total records: {total_count}\nDetected anomalies: {anomaly_count}"
                    st.text(summary_text)
                    st.dataframe(df_result.head())

                    fig2d, fig3d = generate_plots(df_result)
                    if fig2d:
                        st.plotly_chart(fig2d, use_container_width=True)
                    if fig3d:
                        st.plotly_chart(fig3d, use_container_width=True)

                    # The PDF is built only on request because exporting the
                    # figures to images is the expensive step.
                    if st.button("β¬οΈ Download Report as PDF"):
                        pdf_data = generate_pdf_report(summary_text, fig2d, fig3d)
                        st.download_button("Download PDF", data=pdf_data,
                                           file_name="wifi_anomaly_report.pdf",
                                           mime="application/pdf")
            else:
                st.info("Anomaly detection is available only for CSV/TXT data.")
else:
    st.info("Please upload a CSV, TXT, or PDF file to begin. π")