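"""
Facial-recognition demo Space: a Gradio app that embeds faces with DeepFace
(Facenet) and searches for similar faces against embeddings stored as .tar.gz
archives in the Hugging Face dataset repo Segizu/facial-recognition.

Dependencies (inferred from the imports below): gradio, deepface, datasets,
huggingface_hub, pillow, numpy, requests, tensorflow.
"""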
import os
import numpy as np
from PIL import Image
import gradio as gr
from deepface import DeepFace
from datasets import load_dataset
import pickle
from io import BytesIO
from huggingface_hub import upload_file, hf_hub_download, list_repo_files
from pathlib import Path
import gc
import requests
import time
import shutil
import tarfile
import tensorflow as tf
# Clean temporary storage if it exists
def clean_temp_dirs():
    print("Cleaning temporary folders...")
    for folder in ["embeddings", "batches"]:
        path = Path(folder)
        if path.exists() and path.is_dir():
            shutil.rmtree(path)
            print(f"Deleted folder: {folder}")
        path.mkdir(exist_ok=True)

clean_temp_dirs()
# Parameters
DATASET_ID = "Segizu/facial-recognition"
EMBEDDINGS_SUBFOLDER = "embeddings"
LOCAL_EMB_DIR = Path("embeddings")
LOCAL_EMB_DIR.mkdir(exist_ok=True)
HF_TOKEN = os.getenv("HF_TOKEN")
headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
# Configuration
MAX_TEMP_STORAGE_GB = 40
UPLOAD_EVERY = 50  # unused; archive_batch_size in build_database() controls upload cadence
def get_folder_size(path):
    """Return the total size of `path` in GiB."""
    total = 0
    for dirpath, _, filenames in os.walk(path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total += os.path.getsize(fp)
    return total / (1024 ** 3)
def preprocess_image(img: Image.Image) -> np.ndarray:
    """Convert to RGB and resize to 160x160, the input size Facenet expects."""
    img_rgb = img.convert("RGB")
    img_resized = img_rgb.resize((160, 160), Image.Resampling.LANCZOS)
    return np.array(img_resized)
# Load the CSV of image URLs from the dataset
dataset = load_dataset(
    "csv",
    data_files="metadata.csv",
    split="train",
    column_names=["image"],
    header=0
)
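# Assumed layout of metadata.csv (not shown in this snapshot): a single "image"
# column whose rows are direct, downloadable image URLs, e.g.
#
#   image
#   https://example.com/faces/person_001.jpg
#
# The guard in build_database() below skips a stray literal "image" header row.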
def build_database():
    print(f"Temporary storage in use at START: {get_folder_size('.'):.2f} GB")
    print("Generating embeddings...")
    batch_size = 10
    archive_batch_size = 50
    batch_files = []
    batch_index = 0
    ARCHIVE_DIR = Path("batches")
    ARCHIVE_DIR.mkdir(exist_ok=True)

    for i in range(0, len(dataset), batch_size):
        batch = dataset[i:i + batch_size]
        print(f"Batch {i // batch_size + 1}/{(len(dataset) + batch_size - 1) // batch_size}")

        for j in range(len(batch["image"])):
            image_url = batch["image"][j]
            # Skip blank rows, non-URLs, and a stray literal "image" header row
            if not isinstance(image_url, str) or not image_url.startswith("http") or image_url.strip().lower() == "image":
                print(f"Skipping {i + j} - invalid URL: {image_url}")
                continue

            name = f"image_{i + j}"
            filename = LOCAL_EMB_DIR / f"{name}.pkl"
            # Skip if this batch's archive was already uploaded to the repo
            # (note: the check is per-archive, not per-image)
            try:
                hf_hub_download(
                    repo_id=DATASET_ID,
                    repo_type="dataset",
                    filename=f"{EMBEDDINGS_SUBFOLDER}/batch_{batch_index:03}.tar.gz",
                    token=HF_TOKEN
                )
                print(f"batch_{batch_index:03}.tar.gz already exists remotely - skipping {name}.pkl")
                continue
            except Exception:
                pass
            try:
                response = requests.get(image_url, headers=headers, timeout=10)
                response.raise_for_status()
                img = Image.open(BytesIO(response.content)).convert("RGB")
                img_processed = preprocess_image(img)
                # represent() returns one dict per detected face; take the first embedding
                embedding = DeepFace.represent(
                    img_path=img_processed,
                    model_name="Facenet",
                    enforce_detection=False
                )[0]["embedding"]

                with open(filename, "wb") as f:
                    pickle.dump({"name": name, "img": img, "embedding": embedding}, f)
                batch_files.append(filename)

                del img_processed
                gc.collect()
                # Package and upload once enough embeddings accumulate or storage runs low
                if len(batch_files) >= archive_batch_size or get_folder_size(".") > MAX_TEMP_STORAGE_GB:
                    archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz"
                    with tarfile.open(archive_path, "w:gz") as tar:
                        for file in batch_files:
                            tar.add(file, arcname=file.name)
                    print(f"Packaged: {archive_path}")

                    upload_file(
                        path_or_fileobj=str(archive_path),
                        path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
                        repo_id=DATASET_ID,
                        repo_type="dataset",
                        token=HF_TOKEN
                    )
                    print(f"Uploaded: {archive_path.name}")

                    for f in batch_files:
                        f.unlink()
                    archive_path.unlink()
                    print("Cleanup complete after upload")
                    batch_files = []
                    batch_index += 1
                    time.sleep(2)
                    print(f"Temporary storage in use after upload: {get_folder_size('.'):.2f} GB")
            except Exception as e:
                print(f"Error on {name}: {e}")
                continue
    # Flush any remaining embeddings into a final archive
    if batch_files:
        archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz"
        with tarfile.open(archive_path, "w:gz") as tar:
            for file in batch_files:
                tar.add(file, arcname=file.name)
        print(f"Final package: {archive_path}")

        upload_file(
            path_or_fileobj=str(archive_path),
            path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
            repo_id=DATASET_ID,
            repo_type="dataset",
            token=HF_TOKEN
        )
        for f in batch_files:
            f.unlink()
        archive_path.unlink()
        print("Final upload and cleanup done")
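# Minimal progress check (a sketch, not part of the original app): count how many
# embedding archives have already landed in the dataset repo. It reuses the same
# list_repo_files call as find_similar_faces below; the helper name is our own.
def count_uploaded_archives() -> int:
    files = list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
    return sum(
        1 for f in files
        if f.startswith(f"{EMBEDDINGS_SUBFOLDER}/") and f.endswith(".tar.gz")
    )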
# Search for similar faces
def find_similar_faces(uploaded_image: Image.Image):
    try:
        img_processed = preprocess_image(uploaded_image)
        query_embedding = DeepFace.represent(
            img_path=img_processed,
            model_name="Facenet",
            enforce_detection=False
        )[0]["embedding"]
        del img_processed
        gc.collect()
    except Exception as e:
        return [], f"Error processing image: {str(e)}"
    similarities = []
    try:
        # build_database() uploads gzipped tar archives, so list those
        # (not loose .pkl files, which never exist remotely)
        archive_files = [
            f for f in list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
            if f.startswith(f"{EMBEDDINGS_SUBFOLDER}/") and f.endswith(".tar.gz")
        ]
    except Exception as e:
        return [], f"Error listing embedding archives: {str(e)}"

    for file_path in archive_files:
        try:
            local_archive = hf_hub_download(
                repo_id=DATASET_ID,
                repo_type="dataset",
                filename=file_path,
                token=HF_TOKEN
            )
            with tarfile.open(local_archive, "r:gz") as tar:
                for member in tar.getmembers():
                    if not member.name.endswith(".pkl"):
                        continue
                    record = pickle.load(tar.extractfile(member))
                    name = record["name"]
                    img = record["img"]
                    emb = record["embedding"]
                    # L2 distance between Facenet embeddings, mapped to a (0, 1] score
                    dist = np.linalg.norm(np.array(query_embedding) - np.array(emb))
                    sim_score = 1 / (1 + dist)
                    similarities.append((sim_score, name, np.array(img)))
        except Exception as e:
            print(f"Error reading {file_path}: {e}")
            continue
    # Sort by score only; tuples containing numpy arrays are not comparable on ties
    similarities.sort(key=lambda s: s[0], reverse=True)
    top = similarities[:5]
    gallery = [(img, f"{name} - similarity: {sim:.2f}") for sim, name, img in top]
    summary = "\n".join(f"{name} - similarity: {sim:.2f}" for sim, name, _ in top)
    return gallery, summary
# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("## Facial recognition with DeepFace + ZeroGPU")
    with gr.Row():
        image_input = gr.Image(label="Upload an image", type="pil")
        find_btn = gr.Button("Find similar faces")
    gallery = gr.Gallery(label="Similar faces")
    summary = gr.Textbox(label="Details", lines=6)
    find_btn.click(fn=find_similar_faces, inputs=image_input, outputs=[gallery, summary])
    with gr.Row():
        build_btn = gr.Button("Build embeddings database (uses GPU)")
        build_btn.click(fn=build_database, inputs=[], outputs=[])

demo.launch()