File size: 3,500 Bytes
f35bff2
 
 
832348e
 
 
 
 
 
f35bff2
 
 
 
832348e
f35bff2
 
832348e
 
 
 
 
 
 
f35bff2
 
 
 
 
 
 
 
 
832348e
 
 
 
 
f35bff2
 
 
 
832348e
f35bff2
 
 
832348e
 
f35bff2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
832348e
f35bff2
 
 
832348e
f35bff2
 
 
 
 
 
 
 
 
832348e
 
f35bff2
 
 
 
 
 
 
 
 
 
832348e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import os
from datetime import datetime

import pandas as pd
import requests

from .base_pipeline import BasePipeline


class FREDPipeline(BasePipeline):
    """
    FRED Data Pipeline: Extracts, transforms, and loads FRED data using config.

    Reads settings from the ``fred`` section of the config loaded by
    BasePipeline, pulls observations from the FRED API, aligns them on a
    daily date index, and writes timestamped CSV snapshots to two
    directories.
    """

    def __init__(self, config_path: str):
        """Load the ``fred`` config section and ensure output dirs exist.

        Args:
            config_path: Path to the pipeline configuration file; parsed
                by ``BasePipeline`` into ``self.config``.
        """
        super().__init__(config_path)
        self.fred_cfg = self.config["fred"]
        self.api_key = self.fred_cfg["api_key"]
        self.series = self.fred_cfg["series"]
        self.start_date = self.fred_cfg["start_date"]
        self.end_date = self.fred_cfg["end_date"]
        self.output_dir = self.fred_cfg["output_dir"]
        self.export_dir = self.fred_cfg["export_dir"]
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(self.export_dir, exist_ok=True)

    def extract(self):
        """Extract data from FRED API for all configured series.

        Returns:
            dict: Mapping of series id -> date-indexed ``pd.Series`` of
            floats (FRED's "." missing-value marker becomes ``None``).
            Series that fail to download are logged and omitted.
        """
        base_url = "https://api.stlouisfed.org/fred/series/observations"
        data = {}
        for series_id in self.series:
            params = {
                "series_id": series_id,
                "api_key": self.api_key,
                "file_type": "json",
                # BUGFIX: the FRED observations endpoint's date-window
                # parameters are `observation_start`/`observation_end`.
                # The previous `start_date`/`end_date` keys are not valid
                # API parameters, so the configured window was never applied.
                "observation_start": self.start_date,
                "observation_end": self.end_date,
            }
            try:
                # Timeout prevents the pipeline from hanging indefinitely
                # on a stalled connection.
                resp = requests.get(base_url, params=params, timeout=30)
                resp.raise_for_status()
                obs = resp.json().get("observations", [])
                dates, values = [], []
                for o in obs:
                    try:
                        dates.append(pd.to_datetime(o["date"]))
                        # FRED encodes missing observations as ".".
                        values.append(float(o["value"]) if o["value"] != "." else None)
                    except Exception:
                        # Best-effort: skip a malformed observation rather
                        # than failing the whole series.
                        continue
                data[series_id] = pd.Series(values, index=dates, name=series_id)
                self.logger.info(f"Extracted {len(values)} records for {series_id}")
            except Exception as e:
                # One bad series should not abort extraction of the others.
                self.logger.error(f"Failed to extract {series_id}: {e}")
        return data

    def transform(self, data):
        """Transform raw data into a DataFrame, align dates, handle missing.

        Args:
            data: dict of series id -> date-indexed ``pd.Series`` as
                produced by :meth:`extract`.

        Returns:
            pd.DataFrame: One column per series, reindexed onto a daily
            date range spanning all observed dates (gaps become NaN).
            Empty DataFrame when ``data`` is empty or has no dates.
        """
        if not data:
            self.logger.warning("No data to transform.")
            return pd.DataFrame()
        all_dates = set()
        for s in data.values():
            all_dates.update(s.index)
        if not all_dates:
            return pd.DataFrame()
        # Daily frequency so series observed at different cadences align.
        date_range = pd.date_range(min(all_dates), max(all_dates), freq="D")
        df = pd.DataFrame(index=date_range)
        for k, v in data.items():
            df[k] = v
        df.index.name = "Date"
        self.logger.info(f"Transformed data to DataFrame with shape {df.shape}")
        return df

    def load(self, df):
        """Save DataFrame to CSV in output_dir and export_dir.

        Args:
            df: Transformed DataFrame from :meth:`transform`.

        Returns:
            tuple[str, str] | None: ``(output_path, export_path)`` of the
            written CSVs, or ``None`` when ``df`` is empty.
        """
        if df.empty:
            self.logger.warning("No data to load.")
            return None
        # Timestamped filenames so successive runs never overwrite each other.
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        out_path = os.path.join(self.output_dir, f"fred_data_{ts}.csv")
        exp_path = os.path.join(self.export_dir, f"fred_data_{ts}.csv")
        df.to_csv(out_path)
        df.to_csv(exp_path)
        self.logger.info(f"Saved data to {out_path} and {exp_path}")
        return out_path, exp_path

    def run(self):
        """Run the full extract -> transform -> load pipeline once."""
        self.logger.info("Starting FRED data pipeline run...")
        data = self.extract()
        df = self.transform(data)
        self.load(df)
        self.logger.info("FRED data pipeline run complete.")