gen3c / gui /api /httpx_utils.py
elungky's picture
Initial commit for new Space - pre-built Docker image
28451f7
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from io import BytesIO
import pickle
import time
from typing import Callable
import httpx
from tqdm import tqdm
def content_with_progress(content, chunk_size=1024, desc="Upload",
                          progress_callback: Callable[[str, float, tqdm], None] | None = None):
    """
    Yield `content` in consecutive slices while advancing an upload progress bar.

    Args:
        content: Payload to split up; it must support `len()` and slicing.
        chunk_size: Maximum length of each yielded slice.
        desc: Label shown next to the progress bar.
        progress_callback: Optional callable forwarded to `report_progress`
            for every slice.

    Yields:
        Slices of `content` of at most `chunk_size` items, in order.
    """
    size = len(content)
    bar_options = dict(total=size, unit="B", unit_scale=True, unit_divisor=1024, desc=desc)
    with tqdm(**bar_options) as bar:
        for start in range(0, size, chunk_size):
            piece = content[start:start + chunk_size]
            yield piece
            # Runs only when the consumer resumes the generator, i.e. after
            # the slice that was just yielded has been taken.
            report_progress("upload", len(piece), bar, callback=progress_callback)
async def async_content_with_progress(*args, **kwargs):
    """
    Async-generator counterpart of `content_with_progress`.

    All positional and keyword arguments are forwarded unchanged, and every
    chunk produced by the synchronous generator is re-yielded as is.
    """
    sync_chunks = content_with_progress(*args, **kwargs)
    for piece in sync_chunks:
        yield piece
def streaming_response_to_response(response: httpx.Response, content_bytes: BytesIO) -> httpx.Response:
    """
    Convert a streaming response to a non-streaming response.

    A new `httpx.Response` is built from the public instance attributes of
    `response` (stream-state attributes and private ones are skipped), with
    the whole of `content_bytes` as its body.

    Args:
        response: The response obtained through the streaming API.
        content_bytes: Buffer holding the body. It is rewound and read in full.
            It is expected to contain the *decoded* body, which is what the
            callers in this file collect via `iter_bytes()` / `aiter_bytes()`.

    Returns:
        A response carrying the same status, headers and originating request,
        whose content is fully loaded.
    """
    # TODO: is there a nicer way to get a non-streaming-style Response object, despite
    # having used the streaming API above? (for uniform consumption by the caller).
    stream_state = {"is_stream_consumed", "next_request", "is_closed", "content", "stream"}
    kwargs = {
        key: value
        for key, value in vars(response).items()
        if key not in stream_state and not key.startswith("_")
    }

    content_bytes.seek(0)
    body = content_bytes.read()

    # Work on a copy so the original response's headers stay untouched.
    headers = httpx.Headers(response.headers)
    if "content-encoding" in headers:
        # The body was already decoded while streaming. Leaving the header in
        # place would make the new Response try to decode it a second time.
        del headers["content-encoding"]
        headers["content-length"] = str(len(body))
    kwargs["headers"] = headers
    kwargs["content"] = body

    # Private attributes are skipped above, which would lose the originating
    # request; pass it on so `.request`, `.url` and `raise_for_status()` work.
    request = getattr(response, "_request", None)
    if request is not None:
        kwargs["request"] = request

    return httpx.Response(**kwargs)
def report_progress(direction: str, progress_absolute: int | float,
                    bar: tqdm, callback: Callable[[str, float, tqdm], None] | None = None):
    """
    Advance `bar` and optionally notify `callback` of the completed fraction.

    Args:
        direction: Label handed through to the callback (this file passes
            "upload" or "download").
        progress_absolute: Amount by which the bar is advanced.
        bar: Progress bar to update; its `n` and `total` attributes are read.
        callback: Optional callable invoked with the keyword arguments
            `direction`, `progress` (`bar.n / bar.total`) and `bar`.

    The callback is skipped when `bar.total` is `None` or 0, because no
    completed fraction can be computed in that case.
    """
    bar.update(progress_absolute)
    if callback is None:
        return

    total = bar.total
    if not total:
        # Unknown or empty total: dividing would raise, so report nothing.
        return

    callback(direction=direction, progress=bar.n / total, bar=bar)
def _announced_content_length(response: httpx.Response) -> int | None:
    """Return the `Content-Length` header as an int, or `None` if it is absent or not an integer."""
    raw_length = response.headers.get("Content-Length")
    if raw_length is None:
        return None
    try:
        return int(raw_length)
    except ValueError:
        return None


def httpx_request(method: str,
                  *args,
                  progress: bool = False,
                  progress_direction: str = "auto",
                  desc: str | None = None,
                  async_client: httpx.AsyncClient | None = None,
                  callback: Callable | None = None,
                  progress_callback: Callable[[str, float, tqdm], None] | None = None,
                  **kwargs) -> httpx.Response | asyncio.Task[httpx.Response]:
    """
    Issue an HTTP request through httpx, optionally showing tqdm progress bars.

    Args:
        method: HTTP method name.
        *args: Forwarded to the underlying httpx call.
        progress: Enable progress bars.
        progress_direction: "upload", "download", "both" or "auto". With
            "auto", GET requests track the download and POST requests track
            the upload; other methods get no bar.
        desc: Label for the progress bars.
        async_client: When given, the request is sent through this client and
            scheduled with `asyncio.create_task`.
        callback: Called with the response in synchronous mode. In async mode
            it is attached with `add_done_callback`, so it receives the task.
        progress_callback: Forwarded to `report_progress` for every chunk.
        **kwargs: Forwarded to the underlying httpx call. `content` / `data`
            are wrapped in a progress-reporting generator when upload progress
            is enabled.

    Returns:
        The response in synchronous mode, otherwise the created task.

    When the server does not announce a valid `Content-Length`, the download
    bar runs without a total and `progress_callback` is not invoked for the
    download, since no completed fraction can be computed.
    """
    # Function-scope import: only the upload wrapping below needs it.
    from collections.abc import Mapping

    is_async = async_client is not None
    verb = method.lower()
    progress_download = progress and (
        progress_direction in ("both", "download")
        or (progress_direction == "auto" and verb == "get")
    )
    progress_upload = progress and (
        progress_direction in ("both", "upload")
        or (progress_direction == "auto" and verb == "post")
    )

    if progress_upload:
        upload_desc = f"{desc} (upload)" if desc else "Upload"
        wrapper = async_content_with_progress if is_async else content_with_progress
        for key in ("content", "data"):
            if key not in kwargs:
                continue
            payload = kwargs[key]
            # A mapping of form fields (or an explicit None) cannot be sliced
            # into chunks; leave it for httpx to encode as usual.
            if payload is None or isinstance(payload, Mapping):
                continue
            kwargs[key] = wrapper(payload, desc=upload_desc, progress_callback=progress_callback)

    if not progress_download:
        # No download progress bar needed, use standard httpx methods
        if is_async:
            task = asyncio.create_task(
                async_client.request(method, *args, **kwargs)
            )
            if callback is not None:
                task.add_done_callback(callback)
            return task

        res = httpx.request(method, *args, **kwargs)
        if callback is not None:
            callback(res)
        return res

    # Progress bar requested for download, need to use streaming API
    if not is_async:
        content_bytes = BytesIO()
        with httpx.stream(method, *args, **kwargs) as response:
            total = _announced_content_length(response)
            # A fraction can only be reported when the total size is known.
            chunk_callback = progress_callback if total else None
            with tqdm(total=total, unit_scale=True, unit_divisor=1024, unit="B", desc=desc) as bar:
                seen = response.num_bytes_downloaded
                for chunk in response.iter_bytes():
                    # The bar advances by bytes received on the wire, which
                    # can differ from the length of the decoded chunk.
                    report_progress("download", response.num_bytes_downloaded - seen,
                                    bar, callback=chunk_callback)
                    seen = response.num_bytes_downloaded
                    content_bytes.write(chunk)

        response = streaming_response_to_response(response, content_bytes)
        if callback is not None:
            callback(response)
        return response

    async def inner():
        content_bytes = BytesIO()
        async with async_client.stream(method, *args, **kwargs) as response:
            total = _announced_content_length(response)
            # A fraction can only be reported when the total size is known.
            chunk_callback = progress_callback if total else None
            with tqdm(total=total, unit_scale=True, unit_divisor=1024, unit="B", desc=desc) as bar:
                seen = response.num_bytes_downloaded
                async for chunk in response.aiter_bytes():
                    report_progress("download", response.num_bytes_downloaded - seen,
                                    bar, callback=chunk_callback)
                    seen = response.num_bytes_downloaded
                    content_bytes.write(chunk)

        return streaming_response_to_response(response, content_bytes)

    task = asyncio.create_task(inner())
    if callback is not None:
        task.add_done_callback(callback)
    return task
def benchmark_requests(host, port, n=100):
    """
    Time `n` sequential GET requests against `http://{host}:{port}/image`.

    Each response body is unpickled and checked for an "image" key; the
    total and per-iteration wall time are printed at the end.

    Args:
        host: Server host name or address.
        port: Server port.
        n: Number of requests to issue; must be positive.

    Raises:
        ValueError: If `n` is not positive.
        AssertionError: If a response payload has no "image" key.
    """
    if n <= 0:
        # Without this guard the per-iteration figure divides by zero (n == 0)
        # or comes out negative (n < 0).
        raise ValueError("n must be a positive integer")

    url = f"http://{host}:{port}/image"
    # perf_counter is monotonic, so clock adjustments cannot skew the result.
    t0 = time.perf_counter()
    for _ in range(n):
        res = httpx.get(url)
        # NOTE(security): unpickling can execute arbitrary code contained in
        # the payload; only run this benchmark against a server you trust.
        loaded = pickle.loads(res.content)
        # Raised explicitly so the check also runs under `python -O`.
        if "image" not in loaded:
            raise AssertionError('response payload has no "image" key')
    elapsed = time.perf_counter() - t0
    print(f"Took {elapsed} s = {1000 * elapsed / n} ms/it")