import os
import json
import traceback
import sys
from pathlib import Path

# Make a local Cerebrum checkout importable when running outside the Space.
cerebrum_path = Path.home() / "repos" / "terra" / "src" / "cerebrum"
sys.path.insert(0, str(cerebrum_path))
REMOTE = os.getenv("HF_SPACE") == "true"

CEREBRUM_AVAILABLE = False
try:
    # The aios-agent-sdk package installs the `cerebrum` module.
    from cerebrum.llm.apis import llm_chat, llm_call_tool, llm_chat_with_json_output
    from cerebrum.interface import AutoTool
    CEREBRUM_AVAILABLE = True
    print("✅ Loaded aios-agent-sdk (Cerebrum) successfully.")
except ImportError:
    print("⚠️ cerebrum (aios-agent-sdk) not found. Falling back to mock mode.")

    def llm_chat(*args, **kwargs):
        return {"response": {"response_message": "[MOCK] Chat reply"}}

    def llm_call_tool(*args, **kwargs):
        return {"response": {"response_message": "[MOCK] Tool reply"}}

    def llm_chat_with_json_output(*args, **kwargs):
        return {"response": {"response_message": '[{"action_type": "chat", "action": "Mock Plan", "tool_use": []}]'}}

    class AutoTool:
        @staticmethod
        def from_batch_preloaded(tools):
            return []
def get_config():
    from cerebrum.config.config_manager import config
    return config

if CEREBRUM_AVAILABLE:
    config = get_config()
    aios_kernel_url = config.get_kernel_url()
else:
    # Fallback so the module still imports in mock mode; the default kernel
    # address is an assumption, not read from any config.
    aios_kernel_url = "http://localhost:8000"
class DemoAgent:
    def __init__(self, agent_name):
        self.agent_name = agent_name
        self.config = self.load_config()
        self.tools = [
            tool.get_tool_call_format()
            for tool in AutoTool.from_batch_preloaded(self.config["tools"])
        ]
        self.plan_max_fail_times = 3
        self.tool_call_max_fail_times = 3
        self.start_time = None
        self.end_time = None
        self.request_waiting_times: list = []
        self.request_turnaround_times: list = []
        self.messages = []
        self.workflow_mode = "manual"  # (manual, automatic)
        self.rounds = 0
    def load_config(self):
        script_path = os.path.abspath(__file__)
        script_dir = os.path.dirname(script_path)
        config_file = os.path.join(script_dir, "config.json")
        with open(config_file, "r") as f:
            config = json.load(f)
        return config
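    # A minimal sketch of the config.json this agent expects, inferred from the
    # fields read here and in __init__ ("description", "tools"); the exact
    # schema and the tool name are assumptions, not confirmed by the SDK docs:
    #   {
    #     "description": ["You are a demo agent that searches arxiv and chats."],
    #     "tools": ["demo_author/arxiv"]
    #   }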
    def pre_select_tools(self, tool_names):
        pre_selected_tools = []
        for tool_name in tool_names:
            for tool in self.tools:
                if tool["function"]["name"] == tool_name:
                    pre_selected_tools.append(tool)
                    break
        return pre_selected_tools
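    # For reference: the lookup above assumes each entry of self.tools follows
    # an OpenAI-style function-calling schema. A hypothetical entry (only the
    # ["function"]["name"] nesting is confirmed by the code):
    #   {"type": "function",
    #    "function": {"name": "demo_author/arxiv",
    #                 "description": "...", "parameters": {...}}}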
    def build_system_instruction(self):
        prefix = "".join(self.config["description"])
        plan_instruction = "".join(
            [
                f"You are given the available tools from the tool list: {json.dumps(self.tools)} to help you solve problems. ",
                "Generate a plan with comprehensive yet minimal steps to fulfill the task. ",
                "The plan must follow the json format as below: ",
                "[",
                '{"action_type": "action_type_value", "action": "action_value", "tool_use": [tool_name1, tool_name2,...]},',
                '{"action_type": "action_type_value", "action": "action_value", "tool_use": [tool_name1, tool_name2,...]}',
                "...",
                "]",
                "In each step of the plan, identify the tools to use, or recognize that no tool is necessary. ",
                "The following are some plan examples. ",
                "[",
                "[",
                '{"action_type": "tool_use", "action": "gather information from arxiv. ", "tool_use": ["arxiv"]},',
                '{"action_type": "chat", "action": "write a summarization based on the gathered information. ", "tool_use": []}',
                "];",
                "[",
                '{"action_type": "tool_use", "action": "gather information from arxiv. ", "tool_use": ["arxiv"]},',
                '{"action_type": "chat", "action": "understand the current methods and propose ideas that can improve them. ", "tool_use": []}',
                "]",
                "]",
            ]
        )
        if self.workflow_mode == "manual":
            self.messages.append({"role": "system", "content": prefix})
        else:
            assert self.workflow_mode == "automatic"
            self.messages.append({"role": "system", "content": prefix})
            self.messages.append({"role": "user", "content": plan_instruction})
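    # Sketch of self.messages right after this method runs in "automatic" mode
    # (contents abbreviated; "manual" mode gets only the system message):
    #   [{"role": "system", "content": "<joined config description>"},
    #    {"role": "user", "content": "<plan_instruction>"}]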
    def automatic_workflow(self):
        for i in range(self.plan_max_fail_times):
            response = llm_chat_with_json_output(
                messages=self.messages,
                message_return_type="json"
            )["response"]["response_message"]
            try:
                workflow = json.loads(response)
            except json.JSONDecodeError:
                workflow = None
            self.rounds += 1
            if workflow:
                return workflow
            else:
                self.messages.append(
                    {
                        "role": "assistant",
                        "content": f"Failed {i + 1} time(s) to generate a valid plan. I need to regenerate a plan.",
                    }
                )
        return None
    def manual_workflow(self):
        workflow = [
            {
                "action_type": "call_tool",
                "action": "Search for relevant papers",
                "tool_use": ["demo_author/arxiv"],
            },
            {
                "action_type": "chat",
                "action": "Provide responses based on the user's query",
                "tool_use": [],
            },
        ]
        return workflow
    def run(self, task_input):
        self.build_system_instruction()
        self.messages.append({"role": "user", "content": task_input})

        workflow = None
        if self.workflow_mode == "automatic":
            workflow = self.automatic_workflow()
            self.messages = self.messages[:1]  # clear long context
        else:
            assert self.workflow_mode == "manual"
            workflow = self.manual_workflow()

        self.messages.append(
            {
                "role": "user",
                "content": f"[Thinking]: The workflow generated for the problem is {json.dumps(workflow)}. Follow the workflow to solve the problem step by step. ",
            }
        )

        try:
            if workflow:
                final_result = ""
                for i, step in enumerate(workflow):
                    action_type = step["action_type"]
                    action = step["action"]
                    tool_use = step["tool_use"]

                    prompt = f"At step {i + 1}, you need to: {action}. "
                    self.messages.append({"role": "user", "content": prompt})

                    if tool_use:
                        selected_tools = self.pre_select_tools(tool_use)
                    else:
                        selected_tools = None

                    if action_type == "call_tool":
                        response = llm_call_tool(
                            agent_name=self.agent_name,
                            messages=self.messages,
                            tools=selected_tools,
                            base_url=aios_kernel_url,
                        )["response"]
                    else:
                        response = llm_chat(
                            agent_name=self.agent_name,
                            messages=self.messages,
                            base_url=aios_kernel_url,
                        )["response"]

                    self.messages.append({"role": "assistant", "content": response["response_message"]})
                    self.rounds += 1

                final_result = self.messages[-1]["content"]
                return {
                    "agent_name": self.agent_name,
                    "result": final_result,
                    "rounds": self.rounds,
                }
            else:
                return {
                    "agent_name": self.agent_name,
                    "result": "Failed to generate a valid workflow within the allowed number of attempts.",
                    "rounds": self.rounds,
                }
        except Exception as e:
            # Surface the error instead of silently returning an empty dict.
            traceback.print_exc()
            return {
                "agent_name": self.agent_name,
                "result": f"Agent run failed: {e}",
                "rounds": self.rounds,
            }
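# A minimal usage sketch (hypothetical agent name and task; assumes either a
# running AIOS kernel reachable at aios_kernel_url or the mock fallback above):
if __name__ == "__main__":
    agent = DemoAgent("demo_author/demo_agent")
    output = agent.run("Find recent arxiv papers on LLM agents and summarize them.")
    print(json.dumps(output, indent=2))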