|
from .base_pipeline import BasePipeline |
|
import requests |
|
import pandas as pd |
|
import os |
|
from datetime import datetime |
|
|
|
class FREDPipeline(BasePipeline): |
|
""" |
|
FRED Data Pipeline: Extracts, transforms, and loads FRED data using config. |
|
""" |
|
def __init__(self, config_path: str): |
|
super().__init__(config_path) |
|
self.fred_cfg = self.config['fred'] |
|
self.api_key = self.fred_cfg['api_key'] |
|
self.series = self.fred_cfg['series'] |
|
self.start_date = self.fred_cfg['start_date'] |
|
self.end_date = self.fred_cfg['end_date'] |
|
self.output_dir = self.fred_cfg['output_dir'] |
|
self.export_dir = self.fred_cfg['export_dir'] |
|
os.makedirs(self.output_dir, exist_ok=True) |
|
os.makedirs(self.export_dir, exist_ok=True) |
|
|
|
def extract(self): |
|
"""Extract data from FRED API for all configured series.""" |
|
base_url = "https://api.stlouisfed.org/fred/series/observations" |
|
data = {} |
|
for series_id in self.series: |
|
params = { |
|
'series_id': series_id, |
|
'api_key': self.api_key, |
|
'file_type': 'json', |
|
'start_date': self.start_date, |
|
'end_date': self.end_date |
|
} |
|
try: |
|
resp = requests.get(base_url, params=params) |
|
resp.raise_for_status() |
|
obs = resp.json().get('observations', []) |
|
dates, values = [], [] |
|
for o in obs: |
|
try: |
|
dates.append(pd.to_datetime(o['date'])) |
|
values.append(float(o['value']) if o['value'] != '.' else None) |
|
except Exception: |
|
continue |
|
data[series_id] = pd.Series(values, index=dates, name=series_id) |
|
self.logger.info(f"Extracted {len(values)} records for {series_id}") |
|
except Exception as e: |
|
self.logger.error(f"Failed to extract {series_id}: {e}") |
|
return data |
|
|
|
def transform(self, data): |
|
"""Transform raw data into a DataFrame, align dates, handle missing.""" |
|
if not data: |
|
self.logger.warning("No data to transform.") |
|
return pd.DataFrame() |
|
all_dates = set() |
|
for s in data.values(): |
|
all_dates.update(s.index) |
|
if not all_dates: |
|
return pd.DataFrame() |
|
date_range = pd.date_range(min(all_dates), max(all_dates), freq='D') |
|
df = pd.DataFrame(index=date_range) |
|
for k, v in data.items(): |
|
df[k] = v |
|
df.index.name = 'Date' |
|
self.logger.info(f"Transformed data to DataFrame with shape {df.shape}") |
|
return df |
|
|
|
def load(self, df): |
|
"""Save DataFrame to CSV in output_dir and export_dir.""" |
|
if df.empty: |
|
self.logger.warning("No data to load.") |
|
return None |
|
ts = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
out_path = os.path.join(self.output_dir, f'fred_data_{ts}.csv') |
|
exp_path = os.path.join(self.export_dir, f'fred_data_{ts}.csv') |
|
df.to_csv(out_path) |
|
df.to_csv(exp_path) |
|
self.logger.info(f"Saved data to {out_path} and {exp_path}") |
|
return out_path, exp_path |
|
|
|
def run(self): |
|
self.logger.info("Starting FRED data pipeline run...") |
|
data = self.extract() |
|
df = self.transform(data) |
|
self.load(df) |
|
self.logger.info("FRED data pipeline run complete.") |