# runner.py from settings import Settings from models import Question, QuestionAnswerPair from agent import ManagerAgent # Import ManagerAgent import pandas as pd import logging import json import asyncio import nest_asyncio nest_asyncio.apply() # Apply nest_asyncio for nested event loops (e.g., in Gradio) logger = logging.getLogger(__name__) class Runner: def __init__(self, settings: Settings): self.settings = settings # Initialize ManagerAgent once to reuse across runs self.manager_agent = ManagerAgent(self.settings) def _save_pairs(self, pairs: list[QuestionAnswerPair], username: str): """Write the question answer pairs to a user-specific file.""" answers = [pair.model_dump() for pair in pairs if pair is not None] file_name = f"answers_{username}.json" try: with open(file_name, "w") as f: json.dump(answers, f, indent=4) logger.info(f"Saved {len(answers)} question-answer pairs to '{file_name}'.") except Exception as e: logger.error(f"Error saving question-answer pairs to '{file_name}': {e}") async def _run_agent_async(self, item: Question): """Runs the agent asynchronously on a single question.""" task_id = item.task_id # Pass the full question object to the ManagerAgent's __call__ question_data = item.model_dump() # Convert Pydantic model to dict try: # Call the manager agent's __call__ method answer = await asyncio.to_thread(self.manager_agent, question_data) except Exception as e: logger.error(f"Error running agent on task {task_id}: {e}") answer = f"AGENT ERROR: {e}" return QuestionAnswerPair(task_id=task_id, question=item.question, answer=str(answer)) def _assign_questions(self, questions: list[Question]): """Runs the asynchronous loop and returns task outputs.""" tasks = [self._run_agent_async(item) for item in questions] return asyncio.gather(*tasks) def run_agent(self, questions: list[Question], username: str) -> pd.DataFrame: """Run the agent(s) async, save answers and return a dataframe""" # Ensure an event loop is running or create a new one try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Run the asynchronous tasks in the event loop pairs = loop.run_until_complete(self._assign_questions(questions)) # Save json to disk and return a dataframe self._save_pairs(pairs, username) results_log = [pair.model_dump() for pair in pairs if pair is not None] if not results_log: logger.warning("Agent did not produce any answers to submit.") return pd.DataFrame(results_log)