|
|
|
|
|
import asyncio |
|
from typing import List, Dict, Union |
|
|
|
from aworld.config import RunConfig |
|
from aworld.config.conf import TaskConfig |
|
from aworld.agents.llm_agent import Agent |
|
from aworld.core.agent.swarm import Swarm |
|
from aworld.core.common import Config |
|
from aworld.core.task import Task, TaskResponse, Runner |
|
from aworld.output import StreamingOutputs |
|
from aworld.runners.utils import choose_runners, execute_runner |
|
from aworld.utils.common import sync_exec |
|
|
|
|
|
class Runners:
    """Unified entrance to the utility class of the runnable task of execution.

    All members are static methods; the class is used as a namespace and is
    not meant to be instantiated.
    """

    @staticmethod
    def streamed_run_task(task: Task) -> StreamingOutputs:
        """Run the task in stream output.

        The task is scheduled with ``asyncio.create_task``, so this method
        must be called while an event loop is running.

        Args:
            task: User task define. A default ``TaskConfig`` is attached when
                ``task.conf`` is falsy, and ``task.outputs`` is replaced by
                the returned streaming object.

        Returns:
            The ``StreamingOutputs`` bound to the task; the scheduled
            ``run_task`` coroutine is stored on its ``_run_impl_task``.
        """
        if not task.conf:
            task.conf = TaskConfig()

        streamed_result = StreamingOutputs(
            input=task.input,
            usage={},
            is_complete=False
        )
        task.outputs = streamed_result

        # Keep a reference to the asyncio task on the outputs object so the
        # background run stays reachable by the consumer of the stream.
        streamed_result._run_impl_task = asyncio.create_task(
            Runners.run_task(task)
        )
        return streamed_result

    @staticmethod
    async def run_task(task: Union[Task, List[Task]], run_conf: RunConfig = None) -> Dict[str, TaskResponse]:
        """Run tasks for some complex scenarios where agents cannot be directly used.

        Args:
            task: User task define, a single task or a list of tasks.
            run_conf: Run configuration forwarded unchanged to
                ``execute_runner``; may be None.

        Returns:
            The result of ``execute_runner``: task responses in a dict
            (``run`` looks its response up by ``task.id``).
        """
        # Normalize a single task to a list so the runner selection always
        # receives the same shape.
        if isinstance(task, Task):
            task = [task]

        runners: List[Runner] = await choose_runners(task)
        return await execute_runner(runners, run_conf)

    @staticmethod
    def sync_run_task(task: Union[Task, List[Task]], run_conf: Config = None) -> Dict[str, TaskResponse]:
        """Synchronous counterpart of ``run_task``.

        Delegates to ``sync_exec`` with ``Runners.run_task`` and the same
        arguments.

        Args:
            task: User task define, a single task or a list of tasks.
            run_conf: Run configuration passed through to ``run_task``.

        Returns:
            Whatever ``sync_exec`` returns for ``run_task``.
        """
        return sync_exec(Runners.run_task, task=task, run_conf=run_conf)

    @staticmethod
    def sync_run(
            input: str,
            agent: Agent = None,
            swarm: Swarm = None,
            tool_names: List[str] = None,
            session_id: str = None
    ) -> TaskResponse:
        """Synchronous counterpart of ``run``.

        Delegates to ``sync_exec`` with ``Runners.run`` and the same
        arguments.

        Args:
            input: User query.
            agent: An agent to run; mutually exclusive with ``swarm``.
            swarm: Multi-agent topo; mutually exclusive with ``agent``.
            tool_names: Tool name list. ``None`` (the default) means no tools.
            session_id: Session id.

        Returns:
            Whatever ``sync_exec`` returns for ``run``.
        """
        return sync_exec(
            Runners.run,
            input=input,
            agent=agent,
            swarm=swarm,
            tool_names=tool_names,
            session_id=session_id
        )

    @staticmethod
    async def run(
            input: str,
            agent: Agent = None,
            swarm: Swarm = None,
            tool_names: List[str] = None,
            session_id: str = None
    ) -> TaskResponse:
        """Run agent directly with input and tool names.

        Args:
            input: User query.
            agent: An agent with AI model configured, prompts, tools, mcp servers and other agents.
            swarm: Multi-agent topo.
            tool_names: Tool name list. ``None`` (the default) means no tools.
            session_id: Session id.

        Returns:
            TaskResponse: Task response.

        Raises:
            ValueError: If both ``agent`` and ``swarm`` are given, if neither
                is given, or if ``input`` is empty.
        """
        if agent and swarm:
            raise ValueError("`agent` and `swarm` only choose one.")

        if not input:
            raise ValueError('`input` is empty.')

        # Without this guard the code below would fail on
        # `swarm.event_driven` with an opaque AttributeError on None.
        if not agent and not swarm:
            raise ValueError("`agent` or `swarm` must be provided.")

        # A fresh list per call: a mutable default would be shared between
        # every invocation and every Task built from it.
        if tool_names is None:
            tool_names = []

        if agent:
            agent.task = input
            swarm = Swarm(agent)

        task = Task(input=input, swarm=swarm, tool_names=tool_names,
                    event_driven=swarm.event_driven, session_id=session_id)
        res = await Runners.run_task(task)
        return res.get(task.id)
|
|