portalprogramando / ai_logic.py
aldohenrique's picture
Update ai_logic.py
942ce93 verified
raw
history blame
11.2 kB
import requests
import os
import json
import time
import pickle
from typing import Dict, List, Optional
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
# --- Configurações ---
BLOG_URL = "https://aldohenrique.com.br/"
VECTOR_STORE_PATH = "faiss_index_store.pkl"
PROCESSED_URLS_PATH = "processed_urls.pkl"
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
raise ValueError("Token HF_TOKEN não encontrado")
MODELS = {
"Phi-3 Mini": "microsoft/Phi-3-mini-4k-instruct",
"Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3"
}
DEFAULT_MODEL = "Phi-3 Mini"
# --- Gerenciamento de Sessão ---
user_sessions: Dict[str, Dict[str, List | Dict]] = {} # {session_id: {'conversation': [], 'user_profile': {}}}
MAX_MEMORY_LENGTH = 5 # Máximo de trocas (user + assistant)
def get_session_memory_path(session_id: str) -> str:
"""Retorna o caminho do arquivo de memória para a sessão."""
return f"conversation_memory_{session_id}.json"
def load_conversation_memory(session_id: str):
"""Carrega a memória da sessão do usuário."""
if session_id in user_sessions:
return
memory_path = get_session_memory_path(session_id)
session_data = {'conversation': [], 'user_profile': {'nivel': 'intermediario', 'total_perguntas': 0}}
if os.path.exists(memory_path):
try:
with open(memory_path, 'r', encoding='utf-8') as f:
session_data = json.load(f)
except Exception as e:
print(f"Erro ao carregar memória para sessão '{session_id}': {e}")
user_sessions[session_id] = session_data
def save_conversation_memory(session_id: str):
"""Salva a memória da sessão do usuário."""
memory_path = get_session_memory_path(session_id)
try:
with open(memory_path, 'w', encoding='utf-8') as f:
json.dump(user_sessions[session_id], f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Erro ao salvar memória para sessão '{session_id}': {e}")
def add_to_memory(session_id: str, user_message: str, assistant_response: str):
"""Adiciona uma troca de mensagens à memória da sessão."""
load_conversation_memory(session_id)
conversation = user_sessions[session_id]['conversation']
conversation.extend([
{"role": "user", "content": user_message, "timestamp": time.time()},
{"role": "assistant", "content": assistant_response, "timestamp": time.time()}
])
if len(conversation) > MAX_MEMORY_LENGTH * 2:
user_sessions[session_id]['conversation'] = conversation[-MAX_MEMORY_LENGTH * 2:]
save_conversation_memory(session_id)
def update_user_profile(session_id: str, user_message: str):
"""Atualiza o perfil do usuário com base na mensagem."""
load_conversation_memory(session_id)
profile = user_sessions[session_id]['user_profile']
message_lower = user_message.lower()
# Atualiza nível
if any(word in message_lower for word in ['básico', 'iniciante']):
profile['nivel'] = 'iniciante'
elif any(word in message_lower for word in ['avançado', 'complexo']):
profile['nivel'] = 'avançado'
# Atualiza interesses
topics = {
'java': ['java', 'classe', 'objeto'],
'web': ['html', 'css', 'javascript'],
'ia': ['inteligência artificial', 'machine learning']
}
for topic, keywords in topics.items():
if any(keyword in message_lower for keyword in keywords):
profile[f'interesse_{topic}'] = profile.get(f'interesse_{topic}', 0) + 1
profile['total_perguntas'] = profile.get('total_perguntas', 0) + 1
user_sessions[session_id]['user_profile'] = profile
def get_conversation_context(session_id: str) -> str:
"""Gera o contexto da conversa recente."""
load_conversation_memory(session_id)
conversation = user_sessions[session_id]['conversation'][-4:] # Últimas 2 trocas
if not conversation:
return ""
return "\n".join(f"{msg['role'].upper()}: {msg['content']}" for msg in conversation)
def get_user_profile_context(session_id: str) -> str:
"""Gera o contexto do perfil do usuário."""
load_conversation_memory(session_id)
profile = user_sessions[session_id]['user_profile']
context = f"Nível: {profile.get('nivel', 'intermediario')}\n"
context += f"Total de perguntas: {profile.get('total_perguntas', 0)}\n"
interesses = [f"{k.replace('interesse_', '').title()} ({v})" for k, v in profile.items() if k.startswith('interesse_')]
if interesses:
context += f"Interesses: {', '.join(interesses)}\n"
return context
def clear_memory(session_id: str) -> str:
"""Limpa a memória de uma sessão específica."""
if session_id in user_sessions:
del user_sessions[session_id]
memory_path = get_session_memory_path(session_id)
if os.path.exists(memory_path):
os.remove(memory_path)
return "Memória limpa com sucesso!"
# --- RAG (Crawling e Vector Store) ---
vector_store: Optional[FAISS] = None
def get_all_blog_links(url: str) -> set:
"""Coleta todos os links do blog."""
links = {url}
visited = set()
while links:
current_url = links.pop()
if current_url in visited:
continue
try:
response = requests.get(current_url, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
visited.add(current_url)
for link in soup.find_all('a', href=True):
href = urljoin(url, link['href'])
if urlparse(href).netloc == urlparse(url).netloc and '/tag/' not in href and '/category/' not in href:
links.add(href)
except Exception as e:
print(f"Erro ao acessar {current_url}: {e}")
return visited
def scrape_text_from_url(url: str) -> str:
"""Extrai texto de uma URL."""
try:
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
content = soup.find('article') or soup.find('main')
return content.get_text(separator='\n', strip=True) if content else ""
except Exception as e:
print(f"Erro ao raspar {url}: {e}")
return ""
def build_and_save_vector_store():
"""Constrói e salva o vector store."""
global vector_store
links = get_all_blog_links(BLOG_URL)
texts = [scrape_text_from_url(link) for link in links if scrape_text_from_url(link)]
if not texts:
return "Nenhum conteúdo encontrado."
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
chunks = text_splitter.create_documents(texts)
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vector_store = FAISS.from_documents(chunks, embeddings)
with open(VECTOR_STORE_PATH, "wb") as f:
pickle.dump(vector_store, f)
with open(PROCESSED_URLS_PATH, "wb") as f:
pickle.dump(links, f)
return f"Vector store criado com {len(chunks)} chunks."
def load_vector_store():
"""Carrega o vector store."""
global vector_store
if os.path.exists(VECTOR_STORE_PATH):
with open(VECTOR_STORE_PATH, "rb") as f:
vector_store = pickle.load(f)
else:
build_and_save_vector_store()
def retrieve_context_from_blog(query: str, k: int = 3) -> str:
"""Busca contexto relevante no vector store."""
if vector_store:
try:
results = vector_store.similarity_search(query, k=k)
return "\n".join(doc.page_content for doc in results)
except Exception as e:
print(f"Erro ao buscar contexto: {e}")
return ""
# --- API Client ---
class HuggingFaceAPIClient:
def __init__(self, token: str):
self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
def query_model(self, model_name: str, messages: List[Dict], max_tokens: int = 1000) -> str:
"""Faz requisição à API do Hugging Face."""
url = f"https://api-inference.huggingface.co/models/{model_name}/v1/chat/completions"
payload = {
"model": model_name,
"messages": messages,
"max_tokens": max_tokens,
"temperature": 0.7
}
try:
response = requests.post(url, headers=self.headers, json=payload, timeout=30)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"].strip()
except Exception as e:
return f"Erro na API: {str(e)}"
api_client = HuggingFaceAPIClient(HF_TOKEN)
# --- Chat Principal ---
def responder_como_aldo(session_id: str, pergunta: str, modelo: str = DEFAULT_MODEL) -> str:
"""Gera resposta como Dr. Aldo Henrique."""
if not pergunta.strip():
return "Por favor, faça uma pergunta válida."
load_conversation_memory(session_id)
update_user_profile(session_id, pergunta)
# Monta contexto
contexto = []
if perfil := get_user_profile_context(session_id):
contexto.append(f"**Perfil do Usuário**\n{perfil}")
if conversa := get_conversation_context(session_id):
contexto.append(f"**Conversa Anterior**\n{conversa}")
if blog := retrieve_context_from_blog(pergunta):
contexto.append(f"**Contexto do Blog**\n{blog}")
system_prompt = """Você é o Dr. Aldo Henrique, especialista em programação e tecnologia. Tem doutorado em Ciencias da Computação
Regras de conduta:
- Nunca mencione qual modelo de linguagem está respondendo (ex: Phi-3, Mistral, etc).
- Responda em português, de forma clara, amigável e educativa.
- Explique conceitos antes de mostrar soluções.
- Use exemplos práticos e, se houver código, comente cada linha.
- Considere o nível do usuário (iniciante, intermediário ou avançado).
- Use Markdown para formatar respostas, com ``` para blocos de código.
- Foque em tecnologia; se a pergunta for fora do escopo, informe educadamente.
"""
conteudo_contexto = "\n".join(contexto)
mensagem_usuario = f"{conteudo_contexto}\n\n**Pergunta**: {pergunta}"
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": mensagem_usuario}
]
model_name = MODELS.get(modelo, MODELS[DEFAULT_MODEL])
resposta = api_client.query_model(model_name, messages)
add_to_memory(session_id, pergunta, resposta)
return resposta
# --- Inicialização ---
def inicializar_sistema():
"""Inicializa o sistema."""
print("Inicializando Chatbot Dr. Aldo...")
load_vector_store()
print("Sistema inicializado!")
if __name__ == "__main__":
inicializar_sistema()
session_id = "teste_123"
print(responder_como_aldo(session_id, "O que é Java?"))
print(responder_como_aldo(session_id, "Mostre um exemplo de código Java."))
print(clear_memory(session_id))