import asyncio import os from datetime import date from consts import PROJECT_ROOT_DIR # from dotenv import find_dotenv, load_dotenv from generate_arxiv_responses import ArxivResponseGenerator from llama_index.core.agent.workflow import AgentWorkflow, ReActAgent from llama_index.core.tools import FunctionTool from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec from src.agent_hackathon.logger import get_logger # _ = load_dotenv(dotenv_path=find_dotenv(raise_error_if_not_found=False), override=True) logger = get_logger(log_name="multiagent", log_dir=PROJECT_ROOT_DIR / "logs") class MultiAgentWorkflow: """Multi-agent workflow for retrieving research papers and related events.""" def __init__(self) -> None: """Initialize the workflow with LLM, tools, and generator.""" logger.info("Initializing MultiAgentWorkflow.") self.llm = HuggingFaceInferenceAPI( model="meta-llama/Llama-3.3-70B-Instruct", provider="auto", # provider="nebius", temperature=0.1, top_p=0.95, max_tokens=8192 # api_key=os.getenv(key="NEBIUS_API_KEY"), # base_url="https://api.studio.nebius.com/v1/", ) self._generator = ArxivResponseGenerator( vector_store_path=PROJECT_ROOT_DIR / "db/arxiv_docs.db" ) # self._arxiv_rag_tool = FunctionTool.from_defaults( # fn=self._arxiv_rag, # name="arxiv_rag", # description="Retrieves arxiv research papers.", # return_direct=True, # ) self._duckduckgo_search_tool = [ tool for tool in DuckDuckGoSearchToolSpec().to_tool_list() if tool.metadata.name == "duckduckgo_full_search" ] # self._arxiv_agent = ReActAgent( # name="arxiv_agent", # description="Retrieves information about arxiv research papers", # system_prompt="You are arxiv research paper agent, who retrieves information " # "about arxiv research papers.", # tools=[self._arxiv_rag_tool], # llm=self.llm, # ) self._websearch_agent = ReActAgent( name="web_search", description="Searches the web", system_prompt="You are search engine who searches the web using duckduckgo tool", tools=self._duckduckgo_search_tool, llm=self.llm, ) self._workflow = AgentWorkflow( agents=[self._websearch_agent], root_agent="web_search", timeout=180, ) # AgentWorkflow.from_tools_or_functions( # tools_or_functions=self._duckduckgo_search_tool, # llm=self.llm, # system_prompt="You are an expert that " # "searches for any corresponding events related to the " # "user query " # "using the duckduckgo_search_tool and returns the final results." \ # "Don't return the steps but execute the necessary tools that you have " \ # "access to and return the results.", # timeout=180, # ) logger.info("MultiAgentWorkflow initialized.") def _arxiv_rag(self, query: str) -> str: """Retrieve research papers from arXiv based on the query. Args: query (str): The search query. Returns: str: Retrieved research papers as a string. """ return self._generator.retrieve_arxiv_papers(query=query) def _clean_response(self, result: str) -> str: """Removes the think tags. Args: result (str): The result with the content. Returns: str: The result without the content. """ if result.find(""): result = result[result.find("") + len("") :] return result async def run(self, user_query: str) -> str: """Run the multi-agent workflow for a given user query. Args: user_query (str): The user's search query. Returns: str: The output string. """ logger.info("Running multi-agent workflow.") try: research_papers = self._arxiv_rag(query=user_query) user_msg = ( f"search with the web search agent to find any relevant events related to: {user_query}.\n" f" The web search results relevant to the current year: {date.today().year}. \n" ) web_search_results = await self._workflow.run(user_msg=user_msg) final_res = ( research_papers + "\n\n" + web_search_results.response.blocks[0].text ) logger.info("Workflow run completed successfully.") return final_res except Exception as err: logger.error(f"Workflow run failed: {err}") raise if __name__ == "__main__": USER_QUERY = "i want to learn more about nlp" workflow = MultiAgentWorkflow() logger.info("Starting workflow for user query.") try: result = asyncio.run(workflow.run(user_query=USER_QUERY)) logger.info("Workflow finished. Output below:") print(result) except Exception as err: logger.error(f"Error during workflow execution: {err}")