""" | |
Enhanced FRED Client | |
Advanced data collection for comprehensive economic indicators | |
""" | |
import logging | |
from datetime import datetime, timedelta | |
from typing import Dict, List, Optional, Union | |
import pandas as pd | |
from fredapi import Fred | |
logger = logging.getLogger(__name__) | |
class EnhancedFREDClient:
    """
    Enhanced FRED API client for comprehensive economic data collection
    with support for multiple frequencies and advanced data processing
    """

    # Economic indicators mapping
    ECONOMIC_INDICATORS = {
        # Output & Activity
        'GDPC1': 'Real Gross Domestic Product (chained 2012 dollars)',
        'INDPRO': 'Industrial Production Index',
        'RSAFS': 'Retail Sales',
        'TCU': 'Capacity Utilization',
        'PAYEMS': 'Total Nonfarm Payrolls',
        # Prices & Inflation
        'CPIAUCSL': 'Consumer Price Index for All Urban Consumers',
        'PCE': 'Personal Consumption Expenditures',
        # Financial & Monetary
        'FEDFUNDS': 'Federal Funds Rate',
        'DGS10': '10-Year Treasury Rate',
        'M2SL': 'M2 Money Stock',
        # International
        'DEXUSEU': 'US/Euro Exchange Rate',
        # Labor
        'UNRATE': 'Unemployment Rate'
    }

    def __init__(self, api_key: str):
        """
        Initialize enhanced FRED client

        Args:
            api_key: FRED API key
        """
        self.fred = Fred(api_key=api_key)
        self.data_cache = {}
    def fetch_economic_data(self, indicators: List[str] = None,
                            start_date: str = '1990-01-01',
                            end_date: str = None,
                            frequency: str = 'auto') -> pd.DataFrame:
        """
        Fetch comprehensive economic data

        Args:
            indicators: List of indicators to fetch. If None, fetch all available
            start_date: Start date for data collection
            end_date: End date for data collection. If None, use current date
            frequency: Data frequency ('auto', 'M', or 'Q'); any other value
                returns each series at its native frequency

        Returns:
            DataFrame with economic indicators
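
        Example (an illustrative sketch, not output from a real run; the API key,
        indicator list, and dates below are placeholders)::

            client = EnhancedFREDClient(api_key="YOUR_FRED_API_KEY")
            df = client.fetch_economic_data(
                indicators=['GDPC1', 'UNRATE'],
                start_date='2015-01-01',
                frequency='Q',
            )
            print(df.tail())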
""" | |
if indicators is None: | |
indicators = list(self.ECONOMIC_INDICATORS.keys()) | |
if end_date is None: | |
end_date = datetime.now().strftime('%Y-%m-%d') | |
logger.info(f"Fetching economic data for {len(indicators)} indicators") | |
logger.info(f"Date range: {start_date} to {end_date}") | |
data_dict = {} | |
for indicator in indicators: | |
try: | |
if indicator in self.ECONOMIC_INDICATORS: | |
series_data = self._fetch_series(indicator, start_date, end_date, frequency) | |
if series_data is not None and not series_data.empty: | |
data_dict[indicator] = series_data | |
logger.info(f"Successfully fetched {indicator}: {len(series_data)} observations") | |
else: | |
logger.warning(f"No data available for {indicator}") | |
else: | |
logger.warning(f"Unknown indicator: {indicator}") | |
except Exception as e: | |
logger.error(f"Failed to fetch {indicator}: {e}") | |
if not data_dict: | |
raise ValueError("No data could be fetched for any indicators") | |
# Combine all series into a single DataFrame | |
combined_data = pd.concat(data_dict.values(), axis=1) | |
combined_data.columns = list(data_dict.keys()) | |
# Sort by date | |
combined_data = combined_data.sort_index() | |
logger.info(f"Combined data shape: {combined_data.shape}") | |
logger.info(f"Date range: {combined_data.index.min()} to {combined_data.index.max()}") | |
return combined_data | |
    def _fetch_series(self, series_id: str, start_date: str, end_date: str,
                      frequency: str) -> Optional[pd.Series]:
        """
        Fetch individual series with frequency handling

        Args:
            series_id: FRED series ID
            start_date: Start date
            end_date: End date
            frequency: Data frequency (for post-processing)

        Returns:
            Series data or None if failed
        """
        try:
            # Fetch the series at its native frequency; any frequency
            # conversion is handled locally below
            series = self.fred.get_series(
                series_id,
                observation_start=start_date,
                observation_end=end_date
            )
            if series.empty:
                logger.warning(f"No data returned for {series_id}")
                return None

            # Handle frequency conversion if needed
            if frequency == 'auto':
                series = self._standardize_frequency(series, series_id)
            elif frequency == 'Q':
                # Convert to quarterly if requested
                series = self._convert_to_quarterly(series, series_id)
            elif frequency == 'M':
                # Convert to monthly if requested
                series = self._convert_to_monthly(series, series_id)
            return series
        except Exception as e:
            logger.error(f"Error fetching {series_id}: {e}")
            return None
    def _convert_to_quarterly(self, series: pd.Series, series_id: str) -> pd.Series:
        """Convert series to quarterly frequency"""
        if series_id in ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', 'M2SL']:
            return series.resample('Q').last()
        else:
            return series.resample('Q').mean()

    def _convert_to_monthly(self, series: pd.Series, series_id: str) -> pd.Series:
        """Convert series to monthly frequency"""
        return series.resample('M').last()

    def _get_appropriate_frequency(self, series_id: str) -> str:
        """
        Get appropriate frequency for a series based on its characteristics

        Args:
            series_id: FRED series ID

        Returns:
            Appropriate frequency string
        """
        # Quarterly series
        quarterly_series = ['GDPC1', 'PCE']
        # Monthly series (most common)
        monthly_series = ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL',
                          'FEDFUNDS', 'DGS10', 'M2SL', 'DEXUSEU', 'UNRATE']
        if series_id in quarterly_series:
            return 'Q'
        elif series_id in monthly_series:
            return 'M'
        else:
            return 'M'  # Default to monthly

    def _standardize_frequency(self, series: pd.Series, series_id: str) -> pd.Series:
        """
        Standardize frequency for consistent analysis

        Args:
            series: Time series data
            series_id: Series ID for context

        Returns:
            Standardized series
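
        Example (illustrative; the toy series is made up and ``client`` is assumed
        to be an EnhancedFREDClient instance)::

            # UNRATE is a rate, so its monthly values are averaged within each
            # quarter; three monthly observations collapse to one quarterly value.
            monthly = pd.Series(
                [4.0, 4.1, 4.2],
                index=pd.to_datetime(['2020-01-31', '2020-02-29', '2020-03-31']),
            )
            quarterly = client._standardize_frequency(monthly, 'UNRATE')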
""" | |
# For quarterly analysis, convert monthly to quarterly | |
if series_id in ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', | |
'FEDFUNDS', 'DGS10', 'M2SL', 'DEXUSEU', 'UNRATE']: | |
# Use end-of-quarter values for most series | |
if series_id in ['INDPRO', 'RSAFS', 'TCU', 'PAYEMS', 'CPIAUCSL', 'M2SL']: | |
return series.resample('Q').last() | |
else: | |
# For rates, use mean | |
return series.resample('Q').mean() | |
return series | |
def fetch_quarterly_data(self, indicators: List[str] = None, | |
start_date: str = '1990-01-01', | |
end_date: str = None) -> pd.DataFrame: | |
""" | |
Fetch data standardized to quarterly frequency | |
Args: | |
indicators: List of indicators to fetch | |
start_date: Start date | |
end_date: End date | |
Returns: | |
Quarterly DataFrame | |
""" | |
return self.fetch_economic_data(indicators, start_date, end_date, frequency='Q') | |
def fetch_monthly_data(self, indicators: List[str] = None, | |
start_date: str = '1990-01-01', | |
end_date: str = None) -> pd.DataFrame: | |
""" | |
Fetch data standardized to monthly frequency | |
Args: | |
indicators: List of indicators to fetch | |
start_date: Start date | |
end_date: End date | |
Returns: | |
Monthly DataFrame | |
""" | |
return self.fetch_economic_data(indicators, start_date, end_date, frequency='M') | |
def get_series_info(self, series_id: str) -> Dict: | |
""" | |
Get detailed information about a series | |
Args: | |
series_id: FRED series ID | |
Returns: | |
Dictionary with series information | |
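
        Example (illustrative; field values depend on FRED's current metadata and
        ``client`` is assumed to be an EnhancedFREDClient instance)::

            info = client.get_series_info('UNRATE')
            # e.g. info['title'] -> 'Unemployment Rate', info['frequency'] -> 'Monthly'
            # On failure the returned dict contains a single 'error' key.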
""" | |
try: | |
info = self.fred.get_series_info(series_id) | |
return { | |
'id': info.id, | |
'title': info.title, | |
'units': info.units, | |
'frequency': info.frequency, | |
'seasonal_adjustment': info.seasonal_adjustment, | |
'last_updated': info.last_updated, | |
'notes': info.notes | |
} | |
except Exception as e: | |
logger.error(f"Failed to get info for {series_id}: {e}") | |
return {'error': str(e)} | |
def get_all_series_info(self, indicators: List[str] = None) -> Dict: | |
""" | |
Get information for all indicators | |
Args: | |
indicators: List of indicators. If None, use all available | |
Returns: | |
Dictionary with series information | |
""" | |
if indicators is None: | |
indicators = list(self.ECONOMIC_INDICATORS.keys()) | |
series_info = {} | |
for indicator in indicators: | |
if indicator in self.ECONOMIC_INDICATORS: | |
info = self.get_series_info(indicator) | |
series_info[indicator] = info | |
logger.info(f"Retrieved info for {indicator}") | |
return series_info | |
def validate_data_quality(self, data: pd.DataFrame) -> Dict: | |
""" | |
Validate data quality and check for common issues | |
Args: | |
data: DataFrame with economic indicators | |
Returns: | |
Dictionary with validation results | |
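
        Example (illustrative; assumes ``df`` came from fetch_economic_data and
        ``client`` is an EnhancedFREDClient instance)::

            report = client.validate_data_quality(df)
            print(report['data_quality_score'])    # 0-100, share of usable series
            for warning in report['warnings']:
                print(warning)                      # missing-data, outlier, scaling notes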
""" | |
validation_results = { | |
'missing_data': {}, | |
'outliers': {}, | |
'data_quality_score': 0.0, | |
'warnings': [], | |
'errors': [] | |
} | |
total_series = len(data.columns) | |
valid_series = 0 | |
for column in data.columns: | |
series = data[column].dropna() | |
if len(series) == 0: | |
validation_results['missing_data'][column] = 'No data available' | |
validation_results['errors'].append(f"{column}: No data available") | |
continue | |
# Check for missing data | |
missing_pct = (data[column].isna().sum() / len(data)) * 100 | |
if missing_pct > 20: | |
validation_results['missing_data'][column] = f"{missing_pct:.1f}% missing" | |
validation_results['warnings'].append(f"{column}: {missing_pct:.1f}% missing data") | |
# Check for outliers using IQR method | |
Q1 = series.quantile(0.25) | |
Q3 = series.quantile(0.75) | |
IQR = Q3 - Q1 | |
lower_bound = Q1 - 1.5 * IQR | |
upper_bound = Q3 + 1.5 * IQR | |
outliers = series[(series < lower_bound) | (series > upper_bound)] | |
outlier_pct = (len(outliers) / len(series)) * 100 | |
if outlier_pct > 5: | |
validation_results['outliers'][column] = f"{outlier_pct:.1f}% outliers" | |
validation_results['warnings'].append(f"{column}: {outlier_pct:.1f}% outliers detected") | |
# Validate scaling for known indicators | |
self._validate_economic_scaling(series, column, validation_results) | |
valid_series += 1 | |
# Calculate overall data quality score | |
if total_series > 0: | |
validation_results['data_quality_score'] = (valid_series / total_series) * 100 | |
return validation_results | |
def _validate_economic_scaling(self, series: pd.Series, indicator: str, validation_results: Dict): | |
""" | |
Validate economic indicator scaling using expected ranges | |
Args: | |
series: Time series data | |
indicator: Indicator name | |
validation_results: Validation results dictionary to update | |
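
        Example (illustrative; the values are made up to show a unit mismatch)::

            # Real GDP reported in millions instead of billions (~20,000,000 rather
            # than ~20,000) falls outside the expected range and adds a warning.
            results = {'warnings': []}
            client._validate_economic_scaling(
                pd.Series([2.0e7, 2.1e7, 2.2e7]), 'GDPC1', results)
            # results['warnings'] now flags a likely scaling/unit issue for GDPC1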
""" | |
# Expected ranges for common economic indicators | |
scaling_ranges = { | |
'GDPC1': (15000, 25000), # Real GDP in billions (2020-2024 range) | |
'INDPRO': (90, 110), # Industrial Production Index | |
'CPIAUCSL': (250, 350), # Consumer Price Index | |
'FEDFUNDS': (0, 10), # Federal Funds Rate (%) | |
'DGS10': (0, 8), # 10-Year Treasury Rate (%) | |
'UNRATE': (3, 15), # Unemployment Rate (%) | |
'PAYEMS': (140000, 160000), # Total Nonfarm Payrolls (thousands) | |
'PCE': (15000, 25000), # Personal Consumption Expenditures (billions) | |
'M2SL': (20000, 25000), # M2 Money Stock (billions) | |
'TCU': (60, 90), # Capacity Utilization (%) | |
'DEXUSEU': (0.8, 1.2), # US/Euro Exchange Rate | |
'RSAFS': (400000, 600000) # Retail Sales (millions) | |
} | |
if indicator in scaling_ranges: | |
expected_min, expected_max = scaling_ranges[indicator] | |
# Check if values fall within expected range | |
vals = series.dropna() | |
if len(vals) > 0: | |
mask = (vals < expected_min) | (vals > expected_max) | |
outlier_pct = mask.mean() * 100 | |
if outlier_pct > 5: | |
validation_results['warnings'].append( | |
f"{indicator}: {outlier_pct:.1f}% of data outside expected range " | |
f"[{expected_min}, {expected_max}]. Check for scaling/unit issues." | |
) | |
else: | |
logger.debug(f"{indicator}: data within expected range [{expected_min}, {expected_max}]") | |
def generate_data_summary(self, data: pd.DataFrame) -> str: | |
""" | |
Generate comprehensive data summary report | |
Args: | |
data: Economic data DataFrame | |
Returns: | |
Formatted summary report | |
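
        Example (illustrative; assumes ``df`` came from fetch_economic_data and
        ``client`` is an EnhancedFREDClient instance)::

            print(client.generate_data_summary(df))
            # Prints an overview (series count, observations, date range, quality
            # score), per-series descriptions, and per-series completeness.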
""" | |
quality_report = self.validate_data_quality(data) | |
summary = "ECONOMIC DATA SUMMARY\n" | |
summary += "=" * 50 + "\n\n" | |
summary += f"Dataset Overview:\n" | |
summary += f" Total Series: {quality_report['total_series']}\n" | |
summary += f" Total Observations: {quality_report['total_observations']}\n" | |
summary += f" Date Range: {quality_report['date_range']['start']} to {quality_report['date_range']['end']}\n\n" | |
summary += f"Series Information:\n" | |
for indicator in data.columns: | |
if indicator in self.ECONOMIC_INDICATORS: | |
summary += f" {indicator}: {self.ECONOMIC_INDICATORS[indicator]}\n" | |
summary += "\n" | |
summary += f"Data Quality:\n" | |
for series, metrics in quality_report['missing_data'].items(): | |
summary += f" {series}: {metrics['completeness']:.1f}% complete " | |
summary += f"({metrics['missing_count']} missing observations)\n" | |
summary += "\n" | |
return summary |
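

# Minimal usage sketch (illustrative, not part of the tested surface): it assumes
# a FRED API key is available in the FRED_API_KEY environment variable and that
# network access to FRED is possible.
if __name__ == "__main__":
    import os

    logging.basicConfig(level=logging.INFO)
    api_key = os.environ.get("FRED_API_KEY")
    if not api_key:
        raise SystemExit("Set FRED_API_KEY to run this example")

    client = EnhancedFREDClient(api_key)
    df = client.fetch_quarterly_data(indicators=['GDPC1', 'CPIAUCSL', 'UNRATE'],
                                     start_date='2010-01-01')
    print(client.generate_data_summary(df))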