import base64
import io
import os
import re
from typing import Optional, Tuple

import pandas as pd
import requests
from openai import OpenAI


class GaiaAgent:
    """Answer GAIA benchmark questions through the OpenAI chat-completions API.

    Calling an instance with a question (and optionally a task ID) fetches the
    task's attached file from the scoring service, converts it into extra chat
    context according to its Content-Type, and returns the model's reply as a
    stripped string.  Failures are reported as bracketed strings such as
    ``"[Answer error: ...]"`` rather than raised.
    """

    # Only the canonical ``https://www.youtube.com/watch?v=<id>`` form is
    # recognised; group 1 captures the video ID.
    _YOUTUBE_URL = re.compile(r"https://www\.youtube\.com/watch\?v=([\w-]+)")

    # Upper bound on how many characters of a text attachment go into the prompt.
    _MAX_TEXT_CHARS = 3000

    def __init__(self):
        """Create the OpenAI client (key read from ``OPENAI_API_KEY``) and prompts."""
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.api_url = "https://agents-course-unit4-scoring.hf.space"
        self.instructions = (
            "You are a highly skilled and concise research assistant solving GAIA benchmark questions.\n"
            "Analyze attached files, video links, and images. Reason step-by-step internally.\n"
            "Return only the final factual answer. Do not explain."
        )

    def fetch_file(self, task_id: str) -> Tuple[Optional[bytes], str]:
        """Download the file attached to *task_id* from the scoring service.

        Returns:
            ``(content, content_type)`` on success, where ``content_type`` is
            the response's ``Content-Type`` header (``""`` if absent).  On any
            failure returns ``(None, "[File error: ...]")``; callers must check
            for the ``None`` payload because the second element is then an
            error message, not a MIME type.
        """
        # Deliberately broad: this is a best-effort boundary that never raises.
        try:
            url = f"{self.api_url}/files/{task_id}"
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            content_type = response.headers.get("Content-Type", "")
            return response.content, content_type
        except Exception as e:
            return None, f"[File error: {e}]"

    def extract_youtube_context(self, question: str) -> str:
        """Return a prompt hint naming the YouTube video linked in *question*.

        Returns an empty string when the question contains no
        ``https://www.youtube.com/watch?v=...`` URL.
        """
        match = self._YOUTUBE_URL.search(question)
        if match is None:
            return ""
        video_id = match.group(1)
        return (
            f"This question refers to a YouTube video with ID: {video_id}.\n"
            f"Assume the video contains relevant visual or auditory cues.\n"
        )

    def extract_image_prompt(self, image_bytes: bytes, mime_type: str = "image/png") -> dict:
        """Build a user chat message embedding *image_bytes* as a base64 data URL.

        Args:
            image_bytes: Raw image payload.
            mime_type: MIME type written into the data URL.  Defaults to
                ``"image/png"``, which is what this method always used before
                the parameter existed.
        """
        image_b64 = base64.b64encode(image_bytes).decode("utf-8")
        return {
            "role": "user",
            "content": [
                {"type": "text", "text": "Please analyze the image and answer the question accurately."},
                {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_b64}"}},
            ],
        }

    def handle_excel_sales_question(self, excel_bytes: bytes, question: str) -> str:
        """Sum the ``sales`` column over rows whose ``category`` is "food".

        The workbook must contain columns named exactly ``category`` and
        ``sales``; the category comparison is case-insensitive.  *question* is
        accepted for interface symmetry but is not consulted.

        Returns:
            The total formatted as ``"$<amount>"`` with two decimals, a
            ``"[SKIPPED: ...]"`` string when the columns are missing, or an
            ``"[Excel processing error: ...]"`` string if reading or
            aggregating the workbook fails.
        """
        try:
            df = pd.read_excel(io.BytesIO(excel_bytes))
            if 'category' in df.columns and 'sales' in df.columns:
                food_only = df[df['category'].str.lower() == 'food']
                total = food_only['sales'].sum()
                return f"${total:.2f}"
            return "[SKIPPED: Required columns not found in Excel]"
        except Exception as e:
            return f"[Excel processing error: {e}]"

    @staticmethod
    def _classify_content_type(content_type) -> Optional[str]:
        """Map a Content-Type header to ``image``/``text``/``pdf``/``audio``/``excel``.

        Returns ``None`` for non-string input or an unrecognised type.  Checks
        run in a fixed order, so the first matching family wins.
        """
        if not isinstance(content_type, str):
            return None
        full = content_type.lower()
        # Header parameters ("; charset=...") would otherwise defeat endswith().
        base = full.split(";", 1)[0].strip()
        if "image" in full:
            return "image"
        if "text" in full or "csv" in full or "json" in full:
            return "text"
        if "pdf" in full:
            return "pdf"
        if "audio" in full:
            return "audio"
        excel_suffixes = ("excel", "xlsx")
        if "spreadsheet" in full or full.endswith(excel_suffixes) or base.endswith(excel_suffixes):
            return "excel"
        return None

    def _complete(self, messages: list, model: str, error_label: str, **options) -> str:
        """Run one chat completion and return the stripped reply text.

        Any exception is converted into ``"[<error_label>: <exception>]"``.  A
        reply whose content is ``None`` yields an empty string.
        """
        try:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                **options,
            )
            return (response.choices[0].message.content or "").strip()
        except Exception as e:
            return f"[{error_label}: {e}]"

    def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """Answer *question*, using the file attached to *task_id* when present.

        Image attachments are answered immediately with ``gpt-4o``; Excel
        attachments are answered by :meth:`handle_excel_sales_question` without
        calling the model.  Everything else goes to ``gpt-4-turbo`` at
        temperature 0 with whatever file or YouTube context was gathered.
        """
        messages = [{"role": "system", "content": self.instructions}]

        if task_id:
            file_data, content_type = self.fetch_file(task_id)
            # On a failed fetch the second element is an error message; never
            # interpret it as a MIME type (it may contain words like "text").
            kind = self._classify_content_type(content_type) if file_data is not None else None

            if kind == "image":
                mime_type = content_type.split(";", 1)[0].strip().lower()
                if not mime_type.startswith("image/"):
                    mime_type = "image/png"
                messages.append(self.extract_image_prompt(file_data, mime_type=mime_type))
                messages.append({"role": "user", "content": question})
                return self._complete(messages, model="gpt-4o", error_label="Image answer error")

            if kind == "text":
                context = file_data.decode(errors="ignore")[:self._MAX_TEXT_CHARS]
                messages.append({"role": "user", "content": f"File Content:\n{context}\n\nQuestion: {question}"})
            elif kind == "pdf":
                messages.append({"role": "user", "content": f"[PDF content detected]\n\nQuestion: {question}"})
            elif kind == "audio":
                messages.append({"role": "user", "content": f"[Audio content detected]\n\nQuestion: {question}"})
            elif kind == "excel":
                return self.handle_excel_sales_question(file_data, question)

        video_context = self.extract_youtube_context(question)
        if video_context:
            messages.append({"role": "user", "content": f"{video_context}\n\nQuestion: {question}"})
        elif not any(m["role"] == "user" for m in messages):
            # No file or video context was added, so send the bare question.
            messages.append({"role": "user", "content": question})

        return self._complete(
            messages,
            model="gpt-4-turbo",
            error_label="Answer error",
            temperature=0.0,
        )