import os
import sys
import re
import gradio as gr
import json
import tempfile
import base64
import io
from typing import List, Dict, Any, Optional, Tuple, Union
import logging

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Configure logging early so the logger is available in the import guard below
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    from sqlalchemy import text as sa_text
except Exception:
    sa_text = None

try:
    # Try to import optional dependencies
    from langchain_community.agent_toolkits import create_sql_agent
    from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit
    from langchain_community.utilities import SQLDatabase
    from langchain_google_genai import ChatGoogleGenerativeAI
    from langchain.agents.agent_types import AgentType
    from langchain.memory import ConversationBufferWindowMemory
    from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
    import pymysql
    from dotenv import load_dotenv

    DEPENDENCIES_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Some dependencies are not available: {e}")
    DEPENDENCIES_AVAILABLE = False
def generate_chart(data: Union[Dict, List[Dict], pd.DataFrame],
                   chart_type: str,
                   x: str,
                   y: str = None,
                   title: str = "",
                   x_label: str = None,
                   y_label: str = None):
    """
    Generate an interactive Plotly figure from data.

    Args:
        data: The data to plot (a dict, a list of dicts, or a pandas DataFrame)
        chart_type: Type of chart to generate (bar, line, pie, scatter, histogram)
        x: Column name for the x-axis (category names for pie charts)
        y: Column name for the y-axis (values for pie charts)
        title: Chart title
        x_label: Label for the x-axis
        y_label: Label for the y-axis

    Returns:
        An interactive Plotly Figure object, or None on error
    """
    try:
        # Convert the data to a DataFrame if it is a dict or a list of dicts
        if isinstance(data, list):
            df = pd.DataFrame(data)
        elif isinstance(data, dict):
            df = pd.DataFrame([data])
        else:
            df = data
        if not isinstance(df, pd.DataFrame):
            return None

        # Generate the appropriate chart type
        fig = None
        if chart_type == 'bar':
            fig = px.bar(df, x=x, y=y, title=title)
        elif chart_type == 'line':
            fig = px.line(df, x=x, y=y, title=title)
        elif chart_type == 'pie':
            fig = px.pie(df, names=x, values=y, title=title, hole=0)
        elif chart_type == 'scatter':
            fig = px.scatter(df, x=x, y=y, title=title)
        elif chart_type == 'histogram':
            fig = px.histogram(df, x=x, title=title)
        else:
            return None

        # Update the layout (the default title is parenthesized so it only applies when no title was given)
        fig.update_layout(
            xaxis_title=x_label or x,
            yaxis_title=y_label or (y if y != x else ''),
            title=title or (f"{chart_type.capitalize()} Chart of {x} vs {y}" if y else f"{chart_type.capitalize()} Chart of {x}"),
            template="plotly_white",
            margin=dict(l=20, r=20, t=40, b=20),
            height=400
        )
        return fig
    except Exception as e:
        error_msg = f"Error generating chart: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return None
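# Example usage (illustrative only; the column names below are made up):
#   fig = generate_chart(
#       data=[{"pais": "MX", "ventas": 120}, {"pais": "AR", "ventas": 80}],
#       chart_type="bar", x="pais", y="ventas", title="Ventas por pais",
#   )
#   # `fig` can then be passed to gr.Plot or saved with fig.write_html(...)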
def check_environment():
    """Check whether the environment is configured correctly."""
    if not DEPENDENCIES_AVAILABLE:
        return False, "Missing required Python packages. Please install them with: pip install -r requirements.txt"

    # Check that the required environment variables are set
    required_vars = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"]
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        return False, f"Missing required environment variables: {', '.join(missing_vars)}"
    return True, "Environment is properly configured"
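# For local runs these variables are typically supplied via a .env file. An
# illustrative example (placeholder values only):
#   DB_USER=myuser
#   DB_PASSWORD=mypassword
#   DB_HOST=localhost
#   DB_NAME=mydb
#   GOOGLE_API_KEY=your-google-api-key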
def setup_database_connection():
    """Try to establish a database connection."""
    if not DEPENDENCIES_AVAILABLE:
        return None, "Dependencies not available"
    try:
        load_dotenv(override=True)

        # Debug: log all environment variables (sensitive values are masked)
        logger.info("Environment variables:")
        for key, value in os.environ.items():
            if any(s in key.lower() for s in ['pass', 'key', 'secret']):
                logger.info(f"  {key}: {'*' * 8} (hidden for security)")
            else:
                logger.info(f"  {key}: {value}")

        db_user = os.getenv("DB_USER")
        db_password = os.getenv("DB_PASSWORD")
        db_host = os.getenv("DB_HOST")
        db_name = os.getenv("DB_NAME")

        # Debug: log database connection info (without the password)
        logger.info(f"Database connection attempt - Host: {db_host}, User: {db_user}, DB: {db_name}")

        if not all([db_user, db_password, db_host, db_name]):
            missing = [var for var, val in [
                ("DB_USER", db_user),
                ("DB_PASSWORD", "*" if db_password else ""),
                ("DB_HOST", db_host),
                ("DB_NAME", db_name)
            ] if not val]
            logger.error(f"Missing required database configuration: {', '.join(missing)}")
            return None, f"Missing database configuration: {', '.join(missing)}"

        logger.info(f"Connecting to database: {db_user}@{db_host}/{db_name}")

        # Test the connection
        connection = pymysql.connect(
            host=db_host,
            user=db_user,
            password=db_password,
            database=db_name,
            connect_timeout=5,
            cursorclass=pymysql.cursors.DictCursor
        )
        connection.close()

        # If the connection succeeds, create the SQLAlchemy engine
        db_uri = f"mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}"
        logger.info("Database connection successful")
        return SQLDatabase.from_uri(db_uri), ""
    except Exception as e:
        error_msg = f"Error connecting to database: {str(e)}"
        logger.error(error_msg)
        return None, error_msg
def initialize_llm():
    """Initialize the language model."""
    if not DEPENDENCIES_AVAILABLE:
        error_msg = "Dependencies not available. Make sure all required packages are installed."
        logger.error(error_msg)
        return None, error_msg

    google_api_key = os.getenv("GOOGLE_API_KEY")
    logger.info(f"GOOGLE_API_KEY found: {'Yes' if google_api_key else 'No'}")
    if not google_api_key:
        error_msg = "GOOGLE_API_KEY not found in environment variables. Please check your Hugging Face Space secrets."
        logger.error(error_msg)
        return None, error_msg
    try:
        logger.info("Initializing Google Generative AI...")
        llm = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash",
            temperature=0,
            google_api_key=google_api_key,
            convert_system_message_to_human=True  # Convert system messages to human messages
        )
        # Test the model with a simple prompt
        test_prompt = "Hello, this is a test."
        logger.info(f"Testing model with prompt: {test_prompt}")
        test_response = llm.invoke(test_prompt)
        logger.info(f"Model test response: {str(test_response)[:100]}...")  # Log the first 100 chars
        logger.info("Google Generative AI initialized successfully")
        return llm, ""
    except Exception as e:
        error_msg = f"Error initializing Google Generative AI: {str(e)}"
        logger.error(error_msg, exc_info=True)  # Include the full stack trace
        return None, error_msg
def create_agent(llm, db_connection):
    """Create and return a SQL database agent with conversation memory."""
    logger.info("Starting agent creation process...")
    if not DEPENDENCIES_AVAILABLE:
        error_msg = "Dependencies not available. Please check if all required packages are installed."
        logger.error(error_msg)
        return None, error_msg
    if not llm:
        error_msg = "Cannot create agent: LLM is not available"
        logger.error(error_msg)
        return None, error_msg
    if not db_connection:
        error_msg = "Cannot create agent: Database connection is not available"
        logger.error(error_msg)
        return None, error_msg
    try:
        logger.info("Creating SQL agent with memory...")

        # Create conversation memory
        memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            k=5,  # Keep the last 5 message exchanges in memory
            return_messages=True,
            output_key="output"
        )

        # Create the database toolkit
        toolkit = SQLDatabaseToolkit(
            db=db_connection,
            llm=llm
        )

        # Create the agent with memory and more detailed configuration
        agent = create_sql_agent(
            llm=llm,
            toolkit=toolkit,
            agent_type=AgentType.OPENAI_FUNCTIONS,
            verbose=True,
            handle_parsing_errors=True,        # Better error handling for parsing
            max_iterations=10,                 # Limit the number of iterations
            early_stopping_method="generate",  # Stop early if the agent gets stuck
            memory=memory,                     # Add memory to the agent
            return_intermediate_steps=True     # Important for memory to work properly
        )

        # Test the agent with a simple query
        logger.info("Testing agent with a simple query...")
        try:
            test_query = "SELECT 1"
            test_result = agent.run(test_query)
            logger.info(f"Agent test query successful: {str(test_result)[:200]}...")
        except Exception as e:
            logger.warning(f"Agent test query failed (this might be expected): {str(e)}")
            # Continue even if the test fails, as it might be due to model limitations

        logger.info("SQL agent created successfully")
        return agent, ""
    except Exception as e:
        error_msg = f"Error creating SQL agent: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return None, error_msg
# Initialize the agent
logger.info("=" * 50)
logger.info("Starting application initialization...")
logger.info(f"Python version: {sys.version}")
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Files in working directory: {os.listdir()}")

# Check the environment variables
logger.info("Checking environment variables...")
for var in ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"]:
    logger.info(f"{var}: {'*' * 8 if os.getenv(var) else 'NOT SET'}")

# Initialize components
logger.info("Initializing database connection...")
db_connection, db_error = setup_database_connection()
if db_error:
    logger.error(f"Failed to initialize database: {db_error}")

logger.info("Initializing language model...")
llm, llm_error = initialize_llm()
if llm_error:
    logger.error(f"Failed to initialize language model: {llm_error}")

logger.info("Initializing agent...")
agent, agent_error = create_agent(llm, db_connection)
db_connected = agent is not None
if agent:
    logger.info("Agent initialized successfully")
else:
    logger.error(f"Failed to initialize agent: {agent_error}")
logger.info("=" * 50)
def looks_like_sql(s: str) -> bool:
    """Heuristic to check if a string looks like an executable SQL statement."""
    if not s:
        return False
    s_strip = s.strip().lstrip("-- ")
    # Common statement starters
    return bool(re.match(r"^(WITH|SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE)\b", s_strip, re.IGNORECASE))
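# Illustrative behaviour of the heuristic above:
#   looks_like_sql("SELECT * FROM users")       -> True
#   looks_like_sql("with cte AS (...) SELECT")  -> True   (case-insensitive)
#   looks_like_sql("Here is your answer")       -> False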
def extract_sql_query(text):
    """Extract SQL queries from text. Only accepts blocks tagged as ```sql
    or strings that clearly look like SQL, to avoid executing arbitrary text.
    """
    if not text:
        return None
    # Look at ALL fenced code blocks and pick those tagged as SQL
    for m in re.finditer(r"```(\w+)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE):
        lang = (m.group(1) or '').lower()
        body = (m.group(2) or '').strip()
        if lang in {"sql", "postgresql", "mysql"} and looks_like_sql(body):
            return body
    # If there are no tagged blocks, look for a plain SQL statement by keyword
    simple = re.search(r"(WITH|SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE)[\s\S]*?;", text, re.IGNORECASE)
    if simple:
        candidate = simple.group(0).strip()
        if looks_like_sql(candidate):
            return candidate
    return None
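# Illustrative behaviour of extract_sql_query:
#   extract_sql_query("Use this:\n```sql\nSELECT id FROM users;\n```")  -> "SELECT id FROM users;"
#   extract_sql_query("```python\nprint('hi')\n```")                    -> None (not tagged as SQL)
#   extract_sql_query("Run SELECT COUNT(*) FROM orders; please")        -> "SELECT COUNT(*) FROM orders;"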
def execute_sql_query(query, db_connection):
    """Execute a SQL query and return the results as a string."""
    if not db_connection:
        return "Error: No hay conexión a la base de datos"
    try:
        with db_connection._engine.connect() as connection:
            # Ensure SQLAlchemy receives a SQL expression
            if sa_text is not None and isinstance(query, str):
                result = connection.execute(sa_text(query))
            else:
                result = connection.execute(query)
            rows = result.fetchall()

            # Convert the results into a readable format
            if not rows:
                return "La consulta no devolvió resultados"

            # If there is a single scalar result, return it directly
            if len(rows) == 1 and len(rows[0]) == 1:
                return str(rows[0][0])

            # If there are multiple rows, format them as a markdown table
            try:
                df = pd.DataFrame(rows)
                return df.to_markdown(index=False)
            except ImportError:
                # If tabulate/pandas support is not available, fall back to a simple format
                return "\n".join([str(row) for row in rows])
    except Exception as e:
        return f"Error ejecutando la consulta: {str(e)}"
def generate_plot(data, x_col, y_col, title, x_label, y_label):
    """Generate a static bar plot from data and return the image file path."""
    # matplotlib is only needed by this legacy helper, so import it lazily here
    import matplotlib.pyplot as plt

    plt.figure(figsize=(10, 6))
    plt.bar(data[x_col], data[y_col])
    plt.title(title)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.xticks(rotation=45)
    plt.tight_layout()

    # Save to a temporary file
    temp_dir = tempfile.mkdtemp()
    plot_path = os.path.join(temp_dir, "plot.png")
    plt.savefig(plot_path)
    plt.close()
    return plot_path
def convert_to_messages_format(chat_history):
    """Convert chat history to the format expected by Gradio 5.x."""
    if not chat_history:
        return []
    messages = []
    # If the first element is a list, assume it is in the old format
    if isinstance(chat_history[0], list):
        for msg in chat_history:
            if isinstance(msg, list) and len(msg) == 2:
                # Format: [user_msg, bot_msg]
                user_msg, bot_msg = msg
                if user_msg:
                    messages.append({"role": "user", "content": user_msg})
                if bot_msg:
                    messages.append({"role": "assistant", "content": bot_msg})
    else:
        # Assume it is already in the correct format or can be used as-is
        for msg in chat_history:
            if isinstance(msg, dict) and "role" in msg and "content" in msg:
                messages.append(msg)
            elif isinstance(msg, str):
                # If it is a string, assume it is a user message
                messages.append({"role": "user", "content": msg})
    return messages
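# Illustrative conversion from the legacy pair format to the messages format:
#   convert_to_messages_format([["Hola", "¡Hola! ¿En qué puedo ayudarte?"]])
#   -> [{"role": "user", "content": "Hola"},
#       {"role": "assistant", "content": "¡Hola! ¿En qué puedo ayudarte?"}]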
async def stream_agent_response(question: str, chat_history: List[List[str]]) -> Tuple[str, Optional["go.Figure"]]:
    """Process the user's question and return the agent's response, using conversation memory."""
    global agent  # Make sure we can update the agent's memory

    # Initialize the response
    response_text = ""
    chart_fig = None
    messages = []

    # Add the previous chat history in the format expected by the agent
    for msg_pair in chat_history:
        if len(msg_pair) >= 1 and msg_pair[0]:  # User message
            messages.append(HumanMessage(content=msg_pair[0]))
        if len(msg_pair) >= 2 and msg_pair[1]:  # Assistant message
            messages.append(AIMessage(content=msg_pair[1]))

    # Add the current user question
    user_message = HumanMessage(content=question)
    messages.append(user_message)

    if not agent:
        error_msg = (
            "## ⚠️ Error: Agente no inicializado\n\n"
            "No se pudo inicializar el agente de base de datos. Por favor, verifica que:\n"
            "1. Todas las variables de entorno estén configuradas correctamente\n"
            "2. La base de datos esté accesible\n"
            "3. El modelo de lenguaje esté disponible\n\n"
            f"Error: {agent_error}"
        )
        return error_msg, None

    # Update the agent's memory with the full conversation history
    try:
        # Rebuild agent memory from chat history pairs
        if hasattr(agent, 'memory') and agent.memory is not None:
            agent.memory.clear()
            for i in range(0, len(messages) - 1, 2):  # (user, assistant)
                if i + 1 < len(messages):
                    agent.memory.save_context(
                        {"input": messages[i].content},
                        {"output": messages[i + 1].content}
                    )
    except Exception as e:
        logger.error(f"Error updating agent memory: {str(e)}", exc_info=True)

    try:
        # Add an empty assistant message that will be updated below
        assistant_message = {"role": "assistant", "content": ""}
        messages.append(assistant_message)

        # Execute the agent with proper error handling
        try:
            # Let the agent use its memory; don't pass the raw chat_history
            response = await agent.ainvoke({"input": question})
            logger.info(f"Agent response type: {type(response)}")
            logger.info(f"Agent response content: {str(response)[:500]}...")

            # Handle different response formats
            if hasattr(response, 'output') and response.output:
                response_text = response.output
            elif isinstance(response, str):
                response_text = response
            elif hasattr(response, 'get') and callable(response.get) and 'output' in response:
                response_text = response['output']
            else:
                response_text = str(response)
            logger.info(f"Extracted response text: {response_text[:200]}...")

            # Check whether the response contains something that truly looks like SQL
            sql_query = extract_sql_query(response_text)
            if sql_query and looks_like_sql(sql_query):
                logger.info(f"Detected SQL query: {sql_query}")
                # Execute the query and update the response
                db_connection, _ = setup_database_connection()
                if db_connection:
                    query_result = execute_sql_query(sql_query, db_connection)
                    # Add the query and its result to the response
                    response_text += f"\n\n### 🔍 Resultado de la consulta:\n```sql\n{sql_query}\n```\n\n{query_result}"
                    # Try to generate an interactive chart if the result is tabular
                    try:
                        if isinstance(query_result, str) and '|' in query_result and '---' in query_result:
                            # Clean up the markdown table
                            lines = [line.strip() for line in query_result.split('\n')
                                     if line.strip() and '---' not in line and '|' in line]
                            if len(lines) > 1:  # At least a header plus one data row
                                # Get the column names from the first line
                                columns = [col.strip() for col in lines[0].split('|')[1:-1]]
                                # Get the data rows
                                data = []
                                for line in lines[1:]:
                                    values = [val.strip() for val in line.split('|')[1:-1]]
                                    if len(values) == len(columns):
                                        data.append(dict(zip(columns, values)))
                                if data and len(columns) >= 2:
                                    # Determine the chart type from the user's question (pie charts supported)
                                    q_lower = question.lower()
                                    if any(k in q_lower for k in ["gráfico circular", "grafico circular", "pie", "pastel"]):
                                        desired_type = 'pie'
                                    elif any(k in q_lower for k in ["línea", "linea", "line"]):
                                        desired_type = 'line'
                                    elif any(k in q_lower for k in ["dispersión", "dispersion", "scatter"]):
                                        desired_type = 'scatter'
                                    elif any(k in q_lower for k in ["histograma", "histogram"]):
                                        desired_type = 'histogram'
                                    else:
                                        desired_type = 'bar'
                                    # Choose the x/y columns (assume the first is a category, the second numeric)
                                    x_col = columns[0]
                                    y_col = columns[1]
                                    # Coerce the y values to numbers
                                    for row in data:
                                        try:
                                            row[y_col] = float(re.sub(r"[^0-9.\-]", "", str(row[y_col])))
                                        except Exception:
                                            pass
                                    chart_fig = generate_chart(
                                        data=data,
                                        chart_type=desired_type,
                                        x=x_col,
                                        y=y_col,
                                        title=f"{y_col} por {x_col}"
                                    )
                    except Exception as e:
                        logger.error(f"Error generating chart: {str(e)}", exc_info=True)
                        # Don't fail the whole request if chart generation fails
                        response_text += "\n\n⚠️ No se pudo generar la visualización de los datos."
                else:
                    response_text += "\n\n⚠️ No se pudo conectar a la base de datos para ejecutar la consulta."
            elif sql_query and not looks_like_sql(sql_query):
                logger.info("Detected code block but it does not look like SQL; skipping execution.")

            # Fallback: if the user asked for a chart (e.g., a pie chart) and we have neither SQL nor a chart yet,
            # parse the most recent assistant text for lines like "LABEL: NUMBER" (bulleted or plain).
            if chart_fig is None:
                q_lower = question.lower()
                wants_chart = any(k in q_lower for k in ["gráfico", "grafico", "chart", "graph", "pastel", "pie"])
                if wants_chart and chat_history:
                    # Find the most recent assistant message with usable numeric pairs
                    candidate_text = ""
                    for pair in reversed(chat_history):
                        if len(pair) >= 2 and isinstance(pair[1], str) and pair[1].strip():
                            candidate_text = pair[1]
                            break
                    if candidate_text:
                        raw_lines = candidate_text.split('\n')
                        # Normalize lines: strip bullets and markdown symbols
                        norm_lines = []
                        for l in raw_lines:
                            s = l.strip()
                            if not s:
                                continue
                            s = s.lstrip("•*-\t ")
                            # Surrounding markdown emphasis is removed from labels later
                            norm_lines.append(s)
                        data = []
                        for l in norm_lines:
                            # Accept patterns like "**LABEL**: 123" or "LABEL: 1,234"
                            m = re.match(r"^(.+?):\s*([0-9][0-9.,]*)$", l)
                            if m:
                                label = m.group(1).strip()
                                # Strip common markdown emphasis
                                label = re.sub(r"[*_`]+", "", label).strip()
                                try:
                                    val = float(m.group(2).replace(',', ''))
                                except Exception:
                                    continue
                                data.append({"label": label, "value": val})
                        if len(data) >= 2:
                            desired_type = 'pie' if any(k in q_lower for k in ["gráfico circular", "grafico circular", "pie", "pastel"]) else 'bar'
                            chart_fig = generate_chart(
                                data=data,
                                chart_type=desired_type,
                                x="label",
                                y="value",
                                title="Distribución"
                            )

            # Update the assistant's message with the response
            assistant_message["content"] = response_text
        except Exception as e:
            error_msg = f"Error al ejecutar el agente: {str(e)}"
            logger.error(error_msg, exc_info=True)
            assistant_message["content"] = f"## ❌ Error\n\n{error_msg}"

        # Extract the plain message content for the Gradio Chatbot
        message_content = ""
        if isinstance(assistant_message, dict) and "content" in assistant_message:
            message_content = assistant_message["content"]
        elif isinstance(assistant_message, str):
            message_content = assistant_message
        else:
            message_content = str(assistant_message)

        # Return the assistant's response and an optional interactive chart figure
        return message_content, chart_fig
    except Exception as e:
        error_msg = f"## ❌ Error\n\nOcurrió un error al procesar tu solicitud:\n\n```\n{str(e)}\n```"
        logger.error(f"Error in stream_agent_response: {str(e)}", exc_info=True)
        # Return the error message and no chart
        return error_msg, None
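# Illustrative input for the "LABEL: NUMBER" fallback above (the values are made up):
# an earlier assistant message such as
#   "* **Ventas 2023**: 1,200\n* **Ventas 2024**: 1,450"
# is normalized to [{"label": "Ventas 2023", "value": 1200.0},
#                   {"label": "Ventas 2024", "value": 1450.0}]
# and rendered as a pie or bar chart depending on the wording of the question.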
# Custom CSS for the app
custom_css = """
.gradio-container {
    max-width: 1200px !important;
    margin: 0 auto !important;
    font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
}
#chatbot {
    min-height: 500px;
    border: 1px solid #e0e0e0;
    border-radius: 8px;
    margin-bottom: 20px;
    padding: 20px;
    background-color: #f9f9f9;
}
.user-message, .bot-message {
    padding: 12px 16px;
    border-radius: 18px;
    margin: 8px 0;
    max-width: 80%;
    line-height: 1.5;
}
.user-message {
    background-color: #007bff;
    color: white;
    margin-left: auto;
    border-bottom-right-radius: 4px;
}
.bot-message {
    background-color: #f1f1f1;
    color: #333;
    margin-right: auto;
    border-bottom-left-radius: 4px;
}
#question-input textarea {
    min-height: 50px !important;
    border-radius: 8px !important;
    padding: 12px !important;
    font-size: 16px !important;
}
#send-button {
    height: 100%;
    background-color: #007bff !important;
    color: white !important;
    border: none !important;
    border-radius: 8px !important;
    font-weight: 500 !important;
    transition: background-color 0.2s !important;
}
#send-button:hover {
    background-color: #0056b3 !important;
}
.status-message {
    text-align: center;
    color: #666;
    font-style: italic;
    margin: 10px 0;
}
"""
def create_ui():
    """Create and return the Gradio UI components."""
    # Check the environment status
    env_ok, env_message = check_environment()

    # Create the custom theme
    theme = gr.themes.Soft(
        primary_hue="blue",
        secondary_hue="indigo",
        neutral_hue="slate"
    )

    with gr.Blocks(
        css=custom_css,
        title="Asistente de Base de Datos SQL",
        theme=theme
    ) as demo:
        # Header
        gr.Markdown("""
        # 🤖 Asistente de Base de Datos SQL
        Haz preguntas en lenguaje natural sobre tu base de datos y obtén resultados de consultas SQL.
        """)

        # Status message
        if not env_ok:
            gr.Warning("⚠️ " + env_message)

        # Create the chat interface
        with gr.Row():
            chatbot = gr.Chatbot(
                [],
                elem_id="chatbot",
                type="tuples",  # keep the current list-of-lists format
                avatar_images=(
                    None,
                    (os.path.join(os.path.dirname(__file__), "logo.svg")),
                ),
                height=600,
                render_markdown=True,  # Enable markdown rendering
                show_label=False,
                show_share_button=False,
                container=True,
                layout="panel"  # Better layout for messages
            )

        # Chart display area (interactive Plotly figure)
        chart_display = gr.Plot(
            label="📊 Visualización",
        )

        # Input area
        with gr.Row():
            question_input = gr.Textbox(
                label="",
                placeholder="Escribe tu pregunta aquí...",
                container=False,
                scale=5,
                min_width=300,
                max_lines=3,
                autofocus=True,
                elem_id="question-input"
            )
            submit_button = gr.Button(
                "Enviar",
                variant="primary",
                min_width=100,
                scale=1,
                elem_id="send-button"
            )

        # System status
        with gr.Accordion("ℹ️ Estado del sistema", open=not env_ok):
            if not DEPENDENCIES_AVAILABLE:
                gr.Markdown("""
                ## ❌ Dependencias faltantes
                Para ejecutar esta aplicación localmente, necesitas instalar las dependencias:
                ```bash
                pip install -r requirements.txt
                ```
                """)
            elif not agent:
                gr.Markdown(f"""
                ## ⚠️ Configuración incompleta
                No se pudo inicializar el agente de base de datos. Por favor, verifica que:
                1. Todas las variables de entorno estén configuradas correctamente
                2. La base de datos esté accesible
                3. La API de Google Gemini esté configurada

                **Error:** {agent_error if agent_error else 'No se pudo determinar el error'}

                ### Configuración local
                Crea un archivo `.env` en la raíz del proyecto con las siguientes variables:
                ```
                DB_USER=tu_usuario
                DB_PASSWORD=tu_contraseña
                DB_HOST=tu_servidor
                DB_NAME=tu_base_de_datos
                GOOGLE_API_KEY=tu_api_key_de_google
                ```
                """)
            elif os.getenv('SPACE_ID'):
                # Demo mode on Hugging Face Spaces
                gr.Markdown("""
                ## 🚀 Modo Demo
                Esta es una demostración del asistente de base de datos SQL. Para usar la versión completa con conexión a base de datos:
                1. Clona este espacio en tu cuenta de Hugging Face
                2. Configura las variables de entorno en la configuración del espacio:
                   - `DB_USER`: Tu usuario de base de datos
                   - `DB_PASSWORD`: Tu contraseña de base de datos
                   - `DB_HOST`: La dirección del servidor de base de datos
                   - `DB_NAME`: El nombre de la base de datos
                   - `GOOGLE_API_KEY`: Tu clave de API de Google Gemini

                **Nota:** Actualmente estás en modo de solo demostración.
                """)
            else:
                gr.Markdown("""
                ## ✅ Sistema listo
                El asistente está listo para responder tus preguntas sobre la base de datos.
                """)

        # Hidden component for streaming output
        streaming_output_display = gr.Textbox(visible=False)

    return demo, chatbot, chart_display, question_input, submit_button, streaming_output_display
def create_application():
    """Create and configure the Gradio application."""
    # Create the UI components
    demo, chatbot, chart_display, question_input, submit_button, streaming_output_display = create_ui()

    def user_message(user_input: str, chat_history: List[List[str]]) -> Tuple[str, List[List[str]]]:
        """Add the user message to the chat history and clear the input."""
        if not user_input.strip():
            return "", chat_history
        logger.info(f"User message: {user_input}")
        # Initialize the chat history if needed
        if chat_history is None:
            chat_history = []
        # Add the user message and an empty assistant response
        chat_history.append([user_input, None])
        # Clear the input
        return "", chat_history

    async def bot_response(chat_history: List[List[str]]) -> Tuple[List[List[str]], Optional[go.Figure]]:
        """Get the bot response, update the chat history, and return an optional chart figure."""
        if not chat_history:
            return chat_history, None
        # Only respond if the last entry has a user message and no assistant reply yet
        if not chat_history[-1][0] or chat_history[-1][1] is not None:
            return chat_history, None
        try:
            question = chat_history[-1][0]
            logger.info(f"Processing question: {question}")
            # Call the agent and get the response
            assistant_message, chart_fig = await stream_agent_response(question, chat_history[:-1])
            # Update the assistant's message in the chat history
            chat_history[-1] = [question, assistant_message]
            logger.info("Response generation complete")
            return chat_history, chart_fig
        except Exception as e:
            error_msg = f"## ❌ Error\n\nError al procesar la solicitud:\n\n```\n{str(e)}\n```"
            logger.error(error_msg, exc_info=True)
            if chat_history and len(chat_history[-1]) == 2 and chat_history[-1][1] is None:
                chat_history[-1] = [chat_history[-1][0], error_msg]
            return chat_history, None

    # Event handlers
    with demo:
        # Handle form submission
        msg_submit = question_input.submit(
            fn=user_message,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot],
            queue=True
        ).then(
            fn=bot_response,
            inputs=[chatbot],
            outputs=[chatbot, chart_display],
            api_name="ask"
        )

        # Handle button clicks
        btn_click = submit_button.click(
            fn=user_message,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot],
            queue=True
        ).then(
            fn=bot_response,
            inputs=[chatbot],
            outputs=[chatbot, chart_display]
        )

    return demo
# Create the application
demo = create_application()

# Configuration for Hugging Face Spaces
def get_app():
    """Return the Gradio application instance for Hugging Face Spaces."""
    # Check whether we are running in a Hugging Face Spaces environment
    if os.getenv('SPACE_ID'):
        # Spaces-specific configuration
        demo.title = "🤖 Asistente de Base de Datos SQL (Demo)"
        demo.description = """
        Este es un demo del asistente de base de datos SQL.
        Para usar la versión completa con conexión a base de datos, clona este espacio y configura las variables de entorno.
        """
    return demo

# For local development
if __name__ == "__main__":
    # Local development configuration - simplified for Gradio 5.x
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        debug=True,
        share=False
    )