import requests import os import json import time import pickle from typing import Dict, List, Optional, Tuple, Any from bs4 import BeautifulSoup from urllib.parse import urljoin, urlparse from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import FAISS from langchain_community.embeddings import HuggingFaceEmbeddings import random import asyncio import aiohttp # --- Configurações --- BLOG_URL = "https://aldohenrique.com.br/" VECTOR_STORE_PATH = "faiss_index_store.pkl" PROCESSED_URLS_PATH = "processed_urls.pkl" HF_TOKEN = os.getenv("HF_TOKEN") OR_TOKEN= os.getenv("OR_TOKEN") # --- Provedores de LLM Gratuitos --- class LLMProvider: """Classe base para provedores de LLM""" def __init__(self, name: str, base_url: str = None): self.name = name self.base_url = base_url self.is_available = False self.models = [] async def test_availability(self) -> bool: """Testa se o provedor está disponível""" return False async def query_model(self, messages: List[Dict], model_name: str = None, **kwargs) -> str: """Faz query no modelo""" return "" def convert_messages_to_prompt(self, messages: List[Dict]) -> str: """Converte mensagens para formato de prompt""" prompt_parts = [] for msg in messages: role = msg['role'] content = msg['content'] if role == 'system': prompt_parts.append(f"Sistema: {content}") elif role == 'user': prompt_parts.append(f"Usuário: {content}") elif role == 'assistant': prompt_parts.append(f"Assistente: {content}") prompt_parts.append("Assistente:") return "\n\n".join(prompt_parts) class HuggingFaceProvider(LLMProvider): """Provedor Hugging Face (original)""" def __init__(self, token: str): super().__init__("Hugging Face", "https://api-inference.huggingface.co") self.token = token self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} self.models = [ ("Phi-3 Mini (Mais rápido)", "microsoft/Phi-3-mini-4k-instruct"), ("Zephyr 7B (Meio Termo)", "HuggingFaceH4/zephyr-7b-beta"), ("Mistral-7B", "mistralai/Mistral-7B-Instruct-v0.3"), ("Microsoft 8B", "meta-llama/Meta-Llama-3-8B-Instruct"), ("DialoGPT", "microsoft/DialoGPT-medium"), ("Google Flan-T5", "google/flan-t5-base"), ("Facebook BART", "facebook/bart-large-cnn") ] async def test_availability(self) -> bool: if not self.token: return False # Testa com um modelo simples test_model = "microsoft/DialoGPT-medium" url = f"{self.base_url}/models/{test_model}" try: async with aiohttp.ClientSession() as session: async with session.post( url, headers=self.headers, json={"inputs": "Hello", "parameters": {"max_new_tokens": 5}}, timeout=aiohttp.ClientTimeout(total=10) ) as response: self.is_available = response.status in [200, 503] # 503 = modelo carregando return self.is_available except: return False async def query_model(self, messages: List[Dict], model_name: str = None, **kwargs) -> str: if not model_name: model_name = "mistralai/Mistral-7B-Instruct-v0.3" prompt = self.convert_messages_to_prompt(messages) url = f"{self.base_url}/models/{model_name}" payload = { "inputs": prompt, "parameters": { "max_new_tokens": kwargs.get("max_tokens", 1000), "temperature": kwargs.get("temperature", 0.7), "do_sample": True, "return_full_text": False } } try: async with aiohttp.ClientSession() as session: async with session.post( url, headers=self.headers, json=payload, timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status == 200: result = await response.json() if isinstance(result, list) and len(result) > 0: return result[0].get('generated_text', '').strip() elif isinstance(result, dict) and 'generated_text' in result: return result['generated_text'].strip() return f"Erro HF: {response.status}" except Exception as e: return f"Erro na requisição HF: {str(e)}" class OllamaProvider(LLMProvider): """Provedor Ollama (local)""" def __init__(self, host: str = "http://localhost:11434"): super().__init__("Ollama", host) self.models = [ ("Llama 3.2", "llama3.2"), ("Phi 3", "phi3"), ("Gemma 2", "gemma2"), ("Mistral", "mistral"), ("CodeLlama", "codellama") ] async def test_availability(self) -> bool: try: async with aiohttp.ClientSession() as session: async with session.get( f"{self.base_url}/api/tags", timeout=aiohttp.ClientTimeout(total=5) ) as response: self.is_available = response.status == 200 return self.is_available except: return False async def query_model(self, messages: List[Dict], model_name: str = "llama3.2", **kwargs) -> str: prompt = self.convert_messages_to_prompt(messages) payload = { "model": model_name, "prompt": prompt, "stream": False, "options": { "num_predict": kwargs.get("max_tokens", 1000), "temperature": kwargs.get("temperature", 0.7) } } try: async with aiohttp.ClientSession() as session: async with session.post( f"{self.base_url}/api/generate", json=payload, timeout=aiohttp.ClientTimeout(total=60) ) as response: if response.status == 200: result = await response.json() return result.get('response', '').strip() return f"Erro Ollama: {response.status}" except Exception as e: return f"Erro na requisição Ollama: {str(e)}" class OpenRouterProvider(LLMProvider): """Provedor OpenRouter (alguns modelos gratuitos)""" def __init__(self, api_key: str = None): super().__init__("OpenRouter", "https://openrouter.ai/api/v1") self.api_key = api_key or os.getenv("OPENROUTER_API_KEY") self.headers = { "Authorization": f"Bearer {self.api_key}" if self.api_key else "", "Content-Type": "application/json" } self.models = [ ("Mistral 7B Free", "mistralai/mistral-7b-instruct:free"), ("Mythomist 7B", "gryphe/mythomist-7b:free"), ("Toppy M 7B", "undi95/toppy-m-7b:free"), ("OpenChat 3.5", "openchat/openchat-7b:free") ] async def test_availability(self) -> bool: if not self.api_key: return False try: async with aiohttp.ClientSession() as session: async with session.get( f"{self.base_url}/models", headers=self.headers, timeout=aiohttp.ClientTimeout(total=10) ) as response: self.is_available = response.status == 200 return self.is_available except: return False async def query_model(self, messages: List[Dict], model_name: str = "mistralai/mistral-7b-instruct:free", **kwargs) -> str: payload = { "model": model_name, "messages": messages, "max_tokens": kwargs.get("max_tokens", 1000), "temperature": kwargs.get("temperature", 0.7) } try: async with aiohttp.ClientSession() as session: async with session.post( f"{self.base_url}/chat/completions", headers=self.headers, json=payload, timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status == 200: result = await response.json() return result['choices'][0]['message']['content'].strip() return f"Erro OpenRouter: {response.status}" except Exception as e: return f"Erro na requisição OpenRouter: {str(e)}" class GroqProvider(LLMProvider): """Provedor Groq (gratuito com limites)""" def __init__(self, api_key: str = None): super().__init__("Groq", "https://api.groq.com/openai/v1") self.api_key = api_key or os.getenv("GROQ_API_KEY") self.headers = { "Authorization": f"Bearer {self.api_key}" if self.api_key else "", "Content-Type": "application/json" } self.models = [ ("Llama 3 8B", "llama3-8b-8192"), ("Llama 3 70B", "llama3-70b-8192"), ("Mixtral 8x7B", "mixtral-8x7b-32768"), ("Gemma 7B", "gemma-7b-it") ] async def test_availability(self) -> bool: if not self.api_key: return False try: async with aiohttp.ClientSession() as session: async with session.get( f"{self.base_url}/models", headers=self.headers, timeout=aiohttp.ClientTimeout(total=10) ) as response: self.is_available = response.status == 200 return self.is_available except: return False async def query_model(self, messages: List[Dict], model_name: str = "llama3-8b-8192", **kwargs) -> str: payload = { "model": model_name, "messages": messages, "max_tokens": kwargs.get("max_tokens", 1000), "temperature": kwargs.get("temperature", 0.7) } try: async with aiohttp.ClientSession() as session: async with session.post( f"{self.base_url}/chat/completions", headers=self.headers, json=payload, timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status == 200: result = await response.json() return result['choices'][0]['message']['content'].strip() return f"Erro Groq: {response.status}" except Exception as e: return f"Erro na requisição Groq: {str(e)}" class TogetherAIProvider(LLMProvider): """Provedor Together AI (alguns modelos gratuitos)""" def __init__(self, api_key: str = None): super().__init__("Together AI", "https://api.together.xyz/v1") self.api_key = api_key or os.getenv("TOGETHER_API_KEY") self.headers = { "Authorization": f"Bearer {self.api_key}" if self.api_key else "", "Content-Type": "application/json" } self.models = [ ("Mistral 7B", "mistralai/Mistral-7B-Instruct-v0.1"), ("Code Llama 7B", "codellama/CodeLlama-7b-Instruct-hf"), ("Llama 2 7B", "meta-llama/Llama-2-7b-chat-hf"), ("WizardCoder 15B", "WizardLM/WizardCoder-15B-V1.0") ] async def test_availability(self) -> bool: if not self.api_key: return False try: async with aiohttp.ClientSession() as session: async with session.get( f"{self.base_url}/models", headers=self.headers, timeout=aiohttp.ClientTimeout(total=10) ) as response: self.is_available = response.status == 200 return self.is_available except: return False async def query_model(self, messages: List[Dict], model_name: str = "mistralai/Mistral-7B-Instruct-v0.1", **kwargs) -> str: payload = { "model": model_name, "messages": messages, "max_tokens": kwargs.get("max_tokens", 1000), "temperature": kwargs.get("temperature", 0.7) } try: async with aiohttp.ClientSession() as session: async with session.post( f"{self.base_url}/chat/completions", headers=self.headers, json=payload, timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status == 200: result = await response.json() return result['choices'][0]['message']['content'].strip() return f"Erro Together AI: {response.status}" except Exception as e: return f"Erro na requisição Together AI: {str(e)}" # --- Gerenciador de Múltiplos Provedores --- class MultiProviderLLMClient: """Cliente que gerencia múltiplos provedores de LLM""" def __init__(self): self.providers = [] self.available_providers = [] self.current_provider_index = 0 self.models_cache = {} # Inicializa provedores if HF_TOKEN: self.providers.append(HuggingFaceProvider(HF_TOKEN)) # Adiciona outros provedores self.providers.extend([ OllamaProvider(), OpenRouterProvider(), GroqProvider(), TogetherAIProvider() ]) async def initialize_providers(self): """Inicializa e testa todos os provedores""" print("Testando provedores de LLM...") tasks = [] for provider in self.providers: tasks.append(self._test_provider(provider)) await asyncio.gather(*tasks) # Filtra apenas provedores disponíveis self.available_providers = [p for p in self.providers if p.is_available] if not self.available_providers: print("⚠️ Nenhum provedor de LLM disponível!") return False print(f"✅ {len(self.available_providers)} provedores disponíveis:") for provider in self.available_providers: print(f" - {provider.name}") self._build_models_cache() return True async def _test_provider(self, provider: LLMProvider): """Testa um provedor específico""" try: is_available = await provider.test_availability() if is_available: print(f"✅ {provider.name} - Disponível") else: print(f"❌ {provider.name} - Indisponível") except Exception as e: print(f"❌ {provider.name} - Erro: {e}") def _build_models_cache(self): """Constrói cache de modelos disponíveis""" self.models_cache = {} for provider in self.available_providers: for model_name, model_id in provider.models: full_name = f"{model_name} ({provider.name})" self.models_cache[full_name] = { "provider": provider, "model_id": model_id, "provider_name": provider.name } def get_available_models(self) -> Dict[str, str]: """Retorna modelos disponíveis em formato compatível""" return {name: info["model_id"] for name, info in self.models_cache.items()} async def query_model(self, messages: List[Dict], model_name: str = None, **kwargs) -> str: """Faz query tentando diferentes provedores como fallback""" if not self.available_providers: return "Nenhum provedor de LLM disponível. Por favor, configure ao menos um provedor." # Se modelo específico foi solicitado if model_name and model_name in self.models_cache: provider_info = self.models_cache[model_name] try: result = await provider_info["provider"].query_model( messages, provider_info["model_id"], **kwargs ) if not result.startswith("Erro"): return result except Exception as e: print(f"Erro com {provider_info['provider_name']}: {e}") # Fallback: tenta todos os provedores disponíveis for provider in self.available_providers: try: if provider.models: default_model = provider.models[0][1] # Pega o primeiro modelo result = await provider.query_model(messages, default_model, **kwargs) if not result.startswith("Erro"): return f"{result}\n\n_Resposta gerada por: {provider.name}_" except Exception as e: print(f"Erro com provedor {provider.name}: {e}") continue return "Desculpe, todos os provedores de LLM estão indisponíveis no momento. Tente novamente mais tarde." # --- Integração com o código original --- # Variáveis globais atualizadas MODELS = {} DEFAULT_MODEL = "Mistral 7B (Hugging Face)" multi_client = MultiProviderLLMClient() # Gerenciamento de Sessão (mantido igual) user_sessions: Dict[str, Dict[str, List | Dict]] = {} MAX_MEMORY_LENGTH = 5 def get_session_memory_path(session_id: str) -> str: """Retorna o caminho do arquivo de memória para a sessão.""" return f"conversation_memory_{session_id}.json" def load_conversation_memory(session_id: str): """Carrega a memória da sessão do usuário.""" if session_id in user_sessions: return memory_path = get_session_memory_path(session_id) session_data = {'conversation': [], 'user_profile': {'nivel': 'intermediario', 'total_perguntas': 0}} if os.path.exists(memory_path): try: with open(memory_path, 'r', encoding='utf-8') as f: session_data = json.load(f) except Exception as e: print(f"Erro ao carregar memória para sessão '{session_id}': {e}") user_sessions[session_id] = session_data def save_conversation_memory(session_id: str): """Salva a memória da sessão do usuário.""" memory_path = get_session_memory_path(session_id) try: with open(memory_path, 'w', encoding='utf-8') as f: json.dump(user_sessions[session_id], f, ensure_ascii=False, indent=2) except Exception as e: print(f"Erro ao salvar memória para sessão '{session_id}': {e}") def add_to_memory(session_id: str, user_message: str, assistant_response: str): """Adiciona uma troca de mensagens à memória da sessão.""" load_conversation_memory(session_id) conversation = user_sessions[session_id]['conversation'] conversation.extend([ {"role": "user", "content": user_message, "timestamp": time.time()}, {"role": "assistant", "content": assistant_response, "timestamp": time.time()} ]) if len(conversation) > MAX_MEMORY_LENGTH * 2: user_sessions[session_id]['conversation'] = conversation[-MAX_MEMORY_LENGTH * 2:] save_conversation_memory(session_id) def update_user_profile(session_id: str, user_message: str): """Atualiza o perfil do usuário com base na mensagem.""" load_conversation_memory(session_id) profile = user_sessions[session_id]['user_profile'] message_lower = user_message.lower() if any(word in message_lower for word in ['básico', 'iniciante']): profile['nivel'] = 'iniciante' elif any(word in message_lower for word in ['avançado', 'complexo']): profile['nivel'] = 'avançado' topics = { 'java': ['java', 'classe', 'objeto'], 'web': ['html', 'css', 'javascript'], 'ia': ['inteligência artificial', 'machine learning'] } for topic, keywords in topics.items(): if any(keyword in message_lower for keyword in keywords): profile[f'interesse_{topic}'] = profile.get(f'interesse_{topic}', 0) + 1 profile['total_perguntas'] = profile.get('total_perguntas', 0) + 1 user_sessions[session_id]['user_profile'] = profile def get_conversation_context(session_id: str) -> str: """Gera o contexto da conversa recente.""" load_conversation_memory(session_id) conversation = user_sessions[session_id]['conversation'][-4:] if not conversation: return "" return "\n".join(f"{msg['role'].upper()}: {msg['content']}" for msg in conversation) def get_user_profile_context(session_id: str) -> str: """Gera o contexto do perfil do usuário.""" load_conversation_memory(session_id) profile = user_sessions[session_id]['user_profile'] context = f"Nível: {profile.get('nivel', 'intermediario')}\n" context += f"Total de perguntas: {profile.get('total_perguntas', 0)}\n" interesses = [f"{k.replace('interesse_', '').title()} ({v})" for k, v in profile.items() if k.startswith('interesse_')] if interesses: context += f"Interesses: {', '.join(interesses)}\n" return context def clear_memory(session_id: str) -> str: """Limpa a memória de uma sessão específica.""" if session_id in user_sessions: del user_sessions[session_id] memory_path = get_session_memory_path(session_id) if os.path.exists(memory_path): os.remove(memory_path) return "Memória limpa com sucesso!" # --- RAG (mantido igual) --- vector_store: Optional[FAISS] = None def get_all_blog_links(url: str) -> set: """Coleta todos os links do blog.""" links = {url} visited = set() while links: current_url = links.pop() if current_url in visited: continue try: response = requests.get(current_url, timeout=500) soup = BeautifulSoup(response.content, 'html.parser') visited.add(current_url) for link in soup.find_all('a', href=True): href = urljoin(url, link['href']) if urlparse(href).netloc == urlparse(url).netloc and '/tag/' not in href and '/category/' not in href: links.add(href) except Exception as e: print(f"Erro ao acessar {current_url}: {e}") return visited def scrape_text_from_url(url: str) -> str: """Extrai texto de uma URL.""" try: response = requests.get(url, timeout=500) soup = BeautifulSoup(response.content, 'html.parser') content = soup.find('article') or soup.find('main') return content.get_text(separator='\n', strip=True) if content else "" except Exception as e: print(f"Erro ao raspar {url}: {e}") return "" def build_and_save_vector_store(): """Constrói e salva o vector store.""" global vector_store links = get_all_blog_links(BLOG_URL) texts = [scrape_text_from_url(link) for link in links if scrape_text_from_url(link)] if not texts: return "Nenhum conteúdo encontrado." text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150) chunks = text_splitter.create_documents(texts) embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") vector_store = FAISS.from_documents(chunks, embeddings) with open(VECTOR_STORE_PATH, "wb") as f: pickle.dump(vector_store, f) with open(PROCESSED_URLS_PATH, "wb") as f: pickle.dump(links, f) return f"Vector store criado com {len(chunks)} chunks." def load_vector_store(): """Carrega o vector store.""" global vector_store if os.path.exists(VECTOR_STORE_PATH): with open(VECTOR_STORE_PATH, "rb") as f: vector_store = pickle.load(f) else: build_and_save_vector_store() def retrieve_context_from_blog(query: str, k: int = 4) -> str: """Busca contexto relevante no vector store.""" if vector_store: try: results = vector_store.similarity_search(query, k=k) return "\n".join(doc.page_content for doc in results) except Exception as e: print(f"Erro ao buscar contexto: {e}") return "" # --- Função Principal Atualizada --- async def responder_como_aldo(session_id: str, pergunta: str, modelo: str = DEFAULT_MODEL) -> str: """Gera resposta como Dr. Aldo Henrique usando múltiplos provedores.""" if not pergunta.strip(): return "Por favor, faça uma pergunta válida." load_conversation_memory(session_id) update_user_profile(session_id, pergunta) contexto = [] if perfil := get_user_profile_context(session_id): contexto.append(f"**Perfil do Usuário**\n{perfil}") if conversa := get_conversation_context(session_id): contexto.append(f"**Conversa Anterior**\n{conversa}") if blog := retrieve_context_from_blog(pergunta): contexto.append(f"**Contexto do Blog**\n{blog}") system_prompt = """Você é o Dr. Aldo Henrique, Doutor em Ciências da Computação pela UnB (2024), mestre em Ciências da Computação pela UnB (2017) e bacharel em Sistemas de Informação pela UFV (2014). Professor universitário, onde leciona disciplinas como Algoritmos, Inteligência Artificial, Ciência de Dados e Mineração de Dados. Atua como analista de sistemas nível 4. Regras de conduta: - Sempre coloque uma piada ou trocadilho no final da resposta. - Responda em português, de forma clara, amigável e educativa. - Explique conceitos antes de mostrar soluções. - Use exemplos práticos. - Considere o nível do usuário (iniciante, intermediário ou avançado). - Use Markdown para formatar respostas, com ``` para blocos de código. - Dentro do código sempre coloque comentários explicando para o alunos aprender com os comentários. - Foque em tecnologia; se a pergunta for fora do escopo, informe educadamente que não é seu domínio. """ conteudo_contexto = "\n".join(contexto) mensagem_usuario = f"{conteudo_contexto}\n\n**Pergunta**: {pergunta}" messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": mensagem_usuario} ] # Usa o cliente multi-provedor resposta = await multi_client.query_model(messages, modelo) add_to_memory(session_id, pergunta, resposta) return resposta # --- Função de Inicialização Atualizada --- async def inicializar_sistema(): """ Inicializa o sistema com múltiplos provedores de LLM. Retorna uma tupla: (status: bool, models: dict) """ print("Inicializando Sistema Multi-Provedor Dr. Aldo...") print("=" * 60) # Inicializa os provedores success = await multi_client.initialize_providers() if not success: print("❌ Nenhum provedor de LLM disponível!") return False, {} # Atualiza variáveis globais global MODELS, DEFAULT_MODEL MODELS = multi_client.get_available_models() if MODELS: # Define o primeiro modelo como padrão DEFAULT_MODEL = list(MODELS.keys())[0] print(f"✅ Modelo padrão definido: {DEFAULT_MODEL}") # Carrega o vector store load_vector_store() print("=" * 60) print("✅ Sistema inicializado com sucesso!") print(f"📊 Total de modelos disponíveis: {len(MODELS)}") print(f"🔧 Provedores ativos: {len(multi_client.available_providers)}") return True, MODELS # --- Função de Teste de Modelos --- async def testar_modelos_disponiveis(): """Testa todos os modelos disponíveis""" print("\n" + "=" * 60) print("TESTANDO MODELOS DISPONÍVEIS") print("=" * 60) test_message = [ {"role": "system", "content": "Você é um assistente útil."}, {"role": "user", "content": "Diga apenas 'Olá, funcionando!' em português."} ] for model_name in MODELS.keys(): print(f"\n🧪 Testando: {model_name}") try: result = await multi_client.query_model(test_message, model_name) if result and not result.startswith("Erro"): print(f"✅ {model_name} - Funcionando") print(f" Resposta: {result[:100]}...") else: print(f"❌ {model_name} - Falha: {result}") except Exception as e: print(f"❌ {model_name} - Erro: {e}") # Pequena pausa entre testes await asyncio.sleep(1) # --- Função de Configuração de Provedores --- def configurar_provedores(): """Exibe instruções para configurar os provedores""" print("\n" + "=" * 60) print("CONFIGURAÇÃO DE PROVEDORES LLM GRATUITOS") print("=" * 60) configs = { "Hugging Face": { "var": HF_TOKEN, "url": "https://huggingface.co/settings/tokens", "descricao": "Token gratuito para usar modelos do Hugging Face" }, "Groq": { "var": "GROQ_API_KEY", "url": "https://console.groq.com/keys", "descricao": "API key gratuita (limite diário)" }, "OpenRouter": { "var": OR_TOKEN, "url": "https://openrouter.ai/keys", "descricao": "Alguns modelos gratuitos disponíveis" }, "Together AI": { "var": "", "url": "https://api.together.xyz/settings/api-keys", "descricao": "Credits gratuitos iniciais" }, "Ollama": { "var": "Não necessário", "url": "https://ollama.ai/", "descricao": "Instalação local - totalmente gratuito" } } print("Para usar todos os provedores, configure as seguintes variáveis de ambiente:") print() for provedor, info in configs.items(): status = "✅ Configurado" if os.getenv(info["var"]) else "❌ Não configurado" print(f"{provedor}:") print(f" • Variável: {info['var']}") print(f" • URL: {info['url']}") print(f" • Descrição: {info['descricao']}") print(f" • Status: {status}") print() print("📝 Exemplo de configuração no seu .env:") print("export HF_TOKEN='seu_token_aqui'") print("export GROQ_API_KEY='sua_chave_aqui'") print("export OPENROUTER_API_KEY='sua_chave_aqui'") print("export TOGETHER_API_KEY='sua_chave_aqui'") # --- Função de Monitoramento --- async def monitorar_provedores(): """Monitora o status dos provedores""" print("\n" + "=" * 60) print("MONITORAMENTO DE PROVEDORES") print("=" * 60) for provider in multi_client.available_providers: print(f"\n🔍 Testando: {provider.name}") try: is_available = await provider.test_availability() status = "✅ Online" if is_available else "❌ Offline" print(f" Status: {status}") print(f" Modelos: {len(provider.models)}") except Exception as e: print(f" Status: ❌ Erro - {e}") # --- Função Principal Wrapper (para compatibilidade) --- def responder_como_aldo_sync(session_id: str, pergunta: str, modelo: str = DEFAULT_MODEL) -> str: """Wrapper síncrono para compatibilidade com código existente""" try: loop = asyncio.get_event_loop() if loop.is_running(): # Se já existe um loop rodando, cria uma nova task import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(asyncio.run, responder_como_aldo(session_id, pergunta, modelo)) return future.result() else: return loop.run_until_complete(responder_como_aldo(session_id, pergunta, modelo)) except Exception as e: return f"Erro ao processar resposta: {str(e)}" # --- Função de Salvamento de Configurações --- def salvar_configuracoes(): """Salva as configurações atuais""" config = { "provedores_disponiveis": [p.name for p in multi_client.available_providers], "modelos_disponiveis": list(MODELS.keys()), "modelo_padrao": DEFAULT_MODEL, "timestamp": time.time() } try: with open("llm_config.json", "w", encoding="utf-8") as f: json.dump(config, f, ensure_ascii=False, indent=2) print("✅ Configurações salvas em 'llm_config.json'") except Exception as e: print(f"❌ Erro ao salvar configurações: {e}") # --- Função Principal --- async def main(): """Função principal para teste e inicialização""" print("🚀 Iniciando Sistema Multi-Provedor LLM") print("=" * 60) # Mostra configurações configurar_provedores() # Inicializa sistema status, models = await inicializar_sistema_sync() if not status: print("❌ Sistema não pôde ser inicializado!") return # Salva configurações salvar_configuracoes() # Testa modelos await testar_modelos_disponiveis() # Monitora provedores await monitorar_provedores() # Teste básico do chatbot print("\n" + "=" * 60) print("TESTE DO CHATBOT") print("=" * 60) session_id = "teste_multi_provider" test_questions = [ "O que é Python?", "Mostre um exemplo de código Python simples", "Qual a diferença entre lista e tupla em Python?" ] for i, question in enumerate(test_questions, 1): print(f"\n📝 Pergunta {i}: {question}") print("-" * 40) try: resposta = await responder_como_aldo(session_id, question) print(f"🤖 Resposta: {resposta[:500]}...") except Exception as e: print(f"❌ Erro: {e}") await asyncio.sleep(2) # Pausa entre perguntas # Limpa memória do teste print(f"\n🧹 {clear_memory(session_id)}") # --- Compatibilidade com código original --- def inicializar_sistema_sync(): """Função síncrona para compatibilidade """ try: # A forma mais simples de rodar um evento asyncio a partir de um contexto síncrono # é usar asyncio.run(). A lógica de checar o loop pode ser simplificada. return asyncio.run(inicializar_sistema()) except Exception as e: print(f"Erro na inicialização: {e}") return False, {} # Para compatibilidade, mantém a função original responder_como_aldo_original = responder_como_aldo_sync if __name__ == "__main__": # Executa o sistema asyncio.run(main())