portalprogramando / ai_logic.py
aldohenrique's picture
Update ai_logic.py
c67b3dc verified
import requests
import os
import json
import time
import pickle
from typing import Dict, List, Optional, Tuple
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from huggingface_hub import InferenceClient
import asyncio
# --- Configurações ---
BLOG_URL = "https://aldohenrique.com.br/"
VECTOR_STORE_PATH = "faiss_index_store.pkl"
PROCESSED_URLS_PATH = "processed_urls.pkl"
HF_TOKEN = os.getenv("HF_TOKEN")
# Validação do token com mensagem mais clara
if not HF_TOKEN:
print("ERRO: Token HF_TOKEN não encontrado!")
print("Execute: export HF_TOKEN='seu_token_aqui' ou defina como variável de ambiente")
exit(1)
print(f"Token HF encontrado: {HF_TOKEN[:10]}...")
# --- Modelos para teste (versão com InferenceClient) ---
MODELS = {}
# Lista de modelos mais estáveis e com maior chance de funcionar
NEW_MODELS_TO_TEST = [
("Llama 3.2 3B", "meta-llama/Llama-3.2-3B-Instruct"),
("Mistral 7B", "mistralai/Mistral-7B-Instruct-v0.3"),
("Mistral Nemo", "mistralai/Mistral-Nemo-Instruct-2407"),
("Phi-3.5 Mini", "microsoft/Phi-3.5-mini-instruct"),
("Qwen2.5 7B", "Qwen/Qwen2.5-7B-Instruct"),
("Gemma 2 2B", "google/gemma-2-2b-it"),
("CodeLlama 7B", "codellama/CodeLlama-7b-Instruct-hf"),
("Zephyr 7B", "HuggingFaceH4/zephyr-7b-beta"),
("IBM 2B", "ibm-granite/granite-3.3-2b-instruct")
]
DEFAULT_MODEL = "Llama 3.2 3B"
# --- Gerenciamento de Sessão ---
user_sessions: Dict[str, Dict[str, List | Dict]] = {}
MAX_MEMORY_LENGTH = 8 # Aumentado para ter mais contexto útil
def get_session_memory_path(session_id: str) -> str:
"""Retorna o caminho do arquivo de memória para a sessão."""
return f"conversation_memory_{session_id}.json"
def load_conversation_memory(session_id: str):
"""Carrega a memória da sessão do usuário."""
if session_id in user_sessions:
return
memory_path = get_session_memory_path(session_id)
session_data = {'conversation': [], 'user_profile': {'nivel': 'intermediario', 'total_perguntas': 0}}
if os.path.exists(memory_path):
try:
with open(memory_path, 'r', encoding='utf-8') as f:
session_data = json.load(f)
except Exception as e:
print(f"Erro ao carregar memória para sessão '{session_id}': {e}")
user_sessions[session_id] = session_data
def save_conversation_memory(session_id: str):
"""Salva a memória da sessão do usuário."""
memory_path = get_session_memory_path(session_id)
try:
with open(memory_path, 'w', encoding='utf-8') as f:
json.dump(user_sessions[session_id], f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Erro ao salvar memória para sessão '{session_id}': {e}")
def add_to_memory(session_id: str, user_message: str, assistant_response: str):
"""Adiciona uma troca de mensagens à memória da sessão."""
load_conversation_memory(session_id)
conversation = user_sessions[session_id]['conversation']
conversation.extend([
{"role": "user", "content": user_message, "timestamp": time.time()},
{"role": "assistant", "content": assistant_response, "timestamp": time.time()}
])
# Mantém apenas as últimas conversas para evitar contexto muito longo
if len(conversation) > MAX_MEMORY_LENGTH * 2:
user_sessions[session_id]['conversation'] = conversation[-MAX_MEMORY_LENGTH * 2:]
save_conversation_memory(session_id)
def update_user_profile(session_id: str, user_message: str):
"""Atualiza o perfil do usuário com base na mensagem."""
load_conversation_memory(session_id)
profile = user_sessions[session_id]['user_profile']
message_lower = user_message.lower()
if any(word in message_lower for word in ['básico', 'iniciante']):
profile['nivel'] = 'iniciante'
elif any(word in message_lower for word in ['avançado', 'complexo']):
profile['nivel'] = 'avançado'
topics = {
'java': ['java', 'classe', 'objeto'],
'web': ['html', 'css', 'javascript'],
'ia': ['inteligência artificial', 'machine learning']
}
for topic, keywords in topics.items():
if any(keyword in message_lower for keyword in keywords):
profile[f'interesse_{topic}'] = profile.get(f'interesse_{topic}', 0) + 1
profile['total_perguntas'] = profile.get('total_perguntas', 0) + 1
user_sessions[session_id]['user_profile'] = profile
def get_conversation_messages(session_id: str) -> List[Dict]:
"""
NOVA FUNÇÃO: Retorna as mensagens da conversa em formato adequado para o modelo.
Esta é a chave para resolver o problema de duplicação!
"""
load_conversation_memory(session_id)
conversation = user_sessions[session_id]['conversation']
# Pega apenas as últimas 6 mensagens (3 trocas) para não sobrecarregar
recent_conversation = conversation[-6:] if len(conversation) > 6 else conversation
# Converte para formato de mensagens do modelo
messages = []
for msg in recent_conversation:
# Remove metadados desnecessários das mensagens antigas
clean_content = msg['content']
# Remove a linha de informação do modelo das respostas antigas
if msg['role'] == 'assistant' and '*Resposta gerada pelo modelo:' in clean_content:
clean_content = clean_content.split('*Resposta gerada pelo modelo:')[0].strip()
messages.append({
"role": msg['role'],
"content": clean_content
})
return messages
def get_user_profile_context(session_id: str) -> str:
"""Gera o contexto do perfil do usuário de forma mais concisa."""
load_conversation_memory(session_id)
profile = user_sessions[session_id]['user_profile']
# Contexto mais conciso para não poluir o prompt
nivel = profile.get('nivel', 'intermediario')
total = profile.get('total_perguntas', 0)
context_parts = [f"Nível: {nivel}"]
# Só inclui interesses se há algum padrão significativo
interesses = [k.replace('interesse_', '').title()
for k, v in profile.items()
if k.startswith('interesse_') and v >= 2] # Só se perguntou pelo menos 2 vezes
if interesses:
context_parts.append(f"Interesses: {', '.join(interesses)}")
return " | ".join(context_parts)
def clear_memory(session_id: str) -> str:
"""Limpa a memória de uma sessão específica."""
if session_id in user_sessions:
del user_sessions[session_id]
memory_path = get_session_memory_path(session_id)
if os.path.exists(memory_path):
os.remove(memory_path)
return "Memória limpa com sucesso!"
# --- RAG (Crawling e Vector Store) ---
vector_store: Optional[FAISS] = None
def get_all_blog_links(url: str) -> set:
"""Coleta todos os links do blog."""
links = {url}
visited = set()
while links:
current_url = links.pop()
if current_url in visited:
continue
try:
response = requests.get(current_url, timeout=60)
soup = BeautifulSoup(response.content, 'html.parser')
visited.add(current_url)
for link in soup.find_all('a', href=True):
href = urljoin(url, link['href'])
if urlparse(href).netloc == urlparse(url).netloc and '/tag/' not in href and '/category/' not in href:
links.add(href)
except Exception as e:
print(f"Erro ao acessar {current_url}: {e}")
return visited
def scrape_text_from_url(url: str) -> str:
"""Extrai texto de uma URL."""
try:
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
content = soup.find('article') or soup.find('main')
return content.get_text(separator='\n', strip=True) if content else ""
except Exception as e:
print(f"Erro ao raspar {url}: {e}")
return ""
def build_and_save_vector_store():
"""Constrói e salva o vector store."""
global vector_store
print("Construindo vector store...")
try:
links = get_all_blog_links(BLOG_URL)
texts = [scrape_text_from_url(link) for link in links if scrape_text_from_url(link)]
if not texts:
print("Nenhum conteúdo encontrado no blog.")
return "Nenhum conteúdo encontrado."
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
chunks = text_splitter.create_documents(texts)
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vector_store = FAISS.from_documents(chunks, embeddings)
with open(VECTOR_STORE_PATH, "wb") as f:
pickle.dump(vector_store, f)
with open(PROCESSED_URLS_PATH, "wb") as f:
pickle.dump(links, f)
print(f"Vector store criado com {len(chunks)} chunks.")
return f"Vector store criado com {len(chunks)} chunks."
except Exception as e:
print(f"Erro ao construir vector store: {e}")
return f"Erro ao construir vector store: {e}"
def load_vector_store():
"""Carrega o vector store."""
global vector_store
try:
if os.path.exists(VECTOR_STORE_PATH):
with open(VECTOR_STORE_PATH, "rb") as f:
vector_store = pickle.load(f)
print("Vector store carregado com sucesso.")
else:
print("Vector store não encontrado. Criando novo...")
build_and_save_vector_store()
except Exception as e:
print(f"Erro ao carregar vector store: {e}")
print("Tentando criar novo vector store...")
build_and_save_vector_store()
def retrieve_context_from_blog(query: str, k: int = 3) -> str:
"""Busca contexto relevante no vector store - Reduzido para evitar sobrecarga."""
if vector_store:
try:
results = vector_store.similarity_search(query, k=k)
# Limita o tamanho do contexto para evitar tokens excessivos
context_parts = []
total_chars = 0
max_chars = 1500 # Limite de caracteres do contexto do blog
for doc in results:
if total_chars + len(doc.page_content) > max_chars:
break
context_parts.append(doc.page_content)
total_chars += len(doc.page_content)
return "\n---\n".join(context_parts)
except Exception as e:
print(f"Erro ao buscar contexto: {e}")
return ""
# --- Inference Client (Versão Melhorada com huggingface_hub) ---
class HuggingFaceInferenceClient:
def __init__(self, token: str):
self.token = token
self.clients = {} # Cache de clientes para diferentes modelos
def get_client(self, model_name: str) -> InferenceClient:
"""Obtém ou cria um cliente para o modelo especificado."""
if model_name not in self.clients:
self.clients[model_name] = InferenceClient(
model=model_name,
token=self.token
)
return self.clients[model_name]
def check_model_status(self, model_name: str) -> Tuple[bool, str]:
"""Verifica se um modelo está disponível."""
try:
print(f" Testando {model_name}...")
client = self.get_client(model_name)
# Teste simples com mensagem básica
test_messages = [
{"role": "user", "content": "Hello"}
]
# Tenta fazer uma requisição de teste
response = client.chat_completion(
messages=test_messages,
max_tokens=5,
temperature=0.1
)
if response and hasattr(response, 'choices') and len(response.choices) > 0:
return True, "Modelo disponível"
else:
return False, "Resposta inválida do modelo"
except Exception as e:
error_msg = str(e).lower()
if 'loading' in error_msg or 'currently loading' in error_msg:
return False, "Modelo carregando"
elif 'rate limit' in error_msg:
return False, "Rate limit atingido"
elif 'token' in error_msg or 'unauthorized' in error_msg:
return False, "Token inválido"
elif 'model not found' in error_msg:
return False, "Modelo não encontrado"
else:
return False, f"Erro: {str(e)[:100]}"
def query_model(self, model_name: str, messages: List[Dict], max_tokens: int = 1500, temperature: float = 0.5) -> str:
"""Faz requisição ao modelo usando chat completion - Reduzido max_tokens."""
try:
client = self.get_client(model_name)
# Faz a requisição usando chat completion
response = client.chat_completion(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
stream=False
)
# Extrai a resposta
if response and hasattr(response, 'choices') and len(response.choices) > 0:
content = response.choices[0].message.content
return content.strip() if content else "Resposta vazia do modelo"
else:
return "Erro: Resposta inválida do modelo"
except Exception as e:
error_msg = str(e)
if 'loading' in error_msg.lower():
return f"Modelo {model_name} está carregando. Tente novamente em alguns minutos."
elif 'rate limit' in error_msg.lower():
return "Rate limit atingido. Aguarde alguns momentos e tente novamente."
elif 'token' in error_msg.lower() or 'unauthorized' in error_msg.lower():
return "Erro de autenticação. Verifique seu token HuggingFace."
else:
return f"Erro ao consultar modelo: {error_msg}"
# --- Função para Testar e Atualizar Modelos ---
def test_and_update_models() -> int:
"""Testa modelos e adiciona TODOS à lista MODELS, independente da disponibilidade."""
print("Testando disponibilidade dos modelos...")
print(f"Token HF disponível: {'Sim' if HF_TOKEN else 'Não'}")
print("-" * 60)
inference_client = HuggingFaceInferenceClient(HF_TOKEN)
model_status = {} # Para armazenar status de cada modelo
# Testa todos os modelos mas adiciona TODOS à lista MODELS
for model_label, model_name in NEW_MODELS_TO_TEST:
try:
is_available, message = inference_client.check_model_status(model_name)
# Armazena o status para exibição
model_status[model_label] = {
'available': is_available,
'message': message,
'model_name': model_name
}
if is_available:
print(f"✓ {model_label} - {message}")
else:
print(f"⚠ {model_label} - {message} (adicionado mesmo assim)")
except Exception as e:
print(f"⚠ {model_label} - Erro: {str(e)} (adicionado mesmo assim)")
model_status[model_label] = {
'available': False,
'message': f"Erro: {str(e)}",
'model_name': model_name
}
# Pausa para evitar rate limiting
time.sleep(3)
# SEMPRE adiciona TODOS os modelos, independente da disponibilidade
global MODELS
MODELS.clear()
for model_label, model_name in NEW_MODELS_TO_TEST:
MODELS[model_label] = model_name
print("\n" + "=" * 60)
print("TODOS OS MODELOS ADICIONADOS (INDEPENDENTE DE DISPONIBILIDADE):")
print("=" * 60)
for i, (label, name) in enumerate(MODELS.items(), 1):
status_info = model_status.get(label, {})
status_symbol = "✓" if status_info.get('available', False) else "⚠"
status_msg = status_info.get('message', 'Status desconhecido')
print(f"{i}. {status_symbol} {label} ({name}) - {status_msg}")
print(f"\nTOTAL: {len(MODELS)} modelos adicionados")
print("=" * 60)
# Salva lista completa (incluindo status)
try:
models_with_status = {}
for label, name in MODELS.items():
status_info = model_status.get(label, {})
models_with_status[label] = {
'model_name': name,
'available': status_info.get('available', False),
'status_message': status_info.get('message', 'Status desconhecido'),
'last_checked': time.time()
}
with open("models_available.json", "w", encoding="utf-8") as f:
json.dump(models_with_status, f, ensure_ascii=False, indent=2)
print("Lista completa salva em 'models_available.json'")
except Exception as e:
print(f"Erro ao salvar lista: {e}")
return len(MODELS)
# --- Chat Principal (VERSÃO CORRIGIDA) ---
def responder_como_aldo(session_id: str, pergunta: str, modelo: str = None) -> str:
"""
FUNÇÃO PRINCIPAL CORRIGIDA: Gera resposta como Dr. Aldo Henrique sem duplicação.
"""
if not pergunta.strip():
return "Por favor, faça uma pergunta válida."
# Usar primeiro modelo disponível se nenhum especificado
if not modelo or modelo not in MODELS:
if not MODELS:
return "Erro: Nenhum modelo disponível!"
modelo = list(MODELS.keys())[0]
load_conversation_memory(session_id)
update_user_profile(session_id, pergunta)
# === NOVA ABORDAGEM: Monta mensagens em formato adequado ===
# 1. Obtém mensagens anteriores da conversa (já formatadas)
conversation_messages = get_conversation_messages(session_id)
# 2. Monta o system prompt (mais conciso)
perfil_info = get_user_profile_context(session_id)
system_prompt = f"""Você é o Dr. Aldo Henrique, Doutor em Ciências da Computação pela UnB (2024), professor universitário especializado em:
- Algoritmos e Estruturas de Dados
- Inteligência Artificial
- Ciência de Dados e Mineração de Dados
- Desenvolvimento de Software
Informações do usuário: {perfil_info}
Responda sempre em português, de forma didática e clara:
- Explique conceitos antes de mostrar código
- Use exemplos práticos adaptados ao nível do usuário
- Faça uma pequena observação interessante ou engraçada relacionada à pergunta
- Use Markdown para formatação
- Adicione comentários explicativos no código"""
# 3. Adiciona contexto do blog apenas se relevante (sem repetir na conversa)
blog_context = retrieve_context_from_blog(pergunta)
if blog_context:
system_prompt += f"\n\nContexto do seu blog (use apenas se relevante para a pergunta):\n{blog_context}"
# 4. Monta as mensagens finais
messages = [{"role": "system", "content": system_prompt}]
# Adiciona mensagens anteriores da conversa (sem duplicação)
messages.extend(conversation_messages)
# Adiciona a pergunta atual
messages.append({"role": "user", "content": pergunta})
# === DEBUG: Log do que está sendo enviado ===
print(f"\n=== DEBUG SESSION {session_id} ===")
print(f"Pergunta atual: {pergunta}")
print(f"Mensagens na conversa: {len(conversation_messages)}")
print(f"Total de mensagens enviadas: {len(messages)}")
print("=" * 40)
# 5. Faz requisição usando InferenceClient
inference_client = HuggingFaceInferenceClient(HF_TOKEN)
model_name = MODELS[modelo]
resposta = inference_client.query_model(model_name, messages, max_tokens=1200) # Reduzido
# 6. Limpa a resposta (remove possíveis repetições)
resposta_limpa = resposta.strip()
# Remove qualquer repetição óbvia da pergunta
if pergunta.lower() in resposta_limpa.lower()[:100]: # Se a pergunta aparece no início
lines = resposta_limpa.split('\n')
# Remove linhas que são muito similares à pergunta
filtered_lines = []
for line in lines:
if not (len(line.strip()) > 0 and
any(word in line.lower() for word in pergunta.lower().split() if len(word) > 3) and
len(line.strip()) < len(pergunta) * 1.5):
filtered_lines.append(line)
resposta_limpa = '\n'.join(filtered_lines).strip()
# 7. Adiciona informação sobre modelo usado (mais discreta)
resposta_final = f"{resposta_limpa}\n\n*— {modelo}*"
# 8. Salva na memória (a resposta limpa, sem a informação do modelo)
add_to_memory(session_id, pergunta, resposta_limpa)
return resposta_final
# --- Inicialização ---
def inicializar_sistema():
"""Inicializa o sistema."""
print("Inicializando Chatbot Dr. Aldo...")
print("=" * 50)
# Verificar se huggingface_hub está instalado
try:
from huggingface_hub import InferenceClient
print("✓ huggingface_hub disponível")
except ImportError:
print("⚠ AVISO: huggingface_hub não encontrado!")
print("Execute: pip install huggingface_hub")
return False, {}
# Testa modelos (agora sempre retorna todos)
num_total_models = test_and_update_models()
# Sistema sempre é considerado inicializado, pois todos os modelos são adicionados
print(f"\n✓ Sistema inicializado com {num_total_models} modelos!")
print("⚠ Nem todos os modelos podem estar disponíveis no momento.")
print("⚠ O sistema tentará usar qualquer modelo selecionado.")
# Carrega vector store (opcional)
try:
load_vector_store()
print("✓ Vector store carregado!")
except Exception as e:
print(f"⚠ Erro ao carregar vector store: {e}")
print("⚠ Sistema funcionará sem contexto do blog.")
return True, MODELS
# --- Execução Principal ---
if __name__ == "__main__":
status, models = inicializar_sistema()
if status:
print("\n" + "="*50)
print("TESTE DO SISTEMA CORRIGIDO")
print("="*50)
session_id = "teste_123"
# Teste 1
print("\n1. Testando pergunta básica...")
resposta1 = responder_como_aldo(session_id, "O que é Python?")
print(f"Resposta: {resposta1[:200]}...")
# Teste 2 - Pergunta relacionada (para testar memória)
print("\n2. Testando pergunta relacionada...")
resposta2 = responder_como_aldo(session_id, "Como posso começar a aprender Python?")
print(f"Resposta: {resposta2[:200]}...")
# Teste 3 - Pergunta completamente diferente
print("\n3. Testando pergunta diferente...")
resposta3 = responder_como_aldo(session_id, "Explique estruturas de dados")
print(f"Resposta: {resposta3[:200]}...")
# Limpeza
print(f"\n4. {clear_memory(session_id)}")
print("\n" + "="*50)
print("SISTEMA CORRIGIDO PRONTO!")
print("="*50)
print("✓ Memória sem duplicação implementada")
print("✓ Contexto otimizado para reduzir tokens")
print("✓ Respostas mais limpas e diretas")
else:
print("\n" + "="*50)
print("ERRO NA INICIALIZAÇÃO")
print("="*50)
print("Instale as dependências necessárias:")
print("pip install huggingface_hub")