"""
Economic Forecasting Module

Advanced time series forecasting for economic indicators using ARIMA/ETS models
"""
|
|
|
import logging
import warnings
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from scipy import stats
from sklearn.metrics import mean_absolute_error, mean_squared_error
from statsmodels.tsa.arima.model import ARIMA, ARIMAResults
from statsmodels.tsa.holtwinters import ExponentialSmoothing, HoltWintersResults
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller
|
|
|
# Module-level logger, named after this module so handlers/levels can be
# configured per module by the application.
logger = logging.getLogger(__name__)
|
|
|
class EconomicForecaster:
    """
    Advanced economic forecasting using ARIMA and ETS models
    with comprehensive backtesting and performance evaluation.

    ``forecast_economic_indicators`` runs the full pipeline for each
    indicator (prepare -> stationarity test -> decomposition -> forecast ->
    backtest) and ``generate_forecast_report`` renders its result as text.
    """

    # Indicators that prepare_data converts from levels to
    # period-over-period growth rates.
    _GROWTH_RATE_SERIES = ('GDPC1', 'INDPRO', 'RSAFS')

    # Period-end offsets equivalent to the 'Q' / 'M' resample aliases.
    # Offset objects are used instead of the alias strings because pandas 2.2
    # deprecated 'Q'/'M' in favour of 'QE'/'ME'; the objects behave the same
    # on both older and newer pandas versions.
    _RESAMPLE_OFFSETS = {
        'Q': pd.offsets.QuarterEnd(startingMonth=12),
        'M': pd.offsets.MonthEnd(),
    }

    def __init__(self, data: pd.DataFrame):
        """
        Initialize forecaster with economic data

        Args:
            data: DataFrame with economic indicators (GDPC1, INDPRO, RSAFS, etc.).
                A copy is stored, so the caller's frame is never modified.
        """
        self.data = data.copy()
        # Result containers. They are created empty here and are not written
        # to by any method of this class.
        self.forecasts = {}
        self.backtest_results = {}
        self.model_performance = {}

    def prepare_data(self, target_series: str, frequency: str = 'Q') -> pd.Series:
        """
        Prepare time series data for forecasting

        Missing values are dropped, the series is resampled to period-end
        means, and GDPC1 / INDPRO / RSAFS are converted to period-over-period
        growth rates (fractional change, first observation dropped).

        Args:
            target_series: Series name to forecast
            frequency: Data frequency ('Q' for quarterly, 'M' for monthly).
                Any other value leaves the series at its original frequency.

        Returns:
            Prepared time series

        Raises:
            ValueError: If ``target_series`` is not a column of the data.
        """
        if target_series not in self.data.columns:
            raise ValueError(f"Series {target_series} not found in data")

        series = self.data[target_series].dropna()

        offset = self._RESAMPLE_OFFSETS.get(frequency)
        if offset is not None:
            series = series.resample(offset).mean()

        if target_series in self._GROWTH_RATE_SERIES:
            series = series.pct_change().dropna()

        return series

    def check_stationarity(self, series: pd.Series) -> Dict:
        """
        Perform Augmented Dickey-Fuller test for stationarity

        Args:
            series: Time series to test (NaNs are dropped before testing)

        Returns:
            Dictionary with test results: 'adf_statistic', 'p_value',
            'critical_values' and 'is_stationary' (p-value below 0.05).
        """
        result = adfuller(series.dropna())
        adf_statistic, p_value = result[0], result[1]

        return {
            'adf_statistic': adf_statistic,
            'p_value': p_value,
            'critical_values': result[4],
            'is_stationary': p_value < 0.05
        }

    def decompose_series(self, series: pd.Series, period: int = 4) -> Dict:
        """
        Decompose time series into trend, seasonal, and residual components

        Args:
            series: Time series to decompose (NaNs are dropped first)
            period: Seasonal period (4 for quarterly, 12 for monthly)

        Returns:
            Dictionary with decomposition components under the keys
            'trend', 'seasonal', 'residual' and 'observed'.
        """
        decomposition = seasonal_decompose(
            series.dropna(), period=period, extrapolate_trend='freq'
        )

        return {
            'trend': decomposition.trend,
            'seasonal': decomposition.seasonal,
            'residual': decomposition.resid,
            'observed': decomposition.observed
        }

    def fit_arima_model(self, series: pd.Series,
                        order: Optional[Tuple[int, int, int]] = None) -> "ARIMAResults":
        """
        Fit ARIMA model to time series

        Args:
            series: Time series data
            order: ARIMA order (p, d, q). If None, the order is chosen by a
                grid search over p in 0..2, d in 0..1, q in 0..2 that keeps
                the fit with the lowest AIC.

        Returns:
            Fitted ARIMA results object (the value returned by ``ARIMA.fit()``)
        """
        if order is not None:
            return ARIMA(series, order=order).fit()

        best_aic = np.inf
        best_order = (1, 1, 1)
        best_fit = None

        for p in range(0, 3):
            for d in range(0, 2):
                for q in range(0, 3):
                    candidate = (p, d, q)
                    try:
                        fitted = ARIMA(series, order=candidate).fit()
                    except Exception as exc:
                        # Individual orders are allowed to fail; the search
                        # simply moves on to the next candidate.
                        logger.debug("ARIMA%s failed to fit: %s", candidate, exc)
                        continue
                    if fitted.aic < best_aic:
                        best_aic = fitted.aic
                        best_order = candidate
                        best_fit = fitted

        logger.info("Auto-detected ARIMA order: %s", best_order)

        if best_fit is None:
            # No candidate produced a usable fit: fall back to the default
            # order and let any fitting error propagate to the caller.
            return ARIMA(series, order=best_order).fit()

        # Reuse the winning fit instead of fitting the same order twice.
        return best_fit

    def fit_ets_model(self, series: pd.Series, seasonal_periods: int = 4) -> "HoltWintersResults":
        """
        Fit ETS (Exponential Smoothing) model to time series

        The model uses an additive trend and additive seasonality.

        Args:
            series: Time series data
            seasonal_periods: Number of seasonal periods

        Returns:
            Fitted ETS results object (the value returned by
            ``ExponentialSmoothing.fit()``)
        """
        model = ExponentialSmoothing(
            series,
            seasonal_periods=seasonal_periods,
            trend='add',
            seasonal='add'
        )
        return model.fit()

    def _select_best_model(self, series: pd.Series) -> Tuple[str, object]:
        """Fit ARIMA and ETS and return ``(model_type, fitted_model)`` with the lower AIC."""
        fitted = {}
        failures = {}
        fitters = (('arima', self.fit_arima_model), ('ets', self.fit_ets_model))

        for name, fit in fitters:
            try:
                candidate = fit(series)
                fitted[name] = (candidate, candidate.aic)
            except Exception as exc:
                failures[name] = exc
                logger.warning("%s fit failed during auto selection: %s", name.upper(), exc)

        if not fitted:
            details = "; ".join(f"{name}: {exc}" for name, exc in failures.items())
            raise RuntimeError(f"Neither ARIMA nor ETS could be fitted ({details})")

        # ARIMA wins only with a strictly lower AIC; ties go to ETS. When one
        # model failed to fit, the surviving one is used.
        if 'ets' not in fitted or (
            'arima' in fitted and fitted['arima'][1] < fitted['ets'][1]
        ):
            return 'arima', fitted['arima'][0]
        return 'ets', fitted['ets'][0]

    def forecast_series(self, series: pd.Series, model_type: str = 'auto',
                        forecast_periods: int = 4) -> Dict:
        """
        Forecast time series using specified model

        Args:
            series: Time series to forecast
            model_type: 'arima', 'ets', or 'auto' ('auto' fits both and keeps
                the model with the lower AIC)
            forecast_periods: Number of periods to forecast

        Returns:
            Dictionary with forecast results: 'model', 'model_type',
            'forecast', 'confidence_intervals' and 'aic'.

        Raises:
            ValueError: If ``model_type`` is not one of the supported values.
            RuntimeError: In 'auto' mode, if neither model could be fitted.
        """
        if model_type == 'auto':
            model_type, model = self._select_best_model(series)
        elif model_type == 'arima':
            model = self.fit_arima_model(series)
        elif model_type == 'ets':
            model = self.fit_ets_model(series)
        else:
            raise ValueError("model_type must be 'arima', 'ets', or 'auto'")

        forecast = model.forecast(steps=forecast_periods)

        if model_type == 'arima':
            # Model-based intervals; statsmodels names the columns
            # 'lower <series name>' / 'upper <series name>'.
            forecast_ci = model.get_forecast(steps=forecast_periods).conf_int()
        else:
            # Approximate 95% band: +/- 1.96 standard deviations of the
            # input series around the point forecast, the same width at
            # every horizon.
            forecast_std = series.std()
            forecast_ci = pd.DataFrame({
                'lower': forecast - 1.96 * forecast_std,
                'upper': forecast + 1.96 * forecast_std
            })

        return {
            'model': model,
            'model_type': model_type,
            'forecast': forecast,
            'confidence_intervals': forecast_ci,
            'aic': getattr(model, 'aic', None)
        }

    def backtest_forecast(self, series: pd.Series, model_type: str = 'auto',
                          train_size: float = 0.8, test_periods: int = 8) -> Dict:
        """
        Perform backtesting of forecasting models

        Uses an expanding window: at each step the model is refitted on all
        observations before the test point and a one-step-ahead forecast is
        compared with the actual value. Steps whose forecast fails are logged
        and skipped.

        Args:
            series: Time series to backtest
            model_type: Model type to use
            train_size: Proportion of data for training
            test_periods: Number of periods to test

        Returns:
            Dictionary with backtest results ('actual_values',
            'predicted_values', 'errors', 'mae', 'mse', 'rmse', 'mape',
            'test_periods'), or ``{'error': ...}`` when no step succeeded.
            'mape' is in percent and is computed over non-zero actual values
            only; it is NaN when every actual value is zero.
        """
        n = len(series)
        train_end = int(n * train_size)

        actual_values = []
        predicted_values = []
        errors = []

        for step in range(test_periods):
            split = train_end + step
            if split >= n:
                break

            train_data = series.iloc[:split]
            test_value = series.iloc[split]

            try:
                forecast_result = self.forecast_series(train_data, model_type, 1)
                prediction = forecast_result['forecast'].iloc[0]
            except Exception as e:
                logger.warning("Forecast failed at step %s: %s", step, e)
                continue

            actual_values.append(test_value)
            predicted_values.append(prediction)
            errors.append(test_value - prediction)

        if not actual_values:
            return {'error': 'No successful forecasts generated'}

        mae = mean_absolute_error(actual_values, predicted_values)
        mse = mean_squared_error(actual_values, predicted_values)
        rmse = np.sqrt(mse)

        # Percentage error is undefined where the actual value is zero (which
        # does occur for growth-rate series), so those points are excluded
        # rather than producing inf/NaN through a division by zero.
        actual = np.asarray(actual_values, dtype=float)
        predicted = np.asarray(predicted_values, dtype=float)
        nonzero = actual != 0
        if nonzero.any():
            relative_errors = np.abs(actual[nonzero] - predicted[nonzero]) / np.abs(actual[nonzero])
            mape = np.mean(relative_errors) * 100
        else:
            mape = np.nan

        return {
            'actual_values': actual_values,
            'predicted_values': predicted_values,
            'errors': errors,
            'mae': mae,
            'mse': mse,
            'rmse': rmse,
            'mape': mape,
            'test_periods': len(actual_values)
        }

    def forecast_economic_indicators(self, indicators: Optional[List[str]] = None) -> Dict:
        """
        Forecast multiple economic indicators

        A failure for one indicator is logged and recorded as
        ``{'error': message}``; the remaining indicators are still processed.

        Args:
            indicators: List of indicators to forecast. If None, use default set
                (GDPC1, INDPRO, RSAFS)

        Returns:
            Dictionary with forecasts for all indicators. Each successful
            entry holds 'stationarity', 'decomposition', 'forecast',
            'backtest' and 'series'.
        """
        if indicators is None:
            indicators = ['GDPC1', 'INDPRO', 'RSAFS']

        results = {}

        for indicator in indicators:
            try:
                series = self.prepare_data(indicator)
                stationarity = self.check_stationarity(series)
                decomposition = self.decompose_series(series)
                forecast_result = self.forecast_series(series)
                backtest_result = self.backtest_forecast(series)

                results[indicator] = {
                    'stationarity': stationarity,
                    'decomposition': decomposition,
                    'forecast': forecast_result,
                    'backtest': backtest_result,
                    'series': series
                }

                logger.info("Successfully forecasted %s", indicator)

            except Exception as e:
                logger.error("Failed to forecast %s: %s", indicator, e)
                results[indicator] = {'error': str(e)}

        return results

    @staticmethod
    def _bound_column(ci: pd.DataFrame, prefix: str):
        """Return the first column of ``ci`` whose name starts with ``prefix``, or None."""
        for column in ci.columns:
            if str(column).lower().startswith(prefix):
                return column
        return None

    @staticmethod
    def _format_bound(ci: pd.DataFrame, column, position: int) -> str:
        """Format one interval bound to 4 decimals, or 'N/A' when the column is missing."""
        if column is None:
            return 'N/A'
        return f"{ci[column].iloc[position]:.4f}"

    def generate_forecast_report(self, forecasts: Dict) -> str:
        """
        Generate comprehensive forecast report

        Args:
            forecasts: Dictionary with forecast results, in the shape returned
                by ``forecast_economic_indicators``

        Returns:
            Formatted report string
        """
        parts = ["ECONOMIC FORECASTING REPORT\n", "=" * 50 + "\n\n"]

        for indicator, result in forecasts.items():
            if 'error' in result:
                parts.append(f"{indicator}: ERROR - {result['error']}\n\n")
                continue

            parts.append(f"INDICATOR: {indicator}\n")
            parts.append("-" * 30 + "\n")

            stationarity = result['stationarity']
            parts.append("Stationarity Test (ADF):\n")
            parts.append(f" ADF Statistic: {stationarity['adf_statistic']:.4f}\n")
            parts.append(f" P-value: {stationarity['p_value']:.4f}\n")
            parts.append(f" Is Stationary: {stationarity['is_stationary']}\n\n")

            forecast = result['forecast']
            parts.append(f"Model: {forecast['model_type'].upper()}\n")
            # Compare with None so that a legitimate AIC of 0.0 is still shown.
            if forecast['aic'] is not None:
                parts.append(f"AIC: {forecast['aic']:.4f}\n")
            parts.append(f"Forecast Periods: {len(forecast['forecast'])}\n\n")

            backtest = result['backtest']
            if 'error' not in backtest:
                parts.append("Backtest Performance:\n")
                parts.append(f" MAE: {backtest['mae']:.4f}\n")
                parts.append(f" RMSE: {backtest['rmse']:.4f}\n")
                parts.append(f" MAPE: {backtest['mape']:.2f}%\n")
                parts.append(f" Test Periods: {backtest['test_periods']}\n\n")

            # ETS intervals use columns 'lower'/'upper', while ARIMA intervals
            # are named 'lower <series>'/'upper <series>'; match on the prefix
            # so both are reported.
            ci = forecast['confidence_intervals']
            lower_col = self._bound_column(ci, 'lower')
            upper_col = self._bound_column(ci, 'upper')

            parts.append("Forecast Values:\n")
            for i, value in enumerate(forecast['forecast']):
                lower = self._format_bound(ci, lower_col, i)
                upper = self._format_bound(ci, upper_col, i)
                parts.append(f" Period {i+1}: {value:.4f} [{lower}, {upper}]\n")

            parts.append("\n" + "=" * 50 + "\n\n")

        return "".join(parts)