{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Google Colab Version: [Open this notebook in Google Colab](https://colab.research.google.com/github/starfishdata/starfish/blob/main/examples/data_factory.ipynb)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Dependencies " ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "#%pip install starfish-core\n", "%pip
install --index-url https://test.pypi.org/simple/ \\\n", " --extra-index-url https://pypi.org/simple \\\n", " starfish-core" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[32m2025-05-23 22:50:10\u001b[0m | \u001b[33m\u001b[1mWARNING \u001b[0m | \u001b[33m\u001b[1mFailed to load environment variables from /Users/john/Documents/projects/aa/python/starfish/starfish/.env\u001b[0m\n" ] } ], "source": [ "## Fix for Jupyter Notebook only; do NOT use in production\n", "## Enables async code execution in notebooks, but may cause problems when sync and async code are mixed\n", "## For production, please run in standard .py files without this workaround\n", "## See: https://github.com/erdewit/nest_asyncio for more details\n", "import nest_asyncio\n", "nest_asyncio.apply()\n", "\n", "from starfish import StructuredLLM, data_factory\n", "from starfish.llm.utils import merge_structured_outputs\n", "\n", "from starfish.common.env_loader import load_env_file ## Load environment variables from .env file\n", "load_env_file()" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "## Helper function: mock LLM call\n", "# When developing data pipelines with LLMs, making thousands of real API calls\n", "# can be expensive. Using mock LLM calls lets you test your pipeline's reliability,\n", "# failure handling, and recovery without spending money on API calls.\n", "from starfish.data_factory.utils.mock import mock_llm_call" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 3. Working with Different Input Formats\n", "\n", "\n", "Data Factory is flexible with how you provide inputs. 
Let's demonstrate different ways to pass parameters to data_factory functions.\n", "\n", "`data` is a reserved keyword that expects a list or tuple of dicts. This design makes it easy to pass large datasets and to work with HuggingFace datasets and pandas DataFrames." ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[{'answer': 'New York_3'}, {'answer': 'New York_1'}, {'answer': 'New York_5'}]" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "## We will use the mock LLM call for the rest of the examples to save on tokens\n", "## The mock LLM call is a function that simulates an LLM API call with random delays (controlled by sleep_time) and occasional failures (controlled by fail_rate)\n", "await mock_llm_call(city_name=\"New York\", num_records_per_city=3)" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [], "source": [ "@data_factory(max_concurrency=100)\n", "async def input_format_mock_llm(city_name: str, num_records_per_city: int):\n", "    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.01)" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[32m2025-05-23 22:50:10\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m\u001b[1m[JOB START]\u001b[0m \u001b[36mMaster Job ID: 4da82fc7-4112-4e05-b58c-53cf470747ad\u001b[0m | \u001b[33mLogging progress every 3 seconds\u001b[0m\u001b[0m\n", "\u001b[32m2025-05-23 22:50:10\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB PROGRESS] \u001b[32mCompleted: 0/5\u001b[0m | \u001b[33mRunning: 5\u001b[0m | \u001b[36mAttempted: 0\u001b[0m (\u001b[32mCompleted: 0\u001b[0m, \u001b[31mFailed: 0\u001b[0m, \u001b[35mFiltered: 0\u001b[0m, \u001b[34mDuplicate: 0\u001b[0m, \u001b[1;31mInDeadQueue: 0\u001b[0m)\u001b[0m\n", "\u001b[32m2025-05-23 22:50:11\u001b[0m | \u001b[1mINFO \u001b[0m | 
\u001b[1m[JOB FINISHED] \u001b[1mFinal Status:\u001b[0m \u001b[32mCompleted: 5/5\u001b[0m | \u001b[33mAttempted: 5\u001b[0m (Failed: 0, Filtered: 0, Duplicate: 0, InDeadQueue: 0)\u001b[0m\n" ] } ], "source": [ "# Format 1: Multiple lists that get zipped together\n", "input_format_data1 = input_format_mock_llm.run(city_name=[\"New York\", \"London\", \"Tokyo\", \"Paris\", \"Sydney\"], num_records_per_city=[2, 1, 1, 1, 1])" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[32m2025-05-23 22:50:11\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m\u001b[1m[JOB START]\u001b[0m \u001b[36mMaster Job ID: 73973449-6069-485e-ac8c-b1b3a6b3f1a4\u001b[0m | \u001b[33mLogging progress every 3 seconds\u001b[0m\u001b[0m\n", "\u001b[32m2025-05-23 22:50:11\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB PROGRESS] \u001b[32mCompleted: 0/5\u001b[0m | \u001b[33mRunning: 5\u001b[0m | \u001b[36mAttempted: 0\u001b[0m (\u001b[32mCompleted: 0\u001b[0m, \u001b[31mFailed: 0\u001b[0m, \u001b[35mFiltered: 0\u001b[0m, \u001b[34mDuplicate: 0\u001b[0m, \u001b[1;31mInDeadQueue: 0\u001b[0m)\u001b[0m\n", "\u001b[32m2025-05-23 22:50:12\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB FINISHED] \u001b[1mFinal Status:\u001b[0m \u001b[32mCompleted: 5/5\u001b[0m | \u001b[33mAttempted: 5\u001b[0m (Failed: 0, Filtered: 0, Duplicate: 0, InDeadQueue: 0)\u001b[0m\n" ] } ], "source": [ "# Format 2: List + single value (single value gets broadcasted)\n", "input_format_data2 = input_format_mock_llm.run(city_name=[\"New York\", \"London\", \"Tokyo\", \"Paris\", \"Sydney\"], num_records_per_city=1)" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[32m2025-05-23 22:50:12\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m\u001b[1m[JOB START]\u001b[0m \u001b[36mMaster Job ID: aa9954f9-fc18-4b42-959e-fb2a897987c7\u001b[0m | 
\u001b[33mLogging progress every 3 seconds\u001b[0m\u001b[0m\n", "\u001b[32m2025-05-23 22:50:12\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB PROGRESS] \u001b[32mCompleted: 0/5\u001b[0m | \u001b[33mRunning: 5\u001b[0m | \u001b[36mAttempted: 0\u001b[0m (\u001b[32mCompleted: 0\u001b[0m, \u001b[31mFailed: 0\u001b[0m, \u001b[35mFiltered: 0\u001b[0m, \u001b[34mDuplicate: 0\u001b[0m, \u001b[1;31mInDeadQueue: 0\u001b[0m)\u001b[0m\n", "\u001b[32m2025-05-23 22:50:13\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB FINISHED] \u001b[1mFinal Status:\u001b[0m \u001b[32mCompleted: 5/5\u001b[0m | \u001b[33mAttempted: 5\u001b[0m (Failed: 0, Filtered: 0, Duplicate: 0, InDeadQueue: 0)\u001b[0m\n" ] } ], "source": [ "# Format 3: Special 'data' parameter\n", "# 'data' is a reserved keyword expecting list(dict) or tuple(dict)\n", "# Makes integration with various data sources easier\n", "input_format_data3 = input_format_mock_llm.run(data=[{\"city_name\": \"New York\", \"num_records_per_city\": 2}, {\"city_name\": \"London\", \"num_records_per_city\": 1}, {\"city_name\": \"Tokyo\", \"num_records_per_city\": 1}, {\"city_name\": \"Paris\", \"num_records_per_city\": 1}, {\"city_name\": \"Sydney\", \"num_records_per_city\": 1}])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 4. Resilient error retry\n", "Data Factory automatically handles errors and retries, making your pipelines robust.\n", "\n", "Let's demonstrate with a high failure rate example." 
] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[32m2025-05-23 22:50:13\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m\u001b[1m[JOB START]\u001b[0m \u001b[36mMaster Job ID: 730b766d-3c23-419a-a3dd-271d683818b1\u001b[0m | \u001b[33mLogging progress every 3 seconds\u001b[0m\u001b[0m\n", "\u001b[32m2025-05-23 22:50:13\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB PROGRESS] \u001b[32mCompleted: 0/25\u001b[0m | \u001b[33mRunning: 25\u001b[0m | \u001b[36mAttempted: 0\u001b[0m (\u001b[32mCompleted: 0\u001b[0m, \u001b[31mFailed: 0\u001b[0m, \u001b[35mFiltered: 0\u001b[0m, \u001b[34mDuplicate: 0\u001b[0m, \u001b[1;31mInDeadQueue: 0\u001b[0m)\u001b[0m\n", "\u001b[32m2025-05-23 22:50:15\u001b[0m | \u001b[31m\u001b[1mERROR \u001b[0m | \u001b[31m\u001b[1mError running task: Mock LLM failed to process city: Tokyo\u001b[0m\n", "\u001b[32m2025-05-23 22:50:15\u001b[0m | \u001b[31m\u001b[1mERROR \u001b[0m | \u001b[31m\u001b[1mError running task: Mock LLM failed to process city: New York\u001b[0m\n", "\u001b[32m2025-05-23 22:50:16\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB PROGRESS] \u001b[32mCompleted: 23/25\u001b[0m | \u001b[33mRunning: 0\u001b[0m | \u001b[36mAttempted: 25\u001b[0m (\u001b[32mCompleted: 23\u001b[0m, \u001b[31mFailed: 2\u001b[0m, \u001b[35mFiltered: 0\u001b[0m, \u001b[34mDuplicate: 0\u001b[0m, \u001b[1;31mInDeadQueue: 0\u001b[0m)\u001b[0m\n", "\u001b[32m2025-05-23 22:50:19\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB FINISHED] \u001b[1mFinal Status:\u001b[0m \u001b[32mCompleted: 25/25\u001b[0m | \u001b[33mAttempted: 27\u001b[0m (Failed: 2, Filtered: 0, Duplicate: 0, InDeadQueue: 0)\u001b[0m\n", "\n", "Successfully completed 25 out of 25 tasks\n", "Data Factory automatically handled the failures and continued processing\n", "The results only include successful tasks\n" ] } ], "source": [ "@data_factory(max_concurrency=100)\n", "async def 
high_error_rate_mock_llm(city_name: str, num_records_per_city: int):\n", "    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3) # Hardcode to 30% chance of failure\n", "\n", "# Process all cities - some will fail, but data_factory keeps going\n", "cities = [\"New York\", \"London\", \"Tokyo\", \"Paris\", \"Sydney\"] * 5 # 25 cities\n", "high_error_rate_mock_llm_data = high_error_rate_mock_llm.run(city_name=cities, num_records_per_city=1)\n", "\n", "print(f\"\\nSuccessfully completed {len(high_error_rate_mock_llm_data)} out of {len(cities)} tasks\")\n", "print(\"Data Factory automatically handled the failures and continued processing\")\n", "print(\"The results only include successful tasks\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 5. Resume\n", "\n", "Resuming is essential for long-running jobs with thousands of tasks.\n", "\n", "If a job is interrupted, you can pick up where you left off using one of two resume methods:\n", "\n", "\n", "1. **Same Session Resume**: If you're still in the same session where the job was interrupted, simply call `.resume()` on the same instance.\n", "\n", "2. **Cross-Session Resume**: If you've closed your notebook or lost your session, you can resume using the job ID:\n", "   ```python\n", "   from starfish import DataFactory\n", "   # Resume using the master job ID from a previous run\n", "   data_factory = DataFactory.resume_from_checkpoint(job_id=\"your_job_id\")\n", "   ```\n", "\n", "The key difference:\n", "- `resume()` uses the same DataFactory instance you defined\n", "- `resume_from_checkpoint()` reconstructs your DataFactory from persistent storage where tasks and progress are saved\n", "\n", "> **Note**: Google Colab users may experience issues with `resume_from_checkpoint()` due to how Colab works\n", "\n", "We're simulating an interruption here. 
In a real scenario, this might happen if your notebook errors out, is manually interrupted with a keyboard command, encounters API rate limits, or experiences any other issues that halt execution." ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[32m2025-05-23 22:50:19\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m\u001b[1m[JOB START]\u001b[0m \u001b[36mMaster Job ID: 6829de29-0b83-4a64-835b-cc79cbad5e3a\u001b[0m | \u001b[33mLogging progress every 3 seconds\u001b[0m\u001b[0m\n", "\u001b[32m2025-05-23 22:50:19\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB PROGRESS] \u001b[32mCompleted: 0/100\u001b[0m | \u001b[33mRunning: 10\u001b[0m | \u001b[36mAttempted: 0\u001b[0m (\u001b[32mCompleted: 0\u001b[0m, \u001b[31mFailed: 0\u001b[0m, \u001b[35mFiltered: 0\u001b[0m, \u001b[34mDuplicate: 0\u001b[0m, \u001b[1;31mInDeadQueue: 0\u001b[0m)\u001b[0m\n", "\u001b[32m2025-05-23 22:50:21\u001b[0m | \u001b[31m\u001b[1mERROR \u001b[0m | \u001b[31m\u001b[1mError running task: Mock LLM failed to process city: Paris\u001b[0m\n", "\u001b[32m2025-05-23 22:50:21\u001b[0m | \u001b[31m\u001b[1mERROR \u001b[0m | \u001b[31m\u001b[1mError running task: Mock LLM failed to process city: Sydney\u001b[0m\n", "\u001b[32m2025-05-23 22:50:21\u001b[0m | \u001b[31m\u001b[1mERROR \u001b[0m | \u001b[31m\u001b[1mError running task: Mock LLM failed to process city: New York\u001b[0m\n", "\u001b[32m2025-05-23 22:50:21\u001b[0m | \u001b[31m\u001b[1mERROR \u001b[0m | \u001b[31m\u001b[1mconsecutive_not_completed: in 3 times, stopping this job; please adjust factory config and input data then resume_from_checkpoint(6829de29-0b83-4a64-835b-cc79cbad5e3a)\u001b[0m\n", "\u001b[32m2025-05-23 22:50:21\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB FINISHED] \u001b[1mFinal Status:\u001b[0m \u001b[32mCompleted: 17/100\u001b[0m | \u001b[33mAttempted: 20\u001b[0m (Failed: 3, Filtered: 0, Duplicate: 0, InDeadQueue: 
0)\u001b[0m\n" ] } ], "source": [ "@data_factory(max_concurrency=10)\n", "async def re_run_mock_llm(city_name: str, num_records_per_city: int):\n", "    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)\n", "\n", "cities = [\"New York\", \"London\", \"Tokyo\", \"Paris\", \"Sydney\"] * 20 # 100 cities\n", "re_run_mock_llm_data_1 = re_run_mock_llm.run(city_name=cities, num_records_per_city=1)" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "When a job is interrupted, you'll see a message like:\n", "[RESUME INFO] 🚨 Job stopped unexpectedly. You can resume the job by calling .resume()\n", "\n", "To resume an interrupted job, simply call:\n", "re_run_mock_llm.resume()\n", "\n", "For this example, 17/100 records have been generated and the job is not finished yet!\n" ] } ], "source": [ "print(\"When a job is interrupted, you'll see a message like:\")\n", "print(\"[RESUME INFO] 🚨 Job stopped unexpectedly. You can resume the job by calling .resume()\")\n", "\n", "print(\"\\nTo resume an interrupted job, simply call:\")\n", "print(\"re_run_mock_llm.resume()\")\n", "print('')\n", "print(f\"For this example, {len(re_run_mock_llm_data_1)}/{len(cities)} records have been generated and the job is not finished yet!\")" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[32m2025-05-23 22:50:22\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m\u001b[1m[JOB RESUME START]\u001b[0m \u001b[33mPICKING UP FROM WHERE THE JOB WAS LEFT OFF...\u001b[0m\n", "\u001b[0m\n", "\u001b[32m2025-05-23 22:50:22\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m\u001b[1m[RESUME PROGRESS] STATUS AT THE TIME OF RESUME:\u001b[0m \u001b[32mCompleted: 17 / 100\u001b[0m | \u001b[31mFailed: 3\u001b[0m | \u001b[31mDuplicate: 0\u001b[0m | \u001b[33mFiltered: 0\u001b[0m\u001b[0m\n", "\u001b[32m2025-05-23 22:50:22\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB PROGRESS] \u001b[32mCompleted: 17/100\u001b[0m | \u001b[33mRunning: 10\u001b[0m | \u001b[36mAttempted: 20\u001b[0m (\u001b[32mCompleted: 17\u001b[0m, \u001b[31mFailed: 3\u001b[0m, \u001b[35mFiltered: 0\u001b[0m, \u001b[34mDuplicate: 0\u001b[0m, \u001b[1;31mInDeadQueue: 0\u001b[0m)\u001b[0m\n", "\u001b[32m2025-05-23 22:50:24\u001b[0m | \u001b[31m\u001b[1mERROR \u001b[0m | \u001b[31m\u001b[1mError running task: Mock LLM failed to process city: Paris\u001b[0m\n", "\u001b[32m2025-05-23 22:50:24\u001b[0m | \u001b[31m\u001b[1mERROR \u001b[0m | \u001b[31m\u001b[1mconsecutive_not_completed: in 3 times, stopping this job; please adjust factory config and input data then resume_from_checkpoint(6829de29-0b83-4a64-835b-cc79cbad5e3a)\u001b[0m\n", "\u001b[32m2025-05-23 22:50:24\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB FINISHED] \u001b[1mFinal Status:\u001b[0m \u001b[32mCompleted: 30/100\u001b[0m | \u001b[33mAttempted: 34\u001b[0m (Failed: 4, Filtered: 0, Duplicate: 0, 
InDeadQueue: 0)\u001b[0m\n" ] } ], "source": [ "## Continue the rest of the run by calling resume() on the same instance\n", "re_run_mock_llm_data_2 = re_run_mock_llm.resume()" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The resumed run picked up where the job left off: 30 records generated so far!\n" ] } ], "source": [ "print(f\"The resumed run picked up where the job left off: {len(re_run_mock_llm_data_2)} records generated so far!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 6. Dry run\n", "Before running a large job, you can do a \"dry run\" to test your pipeline. A dry run processes only a single item and does not save state to the database." ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[32m2025-05-23 22:50:24\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m\u001b[1m[JOB START]\u001b[0m \u001b[36mMaster Job ID: None\u001b[0m | \u001b[33mLogging progress every 3 seconds\u001b[0m\u001b[0m\n", "\u001b[32m2025-05-23 22:50:24\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB PROGRESS] \u001b[32mCompleted: 0/1\u001b[0m | \u001b[33mRunning: 1\u001b[0m | \u001b[36mAttempted: 0\u001b[0m (\u001b[32mCompleted: 0\u001b[0m, \u001b[31mFailed: 0\u001b[0m, \u001b[35mFiltered: 0\u001b[0m, \u001b[34mDuplicate: 0\u001b[0m, \u001b[1;31mInDeadQueue: 0\u001b[0m)\u001b[0m\n", "\u001b[32m2025-05-23 22:50:25\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[1m[JOB FINISHED] \u001b[1mFinal Status:\u001b[0m \u001b[32mCompleted: 1/0\u001b[0m | \u001b[33mAttempted: 1\u001b[0m (Failed: 0, Filtered: 0, Duplicate: 0, InDeadQueue: 0)\u001b[0m\n" ] } ], "source": [ "@data_factory(max_concurrency=10)\n", "async def dry_run_mock_llm(city_name: str, num_records_per_city: int):\n", "    return await mock_llm_call(city_name=city_name, num_records_per_city=num_records_per_city, fail_rate=0.3)\n", "\n", "dry_run_mock_llm_data = 
dry_run_mock_llm.dry_run(city_name=[\"New York\", \"London\", \"Tokyo\", \"Paris\", \"Sydney\"]*20, num_records_per_city=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 8. Advanced Usage\n", "Data Factory offers more advanced capabilities for complete pipeline customization, including hooks that execute at key stages and shareable state to coordinate between tasks. These powerful features enable complex workflows and fine-grained control. Our dedicated examples for advanced data_factory usage will be coming soon!" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.4" } }, "nbformat": 4, "nbformat_minor": 2 }