|
import requests |
|
import os |
|
from dotenv import load_dotenv |
|
from typing import Dict, List, Optional, Any |
|
import logging |
|
|
|
load_dotenv() |
|
|
|
logging.basicConfig( |
|
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
FMP_API_KEY = os.getenv("FMP_API_KEY") |
|
ALPHAVANTAGE_API_KEY = os.getenv("ALPHAVANTAGE_API_KEY") |
|
|
|
FMP_BASE_URL = "https://financialmodelingprep.com/api/v3" |
|
ALPHAVANTAGE_BASE_URL = "https://www.alphavantage.co/query" |
|
|
|
|
|
class DataIngestionError(Exception): |
|
"""Custom exception for data ingestion API errors.""" |
|
|
|
pass |
|
|
|
|
|
class FMPFetchError(DataIngestionError): |
|
"""Specific error for FMP fetching issues.""" |
|
|
|
pass |
|
|
|
|
|
class AVFetchError(DataIngestionError): |
|
"""Specific error for AlphaVantage fetching issues.""" |
|
|
|
pass |
|
|
|
|
|
def _fetch_from_fmp(ticker: str, api_key: str) -> Dict[str, Dict[str, Any]]: |
|
"""Internal function to fetch data from FMP. Uses /historical-price-full/ as recommended.""" |
|
|
|
endpoint = f"{FMP_BASE_URL}/historical-price-full/{ticker}" |
|
params = {"apikey": api_key} |
|
logger.info( |
|
f"Fetching historical daily data for {ticker} from FMP (using /historical-price-full/)." |
|
) |
|
try: |
|
response = requests.get(endpoint, params=params, timeout=30) |
|
response.raise_for_status() |
|
data = response.json() |
|
|
|
if isinstance(data, dict): |
|
|
|
if "Error Message" in data: |
|
raise FMPFetchError( |
|
f"FMP API returned error for {ticker}: {data['Error Message']}" |
|
) |
|
if data.get("symbol") and "historical" in data: |
|
historical_data_list = data.get("historical") |
|
|
|
if isinstance(historical_data_list, list): |
|
if not historical_data_list: |
|
logger.warning( |
|
f"FMP API returned empty historical data list for {ticker} (from /historical-price-full/)." |
|
) |
|
return {} |
|
|
|
prices_dict: Dict[str, Dict[str, Any]] = {} |
|
for record in historical_data_list: |
|
if isinstance(record, dict) and "date" in record: |
|
prices_dict[record["date"]] = record |
|
else: |
|
logger.warning( |
|
f"Skipping invalid FMP record format for {ticker}: {record}" |
|
) |
|
logger.info( |
|
f"Successfully fetched and formatted {len(prices_dict)} historical records for {ticker} from FMP." |
|
) |
|
return prices_dict |
|
else: |
|
raise FMPFetchError( |
|
f"FMP API historical data for {ticker} has unexpected 'historical' type: {type(historical_data_list)}" |
|
) |
|
else: |
|
raise FMPFetchError( |
|
f"FMP API response for {ticker} (from /historical-price-full/) missing expected structure (symbol/historical keys). Response: {str(data)[:200]}" |
|
) |
|
|
|
elif isinstance(data, list): |
|
if not data: |
|
logger.warning( |
|
f"FMP API returned empty list for {ticker} (from /historical-price-full/)." |
|
) |
|
return {} |
|
if isinstance(data[0], dict) and ( |
|
"Error Message" in data[0] or "error" in data[0] |
|
): |
|
error_msg = data[0].get( |
|
"Error Message", data[0].get("error", "Unknown error in list") |
|
) |
|
raise FMPFetchError( |
|
f"FMP API returned error list for {ticker}: {error_msg}" |
|
) |
|
else: |
|
raise FMPFetchError( |
|
f"FMP API returned unexpected top-level list structure for {ticker} (from /historical-price-full/). Response: {str(data)[:200]}" |
|
) |
|
else: |
|
raise FMPFetchError( |
|
f"FMP API returned unexpected response type for {ticker} (from /historical-price-full/): {type(data)}. Response: {str(data)[:200]}" |
|
) |
|
|
|
except requests.exceptions.RequestException as e: |
|
raise FMPFetchError(f"FMP data fetch (network) failed for {ticker}: {e}") |
|
except Exception as e: |
|
raise FMPFetchError( |
|
f"FMP data fetch (processing) failed for {ticker}: {e}. Response: {str(locals().get('data', 'N/A'))[:200]}" |
|
) |
|
|
|
|
|
def _fetch_from_alphavantage(ticker: str, api_key: str) -> Dict[str, Dict[str, Any]]: |
|
"""Internal function to fetch data from AlphaVantage.""" |
|
endpoint = f"{ALPHAVANTAGE_BASE_URL}/query" |
|
params = { |
|
"function": "TIME_SERIES_DAILY_ADJUSTED", |
|
"symbol": ticker, |
|
"apikey": api_key, |
|
"outputsize": "compact", |
|
} |
|
logger.info(f"Fetching historical daily data for {ticker} from AlphaVantage.") |
|
try: |
|
response = requests.get(endpoint, params=params, timeout=30) |
|
response.raise_for_status() |
|
data = response.json() |
|
|
|
if not isinstance(data, dict): |
|
raise AVFetchError( |
|
f"AlphaVantage API returned unexpected response type for {ticker}: {type(data)}. Expected dict. Response: {str(data)[:200]}" |
|
) |
|
|
|
if "Error Message" in data: |
|
raise AVFetchError( |
|
f"AlphaVantage API returned error for {ticker}: {data['Error Message']}" |
|
) |
|
if "Note" in data: |
|
logger.warning( |
|
f"AlphaVantage API returned note for {ticker}: {data['Note']} - treating as no data." |
|
) |
|
|
|
return {} |
|
|
|
time_series_data = data.get("Time Series (Daily)") |
|
|
|
if time_series_data is None: |
|
|
|
if not data: |
|
logger.warning( |
|
f"AlphaVantage API returned an empty dictionary for {ticker}." |
|
) |
|
return {} |
|
else: |
|
raise AVFetchError( |
|
f"AlphaVantage API response for {ticker} missing 'Time Series (Daily)' key. Response: {str(data)[:200]}" |
|
) |
|
|
|
if not isinstance(time_series_data, dict): |
|
raise AVFetchError( |
|
f"AlphaVantage API 'Time Series (Daily)' for {ticker} is not a dictionary. Type: {type(time_series_data)}. Response: {str(data)[:200]}" |
|
) |
|
|
|
if not time_series_data: |
|
logger.warning( |
|
f"AlphaVantage API returned empty time series data for {ticker}." |
|
) |
|
return {} |
|
|
|
prices_dict: Dict[str, Dict[str, Any]] = {} |
|
for date_str, values_dict in time_series_data.items(): |
|
if isinstance(values_dict, dict): |
|
cleaned_values: Dict[str, Any] = {} |
|
if "1. open" in values_dict: |
|
cleaned_values["open"] = values_dict["1. open"] |
|
if "2. high" in values_dict: |
|
cleaned_values["high"] = values_dict["2. high"] |
|
if "3. low" in values_dict: |
|
cleaned_values["low"] = values_dict["3. low"] |
|
if "4. close" in values_dict: |
|
cleaned_values["close"] = values_dict["4. close"] |
|
if "5. adjusted close" in values_dict: |
|
cleaned_values["adjClose"] = values_dict["5. adjusted close"] |
|
if "6. volume" in values_dict: |
|
cleaned_values["volume"] = values_dict["6. volume"] |
|
|
|
if cleaned_values: |
|
prices_dict[date_str] = cleaned_values |
|
else: |
|
logger.warning( |
|
f"AlphaVantage data for {ticker} on {date_str} missing expected price keys within daily record." |
|
) |
|
else: |
|
logger.warning( |
|
f"Skipping invalid AlphaVantage daily record (not a dict) for {ticker} on {date_str}: {values_dict}" |
|
) |
|
logger.info( |
|
f"Successfully fetched and formatted {len(prices_dict)} historical records for {ticker} from AlphaVantage." |
|
) |
|
return prices_dict |
|
|
|
except requests.exceptions.RequestException as e: |
|
raise AVFetchError( |
|
f"AlphaVantage data fetch (network) failed for {ticker}: {e}" |
|
) |
|
except Exception as e: |
|
raise AVFetchError( |
|
f"AlphaVantage data fetch (processing) failed for {ticker}: {e}. Response: {str(locals().get('data', 'N/A'))[:200]}" |
|
) |
|
|
|
|
|
def get_daily_adjusted_prices(ticker: str) -> Dict[str, Dict[str, Any]]: |
|
""" |
|
Fetches historical daily adjusted prices for a single ticker. |
|
Tries FMP first if key is available. If FMP fails, tries AlphaVantage if key is available. |
|
Returns a dictionary mapping date strings to price dictionaries. |
|
Raises DataIngestionError if no keys are configured or if both APIs fail. |
|
""" |
|
fmp_key_available = bool(FMP_API_KEY) |
|
av_key_available = bool(ALPHAVANTAGE_API_KEY) |
|
|
|
if not fmp_key_available and not av_key_available: |
|
raise DataIngestionError( |
|
"No API keys configured for historical price data (FMP, AlphaVantage)." |
|
) |
|
|
|
fmp_error_detail = None |
|
av_error_detail = None |
|
data_from_fmp = {} |
|
data_from_av = {} |
|
|
|
if fmp_key_available: |
|
try: |
|
data_from_fmp = _fetch_from_fmp(ticker, FMP_API_KEY) |
|
if data_from_fmp: |
|
return data_from_fmp |
|
else: |
|
|
|
fmp_error_detail = f"FMP API returned no data for {ticker}." |
|
logger.warning(fmp_error_detail) |
|
except FMPFetchError as e: |
|
fmp_error_detail = str(e) |
|
logger.error(f"FMPFetchError for {ticker}: {fmp_error_detail}") |
|
except Exception as e: |
|
fmp_error_detail = ( |
|
f"An unexpected error occurred during FMP fetch for {ticker}: {e}" |
|
) |
|
logger.error(fmp_error_detail) |
|
|
|
if av_key_available: |
|
try: |
|
data_from_av = _fetch_from_alphavantage(ticker, ALPHAVANTAGE_API_KEY) |
|
if data_from_av: |
|
return data_from_av |
|
else: |
|
|
|
av_error_detail = f"AlphaVantage API returned no data for {ticker}." |
|
logger.warning(av_error_detail) |
|
except AVFetchError as e: |
|
av_error_detail = str(e) |
|
logger.error(f"AVFetchError for {ticker}: {av_error_detail}") |
|
except Exception as e: |
|
av_error_detail = f"An unexpected error occurred during AlphaVantage fetch for {ticker}: {e}" |
|
logger.error(av_error_detail) |
|
|
|
error_messages = [] |
|
if fmp_key_available: |
|
if fmp_error_detail: |
|
error_messages.append(f"FMP: {fmp_error_detail}") |
|
elif not data_from_fmp: |
|
error_messages.append(f"FMP: Returned no data for {ticker}.") |
|
|
|
if av_key_available: |
|
if av_error_detail: |
|
error_messages.append(f"AlphaVantage: {av_error_detail}") |
|
elif not data_from_av: |
|
error_messages.append(f"AlphaVantage: Returned no data for {ticker}.") |
|
|
|
providers_tried = [] |
|
if fmp_key_available: |
|
providers_tried.append("FMP") |
|
if av_key_available: |
|
providers_tried.append("AlphaVantage") |
|
|
|
final_message = f"Failed to fetch historical data for {ticker} after trying {', '.join(providers_tried) if providers_tried else 'available providers'}." |
|
if error_messages: |
|
final_message += " Details: " + "; ".join(error_messages) |
|
else: |
|
final_message += " No data was returned from any attempted source." |
|
|
|
raise DataIngestionError(final_message) |
|
|