Segizu's picture
funcionando con DEeepface
d9e1976
raw
history blame
13.2 kB
import os
import numpy as np
from PIL import Image
import gradio as gr
from deepface import DeepFace
from datasets import load_dataset
import pickle
from io import BytesIO
from huggingface_hub import upload_file, hf_hub_download, list_repo_files
from pathlib import Path
import gc
import requests
import time
import shutil
import tarfile
import tensorflow as tf
# Configuración de GPU
print("Dispositivos GPU disponibles:", tf.config.list_physical_devices('GPU'))
# Configurar memoria GPU
gpus = tf.config.list_physical_devices('GPU')
if gpus:
try:
# Permitir crecimiento de memoria
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
print("✅ GPU configurada correctamente")
# Configurar para usar solo GPU
tf.config.set_visible_devices(gpus[0], 'GPU')
print(f"✅ Usando GPU: {gpus[0]}")
except RuntimeError as e:
print(f"⚠️ Error configurando GPU: {e}")
else:
print("⚠️ No se detectó GPU, usando CPU")
# Configurar para usar mixed precision
tf.keras.mixed_precision.set_global_policy('mixed_float16')
# 🔁 Limpiar almacenamiento temporal si existe
def clean_temp_dirs():
print("🧹 Limpiando carpetas temporales...")
for folder in ["embeddings", "batches"]:
path = Path(folder)
if path.exists() and path.is_dir():
shutil.rmtree(path)
print(f"✅ Carpeta eliminada: {folder}")
path.mkdir(exist_ok=True)
clean_temp_dirs()
# 📁 Parámetros
DATASET_ID = "Segizu/facial-recognition-preview"
EMBEDDINGS_SUBFOLDER = "embeddings"
LOCAL_EMB_DIR = Path("embeddings")
LOCAL_EMB_DIR.mkdir(exist_ok=True)
HF_TOKEN = os.getenv("HF_TOKEN")
headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
# 💾 Configuración
MAX_TEMP_STORAGE_GB = 40
UPLOAD_EVERY = 50
def get_folder_size(path):
total = 0
for dirpath, _, filenames in os.walk(path):
for f in filenames:
fp = os.path.join(dirpath, f)
total += os.path.getsize(fp)
return total / (1024 ** 3)
def preprocess_image(img: Image.Image) -> np.ndarray:
# Convertir a RGB si no lo es
if img.mode != 'RGB':
img = img.convert('RGB')
# Obtener la orientación EXIF si existe
try:
exif = img._getexif()
if exif is not None:
orientation = exif.get(274) # 274 es el tag de orientación en EXIF
if orientation is not None:
# Rotar la imagen según la orientación EXIF
if orientation == 3:
img = img.rotate(180, expand=True)
elif orientation == 6:
img = img.rotate(270, expand=True)
elif orientation == 8:
img = img.rotate(90, expand=True)
except:
pass # Si no hay EXIF o hay error, continuamos con la imagen original
# Intentar detectar la orientación del rostro
try:
# Convertir a array numpy para DeepFace
img_array = np.array(img)
# Detectar rostros con GPU
face_objs = DeepFace.extract_faces(
img_path=img_array,
target_size=(160, 160),
detector_backend='retinaface',
enforce_detection=False
)
if face_objs and len(face_objs) > 0:
# Si se detecta un rostro, usar la imagen detectada
img_array = face_objs[0]['face']
return img_array
except:
pass # Si falla la detección, continuamos con el procesamiento normal
# Si no se detectó rostro o falló la detección, redimensionar la imagen original
img_resized = img.resize((160, 160), Image.Resampling.LANCZOS)
return np.array(img_resized)
# ✅ Cargar CSV desde el dataset
dataset = load_dataset(
"csv",
data_files="metadata.csv",
split="train",
column_names=["image"],
header=0
)
def build_database():
print(f"📊 Uso actual de almacenamiento temporal INICIO: {get_folder_size('.'):.2f} GB")
print("🔄 Generando embeddings...")
batch_size = 10
archive_batch_size = 50
batch_files = []
batch_index = 0
ARCHIVE_DIR = Path("batches")
ARCHIVE_DIR.mkdir(exist_ok=True)
for i in range(0, len(dataset), batch_size):
batch = dataset[i:i + batch_size]
print(f"📦 Lote {i // batch_size + 1}/{(len(dataset) + batch_size - 1) // batch_size}")
for j in range(len(batch["image"])):
image_url = batch["image"][j]
if not isinstance(image_url, str) or not image_url.startswith("http") or image_url.strip().lower() == "image":
print(f"⚠️ Saltando {i + j} - URL inválida: {image_url}")
continue
name = f"image_{i + j}"
filename = LOCAL_EMB_DIR / f"{name}.pkl"
# Verificar si ya fue subido
try:
hf_hub_download(
repo_id=DATASET_ID,
repo_type="dataset",
filename=f"{EMBEDDINGS_SUBFOLDER}/batch_{batch_index:03}.tar.gz",
token=HF_TOKEN
)
print(f"⏩ Ya existe en remoto: {name}.pkl")
continue
except:
pass
try:
response = requests.get(image_url, headers=headers, timeout=10)
response.raise_for_status()
img = Image.open(BytesIO(response.content)).convert("RGB")
img_processed = preprocess_image(img)
embedding = DeepFace.represent(
img_path=img_processed,
model_name="Facenet",
enforce_detection=False
)[0]["embedding"]
with open(filename, "wb") as f:
pickle.dump({"name": name, "img": img, "embedding": embedding}, f)
batch_files.append(filename)
del img_processed
gc.collect()
if len(batch_files) >= archive_batch_size or get_folder_size(".") > MAX_TEMP_STORAGE_GB:
archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz"
with tarfile.open(archive_path, "w:gz") as tar:
for file in batch_files:
tar.add(file, arcname=file.name)
print(f"📦 Empaquetado: {archive_path}")
upload_file(
path_or_fileobj=str(archive_path),
path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
repo_id=DATASET_ID,
repo_type="dataset",
token=HF_TOKEN
)
print(f"✅ Subido: {archive_path.name}")
for f in batch_files:
f.unlink()
archive_path.unlink()
print("🧹 Limpieza completada tras subida")
batch_files = []
batch_index += 1
time.sleep(2)
print(f"📊 Uso actual FINAL: {get_folder_size('.'):.2f} GB")
except Exception as e:
print(f"❌ Error en {name}: {e}")
continue
if batch_files:
archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz"
with tarfile.open(archive_path, "w:gz") as tar:
for file in batch_files:
tar.add(file, arcname=file.name)
print(f"📦 Empaquetado final: {archive_path}")
upload_file(
path_or_fileobj=str(archive_path),
path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
repo_id=DATASET_ID,
repo_type="dataset",
token=HF_TOKEN
)
for f in batch_files:
f.unlink()
archive_path.unlink()
print("✅ Subida y limpieza final")
# 🔍 Buscar similitudes
def find_similar_faces(uploaded_image: Image.Image):
if uploaded_image is None:
return [], "⚠ Por favor, sube una imagen primero"
try:
print("🔄 Procesando imagen de entrada...")
# Convertir a RGB si no lo es
if uploaded_image.mode != 'RGB':
uploaded_image = uploaded_image.convert('RGB')
# Mostrar dimensiones de la imagen
print(f"📐 Dimensiones de la imagen: {uploaded_image.size}")
img_processed = preprocess_image(uploaded_image)
print("✅ Imagen preprocesada correctamente")
# Intentar primero con enforce_detection=True
try:
query_embedding = DeepFace.represent(
img_path=img_processed,
model_name="Facenet",
enforce_detection=True,
detector_backend='retinaface'
)[0]["embedding"]
print("✅ Rostro detectado con enforce_detection=True")
except Exception as e:
print(f"⚠ No se pudo detectar rostro con enforce_detection=True, intentando con False: {str(e)}")
# Si falla, intentar con enforce_detection=False
query_embedding = DeepFace.represent(
img_path=img_processed,
model_name="Facenet",
enforce_detection=False,
detector_backend='retinaface'
)[0]["embedding"]
print("✅ Embedding generado con enforce_detection=False")
del img_processed
gc.collect()
except Exception as e:
print(f"❌ Error en procesamiento de imagen: {str(e)}")
return [], f"⚠ Error procesando imagen: {str(e)}"
similarities = []
print("🔍 Buscando similitudes en la base de datos...")
try:
embedding_files = [
f for f in list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
if f.startswith(f"{EMBEDDINGS_SUBFOLDER}/") and f.endswith(".tar.gz")
]
print(f"📁 Encontrados {len(embedding_files)} archivos de embeddings")
except Exception as e:
print(f"❌ Error obteniendo archivos: {str(e)}")
return [], f"⚠ Error obteniendo archivos: {str(e)}"
# Procesar en lotes para mejor rendimiento
batch_size = 10
for i in range(0, len(embedding_files), batch_size):
batch_files = embedding_files[i:i + batch_size]
print(f"📦 Procesando lote {i//batch_size + 1}/{(len(embedding_files) + batch_size - 1)//batch_size}")
for file_path in batch_files:
try:
file_bytes = requests.get(
f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}",
headers=headers,
timeout=30
).content
# Crear un archivo temporal para el tar.gz
temp_archive = Path("temp_archive.tar.gz")
with open(temp_archive, "wb") as f:
f.write(file_bytes)
# Extraer el contenido
with tarfile.open(temp_archive, "r:gz") as tar:
tar.extractall(path="temp_extract")
# Procesar cada archivo .pkl en el tar
for pkl_file in Path("temp_extract").glob("*.pkl"):
with open(pkl_file, "rb") as f:
record = pickle.load(f)
name = record["name"]
img = record["img"]
emb = record["embedding"]
dist = np.linalg.norm(np.array(query_embedding) - np.array(emb))
sim_score = 1 / (1 + dist)
similarities.append((sim_score, name, np.array(img)))
# Limpiar archivos temporales
shutil.rmtree("temp_extract")
temp_archive.unlink()
except Exception as e:
print(f"⚠ Error procesando {file_path}: {e}")
continue
if not similarities:
return [], "⚠ No se encontraron similitudes en la base de datos"
print(f"✅ Encontradas {len(similarities)} similitudes")
similarities.sort(reverse=True)
top = similarities[:5]
gallery = [(img, f"{name} - Similitud: {sim:.2f}") for sim, name, img in top]
summary = "\n".join([f"{name} - Similitud: {sim:.2f}" for sim, name, _ in top])
return gallery, summary
# 🎛️ Interfaz Gradio
with gr.Blocks() as demo:
gr.Markdown("## 🔍 Reconocimiento facial con DeepFace + ZeroGPU")
with gr.Row():
image_input = gr.Image(label="📤 Sube una imagen", type="pil")
find_btn = gr.Button("🔎 Buscar similares")
gallery = gr.Gallery(label="📸 Rostros similares")
summary = gr.Textbox(label="🧠 Detalle", lines=6)
find_btn.click(fn=find_similar_faces, inputs=image_input, outputs=[gallery, summary])
with gr.Row():
build_btn = gr.Button("⚙️ Construir base de embeddings (usa GPU)")
build_btn.click(fn=build_database, inputs=[], outputs=[])
demo.launch(share=True)