{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import json\n", "with open('/mnt/data/zifeng.cao/reasoning/arc-agi/re-arc/origin_code.jsonl', 'r') as f:\n", " data_list = [json.loads(line) for line in f]\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "def verify_dbc1a6ce(I: Grid) -> Grid:\n", " x0 = leastcolor(I)\n", " x1 = ofcolor(I, x0)\n", " x2 = lbind(recolor, EIGHT)\n", " x3 = rbind(ofcolor, x0)\n", " x4 = chain(x2, backdrop, x3)\n", " x5 = fork(paint, identity, x4)\n", " x6 = height(I)\n", " x7 = vsplit(I, x6)\n", " x8 = mapply(x5, x7)\n", " x9 = ofcolor(x8, EIGHT)\n", " x10 = dmirror(I)\n", " x11 = width(I)\n", " x12 = vsplit(x10, x11)\n", " x13 = mapply(x5, x12)\n", " x14 = dmirror(x13)\n", " x15 = ofcolor(x14, EIGHT)\n", " x16 = combine(x9, x15)\n", " x17 = difference(x16, x1)\n", " x18 = fill(I, EIGHT, x17)\n", " return x18\n", "\n", "\n", "\n", "def generate_dbc1a6ce(diff_lb: float, diff_ub: float) -> dict:\n", " dim_bounds = (3, 30)\n", " colopts = remove(8, interval(0, 10, 1))\n", " h = unifint(diff_lb, diff_ub, dim_bounds)\n", " w = unifint(diff_lb, diff_ub, dim_bounds)\n", " bgc = choice(colopts)\n", " c = canvas(bgc, (h, w))\n", " inds = totuple(asindices(c))\n", " card_bounds = (0, max(1, (h * w) // 4))\n", " num = unifint(diff_lb, diff_ub, card_bounds)\n", " s = sample(inds, num)\n", " fgcol = choice(remove(bgc, colopts))\n", " gi = fill(c, fgcol, s)\n", " resh = frozenset()\n", " for x, r in enumerate(gi):\n", " if r.count(fgcol) > 1:\n", " resh = combine(resh, connect((x, r.index(fgcol)), (x, -1 + w - r[::-1].index(fgcol))))\n", " go = fill(c, 8, resh)\n", " resv = frozenset()\n", " for x, r in enumerate(dmirror(gi)):\n", " if r.count(fgcol) > 1:\n", " resv = combine(resv, connect((x, r.index(fgcol)), (x, -1 + h - r[::-1].index(fgcol))))\n", " go = dmirror(fill(dmirror(go), 8, resv))\n", " go = fill(go, fgcol, s)\n", " return {'input': gi, 'output': go}\n", "\n", "\n", "\n", "\n", "\n", "from typing import (\n", " List,\n", " Union,\n", " Tuple,\n", " Any,\n", " Container,\n", " Callable,\n", " FrozenSet,\n", " Iterable\n", ")\n", "\n", "Boolean = bool\n", "Integer = int\n", "IntegerTuple = Tuple[Integer, Integer]\n", "Numerical = Union[Integer, IntegerTuple]\n", "IntegerSet = FrozenSet[Integer]\n", "Grid = Tuple[Tuple[Integer]]\n", "Cell = Tuple[Integer, IntegerTuple]\n", "Object = FrozenSet[Cell]\n", "Objects = FrozenSet[Object]\n", "Indices = FrozenSet[IntegerTuple]\n", "IndicesSet = FrozenSet[Indices]\n", "Patch = Union[Object, Indices]\n", "Element = Union[Object, Grid]\n", "Piece = Union[Grid, Patch]\n", "TupleTuple = Tuple[Tuple]\n", "ContainerContainer = Container[Container]\n", "\n", "ZERO = 0\n", "ONE = 1\n", "TWO = 2\n", "THREE = 3\n", "FOUR = 4\n", "FIVE = 5\n", "SIX = 6\n", "SEVEN = 7\n", "EIGHT = 8\n", "NINE = 9\n", "TEN = 10\n", "F = False\n", "T = True\n", "\n", "NEG_ONE = -1\n", "\n", "ORIGIN = (0, 0)\n", "UNITY = (1, 1)\n", "DOWN = (1, 0)\n", "RIGHT = (0, 1)\n", "UP = (-1, 0)\n", "LEFT = (0, -1)\n", "\n", "NEG_TWO = -2\n", "NEG_UNITY = (-1, -1)\n", "UP_RIGHT = (-1, 1)\n", "DOWN_LEFT = (1, -1)\n", "\n", "ZERO_BY_TWO = (0, 2)\n", "TWO_BY_ZERO = (2, 0)\n", "TWO_BY_TWO = (2, 2)\n", "THREE_BY_THREE = (3, 3)\n", "\n", "def fill(\n", " grid: Grid,\n", " value: Integer,\n", " patch: Patch\n", ") -> Grid:\n", " \"\"\" fill value at indices \"\"\"\n", " h, w = len(grid), len(grid[0])\n", " grid_filled = list(list(row) for row in grid)\n", " for i, j in toindices(patch):\n", " if 0 <= i < h and 0 <= j < w:\n", " grid_filled[i][j] = value\n", " return tuple(tuple(row) for row in grid_filled)\n", "\n", "\n", "\n", "def asindices(\n", " grid: Grid\n", ") -> Indices:\n", " \"\"\" indices of all grid cells \"\"\"\n", " return frozenset((i, j) for i in range(len(grid)) for j in range(len(grid[0])))\n", "\n", "\n", "\n", "def width(\n", " piece: Piece\n", ") -> Integer:\n", " \"\"\" width of grid or patch \"\"\"\n", " if len(piece) == 0:\n", " return 0\n", " if isinstance(piece, tuple):\n", " return len(piece[0])\n", " return rightmost(piece) - leftmost(piece) + 1\n", "\n", "\n", "\n", "def index(\n", " grid: Grid,\n", " loc: IntegerTuple\n", ") -> Integer:\n", " \"\"\" color at location \"\"\"\n", " i, j = loc\n", " h, w = len(grid), len(grid[0])\n", " if not (0 <= i < h and 0 <= j < w):\n", " return None\n", " return grid[loc[0]][loc[1]] \n", "\n", "\n", "\n", "def unifint(\n", " diff_lb: float,\n", " diff_ub: float,\n", " bounds: Tuple[int, int]\n", ") -> int:\n", " \"\"\"\n", " diff_lb: lower bound for difficulty, must be in range [0, diff_ub]\n", " diff_ub: upper bound for difficulty, must be in range [diff_lb, 1]\n", " bounds: interval [a, b] determining the integer values that can be sampled\n", " \"\"\"\n", " a, b = bounds\n", " d = uniform(diff_lb, diff_ub)\n", " global rng\n", " rng.append(d)\n", " return min(max(a, round(a + (b - a) * d)), b)\n", "\n", "\n", "\n", "def interval(\n", " start: Integer,\n", " stop: Integer,\n", " step: Integer\n", ") -> Tuple:\n", " \"\"\" range \"\"\"\n", " return tuple(range(start, stop, step))\n", "\n", "\n", "\n", "def chain(\n", " h: Callable,\n", " g: Callable,\n", " f: Callable\n", ") -> Callable:\n", " \"\"\" function composition with three functions \"\"\"\n", " return lambda x: h(g(f(x)))\n", "\n", "\n", "\n", "def mapply(\n", " function: Callable,\n", " container: ContainerContainer\n", ") -> FrozenSet:\n", " \"\"\" apply and merge \"\"\"\n", " return merge(apply(function, container))\n", "\n", "\n", "\n", "def difference(\n", " a: Container,\n", " b: Container\n", ") -> Container:\n", " \"\"\" difference \"\"\"\n", " return type(a)(e for e in a if e not in b)\n", "\n", "\n", "\n", "def ofcolor(\n", " grid: Grid,\n", " value: Integer\n", ") -> Indices:\n", " \"\"\" indices of all grid cells with value \"\"\"\n", " return frozenset((i, j) for i, r in enumerate(grid) for j, v in enumerate(r) if v == value)\n", "\n", "\n", "\n", "def rbind(\n", " function: Callable,\n", " fixed: Any\n", ") -> Callable:\n", " \"\"\" fix the rightmost argument \"\"\"\n", " n = function.__code__.co_argcount\n", " if n == 2:\n", " return lambda x: function(x, fixed)\n", " elif n == 3:\n", " return lambda x, y: function(x, y, fixed)\n", " else:\n", " return lambda x, y, z: function(x, y, z, fixed)\n", "\n", "\n", "\n", "def dmirror(\n", " piece: Piece\n", ") -> Piece:\n", " \"\"\" mirroring along diagonal \"\"\"\n", " if isinstance(piece, tuple):\n", " return tuple(zip(*piece))\n", " a, b = ulcorner(piece)\n", " if isinstance(next(iter(piece))[1], tuple):\n", " return frozenset((v, (j - b + a, i - a + b)) for v, (i, j) in piece)\n", " return frozenset((j - b + a, i - a + b) for i, j in piece)\n", "\n", "\n", "\n", "def leastcolor(\n", " element: Element\n", ") -> Integer:\n", " \"\"\" least common color \"\"\"\n", " values = [v for r in element for v in r] if isinstance(element, tuple) else [v for v, _ in element]\n", " return min(set(values), key=values.count)\n", "\n", "\n", "\n", "def fork(\n", " outer: Callable,\n", " a: Callable,\n", " b: Callable\n", ") -> Callable:\n", " \"\"\" creates a wrapper function \"\"\"\n", " return lambda x: outer(a(x), b(x))\n", "\n", "\n", "\n", "def combine(\n", " a: Container,\n", " b: Container\n", ") -> Container:\n", " \"\"\" union \"\"\"\n", " return type(a)((*a, *b))\n", "\n", "\n", "\n", "def remove(\n", " value: Any,\n", " container: Container\n", ") -> Container:\n", " \"\"\" remove item from container \"\"\"\n", " return type(container)(e for e in container if e != value)\n", "\n", "\n", "\n", "def height(\n", " piece: Piece\n", ") -> Integer:\n", " \"\"\" height of grid or patch \"\"\"\n", " if len(piece) == 0:\n", " return 0\n", " if isinstance(piece, tuple):\n", " return len(piece)\n", " return lowermost(piece) - uppermost(piece) + 1\n", "\n", "\n", "\n", "def connect(\n", " a: IntegerTuple,\n", " b: IntegerTuple\n", ") -> Indices:\n", " \"\"\" line between two points \"\"\"\n", " ai, aj = a\n", " bi, bj = b\n", " si = min(ai, bi)\n", " ei = max(ai, bi) + 1\n", " sj = min(aj, bj)\n", " ej = max(aj, bj) + 1\n", " if ai == bi:\n", " return frozenset((ai, j) for j in range(sj, ej))\n", " elif aj == bj:\n", " return frozenset((i, aj) for i in range(si, ei))\n", " elif bi - ai == bj - aj:\n", " return frozenset((i, j) for i, j in zip(range(si, ei), range(sj, ej)))\n", " elif bi - ai == aj - bj:\n", " return frozenset((i, j) for i, j in zip(range(si, ei), range(ej - 1, sj - 1, -1)))\n", " return frozenset()\n", "\n", "\n", "\n", "def totuple(\n", " container: FrozenSet\n", ") -> Tuple:\n", " \"\"\" conversion to tuple \"\"\"\n", " return tuple(container)\n", "\n", "\n", "\n", "def lbind(\n", " function: Callable,\n", " fixed: Any\n", ") -> Callable:\n", " \"\"\" fix the leftmost argument \"\"\"\n", " n = function.__code__.co_argcount\n", " if n == 2:\n", " return lambda y: function(fixed, y)\n", " elif n == 3:\n", " return lambda y, z: function(fixed, y, z)\n", " else:\n", " return lambda y, z, a: function(fixed, y, z, a)\n", "\n", "\n", "\n", "def vsplit(\n", " grid: Grid,\n", " n: Integer\n", ") -> Tuple:\n", " \"\"\" split grid vertically \"\"\"\n", " h, w = len(grid) // n, len(grid[0])\n", " offset = len(grid) % n != 0\n", " return tuple(crop(grid, (h * i + i * offset, 0), (h, w)) for i in range(n))\n", "\n", "\n", "\n", "def canvas(\n", " value: Integer,\n", " dimensions: IntegerTuple\n", ") -> Grid:\n", " \"\"\" grid construction \"\"\"\n", " return tuple(tuple(value for j in range(dimensions[1])) for i in range(dimensions[0]))\n", "\n", "\n", "\n", "\n" ] } ], "source": [ "print(data_list[0]['verifier'])\n", "print(data_list[0]['generator'])\n", "print(data_list[0]['global_variable'])\n", "print(data_list[0]['additional_functions'])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import asyncio\n", "import json\n", "import mimetypes\n", "import os\n", "import re\n", "import uuid\n", "from typing import Tuple\n", "\n", "import aiohttp\n", "import requests\n", "from PIL import Image\n", "\n", "from utils.common.logging import logger\n", "\n", "\n", "def get_ext(url):\n", " rule = r\"\\.(.*?)\\?\"\n", " rst = re.findall(rule, url)[0]\n", "\n", " return rst.split(\".\")[-1]\n", "\n", "\n", "async def download_file(url, local_filename):\n", " async with aiohttp.ClientSession() as session:\n", " async with session.get(url) as response:\n", " ext = get_ext(url)\n", " logger.info(f\"------------------------------------------{ext}\")\n", " logger.info(response.status)\n", "\n", " if response.status == 200:\n", " filename_with_ext = os.path.abspath(f\"{local_filename}.{ext}\")\n", " content = await response.read()\n", "\n", " with open(filename_with_ext, \"wb\") as f:\n", " f.write(content)\n", "\n", " if ext == \"webp\":\n", " im = Image.open(filename_with_ext).convert(\"RGB\")\n", " im.save(f\"{local_filename}.jpg\", \"jpeg\")\n", " os.remove(filename_with_ext)\n", " return f\"{local_filename}.jpg\"\n", " else:\n", " return filename_with_ext\n", " else:\n", " logger.info(f\"retalking hedra {url} download file failed\")\n", " raise RuntimeError(f\"{url} download failed\")\n", "\n", "\n", "class HedraClient:\n", " def __init__(self):\n", " self._base_url = \"https://mercury.dev.dream-ai.com/api\"\n", " self._check_task_url = \"https://mercury.dev.dream-ai.com/api/v1/projects/{task_id}\"\n", " self._key = \"sk_hedra-TxkxBe8htuAuGXwoPYgjHhYpwcQ3gdFmcGdRTLksRKUcSQEpm7VCNzSNj2680fZC\"\n", " self.timeout = aiohttp.ClientTimeout(total=10)\n", " os.makedirs(\"temp\", exist_ok=True)\n", "\n", " async def post_audio(self, audio_url):\n", " headers = {\n", " \"X-API-KEY\": self._key,\n", " }\n", " local_audio = await download_file(audio_url, f\"temp/{str(uuid.uuid4())}\")\n", " try:\n", " async with aiohttp.ClientSession() as session:\n", " data = aiohttp.FormData()\n", " data.add_field(\"file\", open(local_audio, \"rb\"))\n", " async with session.post(\n", " f\"{self._base_url}/v1/audio\", headers=headers, data={\"file\": open(local_audio, \"rb\")}\n", " ) as resp:\n", " return await resp.json()\n", " finally:\n", " if os.path.exists(local_audio):\n", " os.remove(local_audio)\n", "\n", " async def post_image(self, image_url):\n", " headers = {\n", " \"X-API-KEY\": self._key,\n", " }\n", " local_image = await download_file(image_url, f\"temp/{str(uuid.uuid4())}\")\n", " try:\n", " async with aiohttp.ClientSession() as session:\n", " data = aiohttp.FormData()\n", " data.add_field(\"file\", open(local_image, \"rb\"))\n", " async with session.post(\n", " f\"{self._base_url}/v1/portrait\", headers=headers, data={\"file\": open(local_image, \"rb\")}, timeout=10\n", " ) as resp:\n", " return await resp.json()\n", " finally:\n", " if os.path.exists(local_image):\n", " os.remove(local_image)\n", "\n", " async def submit_task(self, audio_url: str, image_url: str, aspect_ratio: str) -> Tuple[str, str]:\n", " headers = {\n", " \"X-API-KEY\": self._key,\n", " }\n", "\n", " audio_task = asyncio.create_task(self.post_audio(audio_url))\n", " image_task = asyncio.create_task(self.post_image(image_url))\n", " audio_result, image_result = await asyncio.gather(audio_task, image_task)\n", " logger.info(image_result)\n", " logger.info(audio_result)\n", " payload = {\n", " \"voiceUrl\": audio_result[\"url\"],\n", " \"avatarImage\": image_result[\"url\"],\n", " \"aspectRatio\": aspect_ratio,\n", " }\n", "\n", " async with aiohttp.ClientSession(headers=headers, timeout=self.timeout) as session:\n", " async with session.post(f\"{self._base_url}/v1/characters\", json=payload) as response:\n", " data = await response.json()\n", " logger.info(data)\n", " task_id = data.get(\"jobId\", None)\n", " assert task_id is not None, f\"Failed to submit task, {data}\"\n", " request_id = data.get(\"request_id\", None)\n", " return task_id, request_id\n", "\n", " async def get_response(self, task_id: str) -> Tuple[str, float]:\n", " headers = {\n", " \"X-API-KEY\": self._key,\n", " }\n", " async with aiohttp.ClientSession(headers=headers, timeout=self.timeout) as session:\n", " while True:\n", " async with session.get(self._check_task_url.format(task_id=task_id)) as response:\n", " data = await response.json()\n", " status = data.get(\"status\", None)\n", " logger.info(f\"Hedra Task {task_id}, status: {status}, get response: {data}\")\n", " if status == \"Completed\":\n", " video_url = data.get(\"videoUrl\", None)\n", " assert video_url is not None, f\"Failed to get video_url from response[{data}]\"\n", " video_duration = 4\n", " return video_url, video_duration\n", " elif status in [\"Failed\"] or status is None:\n", " raise RuntimeError(\n", " f\"Task {task_id} failed or was canceled. {data.get('output', {}).get('message', '')}\"\n", " )\n", " else:\n", " await asyncio.sleep(4)\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.19" } }, "nbformat": 4, "nbformat_minor": 2 }