# Page-scrape header (not program text): "arjunanand13's picture";
# commit message "Update app.py"; commit bcdafda (verified).
import numpy as np
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import gradio as gr
import traceback
import logging
import matplotlib.pyplot as plt
# Configure the root logger so that only ERROR-level (and more severe)
# records are emitted.
logging.basicConfig(level=logging.ERROR)
class TrendAnalysisAgent:
    """Extract the trend component of a series via additive decomposition."""

    def analyze(self, data, period=1):
        """Return the trend component of ``data``.

        Args:
            data: One-dimensional series of observations; passed unchanged
                to ``seasonal_decompose``.
            period: Cycle length handed to the decomposition. The default
                of 1 preserves the historical behaviour of this agent.
                NOTE(review): with a one-sample window the moving-average
                trend is expected to mirror the input; pass a larger
                period to obtain actual smoothing.

        Returns:
            The ``trend`` attribute of the decomposition result.

        Raises:
            ValueError: If ``period`` is smaller than 1.
        """
        # Fail early with a clear message instead of an arithmetic error
        # from inside the decomposition.
        if period < 1:
            raise ValueError("period must be a positive integer.")
        result = seasonal_decompose(data, model='additive', period=period)
        return result.trend
class SeasonalityDetectionAgent:
    """Extract the seasonal component of a series via additive decomposition."""

    def detect(self, data, period=12):
        """Return the seasonal component of ``data``.

        Args:
            data: One-dimensional series of observations; passed unchanged
                to ``seasonal_decompose``.
            period: Number of observations in one seasonal cycle. Defaults
                to 12, the value this agent has always used.

        Returns:
            The ``seasonal`` attribute of the decomposition result.

        Raises:
            ValueError: If ``period`` is smaller than 1. The underlying
                decomposition may raise as well, e.g. when the series is
                too short for the requested period.
        """
        # Fail early with a clear message instead of an arithmetic error
        # from inside the decomposition.
        if period < 1:
            raise ValueError("period must be a positive integer.")
        result = seasonal_decompose(data, model='additive', period=period)
        return result.seasonal
class AnomalyDetectionAgent:
    """Flag outlying observations with an isolation forest."""

    def detect(self, data):
        """Return a boolean mask that is true where a point is an outlier.

        Args:
            data: Array of observations; it must provide ``reshape`` because
                it is turned into a single-column matrix first.

        Returns:
            Element-wise comparison of the forest's labels against ``-1``,
            the label used here to mark anomalies.
        """
        column = data.reshape(-1, 1)
        standardized = StandardScaler().fit_transform(column)

        # Fixed contamination and seed, as configured for this demo.
        detector = IsolationForest(contamination=0.1, random_state=42)
        labels = detector.fit_predict(standardized)

        outlier_label = -1
        return labels == outlier_label
def plot_data(data, title, anomalies=None):
    """Draw ``data`` as a line chart, optionally highlighting anomalies.

    Args:
        data: Values to plot. When ``anomalies`` is given, ``data`` is also
            indexed with an integer array of the flagged positions.
        title: Text used as the axes title.
        anomalies: Optional mask; positions that are truthy are drawn as
            red markers on top of the line.

    Returns:
        The Matplotlib figure, already closed in pyplot.
    """
    figure, axes = plt.subplots(figsize=(10, 6))
    axes.plot(data, label='Data')

    if anomalies is not None:
        flagged = np.nonzero(anomalies)[0]
        axes.scatter(flagged, data[flagged], color='red', label='Anomalies')

    axes.set_title(title)
    axes.legend()

    # Close the figure in pyplot before handing the object back; presumably
    # this keeps open figures from accumulating across calls.
    plt.close(figure)
    return figure
class FeatureExtractionAgent:
    """Summarise a series with a handful of descriptive statistics."""

    def extract(self, data):
        """Return a one-row DataFrame with mean, std, min and max of ``data``.

        Args:
            data: Numeric values accepted by the NumPy reduction functions.

        Returns:
            A ``pandas.DataFrame`` with the columns ``mean``, ``std``,
            ``min`` and ``max`` (in that order) and a single row.
        """
        reducers = {
            'mean': np.mean,
            'std': np.std,
            'min': np.min,
            'max': np.max,
        }
        # Each statistic is wrapped in a list so it becomes a one-row column.
        summary = {label: [reduce(data)] for label, reduce in reducers.items()}
        return pd.DataFrame(summary)
class ForecastingAgent:
    """Forecast future values of a series with an ARIMA model."""

    def forecast(self, data, steps, order=(1, 1, 1)):
        """Fit an ARIMA model on ``data`` and forecast ``steps`` points ahead.

        Args:
            data: Observed series handed to ``ARIMA``.
            steps: Number of future points to forecast. Integral floats such
                as ``5.0`` (which UI number widgets may deliver) are
                converted to ``int`` so they are treated as a step count.
                Non-numeric values are passed through untouched.
            order: ARIMA ``(p, d, q)`` order; defaults to ``(1, 1, 1)``.

        Returns:
            The result of ``results.forecast(steps=steps)``.

        Raises:
            ValueError: If ``steps`` is a fractional number or smaller
                than 1.
        """
        # Validate before fitting so a bad request does not cost a model fit.
        steps = self._normalize_steps(steps)
        model = ARIMA(data, order=order)
        results = model.fit()
        return results.forecast(steps=steps)

    @staticmethod
    def _normalize_steps(steps):
        """Convert numeric ``steps`` to a plain positive ``int``."""
        if isinstance(steps, (float, np.floating)):
            if not float(steps).is_integer():
                raise ValueError("steps must be a whole number.")
            steps = int(steps)
        elif isinstance(steps, np.integer):
            steps = int(steps)
        if isinstance(steps, int) and steps < 1:
            raise ValueError("steps must be at least 1.")
        return steps
class RetrievalMechanism:
    """Minimal in-memory key/value store backed by a dict."""

    def __init__(self):
        """Start with an empty ``database`` mapping."""
        self.database = dict()

    def store(self, key, data):
        """Save ``data`` under ``key``, replacing any previous entry."""
        self.database.update({key: data})

    def retrieve(self, key):
        """Return the entry stored under ``key``, or ``None`` if absent."""
        try:
            return self.database[key]
        except KeyError:
            return None
class MockLanguageModel:
    """Template-based stand-in that turns analysis results into a sentence."""

    def generate_insight(self, data, trend, seasonality, anomalies, features, forecast):
        """Compose a short textual summary of the analysis.

        Args:
            data: Observed series; only its last value is used.
            trend: Accepted for interface compatibility; not used.
            seasonality: Accepted for interface compatibility; not used.
            anomalies: Boolean mask whose true entries are counted.
            features: Table with ``mean`` and ``std`` columns; the first
                row of each is reported.
            forecast: Forecast values; only the last one is used.

        Returns:
            A three-sentence string covering mean/std, the number of
            anomalies, and whether the last forecast value lies above,
            below, or level with the last observation.
        """
        mean = features['mean'].values[0]
        std = features['std'].values[0]
        anomaly_count = int(np.sum(anomalies))

        # Singular wording applies to exactly one anomaly; zero is plural.
        verb, noun = ('is', 'anomaly') if anomaly_count == 1 else ('are', 'anomalies')

        # Convert to arrays so the last element is taken positionally for
        # any sequence-like input.
        last_observed = np.asarray(data)[-1]
        last_forecast = np.asarray(forecast)[-1]
        if last_forecast > last_observed:
            direction = 'an upward'
        elif last_forecast < last_observed:
            direction = 'a downward'
        else:
            direction = 'a flat'

        return (
            f"The time series has a mean of {mean:.2f} and standard deviation of {std:.2f}. "
            f"There {verb} {anomaly_count} {noun} detected. "
            f"The forecast suggests {direction} trend in the near future."
        )
class AgenticRAG:
    """Coordinates the individual analysis agents over a single series."""

    def __init__(self):
        """Instantiate every collaborator.

        ``retrieval`` is created here as well, although ``process`` neither
        reads from nor writes to it.
        """
        self.trend_agent = TrendAnalysisAgent()
        self.seasonality_agent = SeasonalityDetectionAgent()
        self.anomaly_agent = AnomalyDetectionAgent()
        self.feature_agent = FeatureExtractionAgent()
        self.forecasting_agent = ForecastingAgent()
        self.retrieval = RetrievalMechanism()
        self.language_model = MockLanguageModel()

    def process(self, data, forecast_steps):
        """Run each agent on ``data`` and summarise the results.

        Args:
            data: Series handed unchanged to every agent.
            forecast_steps: Horizon passed to the forecasting agent.

        Returns:
            The tuple ``(trend, seasonality, anomalies, features,
            forecast, insight)``.
        """
        trend_component = self.trend_agent.analyze(data)
        seasonal_component = self.seasonality_agent.detect(data)
        anomaly_mask = self.anomaly_agent.detect(data)
        summary_stats = self.feature_agent.extract(data)
        predicted = self.forecasting_agent.forecast(data, forecast_steps)

        # The textual insight is derived from all of the results above.
        narrative = self.language_model.generate_insight(
            data,
            trend_component,
            seasonal_component,
            anomaly_mask,
            summary_stats,
            predicted,
        )
        return (
            trend_component,
            seasonal_component,
            anomaly_mask,
            summary_stats,
            predicted,
            narrative,
        )
def _coerce_forecast_steps(forecast_steps):
    """Return ``forecast_steps`` as a positive ``int`` or raise ``ValueError``."""
    if forecast_steps is None:
        raise ValueError("Number of steps to forecast is required.")
    as_float = float(forecast_steps)
    # NaN and infinities fail ``is_integer`` and are rejected here too.
    if not as_float.is_integer() or as_float < 1:
        raise ValueError("Number of steps to forecast must be a positive whole number.")
    return int(as_float)


def analyze_time_series(data, forecast_steps):
    """Run the full analysis pipeline on a comma-separated series.

    Args:
        data: Text holding comma-separated numbers. Blank entries (for
            example from a trailing comma) are ignored.
        forecast_steps: Number of future points to forecast. Integral
            floats such as ``5.0`` are accepted and converted to ``int``.

    Returns:
        A 7-tuple ``(trend_plot, seasonality_plot, anomalies_plot,
        features, forecast_plot, insight, error)``. On success ``error`` is
        an empty string. On any failure the first five items are ``None``,
        ``insight`` is an empty string, and ``error`` holds the message
        followed by the traceback; the error is also logged.
    """
    try:
        values = np.array([float(tok) for tok in data.split(',') if tok.strip()])
        if len(values) < 2:
            raise ValueError("Input data must contain at least two values.")
        # Validate the horizon before any modelling work is started.
        steps = _coerce_forecast_steps(forecast_steps)

        agentic_rag = AgenticRAG()
        trend, seasonality, anomalies, features, forecast, insight = agentic_rag.process(values, steps)

        trend_plot = plot_data(trend, "Trend")
        seasonality_plot = plot_data(seasonality, "Seasonality")
        anomalies_plot = plot_data(values, "Anomalies", anomalies)

        # Plot history and forecast as one line, with a marker at the last
        # observed point to show where the forecast begins.
        full_data = np.concatenate([values, forecast])
        forecast_plot = plot_data(full_data, "Forecast")
        ax = forecast_plot.axes[0]
        ax.axvline(x=len(values) - 1, color='r', linestyle='--', label='Forecast Start')
        ax.legend()

        return (
            trend_plot,
            seasonality_plot,
            anomalies_plot,
            features.to_dict(orient='records')[0],
            forecast_plot,
            insight,
            ""  # Empty string for the error output
        )
    except Exception as e:
        # Top-level UI boundary: report the failure through the error
        # output instead of letting the exception escape.
        error_msg = f"An error occurred: {str(e)}\n{traceback.format_exc()}"
        logging.error(error_msg)
        return (None, None, None, None, None, "", error_msg)
# Sample comma-separated series used to pre-fill the data textbox; the values
# rise overall from 120 to 420 with small recurring dips.
example_input = "120,125,130,140,135,145,150,160,155,165,170,180,175,185,190,200,195,205,210,220,215,225,230,240,235,245,250,260,255,265,270,280,275,285,290,300,295,305,310,320,315,325,330,340,335,345,350,360,355,365,370,380,375,385,390,400,395,405,410,420"
# Web UI definition: two inputs (series text, forecast horizon) feed
# analyze_time_series, whose seven return values map one-to-one onto the
# seven output components below.
iface = gr.Interface(
    fn=analyze_time_series,
    inputs=[
        gr.Textbox(label="Enter comma-separated time series data", value=example_input),
        # precision=0 asks Gradio to round to the nearest integer and pass
        # an int, since the horizon is a count of steps.
        gr.Number(label="Number of steps to forecast", value=5, precision=0),
    ],
    outputs=[
        gr.Plot(label="Trend"),
        gr.Plot(label="Seasonality"),
        gr.Plot(label="Anomalies"),
        gr.JSON(label="Features"),
        gr.Plot(label="Forecast"),
        gr.Textbox(label="Insight"),
        # Kept visible: the callback reports every failure through this
        # field, so hiding it would leave the user with blank outputs and
        # no explanation.
        gr.Textbox(label="Error"),
    ],
    title="Agentic RAG Time Series Analysis",
    description="Enter a comma-separated list of numbers representing your time series data, and specify the number of steps to forecast."
)
# Launch the Gradio interface only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    iface.launch()