Spaces:
Running
Running
import requests | |
import os | |
import json | |
import time | |
import pickle | |
from typing import Dict, List, Optional, Tuple | |
from bs4 import BeautifulSoup | |
from urllib.parse import urljoin, urlparse | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
# --- Configurações --- | |
BLOG_URL = "https://aldohenrique.com.br/" | |
VECTOR_STORE_PATH = "faiss_index_store.pkl" | |
PROCESSED_URLS_PATH = "processed_urls.pkl" | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
if not HF_TOKEN: | |
raise ValueError("Token HF_TOKEN não encontrado") | |
# Lista inicial de modelos | |
MODELS = {} | |
# --- Função para buscar modelos --- | |
headers = { | |
"Authorization": f"Bearer {HF_TOKEN}" | |
} | |
# Modelos fixos que você quer manter | |
NEW_MODELS_TO_TEST = [ | |
("Phi-3 Mini (Mais rápido)", "microsoft/Phi-3-mini-4k-instruct"), | |
("Zephyr 7B (Meio Termo)", "HuggingFaceH4/zephyr-7b-beta"), | |
] | |
# --- Consulta a API da Hugging Face --- | |
url = "https://huggingface.co/api/models" | |
params = { | |
"limit": 300, # Aumente se quiser trazer mais modelos | |
"full": "true" | |
} | |
response = requests.get(url, headers=headers, params=params) | |
if response.status_code != 200: | |
raise Exception(f"Erro na API: {response.status_code} - {response.text}") | |
models_data = response.json() | |
# --- Filtra modelos que possuem base_model --- | |
for model in models_data: | |
tags = model.get("tags", []) | |
base_model_tags = [tag for tag in tags if tag.startswith("base_model:")] | |
if base_model_tags: | |
model_name = model.get("id") | |
display_name = model_name.split("/")[-1] | |
# Verifica se já não está na lista para evitar duplicados | |
if not any(model_name == m[1] for m in NEW_MODELS_TO_TEST): | |
NEW_MODELS_TO_TEST.append((display_name, model_name)) | |
# --- Resultado --- | |
print("Lista atualizada de modelos:\n") | |
for name, model_id in NEW_MODELS_TO_TEST: | |
print(f'("{name}", "{model_id}"),') | |
print(f"\nTotal de modelos na lista: {len(NEW_MODELS_TO_TEST)}") | |
# Nota: Alguns modelos podem requerer aprovação ou ter restrições de acesso | |
# Recomenda-se testar cada modelo individualmente para verificar disponibilidade | |
DEFAULT_MODEL = "Zephyr 7B (Meio Termo)" | |
# --- Gerenciamento de Sessão --- | |
user_sessions: Dict[str, Dict[str, List | Dict]] = {} | |
MAX_MEMORY_LENGTH = 5 | |
def get_session_memory_path(session_id: str) -> str: | |
"""Retorna o caminho do arquivo de memória para a sessão.""" | |
return f"conversation_memory_{session_id}.json" | |
def load_conversation_memory(session_id: str): | |
"""Carrega a memória da sessão do usuário.""" | |
if session_id in user_sessions: | |
return | |
memory_path = get_session_memory_path(session_id) | |
session_data = {'conversation': [], 'user_profile': {'nivel': 'intermediario', 'total_perguntas': 0}} | |
if os.path.exists(memory_path): | |
try: | |
with open(memory_path, 'r', encoding='utf-8') as f: | |
session_data = json.load(f) | |
except Exception as e: | |
print(f"Erro ao carregar memória para sessão '{session_id}': {e}") | |
user_sessions[session_id] = session_data | |
def save_conversation_memory(session_id: str): | |
"""Salva a memória da sessão do usuário.""" | |
memory_path = get_session_memory_path(session_id) | |
try: | |
with open(memory_path, 'w', encoding='utf-8') as f: | |
json.dump(user_sessions[session_id], f, ensure_ascii=False, indent=2) | |
except Exception as e: | |
print(f"Erro ao salvar memória para sessão '{session_id}': {e}") | |
def add_to_memory(session_id: str, user_message: str, assistant_response: str): | |
"""Adiciona uma troca de mensagens à memória da sessão.""" | |
load_conversation_memory(session_id) | |
conversation = user_sessions[session_id]['conversation'] | |
conversation.extend([ | |
{"role": "user", "content": user_message, "timestamp": time.time()}, | |
{"role": "assistant", "content": assistant_response, "timestamp": time.time()} | |
]) | |
if len(conversation) > MAX_MEMORY_LENGTH * 2: | |
user_sessions[session_id]['conversation'] = conversation[-MAX_MEMORY_LENGTH * 2:] | |
save_conversation_memory(session_id) | |
def update_user_profile(session_id: str, user_message: str): | |
"""Atualiza o perfil do usuário com base na mensagem.""" | |
load_conversation_memory(session_id) | |
profile = user_sessions[session_id]['user_profile'] | |
message_lower = user_message.lower() | |
if any(word in message_lower for word in ['básico', 'iniciante']): | |
profile['nivel'] = 'iniciante' | |
elif any(word in message_lower for word in ['avançado', 'complexo']): | |
profile['nivel'] = 'avançado' | |
topics = { | |
'java': ['java', 'classe', 'objeto'], | |
'web': ['html', 'css', 'javascript'], | |
'ia': ['inteligência artificial', 'machine learning'] | |
} | |
for topic, keywords in topics.items(): | |
if any(keyword in message_lower for keyword in keywords): | |
profile[f'interesse_{topic}'] = profile.get(f'interesse_{topic}', 0) + 1 | |
profile['total_perguntas'] = profile.get('total_perguntas', 0) + 1 | |
user_sessions[session_id]['user_profile'] = profile | |
def get_conversation_context(session_id: str) -> str: | |
"""Gera o contexto da conversa recente.""" | |
load_conversation_memory(session_id) | |
conversation = user_sessions[session_id]['conversation'][-4:] | |
if not conversation: | |
return "" | |
return "\n".join(f"{msg['role'].upper()}: {msg['content']}" for msg in conversation) | |
def get_user_profile_context(session_id: str) -> str: | |
"""Gera o contexto do perfil do usuário.""" | |
load_conversation_memory(session_id) | |
profile = user_sessions[session_id]['user_profile'] | |
context = f"Nível: {profile.get('nivel', 'intermediario')}\n" | |
context += f"Total de perguntas: {profile.get('total_perguntas', 0)}\n" | |
interesses = [f"{k.replace('interesse_', '').title()} ({v})" for k, v in profile.items() if k.startswith('interesse_')] | |
if interesses: | |
context += f"Interesses: {', '.join(interesses)}\n" | |
return context | |
def clear_memory(session_id: str) -> str: | |
"""Limpa a memória de uma sessão específica.""" | |
if session_id in user_sessions: | |
del user_sessions[session_id] | |
memory_path = get_session_memory_path(session_id) | |
if os.path.exists(memory_path): | |
os.remove(memory_path) | |
return "Memória limpa com sucesso!" | |
# --- RAG (Crawling e Vector Store) --- | |
vector_store: Optional[FAISS] = None | |
def get_all_blog_links(url: str) -> set: | |
"""Coleta todos os links do blog.""" | |
links = {url} | |
visited = set() | |
while links: | |
current_url = links.pop() | |
if current_url in visited: | |
continue | |
try: | |
response = requests.get(current_url, timeout=500) | |
soup = BeautifulSoup(response.content, 'html.parser') | |
visited.add(current_url) | |
for link in soup.find_all('a', href=True): | |
href = urljoin(url, link['href']) | |
if urlparse(href).netloc == urlparse(url).netloc and '/tag/' not in href and '/category/' not in href: | |
links.add(href) | |
except Exception as e: | |
print(f"Erro ao acessar {current_url}: {e}") | |
return visited | |
def scrape_text_from_url(url: str) -> str: | |
"""Extrai texto de uma URL.""" | |
try: | |
response = requests.get(url, timeout=500) | |
soup = BeautifulSoup(response.content, 'html.parser') | |
content = soup.find('article') or soup.find('main') | |
return content.get_text(separator='\n', strip=True) if content else "" | |
except Exception as e: | |
print(f"Erro ao raspar {url}: {e}") | |
return "" | |
def build_and_save_vector_store(): | |
"""Constrói e salva o vector store.""" | |
global vector_store | |
links = get_all_blog_links(BLOG_URL) | |
texts = [scrape_text_from_url(link) for link in links if scrape_text_from_url(link)] | |
if not texts: | |
return "Nenhum conteúdo encontrado." | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150) | |
chunks = text_splitter.create_documents(texts) | |
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
vector_store = FAISS.from_documents(chunks, embeddings) | |
with open(VECTOR_STORE_PATH, "wb") as f: | |
pickle.dump(vector_store, f) | |
with open(PROCESSED_URLS_PATH, "wb") as f: | |
pickle.dump(links, f) | |
return f"Vector store criado com {len(chunks)} chunks." | |
def load_vector_store(): | |
"""Carrega o vector store.""" | |
global vector_store | |
if os.path.exists(VECTOR_STORE_PATH): | |
with open(VECTOR_STORE_PATH, "rb") as f: | |
vector_store = pickle.load(f) | |
else: | |
build_and_save_vector_store() | |
def retrieve_context_from_blog(query: str, k: int = 4) -> str: | |
"""Busca contexto relevante no vector store.""" | |
if vector_store: | |
try: | |
results = vector_store.similarity_search(query, k=k) | |
return "\n".join(doc.page_content for doc in results) | |
except Exception as e: | |
print(f"Erro ao buscar contexto: {e}") | |
return "" | |
# --- API Client --- | |
class HuggingFaceAPIClient: | |
def __init__(self, token: str): | |
self.headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} | |
def check_model_info(self, model_name: str) -> Tuple[bool, str]: | |
"""Verifica informações do modelo via API do Hugging Face.""" | |
url = f"https://huggingface.co/api/models/{model_name}" | |
try: | |
response = requests.get(url, headers=self.headers, timeout=90) | |
if response.status_code == 200: | |
model_info = response.json() | |
if model_info.get('disabled', False): | |
return False, "Modelo desabilitado" | |
if model_info.get('gated', False): | |
return False, "Modelo requer aprovação/aceite de licença" | |
return True, "Modelo disponível" | |
elif response.status_code == 404: | |
return False, "Modelo não encontrado" | |
else: | |
return False, f"Erro HTTP {response.status_code}" | |
except requests.exceptions.RequestException as e: | |
return False, f"Erro na requisição: {str(e)}" | |
def test_model_inference(self, model_name: str) -> Tuple[bool, str]: | |
"""Testa se o modelo está disponível para inferência.""" | |
url = f"https://api-inference.huggingface.co/models/{model_name}" | |
test_payload = { | |
"inputs": "Teste de disponibilidade do modelo.", | |
"parameters": { | |
"max_new_tokens": 10, | |
"temperature": 0.1, | |
"return_full_text": False | |
} | |
} | |
try: | |
response = requests.post(url, headers=self.headers, json=test_payload, timeout=90) | |
if response.status_code == 200: | |
result = response.json() | |
if isinstance(result, list) and len(result) > 0: | |
return True, "Modelo disponível para inferência" | |
elif isinstance(result, dict) and 'error' not in result: | |
return True, "Modelo disponível para inferência" | |
else: | |
return False, f"Resposta inesperada: {result}" | |
elif response.status_code == 503: | |
return False, "Modelo está carregando (503)" | |
elif response.status_code == 400: | |
error_msg = response.json().get('error', 'Erro 400') | |
if 'loading' in error_msg.lower(): | |
return False, "Modelo está carregando" | |
return False, f"Erro 400: {error_msg}" | |
elif response.status_code == 401: | |
return False, "Token inválido ou sem permissão" | |
elif response.status_code == 404: | |
return False, "Modelo não encontrado" | |
else: | |
return False, f"Erro HTTP {response.status_code}: {response.text}" | |
except requests.exceptions.Timeout: | |
return False, "Timeout na requisição" | |
except requests.exceptions.RequestException as e: | |
return False, f"Erro na requisição: {str(e)}" | |
def test_model_availability(self, model_name: str) -> Tuple[bool, str]: | |
"""Testa se um modelo está disponível, combinando verificação de info e inferência.""" | |
print(f"Testando modelo: {model_name}") | |
info_available, info_msg = self.check_model_info(model_name) | |
if not info_available: | |
return False, f"Info check failed: {info_msg}" | |
print(f" ✓ Info check: {info_msg}") | |
inference_available, inference_msg = self.test_model_inference(model_name) | |
if inference_available: | |
print(f" ✓ Inference check: {inference_msg}") | |
return True, f"Disponível - {info_msg}" | |
else: | |
print(f" ✗ Inference check: {inference_msg}") | |
return False, f"Não disponível para inferência: {inference_msg}" | |
def query_model(self, model_name: str, messages: List[Dict], max_tokens: int = 1000) -> str: | |
"""Faz requisição ao modelo usando text-generation.""" | |
prompt = self._convert_messages_to_prompt(messages) | |
url = f"https://api-inference.huggingface.co/models/{model_name}" | |
payload = { | |
"inputs": prompt, | |
"parameters": { | |
"max_new_tokens": max_tokens, | |
"temperature": 0.7, | |
"do_sample": True, | |
"return_full_text": False | |
} | |
} | |
try: | |
response = requests.post(url, headers=self.headers, json=payload, timeout=60) | |
response.raise_for_status() | |
result = response.json() | |
if isinstance(result, list) and len(result) > 0: | |
return result[0].get('generated_text', '').strip() | |
elif isinstance(result, dict) and 'generated_text' in result: | |
return result['generated_text'].strip() | |
else: | |
return f"Formato de resposta inesperado: {result}" | |
except requests.exceptions.HTTPError as http_err: | |
return f"Erro HTTP: {http_err.response.status_code} - {http_err.response.text}" | |
except requests.exceptions.RequestException as e: | |
return f"Erro na requisição: {str(e)}" | |
def _convert_messages_to_prompt(self, messages: List[Dict]) -> str: | |
"""Converte mensagens do formato chat para prompt simples.""" | |
prompt_parts = [] | |
for msg in messages: | |
role = msg['role'] | |
content = msg['content'] | |
if role == 'system': | |
prompt_parts.append(f"Sistema: {content}") | |
elif role == 'user': | |
prompt_parts.append(f"Usuário: {content}") | |
elif role == 'assistant': | |
prompt_parts.append(f"Assistente: {content}") | |
prompt_parts.append("Assistente:") | |
return "\n\n".join(prompt_parts) | |
api_client = HuggingFaceAPIClient(HF_TOKEN) | |
# --- Função para Testar e Atualizar Modelos --- | |
def test_and_update_models() -> int: | |
""" | |
Testa a disponibilidade dos novos modelos e atualiza a lista MODELS. | |
Garante que o DEFAULT_MODEL seja sempre o primeiro da lista. | |
Retorna o número de modelos disponíveis. | |
""" | |
print("Testando disponibilidade dos novos modelos...") | |
print(f"Token HF disponível: {'Sim' if HF_TOKEN else 'Não'}") | |
print("-" * 60) | |
# Cria um dicionário temporário para os modelos disponíveis | |
temp_models = {} | |
# Primeiro verifica o modelo padrão | |
default_label, default_name = "Mistral 7B (Mais acertivo)", "mistralai/Mistral-7B-Instruct-v0.3" | |
is_available, message = api_client.test_model_availability(default_name) | |
if is_available: | |
temp_models[default_label] = default_name | |
print(f"✓ {default_label} (DEFAULT MODEL)") | |
else: | |
print(f"✗ {default_label} - {message} (MODELO PADRÃO INDISPONÍVEL)") | |
# Depois verifica os outros modelos | |
for model_label, model_name in NEW_MODELS_TO_TEST: | |
# Pula o modelo padrão se já foi testado | |
if model_label == default_label and model_name == default_name: | |
continue | |
is_available, message = api_client.test_model_availability(model_name) | |
if is_available: | |
temp_models[model_label] = model_name | |
print(f"✓ {model_label}") | |
else: | |
print(f"✗ {model_label} - {message}") | |
time.sleep(1) | |
# Atualiza MODELS garantindo que o padrão seja o primeiro | |
global MODELS | |
MODELS.clear() | |
# Adiciona primeiro o modelo padrão (se disponível) | |
if default_label in temp_models: | |
MODELS[default_label] = temp_models.pop(default_label) | |
# Adiciona os demais modelos | |
MODELS.update(temp_models) | |
print("\n" + "=" * 60) | |
print("MODELOS DISPONÍVEIS (ORDEM):") | |
print("=" * 60) | |
for i, (label, name) in enumerate(MODELS.items(), 1): | |
print(f"{i}. {label}") | |
print(f"\nTOTAL DE MODELOS DISPONÍVEIS: {len(MODELS)}") | |
print("=" * 60) | |
save_updated_models() | |
return len(MODELS) | |
def save_updated_models(): | |
"""Salva a lista atualizada de modelos em um arquivo.""" | |
try: | |
with open("models_available.json", "w", encoding="utf-8") as f: | |
json.dump(MODELS, f, ensure_ascii=False, indent=2) | |
print("Lista de modelos disponíveis salva em 'models_available.json'") | |
except Exception as e: | |
print(f"Erro ao salvar lista de modelos: {e}") | |
# --- Chat Principal --- | |
def responder_como_aldo(session_id: str, pergunta: str, modelo: str = DEFAULT_MODEL) -> str: | |
"""Gera resposta como Dr. Aldo Henrique.""" | |
if not pergunta.strip(): | |
return "Por favor, faça uma pergunta válida." | |
load_conversation_memory(session_id) | |
update_user_profile(session_id, pergunta) | |
contexto = [] | |
if perfil := get_user_profile_context(session_id): | |
contexto.append(f"**Perfil do Usuário**\n{perfil}") | |
if conversa := get_conversation_context(session_id): | |
contexto.append(f"**Conversa Anterior**\n{conversa}") | |
if blog := retrieve_context_from_blog(pergunta): | |
contexto.append(f"**Contexto do Blog**\n{blog}") | |
system_prompt = """Você é o Dr. Aldo Henrique, | |
Doutor em Ciências da Computação pela UnB (2024), mestre em Ciências da Computação pela UnB (2017) e bacharel em Sistemas de Informação pela UFV (2014). | |
Professor universitário, onde leciona disciplinas como Algoritmos, Inteligência Artificial, Ciência de Dados e Mineração de Dados. | |
Atua como analista de sistemas nível 4. | |
Regras de conduta: | |
- Responda em português, de forma clara, amigável e educativa. | |
- Explique conceitos antes de mostrar soluções. | |
- Use exemplos práticos e, se houver código, comente cada linha. | |
- Considere o nível do usuário (iniciante, intermediário ou avançado). | |
- Use Markdown para formatar respostas, com ``` para blocos de código. | |
- Foque em tecnologia; se a pergunta for fora do escopo, informe educadamente. | |
""" | |
conteudo_contexto = "\n".join(contexto) | |
mensagem_usuario = f"{conteudo_contexto}\n\n**Pergunta**: {pergunta}" | |
messages = [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": mensagem_usuario} | |
] | |
model_name = MODELS.get(modelo, MODELS[DEFAULT_MODEL]) | |
resposta = api_client.query_model(model_name, messages) | |
add_to_memory(session_id, pergunta, resposta) | |
return resposta | |
# --- Inicialização --- | |
def inicializar_sistema(): | |
""" | |
Inicializa o sistema, garantindo no mínimo 3 modelos disponíveis. | |
Retorna uma tupla: (status: bool, models: dict) | |
- status: True se >= 3 modelos disponíveis, False caso contrário | |
- models: Dicionário com os modelos disponíveis | |
""" | |
print("Inicializando Chatbot Dr. Aldo...") | |
num_available_models = test_and_update_models() | |
if num_available_models >= 1: | |
load_vector_store() | |
print("Sistema inicializado e pronto para uso com modelos suficientes!") | |
return True, MODELS | |
else: | |
print(f"Erro: Apenas {num_available_models} modelos disponíveis. São necessários pelo menos 3 modelos para iniciar o sistema.") | |
return False, MODELS | |
if __name__ == "__main__": | |
status, models = inicializar_sistema() | |
if status: | |
print("\n" + "="*50) | |
print("SISTEMA INICIADO: Realizando teste básico do Chatbot...") | |
print("="*50) | |
session_id = "teste_123" | |
print(responder_como_aldo(session_id, "O que é Java?")) | |
print("\n" + "-"*50) | |
print(responder_como_aldo(session_id, "Mostre um exemplo de código Java.")) | |
print("\n" + "-"*50) | |
print(clear_memory(session_id)) | |
else: | |
print("\nSistema não pôde ser iniciado devido à falta de modelos suficientes.") | |
print(f"Modelos disponíveis: {', '.join(models.keys()) if models else 'Nenhum'}") | |
print("Por favor, verifique a conexão com o Hugging Face e o token de acesso.") |