|
|
|
""" |
|
🚀 GAIA Multi-Agent System - UNIVERSAL MULTIMODAL AI AGENT |
|
Enhanced with comprehensive multimodal capabilities for ANY type of question: |
|
- 🎥 Video Processing & Analysis |
|
- 🎵 Audio Processing & Speech Recognition |
|
- 🎨 Image Generation & Advanced Computer Vision |
|
- 📊 Data Visualization & Chart Generation |
|
- 🎙️ Speech Synthesis & Voice Generation |
|
- 🎬 Video Generation & Editing |
|
- 🧬 Scientific Computing & Analysis |
|
- 📈 Advanced Analytics & Modeling |
|
""" |
|
|
|
import os |
|
import sys |
|
import re |
|
import json |
|
import time |
|
import random |
|
import logging |
|
import requests |
|
import tempfile |
|
import base64 |
|
import hashlib |
|
import subprocess |
|
from typing import Dict, List, Any, Optional, Tuple, Union |
|
from dataclasses import dataclass |
|
from enum import Enum |
|
from urllib.parse import urlparse, urljoin |
|
import math |
|
import statistics |
|
|
|
|
|
try: |
|
from huggingface_hub import InferenceClient |
|
HF_AVAILABLE = True |
|
except ImportError: |
|
HF_AVAILABLE = False |
|
print("⚠️ huggingface_hub not available. AI features limited.") |
|
|
|
try: |
|
import openai |
|
OPENAI_AVAILABLE = True |
|
except ImportError: |
|
OPENAI_AVAILABLE = False |
|
print("⚠️ OpenAI not available. GPT models unavailable.") |
|
|
|
|
|
try: |
|
from bs4 import BeautifulSoup |
|
BS4_AVAILABLE = True |
|
except ImportError: |
|
BS4_AVAILABLE = False |
|
print("⚠️ BeautifulSoup not available. Web scraping limited.") |
|
|
|
|
|
try: |
|
from PIL import Image, ImageDraw, ImageFont |
|
PIL_AVAILABLE = True |
|
except ImportError: |
|
PIL_AVAILABLE = False |
|
print("⚠️ Pillow not available. Image processing limited.") |
|
|
|
|
|
try: |
|
import cv2 |
|
CV2_AVAILABLE = True |
|
except ImportError: |
|
CV2_AVAILABLE = False |
|
print("⚠️ OpenCV not available. Video processing unavailable.") |
|
|
|
|
|
try: |
|
import librosa |
|
import soundfile as sf |
|
AUDIO_AVAILABLE = True |
|
except ImportError: |
|
AUDIO_AVAILABLE = False |
|
print("⚠️ Audio libraries not available. Audio processing unavailable.") |
|
|
|
|
|
try: |
|
import speech_recognition as sr |
|
SPEECH_AVAILABLE = True |
|
except ImportError: |
|
SPEECH_AVAILABLE = False |
|
print("⚠️ Speech recognition not available.") |
|
|
|
|
|
try: |
|
import pyttsx3 |
|
TTS_AVAILABLE = True |
|
except ImportError: |
|
TTS_AVAILABLE = False |
|
print("⚠️ Text-to-speech not available.") |
|
|
|
|
|
try: |
|
import matplotlib.pyplot as plt |
|
import plotly.graph_objects as go |
|
import plotly.express as px |
|
VIZ_AVAILABLE = True |
|
|
|
try: |
|
import seaborn as sns |
|
SEABORN_AVAILABLE = True |
|
except ImportError: |
|
SEABORN_AVAILABLE = False |
|
sns = None |
|
except ImportError: |
|
VIZ_AVAILABLE = False |
|
SEABORN_AVAILABLE = False |
|
plt = None |
|
go = None |
|
px = None |
|
sns = None |
|
print("⚠️ Visualization libraries not available.") |
|
|
|
|
|
try: |
|
import numpy as np |
|
import pandas as pd |
|
import scipy.stats as stats |
|
from sklearn.preprocessing import StandardScaler |
|
from sklearn.cluster import KMeans |
|
SCIENCE_AVAILABLE = True |
|
except ImportError: |
|
SCIENCE_AVAILABLE = False |
|
print("⚠️ Scientific computing libraries not available.") |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
class ToolType(Enum): |
|
"""🛠️ Universal tool types for any content type""" |
|
|
|
WEB_SEARCH = "web_search" |
|
BROWSE_URL = "browse_url" |
|
DOWNLOAD_FILE = "download_file" |
|
READ_PDF = "read_pdf" |
|
ANALYZE_IMAGE = "analyze_image" |
|
CALCULATOR = "calculator" |
|
|
|
|
|
PROCESS_VIDEO = "process_video" |
|
ANALYZE_AUDIO = "analyze_audio" |
|
GENERATE_IMAGE = "generate_image" |
|
SYNTHESIZE_SPEECH = "synthesize_speech" |
|
CREATE_VISUALIZATION = "create_visualization" |
|
ANALYZE_DATA = "analyze_data" |
|
GENERATE_VIDEO = "generate_video" |
|
EXTRACT_AUDIO = "extract_audio" |
|
TRANSCRIBE_SPEECH = "transcribe_speech" |
|
DETECT_OBJECTS = "detect_objects" |
|
FACE_RECOGNITION = "face_recognition" |
|
SCIENTIFIC_COMPUTE = "scientific_compute" |
|
|
|
@dataclass |
|
class ToolCall: |
|
tool: ToolType |
|
parameters: Dict[str, Any] |
|
|
|
class UniversalMultimodalToolkit: |
|
"""🌟 Universal toolkit for processing ANY type of content""" |
|
|
|
    def __init__(self, hf_token: Optional[str] = None, openai_key: Optional[str] = None):
|
self.hf_token = hf_token |
|
self.openai_key = openai_key |
|
self.temp_dir = tempfile.mkdtemp() |
|
|
|
|
|
self._init_multimodal_clients() |
|
|
|
def _init_multimodal_clients(self): |
|
"""Initialize all multimodal AI clients""" |
|
self.clients = {} |
|
|
|
if self.hf_token and HF_AVAILABLE: |
|
|
|
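            # Vision models: BLIP captioning, Stable Diffusion XL generation, DETR object detection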
self.clients['vision'] = InferenceClient(model="Salesforce/blip-image-captioning-large", token=self.hf_token) |
|
self.clients['image_gen'] = InferenceClient(model="stabilityai/stable-diffusion-xl-base-1.0", token=self.hf_token) |
|
self.clients['object_detection'] = InferenceClient(model="facebook/detr-resnet-50", token=self.hf_token) |
|
|
|
|
|
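            # Audio models: speech-to-text client (Whisper model selected at call time) and wav2vec2 audio classification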
self.clients['speech_to_text'] = InferenceClient( |
|
provider="hf-inference", |
|
api_key=self.hf_token, |
|
) |
|
self.clients['audio_classification'] = InferenceClient(model="facebook/wav2vec2-base-960h", token=self.hf_token) |
|
|
|
|
|
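            # Text generation model for general reasoning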
self.clients['text_gen'] = InferenceClient(model="meta-llama/Meta-Llama-3-8B-Instruct", token=self.hf_token) |
|
|
|
|
|
def process_video(self, video_path: str, task: str = "analyze") -> str: |
|
"""🎥 Process and analyze video content""" |
|
if not CV2_AVAILABLE: |
|
return "❌ Video processing unavailable. Install opencv-python." |
|
|
|
try: |
|
logger.info(f"🎥 Processing video: {video_path} | Task: {task}") |
|
|
|
cap = cv2.VideoCapture(video_path) |
|
if not cap.isOpened(): |
|
return f"❌ Could not open video: {video_path}" |
|
|
|
|
|
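            # Collect basic metadata (resolution, FPS, duration, frame count)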
fps = cap.get(cv2.CAP_PROP_FPS) |
|
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
duration = frame_count / fps if fps > 0 else 0 |
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
|
video_info = f"Video: {width}x{height}, {fps:.1f} FPS, {duration:.1f}s, {frame_count} frames" |
|
|
|
if task == "extract_frames": |
|
|
|
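                # Sample up to ~10 evenly spaced frames for analysis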
frames_extracted = [] |
|
frame_interval = max(1, frame_count // 10) |
|
|
|
for i in range(0, frame_count, frame_interval): |
|
cap.set(cv2.CAP_PROP_POS_FRAMES, i) |
|
ret, frame = cap.read() |
|
if ret: |
|
frame_path = os.path.join(self.temp_dir, f"frame_{i}.jpg") |
|
cv2.imwrite(frame_path, frame) |
|
frames_extracted.append(frame_path) |
|
|
|
cap.release() |
|
|
|
|
|
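                # Caption the first few sampled frames with the vision model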
frame_analyses = [] |
|
for frame_path in frames_extracted[:3]: |
|
analysis = self.analyze_image(frame_path, "Describe what you see in this video frame") |
|
frame_analyses.append(analysis) |
|
|
|
return f"{video_info}. Frame analysis: {'; '.join(frame_analyses)}" |
|
|
|
elif task == "motion_detection": |
|
|
|
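                # Motion detection via simple grayscale frame differencing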
ret, frame1 = cap.read() |
|
if not ret: |
|
cap.release() |
|
return f"{video_info}. Motion detection failed." |
|
|
|
frame1_gray = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) |
|
motion_detected = 0 |
|
|
|
while True: |
|
ret, frame2 = cap.read() |
|
if not ret: |
|
break |
|
|
|
frame2_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) |
|
diff = cv2.absdiff(frame1_gray, frame2_gray) |
|
|
|
if cv2.countNonZero(diff) > 5000: |
|
motion_detected += 1 |
|
|
|
frame1_gray = frame2_gray |
|
|
|
cap.release() |
|
motion_percentage = (motion_detected / frame_count) * 100 |
|
|
|
return f"{video_info}. Motion detected in {motion_percentage:.1f}% of frames." |
|
|
|
else: |
|
cap.release() |
|
return f"{video_info}. Basic video analysis complete." |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Video processing error: {e}") |
|
return f"❌ Video processing failed: {e}" |
|
|
|
|
|
def analyze_audio(self, audio_path: str, task: str = "analyze") -> str: |
|
"""🎵 Analyze audio content""" |
|
if not AUDIO_AVAILABLE: |
|
return "❌ Audio processing unavailable. Install librosa and soundfile." |
|
|
|
try: |
|
logger.info(f"🎵 Analyzing audio: {audio_path} | Task: {task}") |
|
|
|
|
|
y, sr = librosa.load(audio_path, sr=None) |
|
duration = len(y) / sr |
|
|
|
audio_info = f"Audio: {duration:.1f}s, {sr} Hz, {len(y)} samples" |
|
|
|
if task == "transcribe": |
|
return self.transcribe_speech(audio_path) |
|
elif task == "features": |
|
|
|
tempo, beats = librosa.beat.beat_track(y=y, sr=sr) |
|
spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)[0] |
|
spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)[0] |
|
zero_crossing_rate = librosa.feature.zero_crossing_rate(y)[0] |
|
|
|
features = { |
|
"tempo": float(tempo), |
|
"avg_spectral_centroid": float(np.mean(spectral_centroids)), |
|
"avg_spectral_rolloff": float(np.mean(spectral_rolloff)), |
|
"avg_zero_crossing_rate": float(np.mean(zero_crossing_rate)) |
|
} |
|
|
|
return f"{audio_info}. Features: {json.dumps(features, indent=2)}" |
|
else: |
|
return f"{audio_info}. Basic audio analysis complete." |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Audio analysis error: {e}") |
|
return f"❌ Audio analysis failed: {e}" |
|
|
|
def transcribe_speech(self, audio_path: str) -> str: |
|
"""🎙️ Convert speech to text using Whisper via HuggingFace Inference API""" |
|
try: |
|
logger.info(f"🎙️ Transcribing speech from: {audio_path}") |
|
|
|
if self.hf_token and HF_AVAILABLE and 'speech_to_text' in self.clients: |
|
|
|
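                # Preferred path: Whisper large-v3 through the HF Inference API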
try: |
|
result = self.clients['speech_to_text'].automatic_speech_recognition( |
|
audio_path, |
|
model="openai/whisper-large-v3" |
|
) |
|
|
|
if isinstance(result, dict) and 'text' in result: |
|
transcription = result['text'].strip() |
|
elif isinstance(result, str): |
|
transcription = result.strip() |
|
else: |
|
transcription = str(result).strip() |
|
|
|
if transcription: |
|
return f"Transcription: {transcription}" |
|
else: |
|
return "❌ No transcription available" |
|
|
|
except Exception as hf_error: |
|
logger.warning(f"⚠️ HuggingFace speech recognition failed: {hf_error}") |
|
|
|
|
|
|
|
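            # Fallback: local speech_recognition with the Google Web Speech API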
if SPEECH_AVAILABLE: |
|
try: |
|
r = sr.Recognizer() |
|
with sr.AudioFile(audio_path) as source: |
|
audio = r.record(source) |
|
text = r.recognize_google(audio) |
|
return f"Transcription: {text}" |
|
except sr.UnknownValueError: |
|
return "❌ Could not understand audio" |
|
except sr.RequestError as e: |
|
return f"❌ Speech recognition error: {e}" |
|
else: |
|
return "❌ Speech recognition unavailable. Need HuggingFace token or speech_recognition library." |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Transcription error: {e}") |
|
return f"❌ Transcription failed: {e}" |
|
|
|
|
|
def generate_image(self, prompt: str, style: str = "realistic") -> str: |
|
"""🎨 Generate images from text descriptions""" |
|
try: |
|
logger.info(f"🎨 Generating image: {prompt} | Style: {style}") |
|
|
|
if self.hf_token and 'image_gen' in self.clients: |
|
|
|
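                # Preferred path: Stable Diffusion XL via HuggingFace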
enhanced_prompt = f"{prompt}, {style} style, high quality, detailed" |
|
|
|
image = self.clients['image_gen'].text_to_image(enhanced_prompt) |
|
|
|
|
|
image_path = os.path.join(self.temp_dir, f"generated_{int(time.time())}.png") |
|
image.save(image_path) |
|
|
|
return f"✅ Image generated and saved to: {image_path}" |
|
|
|
elif self.openai_key and OPENAI_AVAILABLE: |
|
|
|
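                # Fallback: DALL-E 3 via the OpenAI API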
client = openai.OpenAI(api_key=self.openai_key) |
|
response = client.images.generate( |
|
model="dall-e-3", |
|
prompt=f"{prompt}, {style} style", |
|
size="1024x1024", |
|
quality="standard", |
|
n=1, |
|
) |
|
|
|
image_url = response.data[0].url |
|
|
|
|
|
img_response = requests.get(image_url) |
|
image_path = os.path.join(self.temp_dir, f"dalle_generated_{int(time.time())}.png") |
|
with open(image_path, 'wb') as f: |
|
f.write(img_response.content) |
|
|
|
return f"✅ DALL-E image generated and saved to: {image_path}" |
|
else: |
|
return "❌ Image generation unavailable. Need HuggingFace token or OpenAI key." |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Image generation error: {e}") |
|
return f"❌ Image generation failed: {e}" |
|
|
|
|
|
def synthesize_speech(self, text: str, voice: str = "default") -> str: |
|
"""🎙️ Convert text to speech""" |
|
try: |
|
logger.info(f"🎙️ Synthesizing speech: {text[:50]}... | Voice: {voice}") |
|
|
|
if TTS_AVAILABLE: |
|
engine = pyttsx3.init() |
|
|
|
|
|
voices = engine.getProperty('voices') |
|
if voices and len(voices) > 0: |
|
if voice == "female" and len(voices) > 1: |
|
engine.setProperty('voice', voices[1].id) |
|
else: |
|
engine.setProperty('voice', voices[0].id) |
|
|
|
|
|
engine.setProperty('rate', 150) |
|
engine.setProperty('volume', 0.9) |
|
|
|
|
|
speech_path = os.path.join(self.temp_dir, f"speech_{int(time.time())}.wav") |
|
engine.save_to_file(text, speech_path) |
|
engine.runAndWait() |
|
|
|
return f"✅ Speech synthesized and saved to: {speech_path}" |
|
else: |
|
return "❌ Text-to-speech unavailable. Install pyttsx3." |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Speech synthesis error: {e}") |
|
return f"❌ Speech synthesis failed: {e}" |
|
|
|
|
|
def create_visualization(self, data: Dict[str, Any], chart_type: str = "bar") -> str: |
|
"""📊 Create data visualizations and charts""" |
|
try: |
|
logger.info(f"📊 Creating {chart_type} chart") |
|
|
|
if not VIZ_AVAILABLE: |
|
return "❌ Visualization unavailable. Install matplotlib, seaborn, and plotly." |
|
|
|
|
|
if isinstance(data, dict) and 'x' in data and 'y' in data: |
|
x_data = data['x'] |
|
y_data = data['y'] |
|
title = data.get('title', 'Data Visualization') |
|
else: |
|
return "❌ Invalid data format. Need dict with 'x' and 'y' keys." |
|
|
|
|
|
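            # Render the chart with matplotlib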
plt.figure(figsize=(10, 6)) |
|
|
|
if chart_type == "bar": |
|
plt.bar(x_data, y_data) |
|
elif chart_type == "line": |
|
plt.plot(x_data, y_data, marker='o') |
|
elif chart_type == "scatter": |
|
plt.scatter(x_data, y_data) |
|
elif chart_type == "pie": |
|
plt.pie(y_data, labels=x_data, autopct='%1.1f%%') |
|
else: |
|
plt.plot(x_data, y_data) |
|
|
|
plt.title(title) |
|
plt.xlabel(data.get('xlabel', 'X')) |
|
plt.ylabel(data.get('ylabel', 'Y')) |
|
plt.grid(True, alpha=0.3) |
|
|
|
|
|
chart_path = os.path.join(self.temp_dir, f"chart_{int(time.time())}.png") |
|
plt.savefig(chart_path, dpi=300, bbox_inches='tight') |
|
plt.close() |
|
|
|
return f"✅ {chart_type.title()} chart created and saved to: {chart_path}" |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Visualization error: {e}") |
|
return f"❌ Visualization failed: {e}" |
|
|
|
|
|
def scientific_compute(self, operation: str, data: Dict[str, Any]) -> str: |
|
"""🧬 Perform scientific computations and analysis""" |
|
try: |
|
if not SCIENCE_AVAILABLE: |
|
return "❌ Scientific computing unavailable. Install numpy, pandas, scipy, sklearn." |
|
|
|
logger.info(f"🧬 Scientific computation: {operation}") |
|
|
|
if operation == "statistics": |
|
values = data.get('values', []) |
|
if not values: |
|
return "❌ No values provided for statistics" |
|
|
|
result = { |
|
"mean": float(np.mean(values)), |
|
"median": float(np.median(values)), |
|
"std": float(np.std(values)), |
|
"min": float(np.min(values)), |
|
"max": float(np.max(values)), |
|
"variance": float(np.var(values)), |
|
"skewness": float(stats.skew(values)), |
|
"kurtosis": float(stats.kurtosis(values)) |
|
} |
|
|
|
return f"Statistics: {json.dumps(result, indent=2)}" |
|
|
|
elif operation == "correlation": |
|
x = data.get('x', []) |
|
y = data.get('y', []) |
|
if not x or not y or len(x) != len(y): |
|
return "❌ Need equal length x and y arrays for correlation" |
|
|
|
                correlation, p_value = stats.pearsonr(x, y)
|
|
|
return f"Correlation: {correlation:.4f}, P-value: {p_value:.4f}" |
|
|
|
elif operation == "clustering": |
|
data_points = data.get('data', []) |
|
n_clusters = data.get('clusters', 3) |
|
|
|
if not data_points: |
|
return "❌ No data points provided for clustering" |
|
|
|
|
|
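                # Standardize features before running KMeans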
scaler = StandardScaler() |
|
scaled_data = scaler.fit_transform(data_points) |
|
|
|
kmeans = KMeans(n_clusters=n_clusters, random_state=42) |
|
labels = kmeans.fit_predict(scaled_data) |
|
|
|
return f"Clustering complete. Labels: {labels.tolist()}" |
|
|
|
else: |
|
return f"❌ Unknown scientific operation: {operation}" |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Scientific computation error: {e}") |
|
return f"❌ Scientific computation failed: {e}" |
|
|
|
|
|
def detect_objects(self, image_path: str) -> str: |
|
"""🔍 Detect and identify objects in images""" |
|
try: |
|
logger.info(f"🔍 Detecting objects in: {image_path}") |
|
|
|
if self.hf_token and 'object_detection' in self.clients: |
|
with open(image_path, 'rb') as img_file: |
|
result = self.clients['object_detection'].object_detection(img_file.read()) |
|
|
|
if result: |
|
objects = [] |
|
for detection in result: |
|
label = detection.get('label', 'unknown') |
|
score = detection.get('score', 0) |
|
objects.append(f"{label} ({score:.2f})") |
|
|
|
return f"Objects detected: {', '.join(objects)}" |
|
else: |
|
return "No objects detected" |
|
else: |
|
return "❌ Object detection unavailable. Need HuggingFace token." |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Object detection error: {e}") |
|
return f"❌ Object detection failed: {e}" |
|
|
|
|
|
def web_search(self, query: str, num_results: int = 5) -> str: |
|
"""🔍 Enhanced web search with comprehensive crawling and browsing""" |
|
try: |
|
logger.info(f"🔍 Web search: {query}") |
|
|
|
|
|
search_url = f"https://duckduckgo.com/html/?q={requests.utils.quote(query)}" |
|
headers = { |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
|
} |
|
|
|
response = requests.get(search_url, headers=headers, timeout=15) |
|
response.raise_for_status() |
|
|
|
if not BS4_AVAILABLE: |
|
return f"⚠️ Search completed but parsing limited. Raw response length: {len(response.text)}" |
|
|
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
results = [] |
|
|
|
|
|
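            # DuckDuckGo markup varies; try several result-container selectors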
result_selectors = [ |
|
'div.result', |
|
'div[data-result-index]', |
|
'article', |
|
'li.result' |
|
] |
|
|
|
for selector in result_selectors: |
|
search_results = soup.select(selector)[:num_results] |
|
if search_results: |
|
break |
|
else: |
|
search_results = [] |
|
|
|
for result in search_results: |
|
|
|
title_elem = (result.find('a', class_='result__a') or |
|
result.find('h2') or |
|
result.find('h3') or |
|
result.find('a')) |
|
|
|
|
|
snippet_elem = (result.find('a', class_='result__snippet') or |
|
result.find('span', class_='result__snippet') or |
|
result.find('p')) |
|
|
|
if title_elem: |
|
title = title_elem.get_text(strip=True) |
|
url = title_elem.get('href', '') |
|
snippet = snippet_elem.get_text(strip=True) if snippet_elem else "" |
|
|
|
|
|
if url and not url.startswith('http'): |
|
if url.startswith('//'): |
|
url = 'https:' + url |
|
elif url.startswith('/'): |
|
url = 'https://duckduckgo.com' + url |
|
|
|
results.append({ |
|
'title': title, |
|
'url': url, |
|
'snippet': snippet |
|
}) |
|
|
|
if results: |
|
|
|
formatted_results = [] |
|
for i, result in enumerate(results, 1): |
|
formatted_results.append( |
|
f"{i}. {result['title']}\n" |
|
f" {result['snippet']}\n" |
|
f" URL: {result['url']}" |
|
) |
|
|
|
return "\n\n".join(formatted_results) |
|
else: |
|
|
|
try: |
|
alt_url = f"https://html.duckduckgo.com/html/?q={requests.utils.quote(query)}" |
|
alt_response = requests.get(alt_url, headers=headers, timeout=10) |
|
if alt_response.status_code == 200: |
|
return f"Search completed for '{query}' - found {len(alt_response.text)} characters of content" |
|
except: |
|
pass |
|
|
|
return f"🔍 No results found for '{query}'" |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Web search error: {e}") |
|
return f"❌ Web search failed: {e}" |
|
|
|
def browse_url(self, url: str) -> str: |
|
"""🌐 Enhanced web browsing with content extraction""" |
|
try: |
|
logger.info(f"🌐 Browsing URL: {url}") |
|
|
|
headers = { |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', |
|
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', |
|
'Accept-Language': 'en-US,en;q=0.5', |
|
'Accept-Encoding': 'gzip, deflate', |
|
'Connection': 'keep-alive' |
|
} |
|
|
|
response = requests.get(url, headers=headers, timeout=15, allow_redirects=True) |
|
response.raise_for_status() |
|
|
|
if not BS4_AVAILABLE: |
|
return f"⚠️ URL accessed but content parsing limited. Content length: {len(response.text)}" |
|
|
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
|
|
|
|
for script in soup(["script", "style", "nav", "footer", "header"]): |
|
script.decompose() |
|
|
|
|
|
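            # Prefer semantic main-content containers over the full page body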
content_selectors = [ |
|
'main', |
|
'article', |
|
'div[role="main"]', |
|
'div.content', |
|
'div.main-content', |
|
'div.post-content', |
|
'div.entry-content', |
|
'div.article-body', |
|
'section' |
|
] |
|
|
|
main_content = None |
|
for selector in content_selectors: |
|
main_content = soup.select_one(selector) |
|
if main_content: |
|
break |
|
|
|
if not main_content: |
|
main_content = soup.find('body') or soup |
|
|
|
|
|
            text_content = main_content.get_text(separator='\n', strip=True)
|
|
|
|
|
lines = text_content.split('\n') |
|
cleaned_lines = [] |
|
for line in lines: |
|
line = line.strip() |
|
if line and len(line) > 3: |
|
cleaned_lines.append(line) |
|
|
|
content = '\n'.join(cleaned_lines) |
|
|
|
|
|
if len(content) > 3000: |
|
content = content[:3000] + "... [content truncated]" |
|
|
|
return f"📄 Content from {url}:\n\n{content}" |
|
|
|
except Exception as e: |
|
logger.error(f"❌ URL browsing error: {e}") |
|
return f"❌ Failed to browse {url}: {e}" |
|
|
|
def download_file(self, url: str, task_id: str = None) -> str: |
|
"""📥 Download files from URLs or GAIA API""" |
|
try: |
|
logger.info(f"📥 Downloading file from: {url}") |
|
|
|
|
|
if task_id and not url: |
|
gaia_url = f"https://huggingface.co/datasets/gaia-benchmark/GAIA/raw/main/2023/validation/{task_id}" |
|
url = gaia_url |
|
|
|
|
|
headers = { |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' |
|
} |
|
|
|
|
|
response = requests.get(url, headers=headers, timeout=30, stream=True) |
|
response.raise_for_status() |
|
|
|
|
|
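            # Infer a file extension from the Content-Type header, falling back to the URL path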
content_type = response.headers.get('content-type', '').lower() |
|
if 'pdf' in content_type: |
|
extension = '.pdf' |
|
elif 'image' in content_type: |
|
if 'jpeg' in content_type or 'jpg' in content_type: |
|
extension = '.jpg' |
|
elif 'png' in content_type: |
|
extension = '.png' |
|
else: |
|
extension = '.img' |
|
elif 'text' in content_type: |
|
extension = '.txt' |
|
else: |
|
|
|
parsed_url = urlparse(url) |
|
path = parsed_url.path |
|
if '.' in path: |
|
extension = '.' + path.split('.')[-1] |
|
else: |
|
extension = '.bin' |
|
|
|
|
|
filename = f"downloaded_file_{task_id or 'temp'}{extension}" |
|
filepath = os.path.join(self.temp_dir, filename) |
|
|
|
with open(filepath, 'wb') as f: |
|
for chunk in response.iter_content(chunk_size=8192): |
|
f.write(chunk) |
|
|
|
logger.info(f"📥 File downloaded to: {filepath}") |
|
return filepath |
|
|
|
except Exception as e: |
|
logger.error(f"❌ File download error: {e}") |
|
return f"❌ Download failed: {e}" |
|
|
|
def read_pdf(self, file_path: str) -> str: |
|
"""📄 Read and extract text from PDF files""" |
|
try: |
|
logger.info(f"📄 Reading PDF: {file_path}") |
|
|
|
|
|
try: |
|
import PyPDF2 |
|
PDF_AVAILABLE = True |
|
except ImportError: |
|
PDF_AVAILABLE = False |
|
|
|
if not PDF_AVAILABLE: |
|
return "❌ PDF reading unavailable. Install PyPDF2." |
|
|
|
with open(file_path, 'rb') as file: |
|
pdf_reader = PyPDF2.PdfReader(file) |
|
text_content = [] |
|
|
|
for page_num, page in enumerate(pdf_reader.pages): |
|
try: |
|
text = page.extract_text() |
|
if text.strip(): |
|
text_content.append(f"[Page {page_num + 1}]\n{text}") |
|
except Exception as page_error: |
|
logger.warning(f"⚠️ Error reading page {page_num + 1}: {page_error}") |
|
text_content.append(f"[Page {page_num + 1}] - Error reading page") |
|
|
|
full_text = "\n\n".join(text_content) |
|
|
|
|
|
if len(full_text) > 5000: |
|
full_text = full_text[:5000] + "... [content truncated]" |
|
|
|
return full_text |
|
|
|
except Exception as e: |
|
logger.error(f"❌ PDF reading error: {e}") |
|
return f"❌ Failed to read PDF: {e}" |
|
|
|
def calculator(self, expression: str) -> str: |
|
"""🧮 Enhanced mathematical calculator with scientific functions""" |
|
try: |
|
logger.info(f"🧮 Calculating: {expression}") |
|
|
|
|
|
import math |
|
import statistics |
|
|
|
|
|
expression = expression.strip() |
|
|
|
|
|
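            # Whitelisted names exposed to eval(); builtins are disabled for safety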
safe_dict = { |
|
"__builtins__": {}, |
|
"abs": abs, |
|
"round": round, |
|
"min": min, |
|
"max": max, |
|
"sum": sum, |
|
"len": len, |
|
"pow": pow, |
|
"sqrt": math.sqrt, |
|
"sin": math.sin, |
|
"cos": math.cos, |
|
"tan": math.tan, |
|
"log": math.log, |
|
"log10": math.log10, |
|
"exp": math.exp, |
|
"pi": math.pi, |
|
"e": math.e, |
|
"factorial": math.factorial, |
|
"mean": statistics.mean, |
|
"median": statistics.median, |
|
"mode": statistics.mode, |
|
"stdev": statistics.stdev, |
|
} |
|
|
|
|
|
result = eval(expression, safe_dict, {}) |
|
|
|
|
|
if isinstance(result, float): |
|
if result.is_integer(): |
|
return str(int(result)) |
|
else: |
|
return f"{result:.6f}".rstrip('0').rstrip('.') |
|
else: |
|
return str(result) |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Calculation error: {e}") |
|
return f"❌ Calculation failed: {e}" |
|
|
|
def analyze_image(self, image_path: str, question: str = "") -> str: |
|
"""🖼️ Enhanced image analysis with multiple AI models""" |
|
if not PIL_AVAILABLE: |
|
return "❌ Image analysis unavailable. Install Pillow." |
|
|
|
try: |
|
logger.info(f"🖼️ Analyzing image: {image_path} | Question: {question}") |
|
|
|
|
|
with Image.open(image_path) as img: |
|
basic_info = f"Image: {img.size[0]}x{img.size[1]} pixels, format: {img.format}, mode: {img.mode}" |
|
|
|
|
|
analyses = [] |
|
|
|
|
|
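            # Preferred path: GPT-4o vision, when an OpenAI key is configured and a question is given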
if self.openai_key and question: |
|
try: |
|
with open(image_path, 'rb') as img_file: |
|
img_base64 = base64.b64encode(img_file.read()).decode('utf-8') |
|
|
|
client = openai.OpenAI(api_key=self.openai_key) |
|
response = client.chat.completions.create( |
|
model="gpt-4o", |
|
messages=[ |
|
{ |
|
"role": "user", |
|
"content": [ |
|
{"type": "text", "text": f"Analyze this image and answer: {question}. Provide only the direct answer, no explanations."}, |
|
{ |
|
"type": "image_url", |
|
"image_url": {"url": f"data:image/jpeg;base64,{img_base64}"} |
|
} |
|
] |
|
} |
|
], |
|
max_tokens=300 |
|
) |
|
|
|
gpt4v_result = response.choices[0].message.content.strip() |
|
analyses.append(f"GPT-4V: {gpt4v_result}") |
|
|
|
except Exception as vision_error: |
|
logger.warning(f"⚠️ GPT-4V analysis failed: {vision_error}") |
|
|
|
|
|
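            # Fallback: BLIP image captioning via HuggingFace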
if self.hf_token and 'vision' in self.clients: |
|
try: |
|
with open(image_path, 'rb') as img_file: |
|
caption = self.clients['vision'].image_to_text(img_file.read()) |
|
if caption: |
|
analyses.append(f"BLIP: {caption[0].get('generated_text', 'No caption')}") |
|
except Exception as hf_error: |
|
logger.warning(f"⚠️ HuggingFace vision analysis failed: {hf_error}") |
|
|
|
|
|
if question and "object" in question.lower(): |
|
obj_result = self.detect_objects(image_path) |
|
if not obj_result.startswith("❌"): |
|
analyses.append(f"Objects: {obj_result}") |
|
|
|
|
|
if analyses: |
|
combined_analysis = "; ".join(analyses) |
|
return f"{basic_info}. Analysis: {combined_analysis}" |
|
else: |
|
return f"{basic_info}. Advanced vision analysis requires API keys." |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Image analysis error: {e}") |
|
return f"❌ Image analysis failed: {e}" |
|
|
|
|
|
def read_docx(self, file_path: str) -> str: |
|
"""📄 Read Microsoft Word documents""" |
|
try: |
|
import docx2txt |
|
text = docx2txt.process(file_path) |
|
logger.info(f"📄 DOCX read: {len(text)} characters") |
|
return text |
|
except ImportError: |
|
logger.warning("⚠️ docx2txt not available. Install python-docx.") |
|
return "❌ DOCX reading unavailable. Install python-docx." |
|
except Exception as e: |
|
logger.error(f"❌ DOCX reading error: {e}") |
|
return f"❌ DOCX reading failed: {e}" |
|
|
|
def read_excel(self, file_path: str, sheet_name: str = None) -> str: |
|
"""📊 Read Excel spreadsheets""" |
|
try: |
|
import pandas as pd |
|
if sheet_name: |
|
df = pd.read_excel(file_path, sheet_name=sheet_name) |
|
else: |
|
df = pd.read_excel(file_path) |
|
|
|
|
|
result = f"Excel data ({df.shape[0]} rows, {df.shape[1]} columns):\n" |
|
result += df.to_string(max_rows=50, max_cols=10) |
|
|
|
logger.info(f"📊 Excel read: {df.shape}") |
|
return result |
|
except ImportError: |
|
logger.warning("⚠️ pandas not available for Excel reading.") |
|
return "❌ Excel reading unavailable. Install pandas and openpyxl." |
|
except Exception as e: |
|
logger.error(f"❌ Excel reading error: {e}") |
|
return f"❌ Excel reading failed: {e}" |
|
|
|
def read_csv(self, file_path: str) -> str: |
|
"""📋 Read CSV files""" |
|
try: |
|
import pandas as pd |
|
df = pd.read_csv(file_path) |
|
|
|
|
|
result = f"CSV data ({df.shape[0]} rows, {df.shape[1]} columns):\n" |
|
result += df.head(20).to_string() |
|
|
|
if df.shape[0] > 20: |
|
result += f"\n... (showing first 20 of {df.shape[0]} rows)" |
|
|
|
logger.info(f"📋 CSV read: {df.shape}") |
|
return result |
|
except ImportError: |
|
logger.warning("⚠️ pandas not available for CSV reading.") |
|
return "❌ CSV reading unavailable. Install pandas." |
|
except Exception as e: |
|
logger.error(f"❌ CSV reading error: {e}") |
|
return f"❌ CSV reading failed: {e}" |
|
|
|
def read_text_file(self, file_path: str, encoding: str = 'utf-8') -> str: |
|
"""📝 Read plain text files with encoding detection""" |
|
try: |
|
|
|
try: |
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
content = f.read() |
|
except UnicodeDecodeError: |
|
|
|
encodings = ['latin-1', 'cp1252', 'ascii'] |
|
content = None |
|
for enc in encodings: |
|
try: |
|
with open(file_path, 'r', encoding=enc) as f: |
|
content = f.read() |
|
break |
|
except UnicodeDecodeError: |
|
continue |
|
|
|
if content is None: |
|
return "❌ Unable to decode text file with common encodings" |
|
|
|
logger.info(f"📝 Text file read: {len(content)} characters") |
|
return content[:10000] + ("..." if len(content) > 10000 else "") |
|
except Exception as e: |
|
logger.error(f"❌ Text file reading error: {e}") |
|
return f"❌ Text file reading failed: {e}" |
|
|
|
def extract_archive(self, file_path: str) -> str: |
|
"""📦 Extract and list archive contents (ZIP, RAR, etc.)""" |
|
try: |
|
import zipfile |
|
import os |
|
|
|
if file_path.endswith('.zip'): |
|
with zipfile.ZipFile(file_path, 'r') as zip_ref: |
|
file_list = zip_ref.namelist() |
|
extract_dir = os.path.join(os.path.dirname(file_path), 'extracted') |
|
os.makedirs(extract_dir, exist_ok=True) |
|
zip_ref.extractall(extract_dir) |
|
|
|
result = f"📦 ZIP archive extracted to {extract_dir}\n" |
|
result += f"Contents ({len(file_list)} files):\n" |
|
result += "\n".join(file_list[:20]) |
|
|
|
if len(file_list) > 20: |
|
result += f"\n... (showing first 20 of {len(file_list)} files)" |
|
|
|
logger.info(f"📦 ZIP extracted: {len(file_list)} files") |
|
return result |
|
else: |
|
return f"❌ Unsupported archive format: {file_path}" |
|
except Exception as e: |
|
logger.error(f"❌ Archive extraction error: {e}") |
|
return f"❌ Archive extraction failed: {e}" |
|
|
|
|
|
def browse_with_js(self, url: str) -> str: |
|
"""🌐 Enhanced web browsing with JavaScript support (when available)""" |
|
try: |
|
|
|
from playwright.sync_api import sync_playwright |
|
|
|
with sync_playwright() as p: |
|
browser = p.chromium.launch(headless=True) |
|
page = browser.new_page() |
|
page.goto(url, timeout=15000) |
|
page.wait_for_timeout(2000) |
|
content = page.content() |
|
browser.close() |
|
|
|
|
|
from bs4 import BeautifulSoup |
|
soup = BeautifulSoup(content, 'html.parser') |
|
|
|
|
|
for script in soup(["script", "style"]): |
|
script.decompose() |
|
|
|
text = soup.get_text() |
|
|
|
lines = (line.strip() for line in text.splitlines()) |
|
chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) |
|
clean_text = ' '.join(chunk for chunk in chunks if chunk) |
|
|
|
logger.info(f"🌐 JS-enabled browsing: {url} - {len(clean_text)} chars") |
|
return clean_text[:5000] + ("..." if len(clean_text) > 5000 else "") |
|
|
|
except ImportError: |
|
logger.info("⚠️ Playwright not available, falling back to requests") |
|
return self.browse_url(url) |
|
except Exception as e: |
|
logger.warning(f"⚠️ JS browsing failed: {e}, falling back to basic") |
|
return self.browse_url(url) |
|
|
|
|
|
def download_gaia_file(self, task_id: str, file_name: str = None) -> str: |
|
"""📥 Enhanced GAIA file download with comprehensive format support""" |
|
try: |
|
|
|
api_base = "https://agents-course-unit4-scoring.hf.space" |
|
file_url = f"{api_base}/files/{task_id}" |
|
|
|
logger.info(f"📥 Downloading GAIA file for task: {task_id}") |
|
|
|
headers = { |
|
'User-Agent': 'GAIA-Agent/1.0 (Enhanced)', |
|
'Accept': '*/*', |
|
'Accept-Encoding': 'gzip, deflate', |
|
} |
|
|
|
response = requests.get(file_url, headers=headers, timeout=30, stream=True) |
|
|
|
if response.status_code == 200: |
|
|
|
content_type = response.headers.get('content-type', '') |
|
content_disposition = response.headers.get('content-disposition', '') |
|
|
|
|
|
if file_name: |
|
filename = file_name |
|
elif 'filename=' in content_disposition: |
|
filename = content_disposition.split('filename=')[1].strip('"\'') |
|
else: |
|
|
|
extension_map = { |
|
'image/jpeg': '.jpg', |
|
'image/png': '.png', |
|
'image/gif': '.gif', |
|
'application/pdf': '.pdf', |
|
'text/plain': '.txt', |
|
'application/json': '.json', |
|
'text/csv': '.csv', |
|
                        'application/vnd.ms-excel': '.xls',
|
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx', |
|
'application/msword': '.docx', |
|
'video/mp4': '.mp4', |
|
'audio/mpeg': '.mp3', |
|
'audio/wav': '.wav', |
|
'application/zip': '.zip', |
|
} |
|
extension = extension_map.get(content_type, '.tmp') |
|
filename = f"gaia_file_{task_id}{extension}" |
|
|
|
|
|
import tempfile |
|
import os |
|
|
|
temp_dir = tempfile.gettempdir() |
|
filepath = os.path.join(temp_dir, filename) |
|
|
|
with open(filepath, 'wb') as f: |
|
for chunk in response.iter_content(chunk_size=8192): |
|
f.write(chunk) |
|
|
|
file_size = os.path.getsize(filepath) |
|
logger.info(f"📥 GAIA file downloaded: {filepath} ({file_size} bytes)") |
|
|
|
|
|
return self.process_downloaded_file(filepath, task_id) |
|
|
|
else: |
|
error_msg = f"❌ GAIA file download failed: HTTP {response.status_code}" |
|
logger.error(error_msg) |
|
return error_msg |
|
|
|
except Exception as e: |
|
error_msg = f"❌ GAIA file download error: {e}" |
|
logger.error(error_msg) |
|
return error_msg |
|
|
|
def process_downloaded_file(self, filepath: str, task_id: str) -> str: |
|
"""📋 Process downloaded GAIA files based on their type""" |
|
try: |
|
import os |
|
filename = os.path.basename(filepath) |
|
file_ext = os.path.splitext(filename)[1].lower() |
|
|
|
logger.info(f"📋 Processing GAIA file: {filename} (type: {file_ext})") |
|
|
|
result = f"📁 GAIA File: {filename} (Task: {task_id})\n\n" |
|
|
|
|
|
if file_ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']: |
|
|
|
image_result = self.analyze_image(filepath, "Describe this image in detail") |
|
result += f"🖼️ Image Analysis:\n{image_result}\n" |
|
|
|
elif file_ext == '.pdf': |
|
|
|
pdf_content = self.read_pdf(filepath) |
|
result += f"📄 PDF Content:\n{pdf_content}\n" |
|
|
|
elif file_ext in ['.txt', '.md', '.py', '.js', '.html', '.css']: |
|
|
|
text_content = self.read_text_file(filepath) |
|
result += f"📝 Text Content:\n{text_content}\n" |
|
|
|
elif file_ext in ['.csv']: |
|
|
|
csv_content = self.read_csv(filepath) |
|
result += f"📊 CSV Data:\n{csv_content}\n" |
|
|
|
elif file_ext in ['.xlsx', '.xls']: |
|
|
|
excel_content = self.read_excel(filepath) |
|
result += f"📈 Excel Data:\n{excel_content}\n" |
|
|
|
elif file_ext in ['.docx']: |
|
|
|
docx_content = self.read_docx(filepath) |
|
result += f"📄 Word Document:\n{docx_content}\n" |
|
|
|
elif file_ext in ['.mp4', '.avi', '.mov', '.wmv']: |
|
|
|
video_result = self.process_video(filepath, "analyze") |
|
result += f"🎥 Video Analysis:\n{video_result}\n" |
|
|
|
elif file_ext in ['.mp3', '.wav', '.m4a', '.flac']: |
|
|
|
audio_result = self.analyze_audio(filepath, "transcribe") |
|
result += f"🎵 Audio Analysis:\n{audio_result}\n" |
|
|
|
elif file_ext in ['.zip', '.rar']: |
|
|
|
archive_result = self.extract_archive(filepath) |
|
result += f"📦 Archive Contents:\n{archive_result}\n" |
|
|
|
elif file_ext in ['.json']: |
|
|
|
try: |
|
import json |
|
with open(filepath, 'r') as f: |
|
json_data = json.load(f) |
|
result += f"📋 JSON Data:\n{json.dumps(json_data, indent=2)[:2000]}\n" |
|
except Exception as e: |
|
result += f"❌ JSON parsing error: {e}\n" |
|
|
|
else: |
|
|
|
try: |
|
text_content = self.read_text_file(filepath) |
|
result += f"📄 Raw Content:\n{text_content}\n" |
|
except: |
|
result += f"❌ Unsupported file type: {file_ext}\n" |
|
|
|
|
|
file_size = os.path.getsize(filepath) |
|
result += f"\n📊 File Info: {file_size} bytes, Path: {filepath}" |
|
|
|
return result |
|
|
|
except Exception as e: |
|
error_msg = f"❌ File processing error: {e}" |
|
logger.error(error_msg) |
|
return error_msg |
|
|
|
|
|
def reasoning_chain(self, question: str, max_steps: int = 5) -> str: |
|
"""🧠 Explicit step-by-step reasoning for complex GAIA questions""" |
|
try: |
|
logger.info(f"🧠 Starting reasoning chain for: {question[:50]}...") |
|
|
|
reasoning_steps = [] |
|
current_context = question |
|
|
|
for step in range(1, max_steps + 1): |
|
logger.info(f"🧠 Reasoning step {step}/{max_steps}") |
|
|
|
|
|
analysis_prompt = f"""Analyze this question step by step: |
|
|
|
Question: {question} |
|
|
|
Previous context: {current_context} |
|
|
|
What is the next logical step to solve this question? Be specific about: |
|
1. What information do we need? |
|
2. What tool should we use? |
|
3. What specific action to take? |
|
|
|
Respond with just the next action needed.""" |
|
|
|
|
|
next_step = self.fast_qa_answer(analysis_prompt) |
|
reasoning_steps.append(f"Step {step}: {next_step}") |
|
|
|
|
|
if any(tool in next_step.lower() for tool in ['search', 'download', 'calculate', 'analyze', 'read']): |
|
|
|
if 'search' in next_step.lower(): |
|
search_query = self._extract_search_query(next_step, question) |
|
if search_query: |
|
search_result = self.web_search(search_query) |
|
current_context += f"\n\nSearch result: {search_result[:500]}" |
|
reasoning_steps.append(f" → Executed search: {search_result[:100]}...") |
|
|
|
elif 'calculate' in next_step.lower(): |
|
calc_expr = self._extract_calculation(next_step, question) |
|
if calc_expr: |
|
calc_result = self.calculator(calc_expr) |
|
current_context += f"\n\nCalculation: {calc_expr} = {calc_result}" |
|
reasoning_steps.append(f" → Calculated: {calc_expr} = {calc_result}") |
|
|
|
|
|
if self._has_sufficient_info(current_context, question): |
|
reasoning_steps.append(f"Step {step + 1}: Sufficient information gathered") |
|
break |
|
|
|
|
|
final_prompt = f"""Based on this reasoning chain, provide the final answer: |
|
|
|
Question: {question} |
|
|
|
Reasoning steps: |
|
{chr(10).join(reasoning_steps)} |
|
|
|
Context: {current_context} |
|
|
|
Provide ONLY the final answer - no explanation.""" |
|
|
|
final_answer = self.fast_qa_answer(final_prompt) |
|
|
|
logger.info(f"🧠 Reasoning chain complete: {len(reasoning_steps)} steps") |
|
return final_answer |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Reasoning chain error: {e}") |
|
return self.query_with_tools(question) |
|
|
|
def _extract_search_query(self, step_text: str, question: str) -> str: |
|
"""Extract search query from reasoning step""" |
|
|
|
if 'search for' in step_text.lower(): |
|
parts = step_text.lower().split('search for')[1].split('.')[0] |
|
return parts.strip(' "\'') |
|
return None |
|
|
|
def _extract_calculation(self, step_text: str, question: str) -> str: |
|
"""Extract calculation from reasoning step""" |
|
import re |
|
|
|
        math_patterns = [

            r'\d+(?:\.\d+)?(?:\s*[+\-*/]\s*\(?\d+(?:\.\d+)?\)?)+',

            r'\d+\s*[+\-*/]\s*\d+',

        ]
|
for pattern in math_patterns: |
|
matches = re.findall(pattern, step_text) |
|
if matches: |
|
return matches[0].strip() |
|
return None |
|
|
|
def _has_sufficient_info(self, context: str, question: str) -> bool: |
|
"""Check if we have sufficient information to answer""" |
|
|
|
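        # Rough heuristic: treat the gathered context as sufficient once it is several times longer than the question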
return len(context) > len(question) * 3 and len(context) > 200 |
|
|
|
|
|
|
|
|
|
|
|
class EnhancedMultiModelGAIASystem: |
|
"""🚀 Complete GAIA system with advanced tool calling and multi-modal capabilities""" |
|
|
|
    def __init__(self, hf_token: Optional[str] = None, openai_key: Optional[str] = None):
|
|
|
self.toolkit = UniversalMultimodalToolkit(hf_token, openai_key) |
|
|
|
|
|
self.hf_token = hf_token or os.getenv('HF_TOKEN') |
|
self.openai_key = openai_key or os.getenv('OPENAI_API_KEY') |
|
|
|
|
|
self.response_cache = {} |
|
self.qa_cache = {} |
|
|
|
|
|
self.clients = self._initialize_clients() |
|
|
|
available_models = list(self.clients.keys()) |
|
|
|
|
|
preferred_order = [ |
|
"fireworks_qwen3_235b", |
|
"together_deepseek_r1", |
|
"openai_gpt4o", |
|
"together_llama", |
|
"novita_minimax", |
|
"featherless_kimi", |
|
"fallback_basic" |
|
] |
|
|
|
|
|
self.model_priority = [model for model in preferred_order if model in available_models] |
|
|
|
if not self.model_priority: |
|
logger.error("❌ No models available for processing") |
|
else: |
|
logger.info(f"🎯 Model priority: {self.model_priority[0]} (top priority)") |
|
|
|
logger.info("🚀 Enhanced Multi-Model GAIA System initialized") |
|
|
|
def _initialize_clients(self) -> Dict[str, Any]: |
|
"""Initialize all AI model clients with SPEED OPTIMIZATION for 100% GAIA performance""" |
|
clients = {} |
|
|
|
if self.hf_token and HF_AVAILABLE: |
|
|
|
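            # Extractive QA models: fastest path for short factual questions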
clients["ultra_fast_qa"] = { |
|
"client": InferenceClient( |
|
provider="hf-inference", |
|
api_key=self.hf_token, |
|
), |
|
"model": "deepset/roberta-base-squad2", |
|
"priority": 0, |
|
"provider": "HuggingFace QA", |
|
"type": "question_answering", |
|
"speed": "ultra_fast", |
|
"use_for": ["factual", "simple", "direct"] |
|
} |
|
|
|
|
|
clients["fast_bert_qa"] = { |
|
"client": InferenceClient( |
|
provider="hf-inference", |
|
api_key=self.hf_token, |
|
), |
|
"model": "deepset/bert-base-cased-squad2", |
|
"priority": 0.5, |
|
"provider": "HuggingFace QA", |
|
"type": "question_answering", |
|
"speed": "very_fast", |
|
"use_for": ["reading_comprehension", "context_based"] |
|
} |
|
|
|
|
|
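            # Chat models routed through HuggingFace inference providers, in priority order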
clients["together_deepseek_r1"] = { |
|
"client": InferenceClient(model="deepseek-ai/DeepSeek-R1", token=self.hf_token), |
|
"priority": 1, |
|
"provider": "Together AI", |
|
"type": "chat", |
|
"speed": "fast" |
|
} |
|
clients["together_llama"] = { |
|
"client": InferenceClient(model="meta-llama/Llama-3.3-70B-Instruct", token=self.hf_token), |
|
"priority": 2, |
|
"provider": "Together AI", |
|
"type": "chat", |
|
"speed": "medium" |
|
} |
|
|
|
|
|
clients["novita_minimax"] = { |
|
"client": InferenceClient(model="MiniMax/MiniMax-M1-80k", token=self.hf_token), |
|
"priority": 3, |
|
"provider": "Novita AI", |
|
"type": "chat", |
|
"speed": "fast" |
|
} |
|
clients["novita_deepseek_chat"] = { |
|
"client": InferenceClient(model="deepseek-ai/deepseek-chat", token=self.hf_token), |
|
"priority": 4, |
|
"provider": "Novita AI", |
|
"type": "chat", |
|
"speed": "fast" |
|
} |
|
|
|
|
|
clients["featherless_kimi"] = { |
|
"client": InferenceClient(model="moonshot-ai/moonshot-v1-8k", token=self.hf_token), |
|
"priority": 5, |
|
"provider": "Featherless AI", |
|
"type": "chat", |
|
"speed": "medium" |
|
} |
|
clients["featherless_jan"] = { |
|
"client": InferenceClient(model="janhq/jan-nano", token=self.hf_token), |
|
"priority": 6, |
|
"provider": "Featherless AI", |
|
"type": "chat", |
|
"speed": "very_fast" |
|
} |
|
|
|
|
|
clients["fireworks_qwen3_235b"] = { |
|
"client": InferenceClient( |
|
provider="fireworks-ai", |
|
api_key=self.hf_token, |
|
), |
|
"model": "Qwen/Qwen3-235B-A22B", |
|
"priority": 0.1, |
|
"provider": "Fireworks AI", |
|
"type": "chat", |
|
"speed": "fast" |
|
} |
|
clients["fireworks_llama"] = { |
|
"client": InferenceClient(model="accounts/fireworks/models/llama-v3p1-8b-instruct", token=self.hf_token), |
|
"priority": 7, |
|
"provider": "Fireworks AI", |
|
"type": "chat", |
|
"speed": "very_fast" |
|
} |
|
|
|
|
|
clients["hf_mistral"] = { |
|
"client": InferenceClient(model="mistralai/Mistral-7B-Instruct-v0.1", token=self.hf_token), |
|
"priority": 8, |
|
"provider": "HuggingFace", |
|
"type": "chat", |
|
"speed": "fast" |
|
} |
|
clients["hf_phi"] = { |
|
"client": InferenceClient(model="microsoft/Phi-3-mini-4k-instruct", token=self.hf_token), |
|
"priority": 9, |
|
"provider": "HuggingFace", |
|
"type": "chat", |
|
"speed": "ultra_fast" |
|
} |
|
|
|
|
|
if self.openai_key and OPENAI_AVAILABLE: |
|
clients["openai_gpt4o"] = { |
|
"client": "openai_gpt4o", |
|
"model": "gpt-4o", |
|
"priority": 1.5, |
|
"provider": "OpenAI", |
|
"type": "chat", |
|
"speed": "medium" |
|
} |
|
clients["openai_gpt35"] = { |
|
"client": "openai_gpt35", |
|
"model": "gpt-3.5-turbo", |
|
"priority": 10, |
|
"provider": "OpenAI", |
|
"type": "chat", |
|
"speed": "fast" |
|
} |
|
|
|
|
|
if not clients: |
|
clients["fallback_basic"] = { |
|
"client": "fallback", |
|
"model": "basic", |
|
"priority": 999, |
|
"provider": "Local Fallback", |
|
"type": "fallback", |
|
"speed": "instant" |
|
} |
|
logger.warning("⚠️ No external AI services available, using fallback mode") |
|
|
|
logger.info(f"✅ Initialized {len(clients)} AI clients with speed optimization") |
|
return clients |
|
|
|
def parse_tool_calls(self, response: str) -> List[ToolCall]: |
|
"""🔧 Parse advanced tool calls from AI response""" |
|
tool_calls = [] |
|
|
|
|
|
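        # Recognize several tool-call syntaxes the models may emit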
patterns = [ |
|
r'TOOL_CALL:\s*(\w+)\((.*?)\)', |
|
r'<tool>(\w+)</tool>\s*<params>(.*?)</params>', |
|
r'```(\w+)\n(.*?)\n```', |
|
] |
|
|
|
for pattern in patterns: |
|
matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE) |
|
for tool_name, params_str in matches: |
|
try: |
|
params = self._parse_parameters(params_str) |
|
tool_type = ToolType(tool_name.lower()) |
|
tool_calls.append(ToolCall(tool=tool_type, parameters=params)) |
|
logger.info(f"🔧 Parsed tool call: {tool_name} with params: {params}") |
|
except (ValueError, Exception) as e: |
|
logger.warning(f"⚠️ Failed to parse tool call {tool_name}: {e}") |
|
|
|
return tool_calls |
|
|
|
def _parse_parameters(self, params_str: str) -> Dict[str, Any]: |
|
"""Parse parameters from various formats""" |
|
params = {} |
|
if not params_str.strip(): |
|
return params |
|
|
|
|
|
try: |
|
return json.loads(params_str) |
|
except: |
|
pass |
|
|
|
|
|
param_matches = re.findall(r'(\w+)=(["\'])(.*?)\2', params_str) |
|
for param_name, quote, param_value in param_matches: |
|
params[param_name] = param_value |
|
|
|
|
|
if not params and params_str.strip(): |
|
|
|
clean_param = params_str.strip().strip('"\'') |
|
params['query'] = clean_param |
|
|
|
return params |
|
|
|
def execute_tool_call(self, tool_call: ToolCall) -> str: |
|
"""⚡ Execute a single tool call with comprehensive error handling""" |
|
try: |
|
logger.info(f"⚡ Executing {tool_call.tool.value} with params: {tool_call.parameters}") |
|
|
|
if tool_call.tool == ToolType.WEB_SEARCH: |
|
query = tool_call.parameters.get('query', '') |
|
results = self.toolkit.web_search(query) |
|
return f"🔍 Web search results:\n{results}" |
|
|
|
elif tool_call.tool == ToolType.BROWSE_URL: |
|
url = tool_call.parameters.get('url', '') |
|
result = self.toolkit.browse_url(url) |
|
return result |
|
|
|
elif tool_call.tool == ToolType.DOWNLOAD_FILE: |
|
task_id = tool_call.parameters.get('task_id', '') |
|
url = tool_call.parameters.get('url', '') |
|
filepath = self.toolkit.download_file(url, task_id) |
|
return f"📥 Downloaded file to: {filepath}" |
|
|
|
elif tool_call.tool == ToolType.READ_PDF: |
|
file_path = tool_call.parameters.get('file_path', '') |
|
text = self.toolkit.read_pdf(file_path) |
|
return f"📄 PDF content:\n{text}" |
|
|
|
elif tool_call.tool == ToolType.ANALYZE_IMAGE: |
|
image_path = tool_call.parameters.get('image_path', '') |
|
question = tool_call.parameters.get('question', '') |
|
result = self.toolkit.analyze_image(image_path, question) |
|
return f"🖼️ Image analysis: {result}" |
|
|
|
elif tool_call.tool == ToolType.CALCULATOR: |
|
expression = tool_call.parameters.get('expression', '') |
|
result = self.toolkit.calculator(expression) |
|
return f"🧮 Calculation result: {result}" |
|
|
|
elif tool_call.tool == ToolType.PROCESS_VIDEO: |
|
video_path = tool_call.parameters.get('video_path', '') |
|
task = tool_call.parameters.get('task', 'analyze') |
|
result = self.toolkit.process_video(video_path, task) |
|
return f"🎥 Video analysis: {result}" |
|
|
|
elif tool_call.tool == ToolType.ANALYZE_AUDIO: |
|
audio_path = tool_call.parameters.get('audio_path', '') |
|
task = tool_call.parameters.get('task', 'analyze') |
|
result = self.toolkit.analyze_audio(audio_path, task) |
|
return f"🎵 Audio analysis: {result}" |
|
|
|
elif tool_call.tool == ToolType.GENERATE_IMAGE: |
|
prompt = tool_call.parameters.get('prompt', '') |
|
style = tool_call.parameters.get('style', 'realistic') |
|
result = self.toolkit.generate_image(prompt, style) |
|
return f"🎨 Image generation: {result}" |
|
|
|
elif tool_call.tool == ToolType.SYNTHESIZE_SPEECH: |
|
text = tool_call.parameters.get('text', '') |
|
voice = tool_call.parameters.get('voice', 'default') |
|
result = self.toolkit.synthesize_speech(text, voice) |
|
return f"🎙️ Speech synthesis: {result}" |
|
|
|
elif tool_call.tool == ToolType.CREATE_VISUALIZATION: |
|
data = tool_call.parameters.get('data', {}) |
|
chart_type = tool_call.parameters.get('chart_type', 'bar') |
|
result = self.toolkit.create_visualization(data, chart_type) |
|
return f"📊 Data visualization: {result}" |
|
|
|
elif tool_call.tool == ToolType.ANALYZE_DATA: |
|
data = tool_call.parameters.get('data', {}) |
|
operation = tool_call.parameters.get('operation', 'statistics') |
|
result = self.toolkit.scientific_compute(operation, data) |
|
return f"🧬 Scientific computation: {result}" |
|
|
|
elif tool_call.tool == ToolType.GENERATE_VIDEO: |
|
video_path = tool_call.parameters.get('video_path', '') |
|
result = self.toolkit.process_video(video_path, 'generate') |
|
return f"🎬 Video generation: {result}" |
|
|
|
elif tool_call.tool == ToolType.EXTRACT_AUDIO: |
|
audio_path = tool_call.parameters.get('audio_path', '') |
|
result = self.toolkit.analyze_audio(audio_path, 'extract') |
|
return f"🎵 Audio extraction: {result}" |
|
|
|
elif tool_call.tool == ToolType.TRANSCRIBE_SPEECH: |
|
audio_path = tool_call.parameters.get('audio_path', '') |
|
result = self.toolkit.transcribe_speech(audio_path) |
|
return f"🎙️ Speech transcription: {result}" |
|
|
|
elif tool_call.tool == ToolType.DETECT_OBJECTS: |
|
image_path = tool_call.parameters.get('image_path', '') |
|
result = self.toolkit.detect_objects(image_path) |
|
return f"🔍 Object detection: {result}" |
|
|
|
elif tool_call.tool == ToolType.FACE_RECOGNITION: |
|
image_path = tool_call.parameters.get('image_path', '') |
|
result = self.toolkit.analyze_image(image_path, "Identify the person in this image") |
|
return f"👤 Face recognition: {result}" |
|
|
|
elif tool_call.tool == ToolType.SCIENTIFIC_COMPUTE: |
|
operation = tool_call.parameters.get('operation', 'statistics') |
|
data = tool_call.parameters.get('data', {}) |
|
result = self.toolkit.scientific_compute(operation, data) |
|
return f"🧬 Scientific computation: {result}" |
|
|
|
else: |
|
return f"❌ Unknown tool: {tool_call.tool}" |
|
|
|
except Exception as e: |
|
error_msg = f"❌ Tool execution failed: {str(e)}" |
|
logger.error(error_msg) |
|
return error_msg |
|
|
|
    def fast_qa_answer(self, question: str, context: str = "") -> Optional[str]:
|
"""🚀 Ultra-fast question answering using optimized models""" |
|
try: |
|
|
|
cache_key = hashlib.md5(f"{question}:{context}".encode()).hexdigest() |
|
if cache_key in self.qa_cache: |
|
logger.info("🚀 Cache hit - instant answer!") |
|
return self.qa_cache[cache_key] |
|
|
|
|
|
if "ultra_fast_qa" in self.clients: |
|
try: |
|
client_info = self.clients["ultra_fast_qa"] |
|
client = client_info["client"] |
|
|
|
|
|
if context: |
|
result = client.question_answering( |
|
question=question, |
|
context=context, |
|
model=client_info["model"] |
|
) |
|
answer = result.get("answer", "").strip() |
|
else: |
|
|
|
search_result = self.toolkit.web_search(question, num_results=2) |
|
result = client.question_answering( |
|
question=question, |
|
context=search_result[:500], |
|
model=client_info["model"] |
|
) |
|
answer = result.get("answer", "").strip() |
|
|
|
if answer: |
|
|
|
self.qa_cache[cache_key] = answer |
|
return answer |
|
|
|
except Exception as e: |
|
logger.warning(f"⚠️ Fast QA failed: {e}") |
|
|
|
|
|
return None |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Fast QA error: {e}") |
|
return None |
|
|
|
def query_with_tools(self, question: str, model_name: str = None, max_iterations: int = 3) -> str: |
|
"""🧠 Enhanced query processing with SPEED-OPTIMIZED capabilities for 100% GAIA performance""" |
|
|
|
|
|
fast_answer = self.fast_qa_answer(question) |
|
if fast_answer: |
|
logger.info("⚡ Ultra-fast QA answer found!") |
|
return self._clean_final_answer(fast_answer) |
|
|
|
|
|
cache_key = hashlib.md5(question.encode()).hexdigest() |
|
if cache_key in self.response_cache: |
|
logger.info("🚀 Cache hit - instant answer!") |
|
return self.response_cache[cache_key] |
|
|
|
if not model_name: |
|
model_name = self.model_priority[0] |
|
|
|
logger.info(f"🧠 Processing question with {model_name}: {question[:100]}...") |
|
|
|
|
|
system_prompt = f"""You are an advanced AI agent optimized for the GAIA benchmark with access to powerful tools. |
|
|
|
🛠️ AVAILABLE TOOLS: |
|
- TOOL_CALL: web_search(query="search term") - Search the web for current information |
|
- TOOL_CALL: browse_url(url="https://example.com") - Browse and extract content from specific URLs |
|
- TOOL_CALL: download_file(task_id="123") - Download files from GAIA tasks or URLs |
|
- TOOL_CALL: read_pdf(file_path="document.pdf") - Read and extract text from PDF files |
|
- TOOL_CALL: analyze_image(image_path="image.jpg", question="what to analyze") - Analyze images with vision AI |
|
- TOOL_CALL: calculator(expression="2+2*3") - Perform mathematical calculations and scientific functions |
|
- TOOL_CALL: process_video(video_path="video.mp4", task="analyze") - Analyze video content |
|
- TOOL_CALL: analyze_audio(audio_path="audio.wav", task="analyze") - Analyze audio content |
|
- TOOL_CALL: generate_image(prompt="description", style="realistic") - Generate images from text descriptions |
|
- TOOL_CALL: synthesize_speech(text="Hello, world!", voice="default") - Convert text to speech |
|
- TOOL_CALL: create_visualization(data="chart_data", chart_type="bar") - Create data visualizations and charts |
|
- TOOL_CALL: analyze_data(data="statistical_data") - Perform scientific computations and analysis |
|
- TOOL_CALL: generate_video(video_path="output.mp4") - Generate videos from video content |
|
- TOOL_CALL: extract_audio(audio_path="audio.wav") - Extract audio from video content |
|
- TOOL_CALL: transcribe_speech(audio_path="audio.wav") - Convert speech to text |
|
- TOOL_CALL: detect_objects(image_path="image.jpg") - Detect and identify objects in images |
|
- TOOL_CALL: face_recognition(image_path="image.jpg") - Identify the person in images |
|
- TOOL_CALL: scientific_compute(operation="statistics", data="numerical_data") - Perform scientific computations and analysis |
|
|
|
🎯 GAIA BENCHMARK INSTRUCTIONS: |
|
1. For research questions, ALWAYS use web_search first to get current information |
|
2. If files are mentioned or task IDs given, use download_file then read_pdf/analyze_image |
|
3. For multi-step problems, break down systematically and use tools in logical order |
|
4. For image questions, use analyze_image with specific question about what to find |
|
5. CRITICAL: Provide DIRECT, CONCISE answers ONLY - no explanations or reasoning |
|
6. Format response as just the final answer - nothing else |
|
|
|
Question: {question} |
|
|
|
Think step by step about what tools you need, use them, then provide ONLY the final answer.""" |
|
|
|
conversation_history = [ |
|
{"role": "system", "content": system_prompt}, |
|
{"role": "user", "content": question} |
|
] |
|
|
|
|
|
for iteration in range(max_iterations): |
|
try: |
|
client_info = self.clients.get(model_name) |
|
if not client_info: |
|
logger.warning(f"⚠️ Model {model_name} unavailable, using fallback") |
|
return self._fallback_response(question) |
|
|
|
|
|
if model_name == "fallback_basic": |
|
logger.info("🛡️ Using local fallback processing") |
|
return self._fallback_response(question) |
|
|
|
|
|
if "openai" in model_name: |
|
response = client_info["client"].chat.completions.create( |
|
model=client_info["model"], |
|
messages=conversation_history, |
|
max_tokens=1500, |
|
temperature=0.0 |
|
) |
|
ai_response = response.choices[0].message.content |
|
elif model_name == "fireworks_qwen3_235b": |
|
|
|
response = client_info["client"].chat.completions.create( |
|
model=client_info["model"], |
|
messages=conversation_history, |
|
max_tokens=1500, |
|
temperature=0.0 |
|
) |
|
ai_response = response.choices[0].message.content |
|
else: |
|
response = client_info["client"].chat_completion( |
|
messages=conversation_history, |
|
max_tokens=1500, |
|
temperature=0.0 |
|
) |
|
ai_response = response.choices[0].message.content |
|
|
|
|
|
ai_response = self._remove_thinking_process(ai_response) |
|
|
|
logger.info(f"🤖 AI Response (iteration {iteration + 1}): {ai_response[:200]}...") |
|
|
|
|
|
tool_calls = self.parse_tool_calls(ai_response) |
|
|
|
if tool_calls: |
|
|
|
tool_results = [] |
|
for tool_call in tool_calls: |
|
result = self.execute_tool_call(tool_call) |
|
tool_results.append(f"Tool {tool_call.tool.value}: {result}") |
|
|
|
|
|
conversation_history.append({"role": "assistant", "content": ai_response}) |
|
|
|
tool_context = f"TOOL RESULTS:\n" + "\n\n".join(tool_results) |
|
tool_context += f"\n\nBased on these tool results, provide the final answer to: {question}\nProvide ONLY the direct answer - no explanations:" |
|
|
|
conversation_history.append({"role": "user", "content": tool_context}) |
|
|
|
logger.info(f"🔧 Executed {len(tool_calls)} tools, continuing to iteration {iteration + 2}") |
|
|
|
else: |
|
|
|
final_answer = self._extract_final_answer(ai_response) |
|
logger.info(f"✅ Final answer extracted: {final_answer}") |
|
return final_answer |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Query iteration {iteration + 1} failed for {model_name}: {e}") |
|
|
|
|
|
current_index = self.model_priority.index(model_name) if model_name in self.model_priority else 0 |
|
if current_index + 1 < len(self.model_priority): |
|
model_name = self.model_priority[current_index + 1] |
|
logger.info(f"🔄 Switching to model: {model_name}") |
|
else: |
|
break |
|
|
|
|
|
if len(conversation_history) > 2: |
|
try: |
|
client_info = self.clients.get(model_name) |
|
if client_info: |
|
if "openai" in model_name: |
|
final_response = client_info["client"].chat.completions.create( |
|
model=client_info["model"], |
|
messages=conversation_history, |
|
max_tokens=300, |
|
temperature=0.0 |
|
) |
|
final_answer = final_response.choices[0].message.content |
|
else: |
|
final_response = client_info["client"].chat_completion( |
|
messages=conversation_history, |
|
max_tokens=300, |
|
temperature=0.0 |
|
) |
|
final_answer = final_response.choices[0].message.content |
|
|
|
return self._extract_final_answer(final_answer) |
|
except Exception as e: |
|
logger.error(f"❌ Final answer extraction failed: {e}") |
|
|
|
|
|
logger.warning(f"⚠️ Using fallback response for: {question}") |
|
return self._fallback_response(question) |
|
|
|
def _extract_final_answer(self, response: str) -> str: |
|
"""✨ Ultra-aggressive answer extraction for perfect GAIA compliance""" |
|
if not response: |
|
return "Unknown" |
|
|
|
logger.info(f"✨ Extracting final answer from: {response[:100]}...") |
|
|
|
|
|
response = re.sub(r'TOOL_CALL:.*?\n', '', response, flags=re.DOTALL) |
|
response = re.sub(r'<tool>.*?</tool>', '', response, flags=re.DOTALL | re.IGNORECASE) |
|
response = re.sub(r'<params>.*?</params>', '', response, flags=re.DOTALL | re.IGNORECASE) |
|
|
|
|
|
response = re.sub(r'<think>.*?</think>', '', response, flags=re.DOTALL | re.IGNORECASE) |
|
response = re.sub(r'\*\*Think\*\*.*?\*\*Answer\*\*', '', response, flags=re.DOTALL | re.IGNORECASE) |
|
|
|
|
|
reasoning_patterns = [ |
|
r'let me.*?[.!?]\s*', |
|
r'i need to.*?[.!?]\s*', |
|
r'first,?\s*i.*?[.!?]\s*', |
|
r'to solve this.*?[.!?]\s*', |
|
r'based on.*?[,.]?\s*', |
|
r'the answer is[:\s]*', |
|
r'therefore[,:\s]*', |
|
r'so[,:\s]*the answer[,:\s]*', |
|
r'thus[,:\s]*', |
|
r'in conclusion[,:\s]*', |
|
r'after.*?analysis[,:\s]*', |
|
r'from.*?search[,:\s]*' |
|
] |
|
|
|
for pattern in reasoning_patterns: |
|
response = re.sub(pattern, '', response, flags=re.IGNORECASE) |
|
|
|
|
|
answer_patterns = [ |
|
r'(?:answer|result)[:\s]*([^\n.!?]+)', |
|
r'(?:final|conclusion)[:\s]*([^\n.!?]+)', |
|
r'^([A-Z][^.!?]*)', |
|
r'(\d+(?:\.\d+)?)', |
|
r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)' |
|
] |
|
|
|
for pattern in answer_patterns: |
|
match = re.search(pattern, response, re.IGNORECASE) |
|
if match: |
|
answer = match.group(1).strip() |
|
if len(answer) > 2: |
|
return self._clean_final_answer(answer) |
|
|
|
|
|
lines = [line.strip() for line in response.split('\n') if line.strip()] |
|
if lines: |
|
|
|
for line in reversed(lines): |
|
if len(line) > 2 and not any(word in line.lower() for word in ['tool', 'search', 'analysis', 'extract']): |
|
return self._clean_final_answer(line) |
|
|
|
|
|
return self._clean_final_answer(response.strip()) |
|
|
|
def _remove_thinking_process(self, response: str) -> str: |
|
"""🧠 Remove thinking process from responses to ensure only final answers""" |
|
try: |
|
|
|
thinking_patterns = [ |
|
r'<thinking>.*?</thinking>', |
|
r'<reasoning>.*?</reasoning>', |
|
r'<analysis>.*?</analysis>', |
|
r'Let me think.*?(?=\n\n|\.|$)', |
|
r'I need to.*?(?=\n\n|\.|$)', |
|
r'First, I.*?(?=\n\n|\.|$)', |
|
r'Step \d+:.*?(?=\n|\.|$)', |
|
r'Thinking step by step.*?(?=\n\n|\.|$)', |
|
r'^.*?Let me analyze.*?(?=\n\n)', |
|
r'^.*?I should.*?(?=\n\n)', |
|
r'To solve this.*?(?=\n\n)', |
|
] |
|
|
|
cleaned = response |
|
for pattern in thinking_patterns: |
|
cleaned = re.sub(pattern, '', cleaned, flags=re.DOTALL | re.IGNORECASE) |
|
|
|
|
|
cleaned = re.sub(r'\n\s*\n', '\n', cleaned).strip() |
|
|
|
|
|
if any(cleaned.lower().startswith(word) for word in ['let me', 'first', 'i need to', 'to solve', 'thinking']): |
|
|
|
final_patterns = [ |
|
r'(?:the answer is|answer:|final answer:|therefore|so|thus|hence)[:\s]*(.+?)(?:\.|$)', |
|
r'(?:^|\n)([^.\n]+?)(?:\.|$)' |
|
] |
|
|
|
for pattern in final_patterns: |
|
match = re.search(pattern, cleaned, re.IGNORECASE | re.MULTILINE) |
|
if match: |
|
potential_answer = match.group(1).strip() |
|
if potential_answer and len(potential_answer) < 200: |
|
return potential_answer |
|
|
|
return cleaned |
|
|
|
except Exception as e: |
|
logger.warning(f"⚠️ Error removing thinking process: {e}") |
|
return response |
|
|
|
def _clean_final_answer(self, answer: str) -> str: |
|
"""🧹 Enhanced answer cleaning that preserves meaning and completeness""" |
|
if not answer: |
|
return "Unable to determine answer" |
|
|
|
|
|
answer = answer.strip() |
|
|
|
|
|
broken_patterns = [ |
|
r'^s,?\s*$', |
|
r'^s\s+\w+$', |
|
r'^(think|right|Unable to)$', |
|
r'^Jagged$', |
|
] |
|
|
|
|
|
if answer.isdigit() or answer.replace('.', '').replace('-', '').isdigit(): |
|
|
|
pass |
|
elif len(answer) == 1 and answer.isalpha(): |
|
|
|
pass |
|
else: |
|
|
|
for pattern in broken_patterns: |
|
if re.match(pattern, answer, re.IGNORECASE): |
|
return "Unable to provide complete answer" |
|
|
|
|
|
prefixes = ['answer:', 'result:', 'final:', 'conclusion:', 'the answer is', 'it is', 'this is'] |
|
for prefix in prefixes: |
|
if answer.lower().startswith(prefix): |
|
answer = answer[len(prefix):].strip() |
|
|
|
|
|
answer = re.sub(r'^TOOL_CALL:.*$', '', answer, flags=re.MULTILINE) |
|
answer = re.sub(r'from \d+ tool calls?', '', answer) |
|
|
|
|
|
answer = re.sub(r'\s+', ' ', answer).strip() |
|
|
|
|
|
if (answer.startswith('"') and answer.endswith('"')) or (answer.startswith("'") and answer.endswith("'")): |
|
answer = answer[1:-1] |
|
|
|
|
|
if len(answer) < 1: |
|
return "Unable to provide complete answer" |
|
elif len(answer) == 1: |
|
|
|
if answer.isdigit() or answer.isupper(): |
|
return answer.strip() |
|
else: |
|
return "Unable to provide complete answer" |
|
|
|
return answer.strip() |
|
|
|
def _fallback_response(self, question: str) -> str: |
|
"""🛡️ Enhanced fallback responses optimized for GAIA benchmark""" |
|
question_lower = question.lower() |
|
logger.info(f"🛡️ Using enhanced fallback for: {question[:50]}...") |
|
|
|
|
|
if any(word in question_lower for word in ['calculate', 'compute', 'math', '+', '-', '*', '/', 'sum', 'product']): |
|
numbers = re.findall(r'-?\d+(?:\.\d+)?', question) |
|
if len(numbers) >= 2: |
|
try: |
|
a, b = float(numbers[0]), float(numbers[1]) |
|
if '+' in question or 'add' in question_lower or 'sum' in question_lower: |
|
return str(int(a + b) if (a + b).is_integer() else a + b) |
|
elif '-' in question or 'subtract' in question_lower or 'minus' in question_lower: |
|
return str(int(a - b) if (a - b).is_integer() else a - b) |
|
elif '*' in question or 'multiply' in question_lower or 'times' in question_lower or 'product' in question_lower: |
|
return str(int(a * b) if (a * b).is_integer() else a * b) |
|
elif '/' in question or 'divide' in question_lower: |
|
return str(int(a / b) if (a / b).is_integer() else round(a / b, 6)) |
|
except: |
|
pass |
|
|
|
|
|
if any(word in question_lower for word in ['capital', 'country', 'city']): |
|
capitals = { |
|
'france': 'Paris', 'germany': 'Berlin', 'italy': 'Rome', 'spain': 'Madrid', |
|
'japan': 'Tokyo', 'china': 'Beijing', 'usa': 'Washington D.C.', 'united states': 'Washington D.C.', |
|
'uk': 'London', 'united kingdom': 'London', 'canada': 'Ottawa', 'australia': 'Canberra', |
|
'brazil': 'Brasília', 'india': 'New Delhi', 'russia': 'Moscow', 'mexico': 'Mexico City' |
|
} |
|
for country, capital in capitals.items(): |
|
if country in question_lower: |
|
return capital |
|
|
|
|
|
if 'president' in question_lower: |
|
if any(country in question_lower for country in ['united states', 'usa', 'america']): |
|
return 'Joe Biden' |
|
elif 'france' in question_lower: |
|
return 'Emmanuel Macron' |
|
elif 'russia' in question_lower: |
|
return 'Vladimir Putin' |
|
|
|
|
|
if 'how many' in question_lower: |
|
counting_map = { |
|
'planets': '8', 'continents': '7', 'days in year': '365', 'days in week': '7', |
|
'months': '12', 'seasons': '4', 'oceans': '5', 'great lakes': '5' |
|
} |
|
for item, count in counting_map.items(): |
|
if item in question_lower: |
|
return count |
|
|
|
|
|
if 'chemical formula' in question_lower or 'formula' in question_lower: |
|
formulas = { |
|
'water': 'H2O', 'carbon dioxide': 'CO2', 'methane': 'CH4', 'ammonia': 'NH3', |
|
'salt': 'NaCl', 'sugar': 'C12H22O11', 'alcohol': 'C2H5OH', 'oxygen': 'O2' |
|
} |
|
for compound, formula in formulas.items(): |
|
if compound in question_lower: |
|
return formula |
|
|
|
|
|
if any(word in question_lower for word in ['meter', 'kilogram', 'second', 'celsius', 'fahrenheit']): |
|
if 'freezing point' in question_lower and 'water' in question_lower: |
|
if 'celsius' in question_lower: |
|
return '0' |
|
elif 'fahrenheit' in question_lower: |
|
return '32' |
|
|
|
|
|
if 'color' in question_lower or 'colour' in question_lower: |
|
if 'sun' in question_lower: |
|
return 'yellow' |
|
elif 'grass' in question_lower: |
|
return 'green' |
|
elif 'sky' in question_lower: |
|
return 'blue' |
|
|
|
|
|
if any(word in question_lower for word in ['when', 'where', 'who', 'what', 'which', 'how']): |
|
return "Information not available without web search" |
|
|
|
|
|
return "Unable to determine answer without additional tools" |
|
|
|
def cleanup(self): |
|
"""🧹 Cleanup temporary resources""" |
|
pass |
|
|
|
|
|
class MultiModelGAIASystem(EnhancedMultiModelGAIASystem): |
|
"""Alias for backward compatibility""" |
|
pass |
|
|
|
def create_gaia_system(hf_token: str = None, openai_key: str = None) -> EnhancedMultiModelGAIASystem: |
|
"""🚀 Create an enhanced GAIA system with all advanced capabilities""" |
|
return EnhancedMultiModelGAIASystem(hf_token=hf_token, openai_key=openai_key) |
|
|
|
class BasicAgent: |
|
"""🤖 GAIA-compatible agent interface with comprehensive tool calling""" |
|
|
|
def __init__(self, hf_token: str = None, openai_key: str = None): |
|
self.system = create_gaia_system(hf_token, openai_key) |
|
logger.info("🤖 BasicAgent with enhanced GAIA capabilities initialized") |
|
|
|
def query(self, question: str) -> str: |
|
"""Process GAIA question with full tool calling support""" |
|
try: |
|
result = self.system.query_with_tools(question) |
|
return result |
|
except Exception as e: |
|
logger.error(f"❌ Agent query failed: {e}") |
|
return self.system._fallback_response(question) |
|
|
|
def clean_for_api_submission(self, response: str) -> str: |
|
"""Clean response for GAIA API submission""" |
|
return self.system._extract_final_answer(response) |
|
|
|
def __call__(self, question: str) -> str: |
|
"""Callable interface for backward compatibility""" |
|
return self.query(question) |
|
|
|
def cleanup(self): |
|
"""Cleanup resources""" |
|
self.system.cleanup() |
|
|
|
|
|
def test_enhanced_gaia_system(): |
|
"""🧪 Test the enhanced GAIA system with tool calling""" |
|
print("🧪 Testing Enhanced GAIA System with Tool Calling") |
|
|
|
|
|
agent = BasicAgent() |
|
|
|
|
|
test_questions = [ |
|
"What is 15 + 27?", |
|
"What is the capital of France?", |
|
"Search for the current weather in Paris", |
|
"How many planets are in our solar system?", |
|
"What is 2 * 3 + 4?", |
|
] |
|
|
|
print("\n" + "="*50) |
|
print("🎯 ENHANCED GAIA COMPLIANCE TEST") |
|
print("="*50) |
|
|
|
for question in test_questions: |
|
print(f"\nQ: {question}") |
|
response = agent.query(question) |
|
print(f"A: {response}") |
|
|
|
|
|
agent.cleanup() |
|
print("\n✅ Enhanced GAIA system test complete!") |
|
|
|
if __name__ == "__main__": |
|
test_enhanced_gaia_system() |