|
import os |
|
import gradio as gr |
|
import requests |
|
import inspect |
|
import time |
|
import pandas as pd |
|
from smolagents import DuckDuckGoSearchTool |
|
import threading |
|
from typing import Dict, List, Optional, Tuple, Union |
|
import json |
|
from huggingface_hub import InferenceClient |
|
import base64 |
|
from PIL import Image |
|
import io |
|
import tempfile |
|
import urllib.parse |
|
from pathlib import Path |
|
import re |
|
from bs4 import BeautifulSoup |
|
import mimetypes |
|
|
|
|
|
# Base URL of the scoring service; endpoint paths are appended to it by the
# request helpers in this module.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Module-level state shared between the UI callbacks and the worker thread.
cached_answers = dict()    # task_id -> {"question": ..., "answer": ...}
cached_questions = list()  # question records as returned by the questions endpoint
processing_status = dict(is_processing=False, progress=0, total=0)
|
|
|
|
|
class WebContentFetcher:
    """Find URLs in free text and download a trimmed text rendition of each.

    Attributes:
        debug: When true, progress and error messages are printed.
        session: Shared ``requests.Session`` carrying a browser-like
            User-Agent header.
    """

    # Maximum number of characters of content kept per fetched document.
    MAX_CONTENT_CHARS = 8000

    # Compiled once at class creation; the pattern text is unchanged.
    _URL_PATTERN = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    )

    # Characters the pattern accepts but that usually belong to the
    # surrounding sentence when they end a match.
    _TRAILING_PUNCTUATION = ".,;:!?'>"

    # Content-type fragments for office/PDF documents that are only reported.
    _DOCUMENT_MARKERS = (
        'application/pdf',
        'application/msword',
        'application/vnd.openxmlformats',
    )

    def __init__(self, debug: bool = True):
        self.debug = debug
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def _log(self, message: str) -> None:
        """Print ``message`` when debug output is enabled."""
        if self.debug:
            print(message)

    @classmethod
    def _trim_trailing_punctuation(cls, url: str) -> str:
        """Drop sentence punctuation and unbalanced ``)`` from the end of ``url``."""
        while url:
            trimmed = url.rstrip(cls._TRAILING_PUNCTUATION)
            # Keep a closing parenthesis only when it has a partner inside
            # the URL (e.g. wiki titles such as ``Foo_(bar)``).
            if trimmed.endswith(")") and trimmed.count(")") > trimmed.count("("):
                trimmed = trimmed[:-1]
            if trimmed == url:
                break
            url = trimmed
        return url

    def _truncate(self, text: str, marker: str = "") -> str:
        """Cut ``text`` to ``MAX_CONTENT_CHARS``, appending ``marker`` if it was cut."""
        limit = self.MAX_CONTENT_CHARS
        if len(text) > limit:
            return text[:limit] + marker
        return text

    def _error_result(self, url: str, error_msg: str) -> Dict[str, Optional[str]]:
        """Log ``error_msg`` and build the result dictionary for a failed fetch."""
        self._log(error_msg)
        return {
            'url': url,
            'content_type': 'error',
            'title': f"Error fetching {url}",
            'content': '',
            'error': error_msg,
        }

    def extract_urls_from_text(self, text: str) -> List[str]:
        """Return the distinct URLs found in ``text``, in order of appearance.

        Order is preserved so that callers which only process the first few
        URLs behave deterministically. Trailing sentence punctuation is
        removed from each match.
        """
        if not text:
            return []

        ordered: Dict[str, None] = {}
        for match in self._URL_PATTERN.findall(text):
            url = self._trim_trailing_punctuation(match)
            # Skip matches that were reduced to a bare scheme.
            if url.split('://', 1)[-1]:
                ordered.setdefault(url, None)
        return list(ordered)

    def _extract_html(self, response) -> Tuple[str, str]:
        """Return ``(title, visible_text)`` for an HTML response."""
        soup = BeautifulSoup(response.content, 'html.parser')

        title_tag = soup.find('title')
        title = title_tag.get_text().strip() if title_tag else 'No title'

        # Script and style bodies are not visible text.
        for element in soup(["script", "style"]):
            element.decompose()

        # Collapse every run of whitespace to a single space.
        text_content = ' '.join(soup.get_text().split())
        return title, self._truncate(text_content, "... (truncated)")

    def _extract(self, response, url: str, content_type: str) -> Tuple[str, str]:
        """Return ``(title, content)`` for ``response`` based on its content type."""
        if 'text/html' in content_type:
            return self._extract_html(response)

        if 'text/plain' in content_type:
            content = self._truncate(response.text, "... (truncated)")
            return f"Text document from {url}", content

        if 'application/json' in content_type:
            try:
                content = self._truncate(json.dumps(response.json(), indent=2))
            except ValueError:
                # Body is not valid JSON despite the header; keep the raw text.
                content = self._truncate(response.text)
            return f"JSON document from {url}", content

        if any(marker in content_type for marker in self._DOCUMENT_MARKERS):
            content = (
                f"Document file detected ({content_type}). "
                "Content extraction for this file type is not implemented."
            )
            return f"Document from {url}", content

        if response.text:
            return f"Content from {url}", self._truncate(response.text)
        return f"File from {url}", f"Non-text content detected ({content_type})"

    def fetch_url_content(self, url: str) -> Dict[str, Optional[str]]:
        """Fetch ``url`` and extract a text rendition of the response.

        A missing scheme is replaced by ``https://``. The returned dictionary
        always has the keys ``url``, ``content_type``, ``title``, ``content``
        and ``error``. ``error`` is ``None`` on success; on failure it holds
        the message and ``content_type`` is ``'error'``. No exception is
        propagated to the caller.
        """
        try:
            url = url.strip()
            if not url.startswith(('http://', 'https://')):
                url = 'https://' + url

            self._log(f"Fetching URL: {url}")

            response = self.session.get(url, timeout=30, allow_redirects=True)
            response.raise_for_status()

            content_type = response.headers.get('content-type', '').lower()
            title, content = self._extract(response, url, content_type)

            self._log(f"Successfully fetched content from {url}: {len(content)} characters")
            return {
                'url': url,
                'content_type': content_type,
                'title': title,
                'content': content,
                'error': None,
            }

        except requests.exceptions.RequestException as e:
            return self._error_result(url, f"Failed to fetch {url}: {str(e)}")
        except Exception as e:
            return self._error_result(url, f"Unexpected error fetching {url}: {str(e)}")

    def fetch_multiple_urls(
        self,
        urls: List[str],
        max_urls: int = 5,
        delay: float = 1.0,
    ) -> List[Dict[str, Optional[str]]]:
        """Fetch up to ``max_urls`` of ``urls`` sequentially.

        Args:
            urls: Candidate URLs; only the first ``max_urls`` are fetched.
            max_urls: Upper bound on the number of requests.
            delay: Seconds to pause between consecutive requests. No pause
                follows the final request.

        Returns:
            One result dictionary per fetched URL, in input order.
        """
        results = []
        for index, url in enumerate(urls[:max_urls]):
            if index and delay > 0:
                time.sleep(delay)
            results.append(self.fetch_url_content(url))
        return results
|
|
|
|
|
def _decode_base64_payload(text: str) -> Optional[bytes]:
    """Return the bytes encoded by ``text`` if it looks like padded base64.

    Only strings longer than 100 characters with ``=`` near the end are
    considered. Decoding is strict, so ordinary text that merely ends with
    ``=`` is rejected instead of being decoded into garbage. Returns ``None``
    when ``text`` should be stored as text.
    """
    compact = "".join(text.split())  # tolerate line-wrapped base64
    if len(compact) <= 100 or '=' not in compact[-5:]:
        return None
    try:
        return base64.b64decode(compact, validate=True)
    except ValueError:  # binascii.Error is a ValueError subclass
        return None


def _safe_file_name(name, fallback: str) -> str:
    """Reduce ``name`` to its final path component, or ``fallback`` if none is left."""
    candidate = os.path.basename(str(name).replace("\\", "/"))
    if candidate in ("", ".", ".."):
        return fallback
    return candidate


def save_attachment_to_file(attachment_data: Union[str, bytes, dict], temp_dir: str, file_name: Optional[str] = None) -> Optional[str]:
    """Save attachment data to a file inside ``temp_dir``.

    Args:
        attachment_data: Raw ``bytes``, a string (base64 payload or plain
            text), or a dict carrying either ``data``/``type``/``name`` or
            ``content``/``mime_type``/``filename``. A dict with neither
            payload key is stored as its string representation.
        temp_dir: Directory to write into; created if it does not exist.
        file_name: Preferred file name. A name supplied inside a dict
            attachment takes precedence. When the name has no ``.`` an
            extension is derived from the declared type.

    Returns:
        The path of the written file, or ``None`` if the data type is
        unsupported or writing failed.
    """
    try:
        generated_name = f"attachment_{int(time.time())}"
        if not file_name:
            file_name = generated_name

        if isinstance(attachment_data, dict):
            if 'data' in attachment_data:
                file_data = attachment_data['data']
                file_type = str(attachment_data.get('type') or '').lower()
                original_name = attachment_data.get('name', file_name)
            elif 'content' in attachment_data:
                file_data = attachment_data['content']
                file_type = str(attachment_data.get('mime_type') or '').lower()
                original_name = attachment_data.get('filename', file_name)
            else:
                file_data = str(attachment_data)
                file_type = ''
                original_name = file_name

            if original_name and original_name != file_name:
                file_name = original_name
        elif isinstance(attachment_data, (str, bytes)):
            file_data = attachment_data
            file_type = ''
        else:
            print(f"Unknown attachment data type: {type(attachment_data)}")
            return None

        # The name may come from the attachment itself; strip any directory
        # part so the file cannot be written outside temp_dir.
        file_name = _safe_file_name(file_name, generated_name)

        if '.' not in file_name:
            if 'image' in file_type:
                if 'jpeg' in file_type or 'jpg' in file_type:
                    file_name += '.jpg'
                elif 'png' in file_type:
                    file_name += '.png'
                else:
                    file_name += '.img'
            elif 'audio' in file_type:
                if 'mp3' in file_type:
                    file_name += '.mp3'
                elif 'wav' in file_type:
                    file_name += '.wav'
                else:
                    file_name += '.audio'
            elif 'python' in file_type or 'text' in file_type:
                file_name += '.py'
            else:
                file_name += '.file'

        os.makedirs(temp_dir, exist_ok=True)
        file_path = os.path.join(temp_dir, file_name)

        if isinstance(file_data, str):
            decoded = _decode_base64_payload(file_data)
            if decoded is None:
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(file_data)
            else:
                with open(file_path, 'wb') as f:
                    f.write(decoded)
        else:
            with open(file_path, 'wb') as f:
                f.write(file_data)

        print(f"Saved attachment: {file_path}")
        return file_path

    except Exception as e:
        print(f"Failed to save attachment: {e}")
        return None
|
|
|
|
|
|
|
|
|
class CodeAnalysisTool:
    """Summarise a source file by sending its text to a chat-completion model.

    Attributes:
        client: ``InferenceClient`` bound to the configured model.
    """

    # Maximum number of characters of the file included in the prompt.
    MAX_CODE_CHARS = 5000

    def __init__(self, model_name: str = "meta-llama/Llama-3.1-8B-Instruct"):
        self.client = InferenceClient(model=model_name, provider="sambanova")

    def analyze_code(self, code_path: str) -> str:
        """Read ``code_path`` and return the model's summary of it.

        The file is decoded as UTF-8 with undecodable bytes replaced, so a
        file in another encoding is still analysed rather than rejected.
        Content beyond ``MAX_CODE_CHARS`` is dropped and marked as truncated.

        Returns:
            The stripped model reply, or a string starting with
            ``"Code analysis failed:"`` if reading the file or the model
            call raises.
        """
        try:
            with open(code_path, 'r', encoding='utf-8', errors='replace') as f:
                code_content = f.read()

            if len(code_content) > self.MAX_CODE_CHARS:
                code_content = code_content[:self.MAX_CODE_CHARS] + "\n... (truncated)"

            analysis_prompt = (
                "Analyze this Python code and provide a concise summary of:\n"
                "1. What the code does (main functionality)\n"
                "2. Key functions/classes\n"
                "3. Any notable patterns or issues\n"
                "4. Input/output behavior if applicable\n"
                "\n"
                "Code:\n"
                "```python\n"
                f"{code_content}\n"
                "```\n"
                "\n"
                "Provide a brief, focused analysis:"
            )

            response = self.client.chat_completion(
                messages=[{"role": "user", "content": analysis_prompt}],
                max_tokens=500,
                temperature=0.3,
            )
            return response.choices[0].message.content.strip()

        except Exception as e:
            return f"Code analysis failed: {e}"
|
|
|
|
|
class ImageAnalysisTool:
    """Caption images and read printed text from them via ``InferenceClient``.

    Attributes:
        model_name: Model used for the first captioning attempt.
        client: ``InferenceClient`` bound to ``model_name``.
    """

    # Model tried when the primary captioning call raises.
    FALLBACK_CAPTION_MODEL = "Salesforce/blip-image-captioning-large"
    # Model used for text extraction.
    OCR_MODEL = "microsoft/trocr-base-printed"

    def __init__(self, model_name: str = "microsoft/Florence-2-large"):
        self.model_name = model_name
        self.client = InferenceClient(model=model_name)

    @staticmethod
    def _generated_text(response, default: str) -> str:
        """Pull the generated text out of ``response``.

        Handles a plain string, a mapping with a ``generated_text`` key, and
        an object with a ``generated_text`` attribute; ``default`` is used
        when the field is absent.
        """
        if isinstance(response, str):
            return response
        if isinstance(response, dict):
            return response.get("generated_text", default)
        return getattr(response, "generated_text", default)

    def analyze_image(self, image_path: str, prompt: str = "Describe this image in detail") -> str:
        """Return a caption for the image at ``image_path``.

        The configured model is tried first; if that call raises, the
        fallback captioning model is tried once.

        Args:
            image_path: Path of the image file to read.
            prompt: Accepted for interface compatibility; it is not passed
                to the captioning call.

        Returns:
            The caption, or a string starting with
            ``"Image analysis failed:"`` when the file cannot be read or
            both model calls raise.
        """
        try:
            with open(image_path, "rb") as f:
                image_bytes = f.read()
        except Exception as e:
            # Without image bytes there is nothing to send to either model.
            return f"Image analysis failed: {e}"

        try:
            response = self.client.image_to_text(image=image_bytes, model=self.model_name)
            return self._generated_text(response, "Could not analyze image")
        except Exception as primary_error:
            try:
                response = self.client.image_to_text(
                    image=image_bytes,
                    model=self.FALLBACK_CAPTION_MODEL,
                )
                return self._generated_text(response, f"Image analysis error: {primary_error}")
            except Exception:
                # Report the primary failure; the fallback error is secondary.
                return f"Image analysis failed: {primary_error}"

    def extract_text_from_image(self, image_path: str) -> str:
        """Return text recognised in the image at ``image_path``.

        Returns:
            The recognised text, or a string starting with ``"OCR failed:"``
            if reading the file or the model call raises.
        """
        try:
            with open(image_path, "rb") as f:
                image_bytes = f.read()

            response = self.client.image_to_text(image=image_bytes, model=self.OCR_MODEL)
            return self._generated_text(response, "No text found in image")

        except Exception as e:
            return f"OCR failed: {e}"
|
|
|
|
|
class AudioTranscriptionTool:
    """Transcribe audio files through ``InferenceClient`` speech recognition.

    Attributes:
        client: ``InferenceClient`` bound to the configured model.
    """

    # Model tried when the primary transcription call raises.
    FALLBACK_MODEL = "facebook/wav2vec2-large-960h-lv60-self"

    def __init__(self, model_name: str = "openai/whisper-large-v3"):
        self.client = InferenceClient(model=model_name)

    @staticmethod
    def _transcript(response, default: str) -> str:
        """Pull the transcript out of ``response``.

        Handles a plain string, a mapping with a ``text`` key, and an object
        with a ``text`` attribute; ``default`` is used when the field is
        absent.
        """
        if isinstance(response, str):
            return response
        if isinstance(response, dict):
            return response.get("text", default)
        return getattr(response, "text", default)

    def transcribe_audio(self, audio_path: str) -> str:
        """Return the transcript of the audio file at ``audio_path``.

        The client's configured model is tried first; if that call raises,
        the fallback model is tried once.

        Returns:
            The transcript, or a string starting with
            ``"Audio transcription failed:"`` when the file cannot be read
            or both model calls raise.
        """
        try:
            with open(audio_path, "rb") as f:
                audio_bytes = f.read()
        except Exception as e:
            # Without audio bytes there is nothing to send to either model.
            return f"Audio transcription failed: {e}"

        try:
            response = self.client.automatic_speech_recognition(audio=audio_bytes)
            return self._transcript(response, "Could not transcribe audio")
        except Exception as primary_error:
            try:
                response = self.client.automatic_speech_recognition(
                    audio=audio_bytes,
                    model=self.FALLBACK_MODEL,
                )
                return self._transcript(response, f"Audio transcription error: {primary_error}")
            except Exception:
                # Report the primary failure; the fallback error is secondary.
                return f"Audio transcription failed: {primary_error}"
|
|
|
|
|
class IntelligentAgent:
    """Answer questions using URL content, attached files, search and an LLM.

    Attributes:
        search: ``DuckDuckGoSearchTool`` instance.
        client: ``InferenceClient`` bound to the chosen model.
        image_tool, audio_tool, code_tool: Media and code analysis helpers.
        web_fetcher: ``WebContentFetcher`` used for URLs found in questions.
        debug: When true, progress messages are printed.

    NOTE(review): ``process_question_with_attachments`` calls
    ``_process_attachments``, ``_should_search``, ``_answer_with_search`` and
    ``_answer_with_llm``, none of which are defined in the part of the class
    visible here. Confirm they exist; if they do not, every question ends in
    the "Sorry, I encountered an error" branch.
    """

    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff')
    AUDIO_EXTENSIONS = ('.mp3', '.wav', '.m4a', '.ogg', '.flac', '.aac')

    def __init__(self, debug: bool = True, model_name: str = "meta-llama/Llama-3.1-8B-Instruct"):
        self.search = DuckDuckGoSearchTool()
        self.client = InferenceClient(model=model_name, provider="sambanova")
        self.image_tool = ImageAnalysisTool()
        self.audio_tool = AudioTranscriptionTool()
        self.code_tool = CodeAnalysisTool(model_name)
        self.web_fetcher = WebContentFetcher(debug)
        self.debug = debug
        if self.debug:
            print(f"IntelligentAgent initialized with model: {model_name}")

    def _log(self, message: str) -> None:
        """Print ``message`` when debug output is enabled."""
        if self.debug:
            print(message)

    def _chat_completion(self, prompt: str, max_tokens: int = 500, temperature: float = 0.3) -> str:
        """Return the model's reply to ``prompt``.

        Chat completion is tried first. If it raises, plain text generation
        is used as a fallback.

        Raises:
            Exception: Whatever the text-generation fallback raised, when
                both attempts fail.
        """
        try:
            response = self.client.chat_completion(
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens,
                temperature=temperature,
            )
            return response.choices[0].message.content.strip()
        except Exception as chat_error:
            self._log(f"Chat completion failed: {chat_error}, trying text generation...")

        try:
            sampling = temperature > 0
            response = self.client.text_generation(
                prompt,
                max_new_tokens=max_tokens,
                # Only send a temperature when sampling is requested.
                temperature=temperature if sampling else None,
                do_sample=sampling,
            )
            return response.strip()
        except Exception as e:
            self._log(f"Both chat completion and text generation failed: {e}")
            raise

    def _extract_and_process_urls(self, question_text: str) -> str:
        """Fetch every URL mentioned in ``question_text`` and format the results.

        Returns:
            An empty string when no URL is found or nothing was fetched.
            Otherwise a block framed by two separator lines, containing one
            section per URL (its error, or its title, content type and
            content), with a blank line between sections.
        """
        urls = self.web_fetcher.extract_urls_from_text(question_text)
        if not urls:
            return ""

        self._log(f"...Found {len(urls)} URLs in question: {urls}")

        url_contents = self.web_fetcher.fetch_multiple_urls(urls)
        if not url_contents:
            return ""

        sections = []
        for content_data in url_contents:
            if content_data.get('error'):
                sections.append(f"URL: {content_data['url']}\nError: {content_data['error']}")
            else:
                sections.append(
                    f"URL: {content_data['url']}\n"
                    f"Title: {content_data['title']}\n"
                    f"Content Type: {content_data['content_type']}\n"
                    f"Content: {content_data['content']}"
                )

        separator = "=" * 50
        # Each separator sits on its own line so it cannot merge with an entry.
        return "\n\n" + separator + "\n" + "\n\n".join(sections) + "\n" + separator

    def _detect_and_process_direct_attachments(self, file_name: str) -> Tuple[List[str], List[str], List[str]]:
        """Classify the single file attached to a question by its extension.

        ``file_name`` is resolved against the current working directory. A
        file that exists is placed in exactly one list: images and audio by
        their extension, everything else (including unknown extensions) in
        the code list.

        Returns:
            ``(image_files, audio_files, code_files)``; all empty when
            ``file_name`` is empty, the file does not exist, or an error
            occurs.
        """
        image_files: List[str] = []
        audio_files: List[str] = []
        code_files: List[str] = []

        if not file_name:
            return image_files, audio_files, code_files

        try:
            file_path = os.path.join(os.getcwd(), file_name)

            if not os.path.exists(file_path):
                self._log(f"File not found: {file_path}")
                return image_files, audio_files, code_files

            file_ext = Path(file_name).suffix.lower()
            if file_ext in self.IMAGE_EXTENSIONS:
                image_files.append(file_path)
                label = 'image'
            elif file_ext in self.AUDIO_EXTENSIONS:
                audio_files.append(file_path)
                label = 'audio'
            else:
                # Recognised code/text extensions and unknown ones alike.
                code_files.append(file_path)
                label = 'code'

            self._log(f"Processed file: {file_name} -> {label}")

        except Exception as e:
            self._log(f"Error processing attachment {file_name}: {e}")

        self._log(
            f"Processed attachment: {len(image_files)} images, "
            f"{len(audio_files)} audio, {len(code_files)} code files"
        )
        return image_files, audio_files, code_files

    def process_question_with_attachments(self, question_data: dict) -> str:
        """Answer one question record.

        Reads ``question`` and ``file_name`` from ``question_data``, gathers
        URL context and attachment context, then answers through either the
        search-based or the LLM-only path.

        Returns:
            The answer, or a string starting with
            ``"Sorry, I encountered an error:"`` if attachment processing or
            answering raises. A failure while fetching URLs only results in
            an empty URL context.
        """
        question_text = question_data.get('question') or ''
        if self.debug:
            print(f"Question data keys: {list(question_data.keys())}")
            print(f"\n1. Processing question with potential attachments and URLs: {question_text[:300]}...")

        try:
            self._log("2. Detecting and processing URLs...")
            url_context = self._extract_and_process_urls(question_text)
            if url_context:
                self._log(f"URL context found: {len(url_context)} characters")
        except Exception as e:
            self._log(f"Error extracting URLs: {e}")
            url_context = ""

        try:
            self._log("3. Searching for images, audio or code attachments...")

            attachment_name = question_data.get('file_name', '')
            self._log(f"Attachment name from question_data: '{attachment_name}'")

            image_files, audio_files, code_files = self._detect_and_process_direct_attachments(attachment_name)
            attachment_context = self._process_attachments(image_files, audio_files, code_files)

            if attachment_context:
                self._log(f"Attachment context: {attachment_context[:200]}...")

            if self._should_search(question_text, attachment_context, url_context):
                self._log("5. Using search-based approach")
                answer = self._answer_with_search(question_text, attachment_context, url_context)
            else:
                self._log("5. Using LLM-only approach")
                answer = self._answer_with_llm(question_text, attachment_context, url_context)
                self._log(f"LLM answer: {answer}")

        except Exception as e:
            self._log(f"Error in attachment processing: {e}")
            answer = f"Sorry, I encountered an error: {e}"

        self._log(f"6. Agent returning answer: {str(answer)[:100]}...")
        return answer
|
def fetch_questions() -> Tuple[str, Optional[pd.DataFrame]]:
    """Fetch the question list from the API, cache it and build a preview table.

    The raw response is stored in the module-level ``cached_questions``. A
    question is flagged as having attachments when any of the
    ``attachments``/``files``/``media``/``resources`` fields is non-empty,
    when it carries a ``file_name`` (the field the agent reads to locate an
    attachment), or when its text contains ``http``.

    Returns:
        ``(status_message, table)``; ``table`` is ``None`` when the list is
        empty or the request fails.
    """
    global cached_questions

    questions_url = f"{DEFAULT_API_URL}/questions"
    print(f"Fetching questions from: {questions_url}")

    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()

        if not questions_data:
            return "Fetched questions list is empty.", None

        cached_questions = questions_data

        display_data = []
        for item in questions_data:
            notes = []

            for field in ('attachments', 'files', 'media', 'resources'):
                value = item.get(field)
                if value:
                    notes.append(f"{len(value)} {field}" if isinstance(value, list) else field)

            file_name = item.get('file_name')
            if file_name:
                notes.append(f"file: {file_name}")

            # A null question must not abort the whole fetch.
            question_text = item.get("question") or ""
            if 'http' in question_text:
                notes.append("URLs in text")

            preview = question_text[:100] + "..." if len(question_text) > 100 else question_text
            display_data.append({
                "Task ID": item.get("task_id", "Unknown"),
                "Question": preview,
                "Has Attachments": "Yes" if notes else "No",
                "Attachment Info": ", ".join(notes),
            })

        df = pd.DataFrame(display_data)

        attachment_count = sum(1 for row in display_data if row["Has Attachments"] == "Yes")
        status_msg = f"Successfully fetched {len(questions_data)} questions. {attachment_count} questions have attachments. Ready to generate answers."
        return status_msg, df

    except requests.exceptions.RequestException as e:
        return f"Error fetching questions: {e}", None
    except Exception as e:
        return f"An unexpected error occurred: {e}", None
|
|
|
def generate_answers_async(model_name: str = "meta-llama/Llama-3.1-8B-Instruct", progress_callback=None):
    """Generate an answer for every cached question and store them in ``cached_answers``.

    Intended to run on a worker thread. Progress is published through the
    module-level ``processing_status``; setting its ``is_processing`` flag to
    false from elsewhere stops the loop before the next question.

    Args:
        model_name: Model identifier passed to ``IntelligentAgent``.
        progress_callback: Optional callable invoked as
            ``progress_callback(done, total)`` after each question, whether
            it was answered or skipped.

    Returns:
        An explanatory string when no questions are cached, otherwise
        ``None``.
    """
    global cached_answers, processing_status

    if not cached_questions:
        return "No questions available. Please fetch questions first."

    # Work on a snapshot so the total stays consistent if the cache is
    # replaced while this run is in progress.
    questions = list(cached_questions)
    total = len(questions)

    processing_status["is_processing"] = True
    processing_status["progress"] = 0
    processing_status["total"] = total

    try:
        # Reset first so answers from an earlier run are never left in place
        # for a run that fails during agent construction.
        cached_answers = {}
        agent = IntelligentAgent(debug=True, model_name=model_name)

        for position, question_data in enumerate(questions, start=1):
            if not processing_status["is_processing"]:
                break

            task_id = question_data.get("task_id")
            question_text = question_data.get("question")

            # Records without an id or question are skipped, but they still
            # count towards progress so the counter can reach the total.
            if task_id and question_text is not None:
                try:
                    answer = agent.process_question_with_attachments(question_data)
                except Exception as e:
                    answer = f"AGENT ERROR: {e}"
                cached_answers[task_id] = {
                    "question": question_text,
                    "answer": answer,
                }

            processing_status["progress"] = position
            if progress_callback:
                progress_callback(position, total)

    except Exception as e:
        print(f"Error in generate_answers_async: {e}")
    finally:
        processing_status["is_processing"] = False
|
|
|
def start_answer_generation(model_choice: str):
    """Start answer generation on a daemon thread.

    Args:
        model_choice: Display label of the model. Unknown labels fall back
            to ``meta-llama/Llama-3.1-8B-Instruct``.

    Returns:
        A status message: generation started, already running, or no
        questions available.
    """
    if processing_status["is_processing"]:
        return "Answer generation is already in progress."

    if not cached_questions:
        return "No questions available. Please fetch questions first."

    # Display label -> model repository id. Ids use plain ASCII hyphens; any
    # other dash character makes the id unresolvable.
    model_map = {
        "Llama 3.1 8B": "meta-llama/Llama-3.1-8B-Instruct",
        "Llama 3.3 70B": "meta-llama/Llama-3.3-70B-Instruct",
        "Llama 3.3 Shallow 70B": "tokyotech-llm/Llama-3.3-Swallow-70B-Instruct-v0.4",
        "Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3",
        "Qwen 2.5": "Qwen/Qwen2.5-Omni-7B",
        "Qwen 3": "Qwen/Qwen3-32B",
    }
    selected_model = model_map.get(model_choice, "meta-llama/Llama-3.1-8B-Instruct")

    thread = threading.Thread(
        target=generate_answers_async,
        args=(selected_model,),
        daemon=True,
    )
    thread.start()

    return f"Answer generation started using {model_choice}. Check progress."
|
|
|
|
|
def get_generation_progress():
    """Report answer-generation progress for the UI.

    The refresh button wires this function to two outputs (status text and
    answers table), so every path returns a pair.

    Returns:
        ``(status_message, table)``. ``table`` is a DataFrame of the
        generated answers once generation has finished with at least one
        answer, and ``None`` otherwise.
    """
    if processing_status["is_processing"]:
        progress = processing_status["progress"]
        total = processing_status["total"]
        return f"Generating answers... {progress}/{total} completed", None

    if processing_status["progress"] == 0:
        return "Not started", None

    if not cached_answers:
        return "Answer generation completed but no answers were generated.", None

    def shorten(value, limit):
        # Answers are expected to be strings; coerce so an odd value cannot
        # break the preview.
        text = str(value)
        return text[:limit] + "..." if len(text) > limit else text

    # Snapshot the items so a concurrent writer cannot change the dict size
    # during iteration.
    rows = [
        {
            "Task ID": task_id,
            "Question": shorten(data["question"], 100),
            "Generated Answer": shorten(data["answer"], 200),
        }
        for task_id, data in list(cached_answers.items())
    ]

    status_msg = f"Answer generation completed! {len(rows)} answers ready for submission."
    return status_msg, pd.DataFrame(rows)
|
|
|
def submit_cached_answers(profile: gr.OAuthProfile | None):
    """Submit the cached answers to the evaluation API.

    Args:
        profile: OAuth profile of the logged-in user, or ``None`` when
            nobody is logged in.

    Returns:
        ``(status_message, table)``; ``table`` lists the submitted answers
        on success and is ``None`` on any failure.
    """
    if not profile:
        return "Please log in to Hugging Face first.", None

    if not cached_answers:
        return "No cached answers available. Please generate answers first.", None

    # Snapshot once so the payload and the results table describe the same
    # answers even if the cache changes during the request.
    answers = list(cached_answers.items())

    space_id = os.getenv("SPACE_ID")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "Unknown"

    answers_payload = [
        {"task_id": task_id, "submitted_answer": data["answer"]}
        for task_id, data in answers
    ]
    submission_data = {
        "username": profile.username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }

    submit_url = f"{DEFAULT_API_URL}/submit"
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")

    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()

        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )

        results_df = pd.DataFrame([
            {
                "Task ID": task_id,
                "Question": data["question"],
                "Submitted Answer": data["answer"],
            }
            for task_id, data in answers
        ])
        return final_status, results_df

    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except (ValueError, AttributeError):
            # Body is not JSON, or not a JSON object; show the raw text.
            error_detail += f" Response: {e.response.text[:500]}"
        return f"Submission Failed: {error_detail}", None

    except requests.exceptions.Timeout:
        return "Submission Failed: The request timed out.", None

    except Exception as e:
        return f"Submission Failed: {e}", None
|
|
|
def clear_cache():
    """Reset the cached questions, answers and progress state.

    The three module-level names are rebound to fresh empty objects.

    Returns:
        ``("Cache cleared successfully.", None)``.
    """
    global cached_answers, cached_questions, processing_status

    cached_answers, cached_questions = {}, []
    processing_status = dict(is_processing=False, progress=0, total=0)

    message = "Cache cleared successfully."
    return message, None
|
|
|
|
|
# Gradio interface: a login/clear row followed by three tabs that mirror the
# fetch -> generate -> submit workflow.
demo = gr.Blocks(title="Intelligent Agent with Media Processing")

with demo:
    gr.Markdown("# Intelligent Agent with Conditional Search and Media Processing")
    gr.Markdown("This agent can process images and audio files, uses an LLM to decide when search is needed, optimizing for both accuracy and efficiency.")

    with gr.Row():
        gr.LoginButton()
        clear_btn = gr.Button("Clear Cache", variant="secondary")

    with gr.Tab("Step 1: Fetch Questions"):
        gr.Markdown("### Fetch Questions from API")
        fetch_btn = gr.Button("Fetch Questions", variant="primary")
        fetch_status = gr.Textbox(label="Fetch Status", lines=2, interactive=False)
        questions_table = gr.DataFrame(label="Available Questions", wrap=True)

        fetch_btn.click(fetch_questions, outputs=[fetch_status, questions_table])

    with gr.Tab("Step 2: Generate Answers"):
        gr.Markdown("### Generate Answers with Intelligent Search Decision")

        with gr.Row():
            # The labels must match the keys looked up by start_answer_generation.
            model_choice = gr.Dropdown(
                choices=[
                    "Llama 3.1 8B",
                    "Llama 3.3 70B",
                    "Llama 3.3 Shallow 70B",
                    "Mistral 7B",
                    "Qwen 2.5",
                    "Qwen 3",
                ],
                value="Llama 3.1 8B",
                label="Select Model",
            )
            generate_btn = gr.Button("Start Answer Generation", variant="primary")
            refresh_btn = gr.Button("Refresh Progress", variant="secondary")

        generation_status = gr.Textbox(label="Generation Status", lines=2, interactive=False)
        answers_table = gr.DataFrame(label="Generated Answers", wrap=True)

        generate_btn.click(
            start_answer_generation,
            inputs=[model_choice],
            outputs=generation_status,
        )
        refresh_btn.click(
            get_generation_progress,
            outputs=[generation_status, answers_table],
        )

    with gr.Tab("Step 3: Submit Results"):
        gr.Markdown("### Submit Generated Answers")
        submit_btn = gr.Button("Submit Answers", variant="primary")
        submit_status = gr.Textbox(label="Submission Status", lines=4, interactive=False)
        results_table = gr.DataFrame(label="Submission Results", wrap=True)

        submit_btn.click(submit_cached_answers, outputs=[submit_status, results_table])

    # Clearing reports into the fetch tab's status box and empties its table.
    clear_btn.click(clear_cache, outputs=[fetch_status, questions_table])


if __name__ == "__main__":
    demo.launch()
|
|