Edwin Salguero
Enterprise: Transform to production-grade architecture with FastAPI, Docker, K8s, monitoring, and comprehensive tooling
832348e
#!/usr/bin/env python3 | |
""" | |
FRED Data Collector v2 | |
A tool for collecting and analyzing Federal Reserve Economic Data (FRED) | |
using direct API calls instead of the fredapi library | |
""" | |
import os | |
import warnings | |
from datetime import datetime, timedelta | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import pandas as pd | |
import requests | |
import seaborn as sns | |
warnings.filterwarnings("ignore") | |
import os | |
import sys | |
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..")) | |
from config.settings import (DEFAULT_END_DATE, DEFAULT_START_DATE, | |
FRED_API_KEY, OUTPUT_DIR, PLOTS_DIR) | |
class FREDDataCollectorV2:
    """Collect, tabulate, save, summarise and plot FRED series.

    Data is fetched with direct HTTP calls to the FRED REST API (through
    ``requests``) instead of the ``fredapi`` library. All network failures
    are reported with ``print`` and skipped; the public methods do not raise
    on a failed request.
    """

    # Seconds to wait for FRED before giving up on a request. Without an
    # explicit timeout ``requests`` may block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key=None):
        """Initialize the FRED data collector with API key.

        Args:
            api_key: FRED API key. Falls back to ``FRED_API_KEY`` from the
                project settings when omitted or falsy.

        Side effects:
            Creates ``OUTPUT_DIR`` and ``PLOTS_DIR`` if they do not exist.
        """
        self.api_key = api_key or FRED_API_KEY
        self.base_url = "https://api.stlouisfed.org/fred"
        # Create output directories
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        os.makedirs(PLOTS_DIR, exist_ok=True)
        # Common economic indicators (key and value are both the series ID)
        self.indicators = {
            "GDP": "GDP",  # Gross Domestic Product
            "UNRATE": "UNRATE",  # Unemployment Rate
            "CPIAUCSL": "CPIAUCSL",  # Consumer Price Index
            "FEDFUNDS": "FEDFUNDS",  # Federal Funds Rate
            "DGS10": "DGS10",  # 10-Year Treasury Rate
            "DEXUSEU": "DEXUSEU",  # US/Euro Exchange Rate
            "PAYEMS": "PAYEMS",  # Total Nonfarm Payrolls
            "INDPRO": "INDPRO",  # Industrial Production
            "M2SL": "M2SL",  # M2 Money Stock
            "PCE": "PCE",  # Personal Consumption Expenditures
        }

    def _masked_api_key(self):
        """Return a short, printable hint of the API key (never the full key).

        Returns:
            str: ``"<not set>"`` when no key is configured, otherwise the
            first eight characters followed by ``"..."``.
        """
        if not self.api_key:
            return "<not set>"
        return f"{str(self.api_key)[:8]}..."

    def _redact(self, text):
        """Return ``str(text)`` with any occurrence of the API key masked.

        Exception messages from ``requests`` can contain the full request
        URL, whose query string includes ``api_key``; this keeps the key out
        of printed diagnostics.
        """
        text = str(text)
        if self.api_key:
            text = text.replace(str(self.api_key), "***")
        return text

    @staticmethod
    def _parse_observations(observations, series_id):
        """Convert FRED observation records into a date-indexed Series.

        Args:
            observations: Iterable of records with ``"date"`` and ``"value"``
                entries, as found under ``"observations"`` in the response.
            series_id: Name given to the resulting Series.

        Returns:
            pandas.Series | None: Float values indexed by timestamp, or
            ``None`` when no record could be parsed. A value of ``"."`` is
            stored as NaN; records that are malformed (missing keys,
            unparsable date, non-numeric or null value) are skipped.
        """
        dates = []
        values = []
        for obs in observations:
            try:
                date = pd.to_datetime(obs["date"])
                raw = obs["value"]
                value = np.nan if raw == "." else float(raw)
            except (ValueError, KeyError, TypeError):
                # One bad record must not discard the rest of the series.
                continue
            if pd.isna(date):
                # A null date cannot serve as an index label.
                continue
            dates.append(date)
            values.append(value)
        if not dates:
            return None
        return pd.Series(values, index=dates, name=series_id)

    def get_series_info(self, series_id):
        """Get information about a FRED series.

        Args:
            series_id: FRED series identifier, e.g. ``"GDP"``.

        Returns:
            dict | None: ``id``, ``title``, ``units``, ``frequency``,
            ``last_updated`` and ``notes`` of the first series in the
            response, or ``None`` on a non-200 status, an empty result, or
            any error (which is printed).
        """
        try:
            response = requests.get(
                f"{self.base_url}/series",
                params={
                    "series_id": series_id,
                    "api_key": self.api_key,
                    "file_type": "json",
                },
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                return None
            # "seriess" (double s) is the key this code reads from the JSON.
            series = response.json().get("seriess", [])
            if not series:
                return None
            s = series[0]
            return {
                "id": s["id"],
                "title": s["title"],
                "units": s.get("units", ""),
                "frequency": s.get("frequency", ""),
                "last_updated": s.get("last_updated", ""),
                "notes": s.get("notes", ""),
            }
        except Exception as e:
            # Deliberately best-effort: metadata is optional for callers.
            print(f"Error getting info for {series_id}: {self._redact(e)}")
            return None

    def get_economic_data(self, series_ids, start_date=None, end_date=None):
        """Fetch economic data for specified series.

        Args:
            series_ids: Iterable of FRED series identifiers.
            start_date: First observation date; defaults to
                ``DEFAULT_START_DATE`` when omitted or falsy.
            end_date: Last observation date; defaults to
                ``DEFAULT_END_DATE`` when omitted or falsy.

        Returns:
            dict: Maps each successfully fetched series ID to a pandas
            Series of float values indexed by date. Series that fail
            (HTTP error, no observations, exception) are reported with
            ``print`` and left out.
        """
        start_date = start_date or DEFAULT_START_DATE
        end_date = end_date or DEFAULT_END_DATE
        url = f"{self.base_url}/series/observations"
        data = {}
        for series_id in series_ids:
            try:
                print(f"Fetching data for {series_id}...")
                params = {
                    "series_id": series_id,
                    "api_key": self.api_key,
                    "file_type": "json",
                    # The observations endpoint names its date window
                    # observation_start / observation_end.
                    "observation_start": start_date,
                    "observation_end": end_date,
                }
                response = requests.get(
                    url, params=params, timeout=self.REQUEST_TIMEOUT
                )
                if response.status_code != 200:
                    print(f"✗ Error fetching {series_id}: HTTP {response.status_code}")
                    continue
                observations = response.json().get("observations", [])
                if not observations:
                    print(f"✗ No observations found for {series_id}")
                    continue
                series_data = self._parse_observations(observations, series_id)
                if series_data is None:
                    print(f"✗ No valid data for {series_id}")
                    continue
                data[series_id] = series_data
                print(
                    f"✓ Retrieved {len(series_data)} observations for {series_id}"
                )
            except Exception as e:
                # Best-effort per series: report and move on to the next one.
                print(f"✗ Error fetching {series_id}: {self._redact(e)}")
        return data

    def create_dataframe(self, data_dict, freq="D"):
        """Convert dictionary of series data to a pandas DataFrame.

        Args:
            data_dict: Maps column name to a date-indexed pandas Series.
            freq: Frequency of the row index, which spans the earliest to
                the latest date found in any series. The default ``"D"``
                yields one row per calendar day, so lower-frequency series
                are mostly NaN and observations whose date is not on the
                grid are dropped. Pass ``None`` to use exactly the union of
                observed dates instead.

        Returns:
            pandas.DataFrame: One column per series, index named ``"Date"``;
            an empty DataFrame when there is nothing to tabulate.
        """
        if not data_dict:
            return pd.DataFrame()
        # Collect every date that appears in any series.
        all_dates = set()
        for series in data_dict.values():
            all_dates.update(series.index)
        if not all_dates:
            return pd.DataFrame()
        if freq is None:
            index = pd.DatetimeIndex(sorted(all_dates))
        else:
            index = pd.date_range(min(all_dates), max(all_dates), freq=freq)
        df = pd.DataFrame(index=index)
        # Column assignment aligns each series on the frame's index.
        for series_id, series_data in data_dict.items():
            df[series_id] = series_data
        df.index.name = "Date"
        return df

    def save_data(self, df, filename):
        """Save data to CSV file.

        Args:
            df: DataFrame to write.
            filename: File name, joined onto ``OUTPUT_DIR``.

        Returns:
            str | None: Path of the written file, or ``None`` when ``df`` is
            empty and nothing was written.
        """
        if df.empty:
            print("No data to save")
            return None
        filepath = os.path.join(OUTPUT_DIR, filename)
        df.to_csv(filepath)
        print(f"Data saved to {filepath}")
        return filepath

    def plot_economic_indicators(self, df, indicators_to_plot=None):
        """Create plots for economic indicators.

        Draws one stacked panel per indicator, saves the figure as
        ``economic_indicators.png`` under ``PLOTS_DIR`` and shows it.

        Args:
            df: DataFrame with one column per indicator.
            indicators_to_plot: Column names to draw; all columns when
                ``None``. Names that are not columns of ``df`` are ignored.

        Returns:
            None
        """
        if df.empty:
            print("No data to plot")
            return
        if indicators_to_plot is None:
            indicators_to_plot = list(df.columns)
        # Drop unknown names up front so no blank panel is reserved for them.
        present = [name for name in indicators_to_plot if name in df.columns]
        if not present:
            print("No indicators to plot")
            return
        # Set up the plotting style
        plt.style.use("default")
        sns.set_palette("husl")
        n_indicators = len(present)
        # squeeze=False always yields a 2-D axes array, even for one panel.
        fig, axes = plt.subplots(
            n_indicators, 1, figsize=(15, 4 * n_indicators), squeeze=False
        )
        try:
            for ax, indicator in zip(axes[:, 0], present):
                df[indicator].dropna().plot(ax=ax, linewidth=2)
                # Get series info for title (one metadata request per panel)
                info = self.get_series_info(indicator)
                title = f'{indicator} - {info["title"]}' if info else indicator
                ax.set_title(title)
                ax.set_ylabel("Value")
                ax.grid(True, alpha=0.3)
            fig.tight_layout()
            plot_path = os.path.join(PLOTS_DIR, "economic_indicators.png")
            fig.savefig(plot_path, dpi=300, bbox_inches="tight")
            plt.show()
            print(f"Plot saved to {plot_path}")
        finally:
            # Release the figure so repeated calls do not accumulate figures.
            plt.close(fig)

    def generate_summary_statistics(self, df):
        """Generate summary statistics for the economic data.

        Args:
            df: DataFrame to summarise.

        Returns:
            pandas.DataFrame: ``df.describe()`` extended with
            ``missing_values`` (NaN count per column) and
            ``missing_percentage`` rows; an empty DataFrame when ``df`` is
            empty.
        """
        if df.empty:
            return pd.DataFrame()
        summary = df.describe()
        # Add additional statistics
        missing = df.isnull().sum()
        summary.loc["missing_values"] = missing
        summary.loc["missing_percentage"] = (missing / len(df)) * 100
        return summary

    def run_analysis(self, series_ids=None, start_date=None, end_date=None):
        """Run a complete analysis of economic indicators.

        Fetches the series, builds a DataFrame, saves it to a timestamped
        CSV, prints summary statistics and creates the plots.

        Args:
            series_ids: Series to analyse; all of ``self.indicators`` when
                ``None``.
            start_date: Passed to ``get_economic_data``.
            end_date: Passed to ``get_economic_data``.

        Returns:
            tuple: ``(df, summary)`` on success, ``(None, None)`` when no
            data could be retrieved or tabulated.
        """
        if series_ids is None:
            series_ids = list(self.indicators.values())
        print("=== FRED Economic Data Analysis v2 ===")
        # Slicing a missing key would raise TypeError; show a placeholder.
        print(f"API Key: {self._masked_api_key()}")
        print(
            f"Date Range: {start_date or DEFAULT_START_DATE} to {end_date or DEFAULT_END_DATE}"
        )
        print(f"Series to analyze: {series_ids}")
        print("=" * 50)
        # Fetch data
        data = self.get_economic_data(series_ids, start_date, end_date)
        if not data:
            print("No data retrieved. Please check your API key and series IDs.")
            return None, None
        # Create DataFrame
        df = self.create_dataframe(data)
        if df.empty:
            print("No data to analyze")
            return None, None
        # Save data
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.save_data(df, f"fred_economic_data_{timestamp}.csv")
        # Generate summary statistics
        summary = self.generate_summary_statistics(df)
        print("\n=== Summary Statistics ===")
        print(summary)
        # Create plots
        print("\n=== Creating Visualizations ===")
        self.plot_economic_indicators(df)
        return df, summary
def main():
    """Run the example FRED analysis and report the outcome.

    Returns:
        int: 0 when data was retrieved and analysed, 1 when the analysis
        produced no data or raised, so the value can serve as the process
        exit status.
    """
    # Example: Analyze key economic indicators
    key_indicators = ["GDP", "UNRATE", "CPIAUCSL", "FEDFUNDS", "DGS10"]
    try:
        # Construction creates the output directories and can itself fail,
        # so it belongs inside the guarded section.
        collector = FREDDataCollectorV2()
        df, _summary = collector.run_analysis(series_ids=key_indicators)
    except Exception as e:
        # Top-level boundary: report the problem instead of a raw traceback.
        print(f"Error during analysis: {e}")
        return 1
    if df is None:
        print("\n=== Analysis Failed ===")
        return 1
    print("\n=== Analysis Complete ===")
    print(f"Data shape: {df.shape}")
    print(f"Date range: {df.index.min()} to {df.index.max()}")
    return 0


if __name__ == "__main__":
    # Propagate success/failure to the shell as the exit status.
    raise SystemExit(main())