civerson916's picture
Added asyncio.new_event_loop
0a8fce8 verified
raw
history blame
2.92 kB
from settings import Settings
from models import QuestionAnswerPair
from agent import BasicAgent
import pandas as pd
import logging
import json
import asyncio
import nest_asyncio
nest_asyncio.apply()
logger = logging.getLogger(__name__)
class Runner():
def __init__(self, settings: Settings):
self.settings = settings
def _save_pairs(self, pairs: list[QuestionAnswerPair]):
answers = [pair.model_dump() for pair in pairs if pair is not None]
with open("answers.json", "w") as f:
json.dump(answers, f, indent=4)
def _enrich_question_text(self, item):
task_id = item.task_id
file_name = item.file_name
question_text = (
f"{item.question} "
"Think hard to answer. Parse all statements in the question to make a plan. "
"Your final answer should be a number or as few words as possible. "
"If needed, use a comma separated list of numbers and/or strings. Critically "
f"review your answer before making it the final answer. task_id: {task_id}."
)
if file_name:
question_text = f"{question_text} file_name: {file_name} (use tools to fetch the file)"
return question_text
async def _run_agent_async(self, item):
"""Runs the agent asynchronously."""
task_id = item.task_id
question_text = self._enrich_question_text(item)
try:
answer = await asyncio.to_thread(BasicAgent(self.settings), question_text)
except Exception as e:
logger.error(f"Error running agent on task {task_id}: {e}")
answer = f"AGENT ERROR: {e}"
return QuestionAnswerPair(task_id=task_id,
question=item.question, answer=str(answer))
def _assign_questions(self, questions):
"""Runs the asynchronous loop and returns task outputs."""
tasks = [self._run_agent_async(item) for item in questions]
return asyncio.gather(*tasks)
def run_agent(self, questions) -> pd.DataFrame:
"""Run the agent(s) async, save answers and return a dataframe"""
# Assign questions to agents and wait
try:
loop = asyncio.get_running_loop()
except RuntimeError: # No running loop, create one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def run_tasks_in_thread():
question_answer_pairs = loop.run_until_complete(
self._assign_questions(questions))
return question_answer_pairs
pairs = run_tasks_in_thread()
# save json to disk and return a dataframe
self._save_pairs(pairs)
results_log = [pair.model_dump() for pair in pairs if pair is not None]
if not results_log:
logger.warning("Agent did not produce any answers to submit.")
return pd.DataFrame(results_log)