""" | |
Mathematical Fixes Module | |
Addresses key mathematical issues in economic data analysis: | |
1. Unit normalization and scaling | |
2. Frequency alignment and resampling | |
3. Correct growth rate calculation | |
4. Stationarity enforcement | |
5. Forecast period scaling | |
6. Safe error metrics | |
""" | |
import numpy as np | |
import pandas as pd | |
from typing import Dict, List, Tuple, Optional | |
import logging | |
logger = logging.getLogger(__name__) | |
class MathematicalFixes:
    """
    Comprehensive mathematical fixes for economic data analysis
    """

    def __init__(self):
        """Initialize mathematical fixes"""
        self.frequency_map = {
            'D': 30,  # Daily -> ~30 periods per quarter
            'M': 3,   # Monthly -> 3 periods per quarter
            'Q': 1    # Quarterly -> 1 period per quarter
        }
        # Unit normalization factors, based on the units FRED reports for each
        # series; dollar aggregates are normalized to billions of dollars
        self.unit_factors = {
            'GDPC1': 1,     # Already billions of chained dollars, no change
            'INDPRO': 1,    # Index, no change
            'RSAFS': 1e-3,  # Millions of dollars -> billions
            'CPIAUCSL': 1,  # Index, no change
            'FEDFUNDS': 1,  # Percent, no change
            'DGS10': 1,     # Percent, no change
            'UNRATE': 1,    # Percent, no change
            'PAYEMS': 1,    # Already thousands of persons, no change
            'PCE': 1,       # Already billions of dollars, no change
            'M2SL': 1,      # Already billions of dollars, no change
            'TCU': 1,       # Percent, no change
            'DEXUSEU': 1    # Exchange rate, no change
        }
    def normalize_units(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize units across all economic indicators

        Args:
            data: DataFrame with economic indicators

        Returns:
            DataFrame with normalized units
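
        Example (illustrative, assuming the default ``unit_factors``):
            fixes = MathematicalFixes()
            raw = pd.DataFrame({'RSAFS': [700_000.0]})  # millions of dollars
            norm = fixes.normalize_units(raw)           # RSAFS -> 700.0 (billions)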
""" | |
logger.info("Normalizing units across economic indicators") | |
normalized_data = data.copy() | |
for column in data.columns: | |
if column in self.unit_factors: | |
factor = self.unit_factors[column] | |
if factor != 1: # Only convert if factor is not 1 | |
normalized_data[column] = data[column] * factor | |
logger.debug(f"Normalized {column} by factor {factor}") | |
else: | |
# Keep original values for factors of 1 | |
normalized_data[column] = data[column] | |
logger.debug(f"Kept {column} as original value") | |
return normalized_data | |
    def align_frequencies(self, data: pd.DataFrame, target_freq: str = 'Q') -> pd.DataFrame:
        """
        Align all series to a common frequency

        Args:
            data: DataFrame with economic indicators
            target_freq: Target frequency ('D', 'M', 'Q')

        Returns:
            DataFrame with aligned frequencies
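
        Example (illustrative; assumes a DatetimeIndex):
            idx = pd.date_range('2020-01-01', periods=180, freq='D')
            daily = pd.DataFrame({'DGS10': np.linspace(1.0, 2.0, 180)}, index=idx)
            quarterly = fixes.align_frequencies(daily, 'Q')  # rate series keep quarter-end values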
""" | |
logger.info(f"Aligning frequencies to {target_freq}") | |
aligned_data = pd.DataFrame() | |
for column in data.columns: | |
series = data[column].dropna() | |
if not series.empty: | |
# Resample to target frequency | |
if target_freq == 'Q': | |
# For quarterly, use mean for most series, last value for rates | |
if column in ['FEDFUNDS', 'DGS10', 'UNRATE', 'TCU']: | |
resampled = series.resample('QE').last() | |
else: | |
resampled = series.resample('QE').mean() | |
elif target_freq == 'M': | |
# For monthly, use mean for most series, last value for rates | |
if column in ['FEDFUNDS', 'DGS10', 'UNRATE', 'TCU']: | |
resampled = series.resample('ME').last() | |
else: | |
resampled = series.resample('ME').mean() | |
else: | |
# For daily, forward fill | |
resampled = series.resample('D').ffill() | |
aligned_data[column] = resampled | |
return aligned_data | |
    def calculate_growth_rates(self, data: pd.DataFrame, method: str = 'pct_change') -> pd.DataFrame:
        """
        Calculate growth rates with proper handling

        Args:
            data: DataFrame with economic indicators
            method: Method for growth calculation ('pct_change', 'log_diff')

        Returns:
            DataFrame with growth rates
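
        Example (illustrative): for consecutive values 100 -> 102,
            pct_change: (102 / 100 - 1) * 100 = 2.00
            log_diff:   ln(102 / 100) * 100 ~= 1.98
        Log differences are slightly smaller for positive growth and sum
        cleanly across periods.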
""" | |
logger.info(f"Calculating growth rates using {method} method") | |
growth_data = pd.DataFrame() | |
for column in data.columns: | |
series = data[column].dropna() | |
if len(series) > 1: | |
if method == 'pct_change': | |
# Calculate percent change | |
growth = series.pct_change() * 100 | |
elif method == 'log_diff': | |
# Calculate log difference | |
growth = np.log(series / series.shift(1)) * 100 | |
else: | |
# Default to percent change | |
growth = series.pct_change() * 100 | |
growth_data[column] = growth | |
return growth_data | |
    def enforce_stationarity(self, data: pd.DataFrame, max_diffs: int = 2) -> Tuple[pd.DataFrame, Dict]:
        """
        Enforce stationarity through differencing

        Args:
            data: DataFrame with economic indicators
            max_diffs: Maximum number of differences to apply

        Returns:
            Tuple of (stationary_data, differencing_info)
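
        Example (illustrative):
            growing = pd.DataFrame({'X': np.exp(np.linspace(0, 5, 100))})
            stationary, info = fixes.enforce_stationarity(growing)
            # info['X'] reports 'diffs_applied' (up to max_diffs) and 'is_stationary'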
""" | |
logger.info("Enforcing stationarity through differencing") | |
stationary_data = pd.DataFrame() | |
differencing_info = {} | |
for column in data.columns: | |
series = data[column].dropna() | |
if len(series) > 1: | |
# Apply differencing until stationary | |
diff_count = 0 | |
current_series = series | |
while diff_count < max_diffs: | |
# Simple stationarity check (can be enhanced with ADF test) | |
if self._is_stationary(current_series): | |
break | |
current_series = current_series.diff().dropna() | |
diff_count += 1 | |
stationary_data[column] = current_series | |
differencing_info[column] = { | |
'diffs_applied': diff_count, | |
'is_stationary': self._is_stationary(current_series) | |
} | |
return stationary_data, differencing_info | |
    def _is_stationary(self, series: pd.Series) -> bool:
        """
        Simple stationarity heuristic comparing the variance of the two
        halves of the series

        Args:
            series: Time series to check

        Returns:
            True if the series appears stationary
        """
        if len(series) < 10:
            return True
        # Split series into halves and compare variance
        mid = len(series) // 2
        first_half = series[:mid]
        second_half = series[mid:]
        first_var = first_half.var()
        if first_var == 0 or np.isnan(first_var):
            return True
        var_ratio = second_half.var() / first_var
        # A variance ratio close to 1 suggests the series is stationary
        return 0.5 <= var_ratio <= 2.0
    def scale_forecast_periods(self, forecast_periods: int, indicator: str, data: pd.DataFrame) -> int:
        """
        Scale forecast periods based on indicator frequency

        Args:
            forecast_periods: Base forecast periods (expressed in quarters)
            indicator: Economic indicator name
            data: DataFrame with economic data

        Returns:
            Scaled forecast periods
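
        Example (illustrative): a request for 4 quarterly periods against a
        monthly series is scaled to 4 * 3 = 12 monthly forecast steps.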
""" | |
if indicator not in data.columns: | |
return forecast_periods | |
series = data[indicator].dropna() | |
if len(series) < 2: | |
return forecast_periods | |
# Determine frequency from data | |
freq = self._infer_frequency(series) | |
# Scale forecast periods | |
if freq == 'D': | |
return forecast_periods * 30 # 30 days per quarter | |
elif freq == 'M': | |
return forecast_periods * 3 # 3 months per quarter | |
else: | |
return forecast_periods # Already quarterly | |
    def _infer_frequency(self, series: pd.Series) -> str:
        """
        Infer frequency from a time series

        Args:
            series: Time series with a DatetimeIndex

        Returns:
            Frequency string ('D', 'M', 'Q')
        """
        if len(series) < 2:
            return 'Q'
        # Classify by the average spacing between observations
        time_diff = series.index.to_series().diff().dropna()
        avg_diff = time_diff.mean()
        if avg_diff.days <= 1:
            return 'D'
        elif avg_diff.days <= 35:
            return 'M'
        else:
            return 'Q'
    def safe_mape(self, actual: np.ndarray, forecast: np.ndarray) -> float:
        """
        Calculate MAPE with protection against division by zero

        Args:
            actual: Actual values
            forecast: Forecasted values

        Returns:
            MAPE value (percent)
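
        Example (illustrative): actual = [100, 200], forecast = [110, 190]
            MAPE = mean(10/100, 10/200) * 100 = 7.5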
""" | |
actual = np.array(actual) | |
forecast = np.array(forecast) | |
# Avoid division by zero | |
denominator = np.maximum(np.abs(actual), 1e-8) | |
mape = np.mean(np.abs((actual - forecast) / denominator)) * 100 | |
return mape | |
    def safe_mae(self, actual: np.ndarray, forecast: np.ndarray) -> float:
        """
        Calculate MAE (Mean Absolute Error)

        Args:
            actual: Actual values
            forecast: Forecasted values

        Returns:
            MAE value
        """
        actual = np.array(actual)
        forecast = np.array(forecast)
        return np.mean(np.abs(actual - forecast))
    def safe_rmse(self, actual: np.ndarray, forecast: np.ndarray) -> float:
        """
        Calculate RMSE, safely handling mismatched lengths and non-finite values

        Args:
            actual: Actual values
            forecast: Forecasted values

        Returns:
            RMSE value, or np.inf if there are no comparable finite pairs
        """
        if len(actual) == 0 or len(forecast) == 0:
            return np.inf
        # Trim both inputs to a common length
        min_len = min(len(actual), len(forecast))
        actual_trimmed = np.asarray(actual)[:min_len]
        forecast_trimmed = np.asarray(forecast)[:min_len]
        # Keep only pairs where both values are finite
        mask = np.isfinite(actual_trimmed) & np.isfinite(forecast_trimmed)
        if not np.any(mask):
            return np.inf
        actual_clean = actual_trimmed[mask]
        forecast_clean = forecast_trimmed[mask]
        return np.sqrt(np.mean((actual_clean - forecast_clean) ** 2))
    def validate_scaling(self, series: pd.Series,
                         unit_hint: str,
                         expected_min: float,
                         expected_max: float) -> None:
        """
        Check that values fall within an expected magnitude range

        Args:
            series: pandas Series of numeric data
            unit_hint: Description of the series, e.g., "Real GDP"
            expected_min: Plausible lower bound (same units as the series)
            expected_max: Plausible upper bound (same units as the series)

        Raises:
            ValueError: If more than 5% of values fall outside the range
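
        Example (illustrative; ``gdp_series`` is a hypothetical Series of
        real GDP in billions of chained dollars):
            fixes.validate_scaling(gdp_series, "Real GDP", 1_000, 40_000)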
""" | |
vals = series.dropna() | |
mask = (vals < expected_min) | (vals > expected_max) | |
if mask.mean() > 0.05: | |
raise ValueError(f"{unit_hint}: {mask.mean():.1%} of data " | |
f"outside [{expected_min}, {expected_max}]. " | |
"Check for scaling/unit issues.") | |
print(f"{unit_hint}: data within expected range.") | |
    def apply_comprehensive_fixes(self, data: pd.DataFrame,
                                  target_freq: str = 'Q',
                                  growth_method: str = 'pct_change',
                                  normalize_units: bool = True,
                                  preserve_absolute_values: bool = False) -> Tuple[pd.DataFrame, Dict]:
        """
        Apply comprehensive mathematical fixes to economic data

        Args:
            data: DataFrame with economic indicators
            target_freq: Target frequency ('D', 'M', 'Q'), or 'auto' to skip alignment
            growth_method: Method for growth calculation ('pct_change', 'log_diff')
            normalize_units: Whether to normalize units
            preserve_absolute_values: Whether to keep absolute values for display
                instead of replacing them with growth rates

        Returns:
            Tuple of (processed_data, fix_info)
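
        Example (illustrative):
            processed, info = fixes.apply_comprehensive_fixes(df, target_freq='Q')
            # info['stationarity_enforcement'] maps each column to the number of
            # differences applied and whether the result looks stationary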
""" | |
logger.info("Applying comprehensive mathematical fixes") | |
fix_info = { | |
'original_shape': data.shape, | |
'frequency_alignment': {}, | |
'unit_normalization': {}, | |
'growth_calculation': {}, | |
'stationarity_enforcement': {}, | |
'validation_results': {} | |
} | |
processed_data = data.copy() | |
# Step 1: Align frequencies | |
if target_freq != 'auto': | |
processed_data = self.align_frequencies(processed_data, target_freq) | |
fix_info['frequency_alignment'] = { | |
'target_frequency': target_freq, | |
'final_shape': processed_data.shape | |
} | |
# Step 2: Normalize units | |
if normalize_units: | |
processed_data = self.normalize_units(processed_data) | |
fix_info['unit_normalization'] = { | |
'normalized_indicators': list(processed_data.columns) | |
} | |
# Step 3: Calculate growth rates if requested | |
if growth_method in ['pct_change', 'log_diff']: | |
growth_data = self.calculate_growth_rates(processed_data, growth_method) | |
fix_info['growth_calculation'] = { | |
'method': growth_method, | |
'growth_indicators': list(growth_data.columns) | |
} | |
# For now, keep both absolute and growth data | |
if not preserve_absolute_values: | |
processed_data = growth_data | |
# Step 4: Enforce stationarity | |
stationary_data, differencing_info = self.enforce_stationarity(processed_data) | |
fix_info['stationarity_enforcement'] = differencing_info | |
# Step 5: Validate processed data | |
validation_results = self._validate_processed_data(processed_data) | |
fix_info['validation_results'] = validation_results | |
logger.info(f"Comprehensive fixes applied. Final shape: {processed_data.shape}") | |
return processed_data, fix_info | |
    def _validate_processed_data(self, data: pd.DataFrame) -> Dict:
        """
        Validate processed data for scaling and quality issues

        Args:
            data: Processed DataFrame

        Returns:
            Dictionary with validation results
        """
        validation_results = {
            'scaling_issues': [],
            'quality_warnings': [],
            'validation_score': 100.0
        }
        for column in data.columns:
            series = data[column].dropna()
            if len(series) == 0:
                validation_results['quality_warnings'].append(f"{column}: No data available")
                continue
            mean_val = series.mean()
            std_val = series.std()
            # Flag magnitudes that suggest a scaling issue
            if abs(mean_val) > 1e6:
                validation_results['scaling_issues'].append(
                    f"{column}: Mean value {mean_val:.2e} is extremely large - possible scaling issue"
                )
            if std_val > 1e5:
                validation_results['scaling_issues'].append(
                    f"{column}: Standard deviation {std_val:.2e} is extremely large - possible scaling issue"
                )
            # Values clustered near zero may indicate a unit conversion issue
            if abs(mean_val) < 1e-6 and std_val < 1e-6:
                validation_results['scaling_issues'].append(
                    f"{column}: Values are extremely small - possible unit conversion issue"
                )
        # Score: percentage of columns passing all checks
        total_checks = len(data.columns)
        failed_checks = len(validation_results['scaling_issues']) + len(validation_results['quality_warnings'])
        if total_checks > 0:
            validation_results['validation_score'] = max(0, 100 - (failed_checks / total_checks) * 100)
        return validation_results
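

if __name__ == "__main__":
    # Minimal smoke test on synthetic data (illustrative only; real usage would
    # pass actual FRED series). Column names reuse the FRED IDs assumed above.
    logging.basicConfig(level=logging.INFO)
    idx = pd.date_range("2015-01-01", periods=120, freq="ME")
    rng = np.random.default_rng(42)
    demo = pd.DataFrame({
        'GDPC1': 20_000 * np.exp(np.linspace(0, 0.2, 120)) + rng.normal(0, 50, 120),
        'UNRATE': 4 + rng.normal(0, 0.2, 120),
    }, index=idx)
    fixes = MathematicalFixes()
    processed, info = fixes.apply_comprehensive_fixes(demo, target_freq='Q')
    print(processed.tail())
    print("Validation score:", info['validation_results']['validation_score'])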