import requests import os import json import time import pickle from typing import Dict, List, Optional, Tuple from bs4 import BeautifulSoup from urllib.parse import urljoin, urlparse from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import FAISS from langchain_community.embeddings import HuggingFaceEmbeddings from huggingface_hub import InferenceClient import asyncio # --- Configurações --- BLOG_URL = "https://aldohenrique.com.br/" VECTOR_STORE_PATH = "faiss_index_store.pkl" PROCESSED_URLS_PATH = "processed_urls.pkl" HF_TOKEN = os.getenv("HF_TOKEN") # Validação do token com mensagem mais clara if not HF_TOKEN: print("ERRO: Token HF_TOKEN não encontrado!") print("Execute: export HF_TOKEN='seu_token_aqui' ou defina como variável de ambiente") exit(1) print(f"Token HF encontrado: {HF_TOKEN[:10]}...") # --- Modelos para teste (versão com InferenceClient) --- MODELS = {} # Lista de modelos mais estáveis e com maior chance de funcionar NEW_MODELS_TO_TEST = [ ("Llama 3.2 3B", "meta-llama/Llama-3.2-3B-Instruct"), ("Mistral 7B", "mistralai/Mistral-7B-Instruct-v0.3"), ("Mistral Nemo", "mistralai/Mistral-Nemo-Instruct-2407"), ("Phi-3.5 Mini", "microsoft/Phi-3.5-mini-instruct"), ("Qwen2.5 7B", "Qwen/Qwen2.5-7B-Instruct"), ("Gemma 2 2B", "google/gemma-2-2b-it"), ("CodeLlama 7B", "codellama/CodeLlama-7b-Instruct-hf"), ("Zephyr 7B", "HuggingFaceH4/zephyr-7b-beta"), ("IBM 2B", "ibm-granite/granite-3.3-2b-instruct") ] DEFAULT_MODEL = "Llama 3.2 3B" # --- Gerenciamento de Sessão --- user_sessions: Dict[str, Dict[str, List | Dict]] = {} MAX_MEMORY_LENGTH = 8 # Aumentado para ter mais contexto útil def get_session_memory_path(session_id: str) -> str: """Retorna o caminho do arquivo de memória para a sessão.""" return f"conversation_memory_{session_id}.json" def load_conversation_memory(session_id: str): """Carrega a memória da sessão do usuário.""" if session_id in user_sessions: return memory_path = get_session_memory_path(session_id) session_data = {'conversation': [], 'user_profile': {'nivel': 'intermediario', 'total_perguntas': 0}} if os.path.exists(memory_path): try: with open(memory_path, 'r', encoding='utf-8') as f: session_data = json.load(f) except Exception as e: print(f"Erro ao carregar memória para sessão '{session_id}': {e}") user_sessions[session_id] = session_data def save_conversation_memory(session_id: str): """Salva a memória da sessão do usuário.""" memory_path = get_session_memory_path(session_id) try: with open(memory_path, 'w', encoding='utf-8') as f: json.dump(user_sessions[session_id], f, ensure_ascii=False, indent=2) except Exception as e: print(f"Erro ao salvar memória para sessão '{session_id}': {e}") def add_to_memory(session_id: str, user_message: str, assistant_response: str): """Adiciona uma troca de mensagens à memória da sessão.""" load_conversation_memory(session_id) conversation = user_sessions[session_id]['conversation'] conversation.extend([ {"role": "user", "content": user_message, "timestamp": time.time()}, {"role": "assistant", "content": assistant_response, "timestamp": time.time()} ]) # Mantém apenas as últimas conversas para evitar contexto muito longo if len(conversation) > MAX_MEMORY_LENGTH * 2: user_sessions[session_id]['conversation'] = conversation[-MAX_MEMORY_LENGTH * 2:] save_conversation_memory(session_id) def update_user_profile(session_id: str, user_message: str): """Atualiza o perfil do usuário com base na mensagem.""" load_conversation_memory(session_id) profile = user_sessions[session_id]['user_profile'] message_lower = user_message.lower() if any(word in message_lower for word in ['básico', 'iniciante']): profile['nivel'] = 'iniciante' elif any(word in message_lower for word in ['avançado', 'complexo']): profile['nivel'] = 'avançado' topics = { 'java': ['java', 'classe', 'objeto'], 'web': ['html', 'css', 'javascript'], 'ia': ['inteligência artificial', 'machine learning'] } for topic, keywords in topics.items(): if any(keyword in message_lower for keyword in keywords): profile[f'interesse_{topic}'] = profile.get(f'interesse_{topic}', 0) + 1 profile['total_perguntas'] = profile.get('total_perguntas', 0) + 1 user_sessions[session_id]['user_profile'] = profile def get_conversation_messages(session_id: str) -> List[Dict]: """ NOVA FUNÇÃO: Retorna as mensagens da conversa em formato adequado para o modelo. Esta é a chave para resolver o problema de duplicação! """ load_conversation_memory(session_id) conversation = user_sessions[session_id]['conversation'] # Pega apenas as últimas 6 mensagens (3 trocas) para não sobrecarregar recent_conversation = conversation[-6:] if len(conversation) > 6 else conversation # Converte para formato de mensagens do modelo messages = [] for msg in recent_conversation: # Remove metadados desnecessários das mensagens antigas clean_content = msg['content'] # Remove a linha de informação do modelo das respostas antigas if msg['role'] == 'assistant' and '*Resposta gerada pelo modelo:' in clean_content: clean_content = clean_content.split('*Resposta gerada pelo modelo:')[0].strip() messages.append({ "role": msg['role'], "content": clean_content }) return messages def get_user_profile_context(session_id: str) -> str: """Gera o contexto do perfil do usuário de forma mais concisa.""" load_conversation_memory(session_id) profile = user_sessions[session_id]['user_profile'] # Contexto mais conciso para não poluir o prompt nivel = profile.get('nivel', 'intermediario') total = profile.get('total_perguntas', 0) context_parts = [f"Nível: {nivel}"] # Só inclui interesses se há algum padrão significativo interesses = [k.replace('interesse_', '').title() for k, v in profile.items() if k.startswith('interesse_') and v >= 2] # Só se perguntou pelo menos 2 vezes if interesses: context_parts.append(f"Interesses: {', '.join(interesses)}") return " | ".join(context_parts) def clear_memory(session_id: str) -> str: """Limpa a memória de uma sessão específica.""" if session_id in user_sessions: del user_sessions[session_id] memory_path = get_session_memory_path(session_id) if os.path.exists(memory_path): os.remove(memory_path) return "Memória limpa com sucesso!" # --- RAG (Crawling e Vector Store) --- vector_store: Optional[FAISS] = None def get_all_blog_links(url: str) -> set: """Coleta todos os links do blog.""" links = {url} visited = set() while links: current_url = links.pop() if current_url in visited: continue try: response = requests.get(current_url, timeout=60) soup = BeautifulSoup(response.content, 'html.parser') visited.add(current_url) for link in soup.find_all('a', href=True): href = urljoin(url, link['href']) if urlparse(href).netloc == urlparse(url).netloc and '/tag/' not in href and '/category/' not in href: links.add(href) except Exception as e: print(f"Erro ao acessar {current_url}: {e}") return visited def scrape_text_from_url(url: str) -> str: """Extrai texto de uma URL.""" try: response = requests.get(url, timeout=10) soup = BeautifulSoup(response.content, 'html.parser') content = soup.find('article') or soup.find('main') return content.get_text(separator='\n', strip=True) if content else "" except Exception as e: print(f"Erro ao raspar {url}: {e}") return "" def build_and_save_vector_store(): """Constrói e salva o vector store.""" global vector_store print("Construindo vector store...") try: links = get_all_blog_links(BLOG_URL) texts = [scrape_text_from_url(link) for link in links if scrape_text_from_url(link)] if not texts: print("Nenhum conteúdo encontrado no blog.") return "Nenhum conteúdo encontrado." text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150) chunks = text_splitter.create_documents(texts) embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") vector_store = FAISS.from_documents(chunks, embeddings) with open(VECTOR_STORE_PATH, "wb") as f: pickle.dump(vector_store, f) with open(PROCESSED_URLS_PATH, "wb") as f: pickle.dump(links, f) print(f"Vector store criado com {len(chunks)} chunks.") return f"Vector store criado com {len(chunks)} chunks." except Exception as e: print(f"Erro ao construir vector store: {e}") return f"Erro ao construir vector store: {e}" def load_vector_store(): """Carrega o vector store.""" global vector_store try: if os.path.exists(VECTOR_STORE_PATH): with open(VECTOR_STORE_PATH, "rb") as f: vector_store = pickle.load(f) print("Vector store carregado com sucesso.") else: print("Vector store não encontrado. Criando novo...") build_and_save_vector_store() except Exception as e: print(f"Erro ao carregar vector store: {e}") print("Tentando criar novo vector store...") build_and_save_vector_store() def retrieve_context_from_blog(query: str, k: int = 3) -> str: """Busca contexto relevante no vector store - Reduzido para evitar sobrecarga.""" if vector_store: try: results = vector_store.similarity_search(query, k=k) # Limita o tamanho do contexto para evitar tokens excessivos context_parts = [] total_chars = 0 max_chars = 1500 # Limite de caracteres do contexto do blog for doc in results: if total_chars + len(doc.page_content) > max_chars: break context_parts.append(doc.page_content) total_chars += len(doc.page_content) return "\n---\n".join(context_parts) except Exception as e: print(f"Erro ao buscar contexto: {e}") return "" # --- Inference Client (Versão Melhorada com huggingface_hub) --- class HuggingFaceInferenceClient: def __init__(self, token: str): self.token = token self.clients = {} # Cache de clientes para diferentes modelos def get_client(self, model_name: str) -> InferenceClient: """Obtém ou cria um cliente para o modelo especificado.""" if model_name not in self.clients: self.clients[model_name] = InferenceClient( model=model_name, token=self.token ) return self.clients[model_name] def check_model_status(self, model_name: str) -> Tuple[bool, str]: """Verifica se um modelo está disponível.""" try: print(f" Testando {model_name}...") client = self.get_client(model_name) # Teste simples com mensagem básica test_messages = [ {"role": "user", "content": "Hello"} ] # Tenta fazer uma requisição de teste response = client.chat_completion( messages=test_messages, max_tokens=5, temperature=0.1 ) if response and hasattr(response, 'choices') and len(response.choices) > 0: return True, "Modelo disponível" else: return False, "Resposta inválida do modelo" except Exception as e: error_msg = str(e).lower() if 'loading' in error_msg or 'currently loading' in error_msg: return False, "Modelo carregando" elif 'rate limit' in error_msg: return False, "Rate limit atingido" elif 'token' in error_msg or 'unauthorized' in error_msg: return False, "Token inválido" elif 'model not found' in error_msg: return False, "Modelo não encontrado" else: return False, f"Erro: {str(e)[:100]}" def query_model(self, model_name: str, messages: List[Dict], max_tokens: int = 1500, temperature: float = 0.5) -> str: """Faz requisição ao modelo usando chat completion - Reduzido max_tokens.""" try: client = self.get_client(model_name) # Faz a requisição usando chat completion response = client.chat_completion( messages=messages, max_tokens=max_tokens, temperature=temperature, stream=False ) # Extrai a resposta if response and hasattr(response, 'choices') and len(response.choices) > 0: content = response.choices[0].message.content return content.strip() if content else "Resposta vazia do modelo" else: return "Erro: Resposta inválida do modelo" except Exception as e: error_msg = str(e) if 'loading' in error_msg.lower(): return f"Modelo {model_name} está carregando. Tente novamente em alguns minutos." elif 'rate limit' in error_msg.lower(): return "Rate limit atingido. Aguarde alguns momentos e tente novamente." elif 'token' in error_msg.lower() or 'unauthorized' in error_msg.lower(): return "Erro de autenticação. Verifique seu token HuggingFace." else: return f"Erro ao consultar modelo: {error_msg}" # --- Função para Testar e Atualizar Modelos --- def test_and_update_models() -> int: """Testa modelos e adiciona TODOS à lista MODELS, independente da disponibilidade.""" print("Testando disponibilidade dos modelos...") print(f"Token HF disponível: {'Sim' if HF_TOKEN else 'Não'}") print("-" * 60) inference_client = HuggingFaceInferenceClient(HF_TOKEN) model_status = {} # Para armazenar status de cada modelo # Testa todos os modelos mas adiciona TODOS à lista MODELS for model_label, model_name in NEW_MODELS_TO_TEST: try: is_available, message = inference_client.check_model_status(model_name) # Armazena o status para exibição model_status[model_label] = { 'available': is_available, 'message': message, 'model_name': model_name } if is_available: print(f"✓ {model_label} - {message}") else: print(f"⚠ {model_label} - {message} (adicionado mesmo assim)") except Exception as e: print(f"⚠ {model_label} - Erro: {str(e)} (adicionado mesmo assim)") model_status[model_label] = { 'available': False, 'message': f"Erro: {str(e)}", 'model_name': model_name } # Pausa para evitar rate limiting time.sleep(3) # SEMPRE adiciona TODOS os modelos, independente da disponibilidade global MODELS MODELS.clear() for model_label, model_name in NEW_MODELS_TO_TEST: MODELS[model_label] = model_name print("\n" + "=" * 60) print("TODOS OS MODELOS ADICIONADOS (INDEPENDENTE DE DISPONIBILIDADE):") print("=" * 60) for i, (label, name) in enumerate(MODELS.items(), 1): status_info = model_status.get(label, {}) status_symbol = "✓" if status_info.get('available', False) else "⚠" status_msg = status_info.get('message', 'Status desconhecido') print(f"{i}. {status_symbol} {label} ({name}) - {status_msg}") print(f"\nTOTAL: {len(MODELS)} modelos adicionados") print("=" * 60) # Salva lista completa (incluindo status) try: models_with_status = {} for label, name in MODELS.items(): status_info = model_status.get(label, {}) models_with_status[label] = { 'model_name': name, 'available': status_info.get('available', False), 'status_message': status_info.get('message', 'Status desconhecido'), 'last_checked': time.time() } with open("models_available.json", "w", encoding="utf-8") as f: json.dump(models_with_status, f, ensure_ascii=False, indent=2) print("Lista completa salva em 'models_available.json'") except Exception as e: print(f"Erro ao salvar lista: {e}") return len(MODELS) # --- Chat Principal (VERSÃO CORRIGIDA) --- def responder_como_aldo(session_id: str, pergunta: str, modelo: str = None) -> str: """ FUNÇÃO PRINCIPAL CORRIGIDA: Gera resposta como Dr. Aldo Henrique sem duplicação. """ if not pergunta.strip(): return "Por favor, faça uma pergunta válida." # Usar primeiro modelo disponível se nenhum especificado if not modelo or modelo not in MODELS: if not MODELS: return "Erro: Nenhum modelo disponível!" modelo = list(MODELS.keys())[0] load_conversation_memory(session_id) update_user_profile(session_id, pergunta) # === NOVA ABORDAGEM: Monta mensagens em formato adequado === # 1. Obtém mensagens anteriores da conversa (já formatadas) conversation_messages = get_conversation_messages(session_id) # 2. Monta o system prompt (mais conciso) perfil_info = get_user_profile_context(session_id) system_prompt = f"""Você é o Dr. Aldo Henrique, Doutor em Ciências da Computação pela UnB (2024), professor universitário especializado em: - Algoritmos e Estruturas de Dados - Inteligência Artificial - Ciência de Dados e Mineração de Dados - Desenvolvimento de Software Informações do usuário: {perfil_info} Responda sempre em português, de forma didática e clara: - Explique conceitos antes de mostrar código - Use exemplos práticos adaptados ao nível do usuário - Faça uma pequena observação interessante ou engraçada relacionada à pergunta - Use Markdown para formatação - Adicione comentários explicativos no código""" # 3. Adiciona contexto do blog apenas se relevante (sem repetir na conversa) blog_context = retrieve_context_from_blog(pergunta) if blog_context: system_prompt += f"\n\nContexto do seu blog (use apenas se relevante para a pergunta):\n{blog_context}" # 4. Monta as mensagens finais messages = [{"role": "system", "content": system_prompt}] # Adiciona mensagens anteriores da conversa (sem duplicação) messages.extend(conversation_messages) # Adiciona a pergunta atual messages.append({"role": "user", "content": pergunta}) # === DEBUG: Log do que está sendo enviado === print(f"\n=== DEBUG SESSION {session_id} ===") print(f"Pergunta atual: {pergunta}") print(f"Mensagens na conversa: {len(conversation_messages)}") print(f"Total de mensagens enviadas: {len(messages)}") print("=" * 40) # 5. Faz requisição usando InferenceClient inference_client = HuggingFaceInferenceClient(HF_TOKEN) model_name = MODELS[modelo] resposta = inference_client.query_model(model_name, messages, max_tokens=1200) # Reduzido # 6. Limpa a resposta (remove possíveis repetições) resposta_limpa = resposta.strip() # Remove qualquer repetição óbvia da pergunta if pergunta.lower() in resposta_limpa.lower()[:100]: # Se a pergunta aparece no início lines = resposta_limpa.split('\n') # Remove linhas que são muito similares à pergunta filtered_lines = [] for line in lines: if not (len(line.strip()) > 0 and any(word in line.lower() for word in pergunta.lower().split() if len(word) > 3) and len(line.strip()) < len(pergunta) * 1.5): filtered_lines.append(line) resposta_limpa = '\n'.join(filtered_lines).strip() # 7. Adiciona informação sobre modelo usado (mais discreta) resposta_final = f"{resposta_limpa}\n\n*— {modelo}*" # 8. Salva na memória (a resposta limpa, sem a informação do modelo) add_to_memory(session_id, pergunta, resposta_limpa) return resposta_final # --- Inicialização --- def inicializar_sistema(): """Inicializa o sistema.""" print("Inicializando Chatbot Dr. Aldo...") print("=" * 50) # Verificar se huggingface_hub está instalado try: from huggingface_hub import InferenceClient print("✓ huggingface_hub disponível") except ImportError: print("⚠ AVISO: huggingface_hub não encontrado!") print("Execute: pip install huggingface_hub") return False, {} # Testa modelos (agora sempre retorna todos) num_total_models = test_and_update_models() # Sistema sempre é considerado inicializado, pois todos os modelos são adicionados print(f"\n✓ Sistema inicializado com {num_total_models} modelos!") print("⚠ Nem todos os modelos podem estar disponíveis no momento.") print("⚠ O sistema tentará usar qualquer modelo selecionado.") # Carrega vector store (opcional) try: load_vector_store() print("✓ Vector store carregado!") except Exception as e: print(f"⚠ Erro ao carregar vector store: {e}") print("⚠ Sistema funcionará sem contexto do blog.") return True, MODELS # --- Execução Principal --- if __name__ == "__main__": status, models = inicializar_sistema() if status: print("\n" + "="*50) print("TESTE DO SISTEMA CORRIGIDO") print("="*50) session_id = "teste_123" # Teste 1 print("\n1. Testando pergunta básica...") resposta1 = responder_como_aldo(session_id, "O que é Python?") print(f"Resposta: {resposta1[:200]}...") # Teste 2 - Pergunta relacionada (para testar memória) print("\n2. Testando pergunta relacionada...") resposta2 = responder_como_aldo(session_id, "Como posso começar a aprender Python?") print(f"Resposta: {resposta2[:200]}...") # Teste 3 - Pergunta completamente diferente print("\n3. Testando pergunta diferente...") resposta3 = responder_como_aldo(session_id, "Explique estruturas de dados") print(f"Resposta: {resposta3[:200]}...") # Limpeza print(f"\n4. {clear_memory(session_id)}") print("\n" + "="*50) print("SISTEMA CORRIGIDO PRONTO!") print("="*50) print("✓ Memória sem duplicação implementada") print("✓ Contexto otimizado para reduzir tokens") print("✓ Respostas mais limpas e diretas") else: print("\n" + "="*50) print("ERRO NA INICIALIZAÇÃO") print("="*50) print("Instale as dependências necessárias:") print("pip install huggingface_hub")