File size: 7,832 Bytes
e836bd4 4695b90 239dbcb e14ee37 4695b90 239dbcb e14ee37 239dbcb 4695b90 239dbcb 4695b90 239dbcb 4695b90 239dbcb e14ee37 239dbcb 4695b90 e14ee37 4695b90 239dbcb 4695b90 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
import os
import io
import requests
import mimetypes
import subprocess
import tempfile
from openai import OpenAI
from duckduckgo_search import DDGS
from PIL import Image
import pytesseract
import openpyxl
try:
import whisper
except ImportError:
whisper = None
try:
import pdfplumber
except ImportError:
pdfplumber = None
AGENT_API_URL = "https://agents-course-unit4-scoring.hf.space"
def safe_strip(text):
if not text:
return ""
if isinstance(text, bytes):
text = text.decode(errors="ignore")
return str(text).replace("\r", "").strip()
def parse_final_answer(text):
"""
Extracts only the final answer from an LLM reply, no explanations, no 'Final Answer:' prefix
"""
for line in reversed(text.splitlines()):
if "Final Answer:" in line:
return line.split("Final Answer:")[-1].strip()
return safe_strip(text.splitlines()[-1])
def run_web_search(query, max_results=3):
try:
ddgs = DDGS()
results = ddgs.text(query)
for i, r in enumerate(results):
if i >= max_results:
break
if r.get('body'):
return r['body']
elif r.get('title'):
return r['title']
return ""
except Exception:
return ""
def fetch_file(task_id):
url = f"{AGENT_API_URL}/files/{task_id}"
try:
resp = requests.get(url, timeout=30)
resp.raise_for_status()
content_type = resp.headers.get("Content-Type", "")
return resp.content, content_type
except Exception:
return None, None
def ocr_image(img_bytes):
try:
img = Image.open(io.BytesIO(img_bytes))
return safe_strip(pytesseract.image_to_string(img))
except Exception:
return ""
def read_excel(file_bytes):
try:
wb = openpyxl.load_workbook(io.BytesIO(file_bytes), data_only=True)
sheet = wb.active
rows = list(sheet.iter_rows(values_only=True))
text = "\n".join(["\t".join(str(cell) if cell is not None else "" for cell in row) for row in rows])
return safe_strip(text)
except Exception:
return ""
def read_pdf(file_bytes):
if not pdfplumber:
return ""
try:
with pdfplumber.open(io.BytesIO(file_bytes)) as pdf:
return safe_strip("\n".join(page.extract_text() or "" for page in pdf.pages))
except Exception:
return ""
def transcribe_audio(audio_bytes):
if not whisper:
return ""
try:
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=True) as tmpfile:
tmpfile.write(audio_bytes)
tmpfile.flush()
model = whisper.load_model("base")
result = model.transcribe(tmpfile.name)
return safe_strip(result.get("text", ""))
except Exception:
return ""
def transcribe_youtube_audio(youtube_url):
"""
Download audio from YouTube, transcribe using whisper
"""
if not whisper:
return ""
try:
with tempfile.TemporaryDirectory() as tmpdir:
audio_path = os.path.join(tmpdir, "audio.mp3")
cmd = [
"yt-dlp", "-f", "bestaudio[ext=m4a]/bestaudio/best",
"--extract-audio", "--audio-format", "mp3",
"-o", audio_path, youtube_url
]
subprocess.run(cmd, check=True, capture_output=True)
model = whisper.load_model("base")
result = model.transcribe(audio_path)
return safe_strip(result.get("text", ""))
except Exception:
return ""
def extract_file_text(file_bytes, content_type, task_id=""):
# Images
if "image" in content_type:
return ocr_image(file_bytes)
# Excel
if "spreadsheet" in content_type or "excel" in content_type or task_id.endswith(".xlsx"):
return read_excel(file_bytes)
# PDF
if "pdf" in content_type or task_id.endswith(".pdf"):
return read_pdf(file_bytes)
# Audio
if "audio" in content_type or task_id.endswith(".mp3") or task_id.endswith(".wav"):
return transcribe_audio(file_bytes)
# Text, CSV, JSON
if "text" in content_type or "csv" in content_type or "json" in content_type or task_id.endswith(".csv") or task_id.endswith(".json") or task_id.endswith(".txt"):
return safe_strip(file_bytes[:10000])
return ""
def guess_youtube_link(question):
# If the question mentions YouTube or a video link, try to extract it
import re
matches = re.findall(r"(https?://[^\s]+)", question)
for url in matches:
if "youtube.com" in url or "youtu.be" in url:
return url
return None
class GaiaAgent:
def __init__(self):
self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self.instructions = (
"You are a top-tier research assistant for the GAIA benchmark. "
"You analyze documents, reason step by step, and always provide a single, concise, and correct answer. "
"If a file is provided, extract all relevant information. Use only information from the question and file. "
"If the question refers to a video/audio file or YouTube link, always try to transcribe it. "
"If you need additional facts, summarize web search results provided. "
"Never apologize, never say you are unable, never output placeholders. "
"Always output the answer only—no explanations, no extra text."
)
def __call__(self, question: str, task_id: str = None) -> str:
file_text = ""
web_context = ""
video_transcript = ""
prompt_parts = [self.instructions]
# 1. File handling (image, Excel, CSV, PDF, text, audio)
if task_id:
file_bytes, content_type = fetch_file(task_id)
if file_bytes and content_type:
file_text = extract_file_text(file_bytes, content_type, task_id)
if file_text:
prompt_parts.append(f"Here is the extracted file content:\n{file_text}\n")
# 2. YouTube/video handling (by URL in question)
youtube_url = guess_youtube_link(question)
if youtube_url:
transcript = transcribe_youtube_audio(youtube_url)
if transcript:
prompt_parts.append(f"Here is the transcript of the video:\n{transcript}\n")
# 3. Web search fallback for open-world/factoid questions or if no file info
search_keywords = [
"who", "what", "when", "where", "name", "number", "how many",
"first", "last", "award", "recipient", "code", "surname", "year", "album", "actor", "winner"
]
if (not file_text and not youtube_url) or any(kw in question.lower() for kw in search_keywords):
search_results = run_web_search(question)
if search_results:
prompt_parts.append(f"Here are relevant web search results:\n{search_results}\n")
# 4. Compose prompt
prompt_parts.append(f"Question: {question}\nAnswer strictly and concisely.")
prompt = "\n".join(prompt_parts)
# 5. Call LLM
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": self.instructions},
{"role": "user", "content": prompt}
],
temperature=0.0,
max_tokens=512,
)
raw_output = safe_strip(response.choices[0].message.content)
# 6. Only return the single-line answer, with no prefix
return parse_final_answer(raw_output)
# For compatibility with older interface (for "answer_question" import)
def answer_question(question, task_id=None):
agent = GaiaAgent()
return agent(question, task_id) |