import math
from time import perf_counter
from typing import List, Optional, Tuple, Union

import cv2
import mediapipe as mp
import numpy as np
import onnxruntime
import torch
import torch.nn as nn
import torchvision
from mediapipe.tasks.python.components.containers.bounding_box import BoundingBox
from mediapipe.tasks.python.components.containers.category import Category
from mediapipe.tasks.python.components.containers.detections import Detection
from torchvision import transforms

from inference.core.entities.requests.gaze import GazeDetectionInferenceRequest
from inference.core.entities.responses.gaze import (
    GazeDetectionInferenceResponse,
    GazeDetectionPrediction,
)
from inference.core.entities.responses.inference import FaceDetectionPrediction, Point
from inference.core.env import (
    GAZE_MAX_BATCH_SIZE,
    MODEL_CACHE_DIR,
    REQUIRED_ONNX_PROVIDERS,
    TENSORRT_CACHE_PATH,
)
from inference.core.exceptions import OnnxProviderNotAvailable
from inference.core.models.roboflow import OnnxRoboflowCoreModel
from inference.core.utils.image_utils import load_image_rgb
from inference.models.gaze.l2cs import L2CS


class Gaze(OnnxRoboflowCoreModel):
    """Roboflow ONNX Gaze model.

    This class is responsible for handling the ONNX Gaze model, including
    loading the model, preprocessing the input, and performing inference.

    Attributes:
        gaze_onnx_session (onnxruntime.InferenceSession): ONNX Runtime session for gaze detection inference.
    """

    def __init__(self, *args, **kwargs):
        """Initializes the Gaze with the given arguments and keyword arguments."""
        t1 = perf_counter()
        super().__init__(*args, **kwargs)
        self.log("Creating inference sessions")

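        # L2CS-Net gaze estimation session. Providers are listed in priority order,
        # so ONNX Runtime falls back from TensorRT to CUDA to CPU depending on what
        # is available; TensorRT engines are cached on disk to avoid rebuilds.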
        self.gaze_onnx_session = onnxruntime.InferenceSession(
            self.cache_file("L2CSNet_gaze360_resnet50_90bins.onnx"),
            providers=[
                (
                    "TensorrtExecutionProvider",
                    {
                        "trt_engine_cache_enable": True,
                        "trt_engine_cache_path": TENSORRT_CACHE_PATH,
                    },
                ),
                "CUDAExecutionProvider",
                "CPUExecutionProvider",
            ],
        )

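        # Fail fast if an execution provider required by the environment
        # configuration is missing from this ONNX Runtime build.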
        if REQUIRED_ONNX_PROVIDERS:
            available_providers = onnxruntime.get_available_providers()
            for provider in REQUIRED_ONNX_PROVIDERS:
                if provider not in available_providers:
                    raise OnnxProviderNotAvailable(
                        f"Required ONNX Execution Provider {provider} is not available. Check that you are using the correct docker image on a supported device."
                    )

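        # MediaPipe face detector, run in single-image mode, used to locate faces
        # before each crop is passed to the gaze model.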
        self.face_detector = mp.tasks.vision.FaceDetector.create_from_options(
            mp.tasks.vision.FaceDetectorOptions(
                base_options=mp.tasks.BaseOptions(
                    model_asset_path=self.cache_file("mediapipe_face_detector.tflite")
                ),
                running_mode=mp.tasks.vision.RunningMode.IMAGE,
            )
        )

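        # Per-face preprocessing for the gaze model: convert to a tensor, resize the
        # 224x224 crop to the 448x448 input expected by L2CS-Net, and normalize with
        # ImageNet statistics.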
        self._gaze_transformations = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Resize(448),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
            ]
        )
        self.task_type = "gaze-detection"
        self.log(f"GAZE model loaded in {perf_counter() - t1:.2f} seconds")

    def _crop_face_img(self, np_img: np.ndarray, face: Detection) -> np.ndarray:
        """Extract the facial area from an image.

        Args:
            np_img (np.ndarray): The numpy image.
            face (mediapipe.tasks.python.components.containers.detections.Detection): The detected face.

        Returns:
            np.ndarray: Cropped face image.
        """
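        # The bounding box is given in absolute pixel coordinates, so it can be used
        # directly to slice the image before resizing the crop to 224x224.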
        bbox = face.bounding_box
        x_min = bbox.origin_x
        y_min = bbox.origin_y
        x_max = bbox.origin_x + bbox.width
        y_max = bbox.origin_y + bbox.height
        face_img = np_img[y_min:y_max, x_min:x_max, :]
        face_img = cv2.resize(face_img, (224, 224))
        return face_img

    def _detect_gaze(self, np_imgs: List[np.ndarray]) -> List[Tuple[float, float]]:
        """Detect gaze directions in cropped face images.

        Args:
            np_imgs (List[np.ndarray]): The numpy image list; each image is a cropped facial image.

        Returns:
            List[Tuple[float, float]]: Yaw (radians) and pitch (radians) for each face.
        """
        ret = []
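        # Run the ONNX session in chunks of at most GAZE_MAX_BATCH_SIZE face crops so
        # any number of faces can be processed without exceeding the batch limit.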
        for i in range(0, len(np_imgs), GAZE_MAX_BATCH_SIZE):
            img_batch = []
            for j in range(i, min(len(np_imgs), i + GAZE_MAX_BATCH_SIZE)):
                img = self._gaze_transformations(np_imgs[j])
                img = np.expand_dims(img, axis=0).astype(np.float32)
                img_batch.append(img)

            img_batch = np.concatenate(img_batch, axis=0)
            onnx_input_image = {self.gaze_onnx_session.get_inputs()[0].name: img_batch}
            yaw, pitch = self.gaze_onnx_session.run(None, onnx_input_image)

            for j in range(len(img_batch)):
                ret.append((yaw[j], pitch[j]))

        return ret

    def _make_response(
        self,
        faces: List[Detection],
        gazes: List[Tuple[float, float]],
        imgW: int,
        imgH: int,
        time_total: float,
        time_face_det: Optional[float] = None,
        time_gaze_det: Optional[float] = None,
    ) -> GazeDetectionInferenceResponse:
        """Prepare a response object from detected faces and corresponding gazes.

        Args:
            faces (List[Detection]): The detected faces.
            gazes (List[Tuple[float, float]]): The detected gazes (yaw, pitch).
            imgW (int): The width (px) of the original image.
            imgH (int): The height (px) of the original image.
            time_total (float): The total processing time.
            time_face_det (Optional[float]): The face detection time.
            time_gaze_det (Optional[float]): The gaze detection time.

        Returns:
            GazeDetectionInferenceResponse: The response object including the detected faces and gazes info.
        """
        predictions = []
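        # Keypoints are returned normalized to [0, 1]; scale them to pixel
        # coordinates and clamp to the image bounds. The bounding box is already
        # expressed in pixels.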
        for face, gaze in zip(faces, gazes):
            landmarks = []
            for keypoint in face.keypoints:
                x = min(max(int(keypoint.x * imgW), 0), imgW - 1)
                y = min(max(int(keypoint.y * imgH), 0), imgH - 1)
                landmarks.append(Point(x=x, y=y))

            bbox = face.bounding_box
            x_center = bbox.origin_x + bbox.width / 2
            y_center = bbox.origin_y + bbox.height / 2
            score = face.categories[0].score

            prediction = GazeDetectionPrediction(
                face=FaceDetectionPrediction(
                    x=x_center,
                    y=y_center,
                    width=bbox.width,
                    height=bbox.height,
                    confidence=score,
                    class_name="face",
                    landmarks=landmarks,
                ),
                yaw=gaze[0],
                pitch=gaze[1],
            )
            predictions.append(prediction)

        response = GazeDetectionInferenceResponse(
            predictions=predictions,
            time=time_total,
            time_face_det=time_face_det,
            time_gaze_det=time_gaze_det,
        )
        return response

    def get_infer_bucket_file_list(self) -> List[str]:
        """Gets the list of files required for inference.

        Returns:
            List[str]: The list of file names.
        """
        return [
            "mediapipe_face_detector.tflite",
            "L2CSNet_gaze360_resnet50_90bins.onnx",
        ]

    def infer_from_request(
        self, request: GazeDetectionInferenceRequest
    ) -> List[GazeDetectionInferenceResponse]:
        """Detect faces and gazes in image(s).

        Args:
            request (GazeDetectionInferenceRequest): The request object containing the image.

        Returns:
            List[GazeDetectionInferenceResponse]: The list of response objects containing the faces and corresponding gazes.
        """
        if isinstance(request.image, list):
            if len(request.image) > GAZE_MAX_BATCH_SIZE:
                raise ValueError(
                    f"The maximum number of images that can be inferred with gaze detection at one time is {GAZE_MAX_BATCH_SIZE}"
                )
            imgs = request.image
        else:
            imgs = [request.image]

        time_total = perf_counter()

        num_img = len(imgs)
        np_imgs = [load_image_rgb(img) for img in imgs]

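        # Face detection stage: run MediaPipe on each image, or, when face detection
        # is disabled on the request, treat the whole image as a single face region.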
        time_face_det = perf_counter()
        faces = []
        for np_img in np_imgs:
            if request.do_run_face_detection:
                mp_img = mp.Image(
                    image_format=mp.ImageFormat.SRGB, data=np_img.astype(np.uint8)
                )
                faces_per_img = self.face_detector.detect(mp_img).detections
            else:
                faces_per_img = [
                    Detection(
                        bounding_box=BoundingBox(
                            origin_x=0,
                            origin_y=0,
                            width=np_img.shape[1],
                            height=np_img.shape[0],
                        ),
                        categories=[Category(score=1.0, category_name="face")],
                        keypoints=[],
                    )
                ]
            faces.append(faces_per_img)
        time_face_det = (perf_counter() - time_face_det) / num_img

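        # Gaze estimation stage: crop every detected face (or use the resized full
        # image) and run all crops through the gaze model as one flattened list.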
        time_gaze_det = perf_counter()
        face_imgs = []
        for i, np_img in enumerate(np_imgs):
            if request.do_run_face_detection:
                face_imgs.extend(
                    [self._crop_face_img(np_img, face) for face in faces[i]]
                )
            else:
                face_imgs.append(cv2.resize(np_img, (224, 224)))
        gazes = self._detect_gaze(face_imgs)
        time_gaze_det = (perf_counter() - time_gaze_det) / num_img

        time_total = (perf_counter() - time_total) / num_img

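        # Regroup the flattened gaze list per image by advancing through it in steps
        # of the number of faces detected in each image.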
        response = []
        idx_gaze = 0
        for i in range(len(np_imgs)):
            imgH, imgW, _ = np_imgs[i].shape
            faces_per_img = faces[i]
            gazes_per_img = gazes[idx_gaze : idx_gaze + len(faces_per_img)]
            idx_gaze += len(faces_per_img)
            response.append(
                self._make_response(
                    faces_per_img, gazes_per_img, imgW, imgH, time_total
                )
            )

        return response


class L2C2Wrapper(L2CS):
    """Roboflow L2CS Gaze detection model.

    This class is responsible for converting the L2CS model to an ONNX model.
    It is ONLY intended for internal usage.

    Workflow:
        After training an L2CS model, create an instance of this wrapper class.
        Load the trained weights file, and save it as an ONNX model.
    """

    def __init__(self):
        self.device = torch.device("cpu")
        self.num_bins = 90
        super().__init__(
            torchvision.models.resnet.Bottleneck, [3, 4, 6, 3], self.num_bins
        )
        self._gaze_softmax = nn.Softmax(dim=1)
        self._gaze_idx_tensor = torch.FloatTensor([i for i in range(90)]).to(
            self.device
        )

    def forward(self, x):
        idx_tensor = torch.stack(
            [self._gaze_idx_tensor for i in range(x.shape[0])], dim=0
        )
        gaze_yaw, gaze_pitch = super().forward(x)

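        # Convert the 90-bin classification outputs to continuous angles: take the
        # softmax expectation over bin indices, map each bin to 4 degrees starting at
        # -180, then convert degrees to radians.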
        yaw_predicted = self._gaze_softmax(gaze_yaw)
        yaw_radian = (
            (torch.sum(yaw_predicted * idx_tensor, dim=1) * 4 - 180) * np.pi / 180
        )

        pitch_predicted = self._gaze_softmax(gaze_pitch)
        pitch_radian = (
            (torch.sum(pitch_predicted * idx_tensor, dim=1) * 4 - 180) * np.pi / 180
        )

        return yaw_radian, pitch_radian

    def load_L2CS_model(
        self,
        file_path=f"{MODEL_CACHE_DIR}/gaze/L2CS/L2CSNet_gaze360_resnet50_90bins.pkl",
    ):
        super().load_state_dict(torch.load(file_path, map_location=self.device))
        super().to(self.device)

    def saveas_ONNX_model(
        self,
        file_path=f"{MODEL_CACHE_DIR}/gaze/L2CS/L2CSNet_gaze360_resnet50_90bins.onnx",
    ):
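        # Export with a dynamic batch dimension so the ONNX model accepts a variable
        # number of 3x448x448 face crops per run.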
        dummy_input = torch.randn(1, 3, 448, 448)
        dynamic_axes = {
            "input": {0: "batch_size"},
            "output_yaw": {0: "batch_size"},
            "output_pitch": {0: "batch_size"},
        }
        torch.onnx.export(
            self,
            dummy_input,
            file_path,
            input_names=["input"],
            output_names=["output_yaw", "output_pitch"],
            dynamic_axes=dynamic_axes,
            verbose=False,
        )