Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import time | |
| import requests | |
| from aworld.core.common import Observation, ActionResult | |
| from typing_extensions import Any | |
| from aworld.runners.callback.decorator import reg_callback | |
| from aworld.logs.util import logger | |
| def gen_video(actionResult:ActionResult) -> ActionResult: | |
| try: | |
| if not actionResult or not actionResult.content: | |
| return actionResult | |
| content = json.loads(actionResult.content) | |
| task_id = content.get("video_id") | |
| if not task_id: | |
| return actionResult | |
| gen_video_item(task_id) | |
| return actionResult | |
| except Exception as e: | |
| logger.warning(f"Exception gen_video occurred: {e}") | |
| return actionResult | |
| def gen_video_item(task_id:str) -> Any: | |
| if not task_id: | |
| return None | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| api_key = os.getenv('DASHSCOPE_API_KEY') | |
| query_base_url = os.getenv('DASHSCOPE_QUERY_BASE_URL', '') | |
| # Step 2: Poll for results | |
| max_attempts = int(os.getenv('DASHSCOPE_VIDEO_RETRY_TIMES', 10)) # Increased default retries for video | |
| wait_time = int(os.getenv('DASHSCOPE_VIDEO_SLEEP_TIME', 5)) # Increased default wait time for video | |
| query_url = f"{query_base_url}{task_id}" | |
| for attempt in range(max_attempts): | |
| # Wait before polling | |
| time.sleep(wait_time) | |
| logger.info(f"Polling attempt {attempt + 1}/{max_attempts}...") | |
| # Poll for results | |
| query_response = requests.get(query_url, headers={'Authorization': f'Bearer {api_key}'}) | |
| if query_response.status_code != 200: | |
| logger.info(f"Poll request failed with status code {query_response.status_code}") | |
| continue | |
| try: | |
| query_result = query_response.json() | |
| except json.JSONDecodeError as e: | |
| logger.warning(f"Failed to parse response as JSON: {e}") | |
| continue | |
| # Check task status | |
| task_status = query_result.get("output", {}).get("task_status") | |
| if task_status == "SUCCEEDED": | |
| # Extract video URL | |
| video_url = query_result.get("output", {}).get("video_url") | |
| if video_url: | |
| # Return as array of objects with video_url for consistency with image API | |
| return json.dumps({"video_url": video_url}) | |
| else: | |
| logger.info("Video URL not found in the response") | |
| return None | |
| elif task_status in ["PENDING", "RUNNING"]: | |
| # If still running, continue to next polling attempt | |
| logger.info(f"gen_video_item Task status: {task_status}, continuing to next poll...") | |
| continue | |
| elif task_status == "FAILED": | |
| logger.warning("Task failed") | |
| return None | |
| else: | |
| # Any other status, return None | |
| logger.warning(f"Unexpected status: {task_status}") | |
| return None | |
| # If we get here, polling timed out | |
| logger.warning("Polling timed out after maximum attempts") | |
| return None | |
| except Exception as e: | |
| logger.warning(f"Exception gen_video_item occurred: {e}") | |
| return None |