hassenhamdi commited on
Commit
2225026
·
verified ·
1 Parent(s): 09f04d8

Create runner.py

Browse files
Files changed (1) hide show
  1. runner.py +69 -0
runner.py ADDED
@@ -0,0 +1,69 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # runner.py
2
+ from settings import Settings
3
+ from models import Question, QuestionAnswerPair
4
+ from agent import ManagerAgent # Import ManagerAgent
5
+ import pandas as pd
6
+ import logging
7
+ import json
8
+ import asyncio
9
+ import nest_asyncio
10
+ nest_asyncio.apply() # Apply nest_asyncio for nested event loops (e.g., in Gradio)
11
+ logger = logging.getLogger(__name__)
12
+
13
+ class Runner:
14
+ def __init__(self, settings: Settings):
15
+ self.settings = settings
16
+ # Initialize ManagerAgent once to reuse across runs
17
+ self.manager_agent = ManagerAgent(self.settings)
18
+
19
+ def _save_pairs(self, pairs: list[QuestionAnswerPair], username: str):
20
+ """Write the question answer pairs to a user-specific file."""
21
+ answers = [pair.model_dump() for pair in pairs if pair is not None]
22
+ file_name = f"answers_{username}.json"
23
+ try:
24
+ with open(file_name, "w") as f:
25
+ json.dump(answers, f, indent=4)
26
+ logger.info(f"Saved {len(answers)} question-answer pairs to '{file_name}'.")
27
+ except Exception as e:
28
+ logger.error(f"Error saving question-answer pairs to '{file_name}': {e}")
29
+
30
+
31
+ async def _run_agent_async(self, item: Question):
32
+ """Runs the agent asynchronously on a single question."""
33
+ task_id = item.task_id
34
+ # Pass the full question object to the ManagerAgent's __call__
35
+ question_data = item.model_dump() # Convert Pydantic model to dict
36
+ try:
37
+ # Call the manager agent's __call__ method
38
+ answer = await asyncio.to_thread(self.manager_agent, question_data)
39
+ except Exception as e:
40
+ logger.error(f"Error running agent on task {task_id}: {e}")
41
+ answer = f"AGENT ERROR: {e}"
42
+ return QuestionAnswerPair(task_id=task_id,
43
+ question=item.question, answer=str(answer))
44
+
45
+ def _assign_questions(self, questions: list[Question]):
46
+ """Runs the asynchronous loop and returns task outputs."""
47
+ tasks = [self._run_agent_async(item) for item in questions]
48
+ return asyncio.gather(*tasks)
49
+
50
+ def run_agent(self, questions: list[Question], username: str) -> pd.DataFrame:
51
+ """Run the agent(s) async, save answers and return a dataframe"""
52
+ # Ensure an event loop is running or create a new one
53
+ try:
54
+ loop = asyncio.get_running_loop()
55
+ except RuntimeError:
56
+ loop = asyncio.new_event_loop()
57
+ asyncio.set_event_loop(loop)
58
+
59
+ # Run the asynchronous tasks in the event loop
60
+ pairs = loop.run_until_complete(self._assign_questions(questions))
61
+
62
+ # Save json to disk and return a dataframe
63
+ self._save_pairs(pairs, username)
64
+ results_log = [pair.model_dump() for pair in pairs if pair is not None]
65
+ if not results_log:
66
+ logger.warning("Agent did not produce any answers to submit.")
67
+
68
+ return pd.DataFrame(results_log)
69
+