# Spaces: Running
import importlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
from datetime import datetime

import gradio as gr
import requests
# --- Groq chat-completions settings -----------------------------------------
# The API key and the system prompt are read from the environment and are
# None when the corresponding variable is not set.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"
SYSTEM_MESSAGE = os.getenv("System_Prompt")
MODEL_NAME = "meta-llama/llama-4-maverick-17b-128e-instruct"
MAX_TOKENS = 4096
TEMPERATURE = 0.7
TOP_P = 0.95

# --- MEGA credentials and remote folder name --------------------------------
MEGA_EMAIL = os.getenv("MEGA_EMAIL")
MEGA_PASSWORD = os.getenv("MEGA_PASSWORD")
REMOTE_DESTINATION_FOLDER = "Conversations"
def install_megapy():
    """Make sure the ``megapy`` package is importable, installing it if needed.

    A plain import is tried first. If it fails, ``pip install megapy`` is run
    with the current interpreter and the import is retried.

    Returns:
        bool: True when ``megapy`` can be imported, False when the
        installation or the follow-up import failed (the error is printed).
    """
    try:
        import megapy  # noqa: F401
        return True
    except ImportError:
        pass

    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "megapy"])
        # Packages installed while the interpreter is running may be hidden
        # by the import system's finder caches; refresh them before retrying.
        importlib.invalidate_caches()
        import megapy  # noqa: F401
        return True
    except Exception as e:
        # Best effort: any failure (pip error, missing pip, import error)
        # just means this backend is unavailable.
        print(f"No se pudo instalar megapy: {e}")
        return False
def get_mega_client_simple():
    """Probe the available MEGA backends and pick the first usable one.

    Backends are tried in this order:

    1. ``mega.py``: logs in with ``MEGA_EMAIL`` / ``MEGA_PASSWORD``.
    2. ``megapy``: only initialised; no upload logic exists for it, so it
       never blocks the megatools fallback.
    3. ``megatools``: selected when the ``megaput`` executable is on PATH.

    Returns:
        tuple: ``(client, method)`` where the pair is one of
        ``(<logged-in mega.py client>, "mega.py")``,
        ``("megatools", "megatools")``,
        ``(None, "megapy")`` (megapy initialised but nothing usable), or
        ``(None, None)`` when no backend is available.
    """
    # Method 1: mega.py
    try:
        from mega import Mega
        print("Usando mega.py...")
        mega = Mega({'verbose': False})
        m = mega.login(MEGA_EMAIL, MEGA_PASSWORD)
        print("Conexión exitosa con mega.py")
        return m, "mega.py"
    except Exception as e:
        print(f"mega.py falló: {e}")

    # Method 2: megapy. It has no upload implementation yet, so a successful
    # initialisation is only remembered; it must not short-circuit the
    # megatools probe below.
    megapy_ready = False
    if install_megapy():
        try:
            import megapy
            print("Usando megapy...")
            megapy.MegaApi()
            print("megapy inicializado (requiere implementación adicional)")
            megapy_ready = True
        except Exception as e:
            print(f"megapy falló: {e}")

    # Method 3: megatools command-line client. shutil.which looks the
    # executable up on PATH without spawning a `which` process.
    if shutil.which("megaput"):
        print("Encontrado megatools, usando método de línea de comandos")
        return "megatools", "megatools"

    if megapy_ready:
        # Same result the caller got before when only megapy was available.
        return None, "megapy"
    return None, None
def upload_with_megatools(file_path, remote_path):
    """Upload a local file to MEGA using the ``megaput`` command-line tool.

    The MEGA credentials are written to a private temporary config file that
    is removed again once ``megaput`` has finished.

    Args:
        file_path: Path of the local file to upload.
        remote_path: File name to use inside ``REMOTE_DESTINATION_FOLDER``.
            When empty, the file is uploaded into the folder under its local
            name.

    Returns:
        bool: True when ``megaput`` exits with status 0, False on a non-zero
        exit status or any other error (the error is printed).
    """
    config_path = None
    try:
        # NamedTemporaryFile creates the file with an unpredictable name and
        # owner-only permissions, unlike a fixed world-readable /tmp/.megarc.
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.megarc', delete=False, encoding='utf-8'
        ) as config_file:
            config_file.write(
                f"[Login]\nUsername = {MEGA_EMAIL}\nPassword = {MEGA_PASSWORD}\n"
            )
            config_path = config_file.name

        # NOTE(review): the megatools manual describes remote paths as living
        # under /Root; confirm that f"/{REMOTE_DESTINATION_FOLDER}" resolves
        # on the deployed megatools version.
        destination = f'/{REMOTE_DESTINATION_FOLDER}'
        if remote_path:
            # A --path that does not exist yet but whose parent does makes
            # megaput store the upload under that file name.
            destination = f'{destination}/{remote_path}'

        cmd = ['megaput', '--config', config_path, '--path', destination, file_path]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            print("Archivo subido exitosamente con megatools")
            return True
        print(f"Error con megatools: {result.stderr}")
        return False
    except Exception as e:
        print(f"Error usando megatools: {e}")
        return False
    finally:
        # Never leave the plaintext credentials behind on disk.
        if config_path and os.path.exists(config_path):
            os.unlink(config_path)
def _format_session_log(session_data):
    """Render ``(user, assistant)`` pairs as the plain-text session log."""
    parts = []
    for user_msg, assistant_msg in session_data:
        parts.append(f"User: {user_msg}\n")
        if assistant_msg:
            parts.append(f"Assistant: {assistant_msg}\n")
        parts.append("-----\n")
    return "".join(parts)


def _upload_with_mega_py(mega_client, local_path, remote_name):
    """Upload ``local_path`` as ``remote_name`` into the remote log folder."""
    folder_id = None
    for node_id, node in mega_client.get_files().items():
        attrs = node.get('a')
        # Attributes that could not be decoded are not dicts; skip them
        # instead of failing on `.get`.
        if isinstance(attrs, dict) and attrs.get('n') == REMOTE_DESTINATION_FOLDER:
            folder_id = node_id
            break

    if not folder_id:
        created = mega_client.create_folder(REMOTE_DESTINATION_FOLDER)
        # NOTE(review): mega.py documents create_folder as returning a
        # {folder_name: node_id} mapping; the positional fallback covers any
        # other return shape. Confirm against the installed version.
        if isinstance(created, dict):
            folder_id = created.get(REMOTE_DESTINATION_FOLDER)
        else:
            folder_id = created[0]
        if not folder_id:
            raise RuntimeError(
                f"could not create remote folder {REMOTE_DESTINATION_FOLDER!r}"
            )

    # Name the file at upload time: the upload result is not the node
    # descriptor that rename() expects, so a rename afterwards would fail
    # even though the upload itself succeeded.
    mega_client.upload(local_path, folder_id, dest_filename=remote_name)


def persist_data_simple(session_data):
    """Store a chat session log in MEGA, falling back to a local backup.

    The session is written to a temporary file and uploaded with whichever
    backend ``get_mega_client_simple`` selects. If credentials are missing,
    no backend is usable, or the upload fails, the session is handed to
    ``save_locally_as_backup`` instead. Errors are printed, never raised.

    Args:
        session_data: Iterable of ``(user_message, assistant_message)``
            pairs; a falsy assistant message is omitted from the log.
    """
    if not MEGA_EMAIL or not MEGA_PASSWORD:
        print("Warning: Credenciales de MEGA no configuradas.")
        save_locally_as_backup(session_data)
        return

    temp_file_path = None
    try:
        formatted_log = _format_session_log(session_data)

        # Unique name for the remote file.
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        log_name = f"session_{timestamp}.log"

        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.log', delete=False, encoding='utf-8'
        ) as temp_file:
            temp_file.write(formatted_log)
            temp_file_path = temp_file.name

        mega_client, method = get_mega_client_simple()
        if method == "mega.py" and mega_client:
            try:
                _upload_with_mega_py(mega_client, temp_file_path, log_name)
                print(f"Log guardado exitosamente en MEGA con mega.py: {log_name}")
            except Exception as e:
                print(f"Error con mega.py: {e}")
                save_locally_as_backup(session_data)
        elif method == "megatools":
            if upload_with_megatools(temp_file_path, log_name):
                print(f"Log guardado con megatools: {log_name}")
            else:
                save_locally_as_backup(session_data)
        else:
            print("No se pudo conectar a MEGA, guardando localmente")
            save_locally_as_backup(session_data)
    except Exception as e:
        print(f"Error durante la persistencia de datos: {e}")
        save_locally_as_backup(session_data)
    finally:
        # The temporary file was created with delete=False; remove it here.
        if temp_file_path and os.path.exists(temp_file_path):
            os.unlink(temp_file_path)
def save_locally_as_backup(session_data, backup_dir="/tmp/chat_backups"):
    """Write a chat session to a local log file as a fallback backup.

    The file is named ``backup_session_<timestamp>.log``. If a file with
    that name already exists (two backups within the same second), a numeric
    suffix is appended so that no earlier backup is overwritten.

    Args:
        session_data: Iterable of ``(user_message, assistant_message)``
            pairs; a falsy assistant message is omitted from the log.
        backup_dir: Directory for the backup files; created when missing.

    Returns:
        str | None: Path of the written file, or None when saving failed
        (the error is printed, never raised).
    """
    try:
        parts = []
        for user_msg, assistant_msg in session_data:
            parts.append(f"User: {user_msg}\n")
            if assistant_msg:
                parts.append(f"Assistant: {assistant_msg}\n")
            parts.append("-----\n")
        formatted_log = "".join(parts)

        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        base_name = f"backup_session_{timestamp}"

        os.makedirs(backup_dir, exist_ok=True)

        attempt = 0
        while True:
            suffix = f"_{attempt}" if attempt else ""
            local_path = os.path.join(backup_dir, f"{base_name}{suffix}.log")
            try:
                # Mode 'x' fails instead of truncating an existing backup.
                with open(local_path, 'x', encoding='utf-8') as f:
                    f.write(formatted_log)
                break
            except FileExistsError:
                attempt += 1

        print(f"Conversación guardada localmente como respaldo: {local_path}")
        return local_path
    except Exception as e:
        print(f"Error al guardar respaldo local: {e}")
        return None
def respond(message, history: list[tuple[str, str]]):
    """Stream an assistant reply from the Groq chat-completions API.

    Builds the message list from the optional system prompt, the previous
    turns and the new user message, requests a streamed completion and
    yields the reply accumulated so far after every received token.

    When no content at all is received (HTTP error, network failure, empty
    stream), a single generic error message is yielded. Otherwise the
    finished turn is appended to the history and passed to
    ``persist_data_simple``.

    Args:
        message: The new user message.
        history: Previous ``(user_message, assistant_message)`` pairs.

    Yields:
        str: The assistant reply accumulated so far.
    """
    past_turns = list(history or [])

    messages = []
    # An unset system prompt is skipped rather than sent as null content.
    if SYSTEM_MESSAGE:
        messages.append({"role": "system", "content": SYSTEM_MESSAGE})
    for user_msg, assistant_msg in past_turns:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})
    messages.append({"role": "user", "content": message})

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {GROQ_API_KEY}"
    }
    payload = {
        "model": MODEL_NAME,
        "messages": messages,
        "max_tokens": MAX_TOKENS,
        "temperature": TEMPERATURE,
        "top_p": TOP_P,
        "stream": True
    }

    accumulated_response = ""
    try:
        # (connect, read) timeouts keep a stalled connection from hanging the
        # chat forever; the context manager releases the streamed connection
        # even when the loop exits early at "[DONE]".
        with requests.post(
            GROQ_API_URL,
            headers=headers,
            json=payload,
            stream=True,
            timeout=(10, 120),
        ) as response:
            response.raise_for_status()
            for line in response.iter_lines():
                if not line:
                    continue
                line_text = line.decode('utf-8')
                if not line_text.startswith("data: "):
                    continue
                data_str = line_text[6:]
                if data_str == "[DONE]":
                    break
                try:
                    data = json.loads(data_str)
                except json.JSONDecodeError:
                    continue
                choices = data.get('choices') if isinstance(data, dict) else None
                if not choices:
                    continue
                delta = choices[0].get('delta') or {}
                token = delta.get('content')
                if token:
                    accumulated_response += token
                    yield accumulated_response
    except requests.RequestException as e:
        # Keep whatever was streamed before the failure; with nothing
        # received the generic error message below is shown.
        print(f"Error al llamar a la API de Groq: {e}")

    if not accumulated_response:
        yield "Lo siento, ocurrió un error al procesar tu solicitud."
    else:
        current_session = past_turns + [(message, accumulated_response)]
        persist_data_simple(current_session)
# Chat UI wired to `respond`, with a set of example prompts.
demo = gr.ChatInterface(
    respond,
    examples=[
        ["¡Bienvenido a Tu Aliado Momentum!"],
        ["¿En qué consiste el programa y para quién es?"],
        ["¿Qué beneficios obtengo y con qué empresas me conecto?"],
        ["¿Cómo puedo participar o registrarme?"],
    ],
)

# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()