portalprogramando / ai_logic.py
aldohenrique's picture
Update ai_logic.py
1565ea2 verified
raw
history blame
21.6 kB
import requests
import os
import json
import time
import pickle
from typing import Dict, List, Optional, Tuple
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
# --- Configurações ---
BLOG_URL = "https://aldohenrique.com.br/"
VECTOR_STORE_PATH = "faiss_index_store.pkl"
PROCESSED_URLS_PATH = "processed_urls.pkl"
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
raise ValueError("Token HF_TOKEN não encontrado")
# Lista inicial de modelos
MODELS = {}
# --- Função para buscar modelos ---
headers = {
"Authorization": f"Bearer {HF_TOKEN}"
}
# Modelos fixos que você quer manter
NEW_MODELS_TO_TEST = [
("Phi-3 Mini (Mais rápido)", "microsoft/Phi-3-mini-4k-instruct"),
("Zephyr 7B (Meio Termo)", "HuggingFaceH4/zephyr-7b-beta"),
]
# --- Consulta a API da Hugging Face ---
url = "https://huggingface.co/api/models"
params = {
"limit": 300, # Aumente se quiser trazer mais modelos
"full": "true"
}
response = requests.get(url, headers=headers, params=params)
if response.status_code != 200:
raise Exception(f"Erro na API: {response.status_code} - {response.text}")
models_data = response.json()
# --- Filtra modelos que possuem base_model ---
for model in models_data:
tags = model.get("tags", [])
base_model_tags = [tag for tag in tags if tag.startswith("base_model:")]
if base_model_tags:
model_name = model.get("id")
display_name = model_name.split("/")[-1]
# Verifica se já não está na lista para evitar duplicados
if not any(model_name == m[1] for m in NEW_MODELS_TO_TEST):
NEW_MODELS_TO_TEST.append((display_name, model_name))
# --- Resultado ---
print("Lista atualizada de modelos:\n")
for name, model_id in NEW_MODELS_TO_TEST:
print(f'("{name}", "{model_id}"),')
print(f"\nTotal de modelos na lista: {len(NEW_MODELS_TO_TEST)}")
# Nota: Alguns modelos podem requerer aprovação ou ter restrições de acesso
# Recomenda-se testar cada modelo individualmente para verificar disponibilidade
DEFAULT_MODEL = "Zephyr 7B (Meio Termo)"
# --- Gerenciamento de Sessão ---
user_sessions: Dict[str, Dict[str, List | Dict]] = {}
MAX_MEMORY_LENGTH = 5
def get_session_memory_path(session_id: str) -> str:
"""Retorna o caminho do arquivo de memória para a sessão."""
return f"conversation_memory_{session_id}.json"
def load_conversation_memory(session_id: str):
"""Carrega a memória da sessão do usuário."""
if session_id in user_sessions:
return
memory_path = get_session_memory_path(session_id)
session_data = {'conversation': [], 'user_profile': {'nivel': 'intermediario', 'total_perguntas': 0}}
if os.path.exists(memory_path):
try:
with open(memory_path, 'r', encoding='utf-8') as f:
session_data = json.load(f)
except Exception as e:
print(f"Erro ao carregar memória para sessão '{session_id}': {e}")
user_sessions[session_id] = session_data
def save_conversation_memory(session_id: str):
"""Salva a memória da sessão do usuário."""
memory_path = get_session_memory_path(session_id)
try:
with open(memory_path, 'w', encoding='utf-8') as f:
json.dump(user_sessions[session_id], f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Erro ao salvar memória para sessão '{session_id}': {e}")
def add_to_memory(session_id: str, user_message: str, assistant_response: str):
"""Adiciona uma troca de mensagens à memória da sessão."""
load_conversation_memory(session_id)
conversation = user_sessions[session_id]['conversation']
conversation.extend([
{"role": "user", "content": user_message, "timestamp": time.time()},
{"role": "assistant", "content": assistant_response, "timestamp": time.time()}
])
if len(conversation) > MAX_MEMORY_LENGTH * 2:
user_sessions[session_id]['conversation'] = conversation[-MAX_MEMORY_LENGTH * 2:]
save_conversation_memory(session_id)
def update_user_profile(session_id: str, user_message: str):
"""Atualiza o perfil do usuário com base na mensagem."""
load_conversation_memory(session_id)
profile = user_sessions[session_id]['user_profile']
message_lower = user_message.lower()
if any(word in message_lower for word in ['básico', 'iniciante']):
profile['nivel'] = 'iniciante'
elif any(word in message_lower for word in ['avançado', 'complexo']):
profile['nivel'] = 'avançado'
topics = {
'java': ['java', 'classe', 'objeto'],
'web': ['html', 'css', 'javascript'],
'ia': ['inteligência artificial', 'machine learning']
}
for topic, keywords in topics.items():
if any(keyword in message_lower for keyword in keywords):
profile[f'interesse_{topic}'] = profile.get(f'interesse_{topic}', 0) + 1
profile['total_perguntas'] = profile.get('total_perguntas', 0) + 1
user_sessions[session_id]['user_profile'] = profile
def get_conversation_context(session_id: str) -> str:
"""Gera o contexto da conversa recente."""
load_conversation_memory(session_id)
conversation = user_sessions[session_id]['conversation'][-4:]
if not conversation:
return ""
return "\n".join(f"{msg['role'].upper()}: {msg['content']}" for msg in conversation)
def get_user_profile_context(session_id: str) -> str:
"""Gera o contexto do perfil do usuário."""
load_conversation_memory(session_id)
profile = user_sessions[session_id]['user_profile']
context = f"Nível: {profile.get('nivel', 'intermediario')}\n"
context += f"Total de perguntas: {profile.get('total_perguntas', 0)}\n"
interesses = [f"{k.replace('interesse_', '').title()} ({v})" for k, v in profile.items() if k.startswith('interesse_')]
if interesses:
context += f"Interesses: {', '.join(interesses)}\n"
return context
def clear_memory(session_id: str) -> str:
"""Limpa a memória de uma sessão específica."""
if session_id in user_sessions:
del user_sessions[session_id]
memory_path = get_session_memory_path(session_id)
if os.path.exists(memory_path):
os.remove(memory_path)
return "Memória limpa com sucesso!"
# --- RAG (Crawling e Vector Store) ---
vector_store: Optional[FAISS] = None
def get_all_blog_links(url: str) -> set:
"""Coleta todos os links do blog."""
links = {url}
visited = set()
while links:
current_url = links.pop()
if current_url in visited:
continue
try:
response = requests.get(current_url, timeout=500)
soup = BeautifulSoup(response.content, 'html.parser')
visited.add(current_url)
for link in soup.find_all('a', href=True):
href = urljoin(url, link['href'])
if urlparse(href).netloc == urlparse(url).netloc and '/tag/' not in href and '/category/' not in href:
links.add(href)
except Exception as e:
print(f"Erro ao acessar {current_url}: {e}")
return visited
def scrape_text_from_url(url: str) -> str:
"""Extrai texto de uma URL."""
try:
response = requests.get(url, timeout=500)
soup = BeautifulSoup(response.content, 'html.parser')
content = soup.find('article') or soup.find('main')
return content.get_text(separator='\n', strip=True) if content else ""
except Exception as e:
print(f"Erro ao raspar {url}: {e}")
return ""
def build_and_save_vector_store():
"""Constrói e salva o vector store."""
global vector_store
links = get_all_blog_links(BLOG_URL)
texts = [scrape_text_from_url(link) for link in links if scrape_text_from_url(link)]
if not texts:
return "Nenhum conteúdo encontrado."
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
chunks = text_splitter.create_documents(texts)
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vector_store = FAISS.from_documents(chunks, embeddings)
with open(VECTOR_STORE_PATH, "wb") as f:
pickle.dump(vector_store, f)
with open(PROCESSED_URLS_PATH, "wb") as f:
pickle.dump(links, f)
return f"Vector store criado com {len(chunks)} chunks."
def load_vector_store():
"""Carrega o vector store."""
global vector_store
if os.path.exists(VECTOR_STORE_PATH):
with open(VECTOR_STORE_PATH, "rb") as f:
vector_store = pickle.load(f)
else:
build_and_save_vector_store()
def retrieve_context_from_blog(query: str, k: int = 4) -> str:
"""Busca contexto relevante no vector store."""
if vector_store:
try:
results = vector_store.similarity_search(query, k=k)
return "\n".join(doc.page_content for doc in results)
except Exception as e:
print(f"Erro ao buscar contexto: {e}")
return ""
# --- API Client ---
class HuggingFaceAPIClient:
def __init__(self, token: str):
self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
def check_model_info(self, model_name: str) -> Tuple[bool, str]:
"""Verifica informações do modelo via API do Hugging Face."""
url = f"https://huggingface.co/api/models/{model_name}"
try:
response = requests.get(url, headers=self.headers, timeout=90)
if response.status_code == 200:
model_info = response.json()
if model_info.get('disabled', False):
return False, "Modelo desabilitado"
if model_info.get('gated', False):
return False, "Modelo requer aprovação/aceite de licença"
return True, "Modelo disponível"
elif response.status_code == 404:
return False, "Modelo não encontrado"
else:
return False, f"Erro HTTP {response.status_code}"
except requests.exceptions.RequestException as e:
return False, f"Erro na requisição: {str(e)}"
def test_model_inference(self, model_name: str) -> Tuple[bool, str]:
"""Testa se o modelo está disponível para inferência."""
url = f"https://api-inference.huggingface.co/models/{model_name}"
test_payload = {
"inputs": "Teste de disponibilidade do modelo.",
"parameters": {
"max_new_tokens": 10,
"temperature": 0.1,
"return_full_text": False
}
}
try:
response = requests.post(url, headers=self.headers, json=test_payload, timeout=90)
if response.status_code == 200:
result = response.json()
if isinstance(result, list) and len(result) > 0:
return True, "Modelo disponível para inferência"
elif isinstance(result, dict) and 'error' not in result:
return True, "Modelo disponível para inferência"
else:
return False, f"Resposta inesperada: {result}"
elif response.status_code == 503:
return False, "Modelo está carregando (503)"
elif response.status_code == 400:
error_msg = response.json().get('error', 'Erro 400')
if 'loading' in error_msg.lower():
return False, "Modelo está carregando"
return False, f"Erro 400: {error_msg}"
elif response.status_code == 401:
return False, "Token inválido ou sem permissão"
elif response.status_code == 404:
return False, "Modelo não encontrado"
else:
return False, f"Erro HTTP {response.status_code}: {response.text}"
except requests.exceptions.Timeout:
return False, "Timeout na requisição"
except requests.exceptions.RequestException as e:
return False, f"Erro na requisição: {str(e)}"
def test_model_availability(self, model_name: str) -> Tuple[bool, str]:
"""Testa se um modelo está disponível, combinando verificação de info e inferência."""
print(f"Testando modelo: {model_name}")
info_available, info_msg = self.check_model_info(model_name)
if not info_available:
return False, f"Info check failed: {info_msg}"
print(f" ✓ Info check: {info_msg}")
inference_available, inference_msg = self.test_model_inference(model_name)
if inference_available:
print(f" ✓ Inference check: {inference_msg}")
return True, f"Disponível - {info_msg}"
else:
print(f" ✗ Inference check: {inference_msg}")
return False, f"Não disponível para inferência: {inference_msg}"
def query_model(self, model_name: str, messages: List[Dict], max_tokens: int = 1000) -> str:
"""Faz requisição ao modelo usando text-generation."""
prompt = self._convert_messages_to_prompt(messages)
url = f"https://api-inference.huggingface.co/models/{model_name}"
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": max_tokens,
"temperature": 0.7,
"do_sample": True,
"return_full_text": False
}
}
try:
response = requests.post(url, headers=self.headers, json=payload, timeout=60)
response.raise_for_status()
result = response.json()
if isinstance(result, list) and len(result) > 0:
return result[0].get('generated_text', '').strip()
elif isinstance(result, dict) and 'generated_text' in result:
return result['generated_text'].strip()
else:
return f"Formato de resposta inesperado: {result}"
except requests.exceptions.HTTPError as http_err:
return f"Erro HTTP: {http_err.response.status_code} - {http_err.response.text}"
except requests.exceptions.RequestException as e:
return f"Erro na requisição: {str(e)}"
def _convert_messages_to_prompt(self, messages: List[Dict]) -> str:
"""Converte mensagens do formato chat para prompt simples."""
prompt_parts = []
for msg in messages:
role = msg['role']
content = msg['content']
if role == 'system':
prompt_parts.append(f"Sistema: {content}")
elif role == 'user':
prompt_parts.append(f"Usuário: {content}")
elif role == 'assistant':
prompt_parts.append(f"Assistente: {content}")
prompt_parts.append("Assistente:")
return "\n\n".join(prompt_parts)
api_client = HuggingFaceAPIClient(HF_TOKEN)
# --- Função para Testar e Atualizar Modelos ---
def test_and_update_models() -> int:
"""
Testa a disponibilidade dos novos modelos e atualiza a lista MODELS.
Garante que o DEFAULT_MODEL seja sempre o primeiro da lista.
Retorna o número de modelos disponíveis.
"""
print("Testando disponibilidade dos novos modelos...")
print(f"Token HF disponível: {'Sim' if HF_TOKEN else 'Não'}")
print("-" * 60)
# Cria um dicionário temporário para os modelos disponíveis
temp_models = {}
# Primeiro verifica o modelo padrão
default_label, default_name = "Mistral 7B (Mais acertivo)", "mistralai/Mistral-7B-Instruct-v0.3"
is_available, message = api_client.test_model_availability(default_name)
if is_available:
temp_models[default_label] = default_name
print(f"✓ {default_label} (DEFAULT MODEL)")
else:
print(f"✗ {default_label} - {message} (MODELO PADRÃO INDISPONÍVEL)")
# Depois verifica os outros modelos
for model_label, model_name in NEW_MODELS_TO_TEST:
# Pula o modelo padrão se já foi testado
if model_label == default_label and model_name == default_name:
continue
is_available, message = api_client.test_model_availability(model_name)
if is_available:
temp_models[model_label] = model_name
print(f"✓ {model_label}")
else:
print(f"✗ {model_label} - {message}")
time.sleep(1)
# Atualiza MODELS garantindo que o padrão seja o primeiro
global MODELS
MODELS.clear()
# Adiciona primeiro o modelo padrão (se disponível)
if default_label in temp_models:
MODELS[default_label] = temp_models.pop(default_label)
# Adiciona os demais modelos
MODELS.update(temp_models)
print("\n" + "=" * 60)
print("MODELOS DISPONÍVEIS (ORDEM):")
print("=" * 60)
for i, (label, name) in enumerate(MODELS.items(), 1):
print(f"{i}. {label}")
print(f"\nTOTAL DE MODELOS DISPONÍVEIS: {len(MODELS)}")
print("=" * 60)
save_updated_models()
return len(MODELS)
def save_updated_models():
"""Salva a lista atualizada de modelos em um arquivo."""
try:
with open("models_available.json", "w", encoding="utf-8") as f:
json.dump(MODELS, f, ensure_ascii=False, indent=2)
print("Lista de modelos disponíveis salva em 'models_available.json'")
except Exception as e:
print(f"Erro ao salvar lista de modelos: {e}")
# --- Chat Principal ---
def responder_como_aldo(session_id: str, pergunta: str, modelo: str = DEFAULT_MODEL) -> str:
"""Gera resposta como Dr. Aldo Henrique."""
if not pergunta.strip():
return "Por favor, faça uma pergunta válida."
load_conversation_memory(session_id)
update_user_profile(session_id, pergunta)
contexto = []
if perfil := get_user_profile_context(session_id):
contexto.append(f"**Perfil do Usuário**\n{perfil}")
if conversa := get_conversation_context(session_id):
contexto.append(f"**Conversa Anterior**\n{conversa}")
if blog := retrieve_context_from_blog(pergunta):
contexto.append(f"**Contexto do Blog**\n{blog}")
system_prompt = """Você é o Dr. Aldo Henrique,
Doutor em Ciências da Computação pela UnB (2024), mestre em Ciências da Computação pela UnB (2017) e bacharel em Sistemas de Informação pela UFV (2014).
Professor universitário, onde leciona disciplinas como Algoritmos, Inteligência Artificial, Ciência de Dados e Mineração de Dados.
Atua como analista de sistemas nível 4.
Regras de conduta:
- Responda em português, de forma clara, amigável e educativa.
- Explique conceitos antes de mostrar soluções.
- Use exemplos práticos e, se houver código, comente cada linha.
- Considere o nível do usuário (iniciante, intermediário ou avançado).
- Use Markdown para formatar respostas, com ``` para blocos de código.
- Foque em tecnologia; se a pergunta for fora do escopo, informe educadamente.
"""
conteudo_contexto = "\n".join(contexto)
mensagem_usuario = f"{conteudo_contexto}\n\n**Pergunta**: {pergunta}"
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": mensagem_usuario}
]
model_name = MODELS.get(modelo, MODELS[DEFAULT_MODEL])
resposta = api_client.query_model(model_name, messages)
add_to_memory(session_id, pergunta, resposta)
return resposta
# --- Inicialização ---
def inicializar_sistema():
"""
Inicializa o sistema, garantindo no mínimo 3 modelos disponíveis.
Retorna uma tupla: (status: bool, models: dict)
- status: True se >= 3 modelos disponíveis, False caso contrário
- models: Dicionário com os modelos disponíveis
"""
print("Inicializando Chatbot Dr. Aldo...")
num_available_models = test_and_update_models()
if num_available_models >= 1:
load_vector_store()
print("Sistema inicializado e pronto para uso com modelos suficientes!")
return True, MODELS
else:
print(f"Erro: Apenas {num_available_models} modelos disponíveis. São necessários pelo menos 3 modelos para iniciar o sistema.")
return False, MODELS
if __name__ == "__main__":
status, models = inicializar_sistema()
if status:
print("\n" + "="*50)
print("SISTEMA INICIADO: Realizando teste básico do Chatbot...")
print("="*50)
session_id = "teste_123"
print(responder_como_aldo(session_id, "O que é Java?"))
print("\n" + "-"*50)
print(responder_como_aldo(session_id, "Mostre um exemplo de código Java."))
print("\n" + "-"*50)
print(clear_memory(session_id))
else:
print("\nSistema não pôde ser iniciado devido à falta de modelos suficientes.")
print(f"Modelos disponíveis: {', '.join(models.keys()) if models else 'Nenhum'}")
print("Por favor, verifique a conexão com o Hugging Face e o token de acesso.")