Spaces:
Running
Running
import requests | |
import os | |
import json | |
import time | |
import pickle | |
from typing import Dict, List, Optional, Tuple | |
from bs4 import BeautifulSoup | |
from urllib.parse import urljoin, urlparse | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from huggingface_hub import InferenceClient | |
import asyncio | |
# --- Configurações --- | |
BLOG_URL = "https://aldohenrique.com.br/" | |
VECTOR_STORE_PATH = "faiss_index_store.pkl" | |
PROCESSED_URLS_PATH = "processed_urls.pkl" | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
# Validação do token com mensagem mais clara | |
if not HF_TOKEN: | |
print("ERRO: Token HF_TOKEN não encontrado!") | |
print("Execute: export HF_TOKEN='seu_token_aqui' ou defina como variável de ambiente") | |
exit(1) | |
print(f"Token HF encontrado: {HF_TOKEN[:10]}...") | |
# --- Modelos para teste (versão com InferenceClient) --- | |
MODELS = {} | |
# Lista de modelos mais estáveis e com maior chance de funcionar | |
NEW_MODELS_TO_TEST = [ | |
("Llama 3.2 3B", "meta-llama/Llama-3.2-3B-Instruct"), | |
("Mistral 7B", "mistralai/Mistral-7B-Instruct-v0.3"), | |
("Mistral Nemo", "mistralai/Mistral-Nemo-Instruct-2407"), | |
("Phi-3.5 Mini", "microsoft/Phi-3.5-mini-instruct"), | |
("Qwen2.5 7B", "Qwen/Qwen2.5-7B-Instruct"), | |
("Gemma 2 2B", "google/gemma-2-2b-it"), | |
("CodeLlama 7B", "codellama/CodeLlama-7b-Instruct-hf"), | |
("Zephyr 7B", "HuggingFaceH4/zephyr-7b-beta"), | |
("IBM 2B", "ibm-granite/granite-3.3-2b-instruct") | |
] | |
DEFAULT_MODEL = "Llama 3.2 3B" | |
# --- Gerenciamento de Sessão --- | |
user_sessions: Dict[str, Dict[str, List | Dict]] = {} | |
MAX_MEMORY_LENGTH = 8 # Aumentado para ter mais contexto útil | |
def get_session_memory_path(session_id: str) -> str: | |
"""Retorna o caminho do arquivo de memória para a sessão.""" | |
return f"conversation_memory_{session_id}.json" | |
def load_conversation_memory(session_id: str): | |
"""Carrega a memória da sessão do usuário.""" | |
if session_id in user_sessions: | |
return | |
memory_path = get_session_memory_path(session_id) | |
session_data = {'conversation': [], 'user_profile': {'nivel': 'intermediario', 'total_perguntas': 0}} | |
if os.path.exists(memory_path): | |
try: | |
with open(memory_path, 'r', encoding='utf-8') as f: | |
session_data = json.load(f) | |
except Exception as e: | |
print(f"Erro ao carregar memória para sessão '{session_id}': {e}") | |
user_sessions[session_id] = session_data | |
def save_conversation_memory(session_id: str): | |
"""Salva a memória da sessão do usuário.""" | |
memory_path = get_session_memory_path(session_id) | |
try: | |
with open(memory_path, 'w', encoding='utf-8') as f: | |
json.dump(user_sessions[session_id], f, ensure_ascii=False, indent=2) | |
except Exception as e: | |
print(f"Erro ao salvar memória para sessão '{session_id}': {e}") | |
def add_to_memory(session_id: str, user_message: str, assistant_response: str): | |
"""Adiciona uma troca de mensagens à memória da sessão.""" | |
load_conversation_memory(session_id) | |
conversation = user_sessions[session_id]['conversation'] | |
conversation.extend([ | |
{"role": "user", "content": user_message, "timestamp": time.time()}, | |
{"role": "assistant", "content": assistant_response, "timestamp": time.time()} | |
]) | |
# Mantém apenas as últimas conversas para evitar contexto muito longo | |
if len(conversation) > MAX_MEMORY_LENGTH * 2: | |
user_sessions[session_id]['conversation'] = conversation[-MAX_MEMORY_LENGTH * 2:] | |
save_conversation_memory(session_id) | |
def update_user_profile(session_id: str, user_message: str): | |
"""Atualiza o perfil do usuário com base na mensagem.""" | |
load_conversation_memory(session_id) | |
profile = user_sessions[session_id]['user_profile'] | |
message_lower = user_message.lower() | |
if any(word in message_lower for word in ['básico', 'iniciante']): | |
profile['nivel'] = 'iniciante' | |
elif any(word in message_lower for word in ['avançado', 'complexo']): | |
profile['nivel'] = 'avançado' | |
topics = { | |
'java': ['java', 'classe', 'objeto'], | |
'web': ['html', 'css', 'javascript'], | |
'ia': ['inteligência artificial', 'machine learning'] | |
} | |
for topic, keywords in topics.items(): | |
if any(keyword in message_lower for keyword in keywords): | |
profile[f'interesse_{topic}'] = profile.get(f'interesse_{topic}', 0) + 1 | |
profile['total_perguntas'] = profile.get('total_perguntas', 0) + 1 | |
user_sessions[session_id]['user_profile'] = profile | |
def get_conversation_messages(session_id: str) -> List[Dict]: | |
""" | |
NOVA FUNÇÃO: Retorna as mensagens da conversa em formato adequado para o modelo. | |
Esta é a chave para resolver o problema de duplicação! | |
""" | |
load_conversation_memory(session_id) | |
conversation = user_sessions[session_id]['conversation'] | |
# Pega apenas as últimas 6 mensagens (3 trocas) para não sobrecarregar | |
recent_conversation = conversation[-6:] if len(conversation) > 6 else conversation | |
# Converte para formato de mensagens do modelo | |
messages = [] | |
for msg in recent_conversation: | |
# Remove metadados desnecessários das mensagens antigas | |
clean_content = msg['content'] | |
# Remove a linha de informação do modelo das respostas antigas | |
if msg['role'] == 'assistant' and '*Resposta gerada pelo modelo:' in clean_content: | |
clean_content = clean_content.split('*Resposta gerada pelo modelo:')[0].strip() | |
messages.append({ | |
"role": msg['role'], | |
"content": clean_content | |
}) | |
return messages | |
def get_user_profile_context(session_id: str) -> str: | |
"""Gera o contexto do perfil do usuário de forma mais concisa.""" | |
load_conversation_memory(session_id) | |
profile = user_sessions[session_id]['user_profile'] | |
# Contexto mais conciso para não poluir o prompt | |
nivel = profile.get('nivel', 'intermediario') | |
total = profile.get('total_perguntas', 0) | |
context_parts = [f"Nível: {nivel}"] | |
# Só inclui interesses se há algum padrão significativo | |
interesses = [k.replace('interesse_', '').title() | |
for k, v in profile.items() | |
if k.startswith('interesse_') and v >= 2] # Só se perguntou pelo menos 2 vezes | |
if interesses: | |
context_parts.append(f"Interesses: {', '.join(interesses)}") | |
return " | ".join(context_parts) | |
def clear_memory(session_id: str) -> str: | |
"""Limpa a memória de uma sessão específica.""" | |
if session_id in user_sessions: | |
del user_sessions[session_id] | |
memory_path = get_session_memory_path(session_id) | |
if os.path.exists(memory_path): | |
os.remove(memory_path) | |
return "Memória limpa com sucesso!" | |
# --- RAG (Crawling e Vector Store) --- | |
vector_store: Optional[FAISS] = None | |
def get_all_blog_links(url: str) -> set: | |
"""Coleta todos os links do blog.""" | |
links = {url} | |
visited = set() | |
while links: | |
current_url = links.pop() | |
if current_url in visited: | |
continue | |
try: | |
response = requests.get(current_url, timeout=60) | |
soup = BeautifulSoup(response.content, 'html.parser') | |
visited.add(current_url) | |
for link in soup.find_all('a', href=True): | |
href = urljoin(url, link['href']) | |
if urlparse(href).netloc == urlparse(url).netloc and '/tag/' not in href and '/category/' not in href: | |
links.add(href) | |
except Exception as e: | |
print(f"Erro ao acessar {current_url}: {e}") | |
return visited | |
def scrape_text_from_url(url: str) -> str: | |
"""Extrai texto de uma URL.""" | |
try: | |
response = requests.get(url, timeout=10) | |
soup = BeautifulSoup(response.content, 'html.parser') | |
content = soup.find('article') or soup.find('main') | |
return content.get_text(separator='\n', strip=True) if content else "" | |
except Exception as e: | |
print(f"Erro ao raspar {url}: {e}") | |
return "" | |
def build_and_save_vector_store(): | |
"""Constrói e salva o vector store.""" | |
global vector_store | |
print("Construindo vector store...") | |
try: | |
links = get_all_blog_links(BLOG_URL) | |
texts = [scrape_text_from_url(link) for link in links if scrape_text_from_url(link)] | |
if not texts: | |
print("Nenhum conteúdo encontrado no blog.") | |
return "Nenhum conteúdo encontrado." | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150) | |
chunks = text_splitter.create_documents(texts) | |
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
vector_store = FAISS.from_documents(chunks, embeddings) | |
with open(VECTOR_STORE_PATH, "wb") as f: | |
pickle.dump(vector_store, f) | |
with open(PROCESSED_URLS_PATH, "wb") as f: | |
pickle.dump(links, f) | |
print(f"Vector store criado com {len(chunks)} chunks.") | |
return f"Vector store criado com {len(chunks)} chunks." | |
except Exception as e: | |
print(f"Erro ao construir vector store: {e}") | |
return f"Erro ao construir vector store: {e}" | |
def load_vector_store(): | |
"""Carrega o vector store.""" | |
global vector_store | |
try: | |
if os.path.exists(VECTOR_STORE_PATH): | |
with open(VECTOR_STORE_PATH, "rb") as f: | |
vector_store = pickle.load(f) | |
print("Vector store carregado com sucesso.") | |
else: | |
print("Vector store não encontrado. Criando novo...") | |
build_and_save_vector_store() | |
except Exception as e: | |
print(f"Erro ao carregar vector store: {e}") | |
print("Tentando criar novo vector store...") | |
build_and_save_vector_store() | |
def retrieve_context_from_blog(query: str, k: int = 3) -> str: | |
"""Busca contexto relevante no vector store - Reduzido para evitar sobrecarga.""" | |
if vector_store: | |
try: | |
results = vector_store.similarity_search(query, k=k) | |
# Limita o tamanho do contexto para evitar tokens excessivos | |
context_parts = [] | |
total_chars = 0 | |
max_chars = 1500 # Limite de caracteres do contexto do blog | |
for doc in results: | |
if total_chars + len(doc.page_content) > max_chars: | |
break | |
context_parts.append(doc.page_content) | |
total_chars += len(doc.page_content) | |
return "\n---\n".join(context_parts) | |
except Exception as e: | |
print(f"Erro ao buscar contexto: {e}") | |
return "" | |
# --- Inference Client (Versão Melhorada com huggingface_hub) --- | |
class HuggingFaceInferenceClient: | |
def __init__(self, token: str): | |
self.token = token | |
self.clients = {} # Cache de clientes para diferentes modelos | |
def get_client(self, model_name: str) -> InferenceClient: | |
"""Obtém ou cria um cliente para o modelo especificado.""" | |
if model_name not in self.clients: | |
self.clients[model_name] = InferenceClient( | |
model=model_name, | |
token=self.token | |
) | |
return self.clients[model_name] | |
def check_model_status(self, model_name: str) -> Tuple[bool, str]: | |
"""Verifica se um modelo está disponível.""" | |
try: | |
print(f" Testando {model_name}...") | |
client = self.get_client(model_name) | |
# Teste simples com mensagem básica | |
test_messages = [ | |
{"role": "user", "content": "Hello"} | |
] | |
# Tenta fazer uma requisição de teste | |
response = client.chat_completion( | |
messages=test_messages, | |
max_tokens=5, | |
temperature=0.1 | |
) | |
if response and hasattr(response, 'choices') and len(response.choices) > 0: | |
return True, "Modelo disponível" | |
else: | |
return False, "Resposta inválida do modelo" | |
except Exception as e: | |
error_msg = str(e).lower() | |
if 'loading' in error_msg or 'currently loading' in error_msg: | |
return False, "Modelo carregando" | |
elif 'rate limit' in error_msg: | |
return False, "Rate limit atingido" | |
elif 'token' in error_msg or 'unauthorized' in error_msg: | |
return False, "Token inválido" | |
elif 'model not found' in error_msg: | |
return False, "Modelo não encontrado" | |
else: | |
return False, f"Erro: {str(e)[:100]}" | |
def query_model(self, model_name: str, messages: List[Dict], max_tokens: int = 1500, temperature: float = 0.5) -> str: | |
"""Faz requisição ao modelo usando chat completion - Reduzido max_tokens.""" | |
try: | |
client = self.get_client(model_name) | |
# Faz a requisição usando chat completion | |
response = client.chat_completion( | |
messages=messages, | |
max_tokens=max_tokens, | |
temperature=temperature, | |
stream=False | |
) | |
# Extrai a resposta | |
if response and hasattr(response, 'choices') and len(response.choices) > 0: | |
content = response.choices[0].message.content | |
return content.strip() if content else "Resposta vazia do modelo" | |
else: | |
return "Erro: Resposta inválida do modelo" | |
except Exception as e: | |
error_msg = str(e) | |
if 'loading' in error_msg.lower(): | |
return f"Modelo {model_name} está carregando. Tente novamente em alguns minutos." | |
elif 'rate limit' in error_msg.lower(): | |
return "Rate limit atingido. Aguarde alguns momentos e tente novamente." | |
elif 'token' in error_msg.lower() or 'unauthorized' in error_msg.lower(): | |
return "Erro de autenticação. Verifique seu token HuggingFace." | |
else: | |
return f"Erro ao consultar modelo: {error_msg}" | |
# --- Função para Testar e Atualizar Modelos --- | |
def test_and_update_models() -> int: | |
"""Testa modelos e adiciona TODOS à lista MODELS, independente da disponibilidade.""" | |
print("Testando disponibilidade dos modelos...") | |
print(f"Token HF disponível: {'Sim' if HF_TOKEN else 'Não'}") | |
print("-" * 60) | |
inference_client = HuggingFaceInferenceClient(HF_TOKEN) | |
model_status = {} # Para armazenar status de cada modelo | |
# Testa todos os modelos mas adiciona TODOS à lista MODELS | |
for model_label, model_name in NEW_MODELS_TO_TEST: | |
try: | |
is_available, message = inference_client.check_model_status(model_name) | |
# Armazena o status para exibição | |
model_status[model_label] = { | |
'available': is_available, | |
'message': message, | |
'model_name': model_name | |
} | |
if is_available: | |
print(f"✓ {model_label} - {message}") | |
else: | |
print(f"⚠ {model_label} - {message} (adicionado mesmo assim)") | |
except Exception as e: | |
print(f"⚠ {model_label} - Erro: {str(e)} (adicionado mesmo assim)") | |
model_status[model_label] = { | |
'available': False, | |
'message': f"Erro: {str(e)}", | |
'model_name': model_name | |
} | |
# Pausa para evitar rate limiting | |
time.sleep(3) | |
# SEMPRE adiciona TODOS os modelos, independente da disponibilidade | |
global MODELS | |
MODELS.clear() | |
for model_label, model_name in NEW_MODELS_TO_TEST: | |
MODELS[model_label] = model_name | |
print("\n" + "=" * 60) | |
print("TODOS OS MODELOS ADICIONADOS (INDEPENDENTE DE DISPONIBILIDADE):") | |
print("=" * 60) | |
for i, (label, name) in enumerate(MODELS.items(), 1): | |
status_info = model_status.get(label, {}) | |
status_symbol = "✓" if status_info.get('available', False) else "⚠" | |
status_msg = status_info.get('message', 'Status desconhecido') | |
print(f"{i}. {status_symbol} {label} ({name}) - {status_msg}") | |
print(f"\nTOTAL: {len(MODELS)} modelos adicionados") | |
print("=" * 60) | |
# Salva lista completa (incluindo status) | |
try: | |
models_with_status = {} | |
for label, name in MODELS.items(): | |
status_info = model_status.get(label, {}) | |
models_with_status[label] = { | |
'model_name': name, | |
'available': status_info.get('available', False), | |
'status_message': status_info.get('message', 'Status desconhecido'), | |
'last_checked': time.time() | |
} | |
with open("models_available.json", "w", encoding="utf-8") as f: | |
json.dump(models_with_status, f, ensure_ascii=False, indent=2) | |
print("Lista completa salva em 'models_available.json'") | |
except Exception as e: | |
print(f"Erro ao salvar lista: {e}") | |
return len(MODELS) | |
# --- Chat Principal (VERSÃO CORRIGIDA) --- | |
def responder_como_aldo(session_id: str, pergunta: str, modelo: str = None) -> str: | |
""" | |
FUNÇÃO PRINCIPAL CORRIGIDA: Gera resposta como Dr. Aldo Henrique sem duplicação. | |
""" | |
if not pergunta.strip(): | |
return "Por favor, faça uma pergunta válida." | |
# Usar primeiro modelo disponível se nenhum especificado | |
if not modelo or modelo not in MODELS: | |
if not MODELS: | |
return "Erro: Nenhum modelo disponível!" | |
modelo = list(MODELS.keys())[0] | |
load_conversation_memory(session_id) | |
update_user_profile(session_id, pergunta) | |
# === NOVA ABORDAGEM: Monta mensagens em formato adequado === | |
# 1. Obtém mensagens anteriores da conversa (já formatadas) | |
conversation_messages = get_conversation_messages(session_id) | |
# 2. Monta o system prompt (mais conciso) | |
perfil_info = get_user_profile_context(session_id) | |
system_prompt = f"""Você é o Dr. Aldo Henrique, Doutor em Ciências da Computação pela UnB (2024), professor universitário especializado em: | |
- Algoritmos e Estruturas de Dados | |
- Inteligência Artificial | |
- Ciência de Dados e Mineração de Dados | |
- Desenvolvimento de Software | |
Informações do usuário: {perfil_info} | |
Responda sempre em português, de forma didática e clara: | |
- Explique conceitos antes de mostrar código | |
- Use exemplos práticos adaptados ao nível do usuário | |
- Faça uma pequena observação interessante ou engraçada relacionada à pergunta | |
- Use Markdown para formatação | |
- Adicione comentários explicativos no código""" | |
# 3. Adiciona contexto do blog apenas se relevante (sem repetir na conversa) | |
blog_context = retrieve_context_from_blog(pergunta) | |
if blog_context: | |
system_prompt += f"\n\nContexto do seu blog (use apenas se relevante para a pergunta):\n{blog_context}" | |
# 4. Monta as mensagens finais | |
messages = [{"role": "system", "content": system_prompt}] | |
# Adiciona mensagens anteriores da conversa (sem duplicação) | |
messages.extend(conversation_messages) | |
# Adiciona a pergunta atual | |
messages.append({"role": "user", "content": pergunta}) | |
# === DEBUG: Log do que está sendo enviado === | |
print(f"\n=== DEBUG SESSION {session_id} ===") | |
print(f"Pergunta atual: {pergunta}") | |
print(f"Mensagens na conversa: {len(conversation_messages)}") | |
print(f"Total de mensagens enviadas: {len(messages)}") | |
print("=" * 40) | |
# 5. Faz requisição usando InferenceClient | |
inference_client = HuggingFaceInferenceClient(HF_TOKEN) | |
model_name = MODELS[modelo] | |
resposta = inference_client.query_model(model_name, messages, max_tokens=1200) # Reduzido | |
# 6. Limpa a resposta (remove possíveis repetições) | |
resposta_limpa = resposta.strip() | |
# Remove qualquer repetição óbvia da pergunta | |
if pergunta.lower() in resposta_limpa.lower()[:100]: # Se a pergunta aparece no início | |
lines = resposta_limpa.split('\n') | |
# Remove linhas que são muito similares à pergunta | |
filtered_lines = [] | |
for line in lines: | |
if not (len(line.strip()) > 0 and | |
any(word in line.lower() for word in pergunta.lower().split() if len(word) > 3) and | |
len(line.strip()) < len(pergunta) * 1.5): | |
filtered_lines.append(line) | |
resposta_limpa = '\n'.join(filtered_lines).strip() | |
# 7. Adiciona informação sobre modelo usado (mais discreta) | |
resposta_final = f"{resposta_limpa}\n\n*— {modelo}*" | |
# 8. Salva na memória (a resposta limpa, sem a informação do modelo) | |
add_to_memory(session_id, pergunta, resposta_limpa) | |
return resposta_final | |
# --- Inicialização --- | |
def inicializar_sistema(): | |
"""Inicializa o sistema.""" | |
print("Inicializando Chatbot Dr. Aldo...") | |
print("=" * 50) | |
# Verificar se huggingface_hub está instalado | |
try: | |
from huggingface_hub import InferenceClient | |
print("✓ huggingface_hub disponível") | |
except ImportError: | |
print("⚠ AVISO: huggingface_hub não encontrado!") | |
print("Execute: pip install huggingface_hub") | |
return False, {} | |
# Testa modelos (agora sempre retorna todos) | |
num_total_models = test_and_update_models() | |
# Sistema sempre é considerado inicializado, pois todos os modelos são adicionados | |
print(f"\n✓ Sistema inicializado com {num_total_models} modelos!") | |
print("⚠ Nem todos os modelos podem estar disponíveis no momento.") | |
print("⚠ O sistema tentará usar qualquer modelo selecionado.") | |
# Carrega vector store (opcional) | |
try: | |
load_vector_store() | |
print("✓ Vector store carregado!") | |
except Exception as e: | |
print(f"⚠ Erro ao carregar vector store: {e}") | |
print("⚠ Sistema funcionará sem contexto do blog.") | |
return True, MODELS | |
# --- Execução Principal --- | |
if __name__ == "__main__": | |
status, models = inicializar_sistema() | |
if status: | |
print("\n" + "="*50) | |
print("TESTE DO SISTEMA CORRIGIDO") | |
print("="*50) | |
session_id = "teste_123" | |
# Teste 1 | |
print("\n1. Testando pergunta básica...") | |
resposta1 = responder_como_aldo(session_id, "O que é Python?") | |
print(f"Resposta: {resposta1[:200]}...") | |
# Teste 2 - Pergunta relacionada (para testar memória) | |
print("\n2. Testando pergunta relacionada...") | |
resposta2 = responder_como_aldo(session_id, "Como posso começar a aprender Python?") | |
print(f"Resposta: {resposta2[:200]}...") | |
# Teste 3 - Pergunta completamente diferente | |
print("\n3. Testando pergunta diferente...") | |
resposta3 = responder_como_aldo(session_id, "Explique estruturas de dados") | |
print(f"Resposta: {resposta3[:200]}...") | |
# Limpeza | |
print(f"\n4. {clear_memory(session_id)}") | |
print("\n" + "="*50) | |
print("SISTEMA CORRIGIDO PRONTO!") | |
print("="*50) | |
print("✓ Memória sem duplicação implementada") | |
print("✓ Contexto otimizado para reduzir tokens") | |
print("✓ Respostas mais limpas e diretas") | |
else: | |
print("\n" + "="*50) | |
print("ERRO NA INICIALIZAÇÃO") | |
print("="*50) | |
print("Instale as dependências necessárias:") | |
print("pip install huggingface_hub") |