import os import numpy as np from PIL import Image import gradio as gr from deepface import DeepFace from datasets import load_dataset import pickle from io import BytesIO from huggingface_hub import upload_file, hf_hub_download, list_repo_files from pathlib import Path import gc import requests import time import shutil import tarfile import tensorflow as tf # 🔁 Limpiar almacenamiento temporal si existe def clean_temp_dirs(): print("🧹 Limpiando carpetas temporales...") for folder in ["embeddings", "batches"]: path = Path(folder) if path.exists() and path.is_dir(): shutil.rmtree(path) print(f"✅ Carpeta eliminada: {folder}") path.mkdir(exist_ok=True) clean_temp_dirs() # 📁 Parámetros DATASET_ID = "Segizu/facial-recognition" EMBEDDINGS_SUBFOLDER = "embeddings" LOCAL_EMB_DIR = Path("embeddings") LOCAL_EMB_DIR.mkdir(exist_ok=True) HF_TOKEN = os.getenv("HF_TOKEN") headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {} # 💾 Configuración MAX_TEMP_STORAGE_GB = 40 UPLOAD_EVERY = 50 def get_folder_size(path): total = 0 for dirpath, _, filenames in os.walk(path): for f in filenames: fp = os.path.join(dirpath, f) total += os.path.getsize(fp) return total / (1024 ** 3) def preprocess_image(img: Image.Image) -> np.ndarray: img_rgb = img.convert("RGB") img_resized = img_rgb.resize((160, 160), Image.Resampling.LANCZOS) return np.array(img_resized) # ✅ Cargar CSV desde el dataset dataset = load_dataset( "csv", data_files="metadata.csv", split="train", column_names=["image"], header=0 ) @GPU def build_database(): print(f"📊 Uso actual de almacenamiento temporal INICIO: {get_folder_size('.'):.2f} GB") print("🔄 Generando embeddings...") batch_size = 10 archive_batch_size = 50 batch_files = [] batch_index = 0 ARCHIVE_DIR = Path("batches") ARCHIVE_DIR.mkdir(exist_ok=True) for i in range(0, len(dataset), batch_size): batch = dataset[i:i + batch_size] print(f"📦 Lote {i // batch_size + 1}/{(len(dataset) + batch_size - 1) // batch_size}") for j in range(len(batch["image"])): image_url = batch["image"][j] if not isinstance(image_url, str) or not image_url.startswith("http") or image_url.strip().lower() == "image": print(f"⚠️ Saltando {i + j} - URL inválida: {image_url}") continue name = f"image_{i + j}" filename = LOCAL_EMB_DIR / f"{name}.pkl" # Verificar si ya fue subido try: hf_hub_download( repo_id=DATASET_ID, repo_type="dataset", filename=f"{EMBEDDINGS_SUBFOLDER}/batch_{batch_index:03}.tar.gz", token=HF_TOKEN ) print(f"⏩ Ya existe en remoto: {name}.pkl") continue except: pass try: response = requests.get(image_url, headers=headers, timeout=10) response.raise_for_status() img = Image.open(BytesIO(response.content)).convert("RGB") img_processed = preprocess_image(img) embedding = DeepFace.represent( img_path=img_processed, model_name="Facenet", enforce_detection=False )[0]["embedding"] with open(filename, "wb") as f: pickle.dump({"name": name, "img": img, "embedding": embedding}, f) batch_files.append(filename) del img_processed gc.collect() if len(batch_files) >= archive_batch_size or get_folder_size(".") > MAX_TEMP_STORAGE_GB: archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz" with tarfile.open(archive_path, "w:gz") as tar: for file in batch_files: tar.add(file, arcname=file.name) print(f"📦 Empaquetado: {archive_path}") upload_file( path_or_fileobj=str(archive_path), path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}", repo_id=DATASET_ID, repo_type="dataset", token=HF_TOKEN ) print(f"✅ Subido: {archive_path.name}") for f in batch_files: f.unlink() archive_path.unlink() print("🧹 Limpieza completada tras subida") batch_files = [] batch_index += 1 time.sleep(2) print(f"📊 Uso actual FINAL: {get_folder_size('.'):.2f} GB") except Exception as e: print(f"❌ Error en {name}: {e}") continue if batch_files: archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz" with tarfile.open(archive_path, "w:gz") as tar: for file in batch_files: tar.add(file, arcname=file.name) print(f"📦 Empaquetado final: {archive_path}") upload_file( path_or_fileobj=str(archive_path), path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}", repo_id=DATASET_ID, repo_type="dataset", token=HF_TOKEN ) for f in batch_files: f.unlink() archive_path.unlink() print("✅ Subida y limpieza final") # 🔍 Buscar similitudes def find_similar_faces(uploaded_image: Image.Image): try: img_processed = preprocess_image(uploaded_image) query_embedding = DeepFace.represent( img_path=img_processed, model_name="Facenet", enforce_detection=False )[0]["embedding"] del img_processed gc.collect() except Exception as e: return [], f"⚠ Error procesando imagen: {str(e)}" similarities = [] try: embedding_files = [ f for f in list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN) if f.startswith(f"{EMBEDDINGS_SUBFOLDER}/") and f.endswith(".pkl") ] except Exception as e: return [], f"⚠ Error obteniendo archivos: {str(e)}" for file_path in embedding_files: try: file_bytes = requests.get( f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}", headers=headers, timeout=10 ).content record = pickle.loads(file_bytes) name = record["name"] img = record["img"] emb = record["embedding"] dist = np.linalg.norm(np.array(query_embedding) - np.array(emb)) sim_score = 1 / (1 + dist) similarities.append((sim_score, name, np.array(img))) except Exception as e: print(f"⚠ Error con {file_path}: {e}") continue similarities.sort(reverse=True) top = similarities[:5] gallery = [(img, f"{name} - Similitud: {sim:.2f}") for sim, name, img in top] summary = "\n".join([f"{name} - Similitud: {sim:.2f}" for sim, name, _ in top]) return gallery, summary # 🎛️ Interfaz Gradio with gr.Blocks() as demo: gr.Markdown("## 🔍 Reconocimiento facial con DeepFace + ZeroGPU") with gr.Row(): image_input = gr.Image(label="📤 Sube una imagen", type="pil") find_btn = gr.Button("🔎 Buscar similares") gallery = gr.Gallery(label="📸 Rostros similares") summary = gr.Textbox(label="🧠 Detalle", lines=6) find_btn.click(fn=find_similar_faces, inputs=image_input, outputs=[gallery, summary]) with gr.Row(): build_btn = gr.Button("⚙️ Construir base de embeddings (usa GPU)") build_btn.click(fn=build_database, inputs=[], outputs=[]) demo.launch()