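# Scraped Hugging Face file-view header (not part of the program):
#   author: sunbal7
#   commit: "Update app.py" (f91b527, verified)
#   views: raw / history / blame
#   size: 7.03 kB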
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyod.models.iforest import IForest
from pyod.models.lof import LOF
from pyod.models.ocsvm import OCSVM
from pyod.models.combination import aom, moa, average
from pyod.utils.utility import standardizer
from sklearn.decomposition import PCA
from sklearn.metrics import precision_score, recall_score
import base64
from datetime import datetime
# Configuration
# Silence the warning Streamlit emits when st.pyplot() is called without an
# explicit figure. Newer Streamlit releases no longer recognise this option
# and raise StreamlitAPIException for unknown keys, which would abort the
# whole script at start-up; treat the option as best-effort instead.
try:
    st.set_option('deprecation.showPyplotGlobalUse', False)
except st.errors.StreamlitAPIException:
    pass
def generate_report(data, predictions, model_names, metrics):
    """Build the plain-text anomaly detection report.

    Args:
        data: DataFrame of the analysed features; its length and column
            labels are reported.
        predictions: Sequence of 0/1 flags, one per row of ``data``
            (1 marks an anomaly).
        model_names: Names of the models that formed the ensemble; only
            the count is reported.
        metrics: DataFrame of performance figures, embedded as a table.

    Returns:
        The report as a single string.
    """
    total_points = len(data)
    # Count once instead of re-summing the predictions for every placeholder.
    anomaly_count = int(np.sum(predictions))
    # Guard the ratio so an empty data set yields 0% rather than a
    # ZeroDivisionError.
    anomaly_ratio = anomaly_count / total_points if total_points else 0.0
    # Column labels are not guaranteed to be strings (e.g. header-less CSVs).
    feature_list = ', '.join(map(str, data.columns))
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        metrics_table = metrics.to_markdown()
    except ImportError:
        # DataFrame.to_markdown needs the optional 'tabulate' package;
        # fall back to the plain-text rendering when it is unavailable.
        metrics_table = metrics.to_string()

    report = f"""
Network Anomaly Detection Report
Generated: {timestamp}
-----------------------------------------------
Total Data Points: {total_points}
Features Analyzed: {feature_list}
Detection Results:
- Total Anomalies Detected: {anomaly_count}
- Anomaly Percentage: {anomaly_ratio:.2%}
Model Performance:
{metrics_table}
Conclusion:
The system detected {anomaly_count} potential anomalies using ensemble of {len(model_names)} models.
Recommended actions: Investigate flagged points, check network equipment logs, and verify traffic patterns.
"""
    return report
def plot_3d_projections(data, predictions):
    """Draw a 3D PCA scatter plot of normal points versus anomalies.

    Args:
        data: 2-D array-like of samples (rows) by features (columns).
        predictions: Sequence of 0/1 flags, one per row (1 marks an anomaly).

    Returns:
        The matplotlib Figure holding the 3D scatter plot. The caller owns
        the figure and should close it once it has been rendered.
    """
    data = np.asarray(data)
    # Boolean masks below need an ndarray; with a plain list,
    # `predictions == 0` would collapse to a single bool.
    predictions = np.asarray(predictions)

    # PCA cannot extract more components than min(n_samples, n_features),
    # so cap the request and zero-fill the missing axes to keep the plot 3D.
    n_components = min(3, *data.shape)
    projections = PCA(n_components=n_components).fit_transform(data)
    if n_components < 3:
        projections = np.pad(projections, ((0, 0), (0, 3 - n_components)))

    fig = plt.figure(figsize=(10, 7))
    ax = fig.add_subplot(111, projection='3d')
    normal = projections[predictions == 0]
    anomalies = projections[predictions == 1]
    ax.scatter(normal[:, 0], normal[:, 1], normal[:, 2], c='b', label='Normal')
    ax.scatter(anomalies[:, 0], anomalies[:, 1], anomalies[:, 2],
               c='r', marker='x', label='Anomaly')
    ax.set_xlabel('PC1')
    ax.set_ylabel('PC2')
    ax.set_zlabel('PC3')
    # Address the axes directly rather than pyplot's implicit "current axes",
    # which may point elsewhere when other figures are open.
    ax.set_title('3D PCA Projection of Network Data')
    ax.legend()
    return fig
_NO_GROUND_TRUTH = "N/A (No ground truth)"


def _load_network_data(uploaded_file):
    """Return ``(data, y_true)`` from the upload, or a synthetic demo set.

    ``y_true`` is a 0/1 array marking the injected anomalies for the
    synthetic data and ``None`` for uploaded data (no labels available).
    """
    if uploaded_file is not None:
        data = pd.read_csv(uploaded_file)
        st.success("Uploaded data loaded successfully!")
        return data, None

    # Generate synthetic network data (fixed seed keeps the demo reproducible).
    np.random.seed(42)
    n_samples = 500
    data = pd.DataFrame({
        "traffic": np.random.normal(100, 15, n_samples),
        "latency": np.random.normal(50, 8, n_samples),
        "packet_loss": np.random.normal(0.5, 0.2, n_samples),
        "error_rate": np.random.normal(0.1, 0.05, n_samples),
    })
    # Inject anomalies into 50 randomly chosen rows.
    anomaly_idx = np.random.choice(n_samples, 50, replace=False)
    data.loc[anomaly_idx, 'traffic'] *= 2.5
    data.loc[anomaly_idx, 'latency'] += 100
    data.loc[anomaly_idx, 'packet_loss'] *= 4
    st.info("Using synthetic network data. Upload a CSV to use your own.")

    y_true = np.zeros(n_samples, dtype=int)
    y_true[anomaly_idx] = 1
    return data, y_true


def _combine_scores(scores, method):
    """Fuse the per-model score columns into one score per sample."""
    if method == "Average":
        return average(scores)
    combiner = moa if method == "MOA" else aom
    try:
        return combiner(scores)
    except ValueError:
        # pyod's bucketed combiners default to n_buckets=5 and reject score
        # matrices with too few model columns. With one model per bucket,
        # "maximum of averages" reduces to the row-wise maximum and
        # "average of maxima" to the row-wise mean.
        if method == "MOA":
            return scores.max(axis=1)
        return scores.mean(axis=1)


def _format_metric(value):
    """Render a float metric as a percentage; pass placeholders through."""
    return f"{value:.2%}" if isinstance(value, float) else value


def main():
    """Run the Streamlit anomaly-detection app.

    Reads the sidebar configuration, loads uploaded or synthetic data, fits
    the selected pyod detectors, combines their scores into an ensemble
    prediction, and renders metrics, PCA plots and a downloadable report.
    """
    st.title("🛜 AI Network Anomaly Detection with Multi-Model Ensemble")

    # Sidebar configuration
    st.sidebar.header("Model Configuration")
    models = st.sidebar.multiselect(
        "Select Detection Models",
        ["Isolation Forest", "Local Outlier Factor", "One-Class SVM"],
        default=["Isolation Forest", "Local Outlier Factor"]
    )
    contamination = st.sidebar.slider("Expected Anomaly Ratio", 0.01, 0.5, 0.1)
    ensemble_method = st.sidebar.selectbox("Ensemble Method", ["Average", "MOA", "AOM"])

    # Data input section
    uploaded_file = st.file_uploader("Upload network data (CSV)", type=["csv"])
    data, y_true = _load_network_data(uploaded_file)

    # Data preprocessing: the detectors only accept complete numeric rows.
    numeric_cols = data.select_dtypes(include=np.number).columns.tolist()
    if not numeric_cols:
        st.error("The data contains no numeric columns to analyze!")
        return
    complete_rows = data[numeric_cols].notna().all(axis=1)
    features = data.loc[complete_rows, numeric_cols]
    if y_true is not None:
        y_true = y_true[complete_rows.to_numpy()]
    dropped = len(data) - len(features)
    if dropped:
        st.warning(f"Ignored {dropped} row(s) with missing numeric values.")
    if features.empty:
        st.error("No complete numeric rows are available for analysis!")
        return
    X_norm = standardizer(features.values)

    # Model initialization: factories, so only the selected detectors are built.
    model_factories = {
        "Isolation Forest": lambda: IForest(contamination=contamination, n_jobs=-1),
        "Local Outlier Factor": lambda: LOF(contamination=contamination, n_jobs=-1),
        "One-Class SVM": lambda: OCSVM(contamination=contamination),
    }
    selected_models = [model_factories[name]() for name in models]
    if not selected_models:
        st.error("Please select at least one detection model!")
        return

    # Training and prediction
    st.subheader("Model Training Progress")
    progress_bar = st.progress(0)
    train_scores = np.zeros([len(X_norm), len(selected_models)])
    for i, model in enumerate(selected_models):
        model.fit(X_norm)
        train_scores[:, i] = model.decision_function(X_norm)
        progress_bar.progress((i + 1) / len(selected_models))

    # Ensemble prediction. The detectors score on unrelated scales, so bring
    # every column to zero mean / unit variance before combining; otherwise
    # the detector with the largest raw scores dominates the ensemble.
    combined_scores = _combine_scores(standardizer(train_scores), ensemble_method)
    threshold = np.percentile(combined_scores, 100 * (1 - contamination))
    predictions = (combined_scores > threshold).astype(int)

    # Performance metrics (only computable against the synthetic ground truth).
    if y_true is not None:
        precision = precision_score(y_true, predictions, zero_division=0)
        recall = recall_score(y_true, predictions, zero_division=0)
        model_precision = [precision_score(y_true, m.labels_, zero_division=0)
                           for m in selected_models]
        model_recall = [recall_score(y_true, m.labels_, zero_division=0)
                        for m in selected_models]
    else:
        precision = recall = _NO_GROUND_TRUTH
        model_precision = [_NO_GROUND_TRUTH] * len(selected_models)
        model_recall = [_NO_GROUND_TRUTH] * len(selected_models)

    # Score statistics get their own columns instead of being shown under the
    # "Precision"/"Recall" headings, which hold real precision/recall values.
    metrics_df = pd.DataFrame({
        "Model": models + ["Ensemble"],
        "Precision": model_precision + [precision],
        "Recall": model_recall + [recall],
        "Score Mean": [m.decision_scores_.mean() for m in selected_models]
                      + [combined_scores.mean()],
        "Score Std": [m.decision_scores_.std() for m in selected_models]
                     + [combined_scores.std()],
    })

    # Display results
    anomaly_count = int(predictions.sum())
    st.subheader("Detection Results")
    col1, col2 = st.columns(2)
    with col1:
        st.metric("Total Anomalies", anomaly_count)
        st.metric("Anomaly Ratio", f"{anomaly_count / len(features):.2%}")
    with col2:
        st.metric("Ensemble Precision", _format_metric(precision))
        st.metric("Ensemble Recall", _format_metric(recall))

    # Visualization
    st.subheader("Data Visualization")
    tab1, tab2 = st.tabs(["2D Projection", "3D Projection"])
    # PCA needs at least as many samples and features as components.
    max_components = min(X_norm.shape)
    with tab1:
        if max_components >= 2:
            viz_data = PCA(n_components=2).fit_transform(X_norm)
            is_anomaly = predictions == 1
            fig_2d, ax = plt.subplots(figsize=(10, 6))
            ax.scatter(viz_data[~is_anomaly, 0], viz_data[~is_anomaly, 1],
                       c='blue', label='Normal', alpha=0.6)
            ax.scatter(viz_data[is_anomaly, 0], viz_data[is_anomaly, 1],
                       c='red', marker='x', label='Anomaly')
            ax.set_xlabel("Principal Component 1")
            ax.set_ylabel("Principal Component 2")
            ax.set_title("PCA Projection of Network Data")
            ax.legend()
            # Pass the figure explicitly (global-figure use is deprecated)
            # and close it so reruns do not accumulate open figures.
            st.pyplot(fig_2d)
            plt.close(fig_2d)
        else:
            st.info("At least two numeric features and two rows are needed for a 2D projection.")
    with tab2:
        if max_components >= 3:
            fig_3d = plot_3d_projections(X_norm, predictions)
            st.pyplot(fig_3d)
            plt.close(fig_3d)
        else:
            st.info("At least three numeric features and three rows are needed for a 3D projection.")

    # Generate report
    st.subheader("Analysis Report")
    report = generate_report(features, predictions, models, metrics_df)
    st.code(report, language='text')

    # Report download
    st.download_button(
        label="Download Full Report",
        data=report,
        file_name=f"network_anomaly_report_{datetime.now().strftime('%Y%m%d')}.txt",
        mime="text/plain"
    )
# Script entry point: start the app only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()