Spaces:
Running
Running
import requests | |
import os | |
import json | |
import re | |
import time | |
import pickle | |
from typing import Dict, Any, List, Optional, Tuple | |
from bs4 import BeautifulSoup | |
from urllib.parse import urljoin, urlparse | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.vectorstores import FAISS | |
from langchain.embeddings import HuggingFaceEmbeddings | |
# --- Configuração do RAG --- | |
BLOG_URL = "https://aldohenrique.com.br/" | |
VECTOR_STORE_PATH = "faiss_index_store.pkl" | |
PROCESSED_URLS_PATH = "processed_urls.pkl" | |
# --- Configuração da API Hugging Face --- | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
if not HF_TOKEN: | |
raise ValueError("Token HF_TOKEN não encontrado nas variáveis de ambiente") | |
MODELS = { | |
"Phi-3 Mini (Microsoft)": "microsoft/Phi-3-mini-4k-instruct", | |
"Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3", | |
"Zephyr 7B": "HuggingFaceH4/zephyr-7b-beta", | |
#"Llama 3.2 3B (Meta)": "meta-llama/Llama-3.2-3B-Instruct", | |
#"DeepSeek-Coder-V2": "deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct", | |
} | |
DEFAULT_MODEL = "Phi-3 Mini (Microsoft)" | |
# --- Variáveis Globais para o RAG --- | |
vector_store: Optional[FAISS] = None | |
# ============================================================================== | |
# SEÇÃO RAG: FUNÇÕES PARA CRAWLING, EMBEDDING E ARMAZENAMENTO | |
# ============================================================================== | |
def get_all_blog_links(url: str, processed_urls: set) -> set: | |
"""Navega pelo blog para encontrar todos os links de posts e páginas.""" | |
links_to_visit = {url} | |
visited_links = set() | |
while links_to_visit: | |
current_url = links_to_visit.pop() | |
if current_url in visited_links: | |
continue | |
try: | |
response = requests.get(current_url, timeout=10) | |
response.raise_for_status() | |
soup = BeautifulSoup(response.content, 'html.parser') | |
visited_links.add(current_url) | |
print(f"Visitando: {current_url}") | |
for link in soup.find_all('a', href=True): | |
href = link['href'] | |
full_url = urljoin(url, href) | |
# Garante que estamos no mesmo domínio e não é um link de âncora | |
if urlparse(full_url).netloc == urlparse(url).netloc and full_url not in visited_links: | |
links_to_visit.add(full_url) | |
except requests.RequestException as e: | |
print(f"Erro ao acessar {current_url}: {e}") | |
# Filtra apenas as páginas que parecem ser posts ou páginas de conteúdo | |
final_links = {link for link in visited_links if '/tag/' not in link and '/category/' not in link and '?' not in link} | |
return final_links | |
def scrape_text_from_url(url: str) -> str: | |
"""Extrai o texto principal (de artigos) de uma URL.""" | |
try: | |
response = requests.get(url, timeout=10) | |
soup = BeautifulSoup(response.content, 'html.parser') | |
# Tenta encontrar a tag <article> ou <main> que geralmente contém o conteúdo principal | |
main_content = soup.find('article') or soup.find('main') | |
if main_content: | |
return main_content.get_text(separator='\n', strip=True) | |
return "" | |
except Exception as e: | |
print(f"Erro ao raspar {url}: {e}") | |
return "" | |
def build_and_save_vector_store() -> Tuple[str, Optional[str], Optional[str]]: | |
""" | |
Função principal do RAG: raspa o blog, cria chunks, gera embeddings e salva o vector store. | |
Esta é a nossa função de "treino". | |
Retorna uma tupla (mensagem_status, caminho_do_arquivo_faiss_para_download, caminho_do_arquivo_urls_para_download). | |
""" | |
global vector_store | |
start_time = time.time() | |
print("Iniciando o processo de retreino do RAG...") | |
processed_urls = set() | |
# 1. Obter todos os links do blog | |
all_links = get_all_blog_links(BLOG_URL, processed_urls) | |
print(f"Encontrados {len(all_links)} links para processar.") | |
# 2. Raspar o texto de cada link | |
all_texts = [scrape_text_from_url(link) for link in all_links if link not in processed_urls] | |
all_texts = [text for text in all_texts if text] # Remove textos vazios | |
print(f"Textos extraídos de {len(all_texts)} novas páginas.") | |
if not all_texts: | |
return "Nenhum novo conteúdo encontrado para treinar.", None, None # Retorna None para os arquivos se não houver conteúdo | |
# 3. Dividir os textos em chunks | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150) | |
chunks = text_splitter.create_documents(all_texts) | |
print(f"Textos divididos em {len(chunks)} chunks.") | |
# 4. Criar embeddings e o vector store (FAISS) | |
print("Carregando modelo de embedding...") | |
embeddings_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
print("Criando o vector store com FAISS...") | |
vector_store = FAISS.from_documents(chunks, embeddings_model) | |
# 5. Salvar o vector store e as URLs processadas em disco | |
with open(VECTOR_STORE_PATH, "wb") as f: | |
pickle.dump(vector_store, f) | |
with open(PROCESSED_URLS_PATH, "wb") as f: | |
pickle.dump(all_links, f) | |
end_time = time.time() | |
message = f"✅ Retreino do RAG concluído em {end_time - start_time:.2f} segundos. {len(chunks)} chunks de texto processados." | |
return message, VECTOR_STORE_PATH, PROCESSED_URLS_PATH | |
def load_vector_store(): | |
"""Carrega o vector store do arquivo, se existir.""" | |
global vector_store | |
if os.path.exists(VECTOR_STORE_PATH): | |
print(f"Carregando vector store existente de '{VECTOR_STORE_PATH}'...") | |
with open(VECTOR_STORE_PATH, "rb") as f: | |
vector_store = pickle.load(f) | |
print("Vector store carregado com sucesso.") | |
else: | |
print("Nenhum vector store encontrado. É necessário treinar o modelo.") | |
# Inicia o treino automaticamente se não houver um índice | |
# Modificado para ignorar o retorno dos caminhos dos arquivos ao carregar | |
message, _, _ = build_and_save_vector_store() | |
print(message) # Imprime a mensagem de status do treino inicial | |
def retrieve_context_from_blog(query: str, k: int = 3) -> str: | |
"""Busca no vector store por chunks de texto similares à pergunta.""" | |
if vector_store: | |
try: | |
results = vector_store.similarity_search(query, k=k) | |
context = "\n\n---\n\n".join([doc.page_content for doc in results]) | |
return context | |
except Exception as e: | |
return f"Erro ao buscar contexto: {e}" | |
return "" | |
# ============================================================================== | |
# SEÇÃO API CLIENT: CÓDIGO ORIGINAL PARA CHAMAR A API DO HUGGING FACE | |
# ============================================================================== | |
class HuggingFaceAPIClient: | |
def __init__(self, token: str): | |
self.token = token | |
self.headers = { | |
"Authorization": f"Bearer {token}", | |
"Content-Type": "application/json" | |
} | |
def query_model(self, model_name: str, messages: list, max_tokens: int = 1500) -> str: | |
"""Faz requisição para a API do Hugging Face""" | |
url = f"https://api-inference.huggingface.co/models/{model_name}/v1/chat/completions" | |
payload = { | |
"model": model_name, | |
"messages": messages, | |
"max_tokens": max_tokens, | |
"temperature": 0.7, | |
"top_p": 0.9, | |
"stream": False | |
} | |
try: | |
response = requests.post(url, headers=self.headers, json=payload, timeout=9999) | |
if response.status_code == 200: | |
result = response.json() | |
return result["choices"][0]["message"]["content"] | |
else: | |
return self._fallback_text_generation(model_name, messages, max_tokens) | |
except Exception as e: | |
return f"Erro na API: {str(e)}" | |
def _fallback_text_generation(self, model_name: str, messages: list, max_tokens: int) -> str: | |
url = f"https://api-inference.huggingface.co/models/{model_name}" | |
prompt = self._messages_to_prompt(messages) | |
payload = { | |
"inputs": prompt, | |
"parameters": { | |
"max_new_tokens": max_tokens, "temperature": 0.7, "top_p": 0.9, | |
"do_sample": True, "return_full_text": False | |
}, | |
"options": {"wait_for_model": True, "use_cache": False} | |
} | |
try: | |
response = requests.post(url, headers=self.headers, json=payload, timeout=9999) | |
if response.status_code == 200: | |
result = response.json() | |
if isinstance(result, list) and len(result) > 0: | |
generated_text = result[0].get("generated_text", "") | |
if generated_text: | |
if "Assistente: " in generated_text: | |
parts = generated_text.split("Assistente: ") | |
if len(parts) > 1: return parts[-1].strip() | |
return generated_text.strip() | |
return "Resposta vazia" | |
elif isinstance(result, dict): | |
if "error" in result: return f"Erro do modelo: {result['error']}" | |
elif "generated_text" in result: return result["generated_text"].strip() | |
return "Formato de resposta inesperado" | |
elif response.status_code == 404: return f"❌ Modelo '{model_name}' não encontrado." | |
elif response.status_code == 503: return "⏳ Modelo carregando... Tente novamente." | |
elif response.status_code == 429: return "⚠️ Muitas requisições. Tente novamente." | |
else: return f"Erro HTTP {response.status_code}: {response.text[:200]}..." | |
except requests.Timeout: | |
return "⏰ Timeout - Modelo demorou muito para responder." | |
except Exception as e: | |
return f"Erro na requisição: {str(e)}" | |
def _messages_to_prompt(self, messages: list) -> str: | |
prompt = "" | |
for msg in messages: | |
prompt += f"{msg['role'].capitalize()}: {msg['content']}\n\n" | |
prompt += "Assistente: " | |
return prompt | |
# Inicializar cliente da API | |
api_client = HuggingFaceAPIClient(HF_TOKEN) | |
# ============================================================================== | |
# SEÇÃO PRINCIPAL: LÓGICA DO CHATBOT | |
# ============================================================================== | |
def formatar_resposta_com_codigo(resposta: str) -> str: | |
"""Formata a resposta destacando códigos em blocos separados.""" | |
if not resposta: return resposta | |
# Primeiro, substituir < e > por entidades HTML para evitar interpretação como tags | |
resposta = resposta.replace('<', '<').replace('>', '>') | |
resposta_formatada = re.sub( | |
r'```(\w+)?\n(.*?)\n```', | |
r'<div style="background-color: #f8f9fa; color: #1a1a1a; border: 1px solid #e9ecef; border-radius: 8px; padding: 15px; margin: 10px 0; font-family: Monaco, Consolas, monospace; overflow-x: auto;"><strong style="color: #1a1a1a;">💻 Código:</strong><br><pre style="color: #1a1a1a; margin: 5px 0; white-space: pre-wrap; word-wrap: break-word;"><code>\2</code></pre></div>', | |
resposta, flags=re.DOTALL | |
) | |
resposta_formatada = re.sub( | |
r'`([^`]+)`', | |
r'<code style="background-color: #f1f3f4; color: #1a1a1a; padding: 2px 4px; border-radius: 4px; font-family: Monaco, Consolas, monospace;">\1</code>', | |
resposta_formatada | |
) | |
resposta_formatada = resposta_formatada.replace('\n', '<br>') | |
resposta_formatada = re.sub( | |
r'^\*\*(.*?)\*\*', | |
r'<h3 style="color: #1a1a1a; margin-top: 20px; margin-bottom: 10px;">\1</h3>', | |
resposta_formatada, flags=re.MULTILINE | |
) | |
return resposta_formatada | |
def responder_como_aldo(pergunta: str, modelo_escolhido: str = DEFAULT_MODEL) -> str: | |
"""Função principal para gerar respostas, agora com RAG.""" | |
if not pergunta.strip(): | |
return "Por favor, faça uma pergunta." | |
try: | |
# --- ETAPA DE RAG --- | |
print(f"Buscando contexto para a pergunta: '{pergunta[:50]}...'") | |
contexto_blog = retrieve_context_from_blog(pergunta) | |
# Montar o prompt do sistema com o contexto do RAG | |
system_prompt = ( | |
"Você é o professor Dr. Aldo Henrique, especialista em C, Java, desenvolvimento web e inteligência artificial. " | |
"Responda com clareza, profundidade e tom acadêmico. Foque em explicar e não em só mostrar o resultado. " | |
"Responda sempre em português brasileiro. Use blocos de código formatados com ```. " | |
"Não responda se a pergunta não for sobre o universo de programação e tecnologia." | |
"Nem sempre fornecer código, mas quando tiver código, sempre explique utilizando comentários, o aluno precisa aprender lendo os comentários." | |
"Quando for pergunta sobre disciplinas, foque no conteúdo do blog." | |
) | |
# Montar prompt do usuário, injetando o contexto do blog | |
if contexto_blog: | |
pergunta_completa = ( | |
"Você é o professor Dr. Aldo Henrique, especialista em C, Java, desenvolvimento web e inteligência artificial. " | |
"Com base no seguinte contexto extraído do seu blog, responda à pergunta do usuário.\n\n" | |
"--- CONTEXTO DO BLOG ---\n" | |
f"{contexto_blog}\n" | |
"--- FIM DO CONTEXTO ---\n\n" | |
f"PERGUNTA DO USUÁRIO: {pergunta}" | |
"Responda sempre em português brasileiro. Use blocos de código formatados com ```. " | |
"Não responda nada se a pergunta não for sobre o universo de programação e tecnologia." | |
"Nem sempre fornecer código, mas quando tiver código, sempre explique utilizando comentários, o aluno precisa aprender lendo os comentários." | |
"Quando for pergunta sobre disciplinas, foque no conteúdo do blog." | |
) | |
print("Contexto encontrado e injetado no prompt.") | |
else: | |
pergunta_completa = f"{pergunta} Não responda nada se a pergunta não for sobre o universo de programação e tecnologia, informe que o Dr. Aldo Henrique só tem domínio em TI. Você é o Professor Dr. Aldo Henrique, foque em explicar e não em só mostrar o resultado. Quando apresentar código, use blocos de código formatados com ```. Sempre responda primeiro a explicação e depois modestre o código." | |
print("Nenhum contexto relevante encontrado no blog, usando prompt padrão.") | |
messages = [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": pergunta_completa} | |
] | |
model_name = MODELS.get(modelo_escolhido, MODELS[DEFAULT_MODEL]) | |
resposta = api_client.query_model(model_name, messages) | |
if resposta.startswith("Assistente: "): | |
resposta = resposta.replace("Assistente: ", "") | |
resposta_formatada = formatar_resposta_com_codigo(resposta.strip()) | |
return resposta_formatada | |
except Exception as e: | |
return f"Erro ao processar sua pergunta: {str(e)}" | |
# Funções de teste | |
def verificar_modelo_disponivel(model_name: str) -> str: | |
try: | |
url = f"https://api-inference.huggingface.co/models/{model_name}" | |
headers = {"Authorization": f"Bearer {HF_TOKEN}"} | |
payload = {"inputs": "Hello", "parameters": {"max_new_tokens": 5}} | |
response = requests.post(url, headers=headers, json=payload, timeout=9999) | |
if response.status_code == 200: return "✅ Disponível" | |
elif response.status_code == 404: return "❌ Não encontrado" | |
elif response.status_code == 503: return "⏳ Carregando..." | |
else: return f"⚠️ Status {response.status_code}" | |
except Exception as e: | |
return f"❌ Erro: {str(e)[:50]}..." | |
def testar_todos_modelos(): | |
resultados = [] | |
for nome, modelo in MODELS.items(): | |
status = verificar_modelo_disponivel(modelo) | |
resultados.append(f"{nome}: {status}") | |
return "\n".join(resultados) |