import os
import time
import logging
import json
from dataclasses import dataclass
from typing import Optional

# Don't import DocumentConverter at module level to prevent early initialization:
# from docling.document_converter import DocumentConverter
from processing.sections import SectionExtractor

# No global converter instance; initialization is done lazily per processor.
# _docling_converter = DocumentConverter()

logger = logging.getLogger(__name__)  # Logger for this module
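
# A minimal sketch (an assumption, not part of this module) of the cache
# configuration that must happen before docling is first imported, e.g. in the
# app entrypoint; the variable names and default paths mirror
# _ensure_cache_directories below. _example_configure_hf_caches is hypothetical.
def _example_configure_hf_caches():
    os.environ.setdefault("HF_HOME", "/tmp/docling_temp/huggingface")
    os.environ.setdefault("HF_HUB_CACHE", "/tmp/docling_temp/huggingface_cache")
    os.environ.setdefault("TRANSFORMERS_CACHE", "/tmp/docling_temp/transformers_cache")
    os.environ.setdefault("TORCH_HOME", "/tmp/docling_temp/torch")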

@dataclass
class DocumentResult:
    """Holds processed results for a document."""
    file_path: str
    structured_markdown: str
    structured_json: dict
    redacted_markdown: str
    redacted_json: dict

class DocumentProcessor:
    """Handles parsing of documents with Docling and redacting specified sections."""

    def __init__(self, section_extractor: Optional[SectionExtractor] = None):
        """
        Initialize with an optional SectionExtractor for removing specific sections.
        If None, no redaction is performed (the original structure is kept).
        The Docling DocumentConverter is initialized lazily when first needed.
        """
        self.section_extractor = section_extractor
        self._converter = None  # Lazy initialization
    @property
    def converter(self):
        """Lazily initialize the DocumentConverter to prevent early Hugging Face Hub initialization."""
        if self._converter is None:
            # Import here so the cache environment variables are set before docling loads
            from docling.document_converter import DocumentConverter
            logger.info("Initializing Docling DocumentConverter...")
            self._converter = DocumentConverter()
            logger.info("Docling DocumentConverter initialized successfully")
        return self._converter

    def process(self, file_path: str) -> DocumentResult:
        """Parse the document and optionally remove specified sections. Returns a DocumentResult."""
        logger.info(f"Starting processing for file: {file_path}")
        start_time = time.time()

        # Ensure cache directories exist before processing
        self._ensure_cache_directories()

        # Convert the document using Docling
        conv_result = self.converter.convert(file_path)
        elapsed = time.time() - start_time
        logger.info(f"Docling conversion completed in {elapsed:.2f} seconds")

        # Export results from Docling
        structured_md = conv_result.document.export_to_markdown()
        structured_text = conv_result.document.export_to_text()
        doc_json = conv_result.document.export_to_dict()
        logger.info(f"Extracted document content (text length {len(structured_text)} characters)")

        # Use the SectionExtractor to remove target sections, if one was provided
        if self.section_extractor:
            # The JSON-based approach gives more reliable section removal
            redacted_json = self.section_extractor.remove_sections_from_json(doc_json)
            # Convert the redacted JSON structure back to markdown
            redacted_md = self._export_redacted_markdown(conv_result.document, redacted_json)
            logger.info("Applied section redaction to remove specified sections")
        else:
            redacted_md = structured_md  # No redaction, use original
            redacted_json = doc_json  # No redaction, use original
            logger.info("No section redaction applied (showing original structure)")

        # Persist outputs (JSON and redacted text) to files for auditing
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        # Prefer the app's local temp directory; fall back to the system temp dir
        temp_dir = "temp_files"
        try:
            os.makedirs(temp_dir, exist_ok=True)
        except PermissionError:
            import tempfile
            temp_dir = os.path.join(tempfile.gettempdir(), "docling_temp_files")
            os.makedirs(temp_dir, exist_ok=True)

        json_path = os.path.join(temp_dir, f"{base_name}_structured.json")
        redacted_path = os.path.join(temp_dir, f"{base_name}_redacted.txt")
        redacted_json_path = os.path.join(temp_dir, f"{base_name}_redacted.json")
        try:
            with open(json_path, "w", encoding="utf-8") as jf:
                json.dump(doc_json, jf, ensure_ascii=False, indent=2)
            with open(redacted_path, "w", encoding="utf-8") as tf:
                tf.write(redacted_md)
            with open(redacted_json_path, "w", encoding="utf-8") as jf:
                json.dump(redacted_json, jf, ensure_ascii=False, indent=2)
            logger.info(
                f"Saved structured JSON to {json_path}, redacted text to {redacted_path}, "
                f"and redacted JSON to {redacted_json_path}"
            )
        except Exception as e:
            logger.error(f"Error saving outputs to files: {e}")

        # Prepare result object
        result = DocumentResult(
            file_path=file_path,
            structured_markdown=structured_md,
            structured_json=doc_json,
            redacted_markdown=redacted_md,
            redacted_json=redacted_json,
        )
        logger.info(f"Finished processing for file: {file_path}")
        return result

    def _ensure_cache_directories(self):
        """Ensure all necessary cache directories exist before processing."""
        cache_dirs = [
            os.environ.get('HF_HOME', '/tmp/docling_temp/huggingface'),
            os.environ.get('HF_CACHE_HOME', '/tmp/docling_temp/huggingface_cache'),
            os.environ.get('HF_HUB_CACHE', '/tmp/docling_temp/huggingface_cache'),
            os.environ.get('TRANSFORMERS_CACHE', '/tmp/docling_temp/transformers_cache'),
            os.environ.get('HF_DATASETS_CACHE', '/tmp/docling_temp/datasets_cache'),
            os.environ.get('DIFFUSERS_CACHE', '/tmp/docling_temp/diffusers_cache'),
            os.environ.get('ACCELERATE_CACHE', '/tmp/docling_temp/accelerate_cache'),
            os.environ.get('TORCH_HOME', '/tmp/docling_temp/torch'),
            os.environ.get('TENSORFLOW_HOME', '/tmp/docling_temp/tensorflow'),
            os.environ.get('KERAS_HOME', '/tmp/docling_temp/keras'),
        ]
        for cache_dir in cache_dirs:
            try:
                os.makedirs(cache_dir, exist_ok=True)
                logger.debug(f"Ensured cache directory exists: {cache_dir}")
            except Exception as e:
                logger.warning(f"Could not create cache directory {cache_dir}: {e}")

    def _export_redacted_markdown(self, document, redacted_json):
        """Export redacted markdown from the redacted JSON structure."""
        # Simply convert the redacted JSON back to markdown
        return self._json_to_markdown(redacted_json)

    def _json_to_markdown(self, json_data: dict) -> str:
        """Convert a Docling JSON document structure back to markdown."""
        markdown_lines = []
        # Walk all text elements in the JSON
        texts = json_data.get("texts", [])
        for text_elem in texts:
            text_content = text_elem.get("text", "")
            label = text_elem.get("label", "")
            level = text_elem.get("level", 0)
            if not text_content.strip():
                continue
            # Format based on the label and level (following Docling's structure)
            if label == "section_header":
                # Map heading levels 1-3 to markdown headers; deeper levels get "####"
                if level == 1:
                    markdown_lines.append(f"# {text_content}")
                elif level == 2:
                    markdown_lines.append(f"## {text_content}")
                elif level == 3:
                    markdown_lines.append(f"### {text_content}")
                else:
                    markdown_lines.append(f"#### {text_content}")
            elif label == "list_item":
                # Preserve the item's original marker (e.g. "-" or "1.")
                marker = text_elem.get("marker", "-")
                markdown_lines.append(f"{marker} {text_content}")
            else:
                # Plain text and any unrecognized labels are kept as-is
                markdown_lines.append(text_content)
        # Join without extra spacing to match Docling's formatting
        return "\n".join(markdown_lines)