Edwin Salguero
Prepare for Streamlit Cloud deployment - Add deployment files, fix clustering chart error, update requirements
6ce20d9
#!/usr/bin/env python3 | |
""" | |
Chart Generator for FRED ML | |
Creates comprehensive economic visualizations and stores them in S3 | |
""" | |
import io | |
import json | |
import os | |
from datetime import datetime | |
from typing import Dict, List, Optional, Tuple | |
import boto3 | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import pandas as pd | |
import plotly.express as px | |
import plotly.graph_objects as go | |
import seaborn as sns | |
from plotly.subplots import make_subplots | |
from sklearn.decomposition import PCA | |
from sklearn.preprocessing import StandardScaler | |
# Use hardcoded defaults to avoid import issues | |
DEFAULT_REGION = 'us-east-1' | |
# Set style for matplotlib | |
plt.style.use('seaborn-v0_8') | |
sns.set_palette("husl") | |
class ChartGenerator: | |
"""Generate comprehensive economic visualizations""" | |
def __init__(self, s3_bucket: str = 'fredmlv1', aws_region: str = None): | |
self.s3_bucket = s3_bucket | |
if aws_region is None: | |
aws_region = DEFAULT_REGION | |
self.s3_client = boto3.client('s3', region_name=aws_region) | |
self.chart_paths = [] | |
def create_time_series_chart(self, df: pd.DataFrame, title: str = "Economic Indicators") -> str: | |
"""Create time series chart and upload to S3""" | |
try: | |
fig, ax = plt.subplots(figsize=(15, 8)) | |
for column in df.columns: | |
if column != 'Date': | |
ax.plot(df.index, df[column], label=column, linewidth=2) | |
ax.set_title(title, fontsize=16, fontweight='bold') | |
ax.set_xlabel('Date', fontsize=12) | |
ax.set_ylabel('Value', fontsize=12) | |
ax.legend(fontsize=10) | |
ax.grid(True, alpha=0.3) | |
plt.xticks(rotation=45) | |
plt.tight_layout() | |
# Save to bytes | |
img_buffer = io.BytesIO() | |
plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight') | |
img_buffer.seek(0) | |
# Upload to S3 | |
chart_key = f"visualizations/time_series_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" | |
self.s3_client.put_object( | |
Bucket=self.s3_bucket, | |
Key=chart_key, | |
Body=img_buffer.getvalue(), | |
ContentType='image/png' | |
) | |
plt.close() | |
self.chart_paths.append(chart_key) | |
return chart_key | |
except Exception as e: | |
print(f"Error creating time series chart: {e}") | |
return None | |
def create_correlation_heatmap(self, df: pd.DataFrame) -> str: | |
"""Create correlation heatmap and upload to S3""" | |
try: | |
corr_matrix = df.corr() | |
fig, ax = plt.subplots(figsize=(12, 10)) | |
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, | |
square=True, linewidths=0.5, cbar_kws={"shrink": .8}) | |
plt.title('Economic Indicators Correlation Matrix', fontsize=16, fontweight='bold') | |
plt.tight_layout() | |
# Save to bytes | |
img_buffer = io.BytesIO() | |
plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight') | |
img_buffer.seek(0) | |
# Upload to S3 | |
chart_key = f"visualizations/correlation_heatmap_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" | |
self.s3_client.put_object( | |
Bucket=self.s3_bucket, | |
Key=chart_key, | |
Body=img_buffer.getvalue(), | |
ContentType='image/png' | |
) | |
plt.close() | |
self.chart_paths.append(chart_key) | |
return chart_key | |
except Exception as e: | |
print(f"Error creating correlation heatmap: {e}") | |
return None | |
def create_distribution_charts(self, df: pd.DataFrame) -> List[str]: | |
"""Create distribution charts for each indicator""" | |
chart_keys = [] | |
try: | |
for column in df.columns: | |
if column != 'Date': | |
fig, ax = plt.subplots(figsize=(10, 6)) | |
# Histogram with KDE | |
sns.histplot(df[column].dropna(), kde=True, ax=ax) | |
ax.set_title(f'Distribution of {column}', fontsize=14, fontweight='bold') | |
ax.set_xlabel(column, fontsize=12) | |
ax.set_ylabel('Frequency', fontsize=12) | |
plt.tight_layout() | |
# Save to bytes | |
img_buffer = io.BytesIO() | |
plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight') | |
img_buffer.seek(0) | |
# Upload to S3 | |
chart_key = f"visualizations/distribution_{column}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" | |
self.s3_client.put_object( | |
Bucket=self.s3_bucket, | |
Key=chart_key, | |
Body=img_buffer.getvalue(), | |
ContentType='image/png' | |
) | |
plt.close() | |
chart_keys.append(chart_key) | |
self.chart_paths.append(chart_key) | |
return chart_keys | |
except Exception as e: | |
print(f"Error creating distribution charts: {e}") | |
return [] | |
def create_pca_visualization(self, df: pd.DataFrame, n_components: int = 2) -> str: | |
"""Create PCA visualization and upload to S3""" | |
try: | |
# Prepare data | |
df_clean = df.dropna() | |
scaler = StandardScaler() | |
scaled_data = scaler.fit_transform(df_clean) | |
# Perform PCA | |
pca = PCA(n_components=n_components) | |
pca_result = pca.fit_transform(scaled_data) | |
# Create visualization | |
fig, ax = plt.subplots(figsize=(12, 8)) | |
if n_components == 2: | |
scatter = ax.scatter(pca_result[:, 0], pca_result[:, 1], alpha=0.6) | |
ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%} variance)', fontsize=12) | |
ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%} variance)', fontsize=12) | |
else: | |
# For 3D or more, show first two components | |
scatter = ax.scatter(pca_result[:, 0], pca_result[:, 1], alpha=0.6) | |
ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%} variance)', fontsize=12) | |
ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%} variance)', fontsize=12) | |
ax.set_title('PCA Visualization of Economic Indicators', fontsize=16, fontweight='bold') | |
ax.grid(True, alpha=0.3) | |
plt.tight_layout() | |
# Save to bytes | |
img_buffer = io.BytesIO() | |
plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight') | |
img_buffer.seek(0) | |
# Upload to S3 | |
chart_key = f"visualizations/pca_visualization_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" | |
self.s3_client.put_object( | |
Bucket=self.s3_bucket, | |
Key=chart_key, | |
Body=img_buffer.getvalue(), | |
ContentType='image/png' | |
) | |
plt.close() | |
self.chart_paths.append(chart_key) | |
return chart_key | |
except Exception as e: | |
print(f"Error creating PCA visualization: {e}") | |
return None | |
def create_forecast_chart(self, historical_data: pd.Series, forecast_data: List[float], | |
title: str = "Economic Forecast") -> str: | |
"""Create forecast chart and upload to S3""" | |
try: | |
fig, ax = plt.subplots(figsize=(15, 8)) | |
# Plot historical data | |
ax.plot(historical_data.index, historical_data.values, | |
label='Historical', linewidth=2, color='blue') | |
# Plot forecast | |
forecast_index = pd.date_range( | |
start=historical_data.index[-1] + pd.DateOffset(months=1), | |
periods=len(forecast_data), | |
freq='M' | |
) | |
ax.plot(forecast_index, forecast_data, | |
label='Forecast', linewidth=2, color='red', linestyle='--') | |
ax.set_title(title, fontsize=16, fontweight='bold') | |
ax.set_xlabel('Date', fontsize=12) | |
ax.set_ylabel('Value', fontsize=12) | |
ax.legend(fontsize=12) | |
ax.grid(True, alpha=0.3) | |
plt.xticks(rotation=45) | |
plt.tight_layout() | |
# Save to bytes | |
img_buffer = io.BytesIO() | |
plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight') | |
img_buffer.seek(0) | |
# Upload to S3 | |
chart_key = f"visualizations/forecast_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" | |
self.s3_client.put_object( | |
Bucket=self.s3_bucket, | |
Key=chart_key, | |
Body=img_buffer.getvalue(), | |
ContentType='image/png' | |
) | |
plt.close() | |
self.chart_paths.append(chart_key) | |
return chart_key | |
except Exception as e: | |
print(f"Error creating forecast chart: {e}") | |
return None | |
def create_regression_diagnostics(self, y_true: List[float], y_pred: List[float], | |
residuals: List[float]) -> str: | |
"""Create regression diagnostics chart and upload to S3""" | |
try: | |
fig, axes = plt.subplots(2, 2, figsize=(15, 12)) | |
# Actual vs Predicted | |
axes[0, 0].scatter(y_true, y_pred, alpha=0.6) | |
axes[0, 0].plot([min(y_true), max(y_true)], [min(y_true), max(y_true)], 'r--', lw=2) | |
axes[0, 0].set_xlabel('Actual Values') | |
axes[0, 0].set_ylabel('Predicted Values') | |
axes[0, 0].set_title('Actual vs Predicted') | |
axes[0, 0].grid(True, alpha=0.3) | |
# Residuals vs Predicted | |
axes[0, 1].scatter(y_pred, residuals, alpha=0.6) | |
axes[0, 1].axhline(y=0, color='r', linestyle='--') | |
axes[0, 1].set_xlabel('Predicted Values') | |
axes[0, 1].set_ylabel('Residuals') | |
axes[0, 1].set_title('Residuals vs Predicted') | |
axes[0, 1].grid(True, alpha=0.3) | |
# Residuals histogram | |
axes[1, 0].hist(residuals, bins=20, alpha=0.7, edgecolor='black') | |
axes[1, 0].set_xlabel('Residuals') | |
axes[1, 0].set_ylabel('Frequency') | |
axes[1, 0].set_title('Residuals Distribution') | |
axes[1, 0].grid(True, alpha=0.3) | |
# Q-Q plot | |
from scipy import stats | |
stats.probplot(residuals, dist="norm", plot=axes[1, 1]) | |
axes[1, 1].set_title('Q-Q Plot of Residuals') | |
axes[1, 1].grid(True, alpha=0.3) | |
plt.tight_layout() | |
# Save to bytes | |
img_buffer = io.BytesIO() | |
plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight') | |
img_buffer.seek(0) | |
# Upload to S3 | |
chart_key = f"visualizations/regression_diagnostics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" | |
self.s3_client.put_object( | |
Bucket=self.s3_bucket, | |
Key=chart_key, | |
Body=img_buffer.getvalue(), | |
ContentType='image/png' | |
) | |
plt.close() | |
self.chart_paths.append(chart_key) | |
return chart_key | |
except Exception as e: | |
print(f"Error creating regression diagnostics: {e}") | |
return None | |
def create_clustering_chart(self, df: pd.DataFrame, n_clusters: int = 3) -> str: | |
"""Create clustering visualization and upload to S3""" | |
try: | |
from sklearn.cluster import KMeans | |
# Prepare data | |
df_clean = df.dropna() | |
scaler = StandardScaler() | |
scaled_data = scaler.fit_transform(df_clean) | |
# Perform clustering | |
kmeans = KMeans(n_clusters=n_clusters, random_state=42) | |
clusters = kmeans.fit_predict(scaled_data) | |
# PCA for visualization | |
pca = PCA(n_components=2) | |
pca_result = pca.fit_transform(scaled_data) | |
# Create visualization | |
fig, ax = plt.subplots(figsize=(12, 8)) | |
scatter = ax.scatter(pca_result[:, 0], pca_result[:, 1], | |
c=clusters, cmap='viridis', alpha=0.6) | |
# Add cluster centers | |
centers_pca = pca.transform(kmeans.cluster_centers_) | |
ax.scatter(centers_pca[:, 0], centers_pca[:, 1], | |
c='red', marker='x', s=200, linewidths=3, label='Cluster Centers') | |
ax.set_title(f'K-Means Clustering (k={n_clusters})', fontsize=16, fontweight='bold') | |
ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%} variance)', fontsize=12) | |
ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%} variance)', fontsize=12) | |
ax.legend() | |
ax.grid(True, alpha=0.3) | |
plt.tight_layout() | |
# Save to bytes | |
img_buffer = io.BytesIO() | |
plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight') | |
img_buffer.seek(0) | |
# Upload to S3 | |
chart_key = f"visualizations/clustering_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" | |
self.s3_client.put_object( | |
Bucket=self.s3_bucket, | |
Key=chart_key, | |
Body=img_buffer.getvalue(), | |
ContentType='image/png' | |
) | |
plt.close() | |
self.chart_paths.append(chart_key) | |
return chart_key | |
except Exception as e: | |
print(f"Error creating clustering chart: {e}") | |
return None | |
def generate_comprehensive_visualizations(self, df: pd.DataFrame, analysis_type: str = "comprehensive") -> Dict[str, str]: | |
"""Generate comprehensive visualizations based on analysis type""" | |
visualizations = {} | |
try: | |
# Always create time series and correlation charts | |
visualizations['time_series'] = self.create_time_series_chart(df) | |
visualizations['correlation'] = self.create_correlation_heatmap(df) | |
visualizations['distributions'] = self.create_distribution_charts(df) | |
if analysis_type in ["comprehensive", "statistical"]: | |
# Add PCA visualization | |
visualizations['pca'] = self.create_pca_visualization(df) | |
# Add clustering | |
visualizations['clustering'] = self.create_clustering_chart(df) | |
if analysis_type in ["comprehensive", "forecasting"]: | |
# Add forecast visualization (using sample data) | |
sample_series = df.iloc[:, 0] if not df.empty else pd.Series([1, 2, 3, 4, 5]) | |
sample_forecast = [sample_series.iloc[-1] * 1.02, sample_series.iloc[-1] * 1.04] | |
visualizations['forecast'] = self.create_forecast_chart(sample_series, sample_forecast) | |
# Store visualization metadata | |
metadata = { | |
'analysis_type': analysis_type, | |
'timestamp': datetime.now().isoformat(), | |
'charts_generated': list(visualizations.keys()), | |
's3_bucket': self.s3_bucket | |
} | |
# Upload metadata | |
metadata_key = f"visualizations/metadata_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" | |
self.s3_client.put_object( | |
Bucket=self.s3_bucket, | |
Key=metadata_key, | |
Body=json.dumps(metadata, indent=2), | |
ContentType='application/json' | |
) | |
return visualizations | |
except Exception as e: | |
print(f"Error generating comprehensive visualizations: {e}") | |
return {} | |
def get_chart_url(self, chart_key: str) -> str: | |
"""Get public URL for a chart""" | |
try: | |
return f"https://{self.s3_bucket}.s3.amazonaws.com/{chart_key}" | |
except Exception as e: | |
print(f"Error generating chart URL: {e}") | |
return None | |
def list_available_charts(self) -> List[Dict]: | |
"""List all available charts in S3""" | |
try: | |
response = self.s3_client.list_objects_v2( | |
Bucket=self.s3_bucket, | |
Prefix='visualizations/' | |
) | |
charts = [] | |
if 'Contents' in response: | |
for obj in response['Contents']: | |
if obj['Key'].endswith('.png'): | |
charts.append({ | |
'key': obj['Key'], | |
'last_modified': obj['LastModified'], | |
'size': obj['Size'], | |
'url': self.get_chart_url(obj['Key']) | |
}) | |
return sorted(charts, key=lambda x: x['last_modified'], reverse=True) | |
except Exception as e: | |
print(f"Error listing charts: {e}") | |
return [] |