File size: 5,859 Bytes
5802824
 
 
 
 
 
 
 
 
6beab5b
 
5802824
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6beab5b
bcdafda
 
 
 
 
 
 
 
 
 
 
5802824
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6beab5b
 
bcdafda
 
 
 
 
 
 
6beab5b
 
 
 
 
 
 
 
 
 
5802824
 
 
6beab5b
5802824
bcdafda
 
5802824
 
 
bcdafda
5802824
 
 
 
 
 
 
 
 
bcdafda
5802824
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import numpy as np
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import gradio as gr
import traceback
import logging
import matplotlib.pyplot as plt


# Configure the root logger once at import time: only ERROR and above are
# emitted, which is the level analyze_time_series uses to report failures.
logging.basicConfig(level=logging.ERROR)

class TrendAnalysisAgent:
    """Extract the slow-moving trend component of a time series."""

    def analyze(self, data, period=12):
        """Return the trend of ``data`` from an additive decomposition.

        A one-observation window would turn the moving average used by
        ``seasonal_decompose`` into a one-point filter that simply reproduces
        the input, so the default window spans 12 observations (the same
        cycle length SeasonalityDetectionAgent uses).

        Args:
            data: One-dimensional sequence of observations.
            period: Number of observations the moving-average window covers.
                It is reduced automatically for series shorter than two full
                windows so short inputs are still accepted.

        Returns:
            The trend component, one value per observation.
        """
        n_obs = len(data)
        # seasonal_decompose needs at least two complete windows; shrink the
        # window for short series but never below one observation.
        window = max(1, min(int(period), n_obs // 2))
        result = seasonal_decompose(
            data,
            model='additive',
            period=window,
            # Extrapolate the edges that a centred moving average leaves
            # undefined, so the returned trend has no NaN head or tail.
            extrapolate_trend='freq',
        )
        return result.trend

class SeasonalityDetectionAgent:
    """Isolate the repeating seasonal component of a time series."""

    def detect(self, data, period=12):
        """Return the seasonal component of an additive decomposition.

        Args:
            data: One-dimensional sequence of observations.
            period: Length of one seasonal cycle in observations. Defaults to
                12, the value that was previously hard-coded, so existing
                callers are unaffected.

        Returns:
            The seasonal component produced by ``seasonal_decompose``.

        Note:
            The decomposition is delegated to statsmodels, which is expected
            to reject series shorter than two full cycles.
        """
        result = seasonal_decompose(data, model='additive', period=period)
        return result.seasonal

class AnomalyDetectionAgent:
    """Flag unusual observations with an Isolation Forest."""

    def detect(self, data):
        """Return a boolean mask marking the observations judged anomalous.

        Args:
            data: Array exposing ``reshape``; it is viewed as a single
                feature column before being standardised.

        Returns:
            Boolean array that is True where the forest's label equals -1.
        """
        # One feature column: each observation becomes its own row.
        column = data.reshape(-1, 1)
        standardized = StandardScaler().fit_transform(column)

        # Fixed seed keeps the result reproducible between calls.
        forest = IsolationForest(contamination=0.1, random_state=42)
        labels = forest.fit_predict(standardized)

        outlier_mask = labels == -1
        return outlier_mask
        

def plot_data(data, title, anomalies=None):
    """Draw ``data`` as a line chart and return the matplotlib figure.

    Args:
        data: Values to plot; indexed positionally when anomalies are given.
        title: Text shown above the chart.
        anomalies: Optional boolean mask; positions where it is truthy are
            overlaid as red scatter points.

    Returns:
        The figure, already closed in pyplot so it is not kept open there.
    """
    figure, axes = plt.subplots(figsize=(10, 6))
    axes.plot(data, label='Data')

    if anomalies is not None:
        # Positions of the flagged observations along the first axis.
        flagged = np.nonzero(anomalies)[0]
        axes.scatter(flagged, data[flagged], color='red', label='Anomalies')

    axes.set_title(title)
    axes.legend()

    # Detach the figure from pyplot's registry; the object itself stays usable.
    plt.close(figure)
    return figure

class FeatureExtractionAgent:
    """Summarise a series with a few descriptive statistics."""

    def extract(self, data):
        """Return a one-row DataFrame of summary statistics for ``data``.

        Args:
            data: Numeric values accepted by the NumPy reduction functions.

        Returns:
            DataFrame with the columns ``mean``, ``std``, ``min`` and ``max``
            (in that order) and a single row.
        """
        # Column name -> NumPy reduction producing that column's value.
        reducers = {
            'mean': np.mean,
            'std': np.std,
            'min': np.min,
            'max': np.max,
        }
        columns = {label: [reduce_fn(data)] for label, reduce_fn in reducers.items()}
        return pd.DataFrame(columns)

class ForecastingAgent:
    """Project a series forward with an ARIMA(1, 1, 1) model."""

    def forecast(self, data, steps):
        """Fit ARIMA(1, 1, 1) on ``data`` and forecast past its end.

        Args:
            data: Observed series to fit the model on.
            steps: Forecast horizon. A whole-valued float such as ``5.0``
                (what a numeric UI field may deliver) is converted to ``int``
                so it is treated as a step count; every other value is passed
                to statsmodels unchanged.

        Returns:
            The forecast values returned by the fitted model.
        """
        # statsmodels only interprets integer ``steps`` as "this many steps
        # ahead"; normalise whole-valued floats so they take that path.
        if isinstance(steps, (float, np.floating)) and float(steps).is_integer():
            steps = int(steps)

        fitted = ARIMA(data, order=(1, 1, 1)).fit()
        return fitted.forecast(steps=steps)

class RetrievalMechanism:
    """Minimal in-memory key/value store backed by a dict."""

    def __init__(self):
        """Start with an empty ``database`` mapping."""
        self.database = dict()

    def store(self, key, data):
        """Save ``data`` under ``key``, replacing any earlier entry."""
        self.database[key] = data

    def retrieve(self, key):
        """Return the entry stored under ``key``, or None when there is none."""
        return self.database.get(key)

class MockLanguageModel:
    """Template-based stand-in for a language model that words the results."""

    def generate_insight(self, data, trend, seasonality, anomalies, features, forecast):
        """Compose a short plain-English summary of the analysis.

        Args:
            data: Observed series; only its last value is read.
            trend: Unused; accepted to keep the call signature stable.
            seasonality: Unused; accepted to keep the call signature stable.
            anomalies: Boolean mask; its truthy entries are counted.
            features: DataFrame whose ``mean`` and ``std`` columns hold the
                summary statistics in their first row.
            forecast: Forecast values; only the last one is read.

        Returns:
            Three sentences covering mean/std, the anomaly count and whether
            the forecast ends above, below or level with the last observation.
        """
        mean_value = features['mean'].values[0]
        std_value = features['std'].values[0]
        anomaly_count = int(np.sum(anomalies))

        # English uses the singular only for exactly one item; zero is plural
        # ("There are 0 anomalies"), which the old ``> 1`` test got wrong.
        if anomaly_count == 1:
            verb, noun = "is", "anomaly"
        else:
            verb, noun = "are", "anomalies"

        # Index positionally so a pandas Series with a non-default index works
        # the same way as a plain array.
        last_observed = np.asarray(data)[-1]
        last_forecast = np.asarray(forecast)[-1]
        if last_forecast > last_observed:
            direction = "an upward"
        elif last_forecast == last_observed:
            # An unchanged level is neither up nor down.
            direction = "a flat"
        else:
            direction = "a downward"

        sentences = (
            f"The time series has a mean of {mean_value:.2f} and standard deviation of {std_value:.2f}. ",
            f"There {verb} {anomaly_count} {noun} detected. ",
            f"The forecast suggests {direction} trend in the near future.",
        )
        return "".join(sentences)

class AgenticRAG:
    """Coordinator that runs every analysis agent over one series."""

    def __init__(self):
        """Create one instance of each agent plus the store and the narrator."""
        # Analysis agents, each responsible for one aspect of the series.
        self.trend_agent = TrendAnalysisAgent()
        self.seasonality_agent = SeasonalityDetectionAgent()
        self.anomaly_agent = AnomalyDetectionAgent()
        self.feature_agent = FeatureExtractionAgent()
        self.forecasting_agent = ForecastingAgent()
        # Supporting components.
        self.retrieval = RetrievalMechanism()
        self.language_model = MockLanguageModel()

    def process(self, data, forecast_steps):
        """Run the full pipeline and return its results.

        Args:
            data: Series handed unchanged to every agent.
            forecast_steps: Horizon passed to the forecasting agent.

        Returns:
            Tuple ``(trend, seasonality, anomalies, features, forecast,
            insight)``.
        """
        # Tuple elements are evaluated left to right, so the agents run in
        # the order listed here.
        findings = (
            self.trend_agent.analyze(data),
            self.seasonality_agent.detect(data),
            self.anomaly_agent.detect(data),
            self.feature_agent.extract(data),
            self.forecasting_agent.forecast(data, forecast_steps),
        )

        narrative = self.language_model.generate_insight(data, *findings)

        return (*findings, narrative)

def _parse_series(text):
    """Convert a comma-separated string into a 1-D float array, skipping blank fields."""
    fields = (field.strip() for field in (text or "").split(','))
    return np.array([float(field) for field in fields if field], dtype=float)


def _coerce_forecast_steps(forecast_steps):
    """Return the forecast horizon as a positive int, or raise ValueError."""
    message = "Number of forecast steps must be a positive integer."
    try:
        as_float = float(forecast_steps)
    except (TypeError, ValueError):
        raise ValueError(message) from None
    # NaN and infinities are not whole numbers, so is_integer() rejects them.
    if not as_float.is_integer() or as_float < 1:
        raise ValueError(message)
    return int(as_float)


def analyze_time_series(data, forecast_steps):
    """Gradio callback: analyse a comma-separated series and build all outputs.

    Args:
        data: Comma-separated numbers as text. Surrounding whitespace and
            empty fields (for example a trailing comma) are ignored.
        forecast_steps: Number of future points to forecast; must be a whole
            number of at least 1 (a whole-valued float is accepted).

    Returns:
        A 7-tuple ``(trend_plot, seasonality_plot, anomalies_plot, features,
        forecast_plot, insight, error)``. On success ``error`` is ``""``. On
        any failure the first five entries are None, ``insight`` is ``""`` and
        ``error`` holds the message plus traceback, which is also logged.
    """
    try:
        data = _parse_series(data)
        if len(data) < 2:
            raise ValueError("Input data must contain at least two values.")
        if not np.isfinite(data).all():
            raise ValueError("Input data must contain only finite numbers.")
        # Validate before any model is fitted so a bad horizon fails fast.
        forecast_steps = _coerce_forecast_steps(forecast_steps)

        agentic_rag = AgenticRAG()
        trend, seasonality, anomalies, features, forecast, insight = agentic_rag.process(data, forecast_steps)

        trend_plot = plot_data(trend, "Trend")
        seasonality_plot = plot_data(seasonality, "Seasonality")
        anomalies_plot = plot_data(data, "Anomalies", anomalies)

        # Show history and forecast as one line, with a marker at the last
        # observed point where the forecast begins.
        full_data = np.concatenate([data, forecast])
        forecast_plot = plot_data(full_data, "Forecast")
        ax = forecast_plot.axes[0]
        ax.axvline(x=len(data) - 1, color='r', linestyle='--', label='Forecast Start')
        ax.legend()

        return (
            trend_plot,
            seasonality_plot,
            anomalies_plot,
            features.to_dict(orient='records')[0],
            forecast_plot,
            insight,
            ""  # Empty string for the error output
        )
    except Exception as e:
        # Top-level UI boundary: report every failure to the caller instead
        # of letting it propagate.
        error_msg = f"An error occurred: {str(e)}\n{traceback.format_exc()}"
        logging.error(error_msg)
        return (None, None, None, None, None, "", error_msg)

# Sample series used as the pre-filled value of the data textbox in the UI.
example_input = "120,125,130,140,135,145,150,160,155,165,170,180,175,185,190,200,195,205,210,220,215,225,230,240,235,245,250,260,255,265,270,280,275,285,290,300,295,305,310,320,315,325,330,340,335,345,350,360,355,365,370,380,375,385,390,400,395,405,410,420"

# Gradio UI definition. The outputs list must stay in the same order as the
# 7-tuple returned by analyze_time_series.
iface = gr.Interface(
    fn=analyze_time_series,
    inputs=[
        gr.Textbox(label="Enter comma-separated time series data", value=example_input),
        # precision=0 makes Gradio round the value and deliver it as an int,
        # which is what a step count has to be.
        gr.Number(label="Number of steps to forecast", value=5, precision=0)
    ],
    outputs=[
        gr.Plot(label="Trend"),
        gr.Plot(label="Seasonality"),
        gr.Plot(label="Anomalies"),
        gr.JSON(label="Features"),
        gr.Plot(label="Forecast"),
        gr.Textbox(label="Insight"),
        # Must be visible: this is the only place the error message returned
        # by analyze_time_series is shown to the user.
        gr.Textbox(label="Error", visible=True)
    ],
    title="Agentic RAG Time Series Analysis",
    description="Enter a comma-separated list of numbers representing your time series data, and specify the number of steps to forecast."
)

# Start the Gradio server only when this file is run as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    iface.launch()