# Spaces: Sleeping
import logging
import traceback

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose

logging.basicConfig(level=logging.ERROR)
class TrendAnalysisAgent:
    """Extract the trend component of a series via additive decomposition."""

    def analyze(self, data, *, period=1):
        """Return the trend component of ``data``.

        Args:
            data: Series values, passed unchanged to ``seasonal_decompose``.
            period: Period handed to the decomposition. Defaults to 1, the
                value that used to be hard-coded, so existing callers see
                no change.
                NOTE(review): with a period of 1 the smoothing window is
                presumably a single point, which would make the returned
                "trend" identical to the input -- confirm against the
                statsmodels docs and pass the real cycle length if a
                smoothed trend is wanted.

        Returns:
            The ``trend`` attribute of the decomposition result.
        """
        decomposition = seasonal_decompose(data, model='additive', period=period)
        return decomposition.trend
class SeasonalityDetectionAgent:
    """Extract the seasonal component of a series via additive decomposition."""

    def detect(self, data, *, period=12):
        """Return the seasonal component of ``data``.

        Args:
            data: Series values, passed unchanged to ``seasonal_decompose``.
            period: Number of observations per seasonal cycle. Defaults to
                12, the value that used to be hard-coded, so existing
                callers see no change.
                NOTE(review): statsmodels is expected to reject series
                shorter than two full cycles (24 points at the default) --
                confirm, since the caller only checks for two values.

        Returns:
            The ``seasonal`` attribute of the decomposition result.
        """
        decomposition = seasonal_decompose(data, model='additive', period=period)
        return decomposition.seasonal
class AnomalyDetectionAgent:
    """Flag unusual observations using an Isolation Forest."""

    def detect(self, data):
        """Return a boolean mask that is True where a point is flagged.

        Args:
            data: Array exposing ``reshape``; it is reshaped to a single
                column before scaling.

        Returns:
            Element-wise comparison of the forest's labels with -1, i.e.
            True for every observation the model labelled -1.
        """
        # The scaler and the forest both receive one feature column.
        column = data.reshape(-1, 1)
        standardized = StandardScaler().fit_transform(column)

        # Fixed seed keeps the flagged set reproducible between runs.
        forest = IsolationForest(contamination=0.1, random_state=42)
        labels = forest.fit_predict(standardized)
        return labels == -1
def plot_data(data, title, anomalies=None):
    """Draw ``data`` as a line chart and return the matplotlib figure.

    Args:
        data: Values to plot. When ``anomalies`` is given, ``data`` is also
            indexed with an integer index array.
        title: Title placed on the axes.
        anomalies: Optional mask; positions where it is truthy are drawn as
            red markers on top of the line.

    Returns:
        The figure, already closed in pyplot. The object itself is still
        handed back to the caller (presumably closed so that pyplot does not
        keep accumulating open figures).
    """
    figure, axes = plt.subplots(figsize=(10, 6))
    axes.plot(data, label='Data')

    if anomalies is not None:
        # Positions of the truthy mask entries along the first axis.
        flagged_positions = np.nonzero(anomalies)[0]
        axes.scatter(
            flagged_positions,
            data[flagged_positions],
            color='red',
            label='Anomalies',
        )

    axes.set_title(title)
    axes.legend()
    plt.close(figure)
    return figure
class FeatureExtractionAgent:
    """Summarise a series with a handful of descriptive statistics."""

    def extract(self, data):
        """Return a one-row DataFrame with columns mean, std, min and max.

        Args:
            data: Values accepted by the NumPy reduction functions.

        Returns:
            A ``pandas.DataFrame`` holding a single row; each column is the
            result of the matching NumPy reduction over ``data``.
        """
        # Column order follows the order of this tuple.
        reducers = (
            ('mean', np.mean),
            ('std', np.std),
            ('min', np.min),
            ('max', np.max),
        )
        summary = {label: [reduce_fn(data)] for label, reduce_fn in reducers}
        return pd.DataFrame(summary)
class ForecastingAgent:
    """Produce out-of-sample forecasts with an ARIMA(1, 1, 1) model."""

    def forecast(self, data, steps):
        """Fit the model on ``data`` and forecast ``steps`` periods ahead.

        Args:
            data: Observed series handed to ``ARIMA``.
            steps: Forwarded unchanged as the ``steps`` argument of the
                fitted results' ``forecast`` method.

        Returns:
            Whatever the fitted results' ``forecast`` call returns.
        """
        fitted = ARIMA(data, order=(1, 1, 1)).fit()
        return fitted.forecast(steps=steps)
class RetrievalMechanism:
    """Minimal in-memory key/value store backed by a dict."""

    def __init__(self):
        """Start with an empty store."""
        self.database = dict()

    def store(self, key, data):
        """Save ``data`` under ``key``, replacing any previous entry."""
        self.database.update({key: data})

    def retrieve(self, key):
        """Return the entry stored under ``key``, or None when absent."""
        return self.database.get(key)
class MockLanguageModel:
    """Template-based stand-in that turns analysis results into a sentence."""

    def generate_insight(self, data, trend, seasonality, anomalies, features, forecast):
        """Build a short textual summary of the analysis.

        Args:
            data: Observed series; only its last value is read.
            trend: Unused; kept so the signature stays stable.
            seasonality: Unused; kept so the signature stays stable.
            anomalies: Mask whose ``sum()`` is the number of anomalies.
            features: Table whose ``mean`` and ``std`` columns are read at
                row 0.
            forecast: Forecast values; only the last one is read.

        Returns:
            A three-sentence string covering mean/std, the anomaly count
            and whether the final forecast value lies above the final
            observation.
        """
        mean_value = features['mean'].values[0]
        std_value = features['std'].values[0]

        # Only a count of exactly one is singular; zero reads "are 0 anomalies".
        anomaly_count = int(anomalies.sum())
        if anomaly_count == 1:
            verb, noun = 'is', 'anomaly'
        else:
            verb, noun = 'are', 'anomalies'

        # Go through arrays so that [-1] is positional even for labelled series.
        last_forecast = np.asarray(forecast)[-1]
        last_observed = np.asarray(data)[-1]
        # The article is chosen with the adjective ("an upward" / "a downward").
        direction = 'an upward' if last_forecast > last_observed else 'a downward'

        return (
            f"The time series has a mean of {mean_value:.2f} "
            f"and standard deviation of {std_value:.2f}. "
            f"There {verb} {anomaly_count} {noun} detected. "
            f"The forecast suggests {direction} trend in the near future."
        )
class AgenticRAG:
    """Run every analysis agent over a series and collect the results."""

    def __init__(self):
        """Create one instance of each agent plus the store and text model."""
        self.trend_agent = TrendAnalysisAgent()
        self.seasonality_agent = SeasonalityDetectionAgent()
        self.anomaly_agent = AnomalyDetectionAgent()
        self.feature_agent = FeatureExtractionAgent()
        self.forecasting_agent = ForecastingAgent()
        # Created here but not referenced by process() below.
        self.retrieval = RetrievalMechanism()
        self.language_model = MockLanguageModel()

    def process(self, data, forecast_steps):
        """Analyse ``data`` and forecast ``forecast_steps`` periods ahead.

        Args:
            data: Series passed to every agent.
            forecast_steps: Forwarded to the forecasting agent.

        Returns:
            A tuple ``(trend, seasonality, anomalies, features, forecast,
            insight)`` in that order.
        """
        trend_component = self.trend_agent.analyze(data)
        seasonal_component = self.seasonality_agent.detect(data)
        anomaly_mask = self.anomaly_agent.detect(data)
        summary_stats = self.feature_agent.extract(data)
        predicted = self.forecasting_agent.forecast(data, forecast_steps)

        # The text model receives every intermediate result.
        narrative = self.language_model.generate_insight(
            data,
            trend_component,
            seasonal_component,
            anomaly_mask,
            summary_stats,
            predicted,
        )
        return (
            trend_component,
            seasonal_component,
            anomaly_mask,
            summary_stats,
            predicted,
            narrative,
        )
def analyze_time_series(data, forecast_steps):
    """Parse user input, run the full analysis and build the UI outputs.

    Args:
        data: Comma-separated numbers as a single string.
        forecast_steps: Number of periods to forecast. May arrive as an int
            or a float from the UI; it must be a positive whole number.

    Returns:
        A 7-tuple ``(trend_plot, seasonality_plot, anomalies_plot,
        features_dict, forecast_plot, insight, error_message)``. On success
        the error message is an empty string. On any failure the first five
        items are None, the insight is an empty string and the last item
        holds the error text with its traceback.
    """
    try:
        data = np.array([float(x) for x in data.split(',')])
        if len(data) < 2:
            raise ValueError("Input data must contain at least two values.")

        # The forecasting call only treats integer `steps` as a step count,
        # so validate and convert before anything is fitted.
        if forecast_steps is None:
            raise ValueError("Number of steps to forecast is required.")
        steps_value = float(forecast_steps)
        if not steps_value.is_integer() or steps_value < 1:
            raise ValueError(
                "Number of steps to forecast must be a positive whole number."
            )
        forecast_steps = int(steps_value)

        agentic_rag = AgenticRAG()
        trend, seasonality, anomalies, features, forecast, insight = agentic_rag.process(
            data, forecast_steps
        )

        trend_plot = plot_data(trend, "Trend")
        seasonality_plot = plot_data(seasonality, "Seasonality")
        anomalies_plot = plot_data(data, "Anomalies", anomalies)

        # Observed values followed by the forecast, drawn as one line.
        full_data = np.concatenate([data, forecast])
        forecast_plot = plot_data(full_data, "Forecast")
        ax = forecast_plot.axes[0]
        # Vertical marker at the last observed point.
        ax.axvline(x=len(data) - 1, color='r', linestyle='--', label='Forecast Start')
        ax.legend()

        return (
            trend_plot,
            seasonality_plot,
            anomalies_plot,
            features.to_dict(orient='records')[0],
            forecast_plot,
            insight,
            "",  # Empty string for the error output
        )
    except Exception as e:
        # UI boundary: report every failure through the error slot instead
        # of letting it propagate.
        error_msg = f"An error occurred: {str(e)}\n{traceback.format_exc()}"
        logging.error(error_msg)
        return (None, None, None, None, None, "", error_msg)
# Series pre-filled in the input box when the page loads.
example_input = "120,125,130,140,135,145,150,160,155,165,170,180,175,185,190,200,195,205,210,220,215,225,230,240,235,245,250,260,255,265,270,280,275,285,290,300,295,305,310,320,315,325,330,340,335,345,350,360,355,365,370,380,375,385,390,400,395,405,410,420"

# Gradio UI. The outputs are listed in the same order as the tuple returned
# by analyze_time_series.
iface = gr.Interface(
    fn=analyze_time_series,
    inputs=[
        gr.Textbox(label="Enter comma-separated time series data", value=example_input),
        # precision=0 asks Gradio to round the value and deliver it as an int.
        gr.Number(label="Number of steps to forecast", value=5, precision=0),
    ],
    outputs=[
        gr.Plot(label="Trend"),
        gr.Plot(label="Seasonality"),
        gr.Plot(label="Anomalies"),
        gr.JSON(label="Features"),
        gr.Plot(label="Forecast"),
        gr.Textbox(label="Insight"),
        # Left visible so the message returned on failure is actually shown;
        # when hidden, a failed run produced empty outputs with no explanation.
        gr.Textbox(label="Error"),
    ],
    title="Agentic RAG Time Series Analysis",
    description="Enter a comma-separated list of numbers representing your time series data, and specify the number of steps to forecast.",
)

if __name__ == "__main__":
    iface.launch()