# FREDML/src/core/enhanced_fred_client.py
# Edwin Salguero
# Enhanced FRED ML with improved Reports & Insights page, fixed alignment analysis, and comprehensive analytics improvements
# 2469150
"""
Enhanced FRED Client
Advanced data collection for comprehensive economic indicators
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union
import pandas as pd
from fredapi import Fred
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class EnhancedFREDClient:
    """
    Enhanced FRED API client for comprehensive economic data collection
    with support for multiple frequencies and advanced data processing.

    Series are downloaded through ``fredapi.Fred`` and converted locally to
    month-end, quarter-end or year-end observations so that the indicators
    can be combined into a single, index-aligned DataFrame.
    """

    # Economic indicators mapping (FRED series ID -> human readable name)
    ECONOMIC_INDICATORS = {
        # Output & Activity
        'GDPC1': 'Real Gross Domestic Product (chained 2012 dollars)',
        'INDPRO': 'Industrial Production Index',
        'RSAFS': 'Retail Sales',
        'TCU': 'Capacity Utilization',
        'PAYEMS': 'Total Nonfarm Payrolls',
        # Prices & Inflation
        'CPIAUCSL': 'Consumer Price Index for All Urban Consumers',
        'PCE': 'Personal Consumption Expenditures',
        # Financial & Monetary
        'FEDFUNDS': 'Federal Funds Rate',
        'DGS10': '10-Year Treasury Rate',
        'M2SL': 'M2 Money Stock',
        # International
        'DEXUSEU': 'US/Euro Exchange Rate',
        # Labor
        'UNRATE': 'Unemployment Rate'
    }

    # Series down-sampled with the LAST observation of each period.
    # Every other series is down-sampled with the period MEAN.
    _PERIOD_END_SERIES = frozenset(
        {'INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', 'M2SL'}
    )

    # Series reported as quarterly by ``_get_appropriate_frequency``.
    _QUARTERLY_SERIES = frozenset({'GDPC1', 'PCE'})

    # Expected value ranges used by the scaling sanity check.
    # NOTE(review): the bounds look calibrated to recent (2020-2024) levels,
    # so long histories will trigger warnings - confirm this is intended.
    _EXPECTED_RANGES = {
        'GDPC1': (15000, 25000),     # Real GDP in billions (2020-2024 range)
        'INDPRO': (90, 110),         # Industrial Production Index
        'CPIAUCSL': (250, 350),      # Consumer Price Index
        'FEDFUNDS': (0, 10),         # Federal Funds Rate (%)
        'DGS10': (0, 8),             # 10-Year Treasury Rate (%)
        'UNRATE': (3, 15),           # Unemployment Rate (%)
        'PAYEMS': (140000, 160000),  # Total Nonfarm Payrolls (thousands)
        'PCE': (15000, 25000),       # Personal Consumption Expenditures (billions)
        'M2SL': (20000, 25000),      # M2 Money Stock (billions)
        'TCU': (60, 90),             # Capacity Utilization (%)
        'DEXUSEU': (0.8, 1.2),       # US/Euro Exchange Rate
        'RSAFS': (400000, 600000)    # Retail Sales (millions)
    }

    def __init__(self, api_key: str):
        """
        Initialize enhanced FRED client

        Args:
            api_key: FRED API key
        """
        self.fred = Fred(api_key=api_key)
        self.data_cache = {}

    def fetch_economic_data(self, indicators: Optional[List[str]] = None,
                            start_date: str = '1990-01-01',
                            end_date: Optional[str] = None,
                            frequency: str = 'auto') -> pd.DataFrame:
        """
        Fetch comprehensive economic data

        Args:
            indicators: List of indicators to fetch. If None, fetch all available
            start_date: Start date for data collection
            end_date: End date for data collection. If None, use current date
            frequency: Data frequency ('auto', 'M', 'Q', 'A'). 'auto'
                standardizes every known indicator to quarter-end observations.

        Returns:
            DataFrame with one column per successfully fetched indicator,
            sorted by date

        Raises:
            ValueError: If no indicator could be fetched at all
        """
        if indicators is None:
            indicators = list(self.ECONOMIC_INDICATORS.keys())
        if end_date is None:
            end_date = datetime.now().strftime('%Y-%m-%d')

        logger.info("Fetching economic data for %d indicators", len(indicators))
        logger.info("Date range: %s to %s", start_date, end_date)

        data_dict = {}
        for indicator in indicators:
            if indicator not in self.ECONOMIC_INDICATORS:
                logger.warning("Unknown indicator: %s", indicator)
                continue
            # Best effort: one failing series must not abort the whole download.
            try:
                series_data = self._fetch_series(indicator, start_date, end_date, frequency)
            except Exception as e:
                logger.error("Failed to fetch %s: %s", indicator, e)
                continue
            if series_data is not None and not series_data.empty:
                data_dict[indicator] = series_data
                logger.info("Successfully fetched %s: %d observations",
                            indicator, len(series_data))
            else:
                logger.warning("No data available for %s", indicator)

        if not data_dict:
            raise ValueError("No data could be fetched for any indicators")

        # Combine all series into a single DataFrame (dict keys become columns)
        combined_data = pd.concat(data_dict, axis=1)
        # Sort by date
        combined_data = combined_data.sort_index()

        logger.info("Combined data shape: %s", combined_data.shape)
        logger.info("Date range: %s to %s",
                    combined_data.index.min(), combined_data.index.max())
        return combined_data

    def _fetch_series(self, series_id: str, start_date: str, end_date: str,
                      frequency: str) -> Optional[pd.Series]:
        """
        Fetch individual series with frequency handling

        Args:
            series_id: FRED series ID
            start_date: Start date
            end_date: End date
            frequency: Data frequency (for post-processing): 'auto', 'M',
                'Q' or 'A'. Any other value returns the data unconverted.

        Returns:
            Series data or None if failed
        """
        try:
            # Fetch raw observations; frequency conversion is done locally below.
            series = self.fred.get_series(
                series_id,
                observation_start=start_date,
                observation_end=end_date
            )
            if series.empty:
                logger.warning("No data returned for %s", series_id)
                return None

            converters = {
                'auto': self._standardize_frequency,
                'Q': self._convert_to_quarterly,
                'M': self._convert_to_monthly,
                'A': self._convert_to_annual,
            }
            converter = converters.get(frequency)
            if converter is None:
                logger.warning(
                    "Unsupported frequency %r for %s; returning unconverted data",
                    frequency, series_id)
                return series
            return converter(series, series_id)
        except Exception as e:
            logger.error("Error fetching %s: %s", series_id, e)
            return None

    def _resample_to_period_end(self, series: pd.Series, series_id: str,
                                offset) -> pd.Series:
        """
        Down-sample a series onto period-end dates.

        Args:
            series: Time series data with a datetime index
            series_id: FRED series ID, selects the aggregation rule
            offset: pandas period-end DateOffset defining the target bins

        Returns:
            Resampled series: last observation per period for series in
            ``_PERIOD_END_SERIES``, period mean otherwise
        """
        resampler = series.resample(offset)
        if series_id in self._PERIOD_END_SERIES:
            return resampler.last()
        return resampler.mean()

    def _convert_to_quarterly(self, series: pd.Series, series_id: str) -> pd.Series:
        """Convert series to quarterly frequency (quarter-end dates)"""
        # Offset object instead of the 'Q' alias, which newer pandas deprecates.
        return self._resample_to_period_end(
            series, series_id, pd.offsets.QuarterEnd(startingMonth=12))

    def _convert_to_monthly(self, series: pd.Series, series_id: str) -> pd.Series:
        """Convert series to monthly frequency (last observation of each month)"""
        # Offset object instead of the 'M' alias, which newer pandas deprecates.
        return series.resample(pd.offsets.MonthEnd()).last()

    def _convert_to_annual(self, series: pd.Series, series_id: str) -> pd.Series:
        """Convert series to annual frequency (year-end dates)"""
        return self._resample_to_period_end(
            series, series_id, pd.offsets.YearEnd())

    def _get_appropriate_frequency(self, series_id: str) -> str:
        """
        Get appropriate frequency for a series based on its characteristics

        Args:
            series_id: FRED series ID

        Returns:
            'Q' for series listed as quarterly, 'M' for everything else
            (monthly is the default)
        """
        return 'Q' if series_id in self._QUARTERLY_SERIES else 'M'

    def _standardize_frequency(self, series: pd.Series, series_id: str) -> pd.Series:
        """
        Standardize frequency for consistent analysis

        Every known indicator is put on the same quarter-end index so that
        the series line up row-by-row when concatenated. Leaving some series
        on their native dates would produce misaligned rows filled with NaN.

        Args:
            series: Time series data
            series_id: Series ID for context

        Returns:
            Quarter-end series for known indicators; the input unchanged
            for any other series ID
        """
        if series_id in self.ECONOMIC_INDICATORS:
            return self._convert_to_quarterly(series, series_id)
        return series

    def fetch_quarterly_data(self, indicators: Optional[List[str]] = None,
                             start_date: str = '1990-01-01',
                             end_date: Optional[str] = None) -> pd.DataFrame:
        """
        Fetch data standardized to quarterly frequency

        Args:
            indicators: List of indicators to fetch
            start_date: Start date
            end_date: End date

        Returns:
            Quarterly DataFrame
        """
        return self.fetch_economic_data(indicators, start_date, end_date, frequency='Q')

    def fetch_monthly_data(self, indicators: Optional[List[str]] = None,
                           start_date: str = '1990-01-01',
                           end_date: Optional[str] = None) -> pd.DataFrame:
        """
        Fetch data standardized to monthly frequency

        Args:
            indicators: List of indicators to fetch
            start_date: Start date
            end_date: End date

        Returns:
            Monthly DataFrame
        """
        return self.fetch_economic_data(indicators, start_date, end_date, frequency='M')

    def get_series_info(self, series_id: str) -> Dict:
        """
        Get detailed information about a series

        Args:
            series_id: FRED series ID

        Returns:
            Dictionary with series information, or ``{'error': message}``
            if the lookup failed
        """
        try:
            info = self.fred.get_series_info(series_id)
            return {
                'id': info.id,
                'title': info.title,
                'units': info.units,
                'frequency': info.frequency,
                'seasonal_adjustment': info.seasonal_adjustment,
                'last_updated': info.last_updated,
                'notes': info.notes
            }
        except Exception as e:
            # Best effort: report the failure in the result instead of raising.
            logger.error("Failed to get info for %s: %s", series_id, e)
            return {'error': str(e)}

    def get_all_series_info(self, indicators: Optional[List[str]] = None) -> Dict:
        """
        Get information for all indicators

        Args:
            indicators: List of indicators. If None, use all available

        Returns:
            Dictionary mapping each known indicator to its series information
        """
        if indicators is None:
            indicators = list(self.ECONOMIC_INDICATORS.keys())

        series_info = {}
        for indicator in indicators:
            # Unknown indicators are skipped.
            if indicator in self.ECONOMIC_INDICATORS:
                series_info[indicator] = self.get_series_info(indicator)
                logger.info("Retrieved info for %s", indicator)
        return series_info

    def validate_data_quality(self, data: pd.DataFrame) -> Dict:
        """
        Validate data quality and check for common issues

        Args:
            data: DataFrame with economic indicators

        Returns:
            Dictionary with keys 'missing_data', 'outliers',
            'data_quality_score', 'warnings' and 'errors'
        """
        validation_results = {
            'missing_data': {},
            'outliers': {},
            'data_quality_score': 0.0,
            'warnings': [],
            'errors': []
        }
        total_series = len(data.columns)
        valid_series = 0

        for column in data.columns:
            series = data[column].dropna()
            if series.empty:
                validation_results['missing_data'][column] = 'No data available'
                validation_results['errors'].append(f"{column}: No data available")
                continue

            # Check for missing data
            missing_pct = (data[column].isna().sum() / len(data)) * 100
            if missing_pct > 20:
                validation_results['missing_data'][column] = f"{missing_pct:.1f}% missing"
                validation_results['warnings'].append(
                    f"{column}: {missing_pct:.1f}% missing data")

            # Check for outliers using IQR method
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            outliers = series[(series < lower_bound) | (series > upper_bound)]
            outlier_pct = (len(outliers) / len(series)) * 100
            if outlier_pct > 5:
                validation_results['outliers'][column] = f"{outlier_pct:.1f}% outliers"
                validation_results['warnings'].append(
                    f"{column}: {outlier_pct:.1f}% outliers detected")

            # Validate scaling for known indicators
            self._validate_economic_scaling(series, column, validation_results)
            valid_series += 1

        # Overall score: share of columns holding at least one observation
        if total_series > 0:
            validation_results['data_quality_score'] = (valid_series / total_series) * 100
        return validation_results

    def _validate_economic_scaling(self, series: pd.Series, indicator: str,
                                   validation_results: Dict):
        """
        Validate economic indicator scaling using expected ranges

        Args:
            series: Time series data
            indicator: Indicator name
            validation_results: Validation results dictionary to update
                (a warning is appended in place when more than 5% of the
                values fall outside the expected range)
        """
        if indicator not in self._EXPECTED_RANGES:
            return
        expected_min, expected_max = self._EXPECTED_RANGES[indicator]

        # Check if values fall within expected range
        vals = series.dropna()
        if len(vals) == 0:
            return
        mask = (vals < expected_min) | (vals > expected_max)
        outlier_pct = mask.mean() * 100
        if outlier_pct > 5:
            validation_results['warnings'].append(
                f"{indicator}: {outlier_pct:.1f}% of data outside expected range "
                f"[{expected_min}, {expected_max}]. Check for scaling/unit issues."
            )
        else:
            logger.debug("%s: data within expected range [%s, %s]",
                         indicator, expected_min, expected_max)

    def generate_data_summary(self, data: pd.DataFrame) -> str:
        """
        Generate comprehensive data summary report

        The overview and per-series completeness figures are computed
        directly from ``data``; the quality score, warnings and errors come
        from ``validate_data_quality``.

        Args:
            data: Economic data DataFrame

        Returns:
            Formatted summary report
        """
        quality_report = self.validate_data_quality(data)
        total_observations = len(data)

        def _format_bound(value) -> str:
            # Render timestamps as plain dates; fall back to str() otherwise.
            if isinstance(value, pd.Timestamp):
                return value.strftime('%Y-%m-%d')
            return str(value)

        if total_observations:
            start = _format_bound(data.index.min())
            end = _format_bound(data.index.max())
        else:
            start = end = 'N/A'

        lines = [
            "ECONOMIC DATA SUMMARY",
            "=" * 50,
            "",
            "Dataset Overview:",
            f" Total Series: {len(data.columns)}",
            f" Total Observations: {total_observations}",
            f" Date Range: {start} to {end}",
            "",
            "Series Information:",
        ]
        lines.extend(
            f" {indicator}: {self.ECONOMIC_INDICATORS[indicator]}"
            for indicator in data.columns
            if indicator in self.ECONOMIC_INDICATORS
        )
        lines.append("")

        lines.append("Data Quality:")
        lines.append(f" Quality Score: {quality_report['data_quality_score']:.1f}%")
        for column in data.columns:
            missing_count = int(data[column].isna().sum())
            if total_observations:
                completeness = 100.0 * (total_observations - missing_count) / total_observations
            else:
                completeness = 0.0
            lines.append(
                f" {column}: {completeness:.1f}% complete "
                f"({missing_count} missing observations)"
            )
        lines.extend(f" Warning: {message}" for message in quality_report['warnings'])
        lines.extend(f" Error: {message}" for message in quality_report['errors'])
        lines.append("")

        return "\n".join(lines) + "\n"