# Segizu's picture
# ZeroGPU
# ea437dd
import os
import numpy as np
from PIL import Image
import gradio as gr
from deepface import DeepFace
from datasets import load_dataset
import pickle
from io import BytesIO
from huggingface_hub import upload_file, hf_hub_download, list_repo_files
from pathlib import Path
import gc
import requests
import time
import shutil
import tarfile
import tensorflow as tf
# Clean up temporary storage if it exists.
def clean_temp_dirs():
    """Reset the local scratch folders.

    For each of ``embeddings`` and ``batches`` (relative to the current
    working directory): when it exists as a directory it is removed together
    with its contents, and afterwards the directory is created if missing.
    """
    print("๐Ÿงน Limpiando carpetas temporales...")
    for folder in ("embeddings", "batches"):
        scratch = Path(folder)
        # is_dir() is already False for a missing path, so it covers the
        # existence check as well.
        if scratch.is_dir():
            shutil.rmtree(scratch)
            print(f"โœ… Carpeta eliminada: {folder}")
        scratch.mkdir(exist_ok=True)
# Wipe the scratch folders as soon as the module is loaded.
clean_temp_dirs()

# Parameters: the Hub dataset repository, the remote subfolder that holds the
# embedding archives, and the local directory where pickles are written.
DATASET_ID = "Segizu/facial-recognition"
EMBEDDINGS_SUBFOLDER = "embeddings"
LOCAL_EMB_DIR = Path("embeddings")
LOCAL_EMB_DIR.mkdir(exist_ok=True)

# Optional access token; without it HTTP requests carry no Authorization header.
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN:
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
else:
    headers = {}

# Configuration.
# MAX_TEMP_STORAGE_GB is compared against get_folder_size(".") in
# build_database to force an early upload.
MAX_TEMP_STORAGE_GB = 40
# NOTE(review): UPLOAD_EVERY is not referenced anywhere in the visible file.
UPLOAD_EVERY = 50
def get_folder_size(path):
    """Return the combined size of all files under *path*, in GiB.

    The tree is walked recursively with ``os.walk``. Entries whose size
    cannot be read (for example a dangling symlink, or a file deleted after
    the directory listing was taken) are skipped instead of aborting the
    whole measurement.

    Args:
        path: Directory to measure.

    Returns:
        Total size in units of 1024**3 bytes, as a float. A missing or empty
        directory yields ``0.0``.
    """
    total_bytes = 0
    for dirpath, _, filenames in os.walk(path):
        for filename in filenames:
            try:
                total_bytes += os.path.getsize(os.path.join(dirpath, filename))
            except OSError:
                # The entry vanished or points nowhere; it occupies no
                # measurable space, so ignore it.
                continue
    return total_bytes / (1024 ** 3)
def preprocess_image(img: Image.Image) -> np.ndarray:
    """Turn *img* into a 160x160 RGB pixel array.

    The image is converted to RGB mode, resized to exactly 160x160 using
    Lanczos resampling, and returned as a NumPy array.
    """
    target_size = (160, 160)
    return np.array(
        img.convert("RGB").resize(target_size, Image.Resampling.LANCZOS)
    )
# Load metadata.csv through the "csv" dataset builder. build_database reads
# the rows through the "image" column declared here.
dataset = load_dataset(
    path="csv",
    split="train",
    data_files="metadata.csv",
    header=0,
    column_names=["image"],
)
def _load_remote_progress():
    """Inspect the batch archives that are already stored in the dataset repo.

    Returns:
        A ``(done_names, next_index)`` tuple. ``done_names`` is the set of
        record names (archive member stems such as ``image_12``) found inside
        the remote ``batch_NNN.tar.gz`` files; ``next_index`` is the first
        batch number not used remotely. Listing or download failures are
        reported and otherwise ignored (best effort), so the worst case is
        that some images are embedded again.
    """
    prefix = f"{EMBEDDINGS_SUBFOLDER}/batch_"
    suffix = ".tar.gz"
    done_names = set()
    next_index = 0
    try:
        remote_files = list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
    except Exception as e:
        print(f"Error listando archivos remotos: {e}")
        return done_names, next_index

    for remote_path in remote_files:
        if not (remote_path.startswith(prefix) and remote_path.endswith(suffix)):
            continue
        number = remote_path[len(prefix):-len(suffix)]
        if not (number.isascii() and number.isdigit()):
            continue
        # Reserve the index even when the archive cannot be read below, so an
        # existing remote file is never overwritten.
        next_index = max(next_index, int(number) + 1)
        try:
            local_copy = hf_hub_download(
                repo_id=DATASET_ID,
                repo_type="dataset",
                filename=remote_path,
                token=HF_TOKEN,
            )
            with tarfile.open(local_copy, "r:gz") as tar:
                done_names.update(Path(member).stem for member in tar.getnames())
        except Exception as e:
            print(f"Error leyendo {remote_path}: {e}")
    return done_names, next_index


def _pack_archive(archive_path, files):
    """Write *files* into a gzip-compressed tar at *archive_path*, flat names."""
    with tarfile.open(archive_path, "w:gz") as tar:
        for file in files:
            tar.add(file, arcname=file.name)


def _upload_archive(archive_path):
    """Upload *archive_path* into the embeddings subfolder of the dataset repo."""
    upload_file(
        path_or_fileobj=str(archive_path),
        path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
        repo_id=DATASET_ID,
        repo_type="dataset",
        token=HF_TOKEN,
    )


# NOTE(review): `GPU` is not imported anywhere in the visible file, so this
# decorator raises NameError as written. Presumably it is `spaces.GPU` from
# the ZeroGPU `spaces` package -- confirm and import it at the top of the file.
@GPU
def build_database():
    """Embed every image listed in ``dataset`` and upload the results in archives.

    Rows are read in slices of 10. Each valid URL is downloaded, embedded
    with DeepFace (Facenet) and pickled to ``LOCAL_EMB_DIR`` as
    ``{"name", "img", "embedding"}``. Once 50 pickles have accumulated, or
    the working directory exceeds ``MAX_TEMP_STORAGE_GB``, they are packed
    into ``batches/batch_NNN.tar.gz``, uploaded to the dataset repo and
    deleted locally. Leftover pickles are flushed the same way at the end.

    Resuming: remote progress is read once up front. Images whose name is
    already inside a remote archive are skipped and numbering continues after
    the highest remote batch. (Previously the archive for the *current* batch
    index was probed for every image; that index only advances on upload, so
    a single existing archive caused every image to be skipped.)

    Per-image failures are printed and the loop moves on. A failure in the
    final flush propagates to the caller.
    """
    print(f"๐Ÿ“Š Uso actual de almacenamiento temporal INICIO: {get_folder_size('.'):.2f} GB")
    print("๐Ÿ”„ Generando embeddings...")

    batch_size = 10
    archive_batch_size = 50
    archive_dir = Path("batches")
    archive_dir.mkdir(exist_ok=True)

    total_rows = len(dataset)
    total_batches = (total_rows + batch_size - 1) // batch_size
    done_names, batch_index = _load_remote_progress()
    batch_files = []

    for start in range(0, total_rows, batch_size):
        batch = dataset[start:start + batch_size]
        print(f"๐Ÿ“ฆ Lote {start // batch_size + 1}/{total_batches}")

        for offset, image_url in enumerate(batch["image"]):
            row = start + offset
            # Anything that is not an http(s) URL string is skipped; this
            # also covers a stray "image" header cell.
            if not isinstance(image_url, str) or not image_url.startswith("http"):
                print(f"โš ๏ธ Saltando {row} - URL invรกlida: {image_url}")
                continue

            name = f"image_{row}"
            if name in done_names:
                print(f"โฉ Ya existe en remoto: {name}.pkl")
                continue
            filename = LOCAL_EMB_DIR / f"{name}.pkl"

            try:
                # NOTE(review): `headers` may carry the HF bearer token and is
                # sent to whatever host the CSV points at -- confirm every URL
                # is a trusted Hugging Face location.
                response = requests.get(image_url, headers=headers, timeout=10)
                response.raise_for_status()
                img = Image.open(BytesIO(response.content)).convert("RGB")
                img_processed = preprocess_image(img)
                embedding = DeepFace.represent(
                    img_path=img_processed,
                    model_name="Facenet",
                    enforce_detection=False,
                )[0]["embedding"]

                with open(filename, "wb") as f:
                    pickle.dump({"name": name, "img": img, "embedding": embedding}, f)
                batch_files.append(filename)
                del img_processed
                gc.collect()

                if len(batch_files) >= archive_batch_size or get_folder_size(".") > MAX_TEMP_STORAGE_GB:
                    archive_path = archive_dir / f"batch_{batch_index:03}.tar.gz"
                    _pack_archive(archive_path, batch_files)
                    print(f"๐Ÿ“ฆ Empaquetado: {archive_path}")
                    _upload_archive(archive_path)
                    print(f"โœ… Subido: {archive_path.name}")
                    for packed in batch_files:
                        packed.unlink()
                    archive_path.unlink()
                    print("๐Ÿงน Limpieza completada tras subida")
                    batch_files = []
                    batch_index += 1
                    time.sleep(2)
                    print(f"๐Ÿ“Š Uso actual FINAL: {get_folder_size('.'):.2f} GB")
            except Exception as e:
                # Best effort: one bad image (or a failed upload attempt) must
                # not stop the whole run; pending pickles stay queued.
                print(f"โŒ Error en {name}: {e}")
                continue

    if batch_files:
        archive_path = archive_dir / f"batch_{batch_index:03}.tar.gz"
        _pack_archive(archive_path, batch_files)
        print(f"๐Ÿ“ฆ Empaquetado final: {archive_path}")
        _upload_archive(archive_path)
        for packed in batch_files:
            packed.unlink()
        archive_path.unlink()
        print("โœ… Subida y limpieza final")
# Search for similar faces.
def _iter_remote_records(file_path):
    """Yield each pickled record stored in one remote embeddings file.

    *file_path* is a path inside the dataset repo. A ``.tar.gz`` file is
    treated as an archive of ``*.pkl`` members (the layout build_database
    uploads); any other file is treated as a single pickle.

    Raises:
        requests.HTTPError: if the download returns an error status.
    """
    response = requests.get(
        f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}",
        headers=headers,
        timeout=10,
    )
    # Without this an error page would be handed to pickle.
    response.raise_for_status()

    # SECURITY NOTE: unpickling executes code chosen by whoever wrote the
    # file. This is only acceptable while the dataset repo is fully trusted.
    if file_path.endswith(".tar.gz"):
        with tarfile.open(fileobj=BytesIO(response.content), mode="r:gz") as tar:
            for member in tar:
                if not (member.isfile() and member.name.endswith(".pkl")):
                    continue
                extracted = tar.extractfile(member)
                if extracted is None:
                    continue
                yield pickle.loads(extracted.read())
    else:
        yield pickle.loads(response.content)


def find_similar_faces(uploaded_image: Image.Image):
    """Return the five stored faces closest to *uploaded_image*.

    The query image is embedded with DeepFace (Facenet) and compared, by
    Euclidean distance, with every record found under the embeddings
    subfolder of the dataset repo. Both loose ``.pkl`` files and the
    ``.tar.gz`` batch archives produced by build_database are read.
    Similarity is reported as ``1 / (1 + distance)``.

    Args:
        uploaded_image: The query image.

    Returns:
        ``(gallery, summary)``: a list of ``(image_array, caption)`` pairs
        and a newline-joined text summary. On failure to embed the query or
        to list the repo, ``([], error_message)`` is returned instead.
        Files that cannot be downloaded or decoded are reported and skipped.
    """
    try:
        img_processed = preprocess_image(uploaded_image)
        query_embedding = DeepFace.represent(
            img_path=img_processed,
            model_name="Facenet",
            enforce_detection=False,
        )[0]["embedding"]
        del img_processed
        gc.collect()
    except Exception as e:
        return [], f"โš  Error procesando imagen: {str(e)}"

    try:
        embedding_files = [
            f for f in list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
            if f.startswith(f"{EMBEDDINGS_SUBFOLDER}/") and f.endswith((".pkl", ".tar.gz"))
        ]
    except Exception as e:
        return [], f"โš  Error obteniendo archivos: {str(e)}"

    # Convert the query once instead of once per stored record.
    query_vector = np.array(query_embedding)
    similarities = []
    for file_path in embedding_files:
        try:
            for record in _iter_remote_records(file_path):
                dist = np.linalg.norm(query_vector - np.array(record["embedding"]))
                sim_score = 1 / (1 + dist)
                similarities.append((sim_score, record["name"], np.array(record["img"])))
        except Exception as e:
            print(f"โš  Error con {file_path}: {e}")
            continue

    # Sort on (score, name) only: comparing whole tuples would fall through
    # to the image arrays on a tie and raise ValueError.
    similarities.sort(key=lambda item: (item[0], item[1]), reverse=True)
    top = similarities[:5]
    gallery = [(img, f"{name} - Similitud: {sim:.2f}") for sim, name, img in top]
    summary = "\n".join([f"{name} - Similitud: {sim:.2f}" for sim, name, _ in top])
    return gallery, summary
# Gradio interface.
# NOTE(review): the source lost its indentation; the nesting below (which
# widgets sit inside each Row) is reconstructed and should be confirmed.
with gr.Blocks() as demo:
    gr.Markdown("## ๐Ÿ” Reconocimiento facial con DeepFace + ZeroGPU")

    # Query row: image upload next to the search trigger.
    with gr.Row():
        image_input = gr.Image(type="pil", label="๐Ÿ“ค Sube una imagen")
        find_btn = gr.Button(value="๐Ÿ”Ž Buscar similares")

    # Result widgets filled by find_similar_faces.
    gallery = gr.Gallery(label="๐Ÿ“ธ Rostros similares")
    summary = gr.Textbox(lines=6, label="๐Ÿง  Detalle")
    find_btn.click(
        fn=find_similar_faces,
        inputs=image_input,
        outputs=[gallery, summary],
    )

    # Maintenance row: rebuilds the remote embeddings; takes and returns nothing.
    with gr.Row():
        build_btn = gr.Button(value="โš™๏ธ Construir base de embeddings (usa GPU)")
    build_btn.click(
        fn=build_database,
        inputs=[],
        outputs=[],
    )

demo.launch()