"""
FRED ML Lambda function.

Fetches FRED economic series, computes summary statistics and a correlation
matrix, renders charts, and stores the resulting report in S3.
"""

import io
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import boto3
import matplotlib

# Lambda has no display attached; select the headless Agg backend before
# pyplot is imported.
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Created once per container so warm invocations reuse the connection.
s3_client = boto3.client('s3')

FRED_API_KEY = os.environ.get('FRED_API_KEY')
S3_BUCKET = os.environ.get('S3_BUCKET', 'fredmlv1')
FRED_BASE_URL = "https://api.stlouisfed.org/fred"

# Maps request keys to FRED series IDs. The two sides are identical today,
# but the indirection allows friendlier aliases without changing callers.
ECONOMIC_INDICATORS = {
    "GDP": "GDP",
    "UNRATE": "UNRATE",
    "CPIAUCSL": "CPIAUCSL",
    "FEDFUNDS": "FEDFUNDS",
    "DGS10": "DGS10",
    "DEXUSEU": "DEXUSEU",
    "PAYEMS": "PAYEMS",
    "INDPRO": "INDPRO",
    "M2SL": "M2SL",
    "PCE": "PCE",
}


def get_fred_data(series_id: str, start_date: str, end_date: str) -> Optional[pd.Series]:
    """Fetch one series from the FRED API, or None on failure."""
    try:
        url = f"{FRED_BASE_URL}/series/observations"
        params = {
            "series_id": series_id,
            "api_key": FRED_API_KEY,
            "file_type": "json",
            # The observations endpoint takes observation_start/observation_end;
            # it does not recognize start_date/end_date.
            "observation_start": start_date,
            "observation_end": end_date,
        }

        response = requests.get(url, params=params, timeout=30)

        if response.status_code == 200:
            data = response.json()
            observations = data.get("observations", [])

            if observations:
                dates = []
                values = []

                for obs in observations:
                    try:
                        date = pd.to_datetime(obs["date"])
                        # FRED encodes missing observations as ".".
                        value = float(obs["value"]) if obs["value"] != "." else np.nan
                        dates.append(date)
                        values.append(value)
                    except (ValueError, KeyError):
                        continue

                if dates and values:
                    return pd.Series(values, index=dates, name=series_id)

        logger.error(f"Failed to fetch data for {series_id} (HTTP {response.status_code})")
        return None

    except Exception as e:
        logger.error(f"Error fetching data for {series_id}: {e}")
        return None
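
# Illustrative usage (dates are hypothetical): GDP is a quarterly series, so a
# one-year window typically returns about four observations:
#   gdp = get_fred_data("GDP", "2023-01-01", "2023-12-31")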


def create_dataframe(series_data: Dict[str, pd.Series]) -> pd.DataFrame:
    """Align the fetched series on a shared daily date index."""
    if not series_data:
        return pd.DataFrame()

    # Collect every observation date across all series.
    all_dates = set()
    for series in series_data.values():
        if series is not None:
            all_dates.update(series.index)

    if all_dates:
        date_range = pd.date_range(min(all_dates), max(all_dates), freq='D')
        df = pd.DataFrame(index=date_range)

        for series_id, series in series_data.items():
            if series is not None:
                df[series_id] = series

        df.index.name = 'Date'
        return df

    return pd.DataFrame()


def generate_statistics(df: pd.DataFrame) -> Dict:
    """Generate a per-series statistical summary."""
    if df.empty:
        return {}

    # 'Date' is the index, not a column, so every column here is a series.
    stats = {}
    for column in df.columns:
        series = df[column].dropna()
        if not series.empty:
            stats[column] = {
                'mean': float(series.mean()),
                'std': float(series.std()),
                'min': float(series.min()),
                'max': float(series.max()),
                'count': int(len(series)),
                'missing': int(df[column].isna().sum()),
            }

    return stats
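
# The result maps each series ID to its summary, for example:
#   {"GDP": {"mean": ..., "std": ..., "min": ..., "max": ..., "count": ..., "missing": ...}}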


def create_correlation_matrix(df: pd.DataFrame) -> Dict:
    """Compute the pairwise correlation matrix as a nested dict."""
    if df.empty:
        return {}

    corr_matrix = df.corr()
    return corr_matrix.to_dict()


def create_visualizations(df: pd.DataFrame, s3_bucket: str, report_id: str) -> List[str]:
    """Render charts, upload them to S3, and return the object keys."""
    if df.empty:
        return []

    visualization_keys = []

    try:
        # Time series line chart of all indicators.
        plt.figure(figsize=(12, 8))
        for column in df.columns:
            plt.plot(df.index, df[column], label=column, linewidth=2)

        plt.title('Economic Indicators Time Series')
        plt.xlabel('Date')
        plt.ylabel('Value')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.xticks(rotation=45)
        plt.tight_layout()

        img_buffer = io.BytesIO()
        plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight')
        img_buffer.seek(0)

        time_series_key = f"visualizations/{report_id}/time_series.png"
        s3_client.put_object(
            Bucket=s3_bucket,
            Key=time_series_key,
            Body=img_buffer.getvalue(),
            ContentType='image/png',
        )
        visualization_keys.append(time_series_key)
        plt.close()

        # Correlation heatmap (only meaningful with at least two series).
        if len(df.columns) > 1:
            plt.figure(figsize=(10, 8))
            corr_matrix = df.corr()
            sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0)
            plt.title('Correlation Matrix')
            plt.tight_layout()

            img_buffer = io.BytesIO()
            plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight')
            img_buffer.seek(0)

            correlation_key = f"visualizations/{report_id}/correlation.png"
            s3_client.put_object(
                Bucket=s3_bucket,
                Key=correlation_key,
                Body=img_buffer.getvalue(),
                ContentType='image/png',
            )
            visualization_keys.append(correlation_key)
            plt.close()

        # One histogram per indicator.
        for column in df.columns:
            plt.figure(figsize=(8, 6))
            plt.hist(df[column].dropna(), bins=30, alpha=0.7, edgecolor='black')
            plt.title(f'Distribution of {column}')
            plt.xlabel('Value')
            plt.ylabel('Frequency')
            plt.grid(True, alpha=0.3)
            plt.tight_layout()

            img_buffer = io.BytesIO()
            plt.savefig(img_buffer, format='png', dpi=300, bbox_inches='tight')
            img_buffer.seek(0)

            dist_key = f"visualizations/{report_id}/distribution_{column}.png"
            s3_client.put_object(
                Bucket=s3_bucket,
                Key=dist_key,
                Body=img_buffer.getvalue(),
                ContentType='image/png',
            )
            visualization_keys.append(dist_key)
            plt.close()

    except Exception as e:
        logger.error(f"Error creating visualizations: {e}")
        # Avoid leaking open figures across warm invocations.
        plt.close('all')

    return visualization_keys


def save_report_to_s3(report_data: Dict, s3_bucket: str, report_id: str) -> str:
    """Save the report JSON to S3 and return its object key."""
    try:
        report_key = f"reports/{report_id}/report.json"

        # default=str serializes timestamps and other non-JSON types.
        s3_client.put_object(
            Bucket=s3_bucket,
            Key=report_key,
            Body=json.dumps(report_data, default=str),
            ContentType='application/json',
        )

        return report_key
    except Exception as e:
        logger.error(f"Error saving report to S3: {e}")
        raise


def lambda_handler(event: Dict, context) -> Dict:
    """Main Lambda entry point."""
    try:
        logger.info(f"Received event: {json.dumps(event, default=str)}")

        # API Gateway wraps the payload in a JSON string under 'body';
        # direct invocations pass the payload as the event itself.
        if isinstance(event.get('body'), str):
            payload = json.loads(event['body'])
        else:
            payload = event

        indicators = payload.get('indicators', ['GDP', 'UNRATE', 'CPIAUCSL'])
        start_date = payload.get('start_date', (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d'))
        end_date = payload.get('end_date', datetime.now().strftime('%Y-%m-%d'))
        options = payload.get('options', {})

        report_id = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        logger.info(f"Processing analysis for indicators: {indicators}")
        logger.info(f"Date range: {start_date} to {end_date}")

        # Fetch each requested indicator; unknown names are skipped.
        series_data = {}
        for indicator in indicators:
            if indicator in ECONOMIC_INDICATORS:
                series_id = ECONOMIC_INDICATORS[indicator]
                data = get_fred_data(series_id, start_date, end_date)
                if data is not None:
                    series_data[indicator] = data
                    logger.info(f"Successfully fetched data for {indicator}")
                else:
                    logger.warning(f"Failed to fetch data for {indicator}")

        df = create_dataframe(series_data)

        if df.empty:
            raise ValueError("No data available for analysis")

        report_data = {
            'report_id': report_id,
            'timestamp': datetime.now().isoformat(),
            'indicators': indicators,
            'start_date': start_date,
            'end_date': end_date,
            'total_observations': len(df),
            'data_shape': df.shape,
            'statistics': generate_statistics(df),
            'correlation_matrix': create_correlation_matrix(df),
            'data': df.reset_index().to_dict('records'),
        }

        # Visualizations are generated unless explicitly disabled.
        if options.get('visualizations', True):
            visualization_keys = create_visualizations(df, S3_BUCKET, report_id)
            report_data['visualizations'] = visualization_keys

        report_key = save_report_to_s3(report_data, S3_BUCKET, report_id)

        logger.info(f"Analysis completed successfully. Report saved to: {report_key}")

        return {
            'statusCode': 200,
            'body': json.dumps({
                'status': 'success',
                'report_id': report_id,
                'report_key': report_key,
                'message': 'Analysis completed successfully',
            })
        }

    except Exception as e:
        logger.error(f"Error in lambda_handler: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'status': 'error',
                'message': str(e),
            })
        }
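

# A minimal local-invocation sketch for ad-hoc testing (not used by the Lambda
# runtime). It assumes FRED_API_KEY and S3_BUCKET are set in the environment;
# the indicator names and dates below are illustrative.
if __name__ == "__main__":
    test_event = {
        "indicators": ["GDP", "UNRATE"],
        "start_date": "2023-01-01",
        "end_date": "2023-12-31",
        "options": {"visualizations": False},
    }
    result = lambda_handler(test_event, None)
    print(json.dumps(json.loads(result["body"]), indent=2))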