Data Factory Examples
Google Colab version: open this notebook in Google Colab.
Dependencies
Make sure to install the necessary dependencies.
%pip install starfish-core
Enabling Async Execution
nest_asyncio patches the notebook's already-running event loop so async code can execute. This is a workaround for Jupyter notebooks, not for production use.
import nest_asyncio
nest_asyncio.apply()
Environment Setup
Load environment variables from a .env file.
from starfish.common.env_loader import load_env_file
load_env_file()
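For the OpenAI-backed examples below, the .env file typically supplies an API key. Illustrative contents (the variable name assumes the standard OpenAI convention; the key value is elided):
OPENAI_API_KEY=sk-...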
Example 1: Your First Data Factory: Simple Scaling
The @data_factory decorator transforms an async function into a scalable data pipeline.
Create a StructuredLLM instance
from starfish import StructuredLLM, data_factory

json_llm = StructuredLLM(
    model_name="openai/gpt-4o-mini",
    prompt="Funny facts about city {{city_name}}.",
    output_schema=[{"name": "fact", "type": "str"}],
    model_kwargs={"temperature": 0.7},
)
json_llm_response = await json_llm.run(city_name='New York')
json_llm_response.data
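Because output_schema declares a single string field named fact, .data should be a list of dicts in that shape, e.g. (illustrative):
[{'fact': '...'}]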
Scale with Data Factory
from datetime import datetime
@data_factory(max_concurrency=10)
async def process_json_llm(city_name: str):
    print(f"Processing {city_name} at {datetime.now()}")
    json_llm_response = await json_llm.run(city_name=city_name)
    return json_llm_response.data
process_json_llm.run(city_name=["New York", "London", "Tokyo", "Paris", "Sydney"])
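Note that the decorated pipeline's .run() is called without await: data_factory appears to drive the event loop and concurrency itself, which is presumably why the nest_asyncio workaround above is needed inside a notebook.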
Example 2: Works with any Async Function
Data Factory works with any async function. Here is a chained example:
from starfish.llm.utils import merge_structured_outputs  # helper to zip the two outputs record by record

@data_factory(max_concurrency=10)
async def complex_process_cities(topic: str):
    generator_llm = StructuredLLM(
        model_name="openai/gpt-4o-mini",
        prompt="Generate question/answer pairs about {{topic}}.",
        output_schema=[
            {"name": "question", "type": "str"},
            {"name": "answer", "type": "str"},
        ],
    )
    rater_llm = StructuredLLM(
        model_name="openai/gpt-4o-mini",
        prompt="Rate the following Q&A pairs based on accuracy and clarity (1-10). Pairs: {{generated_pairs}}",
        output_schema=[
            {"name": "accuracy_rating", "type": "int"},
            {"name": "clarity_rating", "type": "int"},
        ],
        model_kwargs={"temperature": 0.5},
    )
    generation_response = await generator_llm.run(topic=topic, num_records=5)
    rating_response = await rater_llm.run(generated_pairs=generation_response.data)
    return merge_structured_outputs(generation_response.data, rating_response.data)
complex_process_cities_data = complex_process_cities.run(topic=['Science', 'History', 'Technology'])
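merge_structured_outputs combines the two result lists record by record. Conceptually it behaves like the following sketch (illustrative only, not the library's implementation):
# Illustrative sketch: a record-wise merge of two equal-length lists of dicts.
def merge_records(left: list[dict], right: list[dict]) -> list[dict]:
    return [{**a, **b} for a, b in zip(left, right)]

# merge_records([{'question': 'Q1', 'answer': 'A1'}], [{'accuracy_rating': 9, 'clarity_rating': 8}])
# -> [{'question': 'Q1', 'answer': 'A1', 'accuracy_rating': 9, 'clarity_rating': 8}]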
Example 3: Working with Different Input Formats
Data Factory supports several input formats. In the example below, a list of cities fans out one task per city, while the scalar num_records_per_city is reused for every task.
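The remaining examples rely on a mock_llm_call helper that this notebook does not define. A minimal stand-in consistent with how it is called below might look like this (hypothetical helper, not part of starfish-core):
import asyncio
import random

# Hypothetical helper (not part of starfish-core): simulates a flaky LLM call.
async def mock_llm_call(city_name: str, num_records_per_city: int, fail_rate: float = 0.0):
    await asyncio.sleep(random.uniform(0.05, 0.2))  # simulate network latency
    if random.random() < fail_rate:
        raise RuntimeError(f"Mock LLM failure for {city_name}")
    return [{"city": city_name, "fact": f"Mock fact #{i + 1} about {city_name}"}
            for i in range(num_records_per_city)]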
@data_factory(max_concurrency=100)
async def input_format_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.01)
# Example with different input formats.
input_format_data = input_format_mock_llm.run(city_name=["New York", "London"], num_records_per_city=1)
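Depending on your starfish version, equal-length lists may also be accepted and paired element-wise; this is an assumption to verify against the library docs:
# Assumption: equal-length lists are zipped pairwise, giving ('New York', 1) and ('London', 2).
input_format_mock_llm.run(city_name=["New York", "London"], num_records_per_city=[1, 2])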
Example 4: Resilient Error Retry
Data Factory handles errors gracefully with retry mechanisms.
@data_factory(max_concurrency=10)
async def high_error_rate_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)
cities = ["New York", "London", "Tokyo"] * 5
high_error_rate_mock_llm_data = high_error_rate_mock_llm.run(city_name=cities, num_records_per_city=1)
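With fail_rate=0.3, roughly a third of the 15 tasks should fail on their first attempt; Data Factory retries failed tasks automatically, so the run can still yield the full dataset.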
Advanced Usage
Data Factory offers hooks for customization and for coordinating work between tasks.
Resume
This is essential for long-running jobs with thousands of tasks.
If a job is interrupted, you can pick up where you left off using one of two resume methods:
Same-session resume: if you're still in the session where the job was interrupted, call .resume() on the same DataFactory instance.
Cross-session resume: if you've closed your notebook or lost your session, resume using the job ID:
from starfish import DataFactory

# Resume using the master job ID from a previous run
data_factory = DataFactory.resume_from_checkpoint(job_id="your_job_id")
The key difference:
resume() uses the same DataFactory instance you defined.
resume_from_checkpoint() reconstructs your DataFactory from persistent storage, where tasks and progress are saved.
Note: Google Colab users may experience issues with resume_from_checkpoint() due to how Colab works.
We're simulating an interruption here. In a real scenario, this might happen if your notebook errors out, is manually interrupted with a keyboard command, encounters API rate limits, or experiences any other issues that halt execution.
@data_factory(max_concurrency=10)
async def re_run_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)
cities = ["New York", "London", "Tokyo", "Paris", "Sydney"] * 20 # 100 cities
re_run_mock_llm_data_1 = re_run_mock_llm.run(city_name=cities, num_records_per_city=1)
print("When a job is interrupted, you'll see a message like:")
print("[RESUME INFO] 🚨 Job stopped unexpectedly. You can resume the job by calling .resume()")
print("\nTo resume an interrupted job, simply call:")
print("interrupted_job_mock_llm.resume()")
print('')
print(f"For this example we have {len(re_run_mock_llm_data_1)}/{len(cities)} data generated and not finished yet!")
re_run_mock_llm_data_2 = re_run_mock_llm.resume()
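resume() re-runs only the tasks that never completed; once it finishes, the combined results should cover all 100 cities, with previously completed tasks restored from the saved progress rather than re-executed.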
Dry Run
Before running a large job, you can do a "dry run" to test your pipeline. This only processes a single item and doesn't save state to the database.
@data_factory(max_concurrency=10)
async def dry_run_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)
dry_run_mock_llm_data = dry_run_mock_llm.dry_run(city_name=["New York", "London", "Tokyo", "Paris", "Sydney"]*20, num_records_per_city=1)