import requests import os from dotenv import load_dotenv from typing import Dict, List, Optional, Any import logging load_dotenv() logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) FMP_API_KEY = os.getenv("FMP_API_KEY") ALPHAVANTAGE_API_KEY = os.getenv("ALPHAVANTAGE_API_KEY") FMP_BASE_URL = "https://financialmodelingprep.com/api/v3" ALPHAVANTAGE_BASE_URL = "https://www.alphavantage.co/query" class DataIngestionError(Exception): """Custom exception for data ingestion API errors.""" pass class FMPFetchError(DataIngestionError): """Specific error for FMP fetching issues.""" pass class AVFetchError(DataIngestionError): """Specific error for AlphaVantage fetching issues.""" pass def _fetch_from_fmp(ticker: str, api_key: str) -> Dict[str, Dict[str, Any]]: """Internal function to fetch data from FMP. Uses /historical-price-full/ as recommended.""" endpoint = f"{FMP_BASE_URL}/historical-price-full/{ticker}" params = {"apikey": api_key} logger.info( f"Fetching historical daily data for {ticker} from FMP (using /historical-price-full/)." ) try: response = requests.get(endpoint, params=params, timeout=30) response.raise_for_status() data = response.json() if isinstance(data, dict): if "Error Message" in data: raise FMPFetchError( f"FMP API returned error for {ticker}: {data['Error Message']}" ) if data.get("symbol") and "historical" in data: historical_data_list = data.get("historical") if isinstance(historical_data_list, list): if not historical_data_list: logger.warning( f"FMP API returned empty historical data list for {ticker} (from /historical-price-full/)." ) return {} prices_dict: Dict[str, Dict[str, Any]] = {} for record in historical_data_list: if isinstance(record, dict) and "date" in record: prices_dict[record["date"]] = record else: logger.warning( f"Skipping invalid FMP record format for {ticker}: {record}" ) logger.info( f"Successfully fetched and formatted {len(prices_dict)} historical records for {ticker} from FMP." ) return prices_dict else: raise FMPFetchError( f"FMP API historical data for {ticker} has unexpected 'historical' type: {type(historical_data_list)}" ) else: raise FMPFetchError( f"FMP API response for {ticker} (from /historical-price-full/) missing expected structure (symbol/historical keys). Response: {str(data)[:200]}" ) elif isinstance(data, list): if not data: logger.warning( f"FMP API returned empty list for {ticker} (from /historical-price-full/)." ) return {} if isinstance(data[0], dict) and ( "Error Message" in data[0] or "error" in data[0] ): error_msg = data[0].get( "Error Message", data[0].get("error", "Unknown error in list") ) raise FMPFetchError( f"FMP API returned error list for {ticker}: {error_msg}" ) else: raise FMPFetchError( f"FMP API returned unexpected top-level list structure for {ticker} (from /historical-price-full/). Response: {str(data)[:200]}" ) else: raise FMPFetchError( f"FMP API returned unexpected response type for {ticker} (from /historical-price-full/): {type(data)}. Response: {str(data)[:200]}" ) except requests.exceptions.RequestException as e: raise FMPFetchError(f"FMP data fetch (network) failed for {ticker}: {e}") except Exception as e: raise FMPFetchError( f"FMP data fetch (processing) failed for {ticker}: {e}. Response: {str(locals().get('data', 'N/A'))[:200]}" ) def _fetch_from_alphavantage(ticker: str, api_key: str) -> Dict[str, Dict[str, Any]]: """Internal function to fetch data from AlphaVantage.""" endpoint = f"{ALPHAVANTAGE_BASE_URL}/query" params = { "function": "TIME_SERIES_DAILY_ADJUSTED", "symbol": ticker, "apikey": api_key, "outputsize": "compact", } logger.info(f"Fetching historical daily data for {ticker} from AlphaVantage.") try: response = requests.get(endpoint, params=params, timeout=30) response.raise_for_status() data = response.json() if not isinstance(data, dict): raise AVFetchError( f"AlphaVantage API returned unexpected response type for {ticker}: {type(data)}. Expected dict. Response: {str(data)[:200]}" ) if "Error Message" in data: raise AVFetchError( f"AlphaVantage API returned error for {ticker}: {data['Error Message']}" ) if "Note" in data: logger.warning( f"AlphaVantage API returned note for {ticker}: {data['Note']} - treating as no data." ) return {} time_series_data = data.get("Time Series (Daily)") if time_series_data is None: if not data: logger.warning( f"AlphaVantage API returned an empty dictionary for {ticker}." ) return {} else: raise AVFetchError( f"AlphaVantage API response for {ticker} missing 'Time Series (Daily)' key. Response: {str(data)[:200]}" ) if not isinstance(time_series_data, dict): raise AVFetchError( f"AlphaVantage API 'Time Series (Daily)' for {ticker} is not a dictionary. Type: {type(time_series_data)}. Response: {str(data)[:200]}" ) if not time_series_data: logger.warning( f"AlphaVantage API returned empty time series data for {ticker}." ) return {} prices_dict: Dict[str, Dict[str, Any]] = {} for date_str, values_dict in time_series_data.items(): if isinstance(values_dict, dict): cleaned_values: Dict[str, Any] = {} if "1. open" in values_dict: cleaned_values["open"] = values_dict["1. open"] if "2. high" in values_dict: cleaned_values["high"] = values_dict["2. high"] if "3. low" in values_dict: cleaned_values["low"] = values_dict["3. low"] if "4. close" in values_dict: cleaned_values["close"] = values_dict["4. close"] if "5. adjusted close" in values_dict: cleaned_values["adjClose"] = values_dict["5. adjusted close"] if "6. volume" in values_dict: cleaned_values["volume"] = values_dict["6. volume"] if cleaned_values: prices_dict[date_str] = cleaned_values else: logger.warning( f"AlphaVantage data for {ticker} on {date_str} missing expected price keys within daily record." ) else: logger.warning( f"Skipping invalid AlphaVantage daily record (not a dict) for {ticker} on {date_str}: {values_dict}" ) logger.info( f"Successfully fetched and formatted {len(prices_dict)} historical records for {ticker} from AlphaVantage." ) return prices_dict except requests.exceptions.RequestException as e: raise AVFetchError( f"AlphaVantage data fetch (network) failed for {ticker}: {e}" ) except Exception as e: raise AVFetchError( f"AlphaVantage data fetch (processing) failed for {ticker}: {e}. Response: {str(locals().get('data', 'N/A'))[:200]}" ) def get_daily_adjusted_prices(ticker: str) -> Dict[str, Dict[str, Any]]: """ Fetches historical daily adjusted prices for a single ticker. Tries FMP first if key is available. If FMP fails, tries AlphaVantage if key is available. Returns a dictionary mapping date strings to price dictionaries. Raises DataIngestionError if no keys are configured or if both APIs fail. """ fmp_key_available = bool(FMP_API_KEY) av_key_available = bool(ALPHAVANTAGE_API_KEY) if not fmp_key_available and not av_key_available: raise DataIngestionError( "No API keys configured for historical price data (FMP, AlphaVantage)." ) fmp_error_detail = None av_error_detail = None data_from_fmp = {} data_from_av = {} if fmp_key_available: try: data_from_fmp = _fetch_from_fmp(ticker, FMP_API_KEY) if data_from_fmp: return data_from_fmp else: fmp_error_detail = f"FMP API returned no data for {ticker}." logger.warning(fmp_error_detail) except FMPFetchError as e: fmp_error_detail = str(e) logger.error(f"FMPFetchError for {ticker}: {fmp_error_detail}") except Exception as e: fmp_error_detail = ( f"An unexpected error occurred during FMP fetch for {ticker}: {e}" ) logger.error(fmp_error_detail) if av_key_available: try: data_from_av = _fetch_from_alphavantage(ticker, ALPHAVANTAGE_API_KEY) if data_from_av: return data_from_av else: av_error_detail = f"AlphaVantage API returned no data for {ticker}." logger.warning(av_error_detail) except AVFetchError as e: av_error_detail = str(e) logger.error(f"AVFetchError for {ticker}: {av_error_detail}") except Exception as e: av_error_detail = f"An unexpected error occurred during AlphaVantage fetch for {ticker}: {e}" logger.error(av_error_detail) error_messages = [] if fmp_key_available: if fmp_error_detail: error_messages.append(f"FMP: {fmp_error_detail}") elif not data_from_fmp: error_messages.append(f"FMP: Returned no data for {ticker}.") if av_key_available: if av_error_detail: error_messages.append(f"AlphaVantage: {av_error_detail}") elif not data_from_av: error_messages.append(f"AlphaVantage: Returned no data for {ticker}.") providers_tried = [] if fmp_key_available: providers_tried.append("FMP") if av_key_available: providers_tried.append("AlphaVantage") final_message = f"Failed to fetch historical data for {ticker} after trying {', '.join(providers_tried) if providers_tried else 'available providers'}." if error_messages: final_message += " Details: " + "; ".join(error_messages) else: final_message += " No data was returned from any attempted source." raise DataIngestionError(final_message)