# Spaces: Sleeping  (page-status header captured along with the source; not part of the program)
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import streamlit as st
from pyod.models.iforest import IForest
from pyod.models.lof import LOF
def _generate_synthetic_data(n_samples=300, n_anomalies=20, seed=42):
    """Build a demo DataFrame of network metrics with injected anomalies.

    Args:
        n_samples: Number of rows to generate.
        n_anomalies: Number of rows whose values are inflated to act as
            anomalies.
        seed: Seed applied to NumPy's global random generator so the demo
            data is identical on every Streamlit rerun.

    Returns:
        A DataFrame with the columns ``traffic``, ``latency`` and
        ``packet_loss``.
    """
    np.random.seed(seed)
    traffic = np.random.normal(100, 10, n_samples)
    latency = np.random.normal(50, 5, n_samples)
    packet_loss = np.random.normal(0.5, 0.1, n_samples)

    # Inflate all three metrics on the same randomly chosen rows.
    anomaly_indices = np.random.choice(n_samples, size=n_anomalies, replace=False)
    traffic[anomaly_indices] *= 1.5
    latency[anomaly_indices] *= 2
    packet_loss[anomaly_indices] *= 5

    return pd.DataFrame(
        {
            "traffic": traffic,
            "latency": latency,
            "packet_loss": packet_loss,
        }
    )


def main():
    """Render the Streamlit anomaly-detection app.

    Reads an uploaded CSV (or falls back to synthetic data), fits the PyOD
    detector chosen in the sidebar on the numeric columns, adds an
    ``anomaly`` column (0: normal, 1: anomaly) and, when at least two numeric
    features exist, draws a scatter plot of the first two.
    """
    st.title("AI-Based Network Anomaly Detection (Predictive Maintenance)")
    st.markdown(
        "\n"
        "This application uses AI to detect unusual behavior in a network before it leads to failure.\n"
        "By leveraging open source models and PyOD, it predicts potential issues, enabling proactive maintenance.\n"
    )

    # Sidebar settings for model and parameters.
    st.sidebar.header("Settings")
    model_choice = st.sidebar.selectbox(
        "Select Anomaly Detection Model",
        ("Isolation Forest", "Local Outlier Factor"),
    )
    # PyOD rejects contamination outside (0, 0.5], so the slider must not
    # offer 0.0 as a selectable value.
    contamination = st.sidebar.slider(
        "Contamination (Expected anomaly ratio)", 0.01, 0.5, 0.1
    )

    uploaded_file = st.file_uploader("Upload CSV file with network data", type=["csv"])
    if uploaded_file is not None:
        try:
            data = pd.read_csv(uploaded_file)
        except (pd.errors.EmptyDataError, pd.errors.ParserError, UnicodeDecodeError) as exc:
            st.error(f"Could not read the uploaded CSV file: {exc}")
            return
        st.write("### Data Preview")
        st.dataframe(data.head())
    else:
        st.info("No file uploaded. Generating synthetic network data for demonstration.")
        data = _generate_synthetic_data()
        st.write("### Synthetic Data")
        st.dataframe(data.head())

    # Use only numeric features for anomaly detection.
    features = data.select_dtypes(include=[np.number]).columns.tolist()
    if not features:
        st.error("No numeric columns found in the data for anomaly detection.")
        return

    # The detectors cannot fit on NaN/inf values: ignore columns that hold no
    # usable value at all, then drop the rows that are still incomplete.
    numeric = data[features].replace([np.inf, -np.inf], np.nan)
    features = [column for column in features if numeric[column].notna().any()]
    if not features:
        st.error("No numeric columns found in the data for anomaly detection.")
        return
    valid_rows = numeric[features].notna().all(axis=1)
    n_dropped = int((~valid_rows).sum())
    if n_dropped:
        st.warning(f"Ignoring {n_dropped} row(s) with missing or non-finite values.")
        data = data.loc[valid_rows].copy()
    if len(data) < 2:
        st.error("At least two complete rows are required for anomaly detection.")
        return
    X = data[features].to_numpy(dtype=float)

    # Initialize the selected model from PyOD.
    detectors = {
        "Isolation Forest": IForest,
        "Local Outlier Factor": LOF,
    }
    model = detectors[model_choice](contamination=contamination)

    # Fit the model and read the training labels (0: normal, 1: anomaly).
    model.fit(X)
    predictions = model.labels_
    data["anomaly"] = predictions

    st.subheader("Anomaly Detection Results")
    st.write(data.head())
    n_anomalies = int(np.sum(predictions))
    st.write(f"Detected **{n_anomalies}** anomalies out of **{len(data)}** data points.")

    # Visualization (if at least 2 numeric features are available).
    if len(features) >= 2:
        st.subheader("Visualization")
        fig, ax = plt.subplots()
        # Plot using the first two numeric features.
        x_feature, y_feature = features[0], features[1]
        normal_data = data[data["anomaly"] == 0]
        anomaly_data = data[data["anomaly"] == 1]
        ax.scatter(
            normal_data[x_feature],
            normal_data[y_feature],
            label="Normal",
            color="blue",
            alpha=0.5,
        )
        ax.scatter(
            anomaly_data[x_feature],
            anomaly_data[y_feature],
            label="Anomaly",
            color="red",
            marker="x",
        )
        ax.set_xlabel(x_feature)
        ax.set_ylabel(y_feature)
        ax.legend()
        st.pyplot(fig)
        # Streamlit reruns the script on every interaction; close the figure
        # so open Matplotlib figures do not accumulate.
        plt.close(fig)
# Launch the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()