Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, File, UploadFile, Form | |
from typing import Optional | |
from PIL import Image | |
import urllib.request | |
from io import BytesIO | |
import utils | |
import os | |
import json | |
from config import Settings | |
from routers.donut_inference import process_document_donut | |
router = APIRouter() | |
def count_values(obj):
    """Count the leaf values contained in a nested structure.

    Dicts contribute the leaves of their values and lists contribute the
    leaves of their items. Anything else (including tuples, strings and
    ``None``) counts as exactly one leaf.

    Args:
        obj: Arbitrary object, possibly a nested combination of dicts/lists.

    Returns:
        Number of leaves found; an empty dict or list yields 0.
    """
    # Pick the collection of children to descend into, or stop at a leaf.
    if isinstance(obj, dict):
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return 1

    return sum(map(count_values, children))
async def run_inference(
    file: Optional[UploadFile] = File(None),
    image_url: Optional[str] = Form(None),
    model_in_use: str = Form('donut'),
    shipper_id: str = Form(...)
):
    """Run document inference on an uploaded JPG or on an image fetched by URL.

    The uploaded ``file`` takes precedence over ``image_url`` when both are
    supplied. Inference is only performed when ``model_in_use`` is ``'donut'``;
    for any other value the result stays an empty list and a processing time
    of 0 is logged.

    Args:
        file: Optional uploaded image; its content type must be
            ``image/jpeg`` or ``image/jpg``.
        image_url: Optional http(s) URL of the image to download.
        model_in_use: Name of the model to run.
        shipper_id: Identifier passed to ``Settings`` to build the
            configuration used for this request.

    Returns:
        A dict with ``shipper_id``, ``model``, ``processor`` and ``result``
        keys, or a dict with a single ``error`` key when the input is
        rejected. When neither input is given, ``result`` is
        ``{"info": "No input provided"}``.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urlsplit

    # Dynamically load config based on shipper ID
    settings = Settings(shipper_id=shipper_id)

    image = None
    source_name = None

    if file:
        if file.content_type not in ["image/jpeg", "image/jpg"]:
            return {"error": "Invalid file type. Only JPG images are allowed."}
        image = Image.open(BytesIO(await file.read()))
        source_name = file.filename
    elif image_url:
        parsed_url = urlsplit(image_url)
        # The URL comes straight from the client: urlopen() would otherwise
        # also accept schemes such as file:// or ftp://, letting a caller
        # read local or internal resources.
        if parsed_url.scheme.lower() not in ("http", "https"):
            return {"error": "Invalid image URL. Only http and https URLs are allowed."}
        # NOTE(review): no timeout is set and this call blocks the event
        # loop while downloading - consider a timeout / worker thread.
        with urllib.request.urlopen(image_url) as response:
            image = Image.open(BytesIO(response.read()))
        # Take the name from the URL path so that a query string or fragment
        # does not end up in the logged file name.
        source_name = parsed_url.path.split("/")[-1]

    if image is None:
        result = {"info": "No input provided"}
    else:
        # Shared by both input sources: run the model, then log the stats.
        result = []
        processing_time = 0
        if model_in_use == 'donut':
            result, processing_time = process_document_donut(image, settings)
        utils.log_stats(
            settings.inference_stats_file,
            [processing_time, count_values(result), source_name, settings.model],
        )

    return {
        "shipper_id": shipper_id,
        "model": settings.model,
        "processor": settings.processor,
        "result": result
    }
async def get_statistics(shipper_id: Optional[str] = None):
    """Return the contents of the inference statistics file for a shipper.

    The original body read ``settings.inference_stats_file`` although no
    ``settings`` name exists in this function or at module level, so every
    call raised ``NameError``. The configuration is now built from
    ``shipper_id``, the same way ``run_inference`` builds it.

    Args:
        shipper_id: Identifier passed to ``Settings`` to locate the
            statistics file. Optional in the signature so existing
            argument-less callers receive an error payload instead of an
            exception.

    Returns:
        The parsed JSON content of the statistics file; an empty list when
        the file does not exist or does not contain valid JSON; or a dict
        with an ``error`` key when ``shipper_id`` is not provided.
    """
    if shipper_id is None:
        return {"error": "shipper_id is required to load statistics."}

    settings = Settings(shipper_id=shipper_id)
    file_path = settings.inference_stats_file

    # Open directly instead of checking os.path.exists() first: this avoids
    # the race between the existence check and the read.
    try:
        with open(file_path, 'r', encoding='utf-8') as stats_file:
            return json.load(stats_file)
    except FileNotFoundError:
        return []
    except json.JSONDecodeError:
        # An empty or corrupted stats file is reported as "no statistics".
        return []