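# NOTE: the next six lines are file-viewer residue captured together with the
# source (author, revision, page links, file size); they are not program code.
# Jeremy Live
# d2
# dc2d325
# raw
# history blame
# 24.6 kB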
import os
import sys
import gradio as gr
import json
from typing import List, Dict, Any, Optional, Tuple
import logging
try:
# Intentar importar dependencias opcionales
from langchain_community.agent_toolkits import create_sql_agent
from langchain_community.utilities import SQLDatabase
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents.agent_types import AgentType
import pymysql
from dotenv import load_dotenv
DEPENDENCIES_AVAILABLE = True
except ImportError:
# Si faltan dependencias, la aplicación funcionará en modo demo
DEPENDENCIES_AVAILABLE = False
# Logging configuration: configure the root logger once and create the
# module-level logger used by every function in this file. (The setup used to
# be repeated twice; the second basicConfig() call had no effect.)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def check_environment():
    """Report whether the runtime has what the application needs.

    Returns:
        A ``(ok, message)`` tuple. ``ok`` is ``False`` when the optional
        packages failed to import or when any required environment variable
        is unset or empty; ``message`` explains which case applies.
    """
    if not DEPENDENCIES_AVAILABLE:
        return (
            False,
            "Missing required Python packages. Please install them with: pip install -r requirements.txt",
        )

    # Collect every required variable that is unset or empty.
    unset = []
    for name in ("DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"):
        if not os.getenv(name):
            unset.append(name)

    if not unset:
        return True, "Environment is properly configured"
    return False, f"Missing required environment variables: {', '.join(unset)}"
def setup_database_connection():
    """Validate the database settings and build a LangChain ``SQLDatabase``.

    Loads ``.env`` (overriding variables already set), reads ``DB_USER``,
    ``DB_PASSWORD``, ``DB_HOST`` and ``DB_NAME``, opens and immediately closes
    a PyMySQL connection as a reachability probe (5 second connect timeout),
    and finally wraps the same credentials in a ``SQLDatabase``.

    Returns:
        ``(db, "")`` on success, or ``(None, error_message)`` when the
        optional dependencies are missing, a setting is absent, or the
        connection attempt raises.
    """
    if not DEPENDENCIES_AVAILABLE:
        return None, "Dependencies not available"

    # Local stdlib import so this function does not depend on the file header.
    from urllib.parse import quote_plus

    try:
        load_dotenv(override=True)

        # Only the four database settings are read and logged here. Dumping
        # the whole environment would write unrelated secrets (tokens whose
        # names do not contain "pass"/"key"/"secret") to the log on every call.
        settings = {
            name: os.getenv(name)
            for name in ("DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME")
        }
        db_user = settings["DB_USER"]
        db_password = settings["DB_PASSWORD"]
        db_host = settings["DB_HOST"]
        db_name = settings["DB_NAME"]

        # The password is deliberately left out of the log line.
        logger.info(f"Database connection attempt - Host: {db_host}, User: {db_user}, DB: {db_name}")

        missing = [name for name, value in settings.items() if not value]
        if missing:
            logger.error(f"Missing required database configuration: {', '.join(missing)}")
            return None, f"Missing database configuration: {', '.join(missing)}"

        logger.info(f"Connecting to database: {db_user}@{db_host}/{db_name}")

        # Probe with a raw driver connection first so that bad credentials or
        # an unreachable host fail fast with the driver's own error message.
        connection = pymysql.connect(
            host=db_host,
            user=db_user,
            password=db_password,
            database=db_name,
            connect_timeout=5,
            cursorclass=pymysql.cursors.DictCursor,
        )
        connection.close()

        # Percent-encode the credentials: characters such as "@", ":" or "/"
        # in a user name or password would otherwise corrupt the URL.
        db_uri = (
            f"mysql+pymysql://{quote_plus(db_user)}:{quote_plus(db_password)}"
            f"@{db_host}/{db_name}"
        )
        logger.info("Database connection successful")
        return SQLDatabase.from_uri(db_uri), ""
    except Exception as e:
        error_msg = f"Error connecting to database: {str(e)}"
        logger.error(error_msg)
        return None, error_msg
def initialize_llm():
    """Create the Gemini chat model and smoke-test it with one prompt.

    Returns:
        ``(llm, "")`` when the model was built and answered the test prompt,
        otherwise ``(None, error_message)``. Every failure is also logged.
    """

    def _fail(message, with_traceback=False):
        # Log the failure and hand back the (None, message) pair callers expect.
        logger.error(message, exc_info=with_traceback)
        return None, message

    if not DEPENDENCIES_AVAILABLE:
        return _fail("Dependencies not available. Make sure all required packages are installed.")

    api_key = os.getenv("GOOGLE_API_KEY")
    key_state = 'Yes' if api_key else 'No'
    logger.info(f"GOOGLE_API_KEY found: {key_state}")
    if not api_key:
        return _fail(
            "GOOGLE_API_KEY not found in environment variables. Please check your Hugging Face Space secrets."
        )

    try:
        logger.info("Initializing Google Generative AI...")
        model = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash",
            temperature=0,
            google_api_key=api_key,
        )

        # One round trip up front surfaces a bad key or model name at start-up.
        probe = "Hello, this is a test."
        logger.info(f"Testing model with prompt: {probe}")
        reply = model.invoke(probe)
        # Only the first 100 characters of the reply are logged.
        logger.info(f"Model test response: {str(reply)[:100]}...")
        logger.info("Google Generative AI initialized successfully")
    except Exception as exc:
        # with_traceback=True attaches the full stack trace to the log record.
        return _fail(f"Error initializing Google Generative AI: {str(exc)}", with_traceback=True)

    return model, ""
def create_agent():
    """Assemble the SQL agent from a database handle and a language model.

    Both prerequisites are always attempted, so the log shows the state of
    each one even when the other fails.

    Returns:
        ``(agent, "")`` on success, otherwise ``(None, error_message)``.
    """
    if not DEPENDENCIES_AVAILABLE:
        unavailable = "Dependencies not available. Please check if all required packages are installed."
        logger.error(unavailable)
        return None, unavailable

    logger.info("Starting agent creation process...")

    # Step 1: database handle.
    logger.info("Setting up database connection...")
    db, db_error = setup_database_connection()
    if db:
        logger.info("Database connection successful")
    else:
        logger.error(f"Failed to connect to database: {db_error}")

    # Step 2: language model.
    logger.info("Initializing language model...")
    llm, llm_error = initialize_llm()
    if llm:
        logger.info("Language model initialized successfully")
    else:
        logger.error(f"Failed to initialize language model: {llm_error}")

    # Both pieces are required; report whichever error(s) occurred.
    if not (db and llm):
        db_part = '' if db else db_error
        llm_part = '' if llm else llm_error
        combined = f"Cannot create agent. {db_part} {llm_part}"
        logger.error(combined)
        return None, combined

    # Step 3: the agent itself.
    try:
        logger.info("Creating SQL agent...")
        # NOTE(review): OPENAI_FUNCTIONS is used together with a Gemini chat
        # model - confirm this agent type is supported by that model wrapper.
        sql_agent = create_sql_agent(
            llm=llm,
            db=db,
            agent_type=AgentType.OPENAI_FUNCTIONS,
            verbose=True,
        )

        # Best-effort smoke test: a failure here is logged as a warning and
        # does not prevent the agent from being returned.
        try:
            logger.info("Testing agent with a simple query...")
            probe_result = sql_agent.invoke({"input": "What tables are available?"})
            # Only the first 200 characters of the result are logged.
            logger.info(f"Agent test response: {str(probe_result)[:200]}...")
        except Exception as test_error:
            logger.warning(f"Agent test query failed (this might be expected): {str(test_error)}")

        logger.info("SQL agent created and tested successfully")
        return sql_agent, ""
    except Exception as exc:
        failure = f"Error creating SQL agent: {str(exc)}"
        logger.error(failure, exc_info=True)  # include the full stack trace
        return None, failure
# Application start-up: log basic diagnostics, then build the agent once at
# import time. `agent`, `agent_error` and `db_connected` are module-level
# values read later by the UI and by the chat handler.
logger.info("="*50)
logger.info("Starting application initialization...")
logger.info(f"Python version: {sys.version}")
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Files in working directory: {os.listdir('.')}")

# Report which required settings are present. Only presence is logged, never
# the value, so DB_PASSWORD is included here to match check_environment().
logger.info("Checking environment variables...")
required_vars = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"]
for var in required_vars:
    logger.info(f"{var}: {'*' * 8 if os.getenv(var) else 'NOT SET'}")

# Build the agent; a failure is recorded in `agent_error` rather than raised.
logger.info("Initializing agent...")
agent, agent_error = create_agent()
db_connected = agent is not None
if agent:
    logger.info("Agent initialized successfully")
else:
    logger.error(f"Failed to initialize agent: {agent_error}")
logger.info("="*50)
def extract_sql_query(text):
    """Return the first SQL statement found in *text*, or ``None``.

    Two forms are recognised, in this order:

    1. The contents of the first fenced block (three backticks, optionally
       tagged ``sql`` in any letter case).
    2. The first run of text that starts with a SQL keyword (SELECT, INSERT,
       UPDATE, DELETE, CREATE, ALTER, DROP, TRUNCATE) as a whole word and
       ends at the next semicolon.

    Args:
        text: Text to scan; an empty or ``None`` value yields ``None``.

    Returns:
        The stripped SQL text, or ``None`` when nothing matches.
    """
    # Imported here because the module's import block never brings ``re``
    # into scope.
    import re

    if not text:
        return None

    # Form 1: fenced code block.
    fenced = re.search(r'```(?:sql)?\s*(.*?)```', text, re.DOTALL | re.IGNORECASE)
    if fenced:
        return fenced.group(1).strip()

    # Form 2: bare statement. The word boundaries stop keywords embedded in
    # longer words (for example "preselected") from starting a match.
    bare = re.search(
        r'\b(?:SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE)\b.*?;',
        text,
        re.IGNORECASE | re.DOTALL,
    )
    if bare:
        return bare.group(0).strip()

    return None
def execute_sql_query(query, db_connection):
    """Run *query* and return the outcome as display-ready text.

    Args:
        query: SQL to execute. A plain string is wrapped in SQLAlchemy's
            ``text()``; any other object is passed to ``execute`` unchanged.
        db_connection: Object exposing a SQLAlchemy engine as ``_engine``
            (the value returned by ``setup_database_connection``).

    Returns:
        A string: the single value for a 1x1 result, a Markdown table (or one
        row per line when pandas or its Markdown support is unavailable) for
        larger results, a fixed message for an empty result, or an error
        message. This function does not raise.
    """
    if not db_connection:
        return "Error: No hay conexión a la base de datos"

    try:
        # SQLAlchemy 2.x refuses plain strings in Connection.execute(); they
        # must be wrapped in text(). SQLAlchemy is necessarily installed
        # whenever db_connection carries an engine.
        from sqlalchemy import text

        statement = text(query) if isinstance(query, str) else query

        # NOTE(review): nothing here restricts the statement to read-only
        # SQL; the visible caller passes SQL extracted from model output.
        # Consider rejecting non-SELECT statements or using a read-only user.
        with db_connection._engine.connect() as connection:
            result = connection.execute(statement)
            columns = list(result.keys())
            rows = result.fetchall()

        if not rows:
            return "La consulta no devolvió resultados"

        # A single scalar is returned as-is rather than as a one-cell table.
        if len(rows) == 1 and len(rows[0]) == 1:
            return str(rows[0][0])

        try:
            import pandas as pd

            # Pass the column names explicitly so the table header shows
            # them regardless of how pandas interprets the row objects.
            table = pd.DataFrame([tuple(row) for row in rows], columns=columns)
            return table.to_markdown(index=False)
        except ImportError:
            # pandas (or the package to_markdown relies on) is missing.
            return "\n".join([str(row) for row in rows])
    except Exception as e:
        return f"Error ejecutando la consulta: {str(e)}"
def generate_plot(data, x_col, y_col, title, x_label, y_label):
    """Draw a bar chart and save it as a PNG in a new temporary directory.

    Args:
        data: Indexable container; ``data[x_col]`` supplies the bar positions
            and ``data[y_col]`` the bar heights.
        x_col: Key of the x values in *data*.
        y_col: Key of the y values in *data*.
        title: Chart title.
        x_label: Label of the x axis.
        y_label: Label of the y axis.

    Returns:
        Path of the written ``plot.png``. The temporary directory is not
        removed here; the caller owns the file.
    """
    # Imported locally: the module's import block provides neither
    # ``tempfile`` nor ``plt``, so the function used to fail with NameError.
    import tempfile

    import matplotlib.pyplot as plt

    figure = plt.figure(figsize=(10, 6))
    try:
        plt.bar(data[x_col], data[y_col])
        plt.title(title)
        plt.xlabel(x_label)
        plt.ylabel(y_label)
        plt.xticks(rotation=45)
        plt.tight_layout()

        # Save to a fresh temporary directory.
        plot_path = os.path.join(tempfile.mkdtemp(), "plot.png")
        plt.savefig(plot_path)
    finally:
        # Close the figure even when drawing or saving fails, so figures do
        # not accumulate in a long-running process.
        plt.close(figure)
    return plot_path
def convert_to_messages_format(chat_history):
    """Flatten ``[user, assistant]`` pairs into role/content message dicts.

    Args:
        chat_history: Iterable of history entries. Only lists or tuples of
            exactly two items are used; anything else is skipped.

    Returns:
        A new list of ``{"role": ..., "content": ...}`` dicts. Within each
        pair the user entry precedes the assistant entry, and falsy entries
        (``None``, ``""``) are left out.
    """
    converted = []
    for pair in chat_history:
        is_pair = isinstance(pair, (list, tuple)) and len(pair) == 2
        if not is_pair:
            continue
        # Position 0 is the user's turn, position 1 the assistant's.
        for role, content in zip(("user", "assistant"), pair):
            if content:
                converted.append({"role": role, "content": content})
    return converted
async def stream_agent_response(question: str, chat_history: List) -> Tuple[List, Dict]:
    """Answer *question* with the SQL agent, yielding chat updates.

    This is an async generator. Each yielded value is a
    ``(messages, update)`` pair: ``messages`` is the conversation as
    role/content dicts and ``update`` is a ``gr.update(visible=False)`` for
    the hidden streaming textbox. The last yielded pair always holds the
    final conversation, including the assistant's answer or an error text.

    Args:
        question: The user's new question.
        chat_history: Earlier turns as ``[user, assistant]`` pairs; passed to
            the agent unchanged and converted to message dicts for display.
    """
    # Convert to the messages format used by the chatbot component.
    messages = convert_to_messages_format(chat_history)

    if not agent:
        error_msg = (
            "## ⚠️ Error: Agente no inicializado\n\n"
            "No se pudo inicializar el agente de base de datos. Por favor, verifica que:\n"
            "1. Todas las variables de entorno estén configuradas correctamente\n"
            "2. La base de datos esté accesible\n"
            "3. El modelo de lenguaje esté disponible\n\n"
            f"Error: {agent_error}"
        )
        messages.append({"role": "user", "content": question})
        messages.append({"role": "assistant", "content": error_msg})
        yield messages, gr.update(visible=False)
        return

    try:
        # Show the user's question right away.
        messages.append({"role": "user", "content": question})
        yield messages, gr.update(visible=False)

        response = await agent.ainvoke({"input": question, "chat_history": chat_history})

        # The agent executor returns a mapping with an "output" key, so an
        # attribute check alone never finds the answer. Accept both shapes.
        if isinstance(response, dict):
            output = response.get("output")
        else:
            output = getattr(response, "output", None)

        if output is None:
            response_text = "Error: No se recibió respuesta del agente."
        else:
            response_text = output if isinstance(output, str) else str(output)

            # If the answer contains SQL, run it and append the result.
            sql_query = extract_sql_query(response_text)
            if sql_query:
                db_connection, _ = setup_database_connection()
                if db_connection:
                    query_result = execute_sql_query(sql_query, db_connection)
                    response_text += f"\n\n### 🔍 Resultado de la consulta:\n```sql\n{sql_query}\n```\n\n{query_result}"
                else:
                    response_text += "\n\n⚠️ No se pudo conectar a la base de datos para ejecutar la consulta."

        messages.append({"role": "assistant", "content": response_text})
        yield messages, gr.update(visible=False)
    except Exception as e:
        error_msg = f"## ❌ Error\n\nOcurrió un error al procesar tu solicitud:\n\n```\n{str(e)}\n```"
        messages.append({"role": "assistant", "content": error_msg})
        yield messages, gr.update(visible=False)

    # Nothing is yielded after this point on purpose: a consumer that keeps
    # only the last item must receive the updated `messages`, not the
    # untouched input history.
# Stylesheet passed to gr.Blocks(css=...) in create_ui().
# The "#question-input" and "#send-button" rules target the elem_id values
# that create_ui() assigns to the textbox and the button.
# NOTE(review): no visible code assigns elem_id "chatbot" or the classes
# "user-message", "bot-message" and "status-message" - confirm whether those
# rules still apply to anything.
custom_css = """
.gradio-container {
max-width: 1200px !important;
margin: 0 auto !important;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
}
#chatbot {
min-height: 500px;
border: 1px solid #e0e0e0;
border-radius: 8px;
margin-bottom: 20px;
padding: 20px;
background-color: #f9f9f9;
}
.user-message, .bot-message {
padding: 12px 16px;
border-radius: 18px;
margin: 8px 0;
max-width: 80%;
line-height: 1.5;
}
.user-message {
background-color: #007bff;
color: white;
margin-left: auto;
border-bottom-right-radius: 4px;
}
.bot-message {
background-color: #f1f1f1;
color: #333;
margin-right: auto;
border-bottom-left-radius: 4px;
}
#question-input textarea {
min-height: 50px !important;
border-radius: 8px !important;
padding: 12px !important;
font-size: 16px !important;
}
#send-button {
height: 100%;
background-color: #007bff !important;
color: white !important;
border: none !important;
border-radius: 8px !important;
font-weight: 500 !important;
transition: background-color 0.2s !important;
}
#send-button:hover {
background-color: #0056b3 !important;
}
.status-message {
text-align: center;
color: #666;
font-style: italic;
margin: 10px 0;
}
"""
def _system_status_markdown():
    """Return the Markdown shown inside the "Estado del sistema" accordion."""
    if not DEPENDENCIES_AVAILABLE:
        return """
## ❌ Dependencias faltantes
Para ejecutar esta aplicación localmente, necesitas instalar las dependencias:
```bash
pip install -r requirements.txt
```
"""
    if not agent:
        return f"""
## ⚠️ Configuración incompleta
No se pudo inicializar el agente de base de datos. Por favor, verifica que:
1. Todas las variables de entorno estén configuradas correctamente
2. La base de datos esté accesible
3. La API de Google Gemini esté configurada
**Error:** {agent_error if agent_error else 'No se pudo determinar el error'}
### Configuración local
Crea un archivo `.env` en la raíz del proyecto con las siguientes variables:
```
DB_USER=tu_usuario
DB_PASSWORD=tu_contraseña
DB_HOST=tu_servidor
DB_NAME=tu_base_de_datos
GOOGLE_API_KEY=tu_api_key_de_google
```
"""
    if os.getenv('SPACE_ID'):
        # Running inside a Hugging Face Space (SPACE_ID is set).
        return """
## 🚀 Modo Demo
Esta es una demostración del asistente de base de datos SQL. Para usar la versión completa con conexión a base de datos:
1. Clona este espacio en tu cuenta de Hugging Face
2. Configura las variables de entorno en la configuración del espacio:
- `DB_USER`: Tu usuario de base de datos
- `DB_PASSWORD`: Tu contraseña de base de datos
- `DB_HOST`: La dirección del servidor de base de datos
- `DB_NAME`: El nombre de la base de datos
- `GOOGLE_API_KEY`: Tu clave de API de Google Gemini
**Nota:** Actualmente estás en modo de solo demostración.
"""
    return """
## ✅ Sistema listo
El asistente está listo para responder tus preguntas sobre la base de datos.
"""


def create_ui():
    """Build the Gradio layout and return its main components.

    Returns:
        ``(demo, chatbot, question_input, submit_button,
        streaming_output_display)``: the ``gr.Blocks`` app followed by the
        components that the event wiring needs.
    """
    # Check the environment so the status panel can report problems.
    env_ok, env_message = check_environment()

    theme = gr.themes.Soft(
        primary_hue="blue",
        secondary_hue="indigo",
        neutral_hue="slate",
    )

    with gr.Blocks(
        css=custom_css,
        title="Asistente de Base de Datos SQL",
        theme=theme,
    ) as demo:
        # Header
        gr.Markdown("""
# 🤖 Asistente de Base de Datos SQL
Haz preguntas en lenguaje natural sobre tu base de datos y obtén resultados de consultas SQL.
""")

        # Status message. gr.Warning() only produces an on-page alert when it
        # is called from an event handler; at build time the message would
        # not reach the page, so it is rendered as Markdown instead.
        if not env_ok:
            gr.Markdown("⚠️ " + env_message)

        # The panel starts expanded whenever the environment check failed.
        with gr.Accordion("ℹ️ Estado del sistema", open=not env_ok):
            gr.Markdown(_system_status_markdown())

        # Chat area, using the role/content "messages" format.
        chatbot = gr.Chatbot(
            label="Chat",
            height=500,
            type="messages",
        )

        # Input row: question box plus send button.
        with gr.Row():
            question_input = gr.Textbox(
                label="",
                placeholder="Escribe tu pregunta sobre la base de datos...",
                elem_id="question-input",
                container=False,
                scale=5,
                min_width=300,
                max_lines=3,
                autofocus=True,
            )
            submit_button = gr.Button(
                "Enviar",
                elem_id="send-button",
                min_width=100,
                scale=1,
                variant="primary",
            )

        # Debug information (collapsed by default).
        with gr.Accordion("🔍 Información de depuración", open=False):
            gr.Markdown("""
### Estado del sistema
- **Base de datos**: {}
- **Modelo**: {}
- **Modo**: {}
""".format(
                f"Conectado a {os.getenv('DB_HOST')}/{os.getenv('DB_NAME')}" if db_connected else "No conectado",
                "gemini-2.0-flash" if agent else "No disponible",
                "Completo" if agent else "Demo (sin conexión a base de datos)"
            ))

            # Optional dump of DB_*/GOOGLE_* variables, shown only when
            # SHOW_ENV_DEBUG=true. Values whose name contains PASS or KEY
            # are replaced with "***".
            if os.getenv("SHOW_ENV_DEBUG", "false").lower() == "true":
                env_vars = {
                    k: "***" if "PASS" in k or "KEY" in k else v
                    for k, v in os.environ.items()
                    if k.startswith(('DB_', 'GOOGLE_'))
                }
                gr.Code(
                    json.dumps(env_vars, indent=2, ensure_ascii=False),
                    language="json",
                    label="Variables de entorno",
                )

        # Hidden component used as the second output of the chat handlers.
        streaming_output_display = gr.Textbox(visible=False)

    return demo, chatbot, question_input, submit_button, streaming_output_display
def create_application():
    """Create the Gradio app and attach the chat event handlers.

    Returns:
        The ``gr.Blocks`` instance with the button click and the textbox
        submit both wired to the same two-step pipeline: record the user's
        message, then fetch the assistant's answer.
    """
    demo, chatbot, question_input, submit_button, streaming_output_display = create_ui()

    def user_message(user_input: str, chat_history: List[Dict]) -> Tuple[str, List[Dict]]:
        """Append the user's message to the history and clear the input box."""
        # Tolerate a missing history and a missing or blank input.
        chat_history = chat_history or []
        if not user_input or not user_input.strip():
            return "", chat_history

        logger.info(f"User message: {user_input}")

        # Accept pair-style history ([user, assistant] lists or tuples).
        if chat_history and isinstance(chat_history[0], (list, tuple)):
            chat_history = convert_to_messages_format(chat_history)

        return "", chat_history + [{"role": "user", "content": user_input}]

    async def bot_response(chat_history: List[Dict]) -> Tuple[List[Dict], Dict]:
        """Answer the last user message and return the updated history."""
        # Nothing to do unless the newest message comes from the user.
        if not chat_history or chat_history[-1].get("role") != "user":
            return chat_history, gr.update(visible=False)

        question = chat_history[-1]["content"]
        logger.info(f"Processing question: {question}")

        # stream_agent_response() expects [user, assistant] pairs.
        old_format = []
        for msg in chat_history:
            if msg["role"] == "user":
                old_format.append([msg["content"], None])
            elif msg["role"] == "assistant" and old_format and old_format[-1][1] is None:
                old_format[-1][1] = msg["content"]

        # Drain the async generator and keep only its final update. The last
        # pair is the question being asked now, so it is excluded from the
        # history passed along.
        last_response = None
        async for response in stream_agent_response(question, old_format[:-1]):
            last_response = response

        # Returning None for two outputs would be an error; fall back to the
        # unchanged history if the generator produced nothing.
        if last_response is None:
            return chat_history, gr.update(visible=False)
        return last_response

    # Event handlers
    with demo:
        # Button click; this chain is also exposed under the API name "ask".
        submit_button.click(
            fn=user_message,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot],
            queue=True,
        ).then(
            fn=bot_response,
            inputs=[chatbot],
            outputs=[chatbot, streaming_output_display],
            api_name="ask",
        )

        # Pressing Enter in the textbox runs the same pipeline.
        question_input.submit(
            fn=user_message,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot],
            queue=True,
        ).then(
            fn=bot_response,
            inputs=[chatbot],
            outputs=[chatbot, streaming_output_display],
        )

    return demo
# Build the Gradio app once at import time and keep it in the module-level
# name `demo`, which get_app() and the __main__ block below both use.
demo = create_application()
# Hugging Face Spaces integration
def get_app():
    """Return the module-level Gradio app, adjusted when running in a Space.

    When the ``SPACE_ID`` environment variable is set, the app's ``title``
    and ``description`` attributes are overwritten with demo-specific text
    before the app is returned. Otherwise the app is returned untouched.
    """
    running_in_space = bool(os.getenv('SPACE_ID'))
    if not running_in_space:
        return demo

    # Space-specific labels.
    demo.title = "🤖 Asistente de Base de Datos SQL (Demo)"
    demo.description = (
        "\n"
        "Este es un demo del asistente de base de datos SQL.\n"
        "Para usar la versión completa con conexión a base de datos, clona este espacio y configura las variables de entorno.\n"
    )
    return demo
# Script entry point (local development)
if __name__ == "__main__":
    # Listen on every interface, port 7860, with debug output enabled and
    # without creating a public share link.
    demo.launch(
        share=False,
        debug=True,
        server_port=7860,
        server_name="0.0.0.0",
    )