# NOTE: "Spaces: Configuration error" was a banner from the hosting page captured
# together with this file; it is not part of the module.
import logging | |
import os | |
import tempfile | |
from contextlib import asynccontextmanager | |
from io import BytesIO | |
from pathlib import Path | |
from typing import Annotated, Any, Dict, List, Optional, Union | |
from docling.datamodel.base_models import DocumentStream, InputFormat | |
from docling.document_converter import DocumentConverter | |
from dotenv import load_dotenv | |
from fastapi import BackgroundTasks, FastAPI, UploadFile | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import RedirectResponse | |
from pydantic import BaseModel | |
from docling_serve.docling_conversion import ( | |
ConvertDocumentFileSourcesRequest, | |
ConvertDocumentsOptions, | |
ConvertDocumentsRequest, | |
convert_documents, | |
converters, | |
get_pdf_pipeline_opts, | |
) | |
from docling_serve.helper_functions import FormDepends, _str_to_bool | |
from docling_serve.response_preparation import ConvertDocumentResponse, process_results | |
# Load local env vars if present
load_dotenv()

# The Gradio UI is opt-in: it is only imported when the WITH_UI environment
# variable (default "False") is interpreted as true by _str_to_bool.
WITH_UI = _str_to_bool(os.environ.get("WITH_UI", "False"))

if WITH_UI:
    import gradio as gr

    from docling_serve.gradio_ui import ui as gradio_ui
# Set up custom logging, as our output is intermixed with FastAPI/Uvicorn's logging
class ColoredLogFormatter(logging.Formatter):
    """Formatter that wraps the record's level name in an ANSI color sequence.

    The color is chosen from ``COLOR_CODES`` by the record's numeric level;
    levels without an entry get no color prefix but still get ``RESET_CODE``
    appended.
    """

    COLOR_CODES = {
        logging.DEBUG: "\033[94m",  # Blue
        logging.INFO: "\033[92m",  # Green
        logging.WARNING: "\033[93m",  # Yellow
        logging.ERROR: "\033[91m",  # Red
        logging.CRITICAL: "\033[95m",  # Magenta
    }
    RESET_CODE = "\033[0m"

    def format(self, record):
        """Return the formatted record with a colorized level name.

        The record's ``levelname`` is only replaced for the duration of the
        formatting call and restored afterwards. The same record object is
        handed to every handler, so leaving the colored name in place would
        leak escape codes into other handlers and stack the color codes each
        time the record is formatted again.
        """
        plain_levelname = record.levelname
        color = self.COLOR_CODES.get(record.levelno, "")
        record.levelname = f"{color}{plain_levelname}{self.RESET_CODE}"
        try:
            return super().format(record)
        finally:
            record.levelname = plain_levelname
logging.basicConfig(
    level=logging.INFO,  # Set the logging level
    format="%(levelname)s:\t%(asctime)s - %(name)s - %(message)s",
    datefmt="%H:%M:%S",
)

# Override the formatter with the custom ColoredLogFormatter
root_logger = logging.getLogger()  # Get the root logger
for handler in root_logger.handlers:  # Iterate through existing handlers
    existing_formatter = handler.formatter
    if existing_formatter:
        # Carry over both the message format and the date format; passing only
        # the message format would silently drop the datefmt configured above.
        handler.setFormatter(
            ColoredLogFormatter(
                existing_formatter._fmt,
                existing_formatter.datefmt,
            )
        )

_log = logging.getLogger(__name__)
# Context manager to initialize and clean up the lifespan of the FastAPI app
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Prepare the default converter on startup and release resources on shutdown.

    Startup: builds a ``DocumentConverter`` from the default
    ``ConvertDocumentsOptions``, stores it in ``converters`` under the options
    hash returned by ``get_pdf_pipeline_opts``, and initializes its PDF
    pipeline before the application starts.

    Shutdown: empties ``converters`` and, when the UI is enabled, closes the
    Gradio UI.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    # Converter with default options
    pdf_format_option, options_hash = get_pdf_pipeline_opts(ConvertDocumentsOptions())
    converters[options_hash] = DocumentConverter(
        format_options={
            InputFormat.PDF: pdf_format_option,
            InputFormat.IMAGE: pdf_format_option,
        }
    )
    converters[options_hash].initialize_pipeline(InputFormat.PDF)

    try:
        yield
    finally:
        # Run the cleanup even if an exception is thrown into the generator
        # at the yield point.
        converters.clear()
        if WITH_UI:
            gradio_ui.close()
################################## | |
# App creation and configuration # | |
################################## | |
app = FastAPI(title="Docling Serve", lifespan=lifespan)

# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins are combined with allow_credentials=True —
# confirm this is intended for the deployment.
origins = ["*"]
methods = ["*"]
headers = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=origins,
    allow_methods=methods,
    allow_headers=headers,
)
# Mount the Gradio app
if WITH_UI:
    # A fresh temporary directory is handed to the UI as its output directory
    # and is listed in the paths passed as allowed_paths.
    # NOTE(review): nothing visible here removes this directory again — confirm
    # whether cleanup happens elsewhere.
    tmp_output_dir = Path(tempfile.mkdtemp())
    gradio_ui.gradio_output_dir = tmp_output_dir

    app = gr.mount_gradio_app(
        app,
        gradio_ui,
        path="/ui",
        allowed_paths=["./logo.png", tmp_output_dir],
    )
############################# | |
# API Endpoints definitions # | |
############################# | |
# Favicon
# NOTE(review): no route decorator is visible on this function in this view —
# confirm how it is registered with `app`.
async def favicon():
    """Return a redirect to the logo image under the Docling GitHub Pages site."""
    return RedirectResponse(url="https://ds4sd.github.io/docling/assets/logo.png")
# Status
class HealthCheckResponse(BaseModel):
    """Response body returned by the status endpoints."""

    # Defaults to "ok"; the endpoints in this file never set another value.
    status: str = "ok"
# NOTE(review): no route decorator is visible on this function in this view —
# confirm how it is registered with `app`.
def health() -> HealthCheckResponse:
    """Return a default ``HealthCheckResponse`` (status ``"ok"``)."""
    payload = HealthCheckResponse()
    return payload
# API readiness compatibility for OpenShift AI Workbench
# NOTE(review): no route decorator is visible on this function in this view —
# confirm how it is registered with `app`.
def api_check() -> HealthCheckResponse:
    """Return a default ``HealthCheckResponse``, identical to ``health()``."""
    payload = HealthCheckResponse()
    return payload
# Convert a document from URL(s)
# NOTE(review): no route decorator is visible on this function in this view —
# confirm how it is registered with `app`.
def process_url(
    background_tasks: BackgroundTasks, conversion_request: ConvertDocumentsRequest
):
    """Convert the documents referenced by a conversion request.

    A ``ConvertDocumentFileSourcesRequest`` contributes one document stream per
    entry in ``file_sources``; any other request contributes the URL of each
    entry in ``http_sources``.

    Only one header mapping is forwarded to the conversion: the first
    non-empty ``headers`` found among the HTTP sources. It is passed once for
    all sources.

    Args:
        background_tasks: Passed through to ``process_results``.
        conversion_request: The sources to convert and the conversion options.

    Returns:
        Whatever ``process_results`` produces for the conversion results.
    """
    sources: List[Union[str, DocumentStream]]
    headers: Optional[Dict[str, Any]] = None

    if isinstance(conversion_request, ConvertDocumentFileSourcesRequest):
        sources = [
            file_source.to_document_stream()
            for file_source in conversion_request.file_sources
        ]
    else:
        sources = []
        for http_source in conversion_request.http_sources:
            sources.append(http_source.url)
            # Keep the first non-empty header mapping and ignore later ones
            if headers is None and http_source.headers:
                headers = http_source.headers

    # Note: results are only an iterator->lazy evaluation
    conv_results = convert_documents(
        sources=sources,
        options=conversion_request.options,
        headers=headers,
    )

    # The real processing will happen here
    return process_results(
        background_tasks=background_tasks,
        conversion_options=conversion_request.options,
        conv_results=conv_results,
    )
# Convert a document from file(s)
# NOTE(review): no route decorator is visible on this function in this view —
# confirm how it is registered with `app`.
async def process_file(
    background_tasks: BackgroundTasks,
    files: List[UploadFile],
    options: Annotated[ConvertDocumentsOptions, FormDepends(ConvertDocumentsOptions)],
):
    """Convert uploaded files.

    Each upload is read fully into memory and wrapped in a ``DocumentStream``
    named after the upload's filename, or ``"file.pdf"`` when the upload has
    no filename.

    Args:
        background_tasks: Passed through to ``process_results``.
        files: The uploaded files to convert.
        options: Conversion options built from the form data.

    Returns:
        Whatever ``process_results`` produces for the conversion results.
    """
    _log.info("Received %d files for processing.", len(files))

    # Load the uploaded files to Docling DocumentStream
    file_sources = []
    for file in files:
        # Use the awaitable UploadFile.read() rather than the synchronous
        # file.file.read(), so the read does not block the event loop.
        buf = BytesIO(await file.read())
        name = file.filename or "file.pdf"
        file_sources.append(DocumentStream(name=name, stream=buf))

    results = convert_documents(sources=file_sources, options=options)

    return process_results(
        background_tasks=background_tasks,
        conversion_options=options,
        conv_results=results,
    )