Spaces:
Sleeping
Sleeping
# -*- coding: utf-8 -*- | |
import os | |
import requests | |
from tavily import TavilyClient | |
import markdownify | |
import wikipediaapi | |
from datetime import datetime, timezone | |
import urllib.parse | |
from readability import Document | |
from bs4 import BeautifulSoup | |
import os | |
# --- Configuração Inicial --- | |
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY") | |
tavily = None # Será inicializado na função principal | |
# Configuração da API da Wikipedia | |
WIKI_LANG = 'en' # Linguagem da Wikipedia (atualizado para inglês) | |
wiki_wiki = wikipediaapi.Wikipedia( | |
language=WIKI_LANG, | |
extract_format=wikipediaapi.ExtractFormat.HTML, # Usado apenas para page.exists() | |
user_agent='MyCoolSearchBot/1.0 ([email protected])' # Definir um User-Agent é boa prática | |
) | |
MEDIAWIKI_API_URL = f"https://{WIKI_LANG}.wikipedia.org/w/api.php" | |
HEADERS = { | |
'User-Agent': 'MyCoolSearchBot/1.0 ([email protected])' | |
} | |
# --- Funções Auxiliares --- | |
def is_wikipedia_url(url): | |
"""Verifica se uma URL pertence ao domínio da Wikipedia.""" | |
return "wikipedia.org" in url.lower() | |
def download_and_convert_to_md(url): | |
"""Baixa o conteúdo HTML de uma URL e converte para Markdown.""" | |
print(f"Baixando e convertendo: {url}") | |
try: | |
response = requests.get(url, headers=HEADERS, timeout=20) | |
response.raise_for_status() # Verifica se houve erro no request | |
# Tenta detectar a codificação, mas assume UTF-8 como fallback | |
response.encoding = response.apparent_encoding or 'utf-8' | |
html_content = response.text | |
# Usa readability para extrair o conteúdo principal | |
doc = Document(html_content) | |
title = doc.short_title() | |
cleaned_html = doc.summary() | |
# Remove scripts e estilos restantes com BeautifulSoup (filtro extra) | |
soup = BeautifulSoup(cleaned_html, "html.parser") | |
for tag in soup(["script", "style", "noscript"]): | |
tag.decompose() | |
cleaned_html = str(soup) | |
# Usa markdownify para converter HTML para Markdown | |
md_content = markdownify.markdownify( | |
cleaned_html, | |
heading_style="ATX", | |
strip=['script', 'style'], | |
escape_underscores=False) | |
#return md_content | |
return f"# {title}\n\n" + md_content.strip() | |
except requests.exceptions.RequestException as e: | |
print(f"Erro ao acessar a URL {url}: {e}") | |
return None | |
except Exception as e: | |
print(f"Erro ao converter HTML para Markdown da URL {url}: {e}") | |
return None | |
# --- Funções Específicas da Wikipedia --- | |
def get_wikipedia_revision_info(page_title, target_date_str): | |
"""Busca o ID e timestamp da revisão mais próxima (<=) da data fornecida via API MediaWiki.""" | |
try: | |
# Converte a data string para um objeto datetime e formata para ISO 8601 com Z (UTC) | |
target_dt = datetime.strptime(target_date_str, '%Y-%m-%d') | |
# Precisamos do final do dia para garantir que incluímos todas as revisões daquele dia | |
target_dt_end_of_day = target_dt.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) | |
target_timestamp_iso = target_dt_end_of_day.strftime('%Y-%m-%dT%H:%M:%SZ') | |
except ValueError: | |
print("Formato de data inválido. Use AAAA-MM-DD.") | |
return None, None | |
params = { | |
"action": "query", | |
"prop": "revisions", | |
"titles": page_title, | |
"rvlimit": 1, | |
"rvdir": "older", # Busca a revisão imediatamente anterior ou igual ao timestamp | |
"rvprop": "ids|timestamp", # Queremos o ID da revisão e o timestamp | |
"rvstart": target_timestamp_iso, # Começa a busca a partir desta data/hora | |
"format": "json", | |
"formatversion": 2 # Formato JSON mais moderno e fácil de parsear | |
} | |
try: | |
print(f"Consultando API MediaWiki para revisão de '{page_title}' em {target_date_str}...") | |
response = requests.get(MEDIAWIKI_API_URL, params=params, headers=HEADERS, timeout=15) | |
response.raise_for_status() | |
data = response.json() | |
# Verifica se a página foi encontrada | |
page_data = data.get("query", {}).get("pages", []) | |
if not page_data or page_data[0].get("missing", False): | |
print(f"Página '{page_title}' não encontrada na API MediaWiki.") | |
# Tenta verificar com a biblioteca wikipediaapi como fallback (pode pegar redirecionamentos) | |
page = wiki_wiki.page(page_title) | |
if page.exists(): | |
print(f"Página '{page_title}' existe (possivelmente redirecionada para '{page.title}'). Tentando novamente com o título canônico...") | |
return get_wikipedia_revision_info(page.title, target_date_str) # Chama recursivamente com o título correto | |
else: | |
print(f"Página '{page_title}' realmente não encontrada.") | |
return None, None | |
# Extrai as revisões | |
revisions = page_data[0].get("revisions", []) | |
if not revisions: | |
print(f"Nenhuma revisão encontrada para '{page_title}' antes ou em {target_date_str}.") | |
# Pode acontecer se a página foi criada depois da data alvo | |
return None, None | |
revision = revisions[0] | |
revid = revision.get("revid") | |
timestamp = revision.get("timestamp") | |
print(f"Encontrada revisão: ID={revid}, Timestamp={timestamp}") | |
return revid, timestamp | |
except requests.exceptions.RequestException as e: | |
print(f"Erro ao chamar a API MediaWiki: {e}") | |
return None, None | |
except Exception as e: | |
print(f"Erro ao processar resposta da API MediaWiki: {e}") | |
return None, None | |
def get_wikipedia_page_historical_content(page_title, target_date_str): | |
"""Obtém o conteúdo Markdown de uma revisão histórica específica da Wikipedia.""" | |
# Busca o ID da revisão correta usando a API MediaWiki | |
revid, timestamp = get_wikipedia_revision_info(page_title, target_date_str) | |
if not revid: | |
print(f"Não foi possível encontrar uma revisão adequada para '{page_title}' em {target_date_str}.") | |
return None | |
# Constrói a URL para a revisão específica | |
# Nota: Codifica o título da página para a URL | |
# Precisamos garantir que estamos usando o título correto (pode ter sido redirecionado) | |
page_check = wiki_wiki.page(page_title) # Verifica novamente para obter o título canônico se houve redirecionamento | |
if not page_check.exists(): | |
print(f"Erro inesperado: Página '{page_title}' não encontrada após busca de revisão.") | |
return None | |
canonical_title = page_check.title | |
encoded_title = urllib.parse.quote(canonical_title.replace(' ', '_')) | |
revision_url = f"https://{WIKI_LANG}.wikipedia.org/w/index.php?title={encoded_title}&oldid={revid}" | |
print(f"Acessando URL da revisão: {revision_url}") | |
# Usa a função de download e conversão para obter o conteúdo em Markdown | |
md_content = download_and_convert_to_md(revision_url) | |
if md_content: | |
# Adiciona informação sobre a revisão no início do conteúdo (CORRIGIDO) | |
header = f"# Wikipedia Content for '{canonical_title}'\n" | |
header += f"*Revision from {timestamp} (ID: {revid})*\n" | |
header += f"*Regarding search date: {target_date_str}*\n\n" | |
header += "---\n\n" | |
return header + md_content | |
else: | |
print(f"Falha ao obter ou converter conteúdo da revisão {revid} para '{canonical_title}'.") | |
return None | |
# --- Função Principal de Busca --- | |
def search_and_extract(query, use_wikipedia_priority=False, wikipedia_date=None): | |
"""Realiza a busca, filtra resultados, baixa e converte conteúdo.""" | |
global tavily | |
if not tavily: | |
if not TAVILY_API_KEY or TAVILY_API_KEY == "INSIRA_SUA_CHAVE_TAVILY_AQUI": | |
print("Erro: Chave da API Tavily não configurada ou inválida.") | |
return [] | |
try: | |
tavily = TavilyClient(api_key=TAVILY_API_KEY) | |
except Exception as e: | |
print(f"Erro ao inicializar o cliente Tavily: {e}") | |
return [] | |
results_md = [] | |
# Sempre faz a busca com Tavily primeiro | |
print(f"\n--- Realizando busca por '{query}' usando Tavily ---") | |
try: | |
response = tavily.search(query=query, search_depth="basic", max_results=10) | |
search_results = response.get('results', []) | |
except Exception as e: | |
print(f"Erro ao realizar busca com Tavily: {e}") | |
return [] | |
if not search_results: | |
print("Nenhum resultado encontrado pela busca Tavily.") | |
return [] | |
urls_to_process = [] | |
if use_wikipedia_priority: | |
print("Prioridade para Wikipedia habilitada. Filtrando resultados Tavily por Wikipedia...") | |
wiki_urls = [res['url'] for res in search_results if is_wikipedia_url(res['url'])] | |
if wiki_urls: | |
# Pega o primeiro resultado da Wikipedia | |
first_wiki_url = wiki_urls[0] | |
page_title_guess = first_wiki_url.split('/')[-1].replace('_', ' ') | |
page_check = wiki_wiki.page(page_title_guess) | |
if page_check.exists(): | |
print(f"Encontrado resultado da Wikipedia: {first_wiki_url}") | |
if wikipedia_date: | |
# Busca revisão histórica | |
md_content = get_wikipedia_page_historical_content(page_check.title, wikipedia_date) | |
if md_content: | |
results_md.append({ | |
"source": f"Wikipedia Revision ({page_check.title} @ {wikipedia_date})", | |
"content": md_content | |
}) | |
return results_md | |
else: | |
# Usa a URL atual | |
urls_to_process = [first_wiki_url] | |
else: | |
print(f"Página da Wikipedia '{page_title_guess}' não encontrada. Usando os 5 primeiros resultados gerais.") | |
urls_to_process = [res['url'] for res in search_results[:5]] | |
else: | |
print("Nenhuma URL da Wikipedia encontrada nos resultados. Usando os 5 primeiros resultados gerais.") | |
urls_to_process = [res['url'] for res in search_results[:5]] | |
else: | |
print("Usando os 5 primeiros resultados gerais.") | |
urls_to_process = [res['url'] for res in search_results[:5]] | |
print(f"\n--- Processando {len(urls_to_process)} URLs selecionadas ---") | |
for url in urls_to_process: | |
md_content = download_and_convert_to_md(url) | |
if md_content: | |
results_md.append({ | |
"source": url, | |
"content": md_content | |
}) | |
else: | |
print(f"Falha ao processar URL: {url}") | |
return results_md | |
# --- Exemplo de Uso --- | |
if __name__ == "__main__": | |
# Verifica se a chave foi definida | |
if not TAVILY_API_KEY or TAVILY_API_KEY == "INSIRA_SUA_CHAVE_TAVILY_AQUI": | |
print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") | |
print("!!! Chave da API Tavily não encontrada ou não definida. !!!") | |
print("!!! Verifique a variável TAVILY_API_KEY no início do script. !!!") | |
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") | |
# exit() # Descomente para parar a execução se a chave não estiver presente | |
# Exemplo 1: Busca geral pelos 5 primeiros resultados (em inglês) | |
print("\n================ Example 1: General Search ================") | |
# Usando um tópico mais provável de ter bons resultados em inglês | |
if False: | |
resultados_gerais = search_and_extract("History of Artificial Intelligence") | |
for i, res in enumerate(resultados_gerais): | |
print(f"\nGeneral Result {i+1}: {res['source']}") | |
filename = f"general_result_{i+1}.md" | |
try: | |
with open(filename, "w", encoding="utf-8") as f: | |
f.write(f"# Source: {res['source']}\n\n") | |
f.write(res['content']) | |
print(f"Content saved to {filename}") | |
except Exception as e: | |
print(f"Error saving {filename}: {e}") | |
# print(res['content'][:300] + "...") # Mostra um trecho | |
# Exemplo 2: Busca com prioridade para Wikipedia (em inglês) | |
print("\n============= Example 2: Wikipedia Priority ============") | |
resultados_wiki = search_and_extract("studio albums published by Mercedes Sosa between.", use_wikipedia_priority=True, wikipedia_date='2022-12-31') | |
for i, res in enumerate(resultados_wiki): | |
print(f"\nWiki Priority Result {i+1}: {res['source']}") | |
filename = f"wiki_priority_result_{i+1}.md" | |
try: | |
with open(filename, "w", encoding="utf-8") as f: | |
f.write(f"# Source: {res['source']}\n\n") | |
f.write(res['content']) | |
print(f"Content saved to {filename}") | |
except Exception as e: | |
print(f"Error saving {filename}: {e}") | |
# print(res['content'][:300] + "...") | |
# Exemplo 3: Busca por revisão histórica da Wikipedia (em inglês) | |
print("\n========== Example 3: Historical Wikipedia Revision ==========") | |
# Nota: O título da página deve ser o mais exato possível. | |
# Vamos buscar uma revisão do artigo 'Python (programming language)' perto de 15 Jan 2010 | |
if False: | |
resultados_hist = search_and_extract("Python (programming language)", use_wikipedia_priority=True, wikipedia_date="2010-01-15") | |
for i, res in enumerate(resultados_hist): | |
print(f"\nHistorical Result {i+1}: {res['source']}") | |
filename = f"historical_result_python_2010.md" | |
try: | |
with open(filename, "w", encoding="utf-8") as f: | |
# O cabeçalho já está incluído pela função get_wikipedia_page_historical_content | |
f.write(res['content']) | |
print(f"Content saved to {filename}") | |
except Exception as e: | |
print(f"Error saving {filename}: {e}") | |
# print(res['content'][:300] + "...") | |
print("\nScript finished.") | |