Jeremy Live
template
f639c56
raw
history blame
17.6 kB
import os
import gradio as gr
import json
from typing import List, Dict, Any, Optional, Tuple
import logging
try:
# Intentar importar dependencias opcionales
from langchain_community.agent_toolkits import create_sql_agent
from langchain_community.utilities import SQLDatabase
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents.agent_types import AgentType
import pymysql
from dotenv import load_dotenv
DEPENDENCIES_AVAILABLE = True
except ImportError:
# Si faltan dependencias, la aplicación funcionará en modo demo
DEPENDENCIES_AVAILABLE = False
# Configuración de logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def check_environment():
"""Verifica si el entorno está configurado correctamente."""
if not DEPENDENCIES_AVAILABLE:
return False, "Missing required Python packages. Please install them with: pip install -r requirements.txt"
# Verificar si estamos en un entorno con variables de entorno
required_vars = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"]
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
return False, f"Missing required environment variables: {', '.join(missing_vars)}"
return True, "Environment is properly configured"
def setup_database_connection():
"""Intenta establecer una conexión a la base de datos."""
if not DEPENDENCIES_AVAILABLE:
return None, "Dependencies not available"
try:
load_dotenv(override=True)
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_name = os.getenv("DB_NAME")
if not all([db_user, db_password, db_host, db_name]):
return None, "Missing database configuration"
logger.info(f"Connecting to database: {db_user}@{db_host}/{db_name}")
# Probar conexión
connection = pymysql.connect(
host=db_host,
user=db_user,
password=db_password,
database=db_name,
connect_timeout=5,
cursorclass=pymysql.cursors.DictCursor
)
connection.close()
# Si la conexión es exitosa, crear motor SQLAlchemy
db_uri = f"mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}"
logger.info("Database connection successful")
return SQLDatabase.from_uri(db_uri), ""
except Exception as e:
error_msg = f"Error connecting to database: {str(e)}"
logger.error(error_msg)
return None, error_msg
def initialize_llm():
"""Inicializa el modelo de lenguaje."""
if not DEPENDENCIES_AVAILABLE:
return None, "Dependencies not available"
google_api_key = os.getenv("GOOGLE_API_KEY")
if not google_api_key:
return None, "GOOGLE_API_KEY not found in environment variables"
try:
llm = ChatGoogleGenerativeAI(
model="gemini-2.0-flash",
temperature=0,
google_api_key=google_api_key
)
logger.info("Google Generative AI initialized successfully")
return llm, ""
except Exception as e:
error_msg = f"Error initializing Google Generative AI: {str(e)}"
logger.error(error_msg)
return None, error_msg
def create_agent():
"""Crea el agente SQL si es posible."""
if not DEPENDENCIES_AVAILABLE:
return None, "Dependencies not available"
db, db_error = setup_database_connection()
llm, llm_error = initialize_llm()
if not db or not llm:
error_msg = " | ".join(filter(None, [db_error, llm_error]))
return None, f"Cannot create agent: {error_msg}"
try:
logger.info("Creating SQL agent...")
agent = create_sql_agent(
llm=llm,
db=db,
agent_type=AgentType.OPENAI_FUNCTIONS,
verbose=True
)
logger.info("SQL agent created successfully")
return agent, ""
except Exception as e:
error_msg = f"Error creating SQL agent: {str(e)}"
logger.error(error_msg)
return None, error_msg
# Inicializar el agente
agent, agent_error = create_agent()
db_connected = agent is not None
def extract_sql_query(text):
"""Extrae consultas SQL del texto usando expresiones regulares."""
if not text:
return None
# Buscar código SQL entre backticks
sql_match = re.search(r'```(?:sql)?\s*(.*?)```', text, re.DOTALL)
if sql_match:
return sql_match.group(1).strip()
# Si no hay backticks, buscar una consulta SQL simple
sql_match = re.search(r'(SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE).*?;', text, re.IGNORECASE | re.DOTALL)
if sql_match:
return sql_match.group(0).strip()
return None
def execute_sql_query(query, db_connection):
"""Ejecuta una consulta SQL y devuelve los resultados como una cadena."""
if not db_connection:
return "Error: No hay conexión a la base de datos"
try:
with db_connection._engine.connect() as connection:
result = connection.execute(query)
rows = result.fetchall()
# Convertir los resultados a un formato legible
if not rows:
return "La consulta no devolvió resultados"
# Si es un solo resultado, devolverlo directamente
if len(rows) == 1 and len(rows[0]) == 1:
return str(rows[0][0])
# Si hay múltiples filas, formatear como tabla
try:
import pandas as pd
df = pd.DataFrame(rows)
return df.to_markdown(index=False)
except ImportError:
# Si pandas no está disponible, usar formato simple
return "\n".join([str(row) for row in rows])
except Exception as e:
return f"Error ejecutando la consulta: {str(e)}"
def generate_plot(data, x_col, y_col, title, x_label, y_label):
"""Generate a plot from data and return the file path."""
plt.figure(figsize=(10, 6))
plt.bar(data[x_col], data[y_col])
plt.title(title)
plt.xlabel(x_label)
plt.ylabel(y_label)
plt.xticks(rotation=45)
plt.tight_layout()
# Save to a temporary file
temp_dir = tempfile.mkdtemp()
plot_path = os.path.join(temp_dir, "plot.png")
plt.savefig(plot_path)
plt.close()
return plot_path
async def stream_agent_response(question: str, chat_history: List) -> Tuple[List, Dict]:
"""Procesa la pregunta del usuario y devuelve la respuesta del agente."""
if not agent:
error_msg = (
"## ⚠️ Error: Agente no inicializado\n\n"
"No se pudo inicializar el agente de base de datos. Por favor, verifica que:\n"
"1. Todas las variables de entorno estén configuradas correctamente\n"
"2. La base de datos esté accesible\n"
f"3. El modelo de lenguaje esté disponible\n\n"
f"Error: {agent_error}"
)
return chat_history + [[question, error_msg]], gr.update(visible=False)
try:
# Agregar un mensaje de "pensando"
chat_history = chat_history + [[question, None]]
yield chat_history, gr.update(visible=False)
# Ejecutar el agente
response = await agent.ainvoke({"input": question, "chat_history": chat_history[:-1]})
# Procesar la respuesta
if hasattr(response, 'output'):
response_text = response.output
# Verificar si la respuesta contiene una consulta SQL
sql_query = extract_sql_query(response_text)
if sql_query:
# Ejecutar la consulta y actualizar la respuesta
db_connection, _ = setup_database_connection()
query_result = execute_sql_query(sql_query, db_connection)
response_text += f"\n\n### 🔍 Resultado de la consulta:\n```sql\n{sql_query}\n```\n\n{query_result}"
else:
response_text = "Error: No se recibió respuesta del agente."
# Actualizar el historial con la respuesta completa
chat_history[-1][1] = response_text
return chat_history, gr.update(visible=False)
except Exception as e:
error_msg = f"## ❌ Error\n\nOcurrió un error al procesar tu solicitud:\n\n```\n{str(e)}\n```"
chat_history[-1][1] = error_msg
return chat_history, gr.update(visible=False)
# Custom CSS for the app
custom_css = """
.gradio-container {
max-width: 1200px !important;
margin: 0 auto !important;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
}
#chatbot {
min-height: 500px;
border: 1px solid #e0e0e0;
border-radius: 8px;
margin-bottom: 20px;
padding: 20px;
background-color: #f9f9f9;
}
.user-message, .bot-message {
padding: 12px 16px;
border-radius: 18px;
margin: 8px 0;
max-width: 80%;
line-height: 1.5;
}
.user-message {
background-color: #007bff;
color: white;
margin-left: auto;
border-bottom-right-radius: 4px;
}
.bot-message {
background-color: #f1f1f1;
color: #333;
margin-right: auto;
border-bottom-left-radius: 4px;
}
#question-input textarea {
min-height: 50px !important;
border-radius: 8px !important;
padding: 12px !important;
font-size: 16px !important;
}
#send-button {
height: 100%;
background-color: #007bff !important;
color: white !important;
border: none !important;
border-radius: 8px !important;
font-weight: 500 !important;
transition: background-color 0.2s !important;
}
#send-button:hover {
background-color: #0056b3 !important;
}
.status-message {
text-align: center;
color: #666;
font-style: italic;
margin: 10px 0;
}
"""
def create_ui():
"""Crea y devuelve los componentes de la interfaz de usuario de Gradio."""
# Verificar el estado del entorno
env_ok, env_message = check_environment()
# Crear el tema personalizado
theme = gr.themes.Soft(
primary_hue="blue",
secondary_hue="indigo",
neutral_hue="slate"
)
with gr.Blocks(
css=custom_css,
title="Asistente de Base de Datos SQL",
theme=theme
) as demo:
# Encabezado
gr.Markdown("""
# 🤖 Asistente de Base de Datos SQL
Haz preguntas en lenguaje natural sobre tu base de datos y obtén resultados de consultas SQL.
""")
# Mensaje de estado
if not env_ok:
gr.Warning("⚠️ " + env_message)
with gr.Accordion("ℹ️ Estado del sistema", open=not env_ok):
if not DEPENDENCIES_AVAILABLE:
gr.Markdown("""
## ❌ Dependencias faltantes
Para ejecutar esta aplicación localmente, necesitas instalar las dependencias:
```bash
pip install -r requirements.txt
```
""")
else:
if not agent:
gr.Markdown(f"""
## ⚠️ Configuración incompleta
No se pudo inicializar el agente de base de datos. Por favor, verifica que:
1. Todas las variables de entorno estén configuradas correctamente
2. La base de datos esté accesible
3. La API de Google Gemini esté configurada
**Error:** {agent_error if agent_error else 'No se pudo determinar el error'}
### Configuración local
Crea un archivo `.env` en la raíz del proyecto con las siguientes variables:
```
DB_USER=tu_usuario
DB_PASSWORD=tu_contraseña
DB_HOST=tu_servidor
DB_NAME=tu_base_de_datos
GOOGLE_API_KEY=tu_api_key_de_google
```
""")
else:
gr.Markdown("""
## ✅ Sistema listo
El asistente está listo para responder tus preguntas sobre la base de datos.
""")
# Interfaz de chat
chatbot = gr.Chatbot(
elem_id="chatbot",
show_label=False,
height=500,
bubble_full_width=False,
avatar_images=(
"https://i.imgur.com/8O1mCJx.png", # User avatar
"https://i.imgur.com/7I12Ybh.png" # Bot avatar
),
render_markdown=True,
show_copy_button=True,
show_share_button=True,
likeable=True
)
# Área de entrada
with gr.Row():
question_input = gr.Textbox(
label="",
placeholder="Escribe tu pregunta sobre la base de datos...",
elem_id="question-input",
container=False,
scale=5,
min_width=300,
max_lines=3,
autofocus=True
)
submit_button = gr.Button(
"Enviar",
elem_id="send-button",
min_width=100,
scale=1,
variant="primary"
)
# Información del sistema (solo para depuración)
with gr.Accordion("🔍 Información de depuración", open=False):
gr.Markdown("""
### Estado del sistema
- **Base de datos**: {}
- **Modelo**: {}
- **Modo**: {}
""".format(
f"Conectado a {os.getenv('DB_HOST')}/{os.getenv('DB_NAME')}" if db_connected else "No conectado",
"gemini-2.0-flash" if agent else "No disponible",
"Completo" if agent else "Demo (sin conexión a base de datos)"
))
# Mostrar variables de entorno (solo para depuración)
if os.getenv("SHOW_ENV_DEBUG", "false").lower() == "true":
env_vars = {k: "***" if "PASS" in k or "KEY" in k else v
for k, v in os.environ.items()
if k.startswith(('DB_', 'GOOGLE_'))}
gr.Code(
json.dumps(env_vars, indent=2, ensure_ascii=False),
language="json",
label="Variables de entorno"
)
# Hidden component for streaming output
streaming_output_display = gr.Textbox(visible=False)
return demo, chatbot, question_input, submit_button, streaming_output_display
# Create the UI components
demo, chatbot, question_input, submit_button, streaming_output_display = create_ui()
def user_message(user_input: str, chat_history: List) -> Tuple[str, List]:
"""Add user message to chat history and clear input."""
if not user_input.strip():
return "", chat_history
logger.info(f"User message: {user_input}")
return "", chat_history + [[user_input, None]]
def bot_response(chat_history: List) -> Tuple[List, Dict]:
"""Get bot response and update chat history."""
if not chat_history or not chat_history[-1][0]:
return chat_history, gr.update(visible=False)
question = chat_history[-1][0]
logger.info(f"Processing question: {question}")
return stream_agent_response(question, chat_history[:-1])
# Event handlers
submit_click = submit_button.click(
fn=user_message,
inputs=[question_input, chatbot],
outputs=[question_input, chatbot],
queue=True
).then(
fn=bot_response,
inputs=[chatbot],
outputs=[chatbot, streaming_output_display],
api_name="ask"
)
question_input.submit(
fn=user_message,
inputs=[question_input, chatbot],
outputs=[question_input, chatbot],
queue=True
).then(
fn=bot_response,
inputs=[chatbot],
outputs=[chatbot, streaming_output_display]
)
# Configuración para Hugging Face Spaces
def get_app():
"""Obtiene la instancia de la aplicación Gradio para Hugging Face Spaces."""
# Verificar si estamos en un entorno de Hugging Face Spaces
if os.getenv('SPACE_ID'):
# Configuración específica para Spaces
demo.title = "🤖 Asistente de Base de Datos SQL (Demo)"
demo.description = """
Este es un demo del asistente de base de datos SQL.
Para usar la versión completa con conexión a base de datos, clona este espacio y configura las variables de entorno.
"""
return demo
# Para desarrollo local
if __name__ == "__main__":
# Configuración para desarrollo local
demo.queue(concurrency_count=5).launch(
server_name="0.0.0.0",
server_port=7860,
debug=True,
share=False,
show_api=True,
favicon_path=None,
show_error=True,
show_tips=True
)