import os import json import traceback import sys from pathlib import Path cerebrum_path = Path.home() / "repos" / "terra" / "src" / "cerebrum" sys.path.insert(0, str(cerebrum_path)) REMOTE = os.getenv("HF_SPACE") == "true" CEREBRUM_AVAILABLE = False try: from aios.llm.apis import llm_chat, llm_call_tool, llm_chat_with_json_output from aios.interface import AutoTool from aios.config.config_manager import config as aios_config CEREBRUM_AVAILABLE = True print("✅ Loaded aios-agent-sdk (Cerebrum) successfully.") except ImportError: print("⚠️ cerebrum (aios-agent-sdk) not found. Falling back to mock mode.") def llm_chat(*args, **kwargs): return {"response": {"response_message": "[MOCK] Chat reply"}} def llm_call_tool(*args, **kwargs): return {"response": {"response_message": "[MOCK] Tool reply"}} def llm_chat_with_json_output(*args, **kwargs): return {"response": {"response_message": '[{"action_type": "chat", "action": "Mock Plan", "tool_use": []}]'}} # class AutoTool: # @staticmethod # def from_batch_preloaded(tools): # return [] from cerebrum.llm.apis import llm_chat, llm_call_tool, llm_chat_with_json_output from cerebrum.interface import AutoTool import os import json def get_config(): from cerebrum.config.config_manager import config return config config = get_config() aios_kernel_url = config.get_kernel_url() class DemoAgent: def __init__(self, agent_name): self.agent_name = agent_name self.config = self.load_config() self.tools = [ tool.get_tool_call_format() for tool in AutoTool.from_batch_preloaded(self.config["tools"]) ] self.plan_max_fail_times = 3 self.tool_call_max_fail_times = 3 self.start_time = None self.end_time = None self.request_waiting_times: list = [] self.request_turnaround_times: list = [] self.messages = [] self.workflow_mode = "manual" # (manual, automatic) self.rounds = 0 def load_config(self): script_path = os.path.abspath(__file__) script_dir = os.path.dirname(script_path) config_file = os.path.join(script_dir, "config.json") with open(config_file, "r") as f: config = json.load(f) return config def pre_select_tools(self, tool_names): pre_selected_tools = [] for tool_name in tool_names: for tool in self.tools: if tool["function"]["name"] == tool_name: pre_selected_tools.append(tool) break return pre_selected_tools def build_system_instruction(self): prefix = "".join(["".join(self.config["description"])]) plan_instruction = "".join( [ f"You are given the available tools from the tool list: {json.dumps(self.tools)} to help you solve problems. ", "Generate a plan with comprehensive yet minimal steps to fulfill the task. ", "The plan must follow the json format as below: ", "[", '{"action_type": "action_type_value", "action": "action_value","tool_use": [tool_name1, tool_name2,...]}', '{"action_type": "action_type_value", "action": "action_value", "tool_use": [tool_name1, tool_name2,...]}', "...", "]", "In each step of the planned plan, identify tools to use and recognize no tool is necessary. ", "Followings are some plan examples. ", "[" "[", '{"action_type": "tool_use", "action": "gather information from arxiv. ", "tool_use": ["arxiv"]},', '{"action_type": "chat", "action": "write a summarization based on the gathered information. ", "tool_use": []}', "];", "[", '{"action_type": "tool_use", "action": "gather information from arxiv. ", "tool_use": ["arxiv"]},', '{"action_type": "chat", "action": "understand the current methods and propose ideas that can improve ", "tool_use": []}', "]", "]", ] ) if self.workflow_mode == "manual": self.messages.append({"role": "system", "content": prefix}) else: assert self.workflow_mode == "automatic" self.messages.append({"role": "system", "content": prefix}) self.messages.append({"role": "user", "content": plan_instruction}) def automatic_workflow(self): for i in range(self.plan_max_fail_times): response = llm_chat_with_json_output( messages=self.messages, message_return_type="json" )["response"]["response_message"] try: workflow = json.loads(response) except: workflow = None self.rounds += 1 if workflow: return workflow else: self.messages.append( { "role": "assistant", "content": f"Fail {i+1} times to generate a valid plan. I need to regenerate a plan", } ) return None def manual_workflow(self): workflow = [ { "action_type": "call_tool", "action": "Search for relevant papers", "tool_use": ["demo_author/arxiv"], }, { "action_type": "chat", "action": "Provide responses based on the user's query", "tool_use": [], }, ] return workflow def run(self, task_input): self.build_system_instruction() self.messages.append({"role": "user", "content": task_input}) workflow = None if self.workflow_mode == "automatic": workflow = self.automatic_workflow() self.messages = self.messages[:1] # clear long context else: assert self.workflow_mode == "manual" workflow = self.manual_workflow() self.messages.append( { "role": "user", "content": f"[Thinking]: The workflow generated for the problem is {json.dumps(workflow)}. Follow the workflow to solve the problem step by step. ", } ) try: if workflow: final_result = "" for i, step in enumerate(workflow): action_type = step["action_type"] action = step["action"] tool_use = step["tool_use"] prompt = f"At step {i + 1}, you need to: {action}. " self.messages.append({"role": "user", "content": prompt}) if tool_use: selected_tools = self.pre_select_tools(tool_use) else: selected_tools = None if action_type == "call_tool": response = llm_call_tool( agent_name=self.agent_name, messages=self.messages, tools=selected_tools, base_url=aios_kernel_url )["response"] else: response = llm_chat( agent_name=self.agent_name, messages=self.messages, base_url=aios_kernel_url )["response"] self.messages.append({"role": "assistant", "content": response["response_message"]}) self.rounds += 1 final_result = self.messages[-1]["content"] return { "agent_name": self.agent_name, "result": final_result, "rounds": self.rounds, } else: return { "agent_name": self.agent_name, "result": "Failed to generate a valid workflow in the given times.", "rounds": self.rounds, } except Exception as e: return {}