Spaces:
Running
Running
import requests | |
import os | |
import json | |
import time | |
import pickle | |
from typing import Dict, List, Optional, Tuple, Any | |
from bs4 import BeautifulSoup | |
from urllib.parse import urljoin, urlparse | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
import random | |
import asyncio | |
import aiohttp | |
# --- Configurações --- | |
BLOG_URL = "https://aldohenrique.com.br/" | |
VECTOR_STORE_PATH = "faiss_index_store.pkl" | |
PROCESSED_URLS_PATH = "processed_urls.pkl" | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
OR_TOKEN= os.getenv("OR_TOKEN") | |
# --- Provedores de LLM Gratuitos --- | |
class LLMProvider: | |
"""Classe base para provedores de LLM""" | |
def __init__(self, name: str, base_url: str = None): | |
self.name = name | |
self.base_url = base_url | |
self.is_available = False | |
self.models = [] | |
async def test_availability(self) -> bool: | |
"""Testa se o provedor está disponível""" | |
return False | |
async def query_model(self, messages: List[Dict], model_name: str = None, **kwargs) -> str: | |
"""Faz query no modelo""" | |
return "" | |
def convert_messages_to_prompt(self, messages: List[Dict]) -> str: | |
"""Converte mensagens para formato de prompt""" | |
prompt_parts = [] | |
for msg in messages: | |
role = msg['role'] | |
content = msg['content'] | |
if role == 'system': | |
prompt_parts.append(f"Sistema: {content}") | |
elif role == 'user': | |
prompt_parts.append(f"Usuário: {content}") | |
elif role == 'assistant': | |
prompt_parts.append(f"Assistente: {content}") | |
prompt_parts.append("Assistente:") | |
return "\n\n".join(prompt_parts) | |
class HuggingFaceProvider(LLMProvider): | |
"""Provedor Hugging Face (original)""" | |
def __init__(self, token: str): | |
super().__init__("Hugging Face", "https://api-inference.huggingface.co") | |
self.token = token | |
self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} | |
self.models = [ | |
("Phi-3 Mini (Mais rápido)", "microsoft/Phi-3-mini-4k-instruct"), | |
("Zephyr 7B (Meio Termo)", "HuggingFaceH4/zephyr-7b-beta"), | |
("Mistral-7B", "mistralai/Mistral-7B-Instruct-v0.3"), | |
("Microsoft 8B", "meta-llama/Meta-Llama-3-8B-Instruct"), | |
("DialoGPT", "microsoft/DialoGPT-medium"), | |
("Google Flan-T5", "google/flan-t5-base"), | |
("Facebook BART", "facebook/bart-large-cnn") | |
] | |
async def test_availability(self) -> bool: | |
if not self.token: | |
return False | |
# Testa com um modelo simples | |
test_model = "microsoft/DialoGPT-medium" | |
url = f"{self.base_url}/models/{test_model}" | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.post( | |
url, | |
headers=self.headers, | |
json={"inputs": "Hello", "parameters": {"max_new_tokens": 5}}, | |
timeout=aiohttp.ClientTimeout(total=10) | |
) as response: | |
self.is_available = response.status in [200, 503] # 503 = modelo carregando | |
return self.is_available | |
except: | |
return False | |
async def query_model(self, messages: List[Dict], model_name: str = None, **kwargs) -> str: | |
if not model_name: | |
model_name = "mistralai/Mistral-7B-Instruct-v0.3" | |
prompt = self.convert_messages_to_prompt(messages) | |
url = f"{self.base_url}/models/{model_name}" | |
payload = { | |
"inputs": prompt, | |
"parameters": { | |
"max_new_tokens": kwargs.get("max_tokens", 1000), | |
"temperature": kwargs.get("temperature", 0.7), | |
"do_sample": True, | |
"return_full_text": False | |
} | |
} | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.post( | |
url, | |
headers=self.headers, | |
json=payload, | |
timeout=aiohttp.ClientTimeout(total=30) | |
) as response: | |
if response.status == 200: | |
result = await response.json() | |
if isinstance(result, list) and len(result) > 0: | |
return result[0].get('generated_text', '').strip() | |
elif isinstance(result, dict) and 'generated_text' in result: | |
return result['generated_text'].strip() | |
return f"Erro HF: {response.status}" | |
except Exception as e: | |
return f"Erro na requisição HF: {str(e)}" | |
class OllamaProvider(LLMProvider): | |
"""Provedor Ollama (local)""" | |
def __init__(self, host: str = "http://localhost:11434"): | |
super().__init__("Ollama", host) | |
self.models = [ | |
("Llama 3.2", "llama3.2"), | |
("Phi 3", "phi3"), | |
("Gemma 2", "gemma2"), | |
("Mistral", "mistral"), | |
("CodeLlama", "codellama") | |
] | |
async def test_availability(self) -> bool: | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get( | |
f"{self.base_url}/api/tags", | |
timeout=aiohttp.ClientTimeout(total=5) | |
) as response: | |
self.is_available = response.status == 200 | |
return self.is_available | |
except: | |
return False | |
async def query_model(self, messages: List[Dict], model_name: str = "llama3.2", **kwargs) -> str: | |
prompt = self.convert_messages_to_prompt(messages) | |
payload = { | |
"model": model_name, | |
"prompt": prompt, | |
"stream": False, | |
"options": { | |
"num_predict": kwargs.get("max_tokens", 1000), | |
"temperature": kwargs.get("temperature", 0.7) | |
} | |
} | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.post( | |
f"{self.base_url}/api/generate", | |
json=payload, | |
timeout=aiohttp.ClientTimeout(total=60) | |
) as response: | |
if response.status == 200: | |
result = await response.json() | |
return result.get('response', '').strip() | |
return f"Erro Ollama: {response.status}" | |
except Exception as e: | |
return f"Erro na requisição Ollama: {str(e)}" | |
class OpenRouterProvider(LLMProvider): | |
"""Provedor OpenRouter (alguns modelos gratuitos)""" | |
def __init__(self, api_key: str = None): | |
super().__init__("OpenRouter", "https://openrouter.ai/api/v1") | |
self.api_key = api_key or os.getenv("OPENROUTER_API_KEY") | |
self.headers = { | |
"Authorization": f"Bearer {self.api_key}" if self.api_key else "", | |
"Content-Type": "application/json" | |
} | |
self.models = [ | |
("Mistral 7B Free", "mistralai/mistral-7b-instruct:free"), | |
("Mythomist 7B", "gryphe/mythomist-7b:free"), | |
("Toppy M 7B", "undi95/toppy-m-7b:free"), | |
("OpenChat 3.5", "openchat/openchat-7b:free") | |
] | |
async def test_availability(self) -> bool: | |
if not self.api_key: | |
return False | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get( | |
f"{self.base_url}/models", | |
headers=self.headers, | |
timeout=aiohttp.ClientTimeout(total=10) | |
) as response: | |
self.is_available = response.status == 200 | |
return self.is_available | |
except: | |
return False | |
async def query_model(self, messages: List[Dict], model_name: str = "mistralai/mistral-7b-instruct:free", **kwargs) -> str: | |
payload = { | |
"model": model_name, | |
"messages": messages, | |
"max_tokens": kwargs.get("max_tokens", 1000), | |
"temperature": kwargs.get("temperature", 0.7) | |
} | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.post( | |
f"{self.base_url}/chat/completions", | |
headers=self.headers, | |
json=payload, | |
timeout=aiohttp.ClientTimeout(total=30) | |
) as response: | |
if response.status == 200: | |
result = await response.json() | |
return result['choices'][0]['message']['content'].strip() | |
return f"Erro OpenRouter: {response.status}" | |
except Exception as e: | |
return f"Erro na requisição OpenRouter: {str(e)}" | |
class GroqProvider(LLMProvider): | |
"""Provedor Groq (gratuito com limites)""" | |
def __init__(self, api_key: str = None): | |
super().__init__("Groq", "https://api.groq.com/openai/v1") | |
self.api_key = api_key or os.getenv("GROQ_API_KEY") | |
self.headers = { | |
"Authorization": f"Bearer {self.api_key}" if self.api_key else "", | |
"Content-Type": "application/json" | |
} | |
self.models = [ | |
("Llama 3 8B", "llama3-8b-8192"), | |
("Llama 3 70B", "llama3-70b-8192"), | |
("Mixtral 8x7B", "mixtral-8x7b-32768"), | |
("Gemma 7B", "gemma-7b-it") | |
] | |
async def test_availability(self) -> bool: | |
if not self.api_key: | |
return False | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get( | |
f"{self.base_url}/models", | |
headers=self.headers, | |
timeout=aiohttp.ClientTimeout(total=10) | |
) as response: | |
self.is_available = response.status == 200 | |
return self.is_available | |
except: | |
return False | |
async def query_model(self, messages: List[Dict], model_name: str = "llama3-8b-8192", **kwargs) -> str: | |
payload = { | |
"model": model_name, | |
"messages": messages, | |
"max_tokens": kwargs.get("max_tokens", 1000), | |
"temperature": kwargs.get("temperature", 0.7) | |
} | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.post( | |
f"{self.base_url}/chat/completions", | |
headers=self.headers, | |
json=payload, | |
timeout=aiohttp.ClientTimeout(total=30) | |
) as response: | |
if response.status == 200: | |
result = await response.json() | |
return result['choices'][0]['message']['content'].strip() | |
return f"Erro Groq: {response.status}" | |
except Exception as e: | |
return f"Erro na requisição Groq: {str(e)}" | |
class TogetherAIProvider(LLMProvider): | |
"""Provedor Together AI (alguns modelos gratuitos)""" | |
def __init__(self, api_key: str = None): | |
super().__init__("Together AI", "https://api.together.xyz/v1") | |
self.api_key = api_key or os.getenv("TOGETHER_API_KEY") | |
self.headers = { | |
"Authorization": f"Bearer {self.api_key}" if self.api_key else "", | |
"Content-Type": "application/json" | |
} | |
self.models = [ | |
("Mistral 7B", "mistralai/Mistral-7B-Instruct-v0.1"), | |
("Code Llama 7B", "codellama/CodeLlama-7b-Instruct-hf"), | |
("Llama 2 7B", "meta-llama/Llama-2-7b-chat-hf"), | |
("WizardCoder 15B", "WizardLM/WizardCoder-15B-V1.0") | |
] | |
async def test_availability(self) -> bool: | |
if not self.api_key: | |
return False | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get( | |
f"{self.base_url}/models", | |
headers=self.headers, | |
timeout=aiohttp.ClientTimeout(total=10) | |
) as response: | |
self.is_available = response.status == 200 | |
return self.is_available | |
except: | |
return False | |
async def query_model(self, messages: List[Dict], model_name: str = "mistralai/Mistral-7B-Instruct-v0.1", **kwargs) -> str: | |
payload = { | |
"model": model_name, | |
"messages": messages, | |
"max_tokens": kwargs.get("max_tokens", 1000), | |
"temperature": kwargs.get("temperature", 0.7) | |
} | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.post( | |
f"{self.base_url}/chat/completions", | |
headers=self.headers, | |
json=payload, | |
timeout=aiohttp.ClientTimeout(total=30) | |
) as response: | |
if response.status == 200: | |
result = await response.json() | |
return result['choices'][0]['message']['content'].strip() | |
return f"Erro Together AI: {response.status}" | |
except Exception as e: | |
return f"Erro na requisição Together AI: {str(e)}" | |
# --- Gerenciador de Múltiplos Provedores --- | |
class MultiProviderLLMClient: | |
"""Cliente que gerencia múltiplos provedores de LLM""" | |
def __init__(self): | |
self.providers = [] | |
self.available_providers = [] | |
self.current_provider_index = 0 | |
self.models_cache = {} | |
# Inicializa provedores | |
if HF_TOKEN: | |
self.providers.append(HuggingFaceProvider(HF_TOKEN)) | |
# Adiciona outros provedores | |
self.providers.extend([ | |
OllamaProvider(), | |
OpenRouterProvider(), | |
GroqProvider(), | |
TogetherAIProvider() | |
]) | |
async def initialize_providers(self): | |
"""Inicializa e testa todos os provedores""" | |
print("Testando provedores de LLM...") | |
tasks = [] | |
for provider in self.providers: | |
tasks.append(self._test_provider(provider)) | |
await asyncio.gather(*tasks) | |
# Filtra apenas provedores disponíveis | |
self.available_providers = [p for p in self.providers if p.is_available] | |
if not self.available_providers: | |
print("⚠️ Nenhum provedor de LLM disponível!") | |
return False | |
print(f"✅ {len(self.available_providers)} provedores disponíveis:") | |
for provider in self.available_providers: | |
print(f" - {provider.name}") | |
self._build_models_cache() | |
return True | |
async def _test_provider(self, provider: LLMProvider): | |
"""Testa um provedor específico""" | |
try: | |
is_available = await provider.test_availability() | |
if is_available: | |
print(f"✅ {provider.name} - Disponível") | |
else: | |
print(f"❌ {provider.name} - Indisponível") | |
except Exception as e: | |
print(f"❌ {provider.name} - Erro: {e}") | |
def _build_models_cache(self): | |
"""Constrói cache de modelos disponíveis""" | |
self.models_cache = {} | |
for provider in self.available_providers: | |
for model_name, model_id in provider.models: | |
full_name = f"{model_name} ({provider.name})" | |
self.models_cache[full_name] = { | |
"provider": provider, | |
"model_id": model_id, | |
"provider_name": provider.name | |
} | |
def get_available_models(self) -> Dict[str, str]: | |
"""Retorna modelos disponíveis em formato compatível""" | |
return {name: info["model_id"] for name, info in self.models_cache.items()} | |
async def query_model(self, messages: List[Dict], model_name: str = None, **kwargs) -> str: | |
"""Faz query tentando diferentes provedores como fallback""" | |
if not self.available_providers: | |
return "Nenhum provedor de LLM disponível. Por favor, configure ao menos um provedor." | |
# Se modelo específico foi solicitado | |
if model_name and model_name in self.models_cache: | |
provider_info = self.models_cache[model_name] | |
try: | |
result = await provider_info["provider"].query_model( | |
messages, | |
provider_info["model_id"], | |
**kwargs | |
) | |
if not result.startswith("Erro"): | |
return result | |
except Exception as e: | |
print(f"Erro com {provider_info['provider_name']}: {e}") | |
# Fallback: tenta todos os provedores disponíveis | |
for provider in self.available_providers: | |
try: | |
if provider.models: | |
default_model = provider.models[0][1] # Pega o primeiro modelo | |
result = await provider.query_model(messages, default_model, **kwargs) | |
if not result.startswith("Erro"): | |
return f"{result}\n\n_Resposta gerada por: {provider.name}_" | |
except Exception as e: | |
print(f"Erro com provedor {provider.name}: {e}") | |
continue | |
return "Desculpe, todos os provedores de LLM estão indisponíveis no momento. Tente novamente mais tarde." | |
# --- Integração com o código original --- | |
# Variáveis globais atualizadas | |
MODELS = {} | |
DEFAULT_MODEL = "Mistral 7B (Hugging Face)" | |
multi_client = MultiProviderLLMClient() | |
# Gerenciamento de Sessão (mantido igual) | |
user_sessions: Dict[str, Dict[str, List | Dict]] = {} | |
MAX_MEMORY_LENGTH = 5 | |
def get_session_memory_path(session_id: str) -> str: | |
"""Retorna o caminho do arquivo de memória para a sessão.""" | |
return f"conversation_memory_{session_id}.json" | |
def load_conversation_memory(session_id: str): | |
"""Carrega a memória da sessão do usuário.""" | |
if session_id in user_sessions: | |
return | |
memory_path = get_session_memory_path(session_id) | |
session_data = {'conversation': [], 'user_profile': {'nivel': 'intermediario', 'total_perguntas': 0}} | |
if os.path.exists(memory_path): | |
try: | |
with open(memory_path, 'r', encoding='utf-8') as f: | |
session_data = json.load(f) | |
except Exception as e: | |
print(f"Erro ao carregar memória para sessão '{session_id}': {e}") | |
user_sessions[session_id] = session_data | |
def save_conversation_memory(session_id: str): | |
"""Salva a memória da sessão do usuário.""" | |
memory_path = get_session_memory_path(session_id) | |
try: | |
with open(memory_path, 'w', encoding='utf-8') as f: | |
json.dump(user_sessions[session_id], f, ensure_ascii=False, indent=2) | |
except Exception as e: | |
print(f"Erro ao salvar memória para sessão '{session_id}': {e}") | |
def add_to_memory(session_id: str, user_message: str, assistant_response: str): | |
"""Adiciona uma troca de mensagens à memória da sessão.""" | |
load_conversation_memory(session_id) | |
conversation = user_sessions[session_id]['conversation'] | |
conversation.extend([ | |
{"role": "user", "content": user_message, "timestamp": time.time()}, | |
{"role": "assistant", "content": assistant_response, "timestamp": time.time()} | |
]) | |
if len(conversation) > MAX_MEMORY_LENGTH * 2: | |
user_sessions[session_id]['conversation'] = conversation[-MAX_MEMORY_LENGTH * 2:] | |
save_conversation_memory(session_id) | |
def update_user_profile(session_id: str, user_message: str): | |
"""Atualiza o perfil do usuário com base na mensagem.""" | |
load_conversation_memory(session_id) | |
profile = user_sessions[session_id]['user_profile'] | |
message_lower = user_message.lower() | |
if any(word in message_lower for word in ['básico', 'iniciante']): | |
profile['nivel'] = 'iniciante' | |
elif any(word in message_lower for word in ['avançado', 'complexo']): | |
profile['nivel'] = 'avançado' | |
topics = { | |
'java': ['java', 'classe', 'objeto'], | |
'web': ['html', 'css', 'javascript'], | |
'ia': ['inteligência artificial', 'machine learning'] | |
} | |
for topic, keywords in topics.items(): | |
if any(keyword in message_lower for keyword in keywords): | |
profile[f'interesse_{topic}'] = profile.get(f'interesse_{topic}', 0) + 1 | |
profile['total_perguntas'] = profile.get('total_perguntas', 0) + 1 | |
user_sessions[session_id]['user_profile'] = profile | |
def get_conversation_context(session_id: str) -> str: | |
"""Gera o contexto da conversa recente.""" | |
load_conversation_memory(session_id) | |
conversation = user_sessions[session_id]['conversation'][-4:] | |
if not conversation: | |
return "" | |
return "\n".join(f"{msg['role'].upper()}: {msg['content']}" for msg in conversation) | |
def get_user_profile_context(session_id: str) -> str: | |
"""Gera o contexto do perfil do usuário.""" | |
load_conversation_memory(session_id) | |
profile = user_sessions[session_id]['user_profile'] | |
context = f"Nível: {profile.get('nivel', 'intermediario')}\n" | |
context += f"Total de perguntas: {profile.get('total_perguntas', 0)}\n" | |
interesses = [f"{k.replace('interesse_', '').title()} ({v})" for k, v in profile.items() if k.startswith('interesse_')] | |
if interesses: | |
context += f"Interesses: {', '.join(interesses)}\n" | |
return context | |
def clear_memory(session_id: str) -> str: | |
"""Limpa a memória de uma sessão específica.""" | |
if session_id in user_sessions: | |
del user_sessions[session_id] | |
memory_path = get_session_memory_path(session_id) | |
if os.path.exists(memory_path): | |
os.remove(memory_path) | |
return "Memória limpa com sucesso!" | |
# --- RAG (mantido igual) --- | |
vector_store: Optional[FAISS] = None | |
def get_all_blog_links(url: str) -> set: | |
"""Coleta todos os links do blog.""" | |
links = {url} | |
visited = set() | |
while links: | |
current_url = links.pop() | |
if current_url in visited: | |
continue | |
try: | |
response = requests.get(current_url, timeout=500) | |
soup = BeautifulSoup(response.content, 'html.parser') | |
visited.add(current_url) | |
for link in soup.find_all('a', href=True): | |
href = urljoin(url, link['href']) | |
if urlparse(href).netloc == urlparse(url).netloc and '/tag/' not in href and '/category/' not in href: | |
links.add(href) | |
except Exception as e: | |
print(f"Erro ao acessar {current_url}: {e}") | |
return visited | |
def scrape_text_from_url(url: str) -> str: | |
"""Extrai texto de uma URL.""" | |
try: | |
response = requests.get(url, timeout=500) | |
soup = BeautifulSoup(response.content, 'html.parser') | |
content = soup.find('article') or soup.find('main') | |
return content.get_text(separator='\n', strip=True) if content else "" | |
except Exception as e: | |
print(f"Erro ao raspar {url}: {e}") | |
return "" | |
def build_and_save_vector_store(): | |
"""Constrói e salva o vector store.""" | |
global vector_store | |
links = get_all_blog_links(BLOG_URL) | |
texts = [scrape_text_from_url(link) for link in links if scrape_text_from_url(link)] | |
if not texts: | |
return "Nenhum conteúdo encontrado." | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150) | |
chunks = text_splitter.create_documents(texts) | |
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
vector_store = FAISS.from_documents(chunks, embeddings) | |
with open(VECTOR_STORE_PATH, "wb") as f: | |
pickle.dump(vector_store, f) | |
with open(PROCESSED_URLS_PATH, "wb") as f: | |
pickle.dump(links, f) | |
return f"Vector store criado com {len(chunks)} chunks." | |
def load_vector_store(): | |
"""Carrega o vector store.""" | |
global vector_store | |
if os.path.exists(VECTOR_STORE_PATH): | |
with open(VECTOR_STORE_PATH, "rb") as f: | |
vector_store = pickle.load(f) | |
else: | |
build_and_save_vector_store() | |
def retrieve_context_from_blog(query: str, k: int = 4) -> str: | |
"""Busca contexto relevante no vector store.""" | |
if vector_store: | |
try: | |
results = vector_store.similarity_search(query, k=k) | |
return "\n".join(doc.page_content for doc in results) | |
except Exception as e: | |
print(f"Erro ao buscar contexto: {e}") | |
return "" | |
# --- Função Principal Atualizada --- | |
async def responder_como_aldo(session_id: str, pergunta: str, modelo: str = DEFAULT_MODEL) -> str: | |
"""Gera resposta como Dr. Aldo Henrique usando múltiplos provedores.""" | |
if not pergunta.strip(): | |
return "Por favor, faça uma pergunta válida." | |
load_conversation_memory(session_id) | |
update_user_profile(session_id, pergunta) | |
contexto = [] | |
if perfil := get_user_profile_context(session_id): | |
contexto.append(f"**Perfil do Usuário**\n{perfil}") | |
if conversa := get_conversation_context(session_id): | |
contexto.append(f"**Conversa Anterior**\n{conversa}") | |
if blog := retrieve_context_from_blog(pergunta): | |
contexto.append(f"**Contexto do Blog**\n{blog}") | |
system_prompt = """Você é o Dr. Aldo Henrique, | |
Doutor em Ciências da Computação pela UnB (2024), mestre em Ciências da Computação pela UnB (2017) e bacharel em Sistemas de Informação pela UFV (2014). | |
Professor universitário, onde leciona disciplinas como Algoritmos, Inteligência Artificial, Ciência de Dados e Mineração de Dados. | |
Atua como analista de sistemas nível 4. | |
Regras de conduta: | |
- Sempre coloque uma piada ou trocadilho no final da resposta. | |
- Responda em português, de forma clara, amigável e educativa. | |
- Explique conceitos antes de mostrar soluções. | |
- Use exemplos práticos. | |
- Considere o nível do usuário (iniciante, intermediário ou avançado). | |
- Use Markdown para formatar respostas, com ``` para blocos de código. | |
- Dentro do código sempre coloque comentários explicando para o alunos aprender com os comentários. | |
- Foque em tecnologia; se a pergunta for fora do escopo, informe educadamente que não é seu domínio. | |
""" | |
conteudo_contexto = "\n".join(contexto) | |
mensagem_usuario = f"{conteudo_contexto}\n\n**Pergunta**: {pergunta}" | |
messages = [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": mensagem_usuario} | |
] | |
# Usa o cliente multi-provedor | |
resposta = await multi_client.query_model(messages, modelo) | |
add_to_memory(session_id, pergunta, resposta) | |
return resposta | |
# --- Função de Inicialização Atualizada --- | |
async def inicializar_sistema(): | |
""" | |
Inicializa o sistema com múltiplos provedores de LLM. | |
Retorna uma tupla: (status: bool, models: dict) | |
""" | |
print("Inicializando Sistema Multi-Provedor Dr. Aldo...") | |
print("=" * 60) | |
# Inicializa os provedores | |
success = await multi_client.initialize_providers() | |
if not success: | |
print("❌ Nenhum provedor de LLM disponível!") | |
return False, {} | |
# Atualiza variáveis globais | |
global MODELS, DEFAULT_MODEL | |
MODELS = multi_client.get_available_models() | |
if MODELS: | |
# Define o primeiro modelo como padrão | |
DEFAULT_MODEL = list(MODELS.keys())[0] | |
print(f"✅ Modelo padrão definido: {DEFAULT_MODEL}") | |
# Carrega o vector store | |
load_vector_store() | |
print("=" * 60) | |
print("✅ Sistema inicializado com sucesso!") | |
print(f"📊 Total de modelos disponíveis: {len(MODELS)}") | |
print(f"🔧 Provedores ativos: {len(multi_client.available_providers)}") | |
return True, MODELS | |
# --- Função de Teste de Modelos --- | |
async def testar_modelos_disponiveis(): | |
"""Testa todos os modelos disponíveis""" | |
print("\n" + "=" * 60) | |
print("TESTANDO MODELOS DISPONÍVEIS") | |
print("=" * 60) | |
test_message = [ | |
{"role": "system", "content": "Você é um assistente útil."}, | |
{"role": "user", "content": "Diga apenas 'Olá, funcionando!' em português."} | |
] | |
for model_name in MODELS.keys(): | |
print(f"\n🧪 Testando: {model_name}") | |
try: | |
result = await multi_client.query_model(test_message, model_name) | |
if result and not result.startswith("Erro"): | |
print(f"✅ {model_name} - Funcionando") | |
print(f" Resposta: {result[:100]}...") | |
else: | |
print(f"❌ {model_name} - Falha: {result}") | |
except Exception as e: | |
print(f"❌ {model_name} - Erro: {e}") | |
# Pequena pausa entre testes | |
await asyncio.sleep(1) | |
# --- Função de Configuração de Provedores --- | |
def configurar_provedores(): | |
"""Exibe instruções para configurar os provedores""" | |
print("\n" + "=" * 60) | |
print("CONFIGURAÇÃO DE PROVEDORES LLM GRATUITOS") | |
print("=" * 60) | |
configs = { | |
"Hugging Face": { | |
"var": HF_TOKEN, | |
"url": "https://huggingface.co/settings/tokens", | |
"descricao": "Token gratuito para usar modelos do Hugging Face" | |
}, | |
"Groq": { | |
"var": "GROQ_API_KEY", | |
"url": "https://console.groq.com/keys", | |
"descricao": "API key gratuita (limite diário)" | |
}, | |
"OpenRouter": { | |
"var": OR_TOKEN, | |
"url": "https://openrouter.ai/keys", | |
"descricao": "Alguns modelos gratuitos disponíveis" | |
}, | |
"Together AI": { | |
"var": "", | |
"url": "https://api.together.xyz/settings/api-keys", | |
"descricao": "Credits gratuitos iniciais" | |
}, | |
"Ollama": { | |
"var": "Não necessário", | |
"url": "https://ollama.ai/", | |
"descricao": "Instalação local - totalmente gratuito" | |
} | |
} | |
print("Para usar todos os provedores, configure as seguintes variáveis de ambiente:") | |
print() | |
for provedor, info in configs.items(): | |
status = "✅ Configurado" if os.getenv(info["var"]) else "❌ Não configurado" | |
print(f"{provedor}:") | |
print(f" • Variável: {info['var']}") | |
print(f" • URL: {info['url']}") | |
print(f" • Descrição: {info['descricao']}") | |
print(f" • Status: {status}") | |
print() | |
print("📝 Exemplo de configuração no seu .env:") | |
print("export HF_TOKEN='seu_token_aqui'") | |
print("export GROQ_API_KEY='sua_chave_aqui'") | |
print("export OPENROUTER_API_KEY='sua_chave_aqui'") | |
print("export TOGETHER_API_KEY='sua_chave_aqui'") | |
# --- Função de Monitoramento --- | |
async def monitorar_provedores(): | |
"""Monitora o status dos provedores""" | |
print("\n" + "=" * 60) | |
print("MONITORAMENTO DE PROVEDORES") | |
print("=" * 60) | |
for provider in multi_client.available_providers: | |
print(f"\n🔍 Testando: {provider.name}") | |
try: | |
is_available = await provider.test_availability() | |
status = "✅ Online" if is_available else "❌ Offline" | |
print(f" Status: {status}") | |
print(f" Modelos: {len(provider.models)}") | |
except Exception as e: | |
print(f" Status: ❌ Erro - {e}") | |
# --- Função Principal Wrapper (para compatibilidade) --- | |
def responder_como_aldo_sync(session_id: str, pergunta: str, modelo: str = DEFAULT_MODEL) -> str: | |
"""Wrapper síncrono para compatibilidade com código existente""" | |
try: | |
loop = asyncio.get_event_loop() | |
if loop.is_running(): | |
# Se já existe um loop rodando, cria uma nova task | |
import concurrent.futures | |
with concurrent.futures.ThreadPoolExecutor() as executor: | |
future = executor.submit(asyncio.run, responder_como_aldo(session_id, pergunta, modelo)) | |
return future.result() | |
else: | |
return loop.run_until_complete(responder_como_aldo(session_id, pergunta, modelo)) | |
except Exception as e: | |
return f"Erro ao processar resposta: {str(e)}" | |
# --- Função de Salvamento de Configurações --- | |
def salvar_configuracoes(): | |
"""Salva as configurações atuais""" | |
config = { | |
"provedores_disponiveis": [p.name for p in multi_client.available_providers], | |
"modelos_disponiveis": list(MODELS.keys()), | |
"modelo_padrao": DEFAULT_MODEL, | |
"timestamp": time.time() | |
} | |
try: | |
with open("llm_config.json", "w", encoding="utf-8") as f: | |
json.dump(config, f, ensure_ascii=False, indent=2) | |
print("✅ Configurações salvas em 'llm_config.json'") | |
except Exception as e: | |
print(f"❌ Erro ao salvar configurações: {e}") | |
# --- Função Principal --- | |
async def main(): | |
"""Função principal para teste e inicialização""" | |
print("🚀 Iniciando Sistema Multi-Provedor LLM") | |
print("=" * 60) | |
# Mostra configurações | |
configurar_provedores() | |
# Inicializa sistema | |
status, models = await inicializar_sistema_sync() | |
if not status: | |
print("❌ Sistema não pôde ser inicializado!") | |
return | |
# Salva configurações | |
salvar_configuracoes() | |
# Testa modelos | |
await testar_modelos_disponiveis() | |
# Monitora provedores | |
await monitorar_provedores() | |
# Teste básico do chatbot | |
print("\n" + "=" * 60) | |
print("TESTE DO CHATBOT") | |
print("=" * 60) | |
session_id = "teste_multi_provider" | |
test_questions = [ | |
"O que é Python?", | |
"Mostre um exemplo de código Python simples", | |
"Qual a diferença entre lista e tupla em Python?" | |
] | |
for i, question in enumerate(test_questions, 1): | |
print(f"\n📝 Pergunta {i}: {question}") | |
print("-" * 40) | |
try: | |
resposta = await responder_como_aldo(session_id, question) | |
print(f"🤖 Resposta: {resposta[:500]}...") | |
except Exception as e: | |
print(f"❌ Erro: {e}") | |
await asyncio.sleep(2) # Pausa entre perguntas | |
# Limpa memória do teste | |
print(f"\n🧹 {clear_memory(session_id)}") | |
# --- Compatibilidade com código original --- | |
def inicializar_sistema_sync(): | |
"""Função síncrona para compatibilidade """ | |
try: | |
# A forma mais simples de rodar um evento asyncio a partir de um contexto síncrono | |
# é usar asyncio.run(). A lógica de checar o loop pode ser simplificada. | |
return asyncio.run(inicializar_sistema()) | |
except Exception as e: | |
print(f"Erro na inicialização: {e}") | |
return False, {} | |
# Para compatibilidade, mantém a função original | |
responder_como_aldo_original = responder_como_aldo_sync | |
if __name__ == "__main__": | |
# Executa o sistema | |
asyncio.run(main()) |