import platform from typing import Tuple, List import torch from hfendpoints.http import Context, run from hfendpoints.tasks import Usage from hfendpoints.tasks.embedding import EmbeddingRequest, EmbeddingResponse from intel_extension_for_pytorch.cpu.runtime import pin from loguru import logger from sentence_transformers import SentenceTransformer from torch.backends.mkldnn import VERBOSE_ON_CREATION, VERBOSE_OFF from torch.nn import Module from hfendpoints import EndpointConfig, Handler, __version__ # Not used for now SUPPORTED_AMP_DTYPES = {torch.float32, torch.bfloat16} def has_bf16_support() -> bool: """ Helper to detect if the hardware supports bfloat16 Note: Intel libraries, such as oneDNN, provide emulation for bfloat16 even if the underlying hardware does not support it. This means CPU ISA with AVX512 will work, even if not with the same performances as one could expect from CPU ISA with AVX512_BF16. Also, AMX_BF16 is implicitly assumed true when AVX512_BF16 is true (that's the case on Intel Sapphire Rapids). :return: True if the hardware supports (or can emulate) bfloat16, False otherwise """ return torch.cpu._is_avx512_bf16_supported() or torch.cpu._is_avx512_supported() def get_cores_pinning_strategy() -> "CPUPool": import intel_extension_for_pytorch as ipex # Retrieve the number of nodes num_nodes = ipex.cpu.runtime.runtime_utils.get_num_nodes() cpu_cores_id = [ipex.cpu.runtime.runtime_utils.get_core_list_of_node_id(node_id) for node_id in range(num_nodes)] if num_nodes == 1: pinned_cpu_cores_id = cpu_cores_id[0] else: pinned_cpu_cores_id = [core_id for node in cpu_cores_id for core_id in node] logger.info(f"Pinning CPU cores to {pinned_cpu_cores_id}") return ipex.cpu.runtime.CPUPool(pinned_cpu_cores_id) def get_usage(mask: List[torch.IntTensor]) -> Usage: """ Compute the number of processed tokens and return as Usage object matching OpenAI :param mask: Attention mask tensor, as returned by the model :return: Usage object matching OpenAI specifications """ num_tokens = sum(x.sum().detach().item() for x in mask) return Usage(prompt_tokens=num_tokens, total_tokens=num_tokens) class SentenceTransformerWithUsage(Module): __slots__ = ("_model",) def __init__(self, model: SentenceTransformer): super().__init__() self._model = model def forward(self, sentences: list[str]) -> Tuple[List[List[int]], List[List[int]]]: vectors = self._model.encode(sentences, output_value=None) return ( [vector['attention_mask'] for vector in vectors], [vector['sentence_embedding'].tolist() for vector in vectors] ) class SentenceTransformerHandler(Handler): __slots__ = ("_config", "_dtype", "_model", "_model_name", "_pinned_cores", "_use_amp") def __init__(self, config: EndpointConfig): self._config = config self._dtype = torch.float32 self._model_name = config.model_id self._allocate_model() def _allocate_model(self): # Denormal number is used to store tiny numbers that are close to 0. # Computations with denormal numbers are remarkably slower than normalized number. torch.set_flush_denormal(True) dtype = torch.bfloat16 if has_bf16_support() else torch.float32 model = SentenceTransformer(self._config.model_id, device="cpu", model_kwargs={"torch_dtype": dtype}) if platform.machine() == "x86_64": import intel_extension_for_pytorch as ipex logger.info(f"x64 platform detected: {platform.processor()}") # Retrieve all the physical cores ID for all the CPU nodes self._pinned_cores = get_cores_pinning_strategy() # Optimize the model for inference with torch.inference_mode(): model = model.eval() model = model.to(memory_format=torch.channels_last) # Apply IPEx optimizations model = ipex.optimize(model, dtype=dtype, weights_prepack=True, graph_mode=True, concat_linear=True) model = torch.compile(model, dynamic=True, backend="ipex") # model = ipex.cpu.runtime.MultiStreamModule(SentenceTransformerWithUsage(model), num_streams=1) else: model = torch.compile(model) self._dtype = dtype self._use_amp = dtype in SUPPORTED_AMP_DTYPES self._model = SentenceTransformerWithUsage(model) async def __call__(self, request: EmbeddingRequest, ctx: Context) -> EmbeddingResponse: with torch.backends.mkldnn.verbose(VERBOSE_ON_CREATION if self._config.is_debug else VERBOSE_OFF): with torch.inference_mode(), torch.amp.autocast("cpu", dtype=self._dtype, enabled=self._use_amp): with pin(self._pinned_cores): mask, vectors = self._model(request.input if request.is_batched else [request.input]) # TODO: Change the way we return usage usage = get_usage(mask) vectors = vectors if request.is_batched else vectors[0] return EmbeddingResponse(embeddings=vectors, num_tokens=usage.total_tokens) def entrypoint(): # Readout the endpoint configuration from the provided environment variable config = EndpointConfig.from_env() logger.info(f"[Hugging Face Endpoint v{__version__}] Serving: {config.model_id}") # Allocate handler handler = SentenceTransformerHandler(config) # Allocate endpoint from hfendpoints.openai.embedding import EmbeddingEndpoint endpoint = EmbeddingEndpoint(handler) run(endpoint, config.interface, config.port) if __name__ == "__main__": entrypoint()