# File: test_ebc/custom/clip_ebc_tensorrt.py
# Repository page metadata: author avatar "piaspace", commit "[first]" (bb3e610)
import os
import sys
import torch
import numpy as np
import tensorrt as trt
from typing import Union, Tuple, Optional
from PIL import Image
import matplotlib.pyplot as plt
from torchvision.transforms import ToTensor, Normalize
from torchvision.transforms.functional import normalize, to_pil_image
import json
import datetime
from scipy.ndimage import gaussian_filter
from sklearn.cluster import KMeans
import assets
# Put the project root (the parent of the directory holding this file) on
# sys.path so that project-level packages can be imported.
_module_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(_module_dir)
sys.path.append(project_root)
class ClipEBCTensorRT:
    """
    Crowd-counting inference wrapper around a TensorRT-serialized CLIP-EBC
    (Enhanced Blockwise Classification) model.

    The class loads a serialized engine, normalizes an input image, runs the
    engine over sliding windows, and keeps the resulting density map and count
    so that they can be queried or visualized afterwards.
    """

    # Largest number of windows sent to the engine in one inference call.
    MAX_BATCH_SIZE = 8

    def __init__(self,
                 engine_path="assets/CLIP_EBC_nwpu_rmse_tensorrt.trt",
                 truncation=4,
                 reduction=8,
                 granularity="fine",
                 anchor_points="average",
                 input_size=224,
                 window_size=224,
                 stride=224,
                 dataset_name="qnrf",
                 mean=(0.485, 0.456, 0.406),
                 std=(0.229, 0.224, 0.225),
                 config_dir="configs"):
        """
        Store the configuration and load the TensorRT engine.

        Args:
            engine_path: Path to the serialized TensorRT engine file.
            truncation, granularity, anchor_points, dataset_name, config_dir:
                Stored on the instance; no method of this class reads them.
            reduction: Ratio between input resolution and density-map
                resolution (used when assembling sliding-window outputs).
            input_size: Height/width of the square input the engine is fed.
            window_size: Sliding-window size passed by ``predict``.
            stride: Sliding-window stride passed by ``predict``.
            mean: Per-channel mean used for normalization.
            std: Per-channel standard deviation used for normalization.
        """
        self.engine_path = engine_path
        self.truncation = truncation
        self.reduction = reduction
        self.granularity = granularity
        self.anchor_points_type = anchor_points
        self.input_size = input_size
        self.window_size = window_size
        self.stride = stride
        self.dataset_name = dataset_name
        self.mean = mean
        self.std = std
        self.config_dir = config_dir

        # Results of the most recent predict() call.
        self.density_map = None
        self.processed_image = None
        self.count = None
        self.original_image = None

        print(f"TensorRT 엔진 로드 중: {self.engine_path}")
        # _load_engine() discovers the engine's real I/O tensor names and
        # stores them in self.input_name / self.output_name. They must not be
        # replaced by hard-coded names afterwards, otherwise engines whose
        # tensors are not literally called "input"/"output" cannot be run.
        self._load_engine()
        print("TensorRT 엔진 초기화 완료")

    def _load_engine(self):
        """
        Deserialize the engine file and record its I/O tensor names and shapes.

        Raises:
            RuntimeError: If the engine cannot be deserialized or no execution
                context can be created.
            ValueError: If the engine exposes no input or no output tensor.
        """
        trt_logger = trt.Logger(trt.Logger.WARNING)
        self.runtime = trt.Runtime(trt_logger)

        with open(self.engine_path, 'rb') as f:
            engine_data = f.read()

        self.engine = self.runtime.deserialize_cuda_engine(engine_data)
        if self.engine is None:
            # Deserialization reports failure by returning None.
            raise RuntimeError(
                f"TensorRT 엔진을 역직렬화할 수 없습니다: {self.engine_path}"
            )

        self.context = self.engine.create_execution_context()
        if self.context is None:
            raise RuntimeError("TensorRT 실행 컨텍스트를 생성할 수 없습니다.")

        # Tensor-name based I/O API (replaces the older binding-index API).
        self.num_io_tensors = self.engine.num_io_tensors
        self.input_tensor_names = []
        self.output_tensor_names = []
        print(f"TensorRT 엔진에서 {self.num_io_tensors}개의 IO 텐서를 찾았습니다")

        for i in range(self.num_io_tensors):
            name = self.engine.get_tensor_name(i)
            if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
                self.input_tensor_names.append(name)
            else:
                self.output_tensor_names.append(name)

        if not self.input_tensor_names:
            raise ValueError("엔진에서 입력 텐서를 찾을 수 없습니다.")
        if not self.output_tensor_names:
            raise ValueError("엔진에서 출력 텐서를 찾을 수 없습니다.")

        # The first input and first output are the ones used for inference.
        self.input_name = self.input_tensor_names[0]
        self.output_name = self.output_tensor_names[0]

        self.input_shape = self.engine.get_tensor_shape(self.input_name)
        self.output_shape = self.engine.get_tensor_shape(self.output_name)
        print(f"입력 이름: {self.input_name}, 형태: {self.input_shape}")
        print(f"출력 이름: {self.output_name}, 형태: {self.output_shape}")

    def _process_image(self, image: Union[str, np.ndarray]) -> np.ndarray:
        """
        Load an image, convert it to RGB and normalize it.

        The unmodified argument is kept in ``self.original_image``.

        Args:
            image: One of
                - str: path of an image file,
                - np.ndarray: image array; ``uint8`` is used as is, other
                  integer types are clipped to [0, 255], floating-point
                  arrays are assumed to lie in [0, 1],
                - any file-like object ``PIL.Image.open`` accepts (for
                  example an uploaded file).

        Returns:
            np.ndarray: Normalized image with shape (1, 3, H, W).

        Raises:
            ValueError: If a non-str, non-ndarray argument cannot be opened
                as an image.
        """
        self.original_image = image

        if isinstance(image, str):
            with open(image, "rb") as f:
                # convert() forces the pixel data to be read while the file
                # is still open.
                pil_image = Image.open(f).convert("RGB")
        elif isinstance(image, np.ndarray):
            pixels = image
            if pixels.dtype != np.uint8:
                if np.issubdtype(pixels.dtype, np.integer):
                    pixels = np.clip(pixels, 0, 255)
                else:
                    # Clip first so that values slightly outside [0, 1] do
                    # not wrap around in the uint8 cast.
                    pixels = np.clip(pixels, 0.0, 1.0) * 255
                pixels = pixels.astype(np.uint8)
            # convert() guarantees three channels for grayscale/RGBA arrays,
            # which the 3-channel normalization below requires.
            pil_image = Image.fromarray(pixels).convert("RGB")
        else:
            try:
                pil_image = Image.open(image).convert("RGB")
            except Exception as e:
                raise ValueError(f"지원하지 않는 이미지 형식입니다: {type(image)}") from e

        tensor_image = ToTensor()(pil_image)
        normalized_image = Normalize(mean=self.mean, std=self.std)(tensor_image)
        return normalized_image.unsqueeze(0).numpy()  # (1, 3, H, W)

    def _post_process_image(self, image_tensor):
        """
        Undo the normalization of ``_process_image`` and return a PIL image.

        Args:
            image_tensor: Normalized image as a NumPy array or torch tensor
                with a leading batch dimension of size 1.

        Returns:
            PIL.Image.Image: The de-normalized image.
        """
        if isinstance(image_tensor, np.ndarray):
            image_tensor = torch.from_numpy(image_tensor)

        # Inverse of (x - mean) / std, broadcast over the channel dimension.
        std = torch.tensor(self.std, dtype=image_tensor.dtype).view(-1, 1, 1)
        mean = torch.tensor(self.mean, dtype=image_tensor.dtype).view(-1, 1, 1)
        image = image_tensor * std + mean

        # Rounding error can push values marginally outside [0, 1]; clamp so
        # the 8-bit conversion cannot overflow.
        image = image.clamp(0.0, 1.0)
        return to_pil_image(image.squeeze(0))

    def _infer_batch(self, batch_input):
        """
        Run the TensorRT engine on one batch of windows.

        Args:
            batch_input: Array of shape
                (batch, 3, ``input_size``, ``input_size``). Non-float32 data
                is converted to float32.

        Returns:
            np.ndarray: float32 copy of the engine output for this batch.

        Raises:
            ValueError: If ``batch_input`` does not have the expected shape.
            RuntimeError: If execution fails or any CUDA/TensorRT call raises.
        """
        import pycuda.driver as cuda
        import pycuda.autoinit  # noqa: F401 -- imported for CUDA context setup

        batch_size = batch_input.shape[0]
        input_shape = (batch_size, 3, self.input_size, self.input_size)
        print(f"입력 배치 형태: {batch_input.shape}, 데이터 타입: {batch_input.dtype}")

        if tuple(batch_input.shape) != input_shape:
            # Reshaping with np.resize would repeat or truncate the flattened
            # data and feed scrambled pixels to the model, so refuse instead.
            raise ValueError(
                f"입력 형태 불일치. 예상: {input_shape}, 실제: {tuple(batch_input.shape)}"
            )

        if batch_input.dtype != np.float32:
            print("경고: 입력 데이터 타입 불일치. float32로 변환합니다.")
            batch_input = batch_input.astype(np.float32)

        # The input shape must be set before the output shape can be queried.
        self.context.set_input_shape(self.input_name, input_shape)
        output_shape = tuple(self.context.get_tensor_shape(self.output_name))
        print(f"출력 형태: {output_shape}")
        if output_shape[0] == -1:
            # Unresolved batch dimension: substitute the actual batch size.
            output_shape = (batch_size,) + output_shape[1:]

        # Page-locked host buffers for the asynchronous copies.
        h_input = cuda.pagelocked_empty(input_shape, dtype=np.float32)
        h_output = cuda.pagelocked_empty(output_shape, dtype=np.float32)
        np.copyto(h_input, batch_input)

        d_input = None
        d_output = None
        try:
            d_input = cuda.mem_alloc(h_input.nbytes)
            d_output = cuda.mem_alloc(h_output.nbytes)
            stream = cuda.Stream()

            cuda.memcpy_htod_async(d_input, h_input, stream)
            self.context.set_tensor_address(self.input_name, int(d_input))
            self.context.set_tensor_address(self.output_name, int(d_output))
            print(f"입력 메모리 주소: {int(d_input)}, 출력 메모리 주소: {int(d_output)}")

            if not self.context.execute_async_v3(stream_handle=stream.handle):
                raise RuntimeError("TensorRT 실행 실패")

            cuda.memcpy_dtoh_async(h_output, d_output, stream)
            stream.synchronize()
            # Copy out of the page-locked buffer into an ordinary array.
            return np.array(h_output)
        except Exception as e:
            raise RuntimeError(f"TensorRT 추론 중 오류 발생: {e}") from e
        finally:
            # Release whichever device buffers were actually allocated.
            for device_buffer in (d_input, d_output):
                if device_buffer is not None:
                    device_buffer.free()

    @staticmethod
    def _window_starts(length, window, stride):
        """Return window start offsets along one axis; the last window is
        shifted back so that it ends exactly at ``length``."""
        count = int(np.ceil((length - window) / stride) + 1)
        return [min(i * stride, length - window) for i in range(count)]

    def sliding_window_predict(self, image: np.ndarray, window_size: Union[int, Tuple[int, int]],
                               stride: Union[int, Tuple[int, int]]) -> np.ndarray:
        """
        Predict a density map with sliding windows, averaging overlaps.

        Args:
            image (np.ndarray): Image array of shape (1, 3, H, W).
            window_size (int or tuple): Window size, scalar or (h, w).
            stride (int or tuple): Window step, scalar or (h, w).

        Returns:
            np.ndarray: Density map of shape
            (C, H // reduction, W // reduction), where C is the channel
            dimension of the engine output.

        Raises:
            ValueError: If the image is not 4-D, the window or stride is
                invalid, the stride exceeds the window, or the image is
                smaller than the window.
            RuntimeError: If engine inference fails.
        """
        if len(image.shape) != 4:
            raise ValueError(
                f"이미지는 4차원 배열이어야 합니다. (1, C, H, W), 현재: {image.shape}"
            )

        if isinstance(window_size, (int, float)):
            window_size = (int(window_size), int(window_size))
        if isinstance(stride, (int, float)):
            stride = (int(stride), int(stride))
        window_size = tuple(window_size)
        stride = tuple(stride)

        if len(window_size) != 2 or window_size[0] <= 0 or window_size[1] <= 0:
            raise ValueError(
                f"윈도우 크기는 양수 정수 튜플 (h, w)이어야 합니다. 현재: {window_size}"
            )
        if len(stride) != 2 or stride[0] <= 0 or stride[1] <= 0:
            raise ValueError(
                f"스트라이드는 양수 정수 튜플 (h, w)이어야 합니다. 현재: {stride}"
            )
        if stride[0] > window_size[0] or stride[1] > window_size[1]:
            raise ValueError(
                f"스트라이드는 윈도우 크기보다 작아야 합니다. 현재: {stride}와 {window_size}"
            )

        image_height, image_width = image.shape[-2:]
        window_height, window_width = window_size
        stride_height, stride_width = stride

        if image_height < window_height or image_width < window_width:
            # A window larger than the image would yield negative offsets and
            # truncated crops; reject it before running any inference.
            raise ValueError(
                f"이미지 크기는 윈도우 크기 이상이어야 합니다. "
                f"현재: {(image_height, image_width)}와 {window_size}"
            )

        # (top, left, bottom, right) of every window, in row-major order.
        tops = self._window_starts(image_height, window_height, stride_height)
        lefts = self._window_starts(image_width, window_width, stride_width)
        window_positions = [
            (top, left, top + window_height, left + window_width)
            for top in tops
            for left in lefts
        ]
        windows = [
            image[:, :, top:bottom, left:right]
            for top, left, bottom, right in window_positions
        ]

        # Run the engine batch by batch.
        batch_outputs = []
        for start in range(0, len(windows), self.MAX_BATCH_SIZE):
            batch = np.concatenate(windows[start:start + self.MAX_BATCH_SIZE], axis=0)
            batch_outputs.append(self._infer_batch(batch))
        preds = np.concatenate(batch_outputs, axis=0)

        # Accumulate per-window predictions and how often each cell was hit.
        reduction = self.reduction
        map_shape = (preds.shape[1], image_height // reduction, image_width // reduction)
        pred_map = np.zeros(map_shape, dtype=np.float32)
        count_map = np.zeros(map_shape, dtype=np.float32)

        for pred, (top, left, bottom, right) in zip(preds, window_positions):
            rows = slice(top // reduction, bottom // reduction)
            cols = slice(left // reduction, right // reduction)
            pred_map[:, rows, cols] += pred
            count_map[:, rows, cols] += 1.

        # Windows tile the whole image, so every cell has a count >= 1.
        pred_map /= count_map
        return pred_map

    def resize_density_map(self, density_map: np.ndarray, target_size: Tuple[int, int]) -> np.ndarray:
        """
        Resize a density map bilinearly while preserving its total sum.

        Args:
            density_map: Density map of shape (C, H, W) (NumPy array or torch
                tensor).
            target_size: Target size (H', W').

        Returns:
            np.ndarray: Resized density map. When the map already has the
            target size it is returned without interpolation.
        """
        import torch.nn.functional as F

        if isinstance(density_map, np.ndarray):
            density_map = torch.from_numpy(density_map)

        if density_map.dim() == 3:
            density_map = density_map.unsqueeze(0)  # (1, C, H, W)

        current_height, current_width = density_map.shape[2:]
        if current_height == target_size[0] and current_width == target_size[1]:
            return density_map.squeeze(0).numpy()

        original_sum = density_map.sum()

        resized_map = F.interpolate(
            density_map,
            size=target_size,
            mode='bilinear',
            align_corners=False
        )

        # Rescale so the total (the count) is unchanged by interpolation;
        # skipped when the resized sum is not positive to avoid dividing by 0.
        resized_sum = resized_map.sum()
        if resized_sum > 0:
            resized_map = resized_map * (original_sum / resized_sum)

        return resized_map.squeeze(0).numpy()

    def predict(self, image: Union[str, np.ndarray]) -> float:
        """
        Estimate the number of people in an image.

        Stores the de-normalized image, the density map (resized to the image
        size) and the count on the instance for later visualization.

        Args:
            image: Input image (path, NumPy array, or file-like object).

        Returns:
            float: Predicted count, i.e. the sum of the predicted density map.
        """
        processed_image = self._process_image(image)
        image_height, image_width = processed_image.shape[-2:]

        pred_density = self.sliding_window_predict(
            processed_image,
            self.window_size,
            self.stride
        )

        # Plain Python float, matching the annotated return type.
        pred_count = float(pred_density.sum())

        resized_pred_density = self.resize_density_map(
            pred_density,
            (image_height, image_width)
        )

        self.processed_image = self._post_process_image(processed_image)
        self.density_map = resized_pred_density.squeeze()
        self.count = pred_count

        return pred_count

    @staticmethod
    def _save_figure(fig, save_path, default_prefix):
        """Save ``fig`` without margins; build a timestamped file name from
        ``default_prefix`` when ``save_path`` is None."""
        if save_path is None:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            save_path = f"{default_prefix}_{timestamp}.png"
        fig.savefig(save_path, bbox_inches='tight', pad_inches=0, dpi=200)
        print(f"이미지 저장 완료: {save_path}")

    @staticmethod
    def _figure_to_rgb(fig):
        """Render ``fig`` and return its pixels as an (H, W, 3) uint8 array."""
        fig.canvas.draw()
        rgba = np.asarray(fig.canvas.buffer_rgba())
        # Copy so the array stays valid independently of the canvas buffer.
        return rgba[:, :, :3].copy()

    def visualize_density_map(self, alpha: float = 0.5, save: bool = False,
                              save_path: Optional[str] = None):
        """
        Overlay the stored density map on the stored image.

        The returned figure is not closed here; the caller owns it.

        Args:
            alpha (float): Opacity of the density map (0-1). Default 0.5.
            save (bool): Whether to save the visualization. Default False.
            save_path (str, optional): Output path. When None, a timestamped
                file name in the current directory is used.

        Returns:
            Tuple[matplotlib.figure.Figure, np.ndarray]:
                - the matplotlib Figure with the overlay,
                - the rendered image as an RGB array (H, W, 3).

        Raises:
            ValueError: If ``predict`` has not been run yet.
        """
        if self.density_map is None or self.processed_image is None:
            raise ValueError("먼저 predict 메서드를 실행하여 예측을 수행해야 합니다.")

        fig, ax = plt.subplots(dpi=200, frameon=False)
        ax.imshow(self.processed_image)
        ax.imshow(self.density_map, cmap="jet", alpha=alpha)
        ax.axis("off")
        ax.set_title(f"Count: {self.count:.1f}")

        if save:
            self._save_figure(fig, save_path, "crowd_density")

        return fig, self._figure_to_rgb(fig)

    def visualize_dots(self, dot_size: int = 20, sigma: float = 1, percentile: float = 97,
                       save: bool = False, save_path: Optional[str] = None):
        """
        Mark estimated head positions as dots on the stored image.

        Candidate pixels are those whose Gaussian-smoothed density reaches the
        given percentile; when there are more candidates than the rounded
        count, they are clustered with k-means into that many positions.

        The figure is not closed here; the caller owns it via the canvas.

        Args:
            dot_size (int): Marker size. Default 20.
            sigma (float): Sigma of the Gaussian filter. Default 1.
            percentile (float): Percentile (0-100) used as threshold.
                Default 97.
            save (bool): Whether to save the visualization. Default False.
            save_path (str, optional): Output path. When None, a timestamped
                file name in the current directory is used.

        Returns:
            Tuple[canvas, np.ndarray]:
                - the canvas object of the matplotlib figure,
                - the rendered image as an RGB array (H, W, 3).

        Raises:
            ValueError: If ``predict`` has not been run yet.
        """
        if self.density_map is None or self.processed_image is None:
            raise ValueError("먼저 predict 메서드를 실행하여 예측을 수행해야 합니다.")

        adjusted_pred_count = int(round(self.count))

        fig, ax = plt.subplots(dpi=200, frameon=False)
        ax.imshow(self.processed_image)

        filtered_density = gaussian_filter(self.density_map, sigma=sigma)
        threshold = np.percentile(filtered_density, percentile)
        candidate_pixels = np.column_stack(np.where(filtered_density >= threshold))

        if adjusted_pred_count <= 0:
            # Nothing to mark; KMeans cannot be fitted with zero clusters.
            head_positions = np.empty((0, 2), dtype=int)
        elif len(candidate_pixels) > adjusted_pred_count:
            kmeans = KMeans(n_clusters=adjusted_pred_count, random_state=42, n_init=10)
            kmeans.fit(candidate_pixels)
            head_positions = kmeans.cluster_centers_.astype(int)
        else:
            head_positions = candidate_pixels

        # Candidates are (row, col) pairs; scatter expects x (col), y (row).
        y_coords, x_coords = head_positions[:, 0], head_positions[:, 1]
        ax.scatter(x_coords, y_coords,
                   c='red',
                   s=dot_size,
                   alpha=1.0,
                   edgecolors='white',
                   linewidth=1)

        ax.axis("off")
        ax.set_title(f"Count: {self.count:.1f}")

        if save:
            self._save_figure(fig, save_path, "crowd_dots")

        return fig.canvas, self._figure_to_rgb(fig)

    def crowd_count(self):
        """
        Return the count of the most recent prediction.

        Returns:
            float: Predicted count, or None when no prediction has been made.
        """
        return self.count

    def get_density_map(self):
        """
        Return the density map of the most recent prediction.

        Returns:
            numpy.ndarray: Density map, or None when no prediction has been
            made.
        """
        return self.density_map