Spaces:
Build error
Build error
import os | |
import numpy as np | |
from PIL import Image | |
import gradio as gr | |
from deepface import DeepFace | |
from datasets import load_dataset | |
import pickle | |
from io import BytesIO | |
from huggingface_hub import upload_file, hf_hub_download, list_repo_files | |
from pathlib import Path | |
import gc | |
import requests | |
import time | |
import shutil | |
import tarfile | |
import tensorflow as tf | |
# GPU setup: report the GPUs TensorFlow can see and configure them.
gpus = tf.config.list_physical_devices('GPU')
print("Dispositivos GPU disponibles:", gpus)

if not gpus:
    print("⚠️ No se detectó GPU, usando CPU")
else:
    try:
        # Let TensorFlow grow its GPU memory allocation on demand.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("✅ GPU configurada correctamente")
        # Make only the first GPU visible to TensorFlow.
        tf.config.set_visible_devices(gpus[0], 'GPU')
        print(f"✅ Usando GPU: {gpus[0]}")
    except RuntimeError as e:
        print(f"⚠️ Error configurando GPU: {e}")

# Set the global Keras policy to mixed precision (applied with or without GPU).
tf.keras.mixed_precision.set_global_policy('mixed_float16')
# 🔁 Remove leftover temporary storage, if any
def clean_temp_dirs():
    """Reset the ``embeddings`` and ``batches`` working directories.

    Each directory (relative to the current working directory) is removed
    together with its contents and then recreated empty. If the name is
    occupied by a regular file or a symlink, that entry is unlinked first so
    the directory can be created instead of failing with ``FileExistsError``.
    """
    print("🧹 Limpiando carpetas temporales...")
    for folder in ("embeddings", "batches"):
        path = Path(folder)
        if path.is_symlink() or path.is_file():
            # rmtree refuses symlinks and mkdir refuses existing files, so
            # drop the stray entry explicitly.
            path.unlink()
            print(f"✅ Archivo eliminado: {folder}")
        elif path.is_dir():
            shutil.rmtree(path)
            print(f"✅ Carpeta eliminada: {folder}")
        path.mkdir(exist_ok=True)
# Wipe and recreate the working directories at import time.
clean_temp_dirs()

# 📁 Parameters
DATASET_ID = "Segizu/facial-recognition-preview"
EMBEDDINGS_SUBFOLDER = "embeddings"
LOCAL_EMB_DIR = Path("embeddings")
LOCAL_EMB_DIR.mkdir(exist_ok=True)

# Optional Hugging Face token; when present it is sent as a bearer header.
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN:
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
else:
    headers = {}

# 💾 Configuration
# Temporary-storage ceiling in GB, compared against get_folder_size(".").
MAX_TEMP_STORAGE_GB = 40
# NOTE(review): not referenced anywhere in the visible code.
UPLOAD_EVERY = 50
def get_folder_size(path):
    """Return the combined size of all files under ``path``, in GiB.

    The tree is walked recursively with ``os.walk``. Entries whose size
    cannot be read (a dangling symlink, or a file deleted while the walk is
    in progress) are skipped instead of aborting the whole measurement.

    Args:
        path: Directory to measure.

    Returns:
        Total size in bytes divided by ``1024 ** 3`` (a float).
    """
    total = 0
    for dirpath, _, filenames in os.walk(path):
        for filename in filenames:
            try:
                total += os.path.getsize(os.path.join(dirpath, filename))
            except OSError:
                # The file vanished or is a broken link; it occupies no
                # measurable space, so ignore it.
                continue
    return total / (1024 ** 3)
def _apply_exif_orientation(img):
    """Return ``img`` rotated according to its EXIF orientation tag, if any."""
    # EXIF orientation value -> counter-clockwise angle passed to rotate().
    rotations = {3: 180, 6: 270, 8: 90}
    try:
        # 274 is the EXIF orientation tag. getexif() is the public accessor
        # available on every Image; the private _getexif() only exists on
        # some file-plugin classes and is missing after convert().
        orientation = img.getexif().get(274)
    except Exception as exc:
        # Best effort: unreadable EXIF must not block preprocessing.
        print(f"⚠️ No se pudo leer EXIF: {exc}")
        return img
    angle = rotations.get(orientation)
    if angle is None:
        return img
    return img.rotate(angle, expand=True)


def preprocess_image(img: Image.Image) -> np.ndarray:
    """Prepare an image for embedding extraction.

    Steps:
      1. Apply the EXIF orientation (values 3, 6 and 8) so the picture is
         upright.
      2. Convert to RGB.
      3. Ask DeepFace (retinaface backend, ``enforce_detection=False``) for
         faces and, if any are returned, use the first one.
      4. Otherwise fall back to the whole image resized to 160x160 with
         LANCZOS resampling.

    Args:
        img: Input PIL image.

    Returns:
        The first face array returned by ``DeepFace.extract_faces``, or the
        resized full image as a numpy array when detection yields nothing or
        raises.
    """
    # Orientation is read before converting so the metadata is taken from
    # the image object exactly as it was handed in.
    img = _apply_exif_orientation(img)
    if img.mode != 'RGB':
        img = img.convert('RGB')

    try:
        face_objs = DeepFace.extract_faces(
            img_path=np.array(img),
            target_size=(160, 160),
            detector_backend='retinaface',
            enforce_detection=False,
        )
        if face_objs:
            # NOTE(review): dtype/value range of 'face' is decided by
            # DeepFace and may differ from the uint8 fallback below - confirm.
            return face_objs[0]['face']
    except Exception as exc:
        # Detection is best effort; report the reason and use the fallback.
        print(f"⚠️ Detección de rostro fallida, se usa la imagen completa: {exc}")

    # No face found (or detection failed): use the resized original image.
    img_resized = img.resize((160, 160), Image.Resampling.LANCZOS)
    return np.array(img_resized)
# ✅ Load the image list from the local ``metadata.csv`` file.
# The single column is exposed as "image"; build_database() reads each value
# of that column as an image URL.
dataset = load_dataset(
    "csv",
    split="train",
    data_files="metadata.csv",
    header=0,
    column_names=["image"],
)
def _archive_and_upload(batch_files, batch_index, archive_dir, final=False):
    """Pack ``batch_files`` into ``batch_<index>.tar.gz`` and upload it.

    After a successful upload the packed ``.pkl`` files are deleted. The
    local archive is removed whether or not the upload succeeded, so a failed
    attempt leaves no stale archive behind; the ``.pkl`` files are kept in
    that case so the caller can retry.

    Args:
        batch_files: ``Path`` objects of the pickle files to pack.
        batch_index: Number used in the archive name (zero-padded to 3).
        archive_dir: Directory in which the archive is created.
        final: Selects the log wording used for the last, partial batch.
    """
    archive_path = archive_dir / f"batch_{batch_index:03}.tar.gz"
    try:
        with tarfile.open(archive_path, "w:gz") as tar:
            for file in batch_files:
                tar.add(file, arcname=file.name)
        if final:
            print(f"📦 Empaquetado final: {archive_path}")
        else:
            print(f"📦 Empaquetado: {archive_path}")
        upload_file(
            path_or_fileobj=str(archive_path),
            path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
            repo_id=DATASET_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        if not final:
            print(f"✅ Subido: {archive_path.name}")
    finally:
        if archive_path.exists():
            archive_path.unlink()
    for file in batch_files:
        file.unlink()


def build_database():
    """Compute a Facenet embedding per dataset image and upload them in batches.

    For every valid URL in ``dataset["image"]`` the image is downloaded,
    preprocessed and embedded; the result is pickled locally as
    ``{"name", "img", "embedding"}``. Every ``archive_batch_size`` pickles
    (or earlier, when local storage exceeds ``MAX_TEMP_STORAGE_GB``) are
    packed into a ``tar.gz`` and uploaded to the dataset repo.

    Resume behaviour: the remote file list is fetched once. While the archive
    for the current batch index already exists remotely, images are skipped;
    after ``archive_batch_size`` skipped images the batch index advances.
    This is approximate: it assumes each existing archive covers
    ``archive_batch_size`` consecutive valid URLs, which does not hold if
    images failed in the earlier run or a batch was flushed early.

    Per-image failures are logged and skipped. A failure while uploading the
    final partial batch propagates to the caller.
    """
    print(f"📊 Uso actual de almacenamiento temporal INICIO: {get_folder_size('.'):.2f} GB")
    print("🔄 Generando embeddings...")
    batch_size = 10
    archive_batch_size = 50
    batch_files = []
    batch_index = 0
    skipped_in_batch = 0
    archive_dir = Path("batches")
    archive_dir.mkdir(exist_ok=True)

    # One listing call replaces downloading a whole archive per image just
    # to test whether it exists.
    try:
        remote_files = set(
            list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
        )
    except Exception as e:
        # Same outcome as a failed existence check: treat nothing as uploaded.
        print(f"⚠️ No se pudo listar el repositorio remoto: {e}")
        remote_files = set()

    total_batches = (len(dataset) + batch_size - 1) // batch_size
    for i in range(0, len(dataset), batch_size):
        batch = dataset[i:i + batch_size]
        print(f"📦 Lote {i // batch_size + 1}/{total_batches}")
        for j, image_url in enumerate(batch["image"]):
            if (
                not isinstance(image_url, str)
                or not image_url.startswith("http")
                or image_url.strip().lower() == "image"
            ):
                print(f"⚠️ Saltando {i + j} - URL inválida: {image_url}")
                continue

            name = f"image_{i + j}"
            filename = LOCAL_EMB_DIR / f"{name}.pkl"

            # Skip images that belong to a batch already present remotely.
            remote_name = f"{EMBEDDINGS_SUBFOLDER}/batch_{batch_index:03}.tar.gz"
            if remote_name in remote_files:
                print(f"⏩ Ya existe en remoto: {name}.pkl")
                skipped_in_batch += 1
                if skipped_in_batch >= archive_batch_size:
                    # A full batch has been skipped: move to the next archive
                    # instead of skipping every remaining image.
                    batch_index += 1
                    skipped_in_batch = 0
                continue

            try:
                # NOTE(review): `headers` may carry the HF bearer token and is
                # sent to whatever host the CSV URL names - confirm all URLs
                # are trusted.
                response = requests.get(image_url, headers=headers, timeout=10)
                response.raise_for_status()
                img = Image.open(BytesIO(response.content)).convert("RGB")
                img_processed = preprocess_image(img)
                embedding = DeepFace.represent(
                    img_path=img_processed,
                    model_name="Facenet",
                    enforce_detection=False,
                )[0]["embedding"]
                with open(filename, "wb") as f:
                    pickle.dump({"name": name, "img": img, "embedding": embedding}, f)
                batch_files.append(filename)
                del img_processed
                gc.collect()

                if (
                    len(batch_files) >= archive_batch_size
                    or get_folder_size(".") > MAX_TEMP_STORAGE_GB
                ):
                    _archive_and_upload(batch_files, batch_index, archive_dir)
                    print("🧹 Limpieza completada tras subida")
                    batch_files = []
                    batch_index += 1
                    time.sleep(2)
                    print(f"📊 Uso actual FINAL: {get_folder_size('.'):.2f} GB")
            except Exception as e:
                print(f"❌ Error en {name}: {e}")
                continue

    # Flush whatever is left over as a last, possibly smaller, batch.
    if batch_files:
        _archive_and_upload(batch_files, batch_index, archive_dir, final=True)
        print("✅ Subida y limpieza final")
# 🔍 Search for similar faces
def find_similar_faces(uploaded_image: Image.Image):
    """Return the five stored faces closest to ``uploaded_image``.

    The query image is preprocessed and embedded with Facenet (first with
    ``enforce_detection=True``, then with ``False`` if that raises). Every
    ``embeddings/*.tar.gz`` archive in the dataset repo is downloaded and read
    in memory; each pickled record is scored as ``1 / (1 + L2 distance)``
    against the query embedding.

    Args:
        uploaded_image: PIL image supplied by the UI, or ``None``.

    Returns:
        ``(gallery, summary)`` where ``gallery`` is a list of
        ``(image_array, caption)`` pairs and ``summary`` is the captions
        joined by newlines. On any failure ``gallery`` is empty and
        ``summary`` holds a warning message.
    """
    if uploaded_image is None:
        return [], "⚠ Por favor, sube una imagen primero"

    try:
        print("🔄 Procesando imagen de entrada...")
        if uploaded_image.mode != 'RGB':
            uploaded_image = uploaded_image.convert('RGB')
        print(f"📐 Dimensiones de la imagen: {uploaded_image.size}")
        img_processed = preprocess_image(uploaded_image)
        print("✅ Imagen preprocesada correctamente")
        # Strict detection first; fall back to lenient detection on failure.
        try:
            query_embedding = DeepFace.represent(
                img_path=img_processed,
                model_name="Facenet",
                enforce_detection=True,
                detector_backend='retinaface',
            )[0]["embedding"]
            print("✅ Rostro detectado con enforce_detection=True")
        except Exception as e:
            print(f"⚠ No se pudo detectar rostro con enforce_detection=True, intentando con False: {e}")
            query_embedding = DeepFace.represent(
                img_path=img_processed,
                model_name="Facenet",
                enforce_detection=False,
                detector_backend='retinaface',
            )[0]["embedding"]
            print("✅ Embedding generado con enforce_detection=False")
        del img_processed
        gc.collect()
    except Exception as e:
        print(f"❌ Error en procesamiento de imagen: {e}")
        return [], f"⚠ Error procesando imagen: {e}"

    # Converted once here instead of once per stored record.
    query_vector = np.array(query_embedding)

    similarities = []
    print("🔍 Buscando similitudes en la base de datos...")
    try:
        embedding_files = [
            f for f in list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
            if f.startswith(f"{EMBEDDINGS_SUBFOLDER}/") and f.endswith(".tar.gz")
        ]
        print(f"📁 Encontrados {len(embedding_files)} archivos de embeddings")
    except Exception as e:
        print(f"❌ Error obteniendo archivos: {e}")
        return [], f"⚠ Error obteniendo archivos: {e}"

    # Archives are walked in groups of 10 only to report progress.
    batch_size = 10
    total_batches = (len(embedding_files) + batch_size - 1) // batch_size
    for i in range(0, len(embedding_files), batch_size):
        print(f"📦 Procesando lote {i//batch_size + 1}/{total_batches}")
        for file_path in embedding_files[i:i + batch_size]:
            try:
                response = requests.get(
                    f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}",
                    headers=headers,
                    timeout=30,
                )
                # Fail with the HTTP reason rather than a tar parsing error.
                response.raise_for_status()
                # Read the archive straight from memory: nothing is written
                # to disk, so a failed archive cannot leave files behind for
                # the next one, concurrent requests cannot collide on a shared
                # temp path, and no member is extracted to the filesystem.
                with tarfile.open(fileobj=BytesIO(response.content), mode="r:gz") as tar:
                    for member in tar.getmembers():
                        if not (member.isfile() and member.name.endswith(".pkl")):
                            continue
                        # NOTE(security): unpickling runs arbitrary code from
                        # the archive; only safe while the dataset repo is
                        # fully trusted.
                        record = pickle.load(tar.extractfile(member))
                        name = record["name"]
                        img = record["img"]
                        emb = record["embedding"]
                        dist = np.linalg.norm(query_vector - np.array(emb))
                        sim_score = 1 / (1 + dist)
                        similarities.append((sim_score, name, np.array(img)))
            except Exception as e:
                print(f"⚠ Error procesando {file_path}: {e}")
                continue

    if not similarities:
        return [], "⚠ No se encontraron similitudes en la base de datos"

    print(f"✅ Encontradas {len(similarities)} similitudes")
    # Rank on (score, name) only: comparing whole tuples would fall through
    # to the image arrays on a tie and raise ValueError.
    top = sorted(similarities, key=lambda item: (item[0], item[1]), reverse=True)[:5]
    gallery = [(img, f"{name} - Similitud: {sim:.2f}") for sim, name, img in top]
    summary = "\n".join([f"{name} - Similitud: {sim:.2f}" for sim, name, _ in top])
    return gallery, summary
# 🎛️ Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("## 🔍 Reconocimiento facial con DeepFace + ZeroGPU")

    # Query controls: image upload next to the search button.
    with gr.Row():
        image_input = gr.Image(label="📤 Sube una imagen", type="pil")
        find_btn = gr.Button("🔎 Buscar similares")

    # Result widgets filled by find_similar_faces().
    gallery = gr.Gallery(label="📸 Rostros similares")
    summary = gr.Textbox(label="🧠 Detalle", lines=6)
    find_btn.click(
        fn=find_similar_faces,
        inputs=image_input,
        outputs=[gallery, summary],
    )

    # Maintenance control: runs build_database(), which takes no inputs and
    # updates no components.
    with gr.Row():
        build_btn = gr.Button("⚙️ Construir base de embeddings (usa GPU)")
    build_btn.click(fn=build_database, inputs=[], outputs=[])

demo.launch(share=True)