Spaces:
Build error
Build error
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | |
# SPDX-License-Identifier: Apache-2.0 | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
from contextlib import asynccontextmanager | |
from dataclasses import dataclass | |
import logging | |
import os | |
import pickle | |
import traceback | |
from fastapi import FastAPI | |
from fastapi.requests import Request | |
from fastapi.responses import Response | |
import imageio.v3 as imageio | |
import numpy as np | |
from api_types import CompressedSeedingRequest | |
from server_base import InferenceModel | |
from server_cosmos import CosmosModel | |
# ------------------------------ | |
class ServerSettings(): | |
""" | |
Note: we use a dataclass + env variables because we can't | |
easily pass command line arguments through the `fastapi` launcher. | |
""" | |
model: str = os.environ.get("GEN3C_MODEL", "cosmos-predict1") | |
checkpoint_path: str | None = os.environ.get("GEN3C_CKPT_PATH") | |
data_path: str | None = os.environ.get("GEN3C_DATA_PATH") | |
#: Additional latency to add to any inference request, in milliseconds. | |
inference_latency: int = int(os.environ.get("GEN3C_INFERENCE_LATENCY", 0)) | |
#: Number of inference results to keep in cache. | |
#: This may be useful when multiple requests are in flight and the user hasn't | |
#: retrieved the results yet. | |
inference_cache_size: int = int(os.environ.get("GEN3C_INFERENCE_CACHE_SIZE", 15)) | |
#: Number of GPUs to use for inference. Leave at 0 to automatically select | |
#: based on available hardware. | |
gpu_count: int = int(os.environ.get("GEN3C_GPU_COUNT", 0)) | |
settings = ServerSettings() | |
model: InferenceModel | None = None | |
async def lifespan(app: FastAPI): | |
global model | |
model_name = settings.model.lower() | |
if model_name in ("cosmos", "cosmos-predict1"): | |
cls = CosmosModel | |
else: | |
raise ValueError(f"Unsupported model type: '{settings.model}'") | |
model = cls(checkpoint_path=settings.checkpoint_path, | |
data_path=settings.data_path, | |
fake_delay_ms=settings.inference_latency, | |
inference_cache_size=settings.inference_cache_size, | |
gpu_count=settings.gpu_count) | |
# --- Startup code | |
# Pre-render at least one image to make sure everything is running | |
if not model.requires_seeding(): | |
await model.make_test_image() | |
yield | |
# --- Shutdown code | |
model.cleanup() | |
del model | |
app = FastAPI(lifespan=lifespan) | |
logger = logging.getLogger('uvicorn.error') | |
# ------------------------------ | |
def get_bool_query_param(request: Request, name: str, default: bool) -> bool: | |
b_str = request.query_params.get(name, "1" if default else "0") | |
return b_str.lower() in ("1", "true", "yes", "") | |
async def request_inference(request: Request): | |
""" | |
Start a new asynchronous inference job. | |
""" | |
sync = get_bool_query_param(request, "sync", default=False) | |
req: bytes = await request.body() | |
req = pickle.loads(req) | |
try: | |
if sync: | |
result = await model.request_inference_sync(req) | |
return Response(content=pickle.dumps(result), | |
media_type="application/octet-stream") | |
else: | |
model.request_inference(req) | |
except Exception as e: | |
logging.error("Inference request failed with exception:" | |
f"\n{e}\n{traceback.format_exc()}") | |
return Response(str(e), status_code=400) | |
return Response("Request accepted.", status_code=202) | |
async def seed_model(request: Request): | |
""" | |
Start a new asynchronous inference job. | |
""" | |
sync = get_bool_query_param(request, "sync", default=False) | |
req: bytes = await request.body() | |
req = pickle.loads(req) | |
if isinstance(req, CompressedSeedingRequest): | |
req.decompress() | |
try: | |
# There isn't really anything async about the seeding request being done on the server | |
# so far, so we just await. This could be changed in the future. | |
result = await model.seed_model(req) | |
except Exception as e: | |
logging.error(f"Seeding request failed with exception:" | |
f"\n{e}\n{traceback.format_exc()}") | |
return Response(str(e), status_code=400) | |
# return Response("Seeding request accepted.", status_code=(200 if sync else 202)) | |
return Response(content=pickle.dumps(result), | |
media_type="application/octet-stream") | |
async def inference_results_or_none(request_id: str): | |
try: | |
result = model.inference_result_or_none(request_id) | |
except Exception as e: | |
# TODO: try to differentiate the status codes (doesn't exist, inference failed, etc) | |
logging.error(f"Inference results request failed with exception:" | |
f"\n{e}\n{traceback.format_exc()}") | |
return Response(str(e), status_code=500) | |
if result is None: | |
return Response(content="Result not ready", | |
status_code=503) | |
else: | |
return Response(content=pickle.dumps(result), | |
media_type="application/octet-stream") | |
def latest_rgb(format: str = "jpg"): | |
# We return the data as pickled bytes to avoid the JSON serialization / deserialization overhead. | |
image = model.get_latest_rgb() | |
if image is None: | |
return Response(content="No image available yet.", status_code=404) | |
if format == "pickle": | |
content = pickle.dumps( | |
{ | |
"image": image, | |
} | |
) | |
return Response(content=content, media_type="application/octet-stream") | |
elif format in ("jpg", "png"): | |
image = image.copy() | |
# Allow alpha channel to be omitted for faster transfers | |
if image.shape[-1] == 3: | |
image = np.concatenate([ | |
image, | |
np.ones((*image.shape[:2], 1)) | |
], axis=-1) | |
if image.dtype != np.uint8: | |
# TODO: proper handling of gamma compression, etc | |
image[:, :, :3] = np.power(image[:, :, :3], 1 / 2.2) * 255 | |
image[:, :, 3] = image[:, :, 3] * 255 | |
if format != "png": | |
image = image[:, :, :3] | |
content = imageio.imwrite(uri="<bytes>", image=image.astype(np.uint8), extension="." + format) | |
return Response(content=content, media_type=f"image/{format}") | |
else: | |
return Response(f"Unsupported image format: {format}", status_code=400) | |
def metadata(): | |
return model.metadata() | |