Spaces:
Sleeping
Sleeping
# aworld/runners/handler/output.py | |
import json | |
from typing import AsyncGenerator | |
from aworld.core.task import TaskResponse | |
from aworld.models.model_response import ModelResponse | |
from aworld.runners.handler.base import DefaultHandler | |
from aworld.output.base import StepOutput, MessageOutput, ToolResultOutput, Output | |
from aworld.core.common import TaskItem | |
from aworld.core.context.base import Context | |
from aworld.core.event.base import Message, Constants, TopicType | |
from aworld.logs.util import logger | |
class DefaultOutputHandler(DefaultHandler): | |
def __init__(self, runner): | |
self.runner = runner | |
async def handle(self, message): | |
if message.category != Constants.OUTPUT: | |
return | |
# 1. get outputs | |
outputs = self.runner.task.outputs | |
if not outputs: | |
yield Message( | |
category=Constants.TASK, | |
payload=TaskItem(msg="Cannot get outputs.", data=message, stop=True), | |
sender=self.name(), | |
session_id=Context.instance().session_id, | |
topic=TopicType.ERROR, | |
headers={"context": message.context} | |
) | |
return | |
# 2. build Output | |
payload = message.payload | |
mark_complete = False | |
output = None | |
try: | |
if isinstance(payload, Output): | |
output = payload | |
elif isinstance(payload, TaskResponse): | |
logger.info(f"output get task_response with usage: {json.dumps(payload.usage)}") | |
if message.topic == TopicType.FINISHED or message.topic == TopicType.ERROR: | |
mark_complete = True | |
elif isinstance(payload, ModelResponse) or isinstance(payload, AsyncGenerator): | |
output = MessageOutput(source=payload) | |
except Exception as e: | |
logger.warning(f"Failed to parse output: {e}") | |
yield Message( | |
category=Constants.TASK, | |
payload=TaskItem(msg="Failed to parse output.", data=payload, stop=True), | |
sender=self.name(), | |
session_id=Context.instance().session_id, | |
topic=TopicType.ERROR, | |
headers={"context": message.context} | |
) | |
finally: | |
if output: | |
if not output.metadata: | |
output.metadata = {} | |
output.metadata['sender'] = message.sender | |
output.metadata['receiver'] = message.receiver | |
await outputs.add_output(output) | |
if mark_complete: | |
await outputs.mark_completed() | |
return |