# Agentic RAG time-series analysis demo (Gradio app).
# NOTE(review): this header replaces Hugging Face Spaces page residue
# (status lines, git blob hashes, a line-number gutter) that was scraped
# into the file and made it unparseable as Python.
import numpy as np
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import gradio as gr
import traceback
import logging
import matplotlib.pyplot as plt
# Surface only ERROR-level messages; analyze_time_series logs failures
# through this root configuration in its exception handler.
logging.basicConfig(level=logging.ERROR)
class TrendAnalysisAgent:
    """Extracts the long-run trend component of a 1-D time series.

    NOTE(review): the original called seasonal_decompose(..., period=1);
    its trend component is a centered moving average with window 1 -- i.e.
    the input series itself -- so no smoothing happened at all.  Replaced
    with an explicit centered rolling mean.
    """

    def analyze(self, data, window=None):
        """Return the smoothed trend of ``data``.

        Args:
            data: 1-D array-like of numeric observations.
            window: smoothing window size; defaults to min(12, len(data)//2),
                floored at 2, so short series still get a sensible trend.

        Returns:
            numpy.ndarray of floats, same length as ``data`` (edges use
            partial windows rather than NaN).
        """
        series = pd.Series(np.asarray(data, dtype=float))
        if window is None:
            window = max(2, min(12, len(series) // 2))
        trend = series.rolling(window=window, center=True, min_periods=1).mean()
        return trend.to_numpy()
class SeasonalityDetectionAgent:
    """Isolates the additive seasonal component of a 1-D time series."""

    def detect(self, data, period=12):
        """Return the seasonal component from an additive decomposition.

        seasonal_decompose requires at least two complete cycles; the
        original hard-coded period=12 and failed with an opaque statsmodels
        error on series shorter than 24 points.  Validate up front instead.

        Args:
            data: 1-D array-like of observations.
            period: assumed cycle length (was hard-coded to 12).

        Raises:
            ValueError: if ``data`` is shorter than ``2 * period``.
        """
        n = len(data)
        if n < 2 * period:
            raise ValueError(
                f"Seasonality detection with period={period} needs at least "
                f"{2 * period} observations, got {n}."
            )
        result = seasonal_decompose(data, model='additive', period=period)
        return result.seasonal
class AnomalyDetectionAgent:
    """Flags outlier observations with an Isolation Forest."""

    def detect(self, data, contamination=0.1):
        """Return a boolean mask (True = anomaly) over ``data``.

        Args:
            data: 1-D numpy array of observations.
            contamination: expected fraction of outliers (previously
                hard-coded to 0.1; now a backward-compatible parameter).
        """
        # Scaling is not strictly required by tree-based isolation forests,
        # but is kept to preserve the original pipeline's behavior.
        scaled = StandardScaler().fit_transform(data.reshape(-1, 1))
        # Fixed random_state keeps the anomaly mask reproducible across runs.
        model = IsolationForest(contamination=contamination, random_state=42)
        return model.fit_predict(scaled) == -1
def plot_data(data, title, anomalies=None):
    """Render ``data`` as a line plot and return the matplotlib Figure.

    Args:
        data: 1-D sequence of values to plot (indexed by position).
        title: plot title.
        anomalies: optional boolean mask; True entries are drawn as red dots.

    Returns:
        matplotlib.figure.Figure -- closed so interactive backends do not
        display it eagerly; Gradio renders the returned object itself.
    """
    figure, axis = plt.subplots(figsize=(10, 6))
    axis.plot(data, label='Data')
    if anomalies is not None:
        flagged = np.where(anomalies)[0]
        axis.scatter(flagged, data[flagged], color='red', label='Anomalies')
    axis.set_title(title)
    axis.legend()
    plt.close(figure)
    return figure
class FeatureExtractionAgent:
    """Computes basic summary statistics for a time series."""

    def extract(self, data):
        """Return a one-row DataFrame with mean, std, min and max of ``data``."""
        summary = {
            'mean': np.mean(data),
            'std': np.std(data),
            'min': np.min(data),
            'max': np.max(data),
        }
        return pd.DataFrame([summary])
class ForecastingAgent:
    """Forecasts future values of a time series with an ARIMA model."""

    def forecast(self, data, steps, order=(1, 1, 1)):
        """Fit ARIMA(``order``) on ``data`` and predict ``steps`` points ahead.

        Args:
            data: 1-D array of observations.
            steps: number of future points to forecast; coerced to int
                because the Gradio Number input delivers floats.
            order: (p, d, q) ARIMA order (previously hard-coded to (1, 1, 1)).

        Raises:
            ValueError: if ``steps`` is not at least 1.
        """
        steps = int(steps)
        if steps < 1:
            raise ValueError("steps must be a positive integer")
        model = ARIMA(data, order=order)
        fitted = model.fit()
        return fitted.forecast(steps=steps)
class RetrievalMechanism:
    """Minimal in-memory key/value store acting as the RAG retrieval layer."""

    def __init__(self):
        # Maps keys to whatever artifacts callers choose to cache.
        self.database = {}

    def store(self, key, data):
        """Cache ``data`` under ``key``, overwriting any previous entry."""
        self.database[key] = data

    def retrieve(self, key):
        """Return the value cached under ``key``, or None when absent."""
        return self.database.get(key)
class MockLanguageModel:
    """Stand-in for an LLM: renders a canned natural-language insight."""

    def generate_insight(self, data, trend, seasonality, anomalies, features, forecast):
        """Compose a one-paragraph summary of the analysis results.

        Fixes two grammar bugs: a count of 0 produced "There is 0 anomaly",
        and an upward forecast produced "a upward trend".

        Args:
            data: original series (only its last value is compared).
            trend, seasonality: unused; kept for interface compatibility.
            anomalies: boolean mask of detected anomalies.
            features: one-row DataFrame with 'mean' and 'std' columns.
            forecast: forecast values (only the last one is compared).
        """
        count = int(anomalies.sum())
        verb = 'is' if count == 1 else 'are'          # plural for 0 and >1
        noun = 'anomaly' if count == 1 else 'anomalies'
        direction = 'an upward' if forecast[-1] > data[-1] else 'a downward'
        return (
            f"The time series has a mean of {features['mean'].values[0]:.2f} "
            f"and standard deviation of {features['std'].values[0]:.2f}. "
            f"There {verb} {count} {noun} detected. "
            f"The forecast suggests {direction} trend in the near future."
        )
class AgenticRAG:
    """Orchestrates the specialist agents and the (mock) language model."""

    def __init__(self):
        # One instance of every specialist agent plus the retrieval store.
        self.trend_agent = TrendAnalysisAgent()
        self.seasonality_agent = SeasonalityDetectionAgent()
        self.anomaly_agent = AnomalyDetectionAgent()
        self.feature_agent = FeatureExtractionAgent()
        self.forecasting_agent = ForecastingAgent()
        self.retrieval = RetrievalMechanism()
        self.language_model = MockLanguageModel()

    def process(self, data, forecast_steps):
        """Run every agent over ``data`` and return all their outputs.

        Returns:
            Tuple ``(trend, seasonality, anomalies, features, forecast,
            insight)`` in that order.
        """
        trend = self.trend_agent.analyze(data)
        seasonality = self.seasonality_agent.detect(data)
        anomaly_mask = self.anomaly_agent.detect(data)
        feature_table = self.feature_agent.extract(data)
        predictions = self.forecasting_agent.forecast(data, forecast_steps)
        summary = self.language_model.generate_insight(
            data, trend, seasonality, anomaly_mask, feature_table, predictions
        )
        return trend, seasonality, anomaly_mask, feature_table, predictions, summary
def analyze_time_series(data, forecast_steps):
    """Gradio handler: parse the input, run the agents, and build outputs.

    Args:
        data: comma-separated string of numbers.
        forecast_steps: number of future points to predict.  Gradio's
            Number component delivers a float, so it is coerced to int
            here; the original passed the float straight through to ARIMA.

    Returns:
        Tuple ``(trend_plot, seasonality_plot, anomalies_plot,
        features_dict, forecast_plot, insight, error_message)``.  On
        failure the first six slots are ``None``/empty and the last slot
        carries the error text for the (normally hidden) error textbox.
    """
    try:
        values = np.array([float(x) for x in data.split(',')])
        if len(values) < 2:
            raise ValueError("Input data must contain at least two values.")
        steps = int(forecast_steps)
        if steps < 1:
            raise ValueError("Number of forecast steps must be at least 1.")
        agentic_rag = AgenticRAG()
        trend, seasonality, anomalies, features, forecast, insight = \
            agentic_rag.process(values, steps)
        trend_plot = plot_data(trend, "Trend")
        seasonality_plot = plot_data(seasonality, "Seasonality")
        anomalies_plot = plot_data(values, "Anomalies", anomalies)
        # Plot history and forecast as one series, then mark the boundary.
        full_series = np.concatenate([values, forecast])
        forecast_plot = plot_data(full_series, "Forecast")
        ax = forecast_plot.axes[0]
        ax.axvline(x=len(values) - 1, color='r', linestyle='--', label='Forecast Start')
        ax.legend()
        return (
            trend_plot,
            seasonality_plot,
            anomalies_plot,
            features.to_dict(orient='records')[0],
            forecast_plot,
            insight,
            ""  # Empty string for the error output
        )
    except Exception as e:
        # UI boundary: report the failure in the error textbox instead of
        # crashing the app; the traceback also goes to the server log.
        error_msg = f"An error occurred: {str(e)}\n{traceback.format_exc()}"
        logging.error(error_msg)
        return (None, None, None, None, None, "", error_msg)
# Example series pre-populating the input textbox (a steady upward ramp).
example_input = "120,125,130,140,135,145,150,160,155,165,170,180,175,185,190,200,195,205,210,220,215,225,230,240,235,245,250,260,255,265,270,280,275,285,290,300,295,305,310,320,315,325,330,340,335,345,350,360,355,365,370,380,375,385,390,400,395,405,410,420"
# Wire the analysis pipeline into a Gradio UI; output order must match the
# tuple returned by analyze_time_series.
iface = gr.Interface(
    fn=analyze_time_series,
    inputs=[
        gr.Textbox(label="Enter comma-separated time series data", value=example_input),
        gr.Number(label="Number of steps to forecast", value=5)
    ],
    outputs=[
        gr.Plot(label="Trend"),
        gr.Plot(label="Seasonality"),
        gr.Plot(label="Anomalies"),
        gr.JSON(label="Features"),
        gr.Plot(label="Forecast"),
        gr.Textbox(label="Insight"),
        # Hidden slot that analyze_time_series fills with error text on failure.
        gr.Textbox(label="Error", visible=False)
    ],
    title="Agentic RAG Time Series Analysis",
    description="Enter a comma-separated list of numbers representing your time series data, and specify the number of steps to forecast."
)
# Launch the web app only when executed as a script, not on import.
if __name__ == "__main__":
    iface.launch()