FREDML / src /core /fred_pipeline.py
Edwin Salguero
Enterprise: Transform to production-grade architecture with FastAPI, Docker, K8s, monitoring, and comprehensive tooling
832348e
import os
from datetime import datetime
import pandas as pd
import requests
from .base_pipeline import BasePipeline
class FREDPipeline(BasePipeline):
"""
FRED Data Pipeline: Extracts, transforms, and loads FRED data using config.
"""
def __init__(self, config_path: str):
super().__init__(config_path)
self.fred_cfg = self.config["fred"]
self.api_key = self.fred_cfg["api_key"]
self.series = self.fred_cfg["series"]
self.start_date = self.fred_cfg["start_date"]
self.end_date = self.fred_cfg["end_date"]
self.output_dir = self.fred_cfg["output_dir"]
self.export_dir = self.fred_cfg["export_dir"]
os.makedirs(self.output_dir, exist_ok=True)
os.makedirs(self.export_dir, exist_ok=True)
def extract(self):
"""Extract data from FRED API for all configured series."""
base_url = "https://api.stlouisfed.org/fred/series/observations"
data = {}
for series_id in self.series:
params = {
"series_id": series_id,
"api_key": self.api_key,
"file_type": "json",
"start_date": self.start_date,
"end_date": self.end_date,
}
try:
resp = requests.get(base_url, params=params)
resp.raise_for_status()
obs = resp.json().get("observations", [])
dates, values = [], []
for o in obs:
try:
dates.append(pd.to_datetime(o["date"]))
values.append(float(o["value"]) if o["value"] != "." else None)
except Exception:
continue
data[series_id] = pd.Series(values, index=dates, name=series_id)
self.logger.info(f"Extracted {len(values)} records for {series_id}")
except Exception as e:
self.logger.error(f"Failed to extract {series_id}: {e}")
return data
def transform(self, data):
"""Transform raw data into a DataFrame, align dates, handle missing."""
if not data:
self.logger.warning("No data to transform.")
return pd.DataFrame()
all_dates = set()
for s in data.values():
all_dates.update(s.index)
if not all_dates:
return pd.DataFrame()
date_range = pd.date_range(min(all_dates), max(all_dates), freq="D")
df = pd.DataFrame(index=date_range)
for k, v in data.items():
df[k] = v
df.index.name = "Date"
self.logger.info(f"Transformed data to DataFrame with shape {df.shape}")
return df
def load(self, df):
"""Save DataFrame to CSV in output_dir and export_dir."""
if df.empty:
self.logger.warning("No data to load.")
return None
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
out_path = os.path.join(self.output_dir, f"fred_data_{ts}.csv")
exp_path = os.path.join(self.export_dir, f"fred_data_{ts}.csv")
df.to_csv(out_path)
df.to_csv(exp_path)
self.logger.info(f"Saved data to {out_path} and {exp_path}")
return out_path, exp_path
def run(self):
self.logger.info("Starting FRED data pipeline run...")
data = self.extract()
df = self.transform(data)
self.load(df)
self.logger.info("FRED data pipeline run complete.")