import io
import os

import openpyxl
import pytesseract
import requests
from duckduckgo_search import DDGS
from openai import OpenAI
from PIL import Image


class GaiaAgent:
    """Research agent for the GAIA benchmark.

    Downloads task files, extracts their text, optionally searches the web,
    and asks an LLM for a single 'Final Answer: <answer>' line.
    """

    def __init__(self):
        # The OpenAI client reads OPENAI_API_KEY from the environment.
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.instructions = (
            "You are a top-tier research assistant for the GAIA benchmark. "
            "You analyze documents, reason step by step, and always provide a single, concise, and correct answer. "
            "If a file is provided, extract all relevant information. Use only information from the question and file. "
            "Always output only 'Final Answer: <answer>' as the last line, no explanation after."
        )
        # Base URL of the scoring API that serves the file attached to each task.
        self.api_url = "https://agents-course-unit4-scoring.hf.space"

    def fetch_file(self, task_id: str):
        """Download the file attached to a task; return (bytes, content_type), or (None, None) on failure."""
        try:
            url = f"{self.api_url}/files/{task_id}"
            resp = requests.get(url, timeout=15)
            resp.raise_for_status()
            content_type = resp.headers.get("Content-Type", "")
            return resp.content, content_type
        except Exception:
            return None, None

    def ocr_image(self, img_bytes):
        """Run OCR on an image given as raw bytes and return the extracted text."""
        try:
            img = Image.open(io.BytesIO(img_bytes))
            return pytesseract.image_to_string(img)
        except Exception:
            return "[ERROR: Unable to OCR image]"

    def read_excel(self, file_bytes):
        """Read the active sheet of an Excel workbook and return it as tab-separated text."""
        try:
            wb = openpyxl.load_workbook(io.BytesIO(file_bytes), data_only=True)
            sheet = wb.active
            rows = sheet.iter_rows(values_only=True)
            return "\n".join(
                "\t".join("" if cell is None else str(cell) for cell in row) for row in rows
            )
        except Exception:
            return "[ERROR: Unable to read Excel file]"

    def web_search(self, query, max_results=3):
        """Search DuckDuckGo and return the top hits as 'title: snippet' lines."""
        try:
            results = DDGS().text(query)
            summaries = []
            for i, r in enumerate(results):
                if i >= max_results:
                    break
                summaries.append(f"{r['title']}: {r['body']}")
            return "\n".join(summaries)
        except Exception as e:
            return f"[ERROR: Web search failed: {e}]"

    def call_llm(self, prompt):
        """Send the prompt to the chat model together with the system instructions and return its reply."""
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": self.instructions},
                {"role": "user", "content": prompt},
            ],
            temperature=0.0,
            max_tokens=1024,
        )
        return response.choices[0].message.content.strip()

    def parse_final_answer(self, text):
        """Return the text after the last 'Final Answer:' marker, or the whole response if the marker is missing."""
        for line in reversed(text.splitlines()):
            if "Final Answer:" in line:
                return line.replace("Final Answer:", "").strip()
        return text.strip()

    def __call__(self, question: str, task_id: str = None) -> str:
        file_context = ""

        # If the task has an attached file, download it and extract text according to its type.
        if task_id:
            file_bytes, content_type = self.fetch_file(task_id)
            if not file_bytes or not content_type:
                file_context = "[ERROR: Could not download file]"
            elif "image" in content_type:
                file_text = self.ocr_image(file_bytes)
                file_context = f"Extracted text from image:\n{file_text}\n"
            elif "spreadsheet" in content_type or "excel" in content_type or task_id.endswith(".xlsx"):
                file_text = self.read_excel(file_bytes)
                file_context = f"Extracted text from Excel:\n{file_text}\n"
            elif "text" in content_type or "csv" in content_type or "json" in content_type:
                file_text = file_bytes.decode(errors="ignore")[:6000]
                file_context = f"File content:\n{file_text}\n"
            else:
                file_context = "[Unsupported or unknown file type]\n"

        # Heuristic: if the question looks like a factual lookup, add web search results to the context.
        search_keywords = [
            "who", "what", "when", "where", "name", "number",
            "how many", "first", "last", "award", "recipient",
        ]
        if any(kw in question.lower() for kw in search_keywords):
            search_results = self.web_search(question)
            if search_results and "ERROR" not in search_results:
                file_context += f"\nHere are relevant web search results:\n{search_results}\n"

        # Build the prompt, query the model, and extract the line after 'Final Answer:'.
        prompt = (
            f"{self.instructions}\n\n"
            f"{file_context}"
            f"Question: {question}\n"
            "Show your reasoning step by step, then provide the final answer as 'Final Answer: <answer>'."
        )
        llm_response = self.call_llm(prompt)
        return self.parse_final_answer(llm_response)
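

# A minimal usage sketch: assumes OPENAI_API_KEY is set in the environment and
# that the scoring API and DuckDuckGo are reachable; the question and task_id
# below are hypothetical placeholders.
if __name__ == "__main__":
    agent = GaiaAgent()
    # Without a task_id, the agent answers from the question text (plus web search) alone.
    print(agent("Who was the first woman to win a Nobel Prize?"))
    # With a task_id, the agent first downloads and parses the attached file:
    # print(agent("What is the sum of the values in column B?", task_id="<task-id>"))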