import os
import sys
import torch
import numpy as np
import tensorrt as trt
from typing import Union, Tuple, Optional
from PIL import Image
import matplotlib.pyplot as plt
from torchvision.transforms import ToTensor, Normalize
from torchvision.transforms.functional import normalize, to_pil_image
import json
import datetime
from scipy.ndimage import gaussian_filter
from sklearn.cluster import KMeans
import assets

# Add the project root directory to the import path
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(project_root)

class ClipEBCTensorRT:
    """
    Image-processing class for the TensorRT build of CLIP-EBC (Efficient Boundary Counting).

    Uses a CLIP model converted to TensorRT to process images, and provides
    sliding-window prediction along with a range of configuration options.
    """

    def __init__(self,
                 engine_path="assets/CLIP_EBC_nwpu_rmse_tensorrt.trt",
                 truncation=4,
                 reduction=8,
                 granularity="fine",
                 anchor_points="average",
                 input_size=224,
                 window_size=224,
                 stride=224,
                 dataset_name="qnrf",
                 mean=(0.485, 0.456, 0.406),
                 std=(0.229, 0.224, 0.225),
                 config_dir="configs"):
        """Initialize the CLIP-EBC TensorRT class with its configuration parameters."""
        self.engine_path = engine_path
        self.truncation = truncation
        self.reduction = reduction
        self.granularity = granularity
        self.anchor_points_type = anchor_points
        self.input_size = input_size
        self.window_size = window_size
        self.stride = stride
        self.dataset_name = dataset_name
        self.mean = mean
        self.std = std
        self.config_dir = config_dir

        # Holders for the most recent prediction results
        self.density_map = None
        self.processed_image = None
        self.count = None
        self.original_image = None
        # Load the TensorRT engine; _load_engine() also discovers the
        # input/output tensor names from the engine itself, so they are
        # not hard-coded here.
        print(f"Loading TensorRT engine: {self.engine_path}")
        self._load_engine()
        print("TensorRT engine initialization complete")

    def _load_engine(self):
        """Load the TensorRT engine and discover its I/O tensors."""
        # Create the TensorRT logger and runtime
        TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
        self.runtime = trt.Runtime(TRT_LOGGER)

        # Read the serialized engine file
        with open(self.engine_path, 'rb') as f:
            engine_data = f.read()

        # Deserialize the engine and create an execution context
        self.engine = self.runtime.deserialize_cuda_engine(engine_data)
        self.context = self.engine.create_execution_context()

        # TensorRT 10.x no longer exposes input/output bindings directly;
        # inspect the engine's I/O tensors instead.
        self.num_io_tensors = self.engine.num_io_tensors

        # Collect input and output tensor names
        self.input_tensor_names = []
        self.output_tensor_names = []
        print(f"Found {self.num_io_tensors} I/O tensors in the TensorRT engine")
        for i in range(self.num_io_tensors):
            name = self.engine.get_tensor_name(i)
            is_input = self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT
            if is_input:
                self.input_tensor_names.append(name)
            else:
                self.output_tensor_names.append(name)

        if not self.input_tensor_names:
            raise ValueError("No input tensor found in the engine.")
        if not self.output_tensor_names:
            raise ValueError("No output tensor found in the engine.")

        # Use the first input/output tensor by default
        self.input_name = self.input_tensor_names[0]
        self.output_name = self.output_tensor_names[0]

        # Record the I/O shapes
        self.input_shape = self.engine.get_tensor_shape(self.input_name)
        self.output_shape = self.engine.get_tensor_shape(self.output_name)
        print(f"Input name: {self.input_name}, shape: {self.input_shape}")
        print(f"Output name: {self.output_name}, shape: {self.output_shape}")

    def _process_image(self, image: Union[str, np.ndarray]) -> np.ndarray:
        """
        Preprocess an image. Accepts a file path, a NumPy array, or a Streamlit UploadedFile.

        Args:
            image: Input image. One of:
                - str: path to an image file
                - np.ndarray: RGB image of shape (H, W, 3)
                - UploadedFile: file uploaded through Streamlit

        Returns:
            np.ndarray: preprocessed image array of shape (1, 3, H, W)
        """
        to_tensor = ToTensor()
        normalize = Normalize(mean=self.mean, std=self.std)

        # Keep a reference to the original input
        self.original_image = image

        # Handle each supported input type
        if isinstance(image, str):
            # File path
            with open(image, "rb") as f:
                pil_image = Image.open(f).convert("RGB")
        elif isinstance(image, np.ndarray):
            # NumPy array
            if image.dtype == np.uint8:
                pil_image = Image.fromarray(image)
            else:
                # Assume float values in [0, 1] and convert to uint8
                pil_image = Image.fromarray((image * 255).astype(np.uint8))
        else:
            # Streamlit UploadedFile or another file-like object
            try:
                pil_image = Image.open(image).convert("RGB")
            except Exception as e:
                raise ValueError(f"Unsupported image format: {type(image)}") from e

        # Convert to a tensor, normalize, and add a batch dimension
        tensor_image = to_tensor(pil_image)
        normalized_image = normalize(tensor_image)
        batched_image = normalized_image.unsqueeze(0)  # (1, 3, H, W)

        # Convert to NumPy
        numpy_image = batched_image.numpy()
        return numpy_image

    def _post_process_image(self, image_tensor):
        """Convert a normalized image tensor back to a PIL image."""
        # Convert a NumPy array to a PyTorch tensor if needed
        if isinstance(image_tensor, np.ndarray):
            image_tensor = torch.from_numpy(image_tensor)

        # Undo the normalization (inverse of Normalize(mean, std))
        image = normalize(
            image_tensor,
            mean=[0., 0., 0.],
            std=[1. / self.std[0], 1. / self.std[1], 1. / self.std[2]]
        )
        image = normalize(
            image,
            mean=[-self.mean[0], -self.mean[1], -self.mean[2]],
            std=[1., 1., 1.]
        )

        # Drop the batch dimension and convert to a PIL image
        processed_image = to_pil_image(image.squeeze(0))
        return processed_image

    def _infer_batch(self, batch_input):
        """Run batched inference with the TensorRT engine."""
        import pycuda.driver as cuda
        import pycuda.autoinit  # creates the CUDA context on first import

        batch_size = batch_input.shape[0]

        # Expected input shape and dtype
        input_shape = (batch_size, 3, self.input_size, self.input_size)
        print(f"Input batch shape: {batch_input.shape}, dtype: {batch_input.dtype}")

        # Validate the input shape
        if batch_input.shape != input_shape:
            print(f"Warning: input shape mismatch. Expected {input_shape}, got {batch_input.shape}")
            # Crude fallback: force the expected shape
            batch_input = np.resize(batch_input, input_shape)

        # Validate the dtype
        if batch_input.dtype != np.float32:
            print("Warning: input dtype mismatch; converting to float32.")
            batch_input = batch_input.astype(np.float32)

        # Set the dynamic batch size
        self.context.set_input_shape(self.input_name, input_shape)

        # Resolve the output shape for this batch
        output_shape = tuple(self.context.get_tensor_shape(self.output_name))
        print(f"Output shape: {output_shape}")

        # Replace a dynamic (-1) batch dimension with the actual batch size
        if output_shape[0] == -1:
            output_shape = (batch_size,) + output_shape[1:]

        # Output buffer
        output = np.empty(output_shape, dtype=np.float32)

        # Host buffers (page-locked memory)
        h_input = cuda.pagelocked_empty(batch_input.shape, dtype=np.float32)
        h_output = cuda.pagelocked_empty(output_shape, dtype=np.float32)

        # Copy the input data into the host buffer
        np.copyto(h_input, batch_input)

        # Device buffers
        d_input = cuda.mem_alloc(h_input.nbytes)
        d_output = cuda.mem_alloc(h_output.nbytes)

        # CUDA stream
        stream = cuda.Stream()
        try:
            # Host -> device copy
            cuda.memcpy_htod_async(d_input, h_input, stream)

            # Bind the tensor addresses
            self.context.set_tensor_address(self.input_name, int(d_input))
            self.context.set_tensor_address(self.output_name, int(d_output))

            # Debug info (device memory addresses)
            print(f"Input device address: {int(d_input)}, output device address: {int(d_output)}")

            # Run inference
            success = self.context.execute_async_v3(stream_handle=stream.handle)
            if not success:
                print("TensorRT execution failed")
                return None

            # Device -> host copy
            cuda.memcpy_dtoh_async(h_output, d_output, stream)

            # Wait for the stream to finish
            stream.synchronize()

            # Copy the result out of the page-locked buffer
            np.copyto(output, h_output)
            return output
        except Exception as e:
            print(f"Error during TensorRT inference: {str(e)}")
            import traceback
            traceback.print_exc()
            return None
        finally:
            # Release resources
            del stream
            if 'd_input' in locals():
                d_input.free()
            if 'd_output' in locals():
                d_output.free()

    def sliding_window_predict(self, image: np.ndarray, window_size: Union[int, Tuple[int, int]],
                               stride: Union[int, Tuple[int, int]]) -> np.ndarray:
        """
        Run prediction over the image with a sliding window. Overlapping regions are averaged.

        Args:
            image (np.ndarray): image array of shape (1, 3, H, W)
            window_size (int or tuple): window size
            stride (int or tuple): window stride

        Returns:
            np.ndarray: predicted density map
        """
        # Initialize CUDA on first use only
        global cuda
        if 'cuda' not in globals():
            import pycuda.driver as cuda
            cuda.init()

        # Validate the input
        assert len(image.shape) == 4, f"Image must be a 4D array (1, C, H, W); got {image.shape}"

        # Normalize window size and stride to (h, w) tuples
        window_size = (int(window_size), int(window_size)) if isinstance(window_size, (int, float)) else window_size
        stride = (int(stride), int(stride)) if isinstance(stride, (int, float)) else stride
        window_size = tuple(window_size)
        stride = tuple(stride)

        assert isinstance(window_size, tuple) and len(window_size) == 2 and window_size[0] > 0 and window_size[1] > 0, \
            f"Window size must be a tuple (h, w) of positive integers; got {window_size}"
        assert isinstance(stride, tuple) and len(stride) == 2 and stride[0] > 0 and stride[1] > 0, \
            f"Stride must be a tuple (h, w) of positive integers; got {stride}"
        assert stride[0] <= window_size[0] and stride[1] <= window_size[1], \
            f"Stride must not exceed the window size; got {stride} and {window_size}"

        image_height, image_width = image.shape[-2:]
        window_height, window_width = window_size
        stride_height, stride_width = stride

        # Number of sliding-window positions
        num_rows = int(np.ceil((image_height - window_height) / stride_height) + 1)
        num_cols = int(np.ceil((image_width - window_width) / stride_width) + 1)

        # Extract windows
        windows = []
        window_positions = []
        for i in range(num_rows):
            for j in range(num_cols):
                x_start, y_start = i * stride_height, j * stride_width
                x_end, y_end = x_start + window_height, y_start + window_width

                # Clamp windows that run past the image boundary
                if x_end > image_height:
                    x_start, x_end = image_height - window_height, image_height
                if y_end > image_width:
                    y_start, y_end = image_width - window_width, image_width

                window = image[:, :, x_start:x_end, y_start:y_end]
                windows.append(window)
                window_positions.append((x_start, y_start, x_end, y_end))

        # Run inference batch by batch
        all_preds = []
        max_batch_size = 8
        for start_idx in range(0, len(windows), max_batch_size):
            end_idx = min(start_idx + max_batch_size, len(windows))
            batch_windows = np.vstack(windows[start_idx:end_idx])  # (batch_size, 3, h, w)

            # TensorRT inference
            batch_preds = self._infer_batch(batch_windows)
            if batch_preds is None:
                raise RuntimeError("TensorRT inference failed for a window batch.")
            # Debug info
            # print(f"Batch input shape: {batch_windows.shape}, batch output shape: {batch_preds.shape}")
            all_preds.extend([batch_preds[i:i + 1] for i in range(batch_preds.shape[0])])

        # Stack the per-window predictions into a single array
        preds = np.concatenate(all_preds, axis=0)

        # Assemble the output density map
        pred_map = np.zeros((preds.shape[1], image_height // self.reduction, image_width // self.reduction), dtype=np.float32)
        count_map = np.zeros((preds.shape[1], image_height // self.reduction, image_width // self.reduction), dtype=np.float32)

        idx = 0
        for i in range(num_rows):
            for j in range(num_cols):
                x_start, y_start, x_end, y_end = window_positions[idx]

                # Map window coordinates into the reduced output grid
                x_start_out = x_start // self.reduction
                y_start_out = y_start // self.reduction
                x_end_out = x_end // self.reduction
                y_end_out = y_end // self.reduction

                pred_map[:, x_start_out:x_end_out, y_start_out:y_end_out] += preds[idx]
                count_map[:, x_start_out:x_end_out, y_start_out:y_end_out] += 1.
                idx += 1

        # Average the overlapping regions
        pred_map /= count_map
        return pred_map

    def resize_density_map(self, density_map: np.ndarray, target_size: Tuple[int, int]) -> np.ndarray:
        """
        Resize a density map while preserving its total sum.

        Args:
            density_map: density map of shape (C, H, W)
            target_size: target size (H', W')

        Returns:
            np.ndarray: resized density map
        """
        import torch.nn.functional as F

        # Convert NumPy to torch
        if isinstance(density_map, np.ndarray):
            density_map = torch.from_numpy(density_map)

        # Add a batch dimension
        if density_map.dim() == 3:
            density_map = density_map.unsqueeze(0)  # (1, C, H, W)

        current_size = density_map.shape[2:]
        if current_size[0] == target_size[0] and current_size[1] == target_size[1]:
            return density_map.squeeze(0).numpy()

        # Total of the original density map
        original_sum = density_map.sum()

        # Resize with bilinear interpolation
        resized_map = F.interpolate(
            density_map,
            size=target_size,
            mode='bilinear',
            align_corners=False
        )

        # Rescale so the total count is preserved (guard against division by zero)
        if resized_map.sum() > 0:
            resized_map = resized_map * (original_sum / resized_map.sum())

        return resized_map.squeeze(0).numpy()

    def predict(self, image: Union[str, np.ndarray]) -> float:
        """
        Run crowd-counting prediction on an image.

        Args:
            image: input image (path, NumPy array, or uploaded file)

        Returns:
            float: predicted number of people
        """
        # Preprocess the image
        processed_image = self._process_image(image)
        image_height, image_width = processed_image.shape[-2:]

        # Sliding-window prediction
        pred_density = self.sliding_window_predict(
            processed_image,
            self.window_size,
            self.stride
        )

        # Total predicted count
        pred_count = pred_density.sum()

        # Resize the density map back to the original image size
        resized_pred_density = self.resize_density_map(
            pred_density,
            (image_height, image_width)
        )

        # Store the results
        self.processed_image = self._post_process_image(processed_image)
        self.density_map = resized_pred_density.squeeze()
        self.count = pred_count
        return pred_count

    def visualize_density_map(self, alpha: float = 0.5, save: bool = False,
                              save_path: Optional[str] = None):
        """
        Visualize the most recent prediction as a density-map overlay.

        Args:
            alpha (float): opacity of the density map (0-1). Defaults to 0.5.
            save (bool): whether to save the visualization as an image. Defaults to False.
            save_path (str, optional): output path. If None, an auto-generated name in the
                current directory is used. Defaults to None.

        Returns:
            Tuple[matplotlib.figure.Figure, np.ndarray]:
                - matplotlib Figure with the density map overlaid
                - visualized image as an RGB array of shape (H, W, 3)
        """
        if self.density_map is None or self.processed_image is None:
            raise ValueError("Run the predict method first to produce a prediction.")

        fig, ax = plt.subplots(dpi=200, frameon=False)
        ax.imshow(self.processed_image)
        ax.imshow(self.density_map, cmap="jet", alpha=alpha)
        ax.axis("off")
        plt.title(f"Count: {self.count:.1f}")

        if save:
            if save_path is None:
                timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                save_path = f"crowd_density_{timestamp}.png"
            # Save without surrounding whitespace
            plt.savefig(save_path, bbox_inches='tight', pad_inches=0, dpi=200)
            print(f"Image saved: {save_path}")

        # Convert the figure to an RGB array
        fig.canvas.draw()
        image_from_plot = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8)
        image_from_plot = image_from_plot.reshape(fig.canvas.get_width_height()[::-1] + (4,))
        image_from_plot = image_from_plot[:, :, :3]  # drop the alpha channel
        return fig, image_from_plot

    def visualize_dots(self, dot_size: int = 20, sigma: float = 1, percentile: float = 97,
                       save: bool = False, save_path: Optional[str] = None):
        """
        Visualize the predicted crowd positions as dots.

        Args:
            dot_size (int): dot size. Defaults to 20.
            sigma (float): sigma of the Gaussian filter. Defaults to 1.
            percentile (float): percentile (0-100) used as the threshold. Defaults to 97.
            save (bool): whether to save the visualization as an image. Defaults to False.
            save_path (str, optional): output path. If None, an auto-generated name in the
                current directory is used. Defaults to None.

        Returns:
            Tuple[matplotlib.backends.backend_agg.FigureCanvasBase, np.ndarray]:
                - the figure's canvas object
                - visualized image as an RGB array of shape (H, W, 3)
        """
        if self.density_map is None or self.processed_image is None:
            raise ValueError("Run the predict method first to produce a prediction.")

        adjusted_pred_count = int(round(self.count))
        fig, ax = plt.subplots(dpi=200, frameon=False)
        ax.imshow(self.processed_image)

        # Smooth the density map and keep only the highest-density pixels
        filtered_density = gaussian_filter(self.density_map, sigma=sigma)
        threshold = np.percentile(filtered_density, percentile)
        candidate_pixels = np.column_stack(np.where(filtered_density >= threshold))

        # Cluster the candidate pixels into one center per predicted person
        if len(candidate_pixels) > adjusted_pred_count:
            kmeans = KMeans(n_clusters=adjusted_pred_count, random_state=42, n_init=10)
            kmeans.fit(candidate_pixels)
            head_positions = kmeans.cluster_centers_.astype(int)
        else:
            head_positions = candidate_pixels

        y_coords, x_coords = head_positions[:, 0], head_positions[:, 1]
        ax.scatter(x_coords, y_coords,
                   c='red',
                   s=dot_size,
                   alpha=1.0,
                   edgecolors='white',
                   linewidth=1)
        ax.axis("off")
        plt.title(f"Count: {self.count:.1f}")

        if save:
            if save_path is None:
                timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                save_path = f"crowd_dots_{timestamp}.png"
            plt.savefig(save_path, bbox_inches='tight', pad_inches=0, dpi=200)
            print(f"Image saved: {save_path}")

        # Convert the figure to an RGB array
        fig.canvas.draw()
        image_from_plot = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8)
        image_from_plot = image_from_plot.reshape(fig.canvas.get_width_height()[::-1] + (4,))
        image_from_plot = image_from_plot[:, :, :3]  # drop the alpha channel
        return fig.canvas, image_from_plot

    def crowd_count(self):
        """
        Return the crowd count from the most recent prediction.

        Returns:
            float: predicted crowd count
            None: if no prediction has been made yet
        """
        return self.count

    def get_density_map(self):
        """
        Return the density map from the most recent prediction.

        Returns:
            numpy.ndarray: density map
            None: if no prediction has been made yet
        """
        return self.density_map
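

# Minimal usage sketch showing how the class above is expected to be driven:
# instantiate with a TensorRT engine, run predict() for a count, then render the
# optional visualizations. It assumes a built engine exists at the default
# assets/ path and that "example.jpg" is a local test image; both are
# placeholder names, not files guaranteed to ship with the project.
if __name__ == "__main__":
    counter = ClipEBCTensorRT(engine_path="assets/CLIP_EBC_nwpu_rmse_tensorrt.trt")

    # Full pipeline: preprocessing, sliding-window TensorRT inference,
    # and sum-preserving resizing of the density map.
    count = counter.predict("example.jpg")  # hypothetical input image
    print(f"Estimated crowd count: {count:.1f}")

    # Visualizations built from the stored prediction (saved with
    # auto-generated timestamped filenames).
    counter.visualize_density_map(alpha=0.5, save=True)
    counter.visualize_dots(dot_size=20, save=True)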