import requests
import os
from dotenv import load_dotenv
from typing import Dict, List, Optional, Any
import logging
load_dotenv()
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
FMP_API_KEY = os.getenv("FMP_API_KEY")
ALPHAVANTAGE_API_KEY = os.getenv("ALPHAVANTAGE_API_KEY")
FMP_BASE_URL = "https://financialmodelingprep.com/api/v3"
ALPHAVANTAGE_BASE_URL = "https://www.alphavantage.co/query"
class DataIngestionError(Exception):
"""Custom exception for data ingestion API errors."""
pass
class FMPFetchError(DataIngestionError):
"""Specific error for FMP fetching issues."""
pass
class AVFetchError(DataIngestionError):
"""Specific error for AlphaVantage fetching issues."""
pass
def _fetch_from_fmp(ticker: str, api_key: str) -> Dict[str, Dict[str, Any]]:
"""Internal function to fetch data from FMP. Uses /historical-price-full/ as recommended."""
endpoint = f"{FMP_BASE_URL}/historical-price-full/{ticker}"
params = {"apikey": api_key}
logger.info(
f"Fetching historical daily data for {ticker} from FMP (using /historical-price-full/)."
)
try:
response = requests.get(endpoint, params=params, timeout=30)
response.raise_for_status()
data = response.json()
if isinstance(data, dict):
if "Error Message" in data:
raise FMPFetchError(
f"FMP API returned error for {ticker}: {data['Error Message']}"
)
if data.get("symbol") and "historical" in data:
historical_data_list = data.get("historical")
if isinstance(historical_data_list, list):
if not historical_data_list:
logger.warning(
f"FMP API returned empty historical data list for {ticker} (from /historical-price-full/)."
)
return {}
prices_dict: Dict[str, Dict[str, Any]] = {}
for record in historical_data_list:
if isinstance(record, dict) and "date" in record:
prices_dict[record["date"]] = record
else:
logger.warning(
f"Skipping invalid FMP record format for {ticker}: {record}"
)
logger.info(
f"Successfully fetched and formatted {len(prices_dict)} historical records for {ticker} from FMP."
)
return prices_dict
else:
raise FMPFetchError(
f"FMP API historical data for {ticker} has unexpected 'historical' type: {type(historical_data_list)}"
)
else:
raise FMPFetchError(
f"FMP API response for {ticker} (from /historical-price-full/) missing expected structure (symbol/historical keys). Response: {str(data)[:200]}"
)
elif isinstance(data, list):
if not data:
logger.warning(
f"FMP API returned empty list for {ticker} (from /historical-price-full/)."
)
return {}
if isinstance(data[0], dict) and (
"Error Message" in data[0] or "error" in data[0]
):
error_msg = data[0].get(
"Error Message", data[0].get("error", "Unknown error in list")
)
raise FMPFetchError(
f"FMP API returned error list for {ticker}: {error_msg}"
)
else:
raise FMPFetchError(
f"FMP API returned unexpected top-level list structure for {ticker} (from /historical-price-full/). Response: {str(data)[:200]}"
)
else:
raise FMPFetchError(
f"FMP API returned unexpected response type for {ticker} (from /historical-price-full/): {type(data)}. Response: {str(data)[:200]}"
)
except requests.exceptions.RequestException as e:
raise FMPFetchError(f"FMP data fetch (network) failed for {ticker}: {e}")
except Exception as e:
raise FMPFetchError(
f"FMP data fetch (processing) failed for {ticker}: {e}. Response: {str(locals().get('data', 'N/A'))[:200]}"
)
def _fetch_from_alphavantage(ticker: str, api_key: str) -> Dict[str, Dict[str, Any]]:
"""Internal function to fetch data from AlphaVantage."""
endpoint = f"{ALPHAVANTAGE_BASE_URL}/query"
params = {
"function": "TIME_SERIES_DAILY_ADJUSTED",
"symbol": ticker,
"apikey": api_key,
"outputsize": "compact",
}
logger.info(f"Fetching historical daily data for {ticker} from AlphaVantage.")
try:
response = requests.get(endpoint, params=params, timeout=30)
response.raise_for_status()
data = response.json()
if not isinstance(data, dict):
raise AVFetchError(
f"AlphaVantage API returned unexpected response type for {ticker}: {type(data)}. Expected dict. Response: {str(data)[:200]}"
)
if "Error Message" in data:
raise AVFetchError(
f"AlphaVantage API returned error for {ticker}: {data['Error Message']}"
)
if "Note" in data:
logger.warning(
f"AlphaVantage API returned note for {ticker}: {data['Note']} - treating as no data."
)
return {}
time_series_data = data.get("Time Series (Daily)")
if time_series_data is None:
if not data:
logger.warning(
f"AlphaVantage API returned an empty dictionary for {ticker}."
)
return {}
else:
raise AVFetchError(
f"AlphaVantage API response for {ticker} missing 'Time Series (Daily)' key. Response: {str(data)[:200]}"
)
if not isinstance(time_series_data, dict):
raise AVFetchError(
f"AlphaVantage API 'Time Series (Daily)' for {ticker} is not a dictionary. Type: {type(time_series_data)}. Response: {str(data)[:200]}"
)
if not time_series_data:
logger.warning(
f"AlphaVantage API returned empty time series data for {ticker}."
)
return {}
prices_dict: Dict[str, Dict[str, Any]] = {}
for date_str, values_dict in time_series_data.items():
if isinstance(values_dict, dict):
cleaned_values: Dict[str, Any] = {}
if "1. open" in values_dict:
cleaned_values["open"] = values_dict["1. open"]
if "2. high" in values_dict:
cleaned_values["high"] = values_dict["2. high"]
if "3. low" in values_dict:
cleaned_values["low"] = values_dict["3. low"]
if "4. close" in values_dict:
cleaned_values["close"] = values_dict["4. close"]
if "5. adjusted close" in values_dict:
cleaned_values["adjClose"] = values_dict["5. adjusted close"]
if "6. volume" in values_dict:
cleaned_values["volume"] = values_dict["6. volume"]
if cleaned_values:
prices_dict[date_str] = cleaned_values
else:
logger.warning(
f"AlphaVantage data for {ticker} on {date_str} missing expected price keys within daily record."
)
else:
logger.warning(
f"Skipping invalid AlphaVantage daily record (not a dict) for {ticker} on {date_str}: {values_dict}"
)
logger.info(
f"Successfully fetched and formatted {len(prices_dict)} historical records for {ticker} from AlphaVantage."
)
return prices_dict
except requests.exceptions.RequestException as e:
raise AVFetchError(
f"AlphaVantage data fetch (network) failed for {ticker}: {e}"
)
except Exception as e:
raise AVFetchError(
f"AlphaVantage data fetch (processing) failed for {ticker}: {e}. Response: {str(locals().get('data', 'N/A'))[:200]}"
)
def get_daily_adjusted_prices(ticker: str) -> Dict[str, Dict[str, Any]]:
"""
Fetches historical daily adjusted prices for a single ticker.
Tries FMP first if key is available. If FMP fails, tries AlphaVantage if key is available.
Returns a dictionary mapping date strings to price dictionaries.
Raises DataIngestionError if no keys are configured or if both APIs fail.
"""
fmp_key_available = bool(FMP_API_KEY)
av_key_available = bool(ALPHAVANTAGE_API_KEY)
if not fmp_key_available and not av_key_available:
raise DataIngestionError(
"No API keys configured for historical price data (FMP, AlphaVantage)."
)
fmp_error_detail = None
av_error_detail = None
data_from_fmp = {}
data_from_av = {}
if fmp_key_available:
try:
data_from_fmp = _fetch_from_fmp(ticker, FMP_API_KEY)
if data_from_fmp:
return data_from_fmp
else:
fmp_error_detail = f"FMP API returned no data for {ticker}."
logger.warning(fmp_error_detail)
except FMPFetchError as e:
fmp_error_detail = str(e)
logger.error(f"FMPFetchError for {ticker}: {fmp_error_detail}")
except Exception as e:
fmp_error_detail = (
f"An unexpected error occurred during FMP fetch for {ticker}: {e}"
)
logger.error(fmp_error_detail)
if av_key_available:
try:
data_from_av = _fetch_from_alphavantage(ticker, ALPHAVANTAGE_API_KEY)
if data_from_av:
return data_from_av
else:
av_error_detail = f"AlphaVantage API returned no data for {ticker}."
logger.warning(av_error_detail)
except AVFetchError as e:
av_error_detail = str(e)
logger.error(f"AVFetchError for {ticker}: {av_error_detail}")
except Exception as e:
av_error_detail = f"An unexpected error occurred during AlphaVantage fetch for {ticker}: {e}"
logger.error(av_error_detail)
error_messages = []
if fmp_key_available:
if fmp_error_detail:
error_messages.append(f"FMP: {fmp_error_detail}")
elif not data_from_fmp:
error_messages.append(f"FMP: Returned no data for {ticker}.")
if av_key_available:
if av_error_detail:
error_messages.append(f"AlphaVantage: {av_error_detail}")
elif not data_from_av:
error_messages.append(f"AlphaVantage: Returned no data for {ticker}.")
providers_tried = []
if fmp_key_available:
providers_tried.append("FMP")
if av_key_available:
providers_tried.append("AlphaVantage")
final_message = f"Failed to fetch historical data for {ticker} after trying {', '.join(providers_tried) if providers_tried else 'available providers'}."
if error_messages:
final_message += " Details: " + "; ".join(error_messages)
else:
final_message += " No data was returned from any attempted source."
raise DataIngestionError(final_message)