In [1]:
import json

# Load the JSONL dump into memory: one parsed JSON record per non-empty line.
# - encoding is pinned to UTF-8 so the result does not depend on the machine's locale;
# - blank lines (e.g. a trailing newline at end of file) are skipped, because
#   json.loads("") raises JSONDecodeError.
with open('/mnt/data/zifeng.cao/reasoning/arc-agi/re-arc/origin_code.jsonl', 'r', encoding='utf-8') as f:
    data_list = [json.loads(line) for line in f if line.strip()]


In [5]:
# Print the four fields of the first loaded record, one after another,
# in the order: verifier, generator, global_variable, additional_functions.
first_record = data_list[0]
for field_name in ('verifier', 'generator', 'global_variable', 'additional_functions'):
    print(first_record[field_name])

def verify_dbc1a6ce(I: Grid) -> Grid:
    x0 = leastcolor(I)
    x1 = ofcolor(I, x0)
    x2 = lbind(recolor, EIGHT)
    x3 = rbind(ofcolor, x0)
    x4 = chain(x2, backdrop, x3)
    x5 = fork(paint, identity, x4)
    x6 = height(I)
    x7 = vsplit(I, x6)
    x8 = mapply(x5, x7)
    x9 = ofcolor(x8, EIGHT)
    x10 = dmirror(I)
    x11 = width(I)
    x12 = vsplit(x10, x11)
    x13 = mapply(x5, x12)
    x14 = dmirror(x13)
    x15 = ofcolor(x14, EIGHT)
    x16 = combine(x9, x15)
    x17 = difference(x16, x1)
    x18 = fill(I, EIGHT, x17)
    return x18



def generate_dbc1a6ce(diff_lb: float, diff_ub: float) -> dict:
    dim_bounds = (3, 30)
    colopts = remove(8, interval(0, 10, 1))
    h = unifint(diff_lb, diff_ub, dim_bounds)
    w = unifint(diff_lb, diff_ub, dim_bounds)
    bgc = choice(colopts)
    c = canvas(bgc, (h, w))
    inds = totuple(asindices(c))
    card_bounds = (0, max(1, (h * w) // 4))
    num = unifint(diff_lb, diff_ub, card_bounds)
    s = sample(inds, num)
    fgcol = c

In [None]:
import asyncio
import json
import mimetypes
import os
import re
import uuid
from typing import Tuple

import aiohttp
import requests
from PIL import Image

from utils.common.logging import logger


def get_ext(url):
    """Return the file extension of the resource a URL points to, without the dot.

    The extension is taken from the URL's path component only, so the query
    string and fragment never influence the result and a URL without any
    query string is handled as well.

    Args:
        url: URL string, e.g. ``"https://host/a/b/clip.webp?token=abc"``.

    Returns:
        The text after the last ``.`` of the final path segment (``"webp"`` for
        the example above), with its original letter case. An empty string is
        returned when the final path segment has no extension.
    """
    # Imported locally so this function does not rely on module-level imports.
    import posixpath
    from urllib.parse import urlparse

    # URL paths always use "/" separators, hence posixpath rather than os.path.
    _, dotted_ext = posixpath.splitext(urlparse(url).path)

    # splitext keeps the leading dot (".webp"); callers expect the bare extension.
    return dotted_ext[1:]


async def download_file(url, local_filename):
    """Download ``url`` to disk and return the path of the saved file.

    The file is written to ``<local_filename>.<ext>`` where ``ext`` comes from
    ``get_ext(url)``. A ``webp`` download is additionally converted to an RGB
    JPEG at ``<local_filename>.jpg`` and the original ``.webp`` file is removed.

    Args:
        url: Address to fetch with an HTTP GET.
        local_filename: Destination path without extension.

    Returns:
        For ``webp`` sources: ``f"{local_filename}.jpg"`` (not made absolute).
        Otherwise: the absolute path of ``<local_filename>.<ext>``.

    Raises:
        RuntimeError: If the response status is not 200.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            ext = get_ext(url)
            logger.info(f"------------------------------------------{ext}")
            logger.info(response.status)

            if response.status != 200:
                logger.info(f"retalking hedra {url} download file failed")
                raise RuntimeError(f"{url} download failed")

            filename_with_ext = os.path.abspath(f"{local_filename}.{ext}")
            content = await response.read()

    with open(filename_with_ext, "wb") as f:
        f.write(content)

    if ext != "webp":
        return filename_with_ext

    jpg_path = f"{local_filename}.jpg"
    # Open the image in a context manager so its file handle is released
    # before the source file is deleted (Pillow opens files lazily and may
    # keep them open, e.g. for multi-frame images).
    with Image.open(filename_with_ext) as im:
        im.convert("RGB").save(jpg_path, "jpeg")
    os.remove(filename_with_ext)
    return jpg_path


class HedraClient:
    """Async HTTP client that uploads an audio clip and a portrait image,
    submits a character job built from both, and polls the job until it ends.

    All requests carry the API key in the ``X-API-KEY`` header. Creating an
    instance also creates a local ``temp`` directory, which is used as scratch
    space for the files downloaded before upload.
    """

    # Environment variable consulted for the API key when none is passed in.
    API_KEY_ENV_VAR = "HEDRA_API_KEY"

    def __init__(self, api_key=None):
        """Configure endpoints, credentials and the request timeout.

        Args:
            api_key: Optional API key string. When omitted, the key is read
                from the ``HEDRA_API_KEY`` environment variable.

        Raises:
            RuntimeError: If no API key is available from either source.
        """
        self._base_url = "https://mercury.dev.dream-ai.com/api"
        self._check_task_url = "https://mercury.dev.dream-ai.com/api/v1/projects/{task_id}"

        # SECURITY: the key must never be committed to source control; it is
        # injected by the caller or the environment instead. Any key that was
        # previously embedded in this file should be treated as leaked and rotated.
        resolved_key = api_key or os.environ.get(self.API_KEY_ENV_VAR)
        if not resolved_key:
            raise RuntimeError(
                f"No API key provided: pass api_key=... or set the {self.API_KEY_ENV_VAR} environment variable"
            )
        self._key = resolved_key

        self.timeout = aiohttp.ClientTimeout(total=10)
        os.makedirs("temp", exist_ok=True)

    def _auth_headers(self):
        """Return a fresh dict with the authentication header sent on every request."""
        return {
            "X-API-KEY": self._key,
        }

    async def _upload_from_url(self, source_url, endpoint, timeout=None):
        """Download ``source_url`` to a temp file, POST it as form field ``file``
        to ``<base_url>/<endpoint>`` and return the decoded JSON response.

        ``timeout`` is forwarded to the POST only when it is not None, so that
        omitting it keeps the HTTP library's default timeout. The temporary
        file is always deleted afterwards, whether or not the upload succeeded.
        """
        local_path = await download_file(source_url, f"temp/{str(uuid.uuid4())}")
        try:
            request_kwargs = {}
            if timeout is not None:
                request_kwargs["timeout"] = timeout

            async with aiohttp.ClientSession() as session:
                # A single handle, closed deterministically before the file is
                # removed in the ``finally`` clause below.
                with open(local_path, "rb") as file_handle:
                    async with session.post(
                        f"{self._base_url}/{endpoint}",
                        headers=self._auth_headers(),
                        data={"file": file_handle},
                        **request_kwargs,
                    ) as resp:
                        return await resp.json()
        finally:
            if os.path.exists(local_path):
                os.remove(local_path)

    async def post_audio(self, audio_url):
        """Download the audio at ``audio_url`` and upload it to ``/v1/audio``.

        Returns:
            The decoded JSON body of the upload response.
        """
        return await self._upload_from_url(audio_url, "v1/audio")

    async def post_image(self, image_url):
        """Download the image at ``image_url`` and upload it to ``/v1/portrait``.

        The upload is bounded by ``self.timeout`` (10 seconds in total).

        Returns:
            The decoded JSON body of the upload response.
        """
        return await self._upload_from_url(image_url, "v1/portrait", timeout=self.timeout)

    async def submit_task(self, audio_url: str, image_url: str, aspect_ratio: str) -> Tuple[str, str]:
        """Upload audio and image concurrently, then submit a character job.

        Args:
            audio_url: URL of the voice track to upload.
            image_url: URL of the portrait image to upload.
            aspect_ratio: Value sent as ``aspectRatio`` in the job payload.

        Returns:
            ``(task_id, request_id)`` taken from the ``jobId`` and
            ``request_id`` fields of the response; ``request_id`` is None when
            the response does not contain it.

        Raises:
            AssertionError: If the response contains no ``jobId``.
            KeyError: If an upload response has no ``url`` field.
        """
        audio_task = asyncio.create_task(self.post_audio(audio_url))
        image_task = asyncio.create_task(self.post_image(image_url))
        audio_result, image_result = await asyncio.gather(audio_task, image_task)
        logger.info(image_result)
        logger.info(audio_result)
        payload = {
            "voiceUrl": audio_result["url"],
            "avatarImage": image_result["url"],
            "aspectRatio": aspect_ratio,
        }

        async with aiohttp.ClientSession(headers=self._auth_headers(), timeout=self.timeout) as session:
            async with session.post(f"{self._base_url}/v1/characters", json=payload) as response:
                data = await response.json()
                logger.info(data)
                task_id = data.get("jobId", None)
                # Raised explicitly (instead of using an ``assert`` statement) so the
                # check still runs under ``python -O``; the exception type is kept.
                if task_id is None:
                    raise AssertionError(f"Failed to submit task, {data}")
                request_id = data.get("request_id", None)
                return task_id, request_id

    async def get_response(self, task_id: str, poll_interval: float = 4, max_wait=None) -> Tuple[str, float]:
        """Poll the job status until it completes or fails.

        Args:
            task_id: Job identifier returned by ``submit_task``.
            poll_interval: Seconds to sleep between two status requests.
            max_wait: Optional upper bound, in seconds, on the total time spent
                polling. ``None`` (the default) polls without a deadline.

        Returns:
            ``(video_url, video_duration)``. ``video_duration`` is the fixed
            value 4; it is not read from the response.

        Raises:
            RuntimeError: If the status is ``"Failed"`` or missing.
            AssertionError: If a completed job carries no ``videoUrl``.
            asyncio.TimeoutError: If ``max_wait`` elapses before the job ends.
        """
        loop = asyncio.get_running_loop()
        deadline = None if max_wait is None else loop.time() + max_wait

        async with aiohttp.ClientSession(headers=self._auth_headers(), timeout=self.timeout) as session:
            while True:
                # Read the body and release the connection before deciding
                # what to do, so nothing is held open while sleeping.
                async with session.get(self._check_task_url.format(task_id=task_id)) as response:
                    data = await response.json()

                status = data.get("status", None)
                logger.info(f"Hedra Task {task_id}, status: {status}, get response: {data}")

                if status == "Completed":
                    video_url = data.get("videoUrl", None)
                    if video_url is None:
                        raise AssertionError(f"Failed to get video_url from response[{data}]")
                    video_duration = 4
                    return video_url, video_duration

                if status == "Failed" or status is None:
                    # ``output`` may be absent or null; treat both as "no message".
                    message = (data.get("output") or {}).get("message", "")
                    raise RuntimeError(f"Task {task_id} failed or was canceled. {message}")

                if deadline is not None and loop.time() >= deadline:
                    raise asyncio.TimeoutError(
                        f"Task {task_id} did not finish within {max_wait} seconds (last status: {status})"
                    )

                await asyncio.sleep(poll_interval)
