"""Gradio demo that runs a small pipeline of time-series "agents".

The pipeline decomposes a comma-separated numeric series into trend and
seasonal components, flags anomalies, extracts summary statistics, produces an
ARIMA forecast, and renders a short textual insight.
"""

import logging
import traceback

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose

logging.basicConfig(level=logging.ERROR)


class TrendAnalysisAgent:
    """Return the trend component of an additive seasonal decomposition."""

    def analyze(self, data, period=1):
        """Decompose ``data`` and return its trend component.

        Args:
            data: One-dimensional numeric series.
            period: Period handed to ``seasonal_decompose`` (default 1).

        Returns:
            The ``trend`` attribute of the decomposition result.
        """
        # NOTE(review): with period=1 the smoothing window appears to be a
        # single sample wide, so the "trend" may simply track the input.
        # Confirm that this is the intended behaviour.
        result = seasonal_decompose(data, model='additive', period=period)
        return result.trend


class SeasonalityDetectionAgent:
    """Return the seasonal component of an additive seasonal decomposition."""

    def detect(self, data, period=12):
        """Decompose ``data`` and return its seasonal component.

        Args:
            data: One-dimensional numeric series.
            period: Period handed to ``seasonal_decompose`` (default 12).

        Returns:
            The ``seasonal`` attribute of the decomposition result.
        """
        # NOTE(review): seasonal_decompose is expected to reject series
        # shorter than two full periods; that error propagates to the caller.
        result = seasonal_decompose(data, model='additive', period=period)
        return result.seasonal


class AnomalyDetectionAgent:
    """Flag anomalous points with an IsolationForest on standardised values."""

    def detect(self, data):
        """Return a boolean mask that is True where a point is an anomaly.

        Args:
            data: One-dimensional numeric series (any array-like).

        Returns:
            Boolean array with one entry per input value.
        """
        # np.asarray lets plain lists through; the original required an
        # ndarray because it called ``data.reshape`` directly.
        column = np.asarray(data, dtype=float).reshape(-1, 1)
        data_scaled = StandardScaler().fit_transform(column)
        iso_forest = IsolationForest(contamination=0.1, random_state=42)
        # fit_predict labels outliers as -1.
        labels = iso_forest.fit_predict(data_scaled)
        return labels == -1


def plot_data(data, title, anomalies=None):
    """Plot ``data`` as a line and optionally overlay anomalies in red.

    Args:
        data: Series to draw.
        title: Title of the axes.
        anomalies: Optional boolean mask aligned with ``data``; True entries
            are drawn as red scatter points.

    Returns:
        The matplotlib ``Figure``. It is closed in pyplot before being
        returned so that pyplot does not keep a reference to it.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(data, label='Data')
    if anomalies is not None:
        anomaly_indices = np.where(anomalies)[0]
        # Positional indexing on an ndarray view so list input also works.
        ax.scatter(
            anomaly_indices,
            np.asarray(data)[anomaly_indices],
            color='red',
            label='Anomalies',
        )
    ax.set_title(title)
    ax.legend()
    plt.close(fig)
    return fig


class FeatureExtractionAgent:
    """Compute basic summary statistics of a series."""

    def extract(self, data):
        """Return a one-row DataFrame with mean, std, min and max of ``data``."""
        features = pd.DataFrame({
            'mean': [np.mean(data)],
            'std': [np.std(data)],
            'min': [np.min(data)],
            'max': [np.max(data)],
        })
        return features


class ForecastingAgent:
    """Forecast future values with an ARIMA(1, 1, 1) model."""

    def forecast(self, data, steps):
        """Fit ARIMA(1, 1, 1) on ``data`` and forecast ``steps`` values ahead.

        Args:
            data: One-dimensional numeric series.
            steps: Number of out-of-sample steps; coerced to ``int``.

        Returns:
            NumPy array holding the forecast values.

        Raises:
            ValueError: If ``steps`` is smaller than 1.
        """
        # UI widgets may deliver a float such as 5.0. A non-int ``steps`` is
        # not interpreted as a step count by statsmodels, so coerce it here.
        steps = int(steps)
        if steps < 1:
            raise ValueError("Number of forecast steps must be at least 1.")
        model = ARIMA(data, order=(1, 1, 1))
        results = model.fit()
        # Normalise to ndarray so callers can index positionally.
        return np.asarray(results.forecast(steps=steps))


class RetrievalMechanism:
    """Minimal in-memory key/value store."""

    def __init__(self):
        self.database = {}

    def store(self, key, data):
        """Store ``data`` under ``key``, replacing any previous value."""
        self.database[key] = data

    def retrieve(self, key):
        """Return the value stored under ``key``, or None if absent."""
        return self.database.get(key, None)


class MockLanguageModel:
    """Template-based stand-in for a language model."""

    def generate_insight(self, data, trend, seasonality, anomalies, features, forecast):
        """Build a short textual summary of the analysis.

        Args:
            data: Original series; only its last value is read.
            trend: Unused; kept for interface compatibility.
            seasonality: Unused; kept for interface compatibility.
            anomalies: Boolean mask of anomalous points.
            features: DataFrame with ``mean`` and ``std`` columns.
            forecast: Forecast values; only the last one is read.

        Returns:
            The insight text.
        """
        anomaly_count = int(np.sum(anomalies))
        # Only a count of exactly one is singular ("is 1 anomaly"); zero is
        # plural ("are 0 anomalies").
        singular = anomaly_count == 1

        last_forecast = forecast[-1]
        last_observed = data[-1]
        if last_forecast > last_observed:
            direction = 'an upward'
        elif last_forecast < last_observed:
            direction = 'a downward'
        else:
            direction = 'a flat'

        insight = (
            f"The time series has a mean of {features['mean'].values[0]:.2f} "
            f"and standard deviation of {features['std'].values[0]:.2f}. "
        )
        insight += (
            f"There {'is' if singular else 'are'} {anomaly_count} "
            f"anomal{'y' if singular else 'ies'} detected. "
        )
        insight += f"The forecast suggests {direction} trend in the near future."
        return insight


class AgenticRAG:
    """Wire the individual agents together into one processing pipeline."""

    def __init__(self):
        self.trend_agent = TrendAnalysisAgent()
        self.seasonality_agent = SeasonalityDetectionAgent()
        self.anomaly_agent = AnomalyDetectionAgent()
        self.feature_agent = FeatureExtractionAgent()
        self.forecasting_agent = ForecastingAgent()
        self.retrieval = RetrievalMechanism()
        self.language_model = MockLanguageModel()

    def process(self, data, forecast_steps):
        """Run every agent on ``data``.

        Returns:
            Tuple ``(trend, seasonality, anomalies, features, forecast,
            insight)``.
        """
        trend = self.trend_agent.analyze(data)
        seasonality = self.seasonality_agent.detect(data)
        anomalies = self.anomaly_agent.detect(data)
        features = self.feature_agent.extract(data)
        forecast = self.forecasting_agent.forecast(data, forecast_steps)
        insight = self.language_model.generate_insight(
            data, trend, seasonality, anomalies, features, forecast
        )
        return trend, seasonality, anomalies, features, forecast, insight


def analyze_time_series(data, forecast_steps):
    """Gradio callback: parse the input, run the pipeline, build the outputs.

    Args:
        data: Comma-separated numbers as a single string.
        forecast_steps: Number of steps to forecast (int or float).

    Returns:
        A 7-tuple ``(trend_plot, seasonality_plot, anomalies_plot, features,
        forecast_plot, insight, error)``. On failure the first five entries
        are None, the insight is empty and ``error`` carries the message and
        traceback.
    """
    try:
        # Skip blank tokens so a trailing comma or doubled comma does not
        # abort parsing with "could not convert string to float: ''".
        tokens = [tok for tok in data.split(',') if tok.strip()]
        data = np.array([float(tok) for tok in tokens])
        if len(data) < 2:
            raise ValueError("Input data must contain at least two values.")

        if forecast_steps is None:
            raise ValueError("Number of steps to forecast is required.")
        # The number widget may deliver a float; the forecaster needs an int.
        forecast_steps = int(forecast_steps)
        if forecast_steps < 1:
            raise ValueError("Number of forecast steps must be at least 1.")

        agentic_rag = AgenticRAG()
        trend, seasonality, anomalies, features, forecast, insight = (
            agentic_rag.process(data, forecast_steps)
        )

        trend_plot = plot_data(trend, "Trend")
        seasonality_plot = plot_data(seasonality, "Seasonality")
        anomalies_plot = plot_data(data, "Anomalies", anomalies)

        full_data = np.concatenate([data, forecast])
        forecast_plot = plot_data(full_data, "Forecast")
        ax = forecast_plot.axes[0]
        # Mark the last observed sample; everything to the right is forecast.
        ax.axvline(x=len(data) - 1, color='r', linestyle='--', label='Forecast Start')
        ax.legend()

        return (
            trend_plot,
            seasonality_plot,
            anomalies_plot,
            features.to_dict(orient='records')[0],
            forecast_plot,
            insight,
            "",  # Empty string for the error output
        )
    except Exception as e:
        # Top-level UI boundary: report the failure instead of crashing.
        error_msg = f"An error occurred: {str(e)}\n{traceback.format_exc()}"
        logging.error(error_msg)
        return (None, None, None, None, None, "", error_msg)


example_input = "120,125,130,140,135,145,150,160,155,165,170,180,175,185,190,200,195,205,210,220,215,225,230,240,235,245,250,260,255,265,270,280,275,285,290,300,295,305,310,320,315,325,330,340,335,345,350,360,355,365,370,380,375,385,390,400,395,405,410,420"

iface = gr.Interface(
    fn=analyze_time_series,
    inputs=[
        gr.Textbox(label="Enter comma-separated time series data", value=example_input),
        gr.Number(label="Number of steps to forecast", value=5),
    ],
    outputs=[
        gr.Plot(label="Trend"),
        gr.Plot(label="Seasonality"),
        gr.Plot(label="Anomalies"),
        gr.JSON(label="Features"),
        gr.Plot(label="Forecast"),
        gr.Textbox(label="Insight"),
        # NOTE(review): this box is hidden, so error messages returned by
        # analyze_time_series are never shown to the user - confirm intent.
        gr.Textbox(label="Error", visible=False),
    ],
    title="Agentic RAG Time Series Analysis",
    description="Enter a comma-separated list of numbers representing your time series data, and specify the number of steps to forecast.",
)

if __name__ == "__main__":
    iface.launch()