Spaces:
Sleeping
Sleeping
# services/economic_data.py
import logging
import os
from typing import Optional

import requests

from config import FRED_API_KEY, FRED_BASE_URL, DEFAULT_REPO_RATE, DEFAULT_INFLATION_RATE
from models.market_data import EconomicIndicators
class EconomicDataFetcher:
    """Fetch economic data from the FRED API.

    Each public getter asks FRED for the most recent observation of one
    series and falls back to the matching default from ``config`` when no
    usable value comes back.
    """

    def __init__(self, timeout: float = 10.0):
        """Store the FRED API key and endpoint taken from ``config``.

        Args:
            timeout: Seconds to wait on the HTTP request before giving up.
        """
        self.fred_api_key = FRED_API_KEY
        self.base_url = FRED_BASE_URL
        self.timeout = timeout

    @staticmethod
    def _latest_value(payload) -> Optional[float]:
        """Extract the first observation's value from a decoded response.

        Args:
            payload: The decoded JSON body of the response.

        Returns:
            The first observation's ``value`` as a float, or ``None`` when
            the payload has no observations or the value is the ``'.'``
            placeholder.

        Raises:
            KeyError, TypeError, ValueError: If the first observation is
                malformed; the caller treats these like any fetch failure.
        """
        observations = payload.get('observations') if isinstance(payload, dict) else None
        if not observations:
            return None
        raw_value = observations[0]['value']
        # FRED returns '.' when it has no data for the observation.
        if raw_value == '.':
            return None
        return float(raw_value)

    def _fetch_fred_series(self, series_id: str) -> Optional[float]:
        """Fetch the latest value of a single FRED series.

        Args:
            series_id: Identifier of the FRED series to query.

        Returns:
            The value as a float, or ``None`` when the request fails, the
            response cannot be parsed, or no value is available.
        """
        params = {
            'series_id': series_id,
            'api_key': self.fred_api_key,
            'file_type': 'json',
            'sort_order': 'desc',
            'limit': 1,
        }
        try:
            # Without a timeout, requests can wait on a stalled server forever.
            response = requests.get(self.base_url, params=params, timeout=self.timeout)
            response.raise_for_status()  # Raise an exception for bad status codes
            return self._latest_value(response.json())
        except Exception as exc:
            # Deliberately broad: this is a best-effort lookup and callers
            # substitute a configured default whenever None is returned.
            logging.getLogger(__name__).warning(
                "Error fetching FRED series %s: %s", series_id, exc
            )
            return None

    def get_rbi_repo_rate(self) -> float:
        """Fetch RBI repo rate approximation from FRED (INTDSRINM193N).

        Note: this might not be the exact repo rate, but a proxy.

        Returns:
            The fetched value, or ``DEFAULT_REPO_RATE`` when none is available.
        """
        fetched_rate = self._fetch_fred_series('INTDSRINM193N')
        return fetched_rate if fetched_rate is not None else DEFAULT_REPO_RATE

    def get_indian_inflation_rate(self) -> float:
        """Fetch Indian inflation rate from FRED (FPCPITOTLZGIND).

        Returns:
            The fetched value, or ``DEFAULT_INFLATION_RATE`` when none is
            available.
        """
        fetched_rate = self._fetch_fred_series('FPCPITOTLZGIND')
        return fetched_rate if fetched_rate is not None else DEFAULT_INFLATION_RATE