File size: 1,206 Bytes
f35bff2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import abc
import logging
import yaml
import os

class BasePipeline(abc.ABC):
    """Abstract base class for all data pipelines.

    On construction the YAML configuration is loaded and logging is
    configured from its optional ``logging`` section. Subclasses implement
    ``run`` to perform the actual pipeline work.

    Attributes set by ``__init__``:
        config: Whatever ``load_config`` returned for the given path.
        logger: Logger named after the concrete subclass.
    """

    def __init__(self, config_path: str):
        """Load the configuration at ``config_path`` and set up logging.

        Args:
            config_path: Path to the YAML configuration file.
        """
        self.config = self.load_config(config_path)
        self.logger = self.setup_logger()

    @staticmethod
    def load_config(config_path: str):
        """Parse the YAML file at ``config_path``.

        Args:
            config_path: Path to the YAML configuration file.

        Returns:
            The object produced by ``yaml.safe_load`` (``None`` for an empty
            document).

        Raises:
            OSError: If the file cannot be opened.
        """
        # Decode explicitly as UTF-8 so parsing does not depend on the
        # platform's locale encoding.
        with open(config_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def setup_logger(self) -> logging.Logger:
        """Configure root logging from ``self.config`` and return a logger.

        Reads the optional ``logging`` mapping of the configuration:
        ``level`` (level name or integer, default ``INFO``) and ``file``
        (log file path, default ``pipeline.log``). The parent directory of
        the log file is created when the path has one.

        Returns:
            A logger named after the concrete class.
        """
        # An empty YAML document parses to None and a bare ``logging:`` key
        # maps to None; treat both as "no logging settings".
        config = self.config if isinstance(self.config, dict) else {}
        log_cfg = config.get('logging')
        if not isinstance(log_cfg, dict):
            log_cfg = {}

        level = log_cfg.get('level', 'INFO')
        if isinstance(level, int) and not isinstance(level, bool):
            # YAML such as ``level: 10`` yields an int; use it directly.
            log_level = level
        else:
            # Only accept names that resolve to a numeric level; anything
            # else (unknown names, non-level attributes) falls back to INFO.
            named_level = getattr(logging, str(level).upper(), None)
            log_level = named_level if isinstance(named_level, int) else logging.INFO

        log_file = log_cfg.get('file') or 'pipeline.log'
        # A bare filename has an empty dirname, and os.makedirs('') raises
        # FileNotFoundError, so only create a directory when there is one.
        log_dir = os.path.dirname(log_file)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        # NOTE: basicConfig leaves the root logger untouched when it already
        # has handlers, so only the first configuration in a process applies.
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s %(levelname)s %(name)s %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(self.__class__.__name__)

    @abc.abstractmethod
    def run(self):
        """Run the pipeline (to be implemented by subclasses)."""
        pass