Spaces:
Runtime error
Runtime error
import torch | |
import torch.nn.functional as F | |
import logging | |
import os | |
import gc | |
import numpy as np | |
import cv2 | |
from PIL import Image | |
from typing import List, Dict, Any, Tuple | |
from transformers import Owlv2Processor, Owlv2ForObjectDetection | |
from .base import BaseDetector | |
import time | |
logger = logging.getLogger(__name__) | |
class WeaponDetectorGPU(BaseDetector):
    """Weapon detector that runs OWLv2 zero-shot object detection on a CUDA GPU.

    The model is loaded in half precision on GPU 0 and the text queries are
    tokenized once at start-up, so each image only needs the vision pass.

    NOTE(review): ``_get_detection_queries`` and ``extract_frames`` are called
    here but not defined in this class; presumably they are inherited from
    ``BaseDetector`` -- confirm against ``base.py``.
    """

    # Sampling rate used whenever the caller passes no (or a zero) ``fps``.
    _DEFAULT_FPS = 2
    # IoU above which a lower-scored overlapping box is suppressed.
    _NMS_IOU_THRESHOLD = 0.5

    def __init__(self):
        """Create the detector and eagerly load the model onto the GPU.

        Raises:
            RuntimeError: If CUDA is not available (raised by ``_initialize``).
        """
        super().__init__()
        self.default_resolution = 640
        self.device = None  # Set by _initialize once CUDA is confirmed.
        self._initialize()

    def _initialize(self):
        """Load the OWLv2 model/processor and pre-tokenize the text queries.

        Raises:
            RuntimeError: If CUDA is not available.
            Exception: Any error raised while loading the model is logged and
                re-raised unchanged.
        """
        try:
            if not torch.cuda.is_available():
                raise RuntimeError("CUDA não está disponível!")
            self.device = torch.device("cuda:0")

            logger.info("Carregando modelo e processador...")
            model_name = "google/owlv2-base-patch16"
            self.owlv2_processor = Owlv2Processor.from_pretrained(model_name)
            self.owlv2_model = Owlv2ForObjectDetection.from_pretrained(
                model_name,
                torch_dtype=torch.float16,
                device_map={"": 0},  # Place the whole model on GPU 0.
            )
            self.owlv2_model.eval()

            # The queries never change, so tokenize them once and keep the
            # tensors on the GPU for reuse on every image.
            self.text_queries = self._get_detection_queries()
            logger.info(f"Queries carregadas: {self.text_queries}")
            self.processed_text = self.owlv2_processor(
                text=self.text_queries,
                return_tensors="pt",
                padding=True,
            ).to(self.device)

            logger.info("Inicialização GPU completa!")
            self._initialized = True
        except Exception as e:
            logger.error(f"Erro na inicialização GPU: {str(e)}")
            raise

    def _detect_on_image(self, image: Image.Image, threshold: float,
                         extra_fields: Dict[str, Any] = None) -> List[Dict]:
        """Run the model on one preprocessed image and return raw detections.

        Shared by ``detect_objects`` and ``_process_frames`` so both paths use
        the same inference and post-processing code.

        Args:
            image: Image already passed through ``_preprocess_image``.
            threshold: Minimum score (0-1) for a detection to be kept.
            extra_fields: Optional key/value pairs appended to every detection
                dict (used for per-frame metadata).

        Returns:
            A list of dicts with ``confidence`` (percentage rounded to two
            decimals), ``box`` (four ints, expressed in the coordinate system
            of ``image``, i.e. the preprocessed image) and ``label`` (the
            matching text query), followed by ``extra_fields``. NMS is *not*
            applied here.
        """
        image_inputs = self.owlv2_processor(images=image, return_tensors="pt")

        # The model weights are float16 while the processor emits float32
        # pixel values; cast floating tensors to the model dtype so the
        # forward pass does not fail on a dtype mismatch.
        model_dtype = self.owlv2_model.dtype
        model_inputs = {}
        for name, value in image_inputs.items():
            if not isinstance(value, torch.Tensor):
                model_inputs[name] = value
            elif torch.is_floating_point(value):
                model_inputs[name] = value.to(self.device, dtype=model_dtype)
            else:
                model_inputs[name] = value.to(self.device)
        # Text tensors take precedence on key clashes, as in the original merge.
        model_inputs.update(self.processed_text)

        with torch.no_grad():
            outputs = self.owlv2_model(**model_inputs)
            # PIL reports (width, height); the processor expects (height, width).
            target_sizes = torch.tensor([image.size[::-1]], device=self.device)
            results = self.owlv2_processor.post_process_grounded_object_detection(
                outputs=outputs,
                target_sizes=target_sizes,
                threshold=threshold,
            )[0]

        last_query = len(self.text_queries) - 1
        detections = []
        for score, box, label in zip(results["scores"], results["boxes"], results["labels"]):
            score_val = score.item()
            if score_val < threshold:
                continue
            # Clamp defensively so an unexpected label id cannot index past
            # the end of the query list.
            label_idx = min(label.item(), last_query)
            detection = {
                "confidence": round(score_val * 100, 2),
                "box": [int(x) for x in box.tolist()],
                "label": self.text_queries[label_idx],
            }
            if extra_fields:
                detection.update(extra_fields)
            detections.append(detection)
        return detections

    def detect_objects(self, image: Image.Image, threshold: float = 0.3) -> List[Dict]:
        """Detect the configured queries in a single image.

        Args:
            image: Input image; it is converted to RGB and letterboxed to
                ``default_resolution`` before inference.
            threshold: Minimum score (0-1) for a detection to be reported.

        Returns:
            Detections that survive NMS, each a dict with ``confidence``,
            ``box`` and ``label``. Box coordinates refer to the preprocessed
            image, not to the original one. Returns an empty list if any step
            fails (the error is logged).
        """
        try:
            image = self._preprocess_image(image)
            detections = self._detect_on_image(image, threshold)
            for det in detections:
                logger.debug(f"Detecção: {det['label']} ({det['confidence']:.2f}%)")
            return self._apply_nms(detections)
        except Exception as e:
            logger.error(f"Erro em detect_objects: {str(e)}")
            return []

    def _get_best_device(self) -> torch.device:
        """Return ``cuda:0`` when CUDA is available, otherwise the CPU device."""
        if torch.cuda.is_available():
            return torch.device("cuda:0")
        return torch.device("cpu")

    def _clear_gpu_memory(self):
        """Release unreferenced Python objects and cached CUDA memory."""
        # Collect first so tensors that just became garbage are actually
        # freed before the CUDA caching allocator is asked to release blocks.
        gc.collect()
        torch.cuda.empty_cache()

    def process_video(self, video_path: str, fps: int = None, threshold: float = 0.3, resolution: int = 640) -> Tuple[str, Dict]:
        """Extract frames from a video and run detection on each of them.

        Args:
            video_path: Path of the video to analyse.
            fps: Frame sampling rate; ``None`` or ``0`` falls back to 2.
            threshold: Minimum score (0-1) for a detection to be reported.
            resolution: Resolution forwarded to ``extract_frames`` and reported
                in the metrics. Model input itself is always letterboxed to
                ``default_resolution`` by ``_preprocess_image``.

        Returns:
            ``(video_path, metrics)``. ``metrics`` holds timings, the number of
            frames analysed, an estimated duration (frame count divided by the
            sampling rate) and the list of detections. Errors are logged and
            the metrics gathered so far are returned; ``total_time`` is filled
            in on every path.
        """
        effective_fps = fps or self._DEFAULT_FPS
        metrics = {
            "total_time": 0,
            "frame_extraction_time": 0,
            "analysis_time": 0,
            "frames_analyzed": 0,
            "video_duration": 0,
            "device_type": "GPU",
            "detections": [],
            "technical": {
                "model": "owlv2-base-patch16",
                "input_size": f"{resolution}x{resolution}",
                "nms_threshold": self._NMS_IOU_THRESHOLD,
                "preprocessing": "optimized",
                "early_stop": False,
            },
        }
        start_time = time.time()
        try:
            # Start from a clean GPU cache.
            self._clear_gpu_memory()

            t0 = time.time()
            frames = self.extract_frames(video_path, effective_fps, resolution)
            metrics["frame_extraction_time"] = time.time() - t0
            metrics["frames_analyzed"] = len(frames)
            if not frames:
                logger.warning("Nenhum frame extraído do vídeo")
                return video_path, metrics

            metrics["video_duration"] = len(frames) / effective_fps

            t0 = time.time()
            detections_by_frame = self._process_frames(frames, effective_fps, threshold)
            metrics["analysis_time"] = time.time() - t0
            metrics["detections"] = detections_by_frame
        except Exception as e:
            logger.error(f"Erro ao processar vídeo: {str(e)}")
        finally:
            # Recorded here so early returns and failures also report it.
            metrics["total_time"] = time.time() - start_time
        return video_path, metrics

    def _process_frames(self, frames: List[np.ndarray], fps: int, threshold: float) -> List[Dict]:
        """Run detection on every extracted frame.

        Args:
            frames: Frames as arrays; they are converted with
                ``cv2.COLOR_BGR2RGB``, so BGR channel order is assumed.
            fps: Sampling rate used to derive timestamps; ``None`` or ``0``
                falls back to 2.
            threshold: Minimum score (0-1) for a detection to be reported.

        Returns:
            A flat list of detections (after per-frame NMS). Each dict carries
            ``frame`` (index into ``frames``) and ``timestamp`` (index divided
            by the sampling rate) in addition to the usual fields. A frame
            that fails is logged and skipped.
        """
        effective_fps = fps or self._DEFAULT_FPS
        total_frames = len(frames)
        detections_by_frame = []
        for i, frame in enumerate(frames):
            try:
                frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                frame_pil = self._preprocess_image(frame_pil)
                frame_detections = self._detect_on_image(
                    frame_pil,
                    threshold,
                    {"frame": i, "timestamp": i / effective_fps},
                )
                if frame_detections:
                    detections_by_frame.extend(self._apply_nms(frame_detections))
            except Exception as e:
                logger.error(f"Erro ao processar frame {i}: {str(e)}")
            finally:
                # Per-frame tensors are local to _detect_on_image and already
                # out of scope; hand their cached blocks back to the driver.
                torch.cuda.empty_cache()

            if i % 10 == 0:
                logger.info(f"Processados {i}/{total_frames} frames")
        return detections_by_frame

    def _preprocess_image(self, image: Image.Image) -> Image.Image:
        """Convert an image to RGB and letterbox it to a square canvas.

        The image is scaled to fit inside ``default_resolution`` x
        ``default_resolution`` while keeping its aspect ratio, then centred on
        a black canvas of exactly that size.

        Args:
            image: Input image in any PIL mode.

        Returns:
            The preprocessed RGB image. If preprocessing fails, the error is
            logged and the image is returned in whatever state it reached.
        """
        try:
            # Always normalise the mode first: a same-sized image may still
            # be RGBA, grayscale or palettised.
            if image.mode != 'RGB':
                image = image.convert('RGB')

            side = self.default_resolution
            target_size = (side, side)
            if image.size == target_size:
                return image

            ratio = min(target_size[0] / image.size[0], target_size[1] / image.size[1])
            # Keep each side at least 1 px so very elongated images do not
            # produce an invalid zero-sized resize.
            new_size = tuple(max(1, int(dim * ratio)) for dim in image.size)
            resized = image.resize(new_size, Image.Resampling.BILINEAR)
            if new_size == target_size:
                return resized

            canvas = Image.new('RGB', target_size, (0, 0, 0))
            paste_x = (target_size[0] - new_size[0]) // 2
            paste_y = (target_size[1] - new_size[1]) // 2
            canvas.paste(resized, (paste_x, paste_y))
            return canvas
        except Exception as e:
            logger.error(f"Erro no pré-processamento: {str(e)}")
            return image

    def _apply_nms(self, detections: List[Dict], iou_threshold: float = _NMS_IOU_THRESHOLD) -> List[Dict]:
        """Apply class-agnostic Non-Maximum Suppression to detections.

        Args:
            detections: Dicts with ``confidence`` and ``box`` given as
                ``[x1, y1, x2, y2]``.
            iou_threshold: A box is dropped when its IoU with an already kept,
                higher-scored box is greater than this value.

        Returns:
            The kept detections ordered by descending confidence. Inputs with
            zero or one element are returned unchanged. If NMS itself fails,
            the error is logged and the unfiltered input is returned.
        """
        if not detections or len(detections) <= 1:
            return detections
        try:
            # A handful of boxes is cheaper to handle on the CPU than paying
            # a GPU synchronisation for every .item() call in the loop.
            scores = torch.tensor([d["confidence"] for d in detections], dtype=torch.float32)
            boxes = torch.tensor(
                [[d["box"][0], d["box"][1], d["box"][2], d["box"][3]] for d in detections],
                dtype=torch.float32,
            )
            areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

            # Stable sort keeps the input order among equal scores, which
            # makes the result deterministic.
            order = torch.sort(scores, descending=True, stable=True).indices

            keep = []
            while order.numel() > 0:
                best = order[0].item()
                keep.append(best)
                rest = order[1:]
                if rest.numel() == 0:
                    break

                top_left = torch.maximum(boxes[best, :2], boxes[rest, :2])
                bottom_right = torch.minimum(boxes[best, 2:], boxes[rest, 2:])
                inter = (bottom_right - top_left).clamp(min=0).prod(dim=1)
                union = areas[best] + areas[rest] - inter

                # Degenerate (zero-area) boxes give union == 0; treat that as
                # "no overlap" instead of letting 0/0 = NaN discard the box.
                iou = torch.where(union > 0, inter / union, torch.zeros_like(inter))
                order = rest[iou <= iou_threshold]

            return [detections[i] for i in keep]
        except Exception as e:
            logger.error(f"Erro ao aplicar NMS: {str(e)}")
            return detections