Spaces:
Configuration error
Configuration error
import logging | |
import os | |
import shutil | |
import tempfile | |
import time | |
from collections.abc import Iterable | |
from pathlib import Path | |
from typing import Union | |
from fastapi import BackgroundTasks, HTTPException | |
from fastapi.responses import FileResponse | |
from docling.datamodel.base_models import OutputFormat | |
from docling.datamodel.document import ConversionResult, ConversionStatus | |
from docling_core.types.doc import ImageRefMode | |
from docling_serve.datamodel.convert import ConvertDocumentsOptions | |
from docling_serve.datamodel.responses import ConvertDocumentResponse, DocumentResponse | |
_log = logging.getLogger(__name__) | |
def _export_document_as_content(
    conv_res: ConversionResult,
    export_json: bool,
    export_html: bool,
    export_md: bool,
    export_txt: bool,
    export_doctags: bool,
    image_mode: ImageRefMode,
):
    """Build an in-memory ``DocumentResponse`` for a single conversion result.

    Args:
        conv_res: The conversion result to export.
        export_json: Populate ``json_content`` with the document itself.
        export_html: Populate ``html_content`` with the HTML export.
        export_md: Populate ``md_content`` with the Markdown export.
        export_txt: Populate ``text_content`` with the strict-text Markdown
            export.
        export_doctags: Populate ``doctags_content`` with the document tokens.
        image_mode: Image reference mode used for the document copy and for
            the HTML / Markdown / text exports.

    Returns:
        The ``DocumentResponse`` named after the input file, carrying the
        requested formats.

    Raises:
        HTTPException: 400 when the conversion was skipped, 500 for any other
            non-successful status; ``detail`` carries the conversion errors.
    """
    document = DocumentResponse(filename=conv_res.input.file.name)

    # Anything other than a successful conversion is reported as an HTTP error.
    outcome = conv_res.status
    if outcome != ConversionStatus.SUCCESS:
        error_code = 400 if outcome == ConversionStatus.SKIPPED else 500
        raise HTTPException(status_code=error_code, detail=conv_res.errors)

    # Work on a copy whose image references follow the requested mode.
    rendered = conv_res.document._make_copy_with_refmode(Path(), image_mode)

    if export_json:
        document.json_content = rendered
    if export_html:
        document.html_content = rendered.export_to_html(image_mode=image_mode)
    if export_txt:
        # Plain text is the Markdown export with strict_text enabled.
        plain_text = rendered.export_to_markdown(
            strict_text=True, image_mode=image_mode
        )
        document.text_content = plain_text
    if export_md:
        document.md_content = rendered.export_to_markdown(image_mode=image_mode)
    if export_doctags:
        document.doctags_content = rendered.export_to_document_tokens()

    return document
def _export_documents_as_files(
    conv_results: Iterable[ConversionResult],
    output_dir: Path,
    export_json: bool,
    export_html: bool,
    export_md: bool,
    export_txt: bool,
    export_doctags: bool,
    image_export_mode: ImageRefMode,
):
    """Write every successfully converted document into ``output_dir``.

    Each requested format is saved as ``<input file stem>.<extension>``
    (``.json``, ``.html``, ``.txt``, ``.md``, ``.doctags``). Results whose
    status is not ``SUCCESS`` are logged as warnings and only counted.

    Args:
        conv_results: Conversion results to export.
        output_dir: Directory receiving the exported files.
        export_json: Write the JSON export.
        export_html: Write the HTML export.
        export_md: Write the Markdown export.
        export_txt: Write the strict-text export (always saved with
            placeholder image references).
        export_doctags: Write the document tokens export.
        image_export_mode: Image reference mode for the JSON, HTML and
            Markdown exports.
    """
    success_count = 0
    failure_count = 0

    for conv_res in conv_results:
        # Failed conversions are reported and tallied, nothing is written.
        if conv_res.status != ConversionStatus.SUCCESS:
            _log.warning(f"Document {conv_res.input.file} failed to convert.")
            failure_count += 1
            continue

        success_count += 1
        doc_filename = conv_res.input.file.stem
        converted = conv_res.document

        if export_json:
            fname = output_dir / f"{doc_filename}.json"
            _log.info(f"writing JSON output to {fname}")
            converted.save_as_json(filename=fname, image_mode=image_export_mode)

        if export_html:
            fname = output_dir / f"{doc_filename}.html"
            _log.info(f"writing HTML output to {fname}")
            converted.save_as_html(filename=fname, image_mode=image_export_mode)

        if export_txt:
            fname = output_dir / f"{doc_filename}.txt"
            _log.info(f"writing TXT output to {fname}")
            # Text output ignores image_export_mode and uses placeholders.
            converted.save_as_markdown(
                filename=fname,
                strict_text=True,
                image_mode=ImageRefMode.PLACEHOLDER,
            )

        if export_md:
            fname = output_dir / f"{doc_filename}.md"
            _log.info(f"writing Markdown output to {fname}")
            converted.save_as_markdown(filename=fname, image_mode=image_export_mode)

        if export_doctags:
            fname = output_dir / f"{doc_filename}.doctags"
            _log.info(f"writing Doc Tags output to {fname}")
            converted.save_as_document_tokens(filename=fname)

    _log.info(
        f"Processed {success_count + failure_count} docs, "
        f"of which {failure_count} failed"
    )
def process_results(
    background_tasks: BackgroundTasks,
    conversion_options: ConvertDocumentsOptions,
    conv_results: Iterable[ConversionResult],
) -> Union[ConvertDocumentResponse, FileResponse]:
    """Run the conversion and package its results as an API response.

    A single result that is not requested as a file is returned inline as a
    ``ConvertDocumentResponse``. In every other case the requested formats are
    written to a temporary directory, zipped, and returned as a
    ``FileResponse``; removal of the temporary directory is scheduled on
    ``background_tasks``.

    Args:
        background_tasks: Task queue that receives the cleanup of the
            temporary working directory.
        conversion_options: Request options; ``to_formats``,
            ``return_as_file`` and ``image_export_mode`` are read here.
        conv_results: Iterable of conversion results. It is materialised into
            a list here, which is the step that gets timed.

    Returns:
        Either the inline document response or a zip ``FileResponse`` named
        ``converted_docs.zip``.

    Raises:
        HTTPException: 500 when consuming ``conv_results`` raises, when it
            yields no results, or when no file was exported; additionally
            whatever ``_export_document_as_content`` raises for a
            non-successful single result.
    """
    # Materialise the iterable to count the results and time the step. If the
    # iterable is lazy, consuming it is also what performs the conversion.
    start_time = time.monotonic()
    try:
        results = list(conv_results)
    except Exception as e:
        # Keep the traceback in the server log and chain the cause; the client
        # only receives the stringified error.
        _log.exception("Document conversion failed.")
        raise HTTPException(status_code=500, detail=str(e)) from e
    processing_time = time.monotonic() - start_time
    _log.info(f"Processed {len(results)} docs in {processing_time:.2f} seconds.")

    if not results:
        raise HTTPException(
            status_code=500, detail="No documents were generated by Docling."
        )

    # Booleans to know what to export
    requested_formats = conversion_options.to_formats
    export_json = OutputFormat.JSON in requested_formats
    export_html = OutputFormat.HTML in requested_formats
    export_md = OutputFormat.MARKDOWN in requested_formats
    export_txt = OutputFormat.TEXT in requested_formats
    export_doctags = OutputFormat.DOCTAGS in requested_formats

    # Only 1 document was processed, and we are not returning it as a file
    if len(results) == 1 and not conversion_options.return_as_file:
        conv_res = results[0]
        document = _export_document_as_content(
            conv_res,
            export_json=export_json,
            export_html=export_html,
            export_md=export_md,
            export_txt=export_txt,
            export_doctags=export_doctags,
            image_mode=conversion_options.image_export_mode,
        )
        return ConvertDocumentResponse(
            document=document,
            status=conv_res.status,
            processing_time=processing_time,
            timings=conv_res.timings,
        )

    # Multiple documents were processed, or we are forced returning as a file.
    # mkdtemp leaves deletion to the caller, so every exit path below must
    # either schedule or perform the removal of work_dir.
    work_dir = Path(tempfile.mkdtemp(prefix="docling_"))
    try:
        output_dir = work_dir / "output"
        output_dir.mkdir(parents=True, exist_ok=True)

        _export_documents_as_files(
            conv_results=results,
            output_dir=output_dir,
            export_json=export_json,
            export_html=export_html,
            export_md=export_md,
            export_txt=export_txt,
            export_doctags=export_doctags,
            image_export_mode=conversion_options.image_export_mode,
        )

        if not os.listdir(output_dir):
            raise HTTPException(status_code=500, detail="No documents were exported.")

        # The archive lives next to (not inside) the directory being zipped.
        file_path = work_dir / "converted_docs.zip"
        shutil.make_archive(
            base_name=str(file_path.with_suffix("")),
            format="zip",
            root_dir=output_dir,
        )
        response = FileResponse(
            file_path, filename=file_path.name, media_type="application/zip"
        )
    except Exception:
        # The cleanup task is only registered once the archive is ready, so a
        # failure before that point has to remove the directory right away.
        shutil.rmtree(work_dir, ignore_errors=True)
        raise

    # Remove the working directory after the response has been sent.
    background_tasks.add_task(shutil.rmtree, work_dir, ignore_errors=True)
    return response