FREDML/src/analysis/advanced_analytics.py
#!/usr/bin/env python3
"""
Advanced Analytics Module for FRED Economic Data
Performs comprehensive statistical analysis, modeling, and insights extraction.
"""
import os
import warnings
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
from scipy import stats
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score, silhouette_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from statsmodels.stats.diagnostic import het_breuschpagan
from statsmodels.stats.outliers_influence import variance_inflation_factor
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
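# Silence noisy library warnings so the console reports stay readable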
warnings.filterwarnings("ignore")
class AdvancedAnalytics:
"""
Comprehensive analytics class for FRED economic data.
Performs EDA, statistical modeling, segmentation, and time series analysis.
"""
def __init__(self, data_path=None, df=None):
"""Initialize with data path or DataFrame."""
if df is not None:
self.df = df
elif data_path:
self.df = pd.read_csv(data_path, index_col=0, parse_dates=True)
else:
raise ValueError("Must provide either data_path or DataFrame")
self.scaler = StandardScaler()
        self.results = {}
        # Ensure the export directory used by the plotting/reporting methods exists
        os.makedirs("data/exports", exist_ok=True)
def perform_eda(self):
"""Perform comprehensive Exploratory Data Analysis."""
print("=" * 60)
print("EXPLORATORY DATA ANALYSIS")
print("=" * 60)
# Basic info
print(f"\nDataset Shape: {self.df.shape}")
print(f"Date Range: {self.df.index.min()} to {self.df.index.max()}")
print(f"Variables: {list(self.df.columns)}")
# Descriptive statistics
print("\n" + "=" * 40)
print("DESCRIPTIVE STATISTICS")
print("=" * 40)
desc_stats = self.df.describe()
print(desc_stats)
# Skewness and Kurtosis
print("\n" + "=" * 40)
print("SKEWNESS AND KURTOSIS")
print("=" * 40)
skewness = self.df.skew()
kurtosis = self.df.kurtosis()
for col in self.df.columns:
print(f"{col}:")
print(f" Skewness: {skewness[col]:.3f}")
print(f" Kurtosis: {kurtosis[col]:.3f}")
# Correlation Analysis
print("\n" + "=" * 40)
print("CORRELATION ANALYSIS")
print("=" * 40)
# Pearson correlation
pearson_corr = self.df.corr(method="pearson")
print("\nPearson Correlation Matrix:")
print(pearson_corr.round(3))
# Spearman correlation
spearman_corr = self.df.corr(method="spearman")
print("\nSpearman Correlation Matrix:")
print(spearman_corr.round(3))
# Store results
self.results["eda"] = {
"descriptive_stats": desc_stats,
"skewness": skewness,
"kurtosis": kurtosis,
"pearson_corr": pearson_corr,
"spearman_corr": spearman_corr,
}
return self.results["eda"]
def perform_dimensionality_reduction(self, method="pca", n_components=2):
"""Perform dimensionality reduction for visualization."""
print("\n" + "=" * 40)
print(f"DIMENSIONALITY REDUCTION ({method.upper()})")
print("=" * 40)
# Prepare data (remove NaN values)
df_clean = self.df.dropna()
if method.lower() == "pca":
# PCA
pca = PCA(n_components=n_components)
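            # Standardize first so components are not dominated by large-scale series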
scaled_data = self.scaler.fit_transform(df_clean)
pca_result = pca.fit_transform(scaled_data)
print(f"Explained variance ratio: {pca.explained_variance_ratio_}")
print(f"Total explained variance: {sum(pca.explained_variance_ratio_):.3f}")
# Create DataFrame with PCA results
pca_df = pd.DataFrame(
pca_result,
columns=[f"PC{i+1}" for i in range(n_components)],
index=df_clean.index,
)
self.results["pca"] = {
"components": pca_df,
"explained_variance": pca.explained_variance_ratio_,
"feature_importance": pd.DataFrame(
pca.components_.T,
columns=[f"PC{i+1}" for i in range(n_components)],
index=df_clean.columns,
),
}
return self.results["pca"]
        # Only PCA is implemented; other methods are not supported
        print(f"Method '{method}' is not implemented; only 'pca' is available.")
        return None
def perform_statistical_modeling(self, target_var="GDP", test_size=0.2):
"""Perform linear regression with comprehensive diagnostics."""
print("\n" + "=" * 40)
print("STATISTICAL MODELING - LINEAR REGRESSION")
print("=" * 40)
# Prepare data
df_clean = self.df.dropna()
if target_var not in df_clean.columns:
print(f"Target variable '{target_var}' not found in dataset")
return None
# Prepare features and target
feature_cols = [col for col in df_clean.columns if col != target_var]
X = df_clean[feature_cols]
y = df_clean[target_var]
# Split data
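        # NOTE: a random split of time-indexed observations can leak future
        # information into training; a chronological split may suit forecasting better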
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=42
)
# Fit linear regression
model = LinearRegression()
model.fit(X_train, y_train)
# Predictions
y_pred_train = model.predict(X_train)
y_pred_test = model.predict(X_test)
# Model performance
r2_train = r2_score(y_train, y_pred_train)
r2_test = r2_score(y_test, y_pred_test)
rmse_train = np.sqrt(mean_squared_error(y_train, y_pred_train))
rmse_test = np.sqrt(mean_squared_error(y_test, y_pred_test))
print(f"\nModel Performance:")
print(f"R² (Training): {r2_train:.4f}")
print(f"R² (Test): {r2_test:.4f}")
print(f"RMSE (Training): {rmse_train:.4f}")
print(f"RMSE (Test): {rmse_test:.4f}")
# Coefficients
print(f"\nCoefficients:")
for feature, coef in zip(feature_cols, model.coef_):
print(f" {feature}: {coef:.4f}")
print(f" Intercept: {model.intercept_:.4f}")
# Statistical significance using statsmodels
X_with_const = sm.add_constant(X_train)
model_sm = sm.OLS(y_train, X_with_const).fit()
print(f"\nStatistical Significance:")
print(model_sm.summary().tables[1])
# Assumption tests
print(f"\n" + "=" * 30)
print("REGRESSION ASSUMPTIONS")
print("=" * 30)
# 1. Normality of residuals
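        # stats.normaltest is the D'Agostino-Pearson test; p > 0.05 fails to reject normality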
residuals = y_train - y_pred_train
_, p_value_norm = stats.normaltest(residuals)
print(f"Normality test (p-value): {p_value_norm:.4f}")
# 2. Multicollinearity (VIF)
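        # Rule of thumb: VIF above ~10 signals problematic multicollinearity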
        vif_data = []
        for i in range(X_train.shape[1]):
            try:
                vif = variance_inflation_factor(X_train.values, i)
                vif_data.append(vif)
            except Exception:
                # VIF is undefined for (near-)perfectly collinear columns
                vif_data.append(np.nan)
        print("\nVariance Inflation Factors:")
        for feature, vif in zip(feature_cols, vif_data):
            print(f" {feature}: {vif:.3f}")
        # 3. Homoscedasticity (Breusch-Pagan; a low p-value suggests heteroscedasticity)
        try:
            # het_breuschpagan returns (lm, lm_pvalue, fvalue, f_pvalue)
            _, p_value_het, _, _ = het_breuschpagan(residuals, X_with_const)
            print(f"\nHomoscedasticity test (p-value): {p_value_het:.4f}")
        except Exception as e:
            p_value_het = np.nan
            print(f"\nHomoscedasticity test failed: {e}")
# Store results
self.results["regression"] = {
"model": model,
"model_sm": model_sm,
"performance": {
"r2_train": r2_train,
"r2_test": r2_test,
"rmse_train": rmse_train,
"rmse_test": rmse_test,
},
"coefficients": dict(zip(feature_cols, model.coef_)),
"assumptions": {
"normality_p": p_value_norm,
"homoscedasticity_p": p_value_het,
"vif": dict(zip(feature_cols, vif_data)),
},
}
return self.results["regression"]
def perform_clustering(self, max_k=10):
"""Perform clustering analysis with optimal k selection."""
print("\n" + "=" * 40)
print("CLUSTERING ANALYSIS")
print("=" * 40)
# Prepare data
df_clean = self.df.dropna()
if df_clean.shape[0] < 10 or df_clean.shape[1] < 2:
print(
"Not enough data for clustering (need at least 10 rows and 2 columns after dropna). Skipping."
)
self.results["clustering"] = None
return None
try:
scaled_data = self.scaler.fit_transform(df_clean)
except Exception as e:
print(f"Scaling failed: {e}")
self.results["clustering"] = None
return None
# Find optimal k using elbow method and silhouette score
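        # Silhouette scores range from -1 to 1; higher means tighter, better-separated clusters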
inertias = []
silhouette_scores = []
k_range = range(2, min(max_k + 1, len(df_clean) // 10 + 1))
if len(k_range) < 2:
print("Not enough data for multiple clusters. Skipping clustering.")
self.results["clustering"] = None
return None
try:
for k in k_range:
                # n_init pinned explicitly for consistent behavior across sklearn versions
                kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
kmeans.fit(scaled_data)
inertias.append(kmeans.inertia_)
silhouette_scores.append(silhouette_score(scaled_data, kmeans.labels_))
# Plot elbow curve only if there are results
if inertias and silhouette_scores:
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(list(k_range), inertias, "bo-")
plt.xlabel("Number of Clusters (k)")
plt.ylabel("Inertia")
plt.title("Elbow Method")
plt.grid(True)
plt.subplot(1, 2, 2)
plt.plot(list(k_range), silhouette_scores, "ro-")
plt.xlabel("Number of Clusters (k)")
plt.ylabel("Silhouette Score")
plt.title("Silhouette Analysis")
plt.grid(True)
plt.tight_layout()
plt.savefig(
"data/exports/clustering_analysis.png", dpi=300, bbox_inches="tight"
)
plt.show()
# Choose optimal k (highest silhouette score)
optimal_k = list(k_range)[np.argmax(silhouette_scores)]
print(f"Optimal number of clusters: {optimal_k}")
print(f"Best silhouette score: {max(silhouette_scores):.3f}")
# Perform clustering with optimal k
            kmeans_optimal = KMeans(n_clusters=optimal_k, n_init=10, random_state=42)
cluster_labels = kmeans_optimal.fit_predict(scaled_data)
# Add cluster labels to data
df_clustered = df_clean.copy()
df_clustered["Cluster"] = cluster_labels
# Cluster characteristics
print(f"\nCluster Characteristics:")
cluster_stats = df_clustered.groupby("Cluster").agg(["mean", "std"])
print(cluster_stats.round(3))
# Store results
self.results["clustering"] = {
"optimal_k": optimal_k,
"silhouette_score": max(silhouette_scores),
"cluster_labels": cluster_labels,
"clustered_data": df_clustered,
"cluster_stats": cluster_stats,
"inertias": inertias,
"silhouette_scores": silhouette_scores,
}
return self.results["clustering"]
except Exception as e:
print(f"Clustering failed: {e}")
self.results["clustering"] = None
return None
def perform_time_series_analysis(self, target_var="GDP"):
"""Perform comprehensive time series analysis."""
print("\n" + "=" * 40)
print("TIME SERIES ANALYSIS")
print("=" * 40)
if target_var not in self.df.columns:
print(f"Target variable '{target_var}' not found")
self.results["time_series"] = None
return None
# Prepare time series data
ts_data = self.df[target_var].dropna()
if len(ts_data) < 50:
print(
"Insufficient data for time series analysis (need at least 50 points). Skipping."
)
self.results["time_series"] = None
return None
print(f"Time series length: {len(ts_data)} observations")
print(f"Date range: {ts_data.index.min()} to {ts_data.index.max()}")
        # 1. Time Series Decomposition
        print("\nTime Series Decomposition:")
        # Resample outside the try block so ts_monthly is defined for the ARIMA
        # step even if decomposition fails; "M" is the month-end frequency alias
        if ts_data.index.freq is None:
            ts_monthly = ts_data.resample("M").mean().dropna()
        else:
            ts_monthly = ts_data
        decomposition = None
        try:
            # period=12 assumes monthly observations with annual seasonality
            decomposition = seasonal_decompose(ts_monthly, model="additive", period=12)
# Plot decomposition
fig, axes = plt.subplots(4, 1, figsize=(12, 10))
decomposition.observed.plot(ax=axes[0], title="Original Time Series")
decomposition.trend.plot(ax=axes[1], title="Trend")
decomposition.seasonal.plot(ax=axes[2], title="Seasonality")
decomposition.resid.plot(ax=axes[3], title="Residuals")
plt.tight_layout()
plt.savefig(
"data/exports/time_series_decomposition.png",
dpi=300,
bbox_inches="tight",
)
plt.show()
except Exception as e:
print(f"Decomposition failed: {e}")
# 2. ARIMA Modeling
print(f"\nARIMA Modeling:")
try:
# Fit ARIMA model
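            # Fixed (1, 1, 1) order is a simple default; an AIC/BIC search over
            # candidate orders would likely fit better in practice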
model = ARIMA(ts_monthly, order=(1, 1, 1))
fitted_model = model.fit()
print(f"ARIMA Model Summary:")
print(fitted_model.summary())
            # Forecast: a single get_forecast call yields both the mean and intervals
            forecast_steps = min(12, len(ts_monthly) // 4)
            forecast_res = fitted_model.get_forecast(steps=forecast_steps)
            forecast = forecast_res.predicted_mean
            conf_int = forecast_res.conf_int()
# Plot forecast
plt.figure(figsize=(12, 6))
ts_monthly.plot(label="Historical Data")
forecast.plot(label="Forecast", color="red")
plt.fill_between(
forecast.index,
conf_int.iloc[:, 0],
conf_int.iloc[:, 1],
alpha=0.3,
color="red",
label="Confidence Interval",
)
plt.title(f"{target_var} - ARIMA Forecast")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(
"data/exports/time_series_forecast.png", dpi=300, bbox_inches="tight"
)
plt.show()
# Store results
self.results["time_series"] = {
"model": fitted_model,
"forecast": forecast,
"confidence_intervals": conf_int,
"decomposition": decomposition if "decomposition" in locals() else None,
}
except Exception as e:
print(f"ARIMA modeling failed: {e}")
self.results["time_series"] = None
return self.results.get("time_series")
def generate_insights_report(self):
"""Generate comprehensive insights report in layman's terms."""
print("\n" + "=" * 60)
print("COMPREHENSIVE INSIGHTS REPORT")
print("=" * 60)
insights = []
# EDA Insights
if "eda" in self.results and self.results["eda"] is not None:
insights.append("EXPLORATORY DATA ANALYSIS INSIGHTS:")
insights.append("-" * 40)
# Correlation insights
pearson_corr = self.results["eda"]["pearson_corr"]
high_corr_pairs = []
for i in range(len(pearson_corr.columns)):
for j in range(i + 1, len(pearson_corr.columns)):
corr_val = pearson_corr.iloc[i, j]
if abs(corr_val) > 0.7:
high_corr_pairs.append(
(pearson_corr.columns[i], pearson_corr.columns[j], corr_val)
)
if high_corr_pairs:
insights.append("Strong correlations found:")
for var1, var2, corr in high_corr_pairs:
insights.append(f" • {var1} and {var2}: {corr:.3f}")
else:
insights.append(
"No strong correlations (>0.7) found between variables."
)
else:
insights.append("EDA could not be performed or returned no results.")
# Regression Insights
if "regression" in self.results and self.results["regression"] is not None:
insights.append("\nREGRESSION MODEL INSIGHTS:")
insights.append("-" * 40)
reg_results = self.results["regression"]
r2_test = reg_results["performance"]["r2_test"]
insights.append(f"Model Performance:")
insights.append(
f" • The model explains {r2_test:.1%} of the variation in the target variable"
)
if r2_test > 0.7:
insights.append(" • This is considered a good model fit")
elif r2_test > 0.5:
insights.append(" • This is considered a moderate model fit")
else:
insights.append(" • This model has limited predictive power")
# Assumption insights
assumptions = reg_results["assumptions"]
if assumptions["normality_p"] > 0.05:
insights.append(
" • Residuals are normally distributed (assumption met)"
)
else:
insights.append(
" • Residuals are not normally distributed (assumption violated)"
)
else:
insights.append(
"Regression modeling could not be performed or returned no results."
)
# Clustering Insights
if "clustering" in self.results and self.results["clustering"] is not None:
insights.append("\nCLUSTERING INSIGHTS:")
insights.append("-" * 40)
cluster_results = self.results["clustering"]
optimal_k = cluster_results["optimal_k"]
            # Local name avoids shadowing sklearn's imported silhouette_score
            sil_score = cluster_results["silhouette_score"]
            insights.append(f"Optimal number of clusters: {optimal_k}")
            insights.append(f"Cluster quality score: {sil_score:.3f}")
            if sil_score > 0.5:
                insights.append(" • Clusters are well-separated and distinct")
            elif sil_score > 0.3:
                insights.append(" • Clusters show moderate separation")
            else:
                insights.append(" • Clusters may not be well-defined")
else:
insights.append("Clustering could not be performed or returned no results.")
# Time Series Insights
if "time_series" in self.results and self.results["time_series"] is not None:
insights.append("\nTIME SERIES INSIGHTS:")
insights.append("-" * 40)
insights.append(
" • Time series decomposition shows trend, seasonality, and random components"
)
insights.append(
" • ARIMA model provides future forecasts with confidence intervals"
)
insights.append(
" • Forecasts can be used for planning and decision-making"
)
else:
insights.append(
"Time series analysis could not be performed or returned no results."
)
# Print insights
for insight in insights:
print(insight)
# Save insights to file
with open("data/exports/insights_report.txt", "w") as f:
f.write("\n".join(insights))
return insights
def run_complete_analysis(self):
"""Run the complete advanced analytics workflow."""
print("Starting comprehensive advanced analytics...")
# 1. EDA
self.perform_eda()
# 2. Dimensionality reduction
self.perform_dimensionality_reduction()
# 3. Statistical modeling
self.perform_statistical_modeling()
# 4. Clustering
self.perform_clustering()
# 5. Time series analysis
self.perform_time_series_analysis()
# 6. Generate insights
self.generate_insights_report()
print("\n" + "=" * 60)
print("ANALYSIS COMPLETE!")
print("=" * 60)
print("Check the following outputs:")
print(" • data/exports/insights_report.txt - Comprehensive insights")
print(" • data/exports/clustering_analysis.png - Clustering results")
print(
" • data/exports/time_series_decomposition.png - Time series decomposition"
)
print(" • data/exports/time_series_forecast.png - Time series forecast")
return self.results
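

if __name__ == "__main__":
    # Minimal usage sketch. The CSV path below is hypothetical; adjust it to
    # wherever your pipeline writes processed data. AdvancedAnalytics expects a
    # date-indexed file with one column per FRED series (e.g. GDP, UNRATE, CPIAUCSL).
    analytics = AdvancedAnalytics(data_path="data/processed/fred_data.csv")
    results = analytics.run_complete_analysis()
    # Individual steps can also be run on their own, e.g.:
    # analytics.perform_time_series_analysis(target_var="UNRATE")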