File size: 3,359 Bytes
476b409 73b442d ebda551 476b409 73b442d 476b409 82f2d56 476b409 ebda551 476b409 73b442d 476b409 ebda551 476b409 73b442d 476b409 73b442d 476b409 0a8fce8 476b409 73b442d 476b409 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
from settings import Settings
from models import Question, QuestionAnswerPair
from agent import ManagerAgent
import pandas as pd
import logging
import json
import asyncio
import nest_asyncio
nest_asyncio.apply()
logger = logging.getLogger(__name__)
class Runner():
def __init__(self, settings: Settings):
self.settings = settings
def _save_pairs(self, pairs: list[QuestionAnswerPair], username: str):
"""Write the question answer pairs to a user-specific file."""
answers = [pair.model_dump() for pair in pairs if pair is not None]
file_name = f"answers_{username}.json"
with open(file_name, "w") as f:
json.dump(answers, f, indent=4)
def _enrich_question_text(self, item):
task_id = item.task_id
file_name = item.file_name
question_text = (
f"{item.question} "
"Think hard to answer. Parse all statements in the question to make a plan. "
"Your final answer should be a number or as few words as possible. "
"Only use abbreviations when the question calls for abbreviations. "
"If needed, use a comma separated list of values; the comma is always followed by a space"
f"Critically review your answer before making it the final answer. "
f"Double check the answer to make sure it meets all format requirements stated in the question. "
f"task_id: {task_id}."
)
if file_name:
question_text = f"{question_text} file_name: {file_name} (use tools to fetch the file)"
return question_text
async def _run_agent_async(self, item: Question):
"""Runs the agent asynchronously."""
task_id = item.task_id
question_text = self._enrich_question_text(item)
try:
answer = await asyncio.to_thread(ManagerAgent(self.settings), question_text)
except Exception as e:
logger.error(f"Error running agent on task {task_id}: {e}")
answer = f"AGENT ERROR: {e}"
return QuestionAnswerPair(task_id=task_id,
question=item.question, answer=str(answer))
def _assign_questions(self, questions: list[Question]):
"""Runs the asynchronous loop and returns task outputs."""
tasks = [self._run_agent_async(item) for item in questions]
return asyncio.gather(*tasks)
def run_agent(self, questions: list[Question], username: str) -> pd.DataFrame:
"""Run the agent(s) async, save answers and return a dataframe"""
# Assign questions to agents and wait
try:
loop = asyncio.get_running_loop()
except RuntimeError: # No running loop, create one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def run_tasks_in_thread():
question_answer_pairs = loop.run_until_complete(
self._assign_questions(questions))
return question_answer_pairs
pairs = run_tasks_in_thread()
# save json to disk and return a dataframe
self._save_pairs(pairs, username)
results_log = [pair.model_dump() for pair in pairs if pair is not None]
if not results_log:
logger.warning("Agent did not produce any answers to submit.")
return pd.DataFrame(results_log)
|