Spaces:
Running
Running
from fastapi import APIRouter, HTTPException, Query | |
from fastapi.responses import JSONResponse | |
import httpx | |
import json | |
import re | |
from urllib.parse import unquote | |
from PIL import Image | |
import io | |
import asyncio | |
import struct | |
from typing import Optional, Tuple | |
router = APIRouter() | |
async def search( | |
q: str = Query(..., description="Termo de pesquisa para imagens"), | |
min_width: int = Query(1200, description="Largura mínima das imagens (padrão: 1200px)") | |
): | |
""" | |
Busca imagens no Google Imagens e retorna uma lista estruturada | |
Agora com filtro de largura mínima | |
""" | |
# URL do Google Imagens com parâmetros para imagens grandes | |
google_images_url = "http://www.google.com/search" | |
params = { | |
"tbm": "isch", # Google Images | |
"q": q, | |
"start": 0, | |
"sa": "N", | |
"asearch": "arc", | |
"cs": "1", | |
"tbs": "isz:l", # Adiciona filtro para imagens grandes (Large) | |
# Outras opções disponíveis: | |
# "isz:m" = Medium | |
# "isz:i" = Icon | |
# "isz:lt,islt:2mp" = Larger than 2MP | |
# "isz:ex,iszw:1920,iszh:1080" = Exact size | |
"async": f"arc_id:srp_GgSMaOPQOtL_5OUPvbSTOQ_110,ffilt:all,ve_name:MoreResultsContainer,inf:1,_id:arc-srp_GgSMaOPQOtL_5OUPvbSTOQ_110,_pms:s,_fmt:pc" | |
} | |
headers = { | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", | |
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", | |
"Accept-Language": "pt-BR,pt;q=0.8,en-US;q=0.5,en;q=0.3", | |
"Accept-Encoding": "gzip, deflate", | |
"Connection": "keep-alive", | |
"Referer": "https://www.google.com/" | |
} | |
try: | |
async with httpx.AsyncClient(timeout=30.0) as client: | |
response = await client.get(google_images_url, params=params, headers=headers) | |
if response.status_code != 200: | |
raise HTTPException(status_code=response.status_code, detail="Erro ao buscar no Google Imagens") | |
# Extrair dados das imagens do conteúdo retornado | |
images = extract_images_from_response(response.text) | |
# Enriquecer com dimensões reais das imagens (otimizado) | |
enriched_images = await enrich_images_with_dimensions_optimized(images) | |
# Filtrar apenas imagens que têm dimensões válidas E largura >= min_width | |
valid_images = [ | |
img for img in enriched_images | |
if img.get('width') is not None | |
and img.get('height') is not None | |
and img.get('width') >= min_width | |
] | |
# Se não temos resultados suficientes, tenta buscar mais com filtros mais agressivos | |
if len(valid_images) < 20: | |
print(f"Poucos resultados com largura >= {min_width}px, buscando mais imagens...") | |
# Tenta uma segunda busca com filtro de imagens extra grandes | |
params["tbs"] = "isz:lt,islt:4mp" # Larger than 4MP | |
async with httpx.AsyncClient(timeout=30.0) as client: | |
response2 = await client.get(google_images_url, params=params, headers=headers) | |
if response2.status_code == 200: | |
additional_images = extract_images_from_response(response2.text) | |
additional_enriched = await enrich_images_with_dimensions_optimized(additional_images) | |
# Combina os resultados e remove duplicatas | |
all_images = enriched_images + additional_enriched | |
seen_urls = set() | |
unique_images = [] | |
for img in all_images: | |
if (img.get('url') not in seen_urls | |
and img.get('width') is not None | |
and img.get('height') is not None | |
and img.get('width') >= min_width): | |
seen_urls.add(img.get('url')) | |
unique_images.append(img) | |
valid_images = unique_images | |
# Ordena por largura (maiores primeiro) e limita a 50 resultados | |
valid_images.sort(key=lambda x: x.get('width', 0), reverse=True) | |
final_images = valid_images[:50] | |
return JSONResponse(content={ | |
"query": q, | |
"min_width_filter": min_width, | |
"total_found": len(final_images), | |
"images": final_images | |
}) | |
except httpx.TimeoutException: | |
raise HTTPException(status_code=408, detail="Timeout na requisição ao Google") | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Erro ao executar a busca: {str(e)}") | |
def clean_wikimedia_url(url: str) -> str: | |
""" | |
Remove 'thumb/' das URLs do Wikimedia para obter imagem em resolução original | |
Funciona com URLs que terminam direto no arquivo ou com redimensionamento | |
""" | |
if 'wikimedia.org' in url and '/thumb/' in url: | |
try: | |
# Casos possíveis: | |
# 1. https://upload.wikimedia.org/wikipedia/commons/thumb/7/79/James_Gunn_%2828557194032%29_%28cropped%29.jpg | |
# 2. https://upload.wikimedia.org/wikipedia/commons/thumb/7/79/James_Gunn_%2828557194032%29_%28cropped%29.jpg/220px-James_Gunn_%2828557194032%29_%28cropped%29.jpg | |
# Ambos devem virar: https://upload.wikimedia.org/wikipedia/commons/7/79/James_Gunn_%2828557194032%29_%28cropped%29.jpg | |
# Divide a URL na parte /thumb/ | |
parts = url.split('/thumb/') | |
if len(parts) == 2: | |
before_thumb = parts[0] # https://upload.wikimedia.org/wikipedia/commons | |
after_thumb = parts[1] # 7/79/James_Gunn_%2828557194032%29_%28cropped%29.jpg ou 7/79/James_Gunn_%2828557194032%29_%28cropped%29.jpg/220px-... | |
# Divide o after_thumb por barras | |
path_parts = after_thumb.split('/') | |
if len(path_parts) >= 3: | |
# Estrutura: ['7', '79', 'filename.jpg'] ou ['7', '79', 'filename.jpg', 'resized-filename.jpg'] | |
# Queremos sempre pegar os 3 primeiros elementos (diretório + nome original) | |
original_path = '/'.join(path_parts[:3]) # '7/79/James_Gunn_%2828557194032%29_%28cropped%29.jpg' | |
cleaned_url = f"{before_thumb}/{original_path}" | |
print(f"URL limpa: {url} -> {cleaned_url}") | |
return cleaned_url | |
elif len(path_parts) == 2: | |
# Caso onde só tem diretório/arquivo: 7/79 (sem o nome do arquivo) | |
# Neste caso, a URL original pode estar malformada, retorna como está | |
print(f"URL do Wikimedia malformada (sem nome do arquivo): {url}") | |
except Exception as e: | |
print(f"Erro ao limpar URL do Wikimedia: {e}") | |
return url | |
def extract_images_from_response(response_text: str) -> list: | |
""" | |
Extrai informações das imagens do HTML/JavaScript retornado pelo Google | |
Agora busca mais URLs para garantir resultados com alta resolução | |
""" | |
images = [] | |
try: | |
# Usar o regex antigo que funcionava para pegar todas as URLs | |
pattern = r'https?:\/\/[^\s"\'<>]+?\.(?:jpg|png|webp|jpeg)\b' | |
image_urls = re.findall(pattern, response_text, re.IGNORECASE) | |
# Remove duplicatas mantendo a ordem | |
seen_urls = set() | |
unique_urls = [] | |
for url in image_urls: | |
# Limpa a URL imediatamente ao extrair | |
cleaned_url = clean_wikimedia_url(url) | |
if cleaned_url not in seen_urls: | |
seen_urls.add(cleaned_url) | |
unique_urls.append(cleaned_url) | |
# Extrai mais URLs inicialmente (150) porque muitas serão filtradas por largura | |
# Isso garante que tenhamos pelo menos 50 resultados válidos com largura >= 1200px | |
for url in unique_urls[:150]: | |
images.append({ | |
"url": url, | |
"width": None, | |
"height": None | |
}) | |
except Exception as e: | |
print(f"Erro na extração: {e}") | |
return images | |
def get_image_size_from_bytes(data: bytes) -> Optional[Tuple[int, int]]: | |
""" | |
Extrai dimensões da imagem usando apenas os primeiros bytes (muito rápido) | |
Suporta JPEG, PNG, GIF, WebP sem usar PIL - versão melhorada | |
""" | |
if len(data) < 24: | |
return None | |
try: | |
# JPEG | |
if data[:2] == b'\xff\xd8': | |
i = 2 | |
while i < len(data) - 8: | |
if data[i:i+2] == b'\xff\xc0' or data[i:i+2] == b'\xff\xc2': | |
if i + 9 <= len(data): | |
height = struct.unpack('>H', data[i+5:i+7])[0] | |
width = struct.unpack('>H', data[i+7:i+9])[0] | |
if width > 0 and height > 0: | |
return width, height | |
i += 1 | |
# PNG | |
elif data[:8] == b'\x89PNG\r\n\x1a\n': | |
if len(data) >= 24: | |
width = struct.unpack('>I', data[16:20])[0] | |
height = struct.unpack('>I', data[20:24])[0] | |
if width > 0 and height > 0: | |
return width, height | |
# GIF | |
elif data[:6] in (b'GIF87a', b'GIF89a'): | |
if len(data) >= 10: | |
width = struct.unpack('<H', data[6:8])[0] | |
height = struct.unpack('<H', data[8:10])[0] | |
if width > 0 and height > 0: | |
return width, height | |
# WebP | |
elif data[:4] == b'RIFF' and len(data) > 12 and data[8:12] == b'WEBP': | |
if len(data) >= 30: | |
if data[12:16] == b'VP8 ': | |
# VP8 format | |
if len(data) >= 30: | |
width = struct.unpack('<H', data[26:28])[0] & 0x3fff | |
height = struct.unpack('<H', data[28:30])[0] & 0x3fff | |
if width > 0 and height > 0: | |
return width, height | |
elif data[12:16] == b'VP8L': | |
# VP8L format | |
if len(data) >= 25: | |
bits = struct.unpack('<I', data[21:25])[0] | |
width = (bits & 0x3fff) + 1 | |
height = ((bits >> 14) & 0x3fff) + 1 | |
if width > 0 and height > 0: | |
return width, height | |
elif data[12:16] == b'VP8X': | |
# VP8X format (extended) | |
if len(data) >= 30: | |
width = struct.unpack('<I', data[24:27] + b'\x00')[0] + 1 | |
height = struct.unpack('<I', data[27:30] + b'\x00')[0] + 1 | |
if width > 0 and height > 0: | |
return width, height | |
except (struct.error, IndexError) as e: | |
# Se houver erro no parsing, retorna None silenciosamente | |
pass | |
return None | |
async def get_image_dimensions_fast(client: httpx.AsyncClient, url: str) -> Tuple[str, Optional[int], Optional[int]]: | |
""" | |
Obtém dimensões da imagem de forma otimizada | |
A URL já foi limpa na função extract_images_from_response | |
""" | |
try: | |
# Limpa a URL de caracteres escapados e problemáticos | |
clean_url = url.replace('\\u003d', '=').replace('\\u0026', '&').replace('\\\\', '').replace('\\/', '/') | |
# Headers otimizados - primeiro tenta com range pequeno | |
headers = { | |
'Range': 'bytes=0-2048', # Aumentado para 2KB para ser mais confiável | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', | |
'Accept': 'image/*', | |
'Accept-Encoding': 'gzip, deflate', | |
'Connection': 'close' | |
} | |
# Timeout bem baixo para ser rápido | |
response = await client.get(clean_url, headers=headers, timeout=5.0) | |
if response.status_code in [200, 206]: # 206 = Partial Content (normal com Range) | |
# Tenta primeiro com parsing manual (mais rápido) | |
dimensions = get_image_size_from_bytes(response.content) | |
if dimensions: | |
print(f"Dimensões obtidas via parsing manual para {clean_url}: {dimensions[0]}x{dimensions[1]}") | |
return clean_url, dimensions[0], dimensions[1] | |
# Se não conseguiu com parsing manual, tenta baixar mais dados | |
print(f"Parsing manual falhou para {clean_url}, tentando baixar mais dados...") | |
# Remove o Range header para baixar mais dados | |
headers_full = { | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', | |
'Accept': 'image/*', | |
'Accept-Encoding': 'gzip, deflate', | |
'Connection': 'close' | |
} | |
# Tenta baixar os primeiros 10KB | |
headers_full['Range'] = 'bytes=0-10240' | |
response_full = await client.get(clean_url, headers=headers_full, timeout=5.0) | |
if response_full.status_code in [200, 206]: | |
# Tenta novamente com parsing manual | |
dimensions = get_image_size_from_bytes(response_full.content) | |
if dimensions: | |
print(f"Dimensões obtidas via parsing manual (10KB) para {clean_url}: {dimensions[0]}x{dimensions[1]}") | |
return clean_url, dimensions[0], dimensions[1] | |
# Fallback para PIL se necessário | |
try: | |
image = Image.open(io.BytesIO(response_full.content)) | |
width, height = image.size | |
print(f"Dimensões obtidas via PIL para {clean_url}: {width}x{height}") | |
return clean_url, width, height | |
except Exception as pil_error: | |
print(f"PIL também falhou para {clean_url}: {pil_error}") | |
else: | |
print(f"Erro HTTP {response.status_code} para {clean_url}") | |
except Exception as e: | |
print(f"Erro ao obter dimensões para {url}: {e}") | |
print(f"Não foi possível obter dimensões para {url}") | |
return url, None, None | |
async def enrich_images_with_dimensions_optimized(images: list) -> list: | |
""" | |
Versão otimizada para obter dimensões das imagens | |
""" | |
if not images: | |
return [] | |
# Configurações otimizadas para velocidade | |
connector = httpx.AsyncClient( | |
timeout=httpx.Timeout(3.0), # Timeout bem baixo | |
limits=httpx.Limits( | |
max_keepalive_connections=20, | |
max_connections=30, | |
keepalive_expiry=5.0 | |
) | |
# http2=True removido para evitar dependência extra | |
) | |
# Semáforo para controlar concorrência (aumentado para 15 para processar mais rápido) | |
semaphore = asyncio.Semaphore(15) | |
async def process_image_with_semaphore(image_data): | |
async with semaphore: | |
url, width, height = await get_image_dimensions_fast(connector, image_data["url"]) | |
return { | |
"url": url, | |
"width": width, | |
"height": height | |
} | |
try: | |
# Processa todas as imagens em paralelo | |
tasks = [process_image_with_semaphore(img) for img in images] | |
results = await asyncio.gather(*tasks, return_exceptions=True) | |
# Filtra apenas resultados válidos | |
valid_images = [] | |
for result in results: | |
if not isinstance(result, Exception): | |
# Adiciona informação se a URL foi limpa (para debug) | |
if 'wikimedia.org' in result['url'] and '/thumb/' not in result['url']: | |
result['cleaned_wikimedia_url'] = True | |
valid_images.append(result) | |
return valid_images | |
finally: | |
await connector.aclose() |