import os import sys import gradio as gr import json from typing import List, Dict, Any, Optional, Tuple import logging try: # Intentar importar dependencias opcionales from langchain_community.agent_toolkits import create_sql_agent from langchain_community.utilities import SQLDatabase from langchain_google_genai import ChatGoogleGenerativeAI from langchain.agents.agent_types import AgentType import pymysql from dotenv import load_dotenv DEPENDENCIES_AVAILABLE = True except ImportError: # Si faltan dependencias, la aplicación funcionará en modo demo DEPENDENCIES_AVAILABLE = False # Configuración de logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def check_environment(): """Verifica si el entorno está configurado correctamente.""" if not DEPENDENCIES_AVAILABLE: return False, "Missing required Python packages. Please install them with: pip install -r requirements.txt" # Verificar si estamos en un entorno con variables de entorno required_vars = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"] missing_vars = [var for var in required_vars if not os.getenv(var)] if missing_vars: return False, f"Missing required environment variables: {', '.join(missing_vars)}" return True, "Environment is properly configured" def setup_database_connection(): """Intenta establecer una conexión a la base de datos.""" if not DEPENDENCIES_AVAILABLE: return None, "Dependencies not available" try: load_dotenv(override=True) # Debug: Log all environment variables (without sensitive values) logger.info("Environment variables:") for key, value in os.environ.items(): if any(s in key.lower() for s in ['pass', 'key', 'secret']): logger.info(f" {key}: {'*' * 8} (hidden for security)") else: logger.info(f" {key}: {value}") db_user = os.getenv("DB_USER") db_password = os.getenv("DB_PASSWORD") db_host = os.getenv("DB_HOST") db_name = os.getenv("DB_NAME") # Debug: Log database connection info (without password) logger.info(f"Database connection attempt - Host: {db_host}, User: {db_user}, DB: {db_name}") if not all([db_user, db_password, db_host, db_name]): missing = [var for var, val in [ ("DB_USER", db_user), ("DB_PASSWORD", "*" if db_password else ""), ("DB_HOST", db_host), ("DB_NAME", db_name) ] if not val] logger.error(f"Missing required database configuration: {', '.join(missing)}") return None, f"Missing database configuration: {', '.join(missing)}" if not all([db_user, db_password, db_host, db_name]): return None, "Missing database configuration" logger.info(f"Connecting to database: {db_user}@{db_host}/{db_name}") # Probar conexión connection = pymysql.connect( host=db_host, user=db_user, password=db_password, database=db_name, connect_timeout=5, cursorclass=pymysql.cursors.DictCursor ) connection.close() # Si la conexión es exitosa, crear motor SQLAlchemy db_uri = f"mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}" logger.info("Database connection successful") return SQLDatabase.from_uri(db_uri), "" except Exception as e: error_msg = f"Error connecting to database: {str(e)}" logger.error(error_msg) return None, error_msg def initialize_llm(): """Inicializa el modelo de lenguaje.""" if not DEPENDENCIES_AVAILABLE: error_msg = "Dependencies not available. Make sure all required packages are installed." logger.error(error_msg) return None, error_msg google_api_key = os.getenv("GOOGLE_API_KEY") logger.info(f"GOOGLE_API_KEY found: {'Yes' if google_api_key else 'No'}") if not google_api_key: error_msg = "GOOGLE_API_KEY not found in environment variables. Please check your Hugging Face Space secrets." logger.error(error_msg) return None, error_msg try: logger.info("Initializing Google Generative AI...") llm = ChatGoogleGenerativeAI( model="gemini-2.0-flash", temperature=0, google_api_key=google_api_key ) # Test the model with a simple prompt test_prompt = "Hello, this is a test." logger.info(f"Testing model with prompt: {test_prompt}") test_response = llm.invoke(test_prompt) logger.info(f"Model test response: {str(test_response)[:100]}...") # Log first 100 chars logger.info("Google Generative AI initialized successfully") return llm, "" except Exception as e: error_msg = f"Error initializing Google Generative AI: {str(e)}" logger.error(error_msg, exc_info=True) # Include full stack trace return None, error_msg def create_agent(): """Crea el agente SQL si es posible.""" if not DEPENDENCIES_AVAILABLE: error_msg = "Dependencies not available. Please check if all required packages are installed." logger.error(error_msg) return None, error_msg logger.info("Starting agent creation process...") # Step 1: Set up database connection logger.info("Setting up database connection...") db, db_error = setup_database_connection() if not db: error_msg = f"Failed to connect to database: {db_error}" logger.error(error_msg) else: logger.info("Database connection successful") # Step 2: Initialize LLM logger.info("Initializing language model...") llm, llm_error = initialize_llm() if not llm: error_msg = f"Failed to initialize language model: {llm_error}" logger.error(error_msg) else: logger.info("Language model initialized successfully") # Check if both components are available if not db or not llm: error_msg = f"Cannot create agent. {db_error if not db else ''} {llm_error if not llm else ''}" logger.error(error_msg) return None, error_msg # Step 3: Create SQL agent try: logger.info("Creating SQL agent...") agent = create_sql_agent( llm=llm, db=db, agent_type=AgentType.OPENAI_FUNCTIONS, verbose=True ) # Test the agent with a simple query try: logger.info("Testing agent with a simple query...") test_result = agent.invoke({"input": "What tables are available?"}) logger.info(f"Agent test response: {str(test_result)[:200]}...") # Log first 200 chars except Exception as test_error: logger.warning(f"Agent test query failed (this might be expected): {str(test_error)}") logger.info("SQL agent created and tested successfully") return agent, "" except Exception as e: error_msg = f"Error creating SQL agent: {str(e)}" logger.error(error_msg, exc_info=True) # Include full stack trace return None, error_msg # Inicializar el agente logger.info("="*50) logger.info("Starting application initialization...") logger.info(f"Python version: {sys.version}") logger.info(f"Current working directory: {os.getcwd()}") logger.info(f"Files in working directory: {os.listdir('.')}") # Check environment variables logger.info("Checking environment variables...") required_vars = ["DB_USER", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"] for var in required_vars: logger.info(f"{var}: {'*' * 8 if os.getenv(var) else 'NOT SET'}") # Initialize agent logger.info("Initializing agent...") agent, agent_error = create_agent() db_connected = agent is not None if agent: logger.info("Agent initialized successfully") else: logger.error(f"Failed to initialize agent: {agent_error}") logger.info("="*50) def extract_sql_query(text): """Extrae consultas SQL del texto usando expresiones regulares.""" if not text: return None # Buscar código SQL entre backticks sql_match = re.search(r'```(?:sql)?\s*(.*?)```', text, re.DOTALL) if sql_match: return sql_match.group(1).strip() # Si no hay backticks, buscar una consulta SQL simple sql_match = re.search(r'(SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE).*?;', text, re.IGNORECASE | re.DOTALL) if sql_match: return sql_match.group(0).strip() return None def execute_sql_query(query, db_connection): """Ejecuta una consulta SQL y devuelve los resultados como una cadena.""" if not db_connection: return "Error: No hay conexión a la base de datos" try: with db_connection._engine.connect() as connection: result = connection.execute(query) rows = result.fetchall() # Convertir los resultados a un formato legible if not rows: return "La consulta no devolvió resultados" # Si es un solo resultado, devolverlo directamente if len(rows) == 1 and len(rows[0]) == 1: return str(rows[0][0]) # Si hay múltiples filas, formatear como tabla try: import pandas as pd df = pd.DataFrame(rows) return df.to_markdown(index=False) except ImportError: # Si pandas no está disponible, usar formato simple return "\n".join([str(row) for row in rows]) except Exception as e: return f"Error ejecutando la consulta: {str(e)}" def generate_plot(data, x_col, y_col, title, x_label, y_label): """Generate a plot from data and return the file path.""" plt.figure(figsize=(10, 6)) plt.bar(data[x_col], data[y_col]) plt.title(title) plt.xlabel(x_label) plt.ylabel(y_label) plt.xticks(rotation=45) plt.tight_layout() # Save to a temporary file temp_dir = tempfile.mkdtemp() plot_path = os.path.join(temp_dir, "plot.png") plt.savefig(plot_path) plt.close() return plot_path def convert_to_messages_format(chat_history): """Convert chat history to the format expected by Gradio 5.x""" if not chat_history: return [] messages = [] # If the first element is a list, assume it's in the old format if isinstance(chat_history[0], list): for msg in chat_history: if isinstance(msg, list) and len(msg) == 2: # Format: [user_msg, bot_msg] user_msg, bot_msg = msg if user_msg: messages.append({"role": "user", "content": user_msg}) if bot_msg: messages.append({"role": "assistant", "content": bot_msg}) else: # Assume it's already in the correct format or can be used as is for msg in chat_history: if isinstance(msg, dict) and "role" in msg and "content" in msg: messages.append(msg) elif isinstance(msg, str): # If it's a string, assume it's a user message messages.append({"role": "user", "content": msg}) return messages async def stream_agent_response(question: str, chat_history: List) -> tuple: """Procesa la pregunta del usuario y devuelve la respuesta del agente.""" # Initialize response response_text = "" messages = [] # Add previous chat history if chat_history: messages.extend(chat_history) # Add user's question messages.append({"role": "user", "content": question}) if not agent: error_msg = ( "## ⚠️ Error: Agente no inicializado\n\n" "No se pudo inicializar el agente de base de datos. Por favor, verifica que:\n" "1. Todas las variables de entorno estén configuradas correctamente\n" "2. La base de datos esté accesible\n" f"3. El modelo de lenguaje esté disponible\n\n" f"Error: {agent_error}" ) messages.append({"role": "assistant", "content": error_msg}) yield messages return try: # Execute the agent response = await agent.ainvoke({"input": question, "chat_history": chat_history}) # Process the response if hasattr(response, 'output'): response_text = response.output # Check if the response contains an SQL query sql_query = extract_sql_query(response_text) if sql_query: # Execute the query and update the response db_connection, _ = setup_database_connection() if db_connection: query_result = execute_sql_query(sql_query, db_connection) response_text += f"\n\n### 🔍 Resultado de la consulta:\n```sql\n{sql_query}\n```\n\n{query_result}" else: response_text += "\n\n⚠️ No se pudo conectar a la base de datos para ejecutar la consulta." else: response_text = "Error: No se recibió respuesta del agente." # Add assistant's response to the chat history messages.append({"role": "assistant", "content": response_text}) # Yield the updated messages yield messages except Exception as e: error_msg = f"## ❌ Error\n\nOcurrió un error al procesar tu solicitud:\n\n```\n{str(e)}\n```" messages.append({"role": "assistant", "content": error_msg}) yield messages # Custom CSS for the app custom_css = """ .gradio-container { max-width: 1200px !important; margin: 0 auto !important; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif; } #chatbot { min-height: 500px; border: 1px solid #e0e0e0; border-radius: 8px; margin-bottom: 20px; padding: 20px; background-color: #f9f9f9; } .user-message, .bot-message { padding: 12px 16px; border-radius: 18px; margin: 8px 0; max-width: 80%; line-height: 1.5; } .user-message { background-color: #007bff; color: white; margin-left: auto; border-bottom-right-radius: 4px; } .bot-message { background-color: #f1f1f1; color: #333; margin-right: auto; border-bottom-left-radius: 4px; } #question-input textarea { min-height: 50px !important; border-radius: 8px !important; padding: 12px !important; font-size: 16px !important; } #send-button { height: 100%; background-color: #007bff !important; color: white !important; border: none !important; border-radius: 8px !important; font-weight: 500 !important; transition: background-color 0.2s !important; } #send-button:hover { background-color: #0056b3 !important; } .status-message { text-align: center; color: #666; font-style: italic; margin: 10px 0; } """ def create_ui(): """Crea y devuelve los componentes de la interfaz de usuario de Gradio.""" # Verificar el estado del entorno env_ok, env_message = check_environment() # Crear el tema personalizado theme = gr.themes.Soft( primary_hue="blue", secondary_hue="indigo", neutral_hue="slate" ) with gr.Blocks( css=custom_css, title="Asistente de Base de Datos SQL", theme=theme ) as demo: # Encabezado gr.Markdown(""" # 🤖 Asistente de Base de Datos SQL Haz preguntas en lenguaje natural sobre tu base de datos y obtén resultados de consultas SQL. """) # Mensaje de estado if not env_ok: gr.Warning("⚠️ " + env_message) # Chatbot component chatbot = gr.Chatbot( label="Chat", height=500, show_label=True, container=True, type="messages", elem_id="chatbot" ) # Input area with gr.Row(): question_input = gr.Textbox( label="", placeholder="Escribe tu pregunta aquí...", container=False, scale=5, min_width=300, max_lines=3, autofocus=True, elem_id="question-input" ) submit_button = gr.Button( "Enviar", variant="primary", min_width=100, scale=1, elem_id="send-button" ) # System status with gr.Accordion("ℹ️ Estado del sistema", open=not env_ok): if not DEPENDENCIES_AVAILABLE: gr.Markdown(""" ## ❌ Dependencias faltantes Para ejecutar esta aplicación localmente, necesitas instalar las dependencias: ```bash pip install -r requirements.txt ``` """) else: if not agent: gr.Markdown(f""" ## ⚠️ Configuración incompleta No se pudo inicializar el agente de base de datos. Por favor, verifica que: 1. Todas las variables de entorno estén configuradas correctamente 2. La base de datos esté accesible 3. La API de Google Gemini esté configurada **Error:** {agent_error if agent_error else 'No se pudo determinar el error'} ### Configuración local Crea un archivo `.env` en la raíz del proyecto con las siguientes variables: ``` DB_USER=tu_usuario DB_PASSWORD=tu_contraseña DB_HOST=tu_servidor DB_NAME=tu_base_de_datos GOOGLE_API_KEY=tu_api_key_de_google ``` """) else: if os.getenv('SPACE_ID'): # Modo demo en Hugging Face Spaces gr.Markdown(""" ## 🚀 Modo Demo Esta es una demostración del asistente de base de datos SQL. Para usar la versión completa con conexión a base de datos: 1. Clona este espacio en tu cuenta de Hugging Face 2. Configura las variables de entorno en la configuración del espacio: - `DB_USER`: Tu usuario de base de datos - `DB_PASSWORD`: Tu contraseña de base de datos - `DB_HOST`: La dirección del servidor de base de datos - `DB_NAME`: El nombre de la base de datos - `GOOGLE_API_KEY`: Tu clave de API de Google Gemini **Nota:** Actualmente estás en modo de solo demostración. """) else: gr.Markdown(""" ## ✅ Sistema listo El asistente está listo para responder tus preguntas sobre la base de datos. """) # Hidden component for streaming output streaming_output_display = gr.Textbox(visible=False) return demo, chatbot, question_input, submit_button, streaming_output_display def create_application(): """Create and configure the Gradio application.""" # Create the UI components demo, chatbot, question_input, submit_button, streaming_output_display = create_ui() def user_message(user_input: str, chat_history: List[Dict]) -> Tuple[str, List[Dict]]: """Add user message to chat history and clear input.""" if not user_input.strip(): return "", chat_history logger.info(f"User message: {user_input}") # Initialize chat history if needed if chat_history is None: chat_history = [] # Add user message chat_history.append({"role": "user", "content": user_input}) # Add empty assistant response chat_history.append({"role": "assistant", "content": ""}) # Clear the input return "", chat_history async def bot_response(chat_history: List[Dict]) -> List[Dict]: """Get bot response and update chat history.""" if not chat_history or chat_history[-1]["role"] != "assistant": return chat_history try: # Get the user's question (second to last message) if len(chat_history) < 2: return chat_history question = chat_history[-2]["content"] logger.info(f"Processing question: {question}") # Call the agent and stream the response async for response in stream_agent_response(question, chat_history[:-2]): if isinstance(response, tuple) and len(response) > 0 and isinstance(response[0], list): messages = response[0] if messages and messages[-1]["role"] == "assistant": # Update the assistant's response chat_history[-1] = messages[-1] yield chat_history logger.info("Response generation complete") except Exception as e: error_msg = f"Error al procesar la solicitud: {str(e)}" logger.error(error_msg, exc_info=True) chat_history[-1]["content"] = error_msg yield chat_history # Event handlers with demo: # Handle form submission msg_submit = question_input.submit( fn=user_message, inputs=[question_input, chatbot], outputs=[question_input, chatbot], queue=True ).then( fn=bot_response, inputs=[chatbot], outputs=[chatbot], api_name="ask" ) # Handle button click btn_click = submit_button.click( fn=user_message, inputs=[question_input, chatbot], outputs=[question_input, chatbot], queue=True ).then( fn=bot_response, inputs=[chatbot], outputs=[chatbot] ) return demo # Create the application demo = create_application() # Configuración para Hugging Face Spaces def get_app(): """Obtiene la instancia de la aplicación Gradio para Hugging Face Spaces.""" # Verificar si estamos en un entorno de Hugging Face Spaces if os.getenv('SPACE_ID'): # Configuración específica para Spaces demo.title = "🤖 Asistente de Base de Datos SQL (Demo)" demo.description = """ Este es un demo del asistente de base de datos SQL. Para usar la versión completa con conexión a base de datos, clona este espacio y configura las variables de entorno. """ return demo # Para desarrollo local if __name__ == "__main__": # Configuración para desarrollo local - versión simplificada para Gradio 5.x demo.launch( server_name="0.0.0.0", server_port=7860, debug=True, share=False )