|
|
|
|
|
import os |
|
import asyncio |
|
from llama_index.llms.openai import OpenAI |
|
from llama_index.core.agent import FunctionCallingAgent |
|
from llama_index.core.tools import FunctionTool |
|
|
|
from langchain_community.utilities.wikipedia import WikipediaAPIWrapper |
|
from langchain_community.document_loaders import YoutubeLoader |
|
from langchain_experimental.tools.python.tool import PythonREPLTool |
|
|
|
import whisper |
|
import openpyxl |
|
|
|
|
|
# Startup sanity check: report whether OPENAI_API_KEY is set (and non-empty)
# in the environment. Purely informational -- the script continues either way.
print(
    "✅ Detected OPENAI_API_KEY"
    if os.getenv("OPENAI_API_KEY")
    else "⚠️ Missing OPENAI_API_KEY – LLM may fail"
)
|
|
|
|
|
# Shared Wikipedia client used by search_wikipedia. Configured with
# top_k_results=1 and doc_content_chars_max=1000 (presumably: one article per
# query, content capped at 1000 characters -- confirm against the wrapper docs).
wiki_api = WikipediaAPIWrapper(
    top_k_results=1,
    doc_content_chars_max=1000,
)
|
|
|
# Best-effort tool: the query is forwarded to the module-level ``wiki_api``
# client and its result returned unchanged. Any exception is converted into a
# "Tool not available. (...)" string instead of propagating to the agent.
# NOTE(review): the docstring is kept verbatim because FunctionTool appears to
# derive the tool description shown to the LLM from it.
def search_wikipedia(query: str) -> str:
    """Search Wikipedia for a given query and return relevant summary."""
    try:
        summary = wiki_api.run(query)
    except Exception as err:
        return f"Tool not available. ({err})"
    return summary
|
|
|
|
|
# Shared REPL instance that run_python_code delegates to.
# NOTE(review): this executes model-generated code; confirm the process runs
# in a sandboxed environment.
python_tool = PythonREPLTool()
|
|
|
def _print_if_expression(code: str) -> str:
    """Wrap ``code`` in ``print(...)`` when it is a single bare expression."""
    source = code.strip()
    try:
        # "eval" mode only accepts a single expression, so a successful
        # compile tells us the code has a value worth printing.
        compile(source, "<agent-code>", "eval")
    except (SyntaxError, ValueError):
        # Statements (assignments, loops, imports, ...) or unparsable input:
        # hand the code to the REPL untouched and let it report any error.
        return code
    # The newlines keep a trailing "# comment" in the expression from
    # swallowing the closing parenthesis.
    return f"print(\n{source}\n)"


def run_python_code(code: str) -> str:
    """Execute Python code and return what it prints to stdout.

    If the code contains no ``print`` and is a single expression (for example
    ``2 + 2``), the expression is evaluated and its value is printed, so the
    result is visible in the returned output. Any other code is executed as
    given.

    Args:
        code: Python source to execute via the module-level ``python_tool``.

    Returns:
        The output returned by ``python_tool.run``, or a
        ``"[PYTHON ERROR] ..."`` string if anything raises.
    """
    # SECURITY NOTE: the code is executed as-is by the REPL tool; nothing in
    # this function sandboxes or validates it.
    try:
        if "print" not in code:
            # Previously the source text itself was printed (print(repr(code))),
            # so the tool echoed the code instead of evaluating it.
            code = _print_if_expression(code)
        return python_tool.run(code)
    except Exception as e:
        return f"[PYTHON ERROR] {e}"
|
|
|
|
|
|
|
# Best-effort tool: loads the documents for ``url`` through YoutubeLoader
# (with add_video_info=False) and joins their page_content with single spaces.
# Any exception is converted into a "Tool not available. (...)" string.
# NOTE(review): the docstring is kept verbatim because FunctionTool appears to
# derive the tool description shown to the LLM from it.
def get_youtube_transcript(url: str) -> str:
    """Get transcript from YouTube video."""
    try:
        documents = YoutubeLoader.from_youtube_url(url, add_video_info=False).load()
        transcript = " ".join(doc.page_content for doc in documents)
    except Exception as err:
        return f"Tool not available. ({err})"
    return transcript
|
|
|
|
|
|
|
# Whisper models already loaded in this process, keyed by model name.
_WHISPER_MODELS = {}


def _get_whisper_model(name: str = "base"):
    """Return the Whisper model ``name``, loading it only on first use."""
    if name not in _WHISPER_MODELS:
        _WHISPER_MODELS[name] = whisper.load_model(name)
    return _WHISPER_MODELS[name]


def transcribe_audio(file_path: str) -> str:
    """Transcribe a local audio file to text using the Whisper "base" model.

    Args:
        file_path: Path to an audio file on the local filesystem.

    Returns:
        The transcribed text, ``"Tool not available. (Missing file)"`` when
        the path does not exist, or ``"Tool not available. (<error>)"`` when
        loading the model or transcribing fails.
    """
    if not os.path.exists(file_path):
        return "Tool not available. (Missing file)"
    try:
        # Reuse the cached model: loading it again for every call is slow and
        # the model does not depend on the input file.
        result = _get_whisper_model("base").transcribe(file_path)
        return result["text"]
    except Exception as e:
        return f"Tool not available. ({e})"
|
|
|
|
|
|
|
def extract_excel_total_food_sales(file_path: str) -> str:
    """Sum the sales amounts of food rows in an Excel workbook.

    Reads the active sheet, skipping the first row (treated as a header). In
    each remaining row the second column is taken as the category and the
    third as the amount; rows whose category is a string containing "food"
    (case-insensitive) are added to the total. Empty amounts count as 0.

    Args:
        file_path: Path to a workbook on the local filesystem.

    Returns:
        The total formatted as ``"$<amount>"`` with two decimals,
        ``"Tool not available. (Missing file)"`` when the path does not exist,
        or ``"Tool not available. (<error>)"`` when reading or parsing fails
        (including sheets with fewer than three columns).
    """
    if not os.path.exists(file_path):
        return "Tool not available. (Missing file)"
    try:
        # data_only=True makes formula cells yield their stored result rather
        # than the formula text (e.g. "=B2*C2"), which float() cannot parse.
        # A formula with no stored result reads as None and counts as 0.
        wb = openpyxl.load_workbook(file_path, data_only=True)
        sheet = wb.active
        total = 0.0
        for row in sheet.iter_rows(min_row=2, values_only=True):
            # Only the first three columns matter; tolerate extra ones
            # instead of failing the unpack on wider sheets.
            _first, category, amount, *_rest = row
            if isinstance(category, str) and "food" in category.lower():
                total += float(amount or 0)
        return f"${total:.2f}"
    except Exception as e:
        return f"Tool not available. ({e})"
|
|
|
|
|
# Tools exposed to the agent.
# return_direct is deliberately left at its default (False): with
# return_direct=True the agent hands the raw tool output back as its final
# answer, which bypasses the system prompt's formatting rules and can surface
# "Tool not available. (...)" error strings verbatim. Letting the output flow
# back to the LLM allows it to try another tool and format the final answer.
TOOLS = [
    FunctionTool.from_defaults(fn=search_wikipedia, name="search_wikipedia"),
    FunctionTool.from_defaults(fn=run_python_code, name="run_python"),
    FunctionTool.from_defaults(fn=get_youtube_transcript, name="get_youtube_transcript"),
    FunctionTool.from_defaults(fn=transcribe_audio, name="transcribe_audio"),
    FunctionTool.from_defaults(
        fn=extract_excel_total_food_sales,
        name="extract_excel_total_food_sales",
    ),
]
|
|
|
|
|
# Instructions given to the agent: use tools, fall back to
# 'Tool not available' instead of guessing, and answer in a strict,
# explanation-free format.
_SYSTEM_PROMPT = """
You are a highly capable AI agent completing the GAIA benchmark.

You MUST:
- Always use tools when available.
- If a tool returns 'None' or fails, try another or say 'Tool not available'.
- Return only final answer in strict format (comma-separated, numbers, names, no explanations).
- Do not guess. If unsure, state 'Tool not available'.
- NEVER return raw tool errors, only result or fallback.
"""

# Chat model backing the agent.
llm = OpenAI(model="gpt-4")

# Function-calling agent wired to the TOOLS list and the prompt above.
agent = FunctionCallingAgent.from_tools(
    tools=TOOLS,
    llm=llm,
    system_prompt=_SYSTEM_PROMPT,
)
|
|
|
|
|
async def answer_question(question: str) -> str:
    """Ask the module-level ``agent`` a question and return its reply text.

    The reply's ``response`` attribute is returned with surrounding whitespace
    stripped. If anything raises, the error is printed and an
    ``"[ERROR] <message>"`` string is returned instead of propagating.
    """
    try:
        reply = await agent.achat(question)
        answer = reply.response.strip()
    except Exception as err:
        print("❌ Agent error:", err)
        return f"[ERROR] {err}"
    return answer