import requests import os import json import time import pickle from typing import Dict, List, Optional, Tuple from bs4 import BeautifulSoup from urllib.parse import urljoin, urlparse from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import FAISS from langchain_community.embeddings import HuggingFaceEmbeddings # --- Configurações --- BLOG_URL = "https://aldohenrique.com.br/" VECTOR_STORE_PATH = "faiss_index_store.pkl" PROCESSED_URLS_PATH = "processed_urls.pkl" HF_TOKEN = os.getenv("HF_TOKEN") if not HF_TOKEN: raise ValueError("Token HF_TOKEN não encontrado") # Lista inicial de modelos MODELS = { } # Novos modelos para testar NEW_MODELS_TO_TEST = [ ("Mistral 7B (Mais acertivo)", "mistralai/Mistral-7B-Instruct-v0.3"), ("Phi-3 Mini (Mais rápido)", "microsoft/Phi-3-mini-4k-instruct"), ("Zephyr 7B (Meio Termo)", "HuggingFaceH4/zephyr-7b-beta"), ("LLaMA 2-7B Chat", "meta-llama/Llama-2-7b-chat-hf"), ("LLaMA 3.2-3B Instruct", "meta-llama/Llama-3.2-3B-Instruct"), ("Gemma 2B Instruct", "google/gemma-2b-it"), ("Qwen2 7B Instruct", "Qwen/Qwen2-7B-Instruct"), ("Falcon 7B Instruct", "tiiuae/falcon-7b-instruct"), ("Mixtral 8x7B Instruct", "mistralai/Mixtral-8x7B-Instruct-v0.1"), ("LLaMA 3.1-8B Instruct", "meta-llama/Llama-3.1-8B-Instruct"), ("CodeLlama 7B Instruct", "codellama/CodeLlama-7b-Instruct-hf"), ("Starling LM 7B", "berkeley-nest/Starling-LM-7B-alpha"), ("OpenHermes 2.5 Mistral", "teknium/OpenHermes-2.5-Mistral-7B"), ("Gemma 7B Instruct", "google/gemma-7b-it"), ("Qwen 2.5-7B Instruct", "Qwen/Qwen2.5-7B-Instruct"), ("OLMo 7B Instruct", "allenai/OLMo-7B-Instruct-hf") ] DEFAULT_MODEL = "Mistral 7B (Mais acertivo)" # --- Gerenciamento de Sessão --- user_sessions: Dict[str, Dict[str, List | Dict]] = {} MAX_MEMORY_LENGTH = 5 def get_session_memory_path(session_id: str) -> str: """Retorna o caminho do arquivo de memória para a sessão.""" return f"conversation_memory_{session_id}.json" def load_conversation_memory(session_id: str): """Carrega a memória da sessão do usuário.""" if session_id in user_sessions: return memory_path = get_session_memory_path(session_id) session_data = {'conversation': [], 'user_profile': {'nivel': 'intermediario', 'total_perguntas': 0}} if os.path.exists(memory_path): try: with open(memory_path, 'r', encoding='utf-8') as f: session_data = json.load(f) except Exception as e: print(f"Erro ao carregar memória para sessão '{session_id}': {e}") user_sessions[session_id] = session_data def save_conversation_memory(session_id: str): """Salva a memória da sessão do usuário.""" memory_path = get_session_memory_path(session_id) try: with open(memory_path, 'w', encoding='utf-8') as f: json.dump(user_sessions[session_id], f, ensure_ascii=False, indent=2) except Exception as e: print(f"Erro ao salvar memória para sessão '{session_id}': {e}") def add_to_memory(session_id: str, user_message: str, assistant_response: str): """Adiciona uma troca de mensagens à memória da sessão.""" load_conversation_memory(session_id) conversation = user_sessions[session_id]['conversation'] conversation.extend([ {"role": "user", "content": user_message, "timestamp": time.time()}, {"role": "assistant", "content": assistant_response, "timestamp": time.time()} ]) if len(conversation) > MAX_MEMORY_LENGTH * 2: user_sessions[session_id]['conversation'] = conversation[-MAX_MEMORY_LENGTH * 2:] save_conversation_memory(session_id) def update_user_profile(session_id: str, user_message: str): """Atualiza o perfil do usuário com base na mensagem.""" load_conversation_memory(session_id) profile = user_sessions[session_id]['user_profile'] message_lower = user_message.lower() if any(word in message_lower for word in ['básico', 'iniciante']): profile['nivel'] = 'iniciante' elif any(word in message_lower for word in ['avançado', 'complexo']): profile['nivel'] = 'avançado' topics = { 'java': ['java', 'classe', 'objeto'], 'web': ['html', 'css', 'javascript'], 'ia': ['inteligência artificial', 'machine learning'] } for topic, keywords in topics.items(): if any(keyword in message_lower for keyword in keywords): profile[f'interesse_{topic}'] = profile.get(f'interesse_{topic}', 0) + 1 profile['total_perguntas'] = profile.get('total_perguntas', 0) + 1 user_sessions[session_id]['user_profile'] = profile def get_conversation_context(session_id: str) -> str: """Gera o contexto da conversa recente.""" load_conversation_memory(session_id) conversation = user_sessions[session_id]['conversation'][-4:] if not conversation: return "" return "\n".join(f"{msg['role'].upper()}: {msg['content']}" for msg in conversation) def get_user_profile_context(session_id: str) -> str: """Gera o contexto do perfil do usuário.""" load_conversation_memory(session_id) profile = user_sessions[session_id]['user_profile'] context = f"Nível: {profile.get('nivel', 'intermediario')}\n" context += f"Total de perguntas: {profile.get('total_perguntas', 0)}\n" interesses = [f"{k.replace('interesse_', '').title()} ({v})" for k, v in profile.items() if k.startswith('interesse_')] if interesses: context += f"Interesses: {', '.join(interesses)}\n" return context def clear_memory(session_id: str) -> str: """Limpa a memória de uma sessão específica.""" if session_id in user_sessions: del user_sessions[session_id] memory_path = get_session_memory_path(session_id) if os.path.exists(memory_path): os.remove(memory_path) return "Memória limpa com sucesso!" # --- RAG (Crawling e Vector Store) --- vector_store: Optional[FAISS] = None def get_all_blog_links(url: str) -> set: """Coleta todos os links do blog.""" links = {url} visited = set() while links: current_url = links.pop() if current_url in visited: continue try: response = requests.get(current_url, timeout=500) soup = BeautifulSoup(response.content, 'html.parser') visited.add(current_url) for link in soup.find_all('a', href=True): href = urljoin(url, link['href']) if urlparse(href).netloc == urlparse(url).netloc and '/tag/' not in href and '/category/' not in href: links.add(href) except Exception as e: print(f"Erro ao acessar {current_url}: {e}") return visited def scrape_text_from_url(url: str) -> str: """Extrai texto de uma URL.""" try: response = requests.get(url, timeout=500) soup = BeautifulSoup(response.content, 'html.parser') content = soup.find('article') or soup.find('main') return content.get_text(separator='\n', strip=True) if content else "" except Exception as e: print(f"Erro ao raspar {url}: {e}") return "" def build_and_save_vector_store(): """Constrói e salva o vector store.""" global vector_store links = get_all_blog_links(BLOG_URL) texts = [scrape_text_from_url(link) for link in links if scrape_text_from_url(link)] if not texts: return "Nenhum conteúdo encontrado." text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150) chunks = text_splitter.create_documents(texts) embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") vector_store = FAISS.from_documents(chunks, embeddings) with open(VECTOR_STORE_PATH, "wb") as f: pickle.dump(vector_store, f) with open(PROCESSED_URLS_PATH, "wb") as f: pickle.dump(links, f) return f"Vector store criado com {len(chunks)} chunks." def load_vector_store(): """Carrega o vector store.""" global vector_store if os.path.exists(VECTOR_STORE_PATH): with open(VECTOR_STORE_PATH, "rb") as f: vector_store = pickle.load(f) else: build_and_save_vector_store() def retrieve_context_from_blog(query: str, k: int = 3) -> str: """Busca contexto relevante no vector store.""" if vector_store: try: results = vector_store.similarity_search(query, k=k) return "\n".join(doc.page_content for doc in results) except Exception as e: print(f"Erro ao buscar contexto: {e}") return "" # --- API Client --- class HuggingFaceAPIClient: def __init__(self, token: str): self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} def check_model_info(self, model_name: str) -> Tuple[bool, str]: """Verifica informações do modelo via API do Hugging Face.""" url = f"https://huggingface.co/api/models/{model_name}" try: response = requests.get(url, headers=self.headers, timeout=10) if response.status_code == 200: model_info = response.json() # Verifica se o modelo não está desabilitado if model_info.get('disabled', False): return False, "Modelo desabilitado" # Verifica se requer aprovação if model_info.get('gated', False): return False, "Modelo requer aprovação/aceite de licença" return True, "Modelo disponível" elif response.status_code == 404: return False, "Modelo não encontrado" else: return False, f"Erro HTTP {response.status_code}" except requests.exceptions.RequestException as e: return False, f"Erro na requisição: {str(e)}" def test_model_inference(self, model_name: str) -> Tuple[bool, str]: """Testa se o modelo está disponível para inferência.""" # Primeiro, tenta o endpoint de text-generation (mais comum) url = f"https://api-inference.huggingface.co/models/{model_name}" test_payload = { "inputs": "Teste de disponibilidade do modelo.", "parameters": { "max_new_tokens": 10, "temperature": 0.1, "return_full_text": False } } try: response = requests.post(url, headers=self.headers, json=test_payload, timeout=90) if response.status_code == 200: result = response.json() if isinstance(result, list) and len(result) > 0: return True, "Modelo disponível para inferência" elif isinstance(result, dict) and 'error' not in result: return True, "Modelo disponível para inferência" else: return False, f"Resposta inesperada: {result}" elif response.status_code == 503: return False, "Modelo está carregando (503)" elif response.status_code == 400: error_msg = response.json().get('error', 'Erro 400') if 'loading' in error_msg.lower(): return False, "Modelo está carregando" return False, f"Erro 400: {error_msg}" elif response.status_code == 401: return False, "Token inválido ou sem permissão" elif response.status_code == 404: return False, "Modelo não encontrado" else: return False, f"Erro HTTP {response.status_code}: {response.text}" except requests.exceptions.Timeout: return False, "Timeout na requisição" except requests.exceptions.RequestException as e: return False, f"Erro na requisição: {str(e)}" def test_model_availability(self, model_name: str) -> Tuple[bool, str]: """Testa se um modelo está disponível, combinando verificação de info e inferência.""" print(f"Testando modelo: {model_name}") # Primeiro verifica as informações do modelo info_available, info_msg = self.check_model_info(model_name) if not info_available: return False, f"Info check failed: {info_msg}" print(f" ✓ Info check: {info_msg}") # Em seguida testa a inferência inference_available, inference_msg = self.test_model_inference(model_name) if inference_available: print(f" ✓ Inference check: {inference_msg}") return True, f"Disponível - {info_msg}" else: print(f" ✗ Inference check: {inference_msg}") return False, f"Não disponível para inferência: {inference_msg}" def query_model(self, model_name: str, messages: List[Dict], max_tokens: int = 1000) -> str: """Faz requisição ao modelo usando text-generation.""" # Converte mensagens para prompt simples prompt = self._convert_messages_to_prompt(messages) url = f"https://api-inference.huggingface.co/models/{model_name}" payload = { "inputs": prompt, "parameters": { "max_new_tokens": max_tokens, "temperature": 0.7, "do_sample": True, "return_full_text": False } } try: response = requests.post(url, headers=self.headers, json=payload, timeout=60) response.raise_for_status() result = response.json() if isinstance(result, list) and len(result) > 0: return result[0].get('generated_text', '').strip() elif isinstance(result, dict) and 'generated_text' in result: return result['generated_text'].strip() else: return f"Formato de resposta inesperado: {result}" except requests.exceptions.HTTPError as http_err: return f"Erro HTTP: {http_err.response.status_code} - {http_err.response.text}" except requests.exceptions.RequestException as e: return f"Erro na requisição: {str(e)}" def _convert_messages_to_prompt(self, messages: List[Dict]) -> str: """Converte mensagens do formato chat para prompt simples.""" prompt_parts = [] for msg in messages: role = msg['role'] content = msg['content'] if role == 'system': prompt_parts.append(f"Sistema: {content}") elif role == 'user': prompt_parts.append(f"Usuário: {content}") elif role == 'assistant': prompt_parts.append(f"Assistente: {content}") prompt_parts.append("Assistente:") return "\n\n".join(prompt_parts) api_client = HuggingFaceAPIClient(HF_TOKEN) # --- Função para Testar e Atualizar Modelos --- def test_and_update_models() -> int: """ Testa a disponibilidade dos novos modelos e atualiza a lista MODELS. Retorna o número de modelos disponíveis (incluindo os iniciais). """ print("Testando disponibilidade dos novos modelos...") print(f"Token HF disponível: {'Sim' if HF_TOKEN else 'Não'}") print("-" * 60) initial_model_count = len(MODELS) # Contar os modelos já existentes available_models_during_test = [] unavailable_models = [] for model_label, model_name in NEW_MODELS_TO_TEST: is_available, message = api_client.test_model_availability(model_name) if is_available: if model_label not in MODELS: # Adiciona apenas se não estiver já na lista inicial MODELS[model_label] = model_name available_models_during_test.append((model_label, model_name, message)) print(f"✓ {model_label}") else: unavailable_models.append((model_label, model_name, message)) print(f"✗ {model_label} - {message}") # Pequena pausa para evitar rate limiting time.sleep(1) # Exibir resultados finais print("\n" + "=" * 60) print("RESULTADOS DA VALIDAÇÃO:") print("=" * 60) if available_models_during_test: print(f"\n✓ MODELOS DISPONÍVEIS (novos e testados: {len(available_models_during_test)}):") for label, name, msg in available_models_during_test: print(f" - {label}") print(f" {name}") print(f" Status: {msg}") print() if unavailable_models: print(f"\n✗ MODELOS NÃO DISPONÍVEIS ({len(unavailable_models)}):") for label, name, msg in unavailable_models: print(f" - {label}") print(f" {name}") print(f" Motivo: {msg}") print() print(f"TOTAL DE MODELOS ATUALMENTE DISPONÍVEIS: {len(MODELS)}") print("=" * 60) # Salva a lista atualizada de modelos save_updated_models() return len(MODELS) # Retorna a contagem total de modelos disponíveis def save_updated_models(): """Salva a lista atualizada de modelos em um arquivo.""" try: with open("models_available.json", "w", encoding="utf-8") as f: json.dump(MODELS, f, ensure_ascii=False, indent=2) print("Lista de modelos disponíveis salva em 'models_available.json'") except Exception as e: print(f"Erro ao salvar lista de modelos: {e}") # --- Chat Principal --- def responder_como_aldo(session_id: str, pergunta: str, modelo: str = DEFAULT_MODEL) -> str: """Gera resposta como Dr. Aldo Henrique.""" if not pergunta.strip(): return "Por favor, faça uma pergunta válida." load_conversation_memory(session_id) update_user_profile(session_id, pergunta) # Monta contexto contexto = [] if perfil := get_user_profile_context(session_id): contexto.append(f"**Perfil do Usuário**\n{perfil}") if conversa := get_conversation_context(session_id): contexto.append(f"**Conversa Anterior**\n{conversa}") if blog := retrieve_context_from_blog(pergunta): contexto.append(f"**Contexto do Blog**\n{blog}") system_prompt = """Você é o Dr. Aldo Henrique, Doutor em Ciências da Computação pela UnB (2024), mestre em Ciências da Computação pela UnB (2017) e bacharel em Sistemas de Informação pela UFV (2014). Professor universitário, onde leciona disciplinas como Algoritmos, Inteligência Artificial, Ciência de Dados e Mineração de Dados. Atua como analista de sistemas nível 4. Regras de conduta: - Responda em português, de forma clara, amigável e educativa. - Explique conceitos antes de mostrar soluções. - Use exemplos práticos e, se houver código, comente cada linha. - Considere o nível do usuário (iniciante, intermediário ou avançado). - Use Markdown para formatar respostas, com ``` para blocos de código. - Foque em tecnologia; se a pergunta for fora do escopo, informe educadamente. """ conteudo_contexto = "\n".join(contexto) mensagem_usuario = f"{conteudo_contexto}\n\n**Pergunta**: {pergunta}" messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": mensagem_usuario} ] model_name = MODELS.get(modelo, MODELS[DEFAULT_MODEL]) resposta = api_client.query_model(model_name, messages) add_to_memory(session_id, pergunta, resposta) return resposta # --- Inicialização --- def inicializar_sistema(): """Inicializa o sistema, garantindo no mínimo 3 modelos disponíveis.""" print("Inicializando Chatbot Dr. Aldo...") num_available_models = test_and_update_models() # Retorna a contagem de modelos disponíveis if num_available_models >= 3: load_vector_store() print("Sistema inicializado e pronto para uso com modelos suficientes!") return True # Indica que a inicialização foi bem-sucedida else: print(f"Erro: Apenas {num_available_models} modelos disponíveis. São necessários pelo menos 3 modelos para iniciar o sistema.") return False # Indica que a inicialização falhou if __name__ == "__main__": # Apenas este bloco é executado quando o script é chamado diretamente if inicializar_sistema(): # Este é o local onde você colocaria o código para "carregar a página" # Por exemplo, iniciar um framework web como Flask ou Streamlit. # Por enquanto, vou manter seu teste básico como um placeholder para "carregar a página". print("\n" + "="*50) print("SISTEMA INICIADO: Realizando teste básico do Chatbot...") print("="*50) session_id = "teste_123" print(responder_como_aldo(session_id, "O que é Java?")) print("\n" + "-"*50) print(responder_como_aldo(session_id, "Mostre um exemplo de código Java.")) print("\n" + "-"*50) print(clear_memory(session_id)) else: print("\nSistema não pôde ser iniciado devido à falta de modelos suficientes.") print("Por favor, verifique a conexão com o Hugging Face e o token de acesso.")