|
import os |
|
import re |
|
import requests |
|
import base64 |
|
import io |
|
import pandas as pd |
|
from openai import OpenAI |
|
|
|
class GaiaAgent: |
|
def __init__(self): |
|
self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) |
|
self.api_url = "https://agents-course-unit4-scoring.hf.space" |
|
|
|
def clean(self, text): |
|
return text.strip().replace("Final Answer:", "").replace("\n", "").replace(".", "").strip() |
|
|
|
def fetch_file(self, task_id): |
|
try: |
|
r = requests.get(f"{self.api_url}/files/{task_id}", timeout=10) |
|
r.raise_for_status() |
|
return r.content, r.headers.get("Content-Type", "") |
|
except Exception as e: |
|
return None, f"[Fetch error: {e}]" |
|
|
|
def ask(self, prompt: str, model="gpt-4-turbo") -> str: |
|
res = self.client.chat.completions.create( |
|
model=model, |
|
messages=[ |
|
{"role": "system", "content": "You are a precise assistant. Think step by step and return only the final answer in the correct format."}, |
|
{"role": "user", "content": prompt + "\n\nFinal Answer:"} |
|
], |
|
temperature=0.0, |
|
) |
|
return self.clean(res.choices[0].message.content) |
|
|
|
def ask_image(self, image_bytes: bytes, question: str) -> str: |
|
b64 = base64.b64encode(image_bytes).decode() |
|
messages = [ |
|
{"role": "system", "content": "You are a visual assistant. Only return the final answer to the question."}, |
|
{ |
|
"role": "user", |
|
"content": [ |
|
{"type": "text", "text": question}, |
|
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}} |
|
] |
|
} |
|
] |
|
res = self.client.chat.completions.create(model="gpt-4o", messages=messages) |
|
return self.clean(res.choices[0].message.content) |
|
|
|
def q_excel_sales(self, file: bytes, question: str) -> str: |
|
try: |
|
df = pd.read_excel(io.BytesIO(file), engine="openpyxl") |
|
food = df[df['category'].str.lower() == 'food'] |
|
total = food['sales'].sum() |
|
return f"${total:.2f}" |
|
except Exception as e: |
|
return f"[Excel error: {e}]" |
|
|
|
def q_audio_transcribe(self, file: bytes, question: str) -> str: |
|
audio_path = "/tmp/audio.mp3" |
|
with open(audio_path, "wb") as f: |
|
f.write(file) |
|
transcript = self.client.audio.transcriptions.create( |
|
model="whisper-1", |
|
file=open(audio_path, "rb") |
|
) |
|
content = transcript.text[:3000] |
|
prompt = f"Transcript: {content}\n\nQuestion: {question}" |
|
return self.ask(prompt) |
|
|
|
def extract_youtube_hint(self, question: str) -> str: |
|
match = re.search(r"https://www\.youtube\.com/watch\?v=([\w-]+)", question) |
|
if match: |
|
return f"This task is about a YouTube video (ID: {match.group(1)}). Assume the video visually or audibly answers the question." |
|
return "" |
|
|
|
def __call__(self, question: str, task_id: str = None) -> str: |
|
context = "" |
|
|
|
if "youtube.com/watch" in question: |
|
context += self.extract_youtube_hint(question) + "\n" |
|
|
|
if task_id: |
|
file, content_type = self.fetch_file(task_id) |
|
|
|
if isinstance(file, bytes) and content_type: |
|
if "image" in content_type: |
|
return self.ask_image(file, question) |
|
if "audio" in content_type or task_id.endswith(".mp3"): |
|
return self.q_audio_transcribe(file, question) |
|
if "spreadsheet" in content_type or content_type.endswith("excel") or content_type.endswith("xlsx"): |
|
return self.q_excel_sales(file, question) |
|
if "text" in content_type: |
|
try: |
|
text = file.decode("utf-8", errors="ignore")[:3000] |
|
context += f"File Content:\n{text}\n" |
|
except Exception: |
|
pass |
|
|
|
prompt = f"{context}\nQuestion: {question}" |
|
return self.ask(prompt) |