FREDML / src /core /fred_client.py
Edwin Salguero
Enterprise: Transform to production-grade architecture with FastAPI, Docker, K8s, monitoring, and comprehensive tooling
832348e
#!/usr/bin/env python3
"""
FRED Data Collector v2
A tool for collecting and analyzing Federal Reserve Economic Data (FRED)
using direct API calls instead of the fredapi library
"""
import os
import warnings
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
warnings.filterwarnings("ignore")
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
from config.settings import (DEFAULT_END_DATE, DEFAULT_START_DATE,
FRED_API_KEY, OUTPUT_DIR, PLOTS_DIR)
class FREDDataCollectorV2:
def __init__(self, api_key=None):
"""Initialize the FRED data collector with API key."""
self.api_key = api_key or FRED_API_KEY
self.base_url = "https://api.stlouisfed.org/fred"
# Create output directories
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(PLOTS_DIR, exist_ok=True)
# Common economic indicators
self.indicators = {
"GDP": "GDP", # Gross Domestic Product
"UNRATE": "UNRATE", # Unemployment Rate
"CPIAUCSL": "CPIAUCSL", # Consumer Price Index
"FEDFUNDS": "FEDFUNDS", # Federal Funds Rate
"DGS10": "DGS10", # 10-Year Treasury Rate
"DEXUSEU": "DEXUSEU", # US/Euro Exchange Rate
"PAYEMS": "PAYEMS", # Total Nonfarm Payrolls
"INDPRO": "INDPRO", # Industrial Production
"M2SL": "M2SL", # M2 Money Stock
"PCE": "PCE", # Personal Consumption Expenditures
}
def get_series_info(self, series_id):
"""Get information about a FRED series."""
try:
url = f"{self.base_url}/series"
params = {
"series_id": series_id,
"api_key": self.api_key,
"file_type": "json",
}
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
series = data.get("seriess", [])
if series:
s = series[0]
return {
"id": s["id"],
"title": s["title"],
"units": s.get("units", ""),
"frequency": s.get("frequency", ""),
"last_updated": s.get("last_updated", ""),
"notes": s.get("notes", ""),
}
return None
except Exception as e:
print(f"Error getting info for {series_id}: {e}")
return None
def get_economic_data(self, series_ids, start_date=None, end_date=None):
"""Fetch economic data for specified series."""
start_date = start_date or DEFAULT_START_DATE
end_date = end_date or DEFAULT_END_DATE
data = {}
for series_id in series_ids:
try:
print(f"Fetching data for {series_id}...")
url = f"{self.base_url}/series/observations"
params = {
"series_id": series_id,
"api_key": self.api_key,
"file_type": "json",
"start_date": start_date,
"end_date": end_date,
}
response = requests.get(url, params=params)
if response.status_code == 200:
response_data = response.json()
observations = response_data.get("observations", [])
if observations:
# Convert to pandas Series
dates = []
values = []
for obs in observations:
try:
date = pd.to_datetime(obs["date"])
value = (
float(obs["value"])
if obs["value"] != "."
else np.nan
)
dates.append(date)
values.append(value)
except (ValueError, KeyError):
continue
if dates and values:
series_data = pd.Series(values, index=dates, name=series_id)
data[series_id] = series_data
print(
f"✓ Retrieved {len(series_data)} observations for {series_id}"
)
else:
print(f"✗ No valid data for {series_id}")
else:
print(f"✗ No observations found for {series_id}")
else:
print(f"✗ Error fetching {series_id}: HTTP {response.status_code}")
except Exception as e:
print(f"✗ Error fetching {series_id}: {e}")
return data
def create_dataframe(self, data_dict):
"""Convert dictionary of series data to a pandas DataFrame."""
if not data_dict:
return pd.DataFrame()
# Find the common date range
all_dates = set()
for series in data_dict.values():
all_dates.update(series.index)
# Create a complete date range
if all_dates:
date_range = pd.date_range(min(all_dates), max(all_dates), freq="D")
df = pd.DataFrame(index=date_range)
# Add each series
for series_id, series_data in data_dict.items():
df[series_id] = series_data
df.index.name = "Date"
return df
return pd.DataFrame()
def save_data(self, df, filename):
"""Save data to CSV file."""
if df.empty:
print("No data to save")
return None
filepath = os.path.join(OUTPUT_DIR, filename)
df.to_csv(filepath)
print(f"Data saved to {filepath}")
return filepath
def plot_economic_indicators(self, df, indicators_to_plot=None):
"""Create plots for economic indicators."""
if df.empty:
print("No data to plot")
return
if indicators_to_plot is None:
indicators_to_plot = [col for col in df.columns if col in df.columns]
if not indicators_to_plot:
print("No indicators to plot")
return
# Set up the plotting style
plt.style.use("default")
sns.set_palette("husl")
# Create subplots
n_indicators = len(indicators_to_plot)
fig, axes = plt.subplots(n_indicators, 1, figsize=(15, 4 * n_indicators))
if n_indicators == 1:
axes = [axes]
for i, indicator in enumerate(indicators_to_plot):
if indicator in df.columns:
ax = axes[i]
df[indicator].dropna().plot(ax=ax, linewidth=2)
# Get series info for title
info = self.get_series_info(indicator)
title = f'{indicator} - {info["title"]}' if info else indicator
ax.set_title(title)
ax.set_ylabel("Value")
ax.grid(True, alpha=0.3)
plt.tight_layout()
plot_path = os.path.join(PLOTS_DIR, "economic_indicators.png")
plt.savefig(plot_path, dpi=300, bbox_inches="tight")
plt.show()
print(f"Plot saved to {plot_path}")
def generate_summary_statistics(self, df):
"""Generate summary statistics for the economic data."""
if df.empty:
return pd.DataFrame()
summary = df.describe()
# Add additional statistics
summary.loc["missing_values"] = df.isnull().sum()
summary.loc["missing_percentage"] = (df.isnull().sum() / len(df)) * 100
return summary
def run_analysis(self, series_ids=None, start_date=None, end_date=None):
"""Run a complete analysis of economic indicators."""
if series_ids is None:
series_ids = list(self.indicators.values())
print("=== FRED Economic Data Analysis v2 ===")
print(f"API Key: {self.api_key[:8]}...")
print(
f"Date Range: {start_date or DEFAULT_START_DATE} to {end_date or DEFAULT_END_DATE}"
)
print(f"Series to analyze: {series_ids}")
print("=" * 50)
# Fetch data
data = self.get_economic_data(series_ids, start_date, end_date)
if not data:
print("No data retrieved. Please check your API key and series IDs.")
return None, None
# Create DataFrame
df = self.create_dataframe(data)
if df.empty:
print("No data to analyze")
return None, None
# Save data
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.save_data(df, f"fred_economic_data_{timestamp}.csv")
# Generate summary statistics
summary = self.generate_summary_statistics(df)
print("\n=== Summary Statistics ===")
print(summary)
# Create plots
print("\n=== Creating Visualizations ===")
self.plot_economic_indicators(df)
return df, summary
def main():
"""Main function to run the FRED data analysis."""
collector = FREDDataCollectorV2()
# Example: Analyze key economic indicators
key_indicators = ["GDP", "UNRATE", "CPIAUCSL", "FEDFUNDS", "DGS10"]
try:
df, summary = collector.run_analysis(series_ids=key_indicators)
if df is not None:
print("\n=== Analysis Complete ===")
print(f"Data shape: {df.shape}")
print(f"Date range: {df.index.min()} to {df.index.max()}")
else:
print("\n=== Analysis Failed ===")
except Exception as e:
print(f"Error during analysis: {e}")
if __name__ == "__main__":
main()