Data Factory Examples

Google Colab Version

Open this notebook in Google Colab

Dependencies

Make sure to install the necessary dependencies.

%pip install starfish-core

Enabling Async Execution

A workaround for enabling async code execution in Jupyter notebooks. Not for production use.

import nest_asyncio
nest_asyncio.apply()
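
Why the workaround is needed: Jupyter already runs an event loop, and standard asyncio refuses to start a second one from inside it. The sketch below uses only the standard library to reproduce that error, which nest_asyncio.apply() is meant to avoid. The function names are illustrative and not part of Starfish.

```python
import asyncio


async def answer() -> int:
    return 42


async def call_from_running_loop() -> str:
    # Stand-in for a notebook cell: an event loop is already running here.
    coro = answer()
    try:
        asyncio.run(coro)  # a second loop cannot be started inside a running one
    except RuntimeError as exc:
        coro.close()  # avoid a "never awaited" warning for the unused coroutine
        return str(exc)
    return "no error"


message = asyncio.run(call_from_running_loop())
print(message)
```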

Environment Setup

Load environment variables from a .env file.

from starfish.common.env_loader import load_env_file
load_env_file()

Example 1: Your First Data Factory: Simple Scaling

The @data_factory decorator turns an async function into a data pipeline that runs over many inputs concurrently.

Create a StructuredLLM instance

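# StructuredLLM and data_factory are imported from the starfish package.
from starfish import StructuredLLM, data_factory

json_llm = StructuredLLM(
    model_name = "openai/gpt-4o-mini",
    prompt = "Funny facts about city {{city_name}}.",
    output_schema = [{'name': 'fact', 'type': 'str'}],
    model_kwargs = {"temperature": 0.7},
)
# Top-level await works in a notebook cell; in a script, wrap this in an async function.
json_llm_response = await json_llm.run(city_name='New York')
json_llm_response.data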

Scale with Data Factory

from datetime import datetime

@data_factory(max_concurrency=10)
async def process_json_llm(city_name: str):
    print(f"Processing {city_name} at {datetime.now()}")
    json_llm_response = await json_llm.run(city_name=city_name)
    return json_llm_response.data

# Passing a list runs the function once per city, with at most 10 calls in flight.
process_json_llm.run(city_name=["New York", "London", "Tokyo", "Paris", "Sydney"])
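
To make the idea of max_concurrency concrete, here is a minimal standard-library sketch of bounded fan-out over a list of inputs. It is not Starfish's implementation: bounded_map and fake_llm_call are hypothetical names, and the model call is replaced by a short sleep.

```python
import asyncio


def bounded_map(func, items, max_concurrency):
    """Run func(item) for every item with at most max_concurrency calls in flight."""

    async def runner():
        semaphore = asyncio.Semaphore(max_concurrency)

        async def guarded(item):
            async with semaphore:  # wait here while the limit is reached
                return await func(item)

        # gather returns results in the same order as the inputs
        return await asyncio.gather(*(guarded(item) for item in items))

    return asyncio.run(runner())


in_flight = 0
peak = 0


async def fake_llm_call(city_name):
    # Stand-in for json_llm.run(...): sleeps instead of calling a model.
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return {"city": city_name, "fact": f"placeholder fact about {city_name}"}


cities = ["New York", "London", "Tokyo", "Paris", "Sydney"]
results = bounded_map(fake_llm_call, cities, max_concurrency=2)
print(len(results), "results, peak concurrency:", peak)
```

The semaphore is the only coordination needed here; a real pipeline would also track task state and failures.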

Example 2: Works with any Async Function

Data Factory works with any async function. Here is a chained example:

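# Assumed import path for the merge helper; verify against your installed starfish-core version.
from starfish.llm.utils import merge_structured_outputs

@data_factory(max_concurrency=10)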
async def complex_process_cities(topic: str):
    generator_llm = StructuredLLM(
        model_name="openai/gpt-4o-mini",
        prompt="Generate question/answer pairs about {{topic}}.",
        output_schema=[
            {"name": "question", "type": "str"},
            {"name": "answer", "type": "str"}
        ],
    )

    rater_llm = StructuredLLM(
        model_name="openai/gpt-4o-mini",
        prompt='''Rate the following Q&A pairs based on accuracy and clarity (1-10).\n
        Pairs: {{generated_pairs}}''',
        output_schema=[
            {"name": "accuracy_rating", "type": "int"},
            {"name": "clarity_rating", "type": "int"}
        ],
        model_kwargs={"temperature": 0.5}
    )

    generation_response = await generator_llm.run(topic=topic, num_records=5)
    rating_response = await rater_llm.run(generated_pairs=generation_response.data)
    return merge_structured_outputs(generation_response.data, rating_response.data)

complex_process_cities_data = complex_process_cities.run(topic=['Science', 'History', 'Technology'])

Example 3: Working with Different Input Formats

Data Factory accepts inputs in more than one shape. In the call below, city_name is a list and num_records_per_city is a single value that applies to every city.

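# Assumed import path for the mock helper; verify against your installed starfish-core version.
from starfish.data_factory.utils.mock import mock_llm_call

@data_factory(max_concurrency=100)
async def input_format_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.01)

# A list for city_name combined with a single value for num_records_per_city.
input_format_data = input_format_mock_llm.run(city_name=["New York", "London"], num_records_per_city=1)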
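
One way to picture how a list and a single value can be combined into per-task arguments is sketched below. This is an assumption about the behaviour suggested by the call above, not Starfish's actual code, and expand_inputs is a hypothetical helper.

```python
def expand_inputs(**kwargs):
    """Turn keyword arguments into one kwargs dict per task (hypothetical helper)."""
    list_lengths = {len(v) for v in kwargs.values() if isinstance(v, (list, tuple))}
    if len(list_lengths) > 1:
        raise ValueError("all list arguments must have the same length")
    n_tasks = list_lengths.pop() if list_lengths else 1

    tasks = []
    for i in range(n_tasks):
        task = {}
        for name, value in kwargs.items():
            # Lists contribute their i-th element; single values are repeated.
            task[name] = value[i] if isinstance(value, (list, tuple)) else value
        tasks.append(task)
    return tasks


tasks = expand_inputs(city_name=["New York", "London"], num_records_per_city=1)
for task in tasks:
    print(task)
```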

Example 4: Resilient Error Retry

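Data Factory retries failed tasks instead of aborting the job. The mock call below uses fail_rate=0.3 to exercise that path.

# The decorator is required: a plain async function has no .run() method.
@data_factory(max_concurrency=10)
async def high_error_rate_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)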

cities = ["New York", "London", "Tokyo"] * 5
high_error_rate_mock_llm_data = high_error_rate_mock_llm.run(city_name=cities, num_records_per_city=1)
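
The retry idea itself can be sketched in a few lines of plain asyncio. Starfish's own retry policy and parameters are not shown in this document, so run_with_retry, max_attempts, and flaky_llm_call below are hypothetical, and the failure is made deterministic so the example is reproducible.

```python
import asyncio


async def run_with_retry(func, *args, max_attempts=3, **kwargs):
    """Call an async function, retrying on any exception up to max_attempts times."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await func(*args, **kwargs)
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error


calls = {"count": 0}


async def flaky_llm_call(city_name):
    # Deterministic stand-in for a failing API: the first two calls raise.
    calls["count"] += 1
    if calls["count"] <= 2:
        raise RuntimeError("simulated transient failure")
    return [{"city": city_name}]


result = asyncio.run(run_with_retry(flaky_llm_call, "Tokyo", max_attempts=3))
print(result, "after", calls["count"], "attempts")
```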

Advanced Usage

Data Factory offers hooks for customizations and coordination between tasks.

Resume

This is essential for long-running jobs with thousands of tasks.

If a job is interrupted, you can pick up where you left off using one of two resume methods:

  1. Same Session Resume: If you're still in the same session where the job was interrupted, call .resume() on the same instance.

  2. Cross-Session Resume: If you've closed your notebook or lost your session, you can resume using the job ID:

    from starfish import DataFactory
    # Resume using the master job ID from a previous run
    data_factory = DataFactory.resume_from_checkpoint(job_id="your_job_id")
    

The key difference:

  • resume() uses the same DataFactory instance you defined
  • resume_from_checkpoint() reconstructs your DataFactory from persistent storage where tasks and progress are saved

Note: Google Colab users may experience issues with resume_from_checkpoint() due to how Colab works.

We're simulating an interruption here. In a real scenario, this might happen if your notebook errors out, is manually interrupted with a keyboard command, encounters API rate limits, or experiences any other issues that halt execution.

@data_factory(max_concurrency=10)
async def re_run_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)

cities = ["New York", "London", "Tokyo", "Paris", "Sydney"] * 20  # 100 inputs (5 cities x 20)
re_run_mock_llm_data_1 = re_run_mock_llm.run(city_name=cities, num_records_per_city=1)

print("When a job is interrupted, you'll see a message like:")
print("[RESUME INFO] 🚨 Job stopped unexpectedly. You can resume the job by calling .resume()")

print("\nTo resume an interrupted job, simply call:")
print("re_run_mock_llm.resume()")
print('')
print(f"For this example we have {len(re_run_mock_llm_data_1)}/{len(cities)} data generated and not finished yet!")

re_run_mock_llm_data_2 = re_run_mock_llm.resume()
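
The principle behind resuming from saved progress can be illustrated without Starfish: record each finished result, and on the next run skip whatever is already recorded. The sketch below uses a JSON file as the store. The function run_job, its stop_after parameter, and the file layout are all hypothetical and say nothing about how Starfish persists its jobs.

```python
import json
import os
import tempfile


def run_job(inputs, process, checkpoint_path, stop_after=None):
    """Process inputs in order, saving finished results so a later call can resume."""
    done = {}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            done = json.load(f)  # results saved by an earlier, interrupted run
    processed_now = 0
    for index, item in enumerate(inputs):
        key = str(index)  # JSON object keys are strings
        if key in done:
            continue  # finished before the interruption: skip
        if stop_after is not None and processed_now >= stop_after:
            break  # simulated interruption
        done[key] = process(item)
        processed_now += 1
        with open(checkpoint_path, "w") as f:
            json.dump(done, f)  # persist progress after every task
    return [done[str(i)] for i in range(len(inputs)) if str(i) in done]


calls = []


def process_city(city):
    calls.append(city)  # record every real call to show nothing is redone
    return city.upper()


cities = ["New York", "London", "Tokyo", "Paris", "Sydney"]
checkpoint = os.path.join(tempfile.mkdtemp(), "job.json")

first = run_job(cities, process_city, checkpoint, stop_after=2)  # "interrupted" run
second = run_job(cities, process_city, checkpoint)  # picks up the remaining inputs
print(len(first), "results, then", len(second), "results;", len(calls), "calls in total")
```

Because progress lives in a file rather than in memory, the second call would work just as well from a new process, which is the distinction the two resume methods above draw.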

Dry Run

Before running a large job, you can do a "dry run" to test your pipeline. This only processes a single item and doesn't save state to the database.

@data_factory(max_concurrency=10)
async def dry_run_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)

dry_run_mock_llm_data = dry_run_mock_llm.dry_run(city_name=["New York", "London", "Tokyo", "Paris", "Sydney"]*20, num_records_per_city=1)
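
As a rough mental model, a dry run calls the function once on a single input and returns the result without recording anything. The sketch below assumes the single item is the first one in each list, which this document does not confirm. Here dry_run and fake_llm_call are hypothetical stand-ins, not the Starfish API.

```python
import asyncio


async def dry_run(func, **kwargs):
    """Call func once with the first element of each list argument; store nothing."""
    first_kwargs = {
        name: value[0] if isinstance(value, (list, tuple)) else value
        for name, value in kwargs.items()
    }
    return await func(**first_kwargs)


async def fake_llm_call(city_name, num_records_per_city):
    # Stand-in for a model call: returns placeholder records.
    return [{"city": city_name, "record": i} for i in range(num_records_per_city)]


sample = asyncio.run(
    dry_run(fake_llm_call, city_name=["New York", "London"] * 20, num_records_per_city=1)
)
print(sample)
```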