starfish_data_ai / vibe_coding /data_factory.md

# data_factory Function Overview

The data_factory function is a decorator for building data processing pipelines. It is defined in factory.py within the starfish.data_factory module and handles the setup and execution of pipeline tasks, with configurable concurrency, error handling, and storage options.

## Function Signature

```python
def data_factory(
    storage: str = STORAGE_TYPE_LOCAL,
    batch_size: int = 1,
    target_count: int = 0,
    dead_queue_threshold: int = 3,
    max_concurrency: int = 10,
    initial_state_values: Optional[Dict[str, Any]] = None,
    on_record_complete: Optional[List[Callable]] = None,
    on_record_error: Optional[List[Callable]] = None,
    show_progress: bool = True,
    task_runner_timeout: int = TASK_RUNNER_TIMEOUT,
    job_run_stop_threshold: int = NOT_COMPLETED_THRESHOLD,
) -> Callable[[Callable[P, T]], DataFactoryProtocol[P, T]]:
```

## Key Arguments

  • storage: Type of storage backend to use, such as 'local' or 'in_memory'.
  • batch_size: Number of records processed in each batch.
  • target_count: The target number of records to generate. A value of 0 denotes processing all available input records.
  • dead_queue_threshold: Failure-count threshold before a record is moved to the dead queue instead of being retried.
  • max_concurrency: Maximum number of concurrent tasks that can be executed.
  • initial_state_values: Initial shared state values for the factory.
  • on_record_complete: List of callback functions to execute upon the successful processing of a record.
  • on_record_error: List of callback functions to execute if record processing fails.
  • show_progress: Boolean indicating whether a progress bar should be displayed.
  • task_runner_timeout: Timeout for task execution in seconds.
  • job_run_stop_threshold: Threshold to stop the job if a significant number of records fail processing.
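The arguments above can be illustrated with a usage sketch. The `data_factory` below is a deliberately simplified stand-in (no batching, concurrency, or storage) so the example is self-contained; the real decorator lives in starfish.data_factory.factory, and the task body is purely illustrative:

```python
from typing import Any, Callable, Dict, List, Optional


def data_factory(batch_size: int = 1,
                 max_concurrency: int = 10,
                 on_record_complete: Optional[List[Callable]] = None,
                 **kwargs) -> Callable:
    """Toy stand-in: runs the task over each record and fires callbacks."""
    def wrap(fn: Callable) -> Callable:
        def run(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
            results = []
            for rec in records:
                out = fn(rec)
                results.append(out)
                # Invoke success callbacks, mirroring on_record_complete.
                for cb in (on_record_complete or []):
                    cb(out)
            return results
        return run
    return wrap


@data_factory(batch_size=2, on_record_complete=[print])
def enrich(record: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical task standing in for a real pipeline step.
    return {**record, "processed": True}


enrich([{"id": 1}, {"id": 2}])
```

Decorating `enrich` turns a single-record function into a pipeline entry point that accepts a batch of input records, which matches how the decorator is described in this document.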

## Functionality

  • Decorator Creation: The data_factory function serves as a decorator that wraps a function responsible for processing data. It provides mechanisms for customizing various aspects of the pipeline such as concurrency and error handling.

  • Configuration: It bundles the parameters listed above into a FactoryMasterConfig configuration object.

  • Factory Initialization: The decorator internally initializes or updates a factory instance, using the provided function and state values.

  • Resume Capability: The decorator attaches a static method, resume_from_checkpoint, to the decorated function so that a paused or interrupted data processing job can be resumed.
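One common way a decorator can expose such a method is to attach it as an attribute on the returned wrapper. The sketch below is hypothetical (the helper name `data_factory_sketch` and the parameter `master_job_id` are assumptions, not the library's actual code) and only shows the attachment pattern:

```python
import functools


def data_factory_sketch(**config):
    """Toy decorator showing how resume_from_checkpoint can be exposed."""
    def wrap(fn):
        @functools.wraps(fn)
        def runner(*args, **kwargs):
            return fn(*args, **kwargs)

        def resume_from_checkpoint(master_job_id: str) -> str:
            # The real library would reload checkpointed state and continue
            # processing; here we just echo the job id for illustration.
            return f"resuming {master_job_id}"

        # Attach the method so callers can invoke it on the decorated
        # function, e.g. my_pipeline.resume_from_checkpoint("job-1").
        runner.resume_from_checkpoint = resume_from_checkpoint
        return runner
    return wrap


@data_factory_sketch(batch_size=1)
def task(x: int) -> int:
    return x + 1
```

Because the method lives on the wrapper itself, no factory instance is needed to trigger a resume, which is why the document describes it as a static method.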

This configurable decorator pattern makes it straightforward to build scalable and flexible data processing pipelines.