# Thin TensorRT inference wrapper built on cuda-python (cudart) and numpy.
import ctypes
from collections import OrderedDict
from typing import Optional, Type

from cuda import cuda, cudart, nvrtc
import numpy as np
import torch
import os

try:
    import tensorrt as trt
except ImportError:
    # Fallback: preload the shared libraries shipped in the `tensorrt_libs`
    # package, then retry the import.
    import tensorrt_libs

    trt_libs_path = tensorrt_libs.__path__[0]
    for _lib_name in (
        "libnvinfer.so.8",
        "libnvinfer_plugin.so.8",
        "libnvonnxparser.so.8",
        "libnvparsers.so.8",
        "libnvinfer_builder_resource.so.8.6.1",
    ):
        ctypes.CDLL(os.path.join(trt_libs_path, _lib_name))
    import tensorrt as trt

logger = trt.Logger(trt.Logger.ERROR)
trt.init_libnvinfer_plugins(logger, "")


def _cudaGetErrorEnum(error):
    """Return the symbolic name / message for a CUDA driver, runtime or NVRTC error.

    Raises:
        RuntimeError: if `error` is not one of the three known error types.
    """
    if isinstance(error, cuda.CUresult):
        err, name = cuda.cuGetErrorName(error)
        return name if err == cuda.CUresult.CUDA_SUCCESS else ""
    elif isinstance(error, cudart.cudaError_t):
        return cudart.cudaGetErrorName(error)[1]
    elif isinstance(error, nvrtc.nvrtcResult):
        return nvrtc.nvrtcGetErrorString(error)[1]
    else:
        raise RuntimeError("Unknown error type: {}".format(error))


def checkCudaErrors(result):
    """Unwrap a cuda-python result tuple, raising on a non-zero status.

    Args:
        result: tuple whose first element is the status enum and whose
            remaining elements (if any) are the call's return values.

    Returns:
        None for a status-only tuple, the single value for a 2-tuple,
        otherwise the tuple of remaining values.

    Raises:
        RuntimeError: if the status value is non-zero.
    """
    status = result[0]
    if status.value:
        raise RuntimeError(
            "CUDA error code={}({})".format(status.value, _cudaGetErrorEnum(status))
        )
    if len(result) == 1:
        return None
    elif len(result) == 2:
        return result[1]
    else:
        return result[1:]


class MyOutputAllocator(trt.IOutputAllocator):
    """Output allocator that keeps one growable device block per tensor.

    The wrapper reads `shape`, `n_bytes` and `address` after execution to
    locate and size the output.
    """

    def __init__(self) -> None:
        super().__init__()
        # members for outside use
        self.shape = None  # last shape reported through notify_shape
        self.n_bytes = 0  # size of the block currently owned
        self.address = 0  # device address of the block currently owned

    def reallocate_output(self, tensor_name, old_address, size, alignment) -> int:
        """Synchronous reallocation entry point; delegates to reallocate_common."""
        return self.reallocate_common(tensor_name, old_address, size, alignment)

    def reallocate_output_async(
        self, tensor_name, old_address, size, alignment, stream
    ) -> int:
        """Stream-ordered reallocation entry point; delegates to reallocate_common."""
        return self.reallocate_common(tensor_name, old_address, size, alignment, stream)

    def notify_shape(self, tensor_name, shape):
        """Record the output shape reported for this tensor."""
        self.shape = shape

    def reallocate_common(
        self, tensor_name, old_address, size, alignment, stream=-1
    ):  # not necessary API
        """Return a device block of at least `size` bytes.

        Args:
            tensor_name: unused here.
            old_address: address handed in by the caller; may be 0/None.
            size: number of bytes requested.
            alignment: unused here.
            stream: stream for `cudaMallocAsync`; -1 selects plain `cudaMalloc`.

        Returns:
            The address of a block that is at least `size` bytes.

        Raises:
            RuntimeError: if a CUDA free/alloc call reports an error.
        """
        if size <= self.n_bytes:
            # The block we already own is large enough. If no address was
            # handed in, fall back to our own block instead of returning null.
            return old_address if old_address else self.address

        if old_address:
            checkCudaErrors(cudart.cudaFree(old_address))
        # Also release our own previous block when it is not the one just
        # freed, otherwise it would leak once self.address is overwritten.
        if self.address and self.address != old_address:
            checkCudaErrors(cudart.cudaFree(self.address))

        if stream == -1:
            address = checkCudaErrors(cudart.cudaMalloc(size))
        else:
            address = checkCudaErrors(cudart.cudaMallocAsync(size, stream))
        self.n_bytes = size
        self.address = address
        return address


class TRTWrapper:
    """Load a serialized TensorRT engine and run it with numpy host buffers.

    `self.buffer` maps each I/O tensor name to a mutable list
    `[host_buffer, device_buffer, n_bytes]`.
    """

    def __init__(
        self,
        trt_file: str,
        plugin_file_list: Optional[list] = None,
    ) -> None:
        """Deserialize the engine in `trt_file` and create an execution context.

        Args:
            trt_file: path to the serialized engine.
            plugin_file_list: shared libraries to load before deserializing.
        """
        # Load custom plugins
        for plugin_file in plugin_file_list or []:
            ctypes.cdll.LoadLibrary(plugin_file)

        # Load engine bytes from file
        self.model = trt_file
        with open(trt_file, "rb") as f, trt.Runtime(logger) as runtime:
            assert runtime
            self.engine = runtime.deserialize_cuda_engine(f.read())
            assert self.engine

        self.buffer = OrderedDict()
        self.output_allocator_map = OrderedDict()
        self.context = self.engine.create_execution_context()

    def setup(self, input_data: Optional[dict] = None) -> None:
        """(Re)allocate buffers and bind tensor addresses for the given inputs.

        Args:
            input_data: mapping of input tensor name to array-like data
                exposing `.shape` and `.ctypes.data` (numpy arrays do).
        """
        if input_data is None:
            input_data = {}

        # Release device buffers from a previous setup; buffers owned by an
        # output allocator are left alone.
        for name, (_, device_buffer, _) in self.buffer.items():
            if (
                device_buffer is not None
                and device_buffer != 0
                and name not in self.output_allocator_map
            ):
                checkCudaErrors(cudart.cudaFree(device_buffer))
                self.buffer[name][1] = None
                self.buffer[name][2] = 0

        self.tensor_name_list = [
            self.engine.get_tensor_name(i) for i in range(self.engine.num_io_tensors)
        ]
        self.n_input = sum(
            self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT
            for name in self.tensor_name_list
        )
        self.n_output = self.engine.num_io_tensors - self.n_input

        for name, data in input_data.items():
            if self.engine.get_tensor_location(name) == trt.TensorLocation.DEVICE:
                self.context.set_input_shape(name, data.shape)
            else:
                self.context.set_tensor_address(name, data.ctypes.data)

        # Prepare work before inference
        for name in self.tensor_name_list:
            if name in self.output_allocator_map:
                # for DDS output, don't need to reallocate
                continue
            data_type = self.engine.get_tensor_dtype(name)
            runtime_shape = self.context.get_tensor_shape(name)
            if -1 in runtime_shape:
                # for Data-Dependent-Shape (DDS) output, "else" branch for normal output
                n_byte = 0  # self.context.get_max_output_size(name)
                self.output_allocator_map[name] = MyOutputAllocator()
                self.context.set_output_allocator(
                    name, self.output_allocator_map[name]
                )
                host_buffer = np.empty(0, dtype=trt.nptype(data_type))
                device_buffer = None
            else:
                n_byte = trt.volume(runtime_shape) * data_type.itemsize
                host_buffer = np.empty(runtime_shape, dtype=trt.nptype(data_type))
                if self.engine.get_tensor_location(name) == trt.TensorLocation.DEVICE:
                    device_buffer = checkCudaErrors(cudart.cudaMalloc(n_byte))
                else:
                    device_buffer = None
            self.buffer[name] = [host_buffer, device_buffer, n_byte]

        for name, data in input_data.items():
            self.buffer[name][0] = np.ascontiguousarray(data)

        for name in self.tensor_name_list:
            if self.engine.get_tensor_location(name) == trt.TensorLocation.DEVICE:
                if self.buffer[name][1] is not None:
                    self.context.set_tensor_address(name, self.buffer[name][1])
            elif self.engine.get_tensor_mode(name) == trt.TensorIOMode.OUTPUT:
                self.context.set_tensor_address(name, self.buffer[name][0].ctypes.data)

    def _is_device_tensor(self, name, mode) -> bool:
        """Return True if tensor `name` has I/O mode `mode` and lives on the device."""
        return (
            self.engine.get_tensor_mode(name) == mode
            and self.engine.get_tensor_location(name) == trt.TensorLocation.DEVICE
        )

    def _enqueue(self, stream) -> None:
        """Enqueue inference on `stream`, raising RuntimeError if the enqueue fails."""
        if not self.context.execute_async_v3(stream):
            raise RuntimeError("TensorRT execute_async_v3 failed")

    def _refresh_allocator_buffers(self) -> None:
        """Rebuild buffer entries of allocator-backed outputs from the reported shape."""
        for name in self.output_allocator_map:
            allocator = self.context.get_output_allocator(name)
            runtime_shape = allocator.shape
            data_type = self.engine.get_tensor_dtype(name)
            host_buffer = np.empty(runtime_shape, dtype=trt.nptype(data_type))
            n_bytes = trt.volume(runtime_shape) * data_type.itemsize
            self.buffer[name] = [host_buffer, allocator.address, n_bytes]

    def infer(self, stream=0) -> None:
        """Run one inference with blocking host/device copies.

        Copies device-located inputs to the device, enqueues on `stream`,
        waits for the stream, then copies device-located outputs into the
        host buffers in `self.buffer`.

        Raises:
            RuntimeError: if a CUDA call or the enqueue reports failure.
        """
        for name in self.tensor_name_list:
            if self._is_device_tensor(name, trt.TensorIOMode.INPUT):
                host_buffer, device_buffer, n_bytes = self.buffer[name]
                checkCudaErrors(
                    cudart.cudaMemcpy(
                        device_buffer,
                        host_buffer.ctypes.data,
                        n_bytes,
                        cudart.cudaMemcpyKind.cudaMemcpyHostToDevice,
                    )
                )

        self._enqueue(stream)
        # The copies below are not issued on `stream`, so wait for the
        # enqueued work before reading the outputs.
        checkCudaErrors(cudart.cudaStreamSynchronize(stream))

        self._refresh_allocator_buffers()

        for name in self.tensor_name_list:
            if self._is_device_tensor(name, trt.TensorIOMode.OUTPUT):
                host_buffer, device_buffer, n_bytes = self.buffer[name]
                checkCudaErrors(
                    cudart.cudaMemcpy(
                        host_buffer.ctypes.data,
                        device_buffer,
                        n_bytes,
                        cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost,
                    )
                )

    def infer_async(self, stream=0) -> None:
        """Run one inference with all copies issued asynchronously on `stream`.

        No synchronization is performed here; the caller is expected to
        synchronize `stream` before reading the host buffers.

        Raises:
            RuntimeError: if a CUDA call or the enqueue reports failure.
        """
        for name in self.tensor_name_list:
            if self._is_device_tensor(name, trt.TensorIOMode.INPUT):
                host_buffer, device_buffer, n_bytes = self.buffer[name]
                checkCudaErrors(
                    cudart.cudaMemcpyAsync(
                        device_buffer,
                        host_buffer.ctypes.data,
                        n_bytes,
                        cudart.cudaMemcpyKind.cudaMemcpyHostToDevice,
                        stream=stream,
                    )
                )

        self._enqueue(stream)

        self._refresh_allocator_buffers()

        for name in self.tensor_name_list:
            if self._is_device_tensor(name, trt.TensorIOMode.OUTPUT):
                host_buffer, device_buffer, n_bytes = self.buffer[name]
                checkCudaErrors(
                    cudart.cudaMemcpyAsync(
                        host_buffer.ctypes.data,
                        device_buffer,
                        n_bytes,
                        cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost,
                        stream=stream,
                    )
                )

    def __del__(self):
        """Best-effort release of every device buffer recorded in `self.buffer`."""
        buffers = getattr(self, "buffer", None)
        # `cudart` may already be cleared during interpreter shutdown.
        if buffers is None or cudart is None:
            return
        for _, device_buffer, _ in buffers.values():
            if device_buffer is None or device_buffer == 0:
                continue
            try:
                checkCudaErrors(cudart.cudaFree(device_buffer))
            except TypeError:
                # Module globals can be None while the interpreter is
                # tearing down; nothing more can be done at that point.
                pass