# senga-dnotes / routers/donut_inference.py
# serenarolloh: "Update routers/donut_inference.py"
# Commit 0ca384b (verified)
import re
import time
import torch
from transformers import DonutProcessor, VisionEncoderDecoderModel
from config import settings, update_shipper
from functools import lru_cache
import os
import logging
# Set up logging: configure the root logger at INFO level and create the
# logger used by every function in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Model cache dictionary - maps shipper_id to the (processor, model, device)
# tuple stored by load_model, so a shipper's model is only loaded on the
# first request for it. The key 'default_shipper' also holds the fallback
# model used when a shipper-specific load fails.
model_cache = {}
def _load_donut_bundle(processor_name, model_name):
    """Load a Donut processor/model pair and place the model on a device.

    Args:
        processor_name: Identifier passed to ``DonutProcessor.from_pretrained``.
        model_name: Identifier passed to
            ``VisionEncoderDecoderModel.from_pretrained``.

    Returns:
        tuple: (processor, model, device) where device is ``"cuda"`` when
        CUDA is available and ``"cpu"`` otherwise.
    """
    processor = DonutProcessor.from_pretrained(processor_name)
    model = VisionEncoderDecoderModel.from_pretrained(model_name)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    return processor, model, device


def load_model(shipper_id: str):
    """Load a model for a specific shipper_id, with caching.

    The first successful load for a shipper is stored in ``model_cache`` and
    reused afterwards. If the shipper-specific load fails, the default model
    (``settings.base_processor`` / ``settings.base_model``) is returned
    instead; that fallback is cached under ``'default_shipper'`` only, so a
    later call for the same shipper will attempt its own model again.

    Args:
        shipper_id: The shipper ID to load the model for.

    Returns:
        tuple: (processor, model, device) for the specified shipper, or the
        default model's tuple when the shipper's model could not be loaded.

    Raises:
        Exception: Whatever ``update_shipper`` raises, or whatever loading
            the default model raises when the fallback itself fails.
    """
    # Check if this model is already loaded in cache
    if shipper_id in model_cache:
        logger.info("Using cached model for shipper %s", shipper_id)
        return model_cache[shipper_id]

    # Resolve which model/processor names belong to this shipper
    model_name, processor_name = update_shipper(shipper_id)
    logger.info(
        "Loading model for shipper %s: model=%s, processor=%s",
        shipper_id,
        model_name,
        processor_name,
    )

    try:
        bundle = _load_donut_bundle(processor_name, model_name)
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error message
        # with str(e) would discard.
        logger.exception(
            "Error loading model for shipper %s: %s", shipper_id, e
        )
        # Fall back to default model
        if "default_shipper" in model_cache:
            logger.info("Falling back to default model")
            return model_cache["default_shipper"]
        logger.info("Loading default model")
        model_cache["default_shipper"] = _load_donut_bundle(
            settings.base_processor, settings.base_model
        )
        return model_cache["default_shipper"]
    else:
        # Cache the loaded model
        model_cache[shipper_id] = bundle
        return bundle
def process_document_donut(image, shipper_id="default_shipper",
                           task_prompt="<s_cord-v2>"):
    """Process a document using the appropriate model for the shipper.

    Args:
        image: The document image to process; it is passed unchanged to the
            Donut processor.
        shipper_id: Shipper ID to select a specific model.
        task_prompt: Task start token fed to the decoder as its initial
            input. Defaults to ``"<s_cord-v2>"``.

    Returns:
        tuple: (results, processing_time) where results is the output of
        ``processor.token2json`` on the decoded sequence and processing_time
        is the elapsed time in seconds, including any model loading done by
        this call.
    """
    worker_pid = os.getpid()
    logger.info(
        "Handling inference request with worker PID: %s, shipper_id: %s",
        worker_pid,
        shipper_id,
    )
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments the way time.time() differences can.
    start_time = time.perf_counter()

    # Load the model based on shipper_id
    processor, model, device = load_model(shipper_id)

    # prepare encoder inputs
    pixel_values = processor(image, return_tensors="pt").pixel_values

    # prepare decoder inputs
    decoder_input_ids = processor.tokenizer(
        task_prompt,
        add_special_tokens=False,
        return_tensors="pt",
    ).input_ids

    # generate answer
    outputs = model.generate(
        pixel_values.to(device),
        decoder_input_ids=decoder_input_ids.to(device),
        max_length=model.decoder.config.max_position_embeddings,
        early_stopping=True,
        pad_token_id=processor.tokenizer.pad_token_id,
        eos_token_id=processor.tokenizer.eos_token_id,
        use_cache=True,
        num_beams=1,
        # never emit the unknown token
        bad_words_ids=[[processor.tokenizer.unk_token_id]],
        return_dict_in_generate=True,
    )

    # postprocess: drop EOS/PAD markers, then the leading task start token
    tokenizer = processor.tokenizer
    sequence = processor.batch_decode(outputs.sequences)[0]
    sequence = sequence.replace(tokenizer.eos_token, "")
    sequence = sequence.replace(tokenizer.pad_token, "")
    # count=1: only the first <...> tag (the task start token) is removed
    sequence = re.sub(r"<.*?>", "", sequence, count=1).strip()

    processing_time = time.perf_counter() - start_time
    logger.info(
        "Inference done in %.2fs, worker PID: %s", processing_time, worker_pid
    )
    return processor.token2json(sequence), processing_time