# Source: Hugging Face Space "PruebaBM", file app.py (uploaded by Danielbrdz).
# Revision 3f5d163 (verified) - "Update app.py" - 9.78 kB.
import gradio as gr
import os
import json
import requests
from datetime import datetime
import tempfile
import subprocess
import sys
# --- Groq chat-completions settings ---------------------------------------
# The API key and the system prompt come from the environment; each one is
# None when its variable is not set.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"
SYSTEM_MESSAGE = os.getenv("System_Prompt")
MODEL_NAME = "meta-llama/llama-4-maverick-17b-128e-instruct"

# Generation parameters sent with every completion request.
MAX_TOKENS, TEMPERATURE, TOP_P = 4096, 0.7, 0.95

# --- MEGA credentials and destination -------------------------------------
MEGA_EMAIL = os.getenv("MEGA_EMAIL")
MEGA_PASSWORD = os.getenv("MEGA_PASSWORD")
# Name of the remote folder that receives the conversation logs.
REMOTE_DESTINATION_FOLDER = "Conversations"
def install_megapy():
    """Make sure the ``megapy`` package is importable, installing it if needed.

    The import is attempted first. Only when it fails with ImportError is
    ``python -m pip install megapy`` run with the current interpreter,
    followed by a second import attempt.

    Returns:
        bool: True when ``megapy`` can be imported (already present or freshly
        installed); False when the installation or the retried import failed.
    """
    import importlib

    try:
        importlib.import_module("megapy")
        return True
    except ImportError:
        pass

    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "megapy"])
        # The import system caches directory listings; without this call a
        # package installed while the interpreter is running may go unnoticed.
        importlib.invalidate_caches()
        importlib.import_module("megapy")
        return True
    except Exception as e:
        # Best effort: the caller falls back to other upload methods.
        print(f"No se pudo instalar megapy: {e}")
        return False
def get_mega_client_simple():
    """Pick the first available way of talking to MEGA.

    Backends are probed in this order:

    1. ``mega.py``: logs in with MEGA_EMAIL / MEGA_PASSWORD.
    2. ``megapy``: only initialised; no upload logic exists for it here, so it
       never short-circuits the probe of the next backend.
    3. ``megatools``: detected by looking for ``megaput`` on PATH.

    Returns:
        tuple: ``(client, method)``, one of

        * ``(logged-in client, "mega.py")`` when the mega.py login succeeded;
        * ``("megatools", "megatools")`` when ``megaput`` was found;
        * ``(None, "megapy")`` when only megapy could be initialised;
        * ``(None, None)`` when no backend is available.
    """
    import shutil

    # Method 1: mega.py
    try:
        from mega import Mega
        print("Usando mega.py...")
        mega = Mega({'verbose': False})
        m = mega.login(MEGA_EMAIL, MEGA_PASSWORD)
        print("Conexión exitosa con mega.py")
        return m, "mega.py"
    except Exception as e:
        print(f"mega.py falló: {e}")

    # Method 2: megapy. It cannot upload anything yet, so remember that it
    # initialised but keep looking for a backend that actually works.
    fallback_method = None
    if install_megapy():
        try:
            import megapy
            print("Usando megapy...")
            megapy.MegaApi()
            # The megapy upload logic would go here.
            print("megapy inicializado (requiere implementación adicional)")
            fallback_method = "megapy"
        except Exception as e:
            print(f"megapy falló: {e}")

    # Method 3: megatools (command-line client), if installed.
    if shutil.which('megaput'):
        print("Encontrado megatools, usando método de línea de comandos")
        return "megatools", "megatools"

    return None, fallback_method
def upload_with_megatools(file_path, remote_path):
    """Upload a local file to MEGA through the ``megaput`` command.

    The MEGA credentials are written to a throw-away config file that is
    handed to ``megaput`` via ``--config`` and deleted once the command has
    finished, whether it succeeded or not.

    Args:
        file_path: Path of the local file to upload.
        remote_path: Accepted for interface compatibility; not used. The file
            is sent to ``/{REMOTE_DESTINATION_FOLDER}``.
            NOTE(review): the caller passes the desired log name here, so the
            uploaded file presumably ends up named after ``file_path``
            instead - confirm against megaput's ``--path`` semantics.

    Returns:
        bool: True when ``megaput`` exits with status 0; False on a non-zero
        exit status or when any exception is raised.
    """
    config_path = None
    try:
        # Credentials in megatools' config-file format.
        config_content = f"[Login]\nUsername = {MEGA_EMAIL}\nPassword = {MEGA_PASSWORD}\n"
        # mkstemp creates the file accessible only to the current user, so the
        # password is not exposed through a predictable, shared /tmp path.
        fd, config_path = tempfile.mkstemp(prefix=".megarc_")
        with os.fdopen(fd, 'w') as f:
            f.write(config_content)

        # Upload the file.
        cmd = ['megaput', '--config', config_path, '--path', f'/{REMOTE_DESTINATION_FOLDER}', file_path]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            print("Archivo subido exitosamente con megatools")
            return True
        print(f"Error con megatools: {result.stderr}")
        return False
    except Exception as e:
        print(f"Error usando megatools: {e}")
        return False
    finally:
        # Never leave the credentials file behind.
        if config_path and os.path.exists(config_path):
            os.unlink(config_path)
def persist_data_simple(session_data):
    """Store a conversation log in MEGA, falling back to a local backup.

    The conversation is rendered as text, written to a temporary file and
    uploaded with whichever backend ``get_mega_client_simple`` reports.
    ``save_locally_as_backup`` is called instead when the MEGA credentials
    are missing, no backend is usable, or the upload fails. The temporary
    file is always removed.

    Args:
        session_data: Iterable of ``(user_msg, assistant_msg)`` pairs; a falsy
            ``assistant_msg`` is left out of the log.

    Returns:
        None. Errors are reported with ``print`` and never raised.
    """
    if not MEGA_EMAIL or not MEGA_PASSWORD:
        print("Warning: Credenciales de MEGA no configuradas.")
        save_locally_as_backup(session_data)
        return

    try:
        formatted_log = _format_session_log(session_data)

        # Timestamped name so each session gets its own remote file.
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        log_name = f"session_{timestamp}.log"

        # delete=False: the file must outlive the `with` so it can be uploaded.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.log', delete=False, encoding='utf-8') as temp_file:
            temp_file.write(formatted_log)
            temp_file_path = temp_file.name

        try:
            mega_client, method = get_mega_client_simple()
            if method == "mega.py" and mega_client:
                try:
                    _upload_with_mega_py(mega_client, temp_file_path, log_name)
                    print(f"Log guardado exitosamente en MEGA con mega.py: {log_name}")
                except Exception as e:
                    print(f"Error con mega.py: {e}")
                    save_locally_as_backup(session_data)
            elif method == "megatools":
                if upload_with_megatools(temp_file_path, log_name):
                    print(f"Log guardado con megatools: {log_name}")
                else:
                    save_locally_as_backup(session_data)
            else:
                # No usable backend: keep a local copy instead.
                print("No se pudo conectar a MEGA, guardando localmente")
                save_locally_as_backup(session_data)
        finally:
            # Remove the temporary file whatever happened above.
            if os.path.exists(temp_file_path):
                os.unlink(temp_file_path)
    except Exception as e:
        print(f"Error durante la persistencia de datos: {e}")
        save_locally_as_backup(session_data)


def _format_session_log(session_data):
    """Render ``(user_msg, assistant_msg)`` pairs as the plain-text log body."""
    parts = []
    for user_msg, assistant_msg in session_data:
        parts.append(f"User: {user_msg}\n")
        if assistant_msg:
            parts.append(f"Assistant: {assistant_msg}\n")
        parts.append("-----\n")
    return "".join(parts)


def _upload_with_mega_py(mega_client, local_path, remote_name):
    """Upload ``local_path`` into REMOTE_DESTINATION_FOLDER as ``remote_name``."""
    # Look for an existing node named like the destination folder.
    folder_id = None
    for node_id, node in mega_client.get_files().items():
        attrs = node['a']
        if isinstance(attrs, dict) and attrs.get('n') == REMOTE_DESTINATION_FOLDER:
            folder_id = node_id
            break

    # Create the folder when it does not exist yet.
    if not folder_id:
        created = mega_client.create_folder(REMOTE_DESTINATION_FOLDER)
        # NOTE(review): recent mega.py releases appear to return
        # {folder_name: node_id}; older ones appear to return the raw API
        # response. Neither can be indexed with [0] - confirm against the
        # installed version.
        if REMOTE_DESTINATION_FOLDER in created:
            folder_id = created[REMOTE_DESTINATION_FOLDER]
        else:
            folder_id = created['f'][0]['h']

    # Name the file at upload time: the value returned by upload() is not the
    # node tuple that rename() expects, so a separate rename step would fail
    # and leave the file under its temporary name.
    mega_client.upload(local_path, folder_id, dest_filename=remote_name)
def save_locally_as_backup(session_data):
    """Write the conversation to a timestamped file under /tmp/chat_backups.

    Args:
        session_data: Iterable of ``(user_msg, assistant_msg)`` pairs; a falsy
            ``assistant_msg`` is left out of the log.

    Returns:
        None. Any failure is reported with ``print`` instead of being raised.
    """
    try:
        # Render one "User / Assistant / separator" group per exchange.
        chunks = []
        for question, answer in session_data:
            chunks.append(f"User: {question}\n")
            if answer:
                chunks.append(f"Assistant: {answer}\n")
            chunks.append("-----\n")
        transcript = "".join(chunks)

        # Timestamped file name inside the backup directory.
        stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        target_dir = "/tmp/chat_backups"
        target_file = os.path.join(target_dir, f"backup_session_{stamp}.log")

        # Create the directory on first use, then write the transcript.
        os.makedirs(target_dir, exist_ok=True)
        with open(target_file, 'w', encoding='utf-8') as out:
            out.write(transcript)
        print(f"Conversación guardada localmente como respaldo: {target_file}")
    except Exception as err:
        print(f"Error al guardar respaldo local: {err}")
def respond(message, history: list[tuple[str, str]]):
    """Stream the assistant's reply to ``message`` from the Groq chat API.

    The request contains the system prompt (when one is configured), the
    previous turns from ``history`` and the new user message. The reply is
    read as a server-sent-event stream. Once the stream ends, the whole
    conversation is handed to ``persist_data_simple``.

    Args:
        message: The new user message.
        history: Previous ``(user_msg, assistant_msg)`` turns; falsy entries
            are skipped.

    Yields:
        str: The reply accumulated so far, once per received token, or a
        single apology message when no content could be obtained.
    """
    # Without a configured prompt, sending "content": null would be invalid.
    messages = []
    if SYSTEM_MESSAGE:
        messages.append({"role": "system", "content": SYSTEM_MESSAGE})
    for user_msg, assistant_msg in history:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})
    messages.append({"role": "user", "content": message})

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {GROQ_API_KEY}"
    }
    payload = {
        "model": MODEL_NAME,
        "messages": messages,
        "max_tokens": MAX_TOKENS,
        "temperature": TEMPERATURE,
        "top_p": TOP_P,
        "stream": True
    }

    accumulated_response = ""
    try:
        # (connect, read) timeouts keep a stalled connection from hanging the
        # chat forever; the `with` releases the connection even when the
        # consumer stops iterating early.
        with requests.post(
            GROQ_API_URL,
            headers=headers,
            json=payload,
            stream=True,
            timeout=(10, 120)
        ) as response:
            response.raise_for_status()
            for line in response.iter_lines():
                if not line:
                    continue
                line_text = line.decode('utf-8')
                if not line_text.startswith("data: "):
                    continue
                data_str = line_text[6:]  # strip the "data: " prefix
                if data_str == "[DONE]":
                    break
                try:
                    data = json.loads(data_str)
                except json.JSONDecodeError:
                    # Skip malformed chunks instead of aborting the stream.
                    continue
                if 'choices' in data and len(data['choices']) > 0:
                    # `or {}` also covers an explicit null delta.
                    delta = data['choices'][0].get('delta') or {}
                    token = delta.get('content')
                    if token:
                        accumulated_response += token
                        yield accumulated_response
    except requests.RequestException as e:
        # Network/HTTP failure: report it and fall through, so the user gets
        # either the partial answer already shown or the apology below.
        print(f"Error al consultar la API de Groq: {e}")

    if not accumulated_response:
        yield "Lo siento, ocurrió un error al procesar tu solicitud."
    else:
        current_session = history + [(message, accumulated_response)]
        persist_data_simple(current_session)
# Chat UI: `respond` produces the replies and the prompts below are passed as
# the interface's examples.
demo = gr.ChatInterface(
    fn=respond,
    examples=[
        ["¡Bienvenido a Tu Aliado Momentum!"],
        ["¿En qué consiste el programa y para quién es?"],
        ["¿Qué beneficios obtengo y con qué empresas me conecto?"],
        ["¿Cómo puedo participar o registrarme?"],
    ],
)

# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()