Spaces:
Running
Running
import os | |
import re | |
import time | |
import pickle | |
import requests | |
from typing import Dict, Any, List, Optional, Tuple | |
from bs4 import BeautifulSoup | |
from urllib.parse import urljoin, urlparse | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.vectorstores import FAISS | |
from langchain.embeddings import HuggingFaceEmbeddings | |
# --- Configurações --- | |
# Chave da API da Hugging Face (essencial para o funcionamento) | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
if not HF_TOKEN: | |
raise ValueError("A variável de ambiente HF_TOKEN não foi definida. Defina-a com seu token da Hugging Face.") | |
# URL do blog para a base de conhecimento (RAG) | |
BLOG_URL = "https://aldohenrique.com.br/" | |
# Caminhos para os arquivos do RAG | |
VECTOR_STORE_PATH = "faiss_index_store.pkl" | |
PROCESSED_URLS_PATH = "processed_urls.pkl" | |
# Modelos disponíveis na Hugging Face | |
MODELS = { | |
"Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3", | |
"Phi-3 Mini (Microsoft)": "microsoft/Phi-3-mini-4k-instruct", | |
"Deepseek (chat) 7B": "deepseek-ai/deepseek-vl-7b-chat", | |
"Gemma 7B (Google)":"google/gemma-7b-it", | |
"Zephyr 7B": "HuggingFaceH4/zephyr-7b-beta" | |
} | |
DEFAULT_MODEL = "Phi-3 Mini (Microsoft)" | |
# --- Variáveis Globais --- | |
# Armazena o índice vetorial para busca de contexto (RAG) | |
vector_store: Optional[FAISS] = None | |
# Dicionário para gerenciar todas as sessões de usuário em memória | |
# Estrutura: {session_id: {"history": [...], "profile": {...}}} | |
user_sessions: Dict[str, Dict[str, Any]] = {} | |
MAX_MEMORY_TURNS = 5 # Manter as últimas 5 trocas (usuário + assistente) | |
# ============================================================================== | |
# SEÇÃO DE GERENCIAMENTO DA SESSÃO (MEMÓRIA E PERFIL) | |
# ============================================================================== | |
def get_or_create_session(session_id: str) -> Dict[str, Any]: | |
""" | |
Obtém uma sessão de usuário existente ou cria uma nova. | |
A sessão é mantida apenas em memória. | |
""" | |
if session_id not in user_sessions: | |
print(f"Nova sessão criada para o ID: {session_id}") | |
user_sessions[session_id] = { | |
"history": [], | |
"profile": {"nivel": "indefinido", "interesses": {}, "total_perguntas": 0} | |
} | |
return user_sessions[session_id] | |
def update_memory(session_id: str, user_message: str, assistant_response: str): | |
"""Adiciona a troca de mensagens ao histórico da sessão.""" | |
session = get_or_create_session(session_id) | |
# Adiciona as mensagens mais recentes | |
session["history"].append({"role": "user", "content": user_message}) | |
session["history"].append({"role": "assistant", "content": assistant_response}) | |
# Garante que o histórico não exceda o tamanho máximo | |
if len(session["history"]) > MAX_MEMORY_TURNS * 2: | |
session["history"] = session["history"][-(MAX_MEMORY_TURNS * 2):] | |
def update_user_profile(session_id: str, user_message: str): | |
""" | |
Analisa a mensagem do usuário para inferir e atualizar seu perfil de interesses e nível. | |
""" | |
session = get_or_create_session(session_id) | |
profile = session["profile"] | |
msg_lower = user_message.lower() | |
# Atualiza contador de perguntas | |
profile["total_perguntas"] += 1 | |
# Inferência de nível | |
if any(word in msg_lower for word in ['básico', 'iniciante', 'começar', 'o que é']): | |
profile['nivel'] = 'iniciante' | |
elif any(word in msg_lower for word in ['avançado', 'complexo', 'otimização', 'performance', 'arquitetura']): | |
profile['nivel'] = 'avançado' | |
elif profile['nivel'] == 'indefinido': # Define como intermediário se ainda não tiver um nível | |
profile['nivel'] = 'intermediário' | |
# Inferência de interesses | |
topics = { | |
'java': ['java', 'spring', 'jpa', 'jvm'], | |
'python': ['python', 'django', 'flask', 'pandas'], | |
'web': ['html', 'css', 'javascript', 'react', 'node'], | |
'ia': ['inteligência artificial', 'machine learning', 'llm', 'rag'], | |
'banco de dados': ['sql', 'nosql', 'mongodb', 'postgresql'] | |
} | |
for topic, keywords in topics.items(): | |
if any(keyword in msg_lower for keyword in keywords): | |
profile['interesses'][topic] = profile['interesses'].get(topic, 0) + 1 | |
def clear_session_memory(session_id: str) -> str: | |
"""Limpa a memória de uma sessão específica.""" | |
if session_id in user_sessions: | |
del user_sessions[session_id] | |
return f"✅ Memória da sessão '{session_id}' foi limpa." | |
return f"⚠️ Sessão '{session_id}' não encontrada." | |
# ============================================================================== | |
# SEÇÃO RAG: BUSCA E PROCESSAMENTO DE CONTEÚDO (SEM ALTERAÇÕES SIGNIFICATIVAS) | |
# ============================================================================== | |
def scrape_text_from_url(url: str) -> str: | |
"""Extrai texto de uma URL, focando no conteúdo principal.""" | |
try: | |
response = requests.get(url, timeout=10) | |
response.raise_for_status() | |
soup = BeautifulSoup(response.content, 'html.parser') | |
main_content = soup.find('article') or soup.find('main') | |
return main_content.get_text(separator='\n', strip=True) if main_content else "" | |
except requests.RequestException as e: | |
print(f"Erro ao acessar {url}: {e}") | |
return "" | |
def build_and_save_vector_store(): | |
"""Coleta dados do blog, processa e cria um índice vetorial com FAISS.""" | |
global vector_store | |
print("Iniciando construção do RAG...") | |
# Lógica simplificada de coleta de links (pode ser expandida se necessário) | |
# Para este exemplo, vamos focar em uma URL principal | |
all_texts = [scrape_text_from_url(BLOG_URL)] | |
# Adicione mais URLs manualmente se desejar | |
# additional_urls = [f"{BLOG_URL}/sobre", f"{BLOG_URL}/contato"] | |
# all_texts.extend([scrape_text_from_url(url) for url in additional_urls]) | |
valid_texts = [text for text in all_texts if text and len(text) > 100] | |
if not valid_texts: | |
print("Nenhum texto válido encontrado para criar o RAG.") | |
return | |
print(f"Processando {len(valid_texts)} página(s).") | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150) | |
chunks = text_splitter.create_documents(valid_texts) | |
print(f"Criando {len(chunks)} chunks de texto.") | |
embeddings_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
vector_store = FAISS.from_documents(chunks, embeddings_model) | |
with open(VECTOR_STORE_PATH, "wb") as f: | |
pickle.dump(vector_store, f) | |
print("✅ RAG construído e salvo com sucesso!") | |
def load_vector_store(): | |
"""Carrega o índice vetorial do disco.""" | |
global vector_store | |
if os.path.exists(VECTOR_STORE_PATH): | |
print(f"Carregando RAG de '{VECTOR_STORE_PATH}'...") | |
with open(VECTOR_STORE_PATH, "rb") as f: | |
vector_store = pickle.load(f) | |
print("✅ RAG carregado.") | |
else: | |
print("Índice RAG não encontrado. Construindo um novo...") | |
build_and_save_vector_store() | |
def retrieve_rag_context(query: str, k: int = 3) -> str: | |
"""Busca no RAG por contexto relevante para a pergunta.""" | |
if vector_store: | |
try: | |
results = vector_store.similarity_search(query, k=k) | |
return "\n\n---\n\n".join([doc.page_content for doc in results]) | |
except Exception as e: | |
print(f"Erro ao buscar contexto no RAG: {e}") | |
return "" | |
# ============================================================================== | |
# SEÇÃO DA API E CONSTRUÇÃO DO PROMPT | |
# ============================================================================== | |
class HuggingFaceAPIClient: | |
"""Cliente para interagir com a API de Inferência da Hugging Face.""" | |
def __init__(self, token: str): | |
self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} | |
def query(self, model_id: str, messages: List[Dict[str, str]], max_tokens: int = 2048) -> str: | |
api_url = f"https://api-inference.huggingface.co/models/{model_id}" | |
payload = { | |
"inputs": self._format_prompt_for_model(messages), | |
"parameters": { | |
"max_new_tokens": max_tokens, | |
"temperature": 0.7, | |
"top_p": 0.95, | |
"return_full_text": False, | |
}, | |
"options": {"wait_for_model": True} | |
} | |
try: | |
response = requests.post(api_url, headers=self.headers, json=payload, timeout=60) | |
response.raise_for_status() | |
result = response.json() | |
# A resposta da API de inferência pode vir em uma lista | |
if isinstance(result, list) and result: | |
return result[0].get("generated_text", "").strip() | |
# Ou em um dicionário | |
elif isinstance(result, dict): | |
return result.get("generated_text", f"Erro: Resposta inesperada do modelo: {result.get('error', '')}").strip() | |
return "Erro: Resposta vazia ou em formato inesperado." | |
except requests.Timeout: | |
return "Erro: A requisição à API demorou muito para responder (timeout)." | |
except requests.HTTPError as http_err: | |
return f"Erro HTTP: {http_err}. Detalhes: {response.text}" | |
except Exception as e: | |
return f"Ocorreu um erro inesperado na chamada da API: {e}" | |
def _format_prompt_for_model(self, messages: List[Dict[str, str]]) -> str: | |
"""Formata a lista de mensagens em uma string única para a API de inferência.""" | |
prompt_str = "" | |
for msg in messages: | |
if msg['role'] == 'system': | |
prompt_str += f"<|system|>\n{msg['content']}</s>\n" | |
elif msg['role'] == 'user': | |
prompt_str += f"<|user|>\n{msg['content']}</s>\n" | |
elif msg['role'] == 'assistant': | |
prompt_str += f"<|assistant|>\n{msg['content']}</s>\n" | |
prompt_str += "<|assistant|>\n" # Solicita a continuação do assistente | |
return prompt_str | |
class PromptBuilder: | |
"""Constrói o prompt final a ser enviado para o modelo.""" | |
SYS_PROMPT_TEMPLATE = """Você é o Professor Aldo, um especialista em programação (Java, C, Web) e IA. | |
**Sua Personalidade:** | |
- **Didático e Paciente:** Aja como um professor experiente. Explique o "porquê" das coisas, não apenas o "como". | |
- **Acolhedor e Amigável:** Use uma linguagem calorosa e acessível. | |
- **Adaptável:** Ajuste a complexidade da sua resposta ao nível de conhecimento do aluno. | |
- **Contextual:** Se a pergunta atual se conectar a algo que já discutimos, mencione essa conexão. | |
**Suas Regras:** | |
1. Responda sempre em português do Brasil. | |
2. Use blocos de código (```java, ```python, etc.) para exemplos. Comente o código para explicar cada parte. | |
3. Se a pergunta não for sobre tecnologia ou programação, educadamente informe que sua especialidade é outra. | |
4. Baseie sua resposta primariamente nas informações do seu blog (se houver contexto) e no nosso histórico de conversa. | |
A seguir, informações para te ajudar a contextualizar sua resposta:""" | |
def __init__(self, session_id: str, rag_context: str): | |
self.session = get_or_create_session(session_id) | |
self.rag_context = rag_context | |
self.parts = [] | |
def _add_profile_context(self): | |
profile = self.session["profile"] | |
if profile["total_perguntas"] > 0: | |
profile_summary = [f"**Perfil do Aluno (Inferido):**"] | |
profile_summary.append(f"- Nível de conhecimento: {profile['nivel'].capitalize()}") | |
interesses = sorted(profile['interesses'].items(), key=lambda item: item[1], reverse=True) | |
if interesses: | |
formatted_interesses = [f"{topic.capitalize()} ({count}x)" for topic, count in interesses] | |
profile_summary.append(f"- Principais interesses: {', '.join(formatted_interesses)}") | |
self.parts.append("\n".join(profile_summary)) | |
def _add_rag_context(self): | |
if self.rag_context: | |
self.parts.append(f"**Contexto Relevante do seu Blog (RAG):**\n{self.rag_context}") | |
def _add_history_context(self, current_question: str) -> List[Dict[str, str]]: | |
"""Prepara o histórico de mensagens para o modelo.""" | |
history = self.session.get("history", []) | |
# Pega as mensagens do histórico e adiciona a pergunta atual | |
messages = history + [{"role": "user", "content": current_question}] | |
return messages | |
def build(self, user_question: str) -> List[Dict[str, str]]: | |
# Adiciona os contextos ao prompt do sistema | |
self._add_profile_context() | |
self._add_rag_context() | |
system_content = self.SYS_PROMPT_TEMPLATE | |
if self.parts: | |
system_content += "\n\n" + "\n\n".join(self.parts) | |
# Monta a lista final de mensagens | |
messages = [{"role": "system", "content": system_content}] | |
messages.extend(self._add_history_context(user_question)) | |
return messages | |
# ============================================================================== | |
# FUNÇÃO PRINCIPAL E INICIALIZAÇÃO | |
# ============================================================================== | |
# Inicializa o cliente da API | |
api_client = HuggingFaceAPIClient(token=HF_TOKEN) | |
def formatar_resposta(resposta: str) -> str: | |
"""Formata a resposta com HTML para melhor visualização de código e texto.""" | |
resposta_html = resposta.replace('<', '<').replace('>', '>') | |
# Formata blocos de código | |
resposta_html = re.sub( | |
r'```(\w+)?\n(.*?)\n```', | |
r'<div style="background-color:#f0f0f0; border:1px solid #ddd; border-radius:8px; padding:15px; margin:1em 0; font-family:monospace; color:black;"><pre><code>\2</code></pre></div>', | |
resposta_html, | |
flags=re.DOTALL | |
) | |
# Formata negrito | |
resposta_html = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', resposta_html) | |
# Formata nova linha | |
return resposta_html.replace('\n', '<br>') | |
def responder_pergunta(session_id: str, pergunta: str, modelo_escolhido: str = DEFAULT_MODEL) -> str: | |
""" | |
Função principal que orquestra todo o processo de resposta. | |
""" | |
if not pergunta.strip(): | |
return "Por favor, faça uma pergunta." | |
print(f"\n--- Processando pergunta para a sessão: {session_id} ---") | |
# 1. Atualizar perfil do usuário com base na pergunta atual | |
update_user_profile(session_id, pergunta) | |
# 2. Buscar contexto relevante no RAG | |
print("Buscando no RAG...") | |
rag_context = retrieve_rag_context(pergunta) | |
if rag_context: | |
print("Contexto encontrado no RAG.") | |
# 3. Construir o prompt completo usando o PromptBuilder | |
print("Construindo prompt...") | |
builder = PromptBuilder(session_id, rag_context) | |
messages = builder.build(pergunta) | |
# DEBUG: Descomente a linha abaixo para ver o prompt exato enviado ao modelo | |
# print("MENSAGENS ENVIADAS AO MODELO:", json.dumps(messages, indent=2, ensure_ascii=False)) | |
# 4. Chamar a API da Hugging Face | |
print(f"Enviando para o modelo '{modelo_escolhido}'...") | |
model_id = MODELS.get(modelo_escolhido, MODELS[DEFAULT_MODEL]) | |
resposta_bruta = api_client.query(model_id, messages) | |
# 5. Adicionar a interação à memória da sessão | |
update_memory(session_id, pergunta, resposta_bruta) | |
# 6. Formatar e retornar a resposta | |
return formatar_resposta(resposta_bruta) | |
def inicializar_sistema(): | |
"""Carrega o RAG ao iniciar.""" | |
print("🚀 Inicializando o sistema...") | |
load_vector_store() | |
print("✅ Sistema pronto para uso.") | |
# ============================================================================== | |
# BLOCO DE TESTE | |
# ============================================================================== | |
if __name__ == "__main__": | |
inicializar_sistema() | |
# --- Simulação de Conversas --- | |
# Sessão do Usuário A (interessado em Java) | |
session_a = "aluno_java_123" | |
print("\n--- INÍCIO DA CONVERSA COM ALUNO A (Java) ---") | |
pergunta1_a = "Olá! Pode me explicar o que é Polimorfismo em Java de uma forma simples?" | |
resposta1_a = responder_pergunta(session_a, pergunta1_a) | |
print(f"ALUNO A: {pergunta1_a}\nPROFESSOR ALDO:\n{resposta1_a}\n") | |
pergunta2_a = "Entendi! Pode me dar um exemplo de código com classes e herança para ilustrar?" | |
resposta2_a = responder_pergunta(session_a, pergunta2_a) | |
print(f"ALUNO A: {pergunta2_a}\nPROFESSOR ALDO:\n{resposta2_a}\n") | |
# Sessão do Usuário B (interessado em IA) | |
session_b = "aluna_ia_456" | |
print("\n--- INÍCIO DA CONVERSA COM ALUNA B (IA) ---") | |
pergunta1_b = "Oi, professor! Eu sou nova na área. Qual a diferença entre Inteligência Artificial e Machine Learning?" | |
resposta1_b = responder_pergunta(session_b, pergunta1_b) | |
print(f"ALUNA B: {pergunta1_b}\nPROFESSOR ALDO:\n{resposta1_b}\n") | |
# Usuário A continua sua conversa, a memória do usuário B não deve interferir | |
print("\n--- ALUNO A CONTINUA SUA CONVERSA ---") | |
pergunta3_a = "Faz sentido. E como o conceito de 'override' se encaixa nisso que acabamos de ver?" | |
resposta3_a = responder_pergunta(session_a, pergunta3_a) | |
print(f"ALUNO A: {pergunta3_a}\nPROFESSOR ALDO:\n{resposta3_a}\n") | |
# Exibe o estado final das sessões | |
print("\n--- ESTADO FINAL DAS SESSÕES EM MEMÓRIA ---") | |
print(f"\nSessão A ({session_a}):") | |
# print(json.dumps(user_sessions.get(session_a), indent=2, ensure_ascii=False)) | |
print(f" Nível: {user_sessions[session_a]['profile']['nivel']}") | |
print(f" Interesses: {user_sessions[session_a]['profile']['interesses']}") | |
print(f" Tamanho do Histórico: {len(user_sessions[session_a]['history'])} mensagens") | |
print(f"\nSessão B ({session_b}):") | |
# print(json.dumps(user_sessions.get(session_b), indent=2, ensure_ascii=False)) | |
print(f" Nível: {user_sessions[session_b]['profile']['nivel']}") | |
print(f" Interesses: {user_sessions[session_b]['profile']['interesses']}") | |
print(f" Tamanho do Histórico: {len(user_sessions[session_b]['history'])} mensagens") | |
# Limpando a memória de uma sessão | |
print("\n--- LIMPANDO MEMÓRIA ---") | |
print(clear_session_memory(session_a)) | |
print(f"Sessões ativas agora: {list(user_sessions.keys())}") |