Spaces:
Running
Running
import requests | |
import os | |
import json | |
import re | |
import time | |
import pickle | |
from typing import Dict, Any, List, Optional, Tuple | |
from bs4 import BeautifulSoup | |
from urllib.parse import urljoin, urlparse | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.vectorstores import FAISS | |
from langchain.embeddings import HuggingFaceEmbeddings | |
# --- Configuração do RAG --- | |
BLOG_URL = "https://aldohenrique.com.br/" | |
VECTOR_STORE_PATH = "faiss_index_store.pkl" | |
PROCESSED_URLS_PATH = "processed_urls.pkl" | |
# CONVERSATION_MEMORY_PATH não será mais uma constante única | |
# --- Configuração da API Hugging Face --- | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
if not HF_TOKEN: | |
raise ValueError("Token HF_TOKEN não encontrado nas variáveis de ambiente") | |
MODELS = { | |
"Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3", | |
"Phi-3 Mini (Microsoft)": "microsoft/Phi-3-mini-4k-instruct", | |
"Mistral (code) 7B": "mistralai/Mamba-Codestral-7B-v0.1", | |
"Zephyr 7B": "HuggingFaceH4/zephyr-7b-beta" | |
} | |
DEFAULT_MODEL = "Phi-3 Mini (Microsoft)" | |
# --- Variáveis Globais para o RAG e Memória --- | |
vector_store: Optional[FAISS] = None | |
# Dicionário para armazenar a memória de cada usuário/sessão | |
# Formato: {session_id: {'conversation': List[Dict], 'user_profile': Dict}} | |
user_sessions: Dict[str, Dict[str, Any]] = {} | |
max_memory_length = 10 # Máximo de trocas de mensagens na memória | |
# ============================================================================== | |
# SEÇÃO MEMÓRIA: GERENCIAMENTO DA CONVERSA E PERFIL DO USUÁRIO | |
# ============================================================================== | |
def get_session_memory_path(session_id: str) -> str: | |
"""Retorna o caminho do arquivo de memória para uma dada sessão.""" | |
return f"conversation_memory_{session_id}.json" | |
def load_conversation_memory(session_id: str): | |
"""Carrega a memória da conversa para uma sessão específica.""" | |
memory_path = get_session_memory_path(session_id) | |
session_data = {'conversation': [], 'user_profile': {}} | |
try: | |
if os.path.exists(memory_path): | |
with open(memory_path, 'r', encoding='utf-8') as f: | |
data = json.load(f) | |
session_data['conversation'] = data.get('conversation', []) | |
session_data['user_profile'] = data.get('user_profile', {}) | |
print(f"Memória para sessão '{session_id}' carregada: {len(session_data['conversation'])} mensagens") | |
else: | |
print(f"Nova conversa iniciada para sessão '{session_id}'") | |
except Exception as e: | |
print(f"Erro ao carregar memória para sessão '{session_id}': {e}") | |
user_sessions[session_id] = session_data | |
def save_conversation_memory(session_id: str): | |
"""Salva a memória da conversa para uma sessão específica.""" | |
memory_path = get_session_memory_path(session_id) | |
session_data = user_sessions.get(session_id, {'conversation': [], 'user_profile': {}}) | |
try: | |
data = { | |
'conversation': session_data['conversation'], | |
'user_profile': session_data['user_profile'], | |
'last_updated': time.time() | |
} | |
with open(memory_path, 'w', encoding='utf-8') as f: | |
json.dump(data, f, ensure_ascii=False, indent=2) | |
except Exception as e: | |
print(f"Erro ao salvar memória para sessão '{session_id}': {e}") | |
def add_to_memory(session_id: str, user_message: str, assistant_response: str): | |
"""Adiciona uma troca de mensagens à memória de uma sessão específica.""" | |
if session_id not in user_sessions: | |
load_conversation_memory(session_id) # Garante que a sessão está carregada | |
session_data = user_sessions[session_id] | |
conversation = session_data['conversation'] | |
conversation.append({ | |
"role": "user", | |
"content": user_message, | |
"timestamp": time.time() | |
}) | |
conversation.append({ | |
"role": "assistant", | |
"content": assistant_response, | |
"timestamp": time.time() | |
}) | |
# Limita o tamanho da memória | |
if len(conversation) > max_memory_length * 2: # *2 porque temos user + assistant | |
user_sessions[session_id]['conversation'] = conversation[-max_memory_length * 2:] | |
save_conversation_memory(session_id) | |
def update_user_profile(session_id: str, user_message: str): | |
"""Atualiza o perfil do usuário de uma sessão específica baseado nas mensagens.""" | |
if session_id not in user_sessions: | |
load_conversation_memory(session_id) # Garante que a sessão está carregada | |
user_profile = user_sessions[session_id]['user_profile'] | |
# Detecta tópicos de interesse | |
topics = { | |
'java': ['java', 'classe', 'objeto', 'herança', 'polimorfismo'], | |
'c': ['linguagem c', 'ponteiro', 'malloc', 'struct'], | |
'web': ['html', 'css', 'javascript', 'react', 'node'], | |
'ia': ['inteligência artificial', 'machine learning', 'neural', 'algoritmo'], | |
'banco_dados': ['sql', 'database', 'banco de dados', 'mysql'] | |
} | |
user_message_lower = user_message.lower() | |
for topic, keywords in topics.items(): | |
if any(keyword in user_message_lower for keyword in keywords): | |
user_profile[f'interesse_{topic}'] = user_profile.get(f'interesse_{topic}', 0) + 1 | |
# Detecta nível de conhecimento baseado na complexidade das perguntas | |
if any(word in user_message_lower for word in ['básico', 'iniciante', 'começar', 'o que é']): | |
user_profile['nivel'] = 'iniciante' | |
elif any(word in user_message_lower for word in ['avançado', 'complexo', 'otimização', 'performance']): | |
user_profile['nivel'] = 'avançado' | |
elif user_profile.get('nivel') is None: | |
user_profile['nivel'] = 'intermediario' | |
user_profile['total_perguntas'] = user_profile.get('total_perguntas', 0) + 1 | |
user_sessions[session_id]['user_profile'] = user_profile # Atualiza no dicionário global | |
def get_conversation_context(session_id: str) -> str: | |
"""Gera um resumo do contexto da conversa para o prompt de uma sessão específica.""" | |
if session_id not in user_sessions: | |
load_conversation_memory(session_id) | |
conversation_history = user_sessions[session_id]['conversation'] | |
if not conversation_history: | |
return "" | |
# Pega as últimas 6 mensagens (3 trocas) | |
recent_messages = conversation_history[-6:] if len(conversation_history) > 6 else conversation_history | |
context = "--- CONTEXTO DA CONVERSA ANTERIOR ---\n" | |
for msg in recent_messages: | |
role = "USUÁRIO" if msg["role"] == "user" else "PROFESSOR" | |
# Limita o tamanho de cada mensagem no contexto | |
content = msg["content"][:200] + "..." if len(msg["content"]) > 200 else msg["content"] | |
context += f"{role}: {content}\n" | |
context += "--- FIM DO CONTEXTO DA CONVERSA ---\n" | |
return context | |
def get_user_profile_context(session_id: str) -> str: | |
"""Gera informações sobre o perfil do usuário de uma sessão específica para personalizar a resposta.""" | |
if session_id not in user_sessions: | |
load_conversation_memory(session_id) | |
user_profile = user_sessions[session_id]['user_profile'] | |
if not user_profile: | |
return "" | |
context = "--- PERFIL DO ALUNO ---\n" | |
# Nível de conhecimento | |
nivel = user_profile.get('nivel', 'intermediario') | |
context += f"Nível: {nivel}\n" | |
# Principais interesses | |
interesses = [] | |
for key, value in user_profile.items(): | |
if key.startswith('interesse_') and value > 0: | |
topic = key.replace('interesse_', '').replace('_', ' ') | |
interesses.append(f"{topic} ({value}x)") | |
if interesses: | |
context += f"Principais interesses: {', '.join(interesses)}\n" | |
total = user_profile.get('total_perguntas', 0) | |
context += f"Total de perguntas feitas: {total}\n" | |
context += "--- FIM DO PERFIL DO ALUNO ---\n" | |
return context | |
def clear_memory(session_id: str): | |
"""Limpa a memória da conversa de uma sessão específica.""" | |
memory_path = get_session_memory_path(session_id) | |
if session_id in user_sessions: | |
del user_sessions[session_id] # Remove da memória em tempo de execução | |
try: | |
if os.path.exists(memory_path): | |
os.remove(memory_path) | |
return "✅ Memória da conversa limpa com sucesso!" | |
except Exception as e: | |
return f"❌ Erro ao limpar memória: {e}" | |
# ============================================================================== | |
# SEÇÃO RAG: FUNÇÕES PARA CRAWLING, EMBEDDING E ARMAZENAMENTO (SEM ALTERAÇÕES) | |
# ============================================================================== | |
def get_all_blog_links(url: str, processed_urls: set) -> set: | |
"""Navega pelo blog para encontrar todos os links de posts e páginas.""" | |
links_to_visit = {url} | |
visited_links = set() | |
while links_to_visit: | |
current_url = links_to_visit.pop() | |
if current_url in visited_links: | |
continue | |
try: | |
response = requests.get(current_url, timeout=10) | |
response.raise_for_status() | |
soup = BeautifulSoup(response.content, 'html.parser') | |
visited_links.add(current_url) | |
print(f"Visitando: {current_url}") | |
for link in soup.find_all('a', href=True): | |
href = link['href'] | |
full_url = urljoin(url, href) | |
if urlparse(full_url).netloc == urlparse(url).netloc and full_url not in visited_links: | |
links_to_visit.add(full_url) | |
except requests.RequestException as e: | |
print(f"Erro ao acessar {current_url}: {e}") | |
final_links = {link for link in visited_links if '/tag/' not in link and '/category/' not in link and '?' not in link} | |
return final_links | |
def scrape_text_from_url(url: str) -> str: | |
"""Extrai o texto principal (de artigos) de uma URL.""" | |
try: | |
response = requests.get(url, timeout=10) | |
soup = BeautifulSoup(response.content, 'html.parser') | |
main_content = soup.find('article') or soup.find('main') | |
if main_content: | |
return main_content.get_text(separator='\n', strip=True) | |
return "" | |
except Exception as e: | |
print(f"Erro ao raspar {url}: {e}") | |
return "" | |
def build_and_save_vector_store() -> Tuple[str, Optional[str], Optional[str]]: | |
""" | |
Função principal do RAG: raspa o blog, cria chunks, gera embeddings e salva o vector store. | |
""" | |
global vector_store | |
start_time = time.time() | |
print("Iniciando o processo de retreino do RAG...") | |
processed_urls = set() | |
all_links = get_all_blog_links(BLOG_URL, processed_urls) | |
print(f"Encontrados {len(all_links)} links para processar.") | |
all_texts = [scrape_text_from_url(link) for link in all_links if link not in processed_urls] | |
all_texts = [text for text in all_texts if text] | |
print(f"Textos extraídos de {len(all_texts)} novas páginas.") | |
if not all_texts: | |
return "Nenhum novo conteúdo encontrado para treinar.", None, None | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150) | |
chunks = text_splitter.create_documents(all_texts) | |
print(f"Textos divididos em {len(chunks)} chunks.") | |
print("Carregando modelo de embedding...") | |
embeddings_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
print("Criando o vector store com FAISS...") | |
vector_store = FAISS.from_documents(chunks, embeddings_model) | |
with open(VECTOR_STORE_PATH, "wb") as f: | |
pickle.dump(vector_store, f) | |
with open(PROCESSED_URLS_PATH, "wb") as f: | |
pickle.dump(all_links, f) | |
end_time = time.time() | |
message = f"✅ Retreino do RAG concluído em {end_time - start_time:.2f} segundos. {len(chunks)} chunks de texto processados." | |
return message, VECTOR_STORE_PATH, PROCESSED_URLS_PATH | |
def load_vector_store(): | |
"""Carrega o vector store do arquivo, se existir.""" | |
global vector_store | |
if os.path.exists(VECTOR_STORE_PATH): | |
print(f"Carregando vector store existente de '{VECTOR_STORE_PATH}'...") | |
with open(VECTOR_STORE_PATH, "rb") as f: | |
vector_store = pickle.load(f) | |
print("Vector store carregado com sucesso.") | |
else: | |
print("Nenhum vector store encontrado. É necessário treinar o modelo.") | |
message, _, _ = build_and_save_vector_store() | |
print(message) | |
def retrieve_context_from_blog(query: str, k: int = 3) -> str: | |
"""Busca no vector store por chunks de texto similares à pergunta.""" | |
if vector_store: | |
try: | |
results = vector_store.similarity_search(query, k=k) | |
context = "\n\n---\n\n".join([doc.page_content for doc in results]) | |
return context | |
except Exception as e: | |
return f"Erro ao buscar contexto: {e}" | |
return "" | |
# ============================================================================== | |
# SEÇÃO API CLIENT: CÓDIGO ORIGINAL PARA CHAMAR A API DO HUGGING FACE | |
# ============================================================================== | |
class HuggingFaceAPIClient: | |
def __init__(self, token: str): | |
self.token = token | |
self.headers = { | |
"Authorization": f"Bearer {token}", | |
"Content-Type": "application/json" | |
} | |
def query_model(self, model_name: str, messages: list, max_tokens: int = 1500) -> str: | |
"""Faz requisição para a API do Hugging Face""" | |
url = f"https://api-inference.huggingface.co/models/{model_name}/v1/chat/completions" | |
payload = { | |
"model": model_name, | |
"messages": messages, | |
"max_tokens": max_tokens, | |
"temperature": 0.7, | |
"top_p": 0.9, | |
"stream": False | |
} | |
try: | |
response = requests.post(url, headers=self.headers, json=payload, timeout=9999) | |
if response.status_code == 200: | |
result = response.json() | |
return result["choices"][0]["message"]["content"] | |
else: | |
return self._fallback_text_generation(model_name, messages, max_tokens) | |
except Exception as e: | |
return f"Erro na API: {str(e)}" | |
def _fallback_text_generation(self, model_name: str, messages: list, max_tokens: int) -> str: | |
url = f"https://api-inference.huggingface.co/models/{model_name}" | |
prompt = self._messages_to_prompt(messages) | |
payload = { | |
"inputs": prompt, | |
"parameters": { | |
"max_new_tokens": max_tokens, "temperature": 0.7, "top_p": 0.9, | |
"do_sample": True, "return_full_text": False | |
}, | |
"options": {"wait_for_model": True, "use_cache": False} | |
} | |
try: | |
response = requests.post(url, headers=self.headers, json=payload, timeout=9999) | |
if response.status_code == 200: | |
result = response.json() | |
if isinstance(result, list) and len(result) > 0: | |
generated_text = result[0].get("generated_text", "") | |
if generated_text: | |
if "Assistente: " in generated_text: | |
parts = generated_text.split("Assistente: ") | |
if len(parts) > 1: return parts[-1].strip() | |
return generated_text.strip() | |
return "Resposta vazia" | |
elif isinstance(result, dict): | |
if "error" in result: return f"Erro do modelo: {result['error']}" | |
elif "generated_text" in result: return result["generated_text"].strip() | |
return "Formato de resposta inesperado" | |
elif response.status_code == 404: return f"❌ Modelo '{model_name}' não encontrado." | |
elif response.status_code == 503: return "⏳ Modelo carregando... Tente novamente." | |
elif response.status_code == 429: return "⚠️ Muitas requisições. Tente novamente." | |
else: return f"Erro HTTP {response.status_code}: {response.text[:200]}..." | |
except requests.Timeout: | |
return "⏰ Timeout - Modelo demorou muito para responder." | |
except Exception as e: | |
return f"Erro na requisição: {str(e)}" | |
def _messages_to_prompt(self, messages: list) -> str: | |
prompt = "" | |
for msg in messages: | |
prompt += f"{msg['role'].capitalize()}: {msg['content']}\n\n" | |
prompt += "Assistente: " | |
return prompt | |
# Inicializar cliente da API | |
api_client = HuggingFaceAPIClient(HF_TOKEN) | |
# ============================================================================== | |
# SEÇÃO PRINCIPAL: LÓGICA DO CHATBOT COM MEMÓRIA POR USUÁRIO | |
# ============================================================================== | |
def formatar_resposta_com_codigo(resposta: str) -> str: | |
"""Formata a resposta destacando códigos em blocos separados.""" | |
if not resposta: return resposta | |
resposta = resposta.replace('<', '<').replace('>', '>') | |
resposta_formatada = re.sub( | |
r'```(\w+)?\n(.*?)\n```', | |
r'<div style="background-color: #f8f9fa; color: #1a1a1a; border: 1px solid #e9ecef; border-radius: 8px; padding: 15px; margin: 10px 0; font-family: Monaco, Consolas, monospace; overflow-x: auto;"><strong style="color: #1a1a1a;">💻 Código:</strong><br><pre style="color: #1a1a1a; margin: 5px 0; white-space: pre-wrap; word-wrap: break-word;"><code>\2</code></pre></div>', | |
resposta, flags=re.DOTALL | |
) | |
resposta_formatada = re.sub( | |
r'`([^`]+)`', | |
r'<code style="background-color: #f1f3f4; color: #1a1a1a; padding: 2px 4px; border-radius: 4px; font-family: Monaco, Consolas, monospace;">\1</code>', | |
resposta_formatada | |
) | |
resposta_formatada = resposta_formatada.replace('\n', '<br>') | |
resposta_formatada = re.sub( | |
r'^\*\*(.*?)\*\*', | |
r'<h3 style="color: #1a1a1a; margin-top: 20px; margin-bottom: 10px;">\1</h3>', | |
resposta_formatada, flags=re.MULTILINE | |
) | |
return resposta_formatada | |
def responder_como_aldo(session_id: str, pergunta: str, modelo_escolhido: str = DEFAULT_MODEL) -> str: | |
"""Função principal para gerar respostas, agora com RAG e MEMÓRIA por sessão.""" | |
if not pergunta.strip(): | |
return "Por favor, faça uma pergunta." | |
try: | |
# Garante que a sessão do usuário está carregada | |
if session_id not in user_sessions: | |
load_conversation_memory(session_id) | |
# Atualiza o perfil do usuário baseado na pergunta | |
update_user_profile(session_id, pergunta) | |
# --- ETAPA DE RAG --- | |
print(f"Buscando contexto para a pergunta: '{pergunta[:50]}...' para sessão '{session_id}'") | |
contexto_blog = retrieve_context_from_blog(pergunta) | |
# --- ETAPA DE MEMÓRIA --- | |
contexto_conversa = get_conversation_context(session_id) | |
contexto_perfil = get_user_profile_context(session_id) | |
# Prompt do sistema personalizado baseado no perfil do usuário | |
nivel = user_sessions[session_id]['user_profile'].get('nivel', 'intermediario') | |
system_prompt = f"""Você é o professor Dr. Aldo Henrique, especialista em C, Java, desenvolvimento web e inteligência artificial. | |
PERSONALIDADE E COMPORTAMENTO: | |
- Seja caloroso, acolhedor e paciente como um professor humano experiente | |
- Demonstre interesse genuíno pelo aprendizado do aluno | |
- Use um tom conversacional e amigável, mas mantenha autoridade acadêmica | |
- Quando apropriado, faça conexões com conversas anteriores | |
- Celebre o progresso do aluno e encoraje quando necessário | |
- Adapte sua linguagem ao nível do aluno: {nivel} | |
ESTILO DE ENSINO: | |
- Sempre explique o "porquê" antes do "como" | |
- Use analogias e exemplos práticos | |
- Encoraje perguntas e curiosidade | |
- Quando mostrar código, sempre explique com comentários detalhados | |
- Foque na compreensão, não apenas na solução | |
- Conecte conceitos com aplicações do mundo real | |
REGRAS: | |
- Responda sempre em português brasileiro | |
- Use blocos de código formatados com ``` | |
- Só responda perguntas relacionadas a programação e tecnologia | |
- Se não for sobre TI, informe educadamente que sua especialidade é tecnologia | |
- Quando apresentar código, sempre explique linha por linha nos comentários""" | |
# Monta o prompt completo com todos os contextos | |
prompt_parts = [] | |
if contexto_perfil: | |
prompt_parts.append(contexto_perfil) | |
if contexto_conversa: | |
prompt_parts.append(contexto_conversa) | |
if contexto_blog: | |
prompt_parts.append("--- CONTEXTO DO SEU BLOG ---") | |
prompt_parts.append(contexto_blog) | |
prompt_parts.append("--- FIM DO CONTEXTO DO BLOG ---") | |
prompt_parts.append(f"PERGUNTA ATUAL DO ALUNO: {pergunta}") | |
# Adiciona instruções específicas baseadas no contexto | |
if contexto_conversa: | |
prompt_parts.append("\nIMPORTANTE: Considere o contexto da nossa conversa anterior ao responder. Se esta pergunta se relaciona com algo que já discutimos, faça essa conexão naturalmente.") | |
pergunta_completa = "\n\n".join(prompt_parts) | |
messages = [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": pergunta_completa} | |
] | |
model_name = MODELS.get(modelo_escolhido, MODELS[DEFAULT_MODEL]) | |
resposta = api_client.query_model(model_name, messages) | |
if resposta.startswith("Assistente: "): | |
resposta = resposta.replace("Assistente: ", "") | |
# Adiciona a conversa à memória da sessão | |
add_to_memory(session_id, pergunta, resposta) | |
resposta_formatada = formatar_resposta_com_codigo(resposta.strip()) | |
return resposta_formatada | |
except Exception as e: | |
return f"Erro ao processar sua pergunta: {str(e)}" | |
# ============================================================================== | |
# FUNÇÕES AUXILIARES E DE TESTE | |
# ============================================================================== | |
def verificar_modelo_disponivel(model_name: str) -> str: | |
try: | |
url = f"[https://api-inference.huggingface.co/models/](https://api-inference.huggingface.co/models/){model_name}" | |
headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |
payload = {"inputs": "Hello", "parameters": {"max_new_tokens": 5}} | |
response = requests.post(url, headers=headers, json=payload, timeout=9999) | |
if response.status_code == 200: return "✅ Disponível" | |
elif response.status_code == 404: return "❌ Não encontrado" | |
elif response.status_code == 503: return "⏳ Carregando..." | |
else: return f"⚠️ Status {response.status_code}" | |
except Exception as e: | |
return f"❌ Erro: {str(e)[:50]}..." | |
def testar_todos_modelos(): | |
resultados = [] | |
for nome, modelo in MODELS.items(): | |
status = verificar_modelo_disponivel(modelo) | |
resultados.append(f"{nome}: {status}") | |
return "\n".join(resultados) | |
def get_memory_stats(session_id: str) -> str: | |
"""Retorna estatísticas da memória atual para uma sessão específica.""" | |
if session_id not in user_sessions: | |
load_conversation_memory(session_id) | |
session_data = user_sessions[session_id] | |
conversation_history = session_data['conversation'] | |
user_profile = session_data['user_profile'] | |
total_messages = len(conversation_history) | |
user_messages = len([m for m in conversation_history if m["role"] == "user"]) | |
stats = f"📊 **Estatísticas da Memória para Sessão '{session_id}':**\n" | |
stats += f"• Total de mensagens: {total_messages}\n" | |
stats += f"• Perguntas do usuário: {user_messages}\n" | |
stats += f"• Nível detectado: {user_profile.get('nivel', 'Não definido')}\n" | |
# Principais interesses | |
interesses = [] | |
for key, value in user_profile.items(): | |
if key.startswith('interesse_') and value > 0: | |
topic = key.replace('interesse_', '').replace('_', ' ').title() | |
interesses.append(f"{topic} ({value})") | |
if interesses: | |
stats += f"• Principais interesses: {', '.join(interesses)}\n" | |
return stats | |
# ============================================================================== | |
# INICIALIZAÇÃO DO SISTEMA | |
# ============================================================================== | |
def inicializar_sistema(): | |
"""Inicializa todos os componentes do sistema.""" | |
print("🚀 Inicializando o Chatbot Dr. Aldo Henrique com Memória...") | |
# Carrega o vector store (RAG) | |
load_vector_store() | |
# **NÃO CARREGAMOS MAIS UMA MEMÓRIA GLOBAL AQUI** | |
# A memória será carregada e gerenciada por sessão individualmente | |
print("✅ Sistema inicializado com sucesso!") | |
print(f"🧠 Vector Store: {'Carregado' if vector_store else 'Não encontrado'}") | |
# Chama a inicialização quando o módulo é carregado | |
if __name__ == "__main__": | |
inicializar_sistema() | |
# Exemplo de uso para testar com sessões diferentes: | |
print("\n--- Testando sessões independentes ---") | |
# Simulando um usuário A | |
session_id_a = "user_A_session_123" | |
print(f"\nUsuário A ({session_id_a}):") | |
print(responder_como_aldo(session_id_a, "Olá, o que é Java?")) | |
print(responder_como_aldo(session_id_a, "Pode me dar um exemplo de código Java para 'Hello World'?")) | |
print(get_memory_stats(session_id_a)) | |
# Simulando um usuário B | |
session_id_b = "user_B_session_456" | |
print(f"\nUsuário B ({session_id_b}):") | |
print(responder_como_aldo(session_id_b, "Qual a diferença entre IA e Machine Learning?")) | |
print(get_memory_stats(session_id_b)) | |
# Usuário A continua sua conversa | |
print(f"\nUsuário A ({session_id_a}) continua:") | |
print(responder_como_aldo(session_id_a, "E sobre polimorfismo em Java?")) | |
print(get_memory_stats(session_id_a)) | |
# Limpar a memória de uma sessão específica | |
print(f"\nLimpando memória do Usuário B ({session_id_b}):") | |
print(clear_memory(session_id_b)) | |
print(get_memory_stats(session_id_b)) # Deve mostrar memória vazia ou não encontrada para B | |
# Usuário A ainda tem sua memória | |
print(f"\nVerificando memória do Usuário A ({session_id_a}) após limpar B:") | |
print(get_memory_stats(session_id_a)) |