|
""" |
|
Statistical Modeling Module |
|
Advanced statistical analysis for economic indicators, including regression, correlation, and diagnostics.
|
""" |
|
|
|
import logging |
|
from typing import Dict, List, Optional, Tuple
|
|
|
import numpy as np |
|
import pandas as pd |
|
from scipy import stats |
|
from sklearn.linear_model import LinearRegression |
|
from sklearn.metrics import r2_score, mean_squared_error |
|
from sklearn.preprocessing import StandardScaler |
|
from statsmodels.stats.diagnostic import het_breuschpagan |
|
from statsmodels.stats.outliers_influence import variance_inflation_factor |
|
from statsmodels.stats.stattools import durbin_watson |
|
from statsmodels.tsa.stattools import adfuller, kpss |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
class StatisticalModeling: |
|
""" |
|
Advanced statistical modeling for economic indicators,
|
including regression analysis, correlation analysis, and diagnostic testing |
|
""" |
|
|
|
def __init__(self, data: pd.DataFrame): |
|
""" |
|
Initialize statistical modeling with economic data |
|
|
|
Args: |
|
data: DataFrame with economic indicators |
|
""" |
|
self.data = data.copy() |
|
self.models = {} |
|
self.diagnostics = {} |
|
self.correlations = {} |
|
|
|
    def prepare_regression_data(self, target: str, predictors: Optional[List[str]] = None,
                                lag_periods: int = 4) -> Tuple[pd.DataFrame, pd.Series]:
|
""" |
|
Prepare data for regression analysis with lagged variables |
|
|
|
Args: |
|
target: Target variable name |
|
predictors: List of predictor variables. If None, use all other numeric columns |
|
lag_periods: Number of lag periods to include |
|
|
|
Returns: |
|
Tuple of (features DataFrame, target Series) |
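
        Example (illustrative; assumes a `model` instance whose data contains
        columns named 'gdp' and 'cpi'):
            >>> X, y = model.prepare_regression_data('gdp', ['cpi'], lag_periods=2)
            >>> list(X.columns)  # doctest: +SKIP
            ['cpi', 'cpi_lag1', 'cpi_lag2', 'gdp_lag1', 'gdp_lag2']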
|
""" |
|
if target not in self.data.columns: |
|
raise ValueError(f"Target variable {target} not found in data") |
|
|
|
if predictors is None: |
|
predictors = [col for col in self.data.select_dtypes(include=[np.number]).columns |
|
if col != target] |
|
|
|
|
|
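        # Convert indicator levels to period-over-period growth rates; the
        # first observation becomes NaN and is dropped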
growth_data = self.data[[target] + predictors].pct_change().dropna() |
|
|
|
|
|
feature_data = {} |
|
|
|
for predictor in predictors: |
|
|
|
feature_data[predictor] = growth_data[predictor] |
|
|
|
|
|
for lag in range(1, lag_periods + 1): |
|
feature_data[f"{predictor}_lag{lag}"] = growth_data[predictor].shift(lag) |
|
|
|
|
|
for lag in range(1, lag_periods + 1): |
|
feature_data[f"{target}_lag{lag}"] = growth_data[target].shift(lag) |
|
|
|
|
|
features_df = pd.DataFrame(feature_data) |
|
features_df = features_df.dropna() |
|
|
|
|
|
        # Align the target with the rows that survived lag-induced NaN dropping;
        # use label-based .loc, not positional .iloc, since the index is preserved
        target_series = growth_data[target].loc[features_df.index]
|
|
|
return features_df, target_series |
|
|
|
    def fit_regression_model(self, target: str, predictors: Optional[List[str]] = None,
                             lag_periods: int = 4, include_interactions: bool = False) -> Dict:
|
""" |
|
Fit linear regression model with diagnostic testing |
|
|
|
Args: |
|
target: Target variable name |
|
predictors: List of predictor variables |
|
lag_periods: Number of lag periods to include |
|
include_interactions: Whether to include interaction terms |
|
|
|
Returns: |
|
Dictionary with model results and diagnostics |
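
        Example (illustrative; `model` and the column names are placeholders):
            >>> results = model.fit_regression_model('gdp', ['cpi', 'unemployment'])
            >>> sorted(results['performance'])  # doctest: +SKIP
            ['mae', 'mse', 'r2', 'rmse']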
|
""" |
|
|
|
features_df, target_series = self.prepare_regression_data(target, predictors, lag_periods) |
|
|
|
        if include_interactions:
            # Add pairwise interaction terms between all base features
            feature_cols = features_df.columns.tolist()
            for i, col1 in enumerate(feature_cols):
                for col2 in feature_cols[i + 1:]:
                    features_df[f"{col1}_x_{col2}"] = features_df[col1] * features_df[col2]
|
|
|
|
|
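        # Standardize features so coefficient magnitudes are comparable across scales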
scaler = StandardScaler() |
|
features_scaled = scaler.fit_transform(features_df) |
|
features_scaled_df = pd.DataFrame(features_scaled, |
|
index=features_df.index, |
|
columns=features_df.columns) |
|
|
|
|
|
model = LinearRegression() |
|
model.fit(features_scaled_df, target_series) |
|
|
|
|
|
predictions = model.predict(features_scaled_df) |
|
residuals = target_series - predictions |
|
|
|
|
|
r2 = r2_score(target_series, predictions) |
|
mse = mean_squared_error(target_series, predictions) |
|
rmse = np.sqrt(mse) |
|
|
|
|
|
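        # Rank standardized coefficients by magnitude as a rough importance measure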
coefficients = pd.DataFrame({ |
|
'variable': features_df.columns, |
|
'coefficient': model.coef_, |
|
'abs_coefficient': np.abs(model.coef_) |
|
}).sort_values('abs_coefficient', ascending=False) |
|
|
|
|
|
diagnostics = self.perform_regression_diagnostics(features_scaled_df, target_series, |
|
predictions, residuals) |
|
|
|
return { |
|
'model': model, |
|
'scaler': scaler, |
|
'features': features_df, |
|
'target': target_series, |
|
'predictions': predictions, |
|
'residuals': residuals, |
|
'coefficients': coefficients, |
|
'performance': { |
|
'r2': r2, |
|
'mse': mse, |
|
'rmse': rmse, |
|
'mae': np.mean(np.abs(residuals)) |
|
}, |
|
'diagnostics': diagnostics |
|
} |
|
|
|
def perform_regression_diagnostics(self, features: pd.DataFrame, target: pd.Series, |
|
predictions: np.ndarray, residuals: pd.Series) -> Dict: |
|
""" |
|
Perform comprehensive regression diagnostics |
|
|
|
Args: |
|
features: Feature matrix |
|
target: Target variable |
|
predictions: Model predictions |
|
residuals: Model residuals |
|
|
|
Returns: |
|
Dictionary with diagnostic test results |
|
""" |
|
diagnostics = {} |
|
|
|
|
|
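        # Normality of residuals (Shapiro-Wilk; best suited to samples of n <= 5000)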
try: |
|
normality_stat, normality_p = stats.shapiro(residuals) |
|
diagnostics['normality'] = { |
|
'statistic': normality_stat, |
|
'p_value': normality_p, |
|
'is_normal': normality_p > 0.05 |
|
} |
|
        except Exception as e:
            logger.warning(f"Normality test failed: {e}")
            diagnostics['normality'] = {'error': str(e)}
|
|
|
|
|
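        # Homoscedasticity of residuals (Breusch-Pagan; statsmodels recommends
        # including a constant term in the exog matrix)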
try: |
|
bp_stat, bp_p, bp_f, bp_f_p = het_breuschpagan(residuals, features) |
|
diagnostics['homoscedasticity'] = { |
|
'statistic': bp_stat, |
|
'p_value': bp_p, |
|
'f_statistic': bp_f, |
|
'f_p_value': bp_f_p, |
|
'is_homoscedastic': bp_p > 0.05 |
|
} |
|
        except Exception as e:
            logger.warning(f"Homoscedasticity test failed: {e}")
            diagnostics['homoscedasticity'] = {'error': str(e)}
|
|
|
|
|
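        # Serial correlation in residuals (Durbin-Watson; values near 2 indicate none)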
try: |
|
dw_stat = durbin_watson(residuals) |
|
diagnostics['autocorrelation'] = { |
|
'statistic': dw_stat, |
|
'interpretation': self._interpret_durbin_watson(dw_stat) |
|
} |
|
        except Exception as e:
            logger.warning(f"Autocorrelation test failed: {e}")
            diagnostics['autocorrelation'] = {'error': str(e)}
|
|
|
|
|
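        # Multicollinearity among predictors (variance inflation factors; VIF > 10 flagged)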
try: |
|
vif_scores = {} |
|
for i, col in enumerate(features.columns): |
|
vif = variance_inflation_factor(features.values, i) |
|
vif_scores[col] = vif |
|
|
|
diagnostics['multicollinearity'] = { |
|
'vif_scores': vif_scores, |
|
'high_vif_variables': [var for var, vif in vif_scores.items() if vif > 10], |
|
'mean_vif': np.mean(list(vif_scores.values())) |
|
} |
|
        except Exception as e:
            logger.warning(f"Multicollinearity test failed: {e}")
            diagnostics['multicollinearity'] = {'error': str(e)}
|
|
|
|
|
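        # Stationarity of the target series (ADF: H0 = unit root; KPSS: H0 = stationary)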
try: |
|
|
|
adf_result = adfuller(target) |
|
diagnostics['stationarity_adf'] = { |
|
'statistic': adf_result[0], |
|
'p_value': adf_result[1], |
|
'is_stationary': adf_result[1] < 0.05 |
|
} |
|
|
|
|
|
kpss_result = kpss(target, regression='c') |
|
diagnostics['stationarity_kpss'] = { |
|
'statistic': kpss_result[0], |
|
'p_value': kpss_result[1], |
|
'is_stationary': kpss_result[1] > 0.05 |
|
} |
|
        except Exception as e:
            logger.warning(f"Stationarity tests failed: {e}")
            diagnostics['stationarity_adf'] = {'error': str(e)}
            diagnostics['stationarity_kpss'] = {'error': str(e)}
|
|
|
return diagnostics |
|
|
|
def _interpret_durbin_watson(self, dw_stat: float) -> str: |
|
"""Interpret Durbin-Watson statistic""" |
|
if dw_stat < 1.5: |
|
return "Positive autocorrelation" |
|
elif dw_stat > 2.5: |
|
return "Negative autocorrelation" |
|
else: |
|
return "No significant autocorrelation" |
|
|
|
    def analyze_correlations(self, indicators: Optional[List[str]] = None,
                             method: str = 'pearson') -> Dict:
|
""" |
|
Perform comprehensive correlation analysis |
|
|
|
Args: |
|
indicators: List of indicators to analyze. If None, use all numeric columns |
|
method: Correlation method ('pearson', 'spearman', 'kendall') |
|
|
|
Returns: |
|
Dictionary with correlation analysis results |
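
        Example (illustrative; indicator names are placeholders):
            >>> corr = model.analyze_correlations(['gdp', 'cpi'], method='spearman')
            >>> corr['correlation_matrix'].shape  # doctest: +SKIP
            (2, 2)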
|
""" |
|
if indicators is None: |
|
indicators = self.data.select_dtypes(include=[np.number]).columns.tolist() |
|
|
|
|
|
growth_data = self.data[indicators].pct_change().dropna() |
|
|
|
|
|
corr_matrix = growth_data.corr(method=method) |
|
|
|
|
|
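        # Collect statistically significant pairwise correlations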
significant_correlations = [] |
|
for i in range(len(corr_matrix.columns)): |
|
for j in range(i+1, len(corr_matrix.columns)): |
|
var1 = corr_matrix.columns[i] |
|
var2 = corr_matrix.columns[j] |
|
corr_value = corr_matrix.iloc[i, j] |
|
|
|
|
|
                # Two-sided t-test for the correlation coefficient:
                # t = r * sqrt((n - 2) / (1 - r^2)) with n - 2 degrees of freedom.
                # Skip undefined or perfect correlations to avoid division by zero.
                n = len(growth_data)
                if np.isnan(corr_value) or abs(corr_value) >= 1.0:
                    continue
                t_stat = corr_value * np.sqrt((n - 2) / (1 - corr_value ** 2))
                p_value = 2 * (1 - stats.t.cdf(abs(t_stat), n - 2))
|
|
|
if p_value < 0.05: |
|
significant_correlations.append({ |
|
'variable1': var1, |
|
'variable2': var2, |
|
'correlation': corr_value, |
|
'p_value': p_value, |
|
'strength': self._interpret_correlation_strength(abs(corr_value)) |
|
}) |
|
|
|
|
|
significant_correlations.sort(key=lambda x: abs(x['correlation']), reverse=True) |
|
|
|
|
|
try: |
|
pca = self._perform_pca_analysis(growth_data) |
|
except Exception as e: |
|
logger.warning(f"PCA analysis failed: {e}") |
|
pca = {'error': str(e)} |
|
|
|
return { |
|
'correlation_matrix': corr_matrix, |
|
'significant_correlations': significant_correlations, |
|
'method': method, |
|
'pca_analysis': pca |
|
} |
|
|
|
def _interpret_correlation_strength(self, corr_value: float) -> str: |
|
"""Interpret correlation strength""" |
|
if corr_value >= 0.8: |
|
return "Very Strong" |
|
elif corr_value >= 0.6: |
|
return "Strong" |
|
elif corr_value >= 0.4: |
|
return "Moderate" |
|
elif corr_value >= 0.2: |
|
return "Weak" |
|
else: |
|
return "Very Weak" |
|
|
|
def _perform_pca_analysis(self, data: pd.DataFrame) -> Dict: |
|
"""Perform Principal Component Analysis""" |
|
from sklearn.decomposition import PCA |
|
|
|
|
|
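        # Standardize so each indicator contributes equally to the components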
scaler = StandardScaler() |
|
data_scaled = scaler.fit_transform(data) |
|
|
|
|
|
        pca = PCA()
        pca.fit(data_scaled)
|
|
|
|
|
explained_variance = pca.explained_variance_ratio_ |
|
cumulative_variance = np.cumsum(explained_variance) |
|
|
|
|
|
loadings = pd.DataFrame( |
|
pca.components_.T, |
|
columns=[f'PC{i+1}' for i in range(pca.n_components_)], |
|
index=data.columns |
|
) |
|
|
|
return { |
|
'explained_variance': explained_variance, |
|
'cumulative_variance': cumulative_variance, |
|
'loadings': loadings, |
|
'n_components': pca.n_components_, |
|
'components_to_explain_80_percent': np.argmax(cumulative_variance >= 0.8) + 1 |
|
} |
|
|
|
def perform_granger_causality(self, target: str, predictor: str, |
|
max_lags: int = 4) -> Dict: |
|
""" |
|
Perform Granger causality test |
|
|
|
Args: |
|
target: Target variable |
|
predictor: Predictor variable |
|
max_lags: Maximum number of lags to test |
|
|
|
Returns: |
|
Dictionary with Granger causality test results |
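
        Example (illustrative; asks whether 'cpi' growth helps forecast 'gdp' growth):
            >>> gc = model.perform_granger_causality('gdp', 'cpi', max_lags=2)
            >>> gc.get('is_causal')  # doctest: +SKIP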
|
""" |
|
try: |
|
from statsmodels.tsa.stattools import grangercausalitytests |
|
|
|
|
|
growth_data = self.data[[target, predictor]].pct_change().dropna() |
|
|
|
|
|
            # grangercausalitytests evaluates whether the SECOND column
            # Granger-causes the FIRST, so order the columns [target, predictor]
            test_data = growth_data[[target, predictor]]
            gc_result = grangercausalitytests(test_data, maxlag=max_lags, verbose=False)
|
|
|
|
|
results = {} |
|
for lag in range(1, max_lags + 1): |
|
if lag in gc_result: |
|
lag_result = gc_result[lag] |
|
results[lag] = { |
|
'f_statistic': lag_result[0]['ssr_ftest'][0], |
|
'p_value': lag_result[0]['ssr_ftest'][1], |
|
'is_significant': lag_result[0]['ssr_ftest'][1] < 0.05 |
|
} |
|
|
|
|
|
            if not results:
                return {'error': 'No valid lag results produced'}

            min_p_value = min(result['p_value'] for result in results.values())
            overall_significant = min_p_value < 0.05
|
|
|
return { |
|
'results_by_lag': results, |
|
'min_p_value': min_p_value, |
|
'is_causal': overall_significant, |
|
'optimal_lag': min(results.keys(), key=lambda k: results[k]['p_value']) |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Granger causality test failed: {e}") |
|
return {'error': str(e)} |
|
|
|
    def generate_statistical_report(self, regression_results: Optional[Dict] = None,
                                    correlation_results: Optional[Dict] = None,
                                    causality_results: Optional[Dict] = None) -> str:
|
""" |
|
Generate comprehensive statistical analysis report |
|
|
|
Args: |
|
regression_results: Results from regression analysis |
|
correlation_results: Results from correlation analysis |
|
causality_results: Results from causality analysis |
|
|
|
Returns: |
|
Formatted report string |
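
        Example (illustrative; `reg_results` and `corr_results` are outputs of
        fit_regression_model and analyze_correlations):
            >>> report = model.generate_statistical_report(
            ...     regression_results=reg_results,
            ...     correlation_results=corr_results)
            >>> print(report)  # doctest: +SKIP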
|
""" |
|
report = "STATISTICAL MODELING REPORT\n" |
|
report += "=" * 50 + "\n\n" |
|
|
|
if regression_results: |
|
report += "REGRESSION ANALYSIS\n" |
|
report += "-" * 30 + "\n" |
|
|
|
|
|
performance = regression_results['performance'] |
|
            report += "Model Performance:\n"
|
report += f" R²: {performance['r2']:.4f}\n" |
|
report += f" RMSE: {performance['rmse']:.4f}\n" |
|
report += f" MAE: {performance['mae']:.4f}\n\n" |
|
|
|
|
|
coefficients = regression_results['coefficients'] |
|
            report += "Top 5 Most Important Variables:\n"
            for _, row in coefficients.head(5).iterrows():
                report += f"  {row['variable']}: {row['coefficient']:.4f}\n"
|
report += "\n" |
|
|
|
|
|
diagnostics = regression_results['diagnostics'] |
|
            report += "Model Diagnostics:\n"
|
|
|
if 'normality' in diagnostics and 'error' not in diagnostics['normality']: |
|
norm = diagnostics['normality'] |
|
report += f" Normality (Shapiro-Wilk): p={norm['p_value']:.4f} " |
|
report += f"({'Normal' if norm['is_normal'] else 'Not Normal'})\n" |
|
|
|
if 'homoscedasticity' in diagnostics and 'error' not in diagnostics['homoscedasticity']: |
|
hom = diagnostics['homoscedasticity'] |
|
report += f" Homoscedasticity (Breusch-Pagan): p={hom['p_value']:.4f} " |
|
report += f"({'Homoscedastic' if hom['is_homoscedastic'] else 'Heteroscedastic'})\n" |
|
|
|
if 'autocorrelation' in diagnostics and 'error' not in diagnostics['autocorrelation']: |
|
autocorr = diagnostics['autocorrelation'] |
|
report += f" Autocorrelation (Durbin-Watson): {autocorr['statistic']:.4f} " |
|
report += f"({autocorr['interpretation']})\n" |
|
|
|
if 'multicollinearity' in diagnostics and 'error' not in diagnostics['multicollinearity']: |
|
mult = diagnostics['multicollinearity'] |
|
report += f" Multicollinearity (VIF): Mean VIF = {mult['mean_vif']:.2f}\n" |
|
if mult['high_vif_variables']: |
|
report += f" High VIF variables: {', '.join(mult['high_vif_variables'])}\n" |
|
|
|
report += "\n" |
|
|
|
if correlation_results: |
|
report += "CORRELATION ANALYSIS\n" |
|
report += "-" * 30 + "\n" |
|
report += f"Method: {correlation_results['method'].title()}\n" |
|
report += f"Significant Correlations: {len(correlation_results['significant_correlations'])}\n\n" |
|
|
|
|
|
            report += "Top 5 Strongest Correlations:\n"
            for corr in correlation_results['significant_correlations'][:5]:
|
report += f" {corr['variable1']} ↔ {corr['variable2']}: " |
|
report += f"{corr['correlation']:.4f} ({corr['strength']}, p={corr['p_value']:.4f})\n" |
|
|
|
|
|
if 'pca_analysis' in correlation_results and 'error' not in correlation_results['pca_analysis']: |
|
pca = correlation_results['pca_analysis'] |
|
                report += "\nPrincipal Component Analysis:\n"
|
report += f" Components to explain 80% variance: {pca['components_to_explain_80_percent']}\n" |
|
report += f" Total components: {pca['n_components']}\n" |
|
|
|
report += "\n" |
|
|
|
if causality_results: |
|
report += "GRANGER CAUSALITY ANALYSIS\n" |
|
report += "-" * 30 + "\n" |
|
|
|
for target, results in causality_results.items(): |
|
if 'error' not in results: |
|
report += f"{target}:\n" |
|
report += f" Is causal: {results['is_causal']}\n" |
|
report += f" Minimum p-value: {results['min_p_value']:.4f}\n" |
|
report += f" Optimal lag: {results['optimal_lag']}\n\n" |
|
|
|
return report |
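

if __name__ == "__main__":
    # Minimal smoke test on synthetic data. Illustrative only: the column
    # names ('gdp', 'cpi', 'unemployment') are placeholders, and real usage
    # would load actual economic indicator series instead.
    rng = np.random.default_rng(42)
    idx = pd.date_range("2000-01-01", periods=120, freq="QS")
    demo = pd.DataFrame({
        "gdp": 100 + np.cumsum(rng.normal(0.5, 1.0, 120)),
        "cpi": 50 + np.cumsum(rng.normal(0.2, 0.5, 120)),
        "unemployment": np.clip(rng.normal(5.0, 0.8, 120), 1.0, None),
    }, index=idx)

    modeler = StatisticalModeling(demo)
    reg = modeler.fit_regression_model("gdp", ["cpi", "unemployment"], lag_periods=2)
    corr = modeler.analyze_correlations()
    gc = modeler.perform_granger_causality("gdp", "cpi", max_lags=2)
    print(modeler.generate_statistical_report(reg, corr, {"cpi -> gdp": gc}))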