import os
import sys
import re
import gradio as gr
import json
import tempfile
import base64
import io
from typing import List, Dict, Any, Optional, Tuple, Union
import logging
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Configure logging before the optional imports below so import failures
# can be reported through the logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    from langchain_community.agent_toolkits import create_sql_agent
    from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit
    from langchain_community.utilities import SQLDatabase
    from langchain_google_genai import ChatGoogleGenerativeAI
    from langchain.agents.agent_types import AgentType
    from langchain.memory import ConversationBufferWindowMemory
    from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
    import pymysql
    from dotenv import load_dotenv

    DEPENDENCIES_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Some dependencies are not available: {e}")
    DEPENDENCIES_AVAILABLE = False


def generate_chart(data: Union[Dict, List[Dict], pd.DataFrame],
                   chart_type: str,
                   x: str,
                   y: str = None,
                   title: str = "",
                   x_label: str = None,
                   y_label: str = None) -> str:
    """
    Generate a chart from data and return it as an embedded base64 image.

    Args:
        data: The data to plot (a dict, a list of dicts, or a pandas DataFrame)
        chart_type: Type of chart to generate (bar, line, pie, scatter, histogram)
        x: Column name for the x-axis
        y: Column name for the y-axis (not needed for pie charts)
        title: Chart title
        x_label: Label for the x-axis
        y_label: Label for the y-axis

    Returns:
        HTML string with the embedded chart image, or an error message
    """
    try:
        # Normalize the input into a DataFrame.
        if isinstance(data, list):
            df = pd.DataFrame(data)
        elif isinstance(data, dict):
            df = pd.DataFrame([data])
        else:
            df = data

        if not isinstance(df, pd.DataFrame):
            return "Error: Data must be a dictionary, list of dictionaries, or pandas DataFrame"

        fig = None
        if chart_type == 'bar':
            fig = px.bar(df, x=x, y=y, title=title)
        elif chart_type == 'line':
            fig = px.line(df, x=x, y=y, title=title)
        elif chart_type == 'pie':
            fig = px.pie(df, names=x, values=y, title=title)
        elif chart_type == 'scatter':
            fig = px.scatter(df, x=x, y=y, title=title)
        elif chart_type == 'histogram':
            fig = px.histogram(df, x=x, title=title)
        else:
            return "Error: Unsupported chart type. Use 'bar', 'line', 'pie', 'scatter', or 'histogram'"

        fig.update_layout(
            xaxis_title=x_label or x,
            yaxis_title=y_label or (y if y != x else ''),
            title=title or (f"{chart_type.capitalize()} Chart of {x} vs {y}" if y
                            else f"{chart_type.capitalize()} Chart of {x}"),
            template="plotly_white",
            margin=dict(l=20, r=20, t=40, b=20),
            height=400
        )

        # Render the figure to a temporary PNG, then embed it as base64.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.png')
        fig.write_image(temp_file.name, format='png', engine='kaleido')

        with open(temp_file.name, 'rb') as img_file:
            img_base64 = base64.b64encode(img_file.read()).decode('utf-8')

        os.unlink(temp_file.name)

        return f'<img src="data:image/png;base64,{img_base64}" style="max-width:100%;"/>'

    except Exception as e:
        error_msg = f"Error generating chart: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return f"<div style='color: red;'>{error_msg}</div>"


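# Illustrative usage of generate_chart (a sketch; the row data and column
# names below are hypothetical, not taken from the real database):
#
#   rows = [{"categoria": "A", "total": 120}, {"categoria": "B", "total": 95}]
#   html = generate_chart(rows, chart_type="bar", x="categoria", y="total",
#                         title="Total por categoria")
#   # `html` is an <img> tag whose src is the chart encoded as a base64 PNG,
#   # ready to embed in the chatbot's Markdown/HTML output.
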
def check_environment():
    """Check whether the environment is configured correctly."""
    if not DEPENDENCIES_AVAILABLE:
        return False, "Missing required Python packages. Please install them with: pip install -r requirements.txt"

    required_vars = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"]
    missing_vars = [var for var in required_vars if not os.getenv(var)]

    if missing_vars:
        return False, f"Missing required environment variables: {', '.join(missing_vars)}"

    return True, "Environment is properly configured"


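# Shape of a local `.env` file this check expects (a sketch; the values are
# placeholders, only the variable names come from this module):
#
#   DB_USER=my_user
#   DB_PASSWORD=my_password
#   DB_HOST=db.example.com
#   DB_NAME=my_database
#   GOOGLE_API_KEY=my_google_api_key
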
def setup_database_connection():
    """Try to establish a connection to the database."""
    if not DEPENDENCIES_AVAILABLE:
        return None, "Dependencies not available"

    try:
        load_dotenv(override=True)

        # Log the environment, masking anything that looks like a credential.
        logger.info("Environment variables:")
        for key, value in os.environ.items():
            if any(s in key.lower() for s in ['pass', 'key', 'secret']):
                logger.info(f" {key}: {'*' * 8} (hidden for security)")
            else:
                logger.info(f" {key}: {value}")

        db_user = os.getenv("DB_USER")
        db_password = os.getenv("DB_PASSWORD")
        db_host = os.getenv("DB_HOST")
        db_name = os.getenv("DB_NAME")

        logger.info(f"Database connection attempt - Host: {db_host}, User: {db_user}, DB: {db_name}")
        if not all([db_user, db_password, db_host, db_name]):
            missing = [var for var, val in [
                ("DB_USER", db_user),
                ("DB_PASSWORD", "*" if db_password else ""),
                ("DB_HOST", db_host),
                ("DB_NAME", db_name)
            ] if not val]
            logger.error(f"Missing required database configuration: {', '.join(missing)}")
            return None, f"Missing database configuration: {', '.join(missing)}"

        logger.info(f"Connecting to database: {db_user}@{db_host}/{db_name}")

        # Verify connectivity with a short-lived PyMySQL handshake before
        # handing the URI to SQLDatabase.
        connection = pymysql.connect(
            host=db_host,
            user=db_user,
            password=db_password,
            database=db_name,
            connect_timeout=5,
            cursorclass=pymysql.cursors.DictCursor
        )
        connection.close()

        db_uri = f"mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}"
        logger.info("Database connection successful")
        return SQLDatabase.from_uri(db_uri), ""

    except Exception as e:
        error_msg = f"Error connecting to database: {str(e)}"
        logger.error(error_msg)
        return None, error_msg


def initialize_llm():
    """Initialize the language model."""
    if not DEPENDENCIES_AVAILABLE:
        error_msg = "Dependencies not available. Make sure all required packages are installed."
        logger.error(error_msg)
        return None, error_msg

    google_api_key = os.getenv("GOOGLE_API_KEY")
    logger.info(f"GOOGLE_API_KEY found: {'Yes' if google_api_key else 'No'}")

    if not google_api_key:
        error_msg = "GOOGLE_API_KEY not found in environment variables. Please check your Hugging Face Space secrets."
        logger.error(error_msg)
        return None, error_msg

    try:
        logger.info("Initializing Google Generative AI...")
        llm = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash",
            temperature=0,
            google_api_key=google_api_key,
            convert_system_message_to_human=True
        )

        # Smoke-test the model with a trivial prompt before wiring it into the agent.
        test_prompt = "Hello, this is a test."
        logger.info(f"Testing model with prompt: {test_prompt}")
        test_response = llm.invoke(test_prompt)
        logger.info(f"Model test response: {str(test_response)[:100]}...")

        logger.info("Google Generative AI initialized successfully")
        return llm, ""

    except Exception as e:
        error_msg = f"Error initializing Google Generative AI: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return None, error_msg


def create_agent(llm, db_connection):
    """Create and return a SQL database agent with conversation memory."""
    if not DEPENDENCIES_AVAILABLE:
        error_msg = "Dependencies not available. Please check if all required packages are installed."
        logger.error(error_msg)
        return None, error_msg

    if not llm:
        error_msg = "Cannot create agent: LLM is not available"
        logger.error(error_msg)
        return None, error_msg

    if not db_connection:
        error_msg = "Cannot create agent: Database connection is not available"
        logger.error(error_msg)
        return None, error_msg

    try:
        logger.info("Starting agent creation process...")
        logger.info("Creating SQL agent with memory...")

        # Keep only the last 5 exchanges in the conversation window.
        memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            k=5,
            return_messages=True,
            output_key="output"
        )

        toolkit = SQLDatabaseToolkit(
            db=db_connection,
            llm=llm
        )

        agent = create_sql_agent(
            llm=llm,
            toolkit=toolkit,
            agent_type=AgentType.OPENAI_FUNCTIONS,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=10,
            early_stopping_method="generate",
            memory=memory,
            return_intermediate_steps=True
        )

        # Run a trivial query to verify the agent is wired up; failures here
        # are logged but are not fatal.
        logger.info("Testing agent with a simple query...")
        try:
            test_query = "SELECT 1"
            test_result = agent.invoke({"input": test_query})
            logger.info(f"Agent test query successful: {str(test_result)[:200]}...")
        except Exception as e:
            logger.warning(f"Agent test query failed (this might be expected): {str(e)}")

        logger.info("SQL agent created successfully")
        return agent, ""

    except Exception as e:
        error_msg = f"Error creating SQL agent: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return None, error_msg


logger.info("="*50) |
|
logger.info("Starting application initialization...") |
|
logger.info(f"Python version: {sys.version}") |
|
logger.info(f"Current working directory: {os.getcwd()}") |
|
logger.info(f"Files in working directory: {os.listdir()}") |
|
|
|
|
|
logger.info("Checking environment variables...") |
|
for var in ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"]: |
|
logger.info(f"{var}: {'*' * 8 if os.getenv(var) else 'NOT SET'}") |
|
|
|
|
|
logger.info("Initializing database connection...") |
|
db_connection, db_error = setup_database_connection() |
|
if db_error: |
|
logger.error(f"Failed to initialize database: {db_error}") |
|
|
|
logger.info("Initializing language model...") |
|
llm, llm_error = initialize_llm() |
|
if llm_error: |
|
logger.error(f"Failed to initialize language model: {llm_error}") |
|
|
|
logger.info("Initializing agent...") |
|
agent, agent_error = create_agent(llm, db_connection) |
|
db_connected = agent is not None |
|
|
|
if agent: |
|
logger.info("Agent initialized successfully") |
|
else: |
|
logger.error(f"Failed to initialize agent: {agent_error}") |
|
|
|
logger.info("="*50) |
|
|
|
def extract_sql_query(text):
    """Extract SQL queries from text using regular expressions."""
    if not text:
        return None

    # Prefer a fenced ```sql ... ``` block if one is present.
    sql_match = re.search(r'```(?:sql)?\s*(.*?)```', text, re.DOTALL)
    if sql_match:
        return sql_match.group(1).strip()

    # Otherwise look for a bare statement ending in a semicolon.
    sql_match = re.search(r'(SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE).*?;', text, re.IGNORECASE | re.DOTALL)
    if sql_match:
        return sql_match.group(0).strip()

    return None


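# Example of what extract_sql_query recovers (illustrative text, not real
# agent output):
#
#   text = "Aquí está la consulta:\n```sql\nSELECT nombre FROM clientes;\n```"
#   extract_sql_query(text)             # -> "SELECT nombre FROM clientes;"
#   extract_sql_query("Sin consulta")   # -> None
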
def execute_sql_query(query, db_connection):
    """Execute a SQL query and return the results as a string."""
    if not db_connection:
        return "Error: No hay conexión a la base de datos"

    try:
        from sqlalchemy import text  # SQLAlchemy expects an executable object, not a raw string

        with db_connection._engine.connect() as connection:
            result = connection.execute(text(query))
            columns = list(result.keys())
            rows = result.fetchall()

            if not rows:
                return "La consulta no devolvió resultados"

            # A single scalar result is returned as plain text.
            if len(rows) == 1 and len(rows[0]) == 1:
                return str(rows[0][0])

            # Otherwise render the rows as a Markdown table.
            try:
                df = pd.DataFrame(rows, columns=columns)
                return df.to_markdown(index=False)
            except ImportError:
                # to_markdown needs the optional `tabulate` package; fall back to plain rows.
                return "\n".join([str(row) for row in rows])

    except Exception as e:
        return f"Error ejecutando la consulta: {str(e)}"


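# Shape of a multi-row result from execute_sql_query (illustrative values):
# a pipe-style Markdown table produced by DataFrame.to_markdown(), e.g.
#
#   | producto | total |
#   |:---------|------:|
#   | A        |   120 |
#   | B        |    95 |
#
# stream_agent_response() later re-parses this pipe-delimited layout to build
# the automatic chart, so the two pieces of code must stay in sync.
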
def generate_plot(data, x_col, y_col, title, x_label, y_label):
    """Generate a bar plot from data and return the file path."""
    # Note: this helper appears unused in this module (charts are built with
    # Plotly in generate_chart). It needs matplotlib, imported lazily here.
    import matplotlib.pyplot as plt

    plt.figure(figsize=(10, 6))
    plt.bar(data[x_col], data[y_col])
    plt.title(title)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.xticks(rotation=45)
    plt.tight_layout()

    # Save the figure to a temporary directory and return its path.
    temp_dir = tempfile.mkdtemp()
    plot_path = os.path.join(temp_dir, "plot.png")
    plt.savefig(plot_path)
    plt.close()

    return plot_path


def convert_to_messages_format(chat_history):
    """Convert chat history to the format expected by Gradio 5.x"""
    if not chat_history:
        return []

    messages = []

    # Old tuple/list format: [[user, bot], ...]
    if isinstance(chat_history[0], list):
        for msg in chat_history:
            if isinstance(msg, list) and len(msg) == 2:
                user_msg, bot_msg = msg
                if user_msg:
                    messages.append({"role": "user", "content": user_msg})
                if bot_msg:
                    messages.append({"role": "assistant", "content": bot_msg})
    else:
        # Already message dicts (bare strings are treated as user messages).
        for msg in chat_history:
            if isinstance(msg, dict) and "role" in msg and "content" in msg:
                messages.append(msg)
            elif isinstance(msg, str):
                messages.append({"role": "user", "content": msg})

    return messages


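# Illustrative round-trip through convert_to_messages_format (sample data):
#
#   convert_to_messages_format([["Hola", "¿En qué puedo ayudarte?"]])
#   # -> [{"role": "user", "content": "Hola"},
#   #     {"role": "assistant", "content": "¿En qué puedo ayudarte?"}]
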
async def stream_agent_response(question: str, chat_history: List[Tuple[str, str]]) -> str:
    """Process the user's question and return the agent's response, using the chat history as conversation memory."""
    global agent

    response_text = ""
    messages = []

    # Rebuild the conversation as LangChain messages.
    for user_msg, assistant_msg in chat_history:
        if user_msg:
            messages.append(HumanMessage(content=user_msg))
        if assistant_msg:
            messages.append(AIMessage(content=assistant_msg))

    user_message = HumanMessage(content=question)
    messages.append(user_message)

    if not agent:
        error_msg = (
            "## ⚠️ Error: Agente no inicializado\n\n"
            "No se pudo inicializar el agente de base de datos. Por favor, verifica que:\n"
            "1. Todas las variables de entorno estén configuradas correctamente\n"
            "2. La base de datos esté accesible\n"
            "3. El modelo de lenguaje esté disponible\n\n"
            f"Error: {agent_error}"
        )
        return error_msg

    try:
        # Re-seed the agent's memory from the chat history.
        if hasattr(agent, 'memory') and agent.memory is not None:
            agent.memory.clear()

            for i in range(0, len(messages) - 1, 2):
                if i + 1 < len(messages):
                    agent.memory.save_context(
                        {"input": messages[i].content},
                        {"output": messages[i + 1].content}
                    )
    except Exception as e:
        logger.error(f"Error updating agent memory: {str(e)}", exc_info=True)

    try:
        assistant_message = {"role": "assistant", "content": ""}

        try:
            response = await agent.ainvoke({"input": question, "chat_history": chat_history})
            logger.info(f"Agent response type: {type(response)}")
            logger.info(f"Agent response content: {str(response)[:500]}...")

            # Normalize the different shapes an agent response can take.
            if hasattr(response, 'output') and response.output:
                response_text = response.output
            elif isinstance(response, str):
                response_text = response
            elif hasattr(response, 'get') and callable(response.get) and 'output' in response:
                response_text = response['output']
            else:
                response_text = str(response)

            logger.info(f"Extracted response text: {response_text[:200]}...")

            # If the response contains a SQL query, run it and append the results.
            sql_query = extract_sql_query(response_text)
            if sql_query:
                logger.info(f"Detected SQL query: {sql_query}")

                db_connection, _ = setup_database_connection()
                if db_connection:
                    query_result = execute_sql_query(sql_query, db_connection)

                    response_text += f"\n\n### 🔍 Resultado de la consulta:\n```sql\n{sql_query}\n```\n\n{query_result}"

                    # Try to turn a Markdown table result into a chart.
                    try:
                        if isinstance(query_result, str) and '|' in query_result and '---' in query_result:
                            lines = [line.strip() for line in query_result.split('\n')
                                     if line.strip() and '---' not in line and '|' in line]
                            if len(lines) > 1:
                                # The header row gives the column names.
                                columns = [col.strip() for col in lines[0].split('|')[1:-1]]

                                data = []
                                for line in lines[1:]:
                                    values = [val.strip() for val in line.split('|')[1:-1]]
                                    if len(values) == len(columns):
                                        data.append(dict(zip(columns, values)))

                                if data and len(columns) >= 2:
                                    chart_type = 'bar'
                                    if len(columns) == 2:
                                        chart_html = generate_chart(
                                            data=data,
                                            chart_type=chart_type,
                                            x=columns[0],
                                            y=columns[1],
                                            title=f"{columns[1]} por {columns[0]}",
                                            x_label=columns[0],
                                            y_label=columns[1]
                                        )
                                        response_text += f"\n\n### 📊 Visualización:\n{chart_html}"
                                    elif len(columns) > 2:
                                        chart_html = generate_chart(
                                            data=data,
                                            chart_type='line',
                                            x=columns[0],
                                            y=columns[1],
                                            title=f"{', '.join(columns[1:])} por {columns[0]}",
                                            x_label=columns[0],
                                            y_label=", ".join(columns[1:])
                                        )
                                        response_text += f"\n\n### 📊 Visualización:\n{chart_html}"
                    except Exception as e:
                        logger.error(f"Error generating chart: {str(e)}", exc_info=True)
                        response_text += "\n\n⚠️ No se pudo generar la visualización de los datos."
                else:
                    response_text += "\n\n⚠️ No se pudo conectar a la base de datos para ejecutar la consulta."

            assistant_message["content"] = response_text

        except Exception as e:
            error_msg = f"Error al ejecutar el agente: {str(e)}"
            logger.error(error_msg, exc_info=True)
            assistant_message["content"] = f"## ❌ Error\n\n{error_msg}"

        # Always hand back a plain string so the caller can pair it with the question.
        message_content = ""
        if isinstance(assistant_message, dict) and "content" in assistant_message:
            message_content = assistant_message["content"]
        elif isinstance(assistant_message, str):
            message_content = assistant_message
        else:
            message_content = str(assistant_message)

        return message_content

    except Exception as e:
        error_msg = f"## ❌ Error\n\nOcurrió un error al procesar tu solicitud:\n\n```\n{str(e)}\n```"
        logger.error(f"Error in stream_agent_response: {str(e)}", exc_info=True)
        return error_msg


custom_css = """ |
|
.gradio-container { |
|
max-width: 1200px !important; |
|
margin: 0 auto !important; |
|
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif; |
|
} |
|
|
|
#chatbot { |
|
min-height: 500px; |
|
border: 1px solid #e0e0e0; |
|
border-radius: 8px; |
|
margin-bottom: 20px; |
|
padding: 20px; |
|
background-color: #f9f9f9; |
|
} |
|
|
|
.user-message, .bot-message { |
|
padding: 12px 16px; |
|
border-radius: 18px; |
|
margin: 8px 0; |
|
max-width: 80%; |
|
line-height: 1.5; |
|
} |
|
|
|
.user-message { |
|
background-color: #007bff; |
|
color: white; |
|
margin-left: auto; |
|
border-bottom-right-radius: 4px; |
|
} |
|
|
|
.bot-message { |
|
background-color: #f1f1f1; |
|
color: #333; |
|
margin-right: auto; |
|
border-bottom-left-radius: 4px; |
|
} |
|
|
|
#question-input textarea { |
|
min-height: 50px !important; |
|
border-radius: 8px !important; |
|
padding: 12px !important; |
|
font-size: 16px !important; |
|
} |
|
|
|
#send-button { |
|
height: 100%; |
|
background-color: #007bff !important; |
|
color: white !important; |
|
border: none !important; |
|
border-radius: 8px !important; |
|
font-weight: 500 !important; |
|
transition: background-color 0.2s !important; |
|
} |
|
|
|
#send-button:hover { |
|
background-color: #0056b3 !important; |
|
} |
|
|
|
.status-message { |
|
text-align: center; |
|
color: #666; |
|
font-style: italic; |
|
margin: 10px 0; |
|
} |
|
""" |
|
|
|
def create_ui():
    """Create and return the Gradio user interface components."""
    env_ok, env_message = check_environment()

    theme = gr.themes.Soft(
        primary_hue="blue",
        secondary_hue="indigo",
        neutral_hue="slate"
    )

    with gr.Blocks(
        css=custom_css,
        title="Asistente de Base de Datos SQL",
        theme=theme
    ) as demo:

        gr.Markdown("""
        # 🤖 Asistente de Base de Datos SQL

        Haz preguntas en lenguaje natural sobre tu base de datos y obtén resultados de consultas SQL.
        """)

        if not env_ok:
            gr.Warning("⚠️ " + env_message)

        with gr.Row():
            chatbot = gr.Chatbot(
                [],
                elem_id="chatbot",
                bubble_full_width=False,
                avatar_images=(
                    None,
                    (os.path.join(os.path.dirname(__file__), "logo.svg")),
                ),
                height=600,
                render_markdown=True,
                show_label=False,
                show_share_button=False,
                container=True,
                layout="panel"
            )

        with gr.Row():
            question_input = gr.Textbox(
                label="",
                placeholder="Escribe tu pregunta aquí...",
                container=False,
                scale=5,
                min_width=300,
                max_lines=3,
                autofocus=True,
                elem_id="question-input"
            )
            submit_button = gr.Button(
                "Enviar",
                variant="primary",
                min_width=100,
                scale=1,
                elem_id="send-button"
            )

        with gr.Accordion("ℹ️ Estado del sistema", open=not env_ok):
            if not DEPENDENCIES_AVAILABLE:
                gr.Markdown("""
                ## ❌ Dependencias faltantes

                Para ejecutar esta aplicación localmente, necesitas instalar las dependencias:

                ```bash
                pip install -r requirements.txt
                ```
                """)
            else:
                if not agent:
                    gr.Markdown(f"""
                    ## ⚠️ Configuración incompleta

                    No se pudo inicializar el agente de base de datos. Por favor, verifica que:

                    1. Todas las variables de entorno estén configuradas correctamente
                    2. La base de datos esté accesible
                    3. La API de Google Gemini esté configurada

                    **Error:** {agent_error if agent_error else 'No se pudo determinar el error'}

                    ### Configuración local

                    Crea un archivo `.env` en la raíz del proyecto con las siguientes variables:

                    ```
                    DB_USER=tu_usuario
                    DB_PASSWORD=tu_contraseña
                    DB_HOST=tu_servidor
                    DB_NAME=tu_base_de_datos
                    GOOGLE_API_KEY=tu_api_key_de_google
                    ```
                    """)
                else:
                    if os.getenv('SPACE_ID'):
                        gr.Markdown("""
                        ## 🚀 Modo Demo

                        Esta es una demostración del asistente de base de datos SQL. Para usar la versión completa con conexión a base de datos:

                        1. Clona este espacio en tu cuenta de Hugging Face
                        2. Configura las variables de entorno en la configuración del espacio:
                           - `DB_USER`: Tu usuario de base de datos
                           - `DB_PASSWORD`: Tu contraseña de base de datos
                           - `DB_HOST`: La dirección del servidor de base de datos
                           - `DB_NAME`: El nombre de la base de datos
                           - `GOOGLE_API_KEY`: Tu clave de API de Google Gemini

                        **Nota:** Actualmente estás en modo de solo demostración.
                        """)
                    else:
                        gr.Markdown("""
                        ## ✅ Sistema listo

                        El asistente está listo para responder tus preguntas sobre la base de datos.
                        """)

        # Hidden placeholder kept for interface compatibility with create_application().
        streaming_output_display = gr.Textbox(visible=False)

    return demo, chatbot, question_input, submit_button, streaming_output_display


def create_application():
    """Create and configure the Gradio application."""
    demo, chatbot, question_input, submit_button, streaming_output_display = create_ui()

    def user_message(user_input: str, chat_history: List[Tuple[str, str]]) -> Tuple[str, List[Tuple[str, str]]]:
        """Add user message to chat history and clear input."""
        if not user_input.strip():
            return "", chat_history

        logger.info(f"User message: {user_input}")

        if chat_history is None:
            chat_history = []

        # Append the question with a pending (None) answer slot.
        chat_history.append((user_input, None))

        return "", chat_history

    async def bot_response(chat_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        """Get bot response and update chat history."""
        if not chat_history:
            return chat_history

        # Only respond if the last turn has a question and no answer yet.
        if not chat_history[-1][0] or chat_history[-1][1] is not None:
            return chat_history

        try:
            question = chat_history[-1][0]
            logger.info(f"Processing question: {question}")

            assistant_message = await stream_agent_response(question, chat_history[:-1])

            chat_history[-1] = (question, assistant_message)

            logger.info("Response generation complete")
            return chat_history

        except Exception as e:
            error_msg = f"## ❌ Error\n\nError al procesar la solicitud:\n\n```\n{str(e)}\n```"
            logger.error(error_msg, exc_info=True)
            if chat_history and len(chat_history[-1]) == 2 and chat_history[-1][1] is None:
                chat_history[-1] = (chat_history[-1][0], error_msg)
            return chat_history

    with demo:
        # Wire both Enter-to-submit and the button click to the same handlers.
        msg_submit = question_input.submit(
            fn=user_message,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot],
            queue=True
        ).then(
            fn=bot_response,
            inputs=[chatbot],
            outputs=[chatbot],
            api_name="ask"
        )

        btn_click = submit_button.click(
            fn=user_message,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot],
            queue=True
        ).then(
            fn=bot_response,
            inputs=[chatbot],
            outputs=[chatbot]
        )

    return demo


demo = create_application()


def get_app():
    """Return the Gradio application instance for Hugging Face Spaces."""
    if os.getenv('SPACE_ID'):
        # Running inside a Hugging Face Space: present the demo-mode title.
        demo.title = "🤖 Asistente de Base de Datos SQL (Demo)"
        demo.description = """
        Este es un demo del asistente de base de datos SQL.
        Para usar la versión completa con conexión a base de datos, clona este espacio y configura las variables de entorno.
        """

    return demo


if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        debug=True,
        share=False
    )