import os import gradio as gr import requests from smolagents import Tool, CodeAgent, FinalAnswerTool, AzureOpenAIServerModel from tavily import TavilyClient from dotenv import load_dotenv load_dotenv() # Cargar variables de entorno # Variables de entorno AZURE_OPENAI_MODEL = os.getenv("AZURE_OPENAI_MODEL") AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT") AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY") OPENAI_API_VERSION = os.getenv("OPENAI_API_VERSION") DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" # Herramientas definidas duck_search = Tool.from_space("duckduckgo-search-v1", name="duck_search", description="Búsqueda en DuckDuckGo", api_name="/predict") google_search = Tool.from_space("google-search-v1", name="google_search", description="Búsqueda en Google", api_name="/predict") visit_page = Tool.from_space("visit-webpage-v1", name="visit_page", description="Visita páginas web", api_name="/predict") wiki_search = Tool.from_space("wikipedia-search-v1", name="wiki_search", description="Búsqueda en Wikipedia", api_name="/predict") do_python = Tool.from_space("python-interpreter-v1", name="do_python", description="Ejecuta código Python", api_name="/predict") final_answer = FinalAnswerTool() # Inicializar el cliente de Tavily tavily_search = TavilyClient() speech_to_text_tool = Tool.from_space("hf-audio/whisper-large-v3-turbo", name="speech_to_text_tool", description="Convierte audio en texto proporcionando un archivo o URL.", api_name="/predict") visual_qa_tool = Tool.from_space("sitammeur/PicQ", name="visual_qa_tool", description="Responde preguntas sobre una imagen proporcionada.", api_name="/predict") # Agente básico class BasicAgent: def __init__(self): # Agente para búsqueda web self.web_agent = CodeAgent( model=AzureOpenAIServerModel( model_id=AZURE_OPENAI_MODEL, azure_endpoint=AZURE_OPENAI_ENDPOINT, api_key=AZURE_OPENAI_API_KEY, api_version=OPENAI_API_VERSION ), tools=[duck_search, google_search, wiki_search, visit_page, final_answer], max_steps=8, name="web_agent", description="Realiza búsquedas web usando Google, DuckDuckGo y Wikipedia.", add_base_tools=True ) # Agente para convertir audio a texto self.audio_agent = CodeAgent( model=AzureOpenAIServerModel( model_id=AZURE_OPENAI_MODEL, azure_endpoint=AZURE_OPENAI_ENDPOINT, api_key=AZURE_OPENAI_API_KEY, api_version=OPENAI_API_VERSION ), tools=[speech_to_text_tool, final_answer], max_steps=4, name="audio_agent", description="Convierte audio en texto.", add_base_tools=True ) # Agente para ejecutar código Python self.py_agent = CodeAgent( model=AzureOpenAIServerModel( model_id=AZURE_OPENAI_MODEL, azure_endpoint=AZURE_OPENAI_ENDPOINT, api_key=AZURE_OPENAI_API_KEY, api_version=OPENAI_API_VERSION ), tools=[do_python, final_answer], additional_authorized_imports=["json", "pandas", "numpy", "regex"], max_steps=8, name="python_code_agent", description="Ejecuta y valida código Python.", add_base_tools=True ) # Agente para responder preguntas sobre imágenes self.visual_agent = CodeAgent( model=AzureOpenAIServerModel( model_id=AZURE_OPENAI_MODEL, azure_endpoint=AZURE_OPENAI_ENDPOINT, api_key=AZURE_OPENAI_API_KEY, api_version=OPENAI_API_VERSION ), tools=[visual_qa_tool, final_answer], max_steps=4, name="visual_qa_agent", description="Responde preguntas sobre imágenes.", add_base_tools=True ) # Agente principal que maneja los subagentes self.manager_agent = CodeAgent( model=AzureOpenAIServerModel( model_id=AZURE_OPENAI_MODEL, azure_endpoint=AZURE_OPENAI_ENDPOINT, api_key=AZURE_OPENAI_API_KEY, api_version=OPENAI_API_VERSION ), tools=[], managed_agents=[self.web_agent, self.audio_agent, self.py_agent, self.visual_agent], planning_interval=8, verbosity_level=2, max_steps=12, add_base_tools=True ) def forward(self, question: str, attachment: str = None) -> str: if attachment: result = self.manager_agent.run(question, additional_args={"attachment": attachment}) else: result = self.manager_agent.run(question) return result # Función para ejecutar el flujo de trabajo completo def run_and_submit_all(profile: gr.OAuthProfile | None): """ Obtiene todas las preguntas, ejecuta el BasicAgent sobre ellas, envía las respuestas y muestra los resultados. """ space_id = os.getenv("SPACE_ID") # Obtener el ID del espacio para enviar el enlace al código if profile: username = f"{profile.username}" print(f"Usuario logueado: {username}") else: print("Usuario no logueado.") return "Por favor, inicie sesión en Hugging Face.", None api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" attachments_url = f"{api_url}/files/" submit_url = f"{api_url}/submit" try: print("Iniciando agente...") agent = BasicAgent() # Aquí inicializamos el agente except Exception as e: print(f"Error al inicializar el agente: {e}") return f"Error al inicializar el agente: {e}", None # 2. Obtener preguntas print(f"Obteniendo preguntas de: {questions_url}") try: response = requests.get(questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: print("La lista de preguntas está vacía.") return "La lista de preguntas está vacía o en formato incorrecto.", None print(f"Obtenidas {len(questions_data)} preguntas.") for q in questions_data: file_name = q.get("file_name", "") task_id = q.get("task_id") if file_name and task_id: try: att_response = requests.get(f"{attachments_url}{task_id}", timeout=15) att_response.raise_for_status() q["attachment_b64"] = att_response.text except Exception as e: print(f"Error al obtener archivo adjunto para tarea {task_id}: {e}") q["attachment_b64"] = None except requests.exceptions.RequestException as e: print(f"Error al obtener preguntas: {e}") return f"Error al obtener preguntas: {e}", None # 3. Ejecutar agente sobre preguntas results_log = [] answers_payload = [] print(f"Ejecutando agente sobre {len(questions_data)} preguntas...") for item in questions_data: task_id = item.get("task_id") question_text = item.get("question", "") attachment_b64 = item.get("attachment_b64", "") if attachment_b64: question_text = f"{question_text}\n\n[ATTACHMENT:]\n{attachment_b64}" if not task_id or question_text is None: print(f"Saltando elemento con task_id o pregunta faltante: {item}") continue try: submitted_answer = agent.forward(question_text) # Aquí ejecutamos el agente para obtener la respuesta answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) except Exception as e: print(f"Error ejecutando agente sobre tarea {task_id}: {e}") results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"ERROR AGENTE: {e}"}) if not answers_payload: print("El agente no generó respuestas.") return "El agente no generó respuestas.", None # 4. Preparar la sumisión submission_data = {"username": username.strip(), "answers": answers_payload} print(f"Enviando respuestas para el usuario '{username}'...") # 5. Enviar respuestas try: response = requests.post(submit_url, json=submission_data, timeout=60) response.raise_for_status() result_data = response.json() final_status = ( f"¡Envío exitoso!\n" f"Usuario: {result_data.get('username')}\n" f"Puntuación total: {result_data.get('score', 'N/A')}% " f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correctas)" ) print("Envío exitoso.") return final_status, results_log except requests.exceptions.RequestException as e: status_message = f"Error al enviar respuestas: {e}" print(status_message) return status_message, results_log # Interfaz de Gradio with gr.Blocks() as demo: gr.Markdown("# Evaluador de Agente Básico") gr.LoginButton() run_button = gr.Button("Ejecutar Evaluación y Enviar Todas las Respuestas") status_output = gr.Textbox(label="Resultado de Ejecución / Envío", lines=5, interactive=False) results_table = gr.DataFrame(label="Preguntas y Respuestas del Agente", wrap=True) run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table]) if __name__ == "__main__": demo.launch(debug=True, share=False)