Edwin Salguero
Enterprise: Transform to production-grade architecture with FastAPI, Docker, K8s, monitoring, and comprehensive tooling
832348e
import abc | |
import logging | |
import os | |
import yaml | |
class BasePipeline(abc.ABC): | |
""" | |
Abstract base class for all data pipelines. | |
Handles config loading, logging, and pipeline orchestration. | |
""" | |
def __init__(self, config_path: str): | |
self.config = self.load_config(config_path) | |
self.logger = self.setup_logger() | |
def load_config(config_path: str): | |
with open(config_path, "r") as f: | |
return yaml.safe_load(f) | |
def setup_logger(self): | |
log_cfg = self.config.get("logging", {}) | |
log_level = getattr(logging, log_cfg.get("level", "INFO").upper(), logging.INFO) | |
log_file = log_cfg.get("file", "pipeline.log") | |
os.makedirs(os.path.dirname(log_file), exist_ok=True) | |
logging.basicConfig( | |
level=log_level, | |
format="%(asctime)s %(levelname)s %(name)s %(message)s", | |
handlers=[logging.FileHandler(log_file), logging.StreamHandler()], | |
) | |
return logging.getLogger(self.__class__.__name__) | |
def run(self): | |
"""Run the pipeline (to be implemented by subclasses).""" | |
pass | |