# Data Factory Examples

## Google Colab Version

[Open this notebook in Google Colab](https://colab.research.google.com/github/starfishdata/starfish/blob/main/examples/data_factory.ipynb)

## Dependencies

Make sure to install the necessary dependencies.

```python
%pip install starfish-core
```

## Enabling Async Execution

`nest_asyncio` is a workaround that enables nested async execution inside Jupyter notebooks; it is not intended for production use.

```python
import nest_asyncio
nest_asyncio.apply()
```
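
If you run these examples as a plain Python script rather than a notebook, `nest_asyncio` is unnecessary; drive the awaited calls from an `asyncio.run(...)` entry point instead. A minimal sketch:

```python
import asyncio

async def main():
    # Place the awaited calls from the examples below here,
    # e.g. response = await json_llm.run(city_name="New York")
    ...

asyncio.run(main())
```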

## Environment Setup

Load environment variables from a `.env` file.

```python
from starfish.common.env_loader import load_env_file
load_env_file()
```
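
The examples below call `openai/gpt-4o-mini`, so an OpenAI key must be available after loading the environment. Assuming the conventional `OPENAI_API_KEY` variable name, a quick sanity check:

```python
import os

# The examples below call OpenAI models, so an API key must be loaded.
# OPENAI_API_KEY is the conventional variable name; adjust if your setup differs.
assert os.getenv("OPENAI_API_KEY"), "Set OPENAI_API_KEY in your .env file"
```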

## Example 1: Your First Data Factory (Simple Scaling)

The `@data_factory` decorator turns an async function into a scalable data pipeline that processes many inputs concurrently.

### Create a StructuredLLM instance

```python
from starfish import StructuredLLM

json_llm = StructuredLLM(
    model_name="openai/gpt-4o-mini",
    prompt="Funny facts about city {{city_name}}.",
    output_schema=[{"name": "fact", "type": "str"}],
    model_kwargs={"temperature": 0.7},
)
json_llm_response = await json_llm.run(city_name='New York')
json_llm_response.data
```
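
Given the schema above, `json_llm_response.data` should be a list of dicts keyed by the declared fields (here, a single `fact` string per record); the exact text varies from run to run:

```python
# Each record is a dict keyed by the schema fields, e.g. record["fact"]
first_fact = json_llm_response.data[0]["fact"]
print(first_fact)
```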

### Scale with Data Factory

```python
from datetime import datetime

from starfish import data_factory

@data_factory(max_concurrency=10)
async def process_json_llm(city_name: str):
    print(f"Processing {city_name} at {datetime.now()}")
    json_llm_response = await json_llm.run(city_name=city_name)
    return json_llm_response.data

process_json_llm_data = process_json_llm.run(city_name=["New York", "London", "Tokyo", "Paris", "Sydney"])
```

## Example 2: Works with Any Async Function

Data Factory works with any async function. Here is a chained example in which one StructuredLLM generates question/answer pairs and a second one rates them:

```python
# merge_structured_outputs combines the generated pairs with their ratings.
# NOTE: the import path below is an assumption - verify it against your starfish version.
from starfish.llm.utils import merge_structured_outputs

@data_factory(max_concurrency=10)
async def complex_process_cities(topic: str):
    generator_llm = StructuredLLM(
        model_name="openai/gpt-4o-mini",
        prompt="Generate question/answer pairs about {{topic}}.",
        output_schema=[
            {"name": "question", "type": "str"},
            {"name": "answer", "type": "str"}
        ],
    )

    rater_llm = StructuredLLM(
        model_name="openai/gpt-4o-mini",
        prompt=(
            "Rate the following Q&A pairs based on accuracy and clarity (1-10).\n"
            "Pairs: {{generated_pairs}}"
        ),
        output_schema=[
            {"name": "accuracy_rating", "type": "int"},
            {"name": "clarity_rating", "type": "int"}
        ],
        model_kwargs={"temperature": 0.5}
    )

    generation_response = await generator_llm.run(topic=topic, num_records=5)
    rating_response = await rater_llm.run(generated_pairs=generation_response.data)
    return merge_structured_outputs(generation_response.data, rating_response.data)

complex_process_cities_data = complex_process_cities.run(topic=['Science', 'History', 'Technology'])
```
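
`merge_structured_outputs` is expected to combine the generated pairs with their ratings record by record. A hypothetical sketch of that behavior (not starfish's actual implementation), just to show the resulting shape:

```python
# Hypothetical, index-wise merge of two lists of dicts (illustration only)
def merge_structured_outputs_sketch(generated, ratings):
    return [{**gen, **rating} for gen, rating in zip(generated, ratings)]

# A merged record then combines both schemas, e.g.:
# {"question": "...", "answer": "...", "accuracy_rating": 9, "clarity_rating": 8}
```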

## Example 3: Working with Different Input Formats

Data Factory supports flexible input formats. In the example below, the list passed to `city_name` fans out into one task per city, while the scalar `num_records_per_city` is reused for every task.

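This example and the ones that follow rely on a `mock_llm_call` helper that the notebook defines elsewhere. A minimal stand-in (an assumption, not the notebook's exact helper) could look like this:

```python
import asyncio
import random

async def mock_llm_call(city_name: str, num_records_per_city: int, fail_rate: float = 0.0):
    """Simulate an LLM call: wait briefly, fail at the given rate, return mock records."""
    await asyncio.sleep(random.uniform(0.1, 0.5))
    if random.random() < fail_rate:
        raise RuntimeError(f"Mock LLM call failed for {city_name}")
    return [
        {"city_name": city_name, "fact": f"Mock fact #{i + 1} about {city_name}"}
        for i in range(num_records_per_city)
    ]
```
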
```python
@data_factory(max_concurrency=100)
async def input_format_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.01)

# Mixed inputs: city_name is a list (one task per city); num_records_per_city is a single value shared by all tasks
input_format_data = input_format_mock_llm.run(city_name=["New York", "London"], num_records_per_city=1)
```

## Example 4: Resilient Error Retry

Data Factory handles errors gracefully with retry mechanisms, so individual failed tasks are retried instead of failing the whole run.

```python
@data_factory(max_concurrency=10)
async def high_error_rate_mock_llm(city_name: str, num_records_per_city: int):
    # fail_rate=0.3 means roughly 30% of mock calls raise an error and are retried
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)

cities = ["New York", "London", "Tokyo"] * 5
high_error_rate_mock_llm_data = high_error_rate_mock_llm.run(city_name=cities, num_records_per_city=1)
```

## Advanced Usage

Data Factory offers hooks for customization and coordination between tasks; the examples below cover two built-in controls: resuming interrupted jobs and dry runs.

#### Resume

Resuming is essential for long-running jobs with thousands of tasks.

If a job is interrupted, you can pick up where you left off using one of two resume methods:

1. **Same Session Resume**: If you're still in the same session where the job was interrupted, simply call `.resume()` on the same data factory instance.

2. **Cross-Session Resume**: If you've closed your notebook or lost your session, you can resume using the job ID:
   ```python
   from starfish import DataFactory
   # Resume using the master job ID from a previous run
   data_factory = DataFactory.resume_from_checkpoint(job_id="your_job_id")
   ```

The key difference:
- `resume()` uses the same DataFactory instance you defined
- `resume_from_checkpoint()` reconstructs your DataFactory from persistent storage where tasks and progress are saved

> **Note**: Google Colab users may experience issues with `resume_from_checkpoint()` due to how Colab manages its runtime sessions.

We're simulating an interruption here. In a real scenario, this might happen if your notebook errors out, is manually interrupted with a keyboard command, encounters API rate limits, or experiences any other issues that halt execution.

```python
@data_factory(max_concurrency=10)
async def re_run_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)

cities = ["New York", "London", "Tokyo", "Paris", "Sydney"] * 20  # 100 cities
re_run_mock_llm_data_1 = re_run_mock_llm.run(city_name=cities, num_records_per_city=1)

print("When a job is interrupted, you'll see a message like:")
print("[RESUME INFO] 🚨 Job stopped unexpectedly. You can resume the job by calling .resume()")

print("\nTo resume an interrupted job, simply call:")
print("interrupted_job_mock_llm.resume()")
print('')
print(f"For this example we have {len(re_run_mock_llm_data_1)}/{len(cities)} data generated and not finished yet!")

re_run_mock_llm_data_2 = re_run_mock_llm.resume()

```

#### Dry Run

Before running a large job, you can do a "dry run" to test your pipeline. This only processes a single item and doesn't save state to the database.

```python
@data_factory(max_concurrency=10)
async def dry_run_mock_llm(city_name: str, num_records_per_city: int):
    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)

dry_run_mock_llm_data = dry_run_mock_llm.dry_run(city_name=["New York", "London", "Tokyo", "Paris", "Sydney"]*20, num_records_per_city=1)
```
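
Once the dry run looks right, launching the full job is the same call with `.run(...)` instead of `.dry_run(...)`:

```python
# After validating with dry_run, run the full job with the same arguments
full_run_mock_llm_data = dry_run_mock_llm.run(
    city_name=["New York", "London", "Tokyo", "Paris", "Sydney"] * 20,
    num_records_per_city=1,
)
```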