Spaces:
Running
Running
File size: 50,893 Bytes
c04ffe5 3dd2ff2 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 73375a3 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 c04ffe5 42dc069 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 |
"""
Utility functions for OCR image processing with Mistral AI.
Contains helper functions for working with OCR responses and image handling.
"""
# Standard library imports
import json
import base64
import io
import zipfile
import logging
import re
import time
import math
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union, Any, Tuple
from functools import lru_cache
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Third-party imports
import numpy as np
# Mistral AI imports
from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk
from mistralai.models import OCRImageObject
# Check for image processing libraries
try:
from PIL import Image, ImageEnhance, ImageFilter, ImageOps
PILLOW_AVAILABLE = True
except ImportError:
logger.warning("PIL not available - image preprocessing will be limited")
PILLOW_AVAILABLE = False
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
logger.warning("OpenCV (cv2) not available - advanced image processing will be limited")
CV2_AVAILABLE = False
# Import configuration
try:
from config import IMAGE_PREPROCESSING
except ImportError:
# Fallback defaults if config not available
IMAGE_PREPROCESSING = {
"enhance_contrast": 1.5,
"sharpen": True,
"denoise": True,
"max_size_mb": 8.0,
"target_dpi": 300,
"compression_quality": 92
}
def detect_skew(image: Union[Image.Image, np.ndarray]) -> float:
    """
    Quick skew detection that returns angle in degrees.

    Uses a computationally efficient approach by analyzing at roughly 1%
    resolution (with a 100px floor). Two implementations are provided:
    an OpenCV Hough-line path when cv2 is importable, and a slower
    PIL/numpy edge-regression fallback otherwise.

    Args:
        image: PIL Image or numpy array (grayscale or RGB;
               RGBA behavior untested — NOTE(review): np.mean fallback would
               average the alpha channel in as well)

    Returns:
        Estimated skew angle in degrees (positive or negative),
        normalized into the [-45, 45] range; 0.0 when no reliable
        estimate can be made.
    """
    # Convert PIL Image to numpy array if needed
    if isinstance(image, Image.Image):
        # Convert to grayscale for processing
        if image.mode != 'L':
            img_np = np.array(image.convert('L'))
        else:
            img_np = np.array(image)
    else:
        # If already numpy array, ensure it's grayscale
        if len(image.shape) == 3:
            if CV2_AVAILABLE:
                img_np = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
            else:
                # Fallback grayscale conversion (simple channel average)
                img_np = np.mean(image, axis=2).astype(np.uint8)
        else:
            img_np = image
    # Downsample to ~1% resolution for faster processing
    height, width = img_np.shape
    target_size = int(min(width, height) * 0.01)
    # Use a sane minimum size and ensure we have enough pixels to detect lines
    target_size = max(target_size, 100)
    if CV2_AVAILABLE:
        # OpenCV-based implementation (faster)
        # Resize the image so its longer edge is ~target_size pixels
        scale_factor = target_size / max(width, height)
        small_img = cv2.resize(img_np, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_AREA)
        # Apply binary thresholding (Otsu, inverted so ink is white) for cleaner edges
        _, binary = cv2.threshold(small_img, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
        # Use probabilistic Hough Line Transform to detect line segments;
        # thresholds scale with the downsampled size
        lines = cv2.HoughLinesP(binary, 1, np.pi/180, threshold=target_size//10,
                                minLineLength=target_size//5, maxLineGap=target_size//10)
        if lines is None or len(lines) < 3:
            # Not enough lines detected, assume no significant skew
            return 0.0
        # Calculate angles of detected segments
        angles = []
        for line in lines:
            x1, y1, x2, y2 = line[0]
            if x2 - x1 == 0:  # Avoid division by zero (vertical segment)
                continue
            angle = math.atan2(y2 - y1, x2 - x1) * 180.0 / np.pi
            # Normalize angle into the -45..45 range
            angle = angle % 180
            if angle > 90:
                angle -= 180
            if angle > 45:
                angle -= 90
            if angle < -45:
                angle += 90
            angles.append(angle)
        if not angles:
            return 0.0
        # Use median to reduce impact of outliers
        angles.sort()
        median_angle = angles[len(angles) // 2]
        return median_angle
    else:
        # PIL-only fallback implementation
        # Resize using PIL, preserving aspect ratio
        small_img = Image.fromarray(img_np).resize(
            (int(width * target_size / max(width, height)),
             int(height * target_size / max(width, height))),
            Image.NEAREST
        )
        # Find edges
        edges = small_img.filter(ImageFilter.FIND_EDGES)
        edges_data = np.array(edges)
        # Simple edge orientation analysis (less precise than OpenCV):
        # compare horizontal vs vertical gradient energy
        h_edges = np.sum(np.abs(np.diff(edges_data, axis=1)))
        v_edges = np.sum(np.abs(np.diff(edges_data, axis=0)))
        # If horizontal edges dominate, no significant skew
        if h_edges > v_edges * 1.2:
            return 0.0
        # Simple angle estimation based on edge distribution.
        # This is a simplified approach that works for slight skews.
        rows, cols = edges_data.shape
        xs, ys = [], []
        # Sample strong edge points on a 2px grid
        for r in range(0, rows, 2):
            for c in range(0, cols, 2):
                if edges_data[r, c] > 128:
                    xs.append(c)
                    ys.append(r)
        if len(xs) < 10:  # Not enough edge points
            return 0.0
        # Use simple linear regression to estimate the dominant slope
        n = len(xs)
        mean_x = sum(xs) / n
        mean_y = sum(ys) / n
        # Calculate slope (least squares)
        numerator = sum((xs[i] - mean_x) * (ys[i] - mean_y) for i in range(n))
        denominator = sum((xs[i] - mean_x) ** 2 for i in range(n))
        if abs(denominator) < 1e-6:  # Avoid division by zero
            return 0.0
        slope = numerator / denominator
        angle = math.atan(slope) * 180.0 / math.pi
        # Normalize to -45 to 45 degrees
        if angle > 45:
            angle -= 90
        elif angle < -45:
            angle += 90
        return angle
def replace_images_in_markdown(md: str, images: dict[str, str]) -> str:
    """
    Replace image placeholders in markdown with base64-encoded images.

    Uses regex-based matching to handle variations in image IDs and formats.
    BUG FIX: the replacement previously inserted an empty string, deleting
    every matched image instead of embedding it as a data URI.

    Args:
        md: Markdown text containing image placeholders
        images: Dictionary mapping image IDs to base64 strings (raw base64
                or complete ``data:`` URIs)

    Returns:
        Markdown text with images replaced by base64 data URIs
    """
    for img_id, base64_str in images.items():
        # Extract the base ID without extension for more flexible matching
        base_id = img_id.split('.')[0]
        # Match markdown image pattern where the URL contains the base ID.
        # re.escape guards against IDs containing regex metacharacters.
        pattern = re.compile(rf'!\[([^\]]*)\]\(([^\)]*{re.escape(base_id)}[^\)]*)\)')
        # Build the data URI once per image; accept IDs whose value is
        # already a full data URI as well as raw base64 payloads.
        if base64_str.startswith('data:'):
            data_url = base64_str
        else:
            data_url = f"data:image/jpeg;base64,{base64_str}"
        # Process matches in reverse so earlier offsets stay valid
        matches = list(pattern.finditer(md))
        for match in reversed(matches):
            # Keep the original alt text when present, else fall back to the ID
            alt_text = match.group(1) or img_id
            md = md[:match.start()] + f"![{alt_text}]({data_url})" + md[match.end():]
    return md
def get_combined_markdown(ocr_response) -> str:
    """
    Combine OCR text and images into a single markdown document.

    Args:
        ocr_response: OCR response object from Mistral AI; each page may
                      carry a ``markdown`` string and an ``images`` list.

    Returns:
        Combined markdown string with embedded images, pages separated
        by blank lines.
    """
    page_sections = []
    for page in ocr_response.pages:
        # Build an id -> base64 lookup for this page's images, if any
        image_lookup = {}
        if hasattr(page, "images"):
            for image_obj in page.images:
                if hasattr(image_obj, "id") and hasattr(image_obj, "image_base64"):
                    image_lookup[image_obj.id] = image_obj.image_base64
        # Swap placeholders for embedded base64 data in this page's markdown
        raw_markdown = getattr(page, "markdown", "")
        page_sections.append(replace_images_in_markdown(raw_markdown, image_lookup))
    # Pages are joined with a blank line between them
    return "\n\n".join(page_sections)
def encode_image_for_api(image_path: Union[str, Path]) -> str:
    """
    Encode an image as base64 data URL for API submission.

    Args:
        image_path: Path to the image file

    Returns:
        Base64 data URL for the image

    Raises:
        FileNotFoundError: If the path does not refer to an existing file.
    """
    image_file = Path(image_path) if isinstance(image_path, str) else image_path
    if not image_file.is_file():
        raise FileNotFoundError(f"Image file not found: {image_file}")
    # Map known extensions to MIME types; anything else defaults to JPEG
    suffix_to_mime = {
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.pdf': 'application/pdf',
    }
    mime_type = suffix_to_mime.get(image_file.suffix.lower(), 'image/jpeg')
    payload = base64.b64encode(image_file.read_bytes()).decode()
    return f"data:{mime_type};base64,{payload}"
def encode_bytes_for_api(file_bytes: bytes, mime_type: str) -> str:
    """
    Encode binary data as base64 data URL for API submission.

    Args:
        file_bytes: Binary file data
        mime_type: MIME type of the file (e.g., 'image/jpeg', 'application/pdf')

    Returns:
        Base64 data URL for the data
    """
    payload = base64.b64encode(file_bytes).decode()
    return f"data:{mime_type};base64,{payload}"
def calculate_image_entropy(pil_img: Image.Image) -> float:
    """
    Calculate the Shannon entropy of a PIL image.

    Entropy is a measure of randomness; low entropy indicates a blank or
    simple image, high entropy indicates more complex content (e.g., text
    or detailed images).

    FIX: the histogram now uses range=(0, 256) so each of the 256 possible
    8-bit values maps to its own integer-aligned bin, and entropy is
    computed from normalized probabilities rather than np.histogram's
    density values (densities are probability / bin_width, which skewed
    the result slightly).

    Args:
        pil_img: PIL Image object

    Returns:
        float: Entropy value in bits (0.0 for an empty image)
    """
    # Convert to grayscale for entropy calculation
    gray_img = pil_img.convert("L")
    arr = np.array(gray_img)
    # One bin per possible 8-bit value, aligned to integers
    hist, _ = np.histogram(arr, bins=256, range=(0, 256))
    total = hist.sum()
    if total == 0:  # degenerate zero-pixel image
        return 0.0
    # Keep only non-empty bins to avoid log(0), then normalize to probabilities
    prob = hist[hist > 0] / total
    entropy = -np.sum(prob * np.log2(prob))
    return float(entropy)
def estimate_text_density(image_np):
    """
    Estimate text density patterns in an image.

    Returns metrics on text distribution and special cases.

    FIXES:
    - RGBA input previously fell through the RGB check, leaving ``gray``
      3-dimensional and crashing on the shape unpack; the alpha channel
      is now dropped first.
    - ``height // 10`` is 0 for images under 10 px tall, which made
      ``range(0, height, 0)`` raise ValueError; the step is now floored at 1.
    - A fully blank page has ``v_mean == 0``; the pattern test now guards
      the division.

    Args:
        image_np: Numpy array of the image (grayscale, RGB, or RGBA)

    Returns:
        dict: Text density metrics (mean/max density, variation,
              'varied'/'uniform' pattern, uppercase-section count)
    """
    # Convert to grayscale, dropping an alpha channel if present
    if len(image_np.shape) > 2:
        if image_np.shape[2] == 4:
            image_np = image_np[:, :, :3]
        gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
    else:
        gray = image_np
    # Binarize image (Otsu, inverted so text pixels are 255)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    # Analyze vertical text density profile (important for headers/footers)
    height, width = gray.shape
    vertical_profile = np.sum(binary, axis=1) / width
    # Analyze horizontal text density profile
    horizontal_profile = np.sum(binary, axis=0) / height
    # Calculate statistics
    v_mean = np.mean(vertical_profile)
    v_std = np.std(vertical_profile)
    v_max = np.max(vertical_profile)
    # Detect uppercase text regions (common in headers of Baldwin document).
    # Uppercase text tends to have more consistent height and uniform
    # vertical density.
    section_height = max(1, height // 10)  # floor at 1 to keep range() valid
    uppercase_sections = 0
    for i in range(0, height, section_height):
        section = binary[i:min(i + section_height, height), :]
        section_profile = np.sum(section, axis=1) / width
        # Uppercase characteristics: high density with low variation
        if np.mean(section_profile) > v_mean * 1.5 and np.std(section_profile) < v_std * 0.7:
            uppercase_sections += 1
    # Determine overall pattern; guard against a blank page (v_mean == 0)
    if v_mean > 0 and v_std / v_mean > 0.8:
        pattern = 'varied'  # High variance indicates sections with different text densities
    else:
        pattern = 'uniform'  # Low variance indicates uniform text distribution
    return {
        'mean_density': float(v_mean),
        'density_variation': float(v_std),
        'pattern': pattern,
        'uppercase_sections': uppercase_sections,
        'max_density': float(v_max)
    }
def serialize_ocr_object(obj):
    """
    Serialize OCR response objects to JSON serializable format.

    Handles OCRImageObject specifically to prevent serialization errors.

    FIX: the final fallback used a bare ``except:`` (which would also
    swallow KeyboardInterrupt/SystemExit); narrowed to ``except Exception``.
    The hasattr ternaries are replaced with equivalent getattr defaults.

    Args:
        obj: The object to serialize

    Returns:
        JSON serializable representation of the object (primitives pass
        through; lists/dicts recurse; unknown objects fall back to str,
        or None if str() itself fails)
    """
    # Fast path: primitives pass straight through
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    # Collections recurse element-by-element
    if isinstance(obj, list):
        return [serialize_ocr_object(item) for item in obj]
    if isinstance(obj, dict):
        return {k: serialize_ocr_object(v) for k, v in obj.items()}
    # Special handling for OCRImageObject: keep only id and base64 payload
    if isinstance(obj, OCRImageObject):
        return {
            'id': getattr(obj, 'id', None),
            'image_base64': getattr(obj, 'image_base64', None)
        }
    # Generic objects: serialize their public attributes
    if hasattr(obj, '__dict__'):
        return {k: serialize_ocr_object(v) for k, v in obj.__dict__.items()
                if not k.startswith('_')}  # Skip private attributes
    # Last resort: string conversion, tolerating broken __str__ implementations
    try:
        return str(obj)
    except Exception:
        return None
# Clean OCR result with focus on Mistral compatibility
def clean_ocr_result(result, use_segmentation=False, vision_enabled=True, preprocessing_options=None):
    """
    Clean text content in OCR results, preserving original structure from
    Mistral API. Only removes markdown/HTML conflicts without duplicating
    content across fields. Dictionaries are cleaned in place and returned;
    lists are cleaned recursively into a new list.

    FIX: removed the dead ``is_pdf`` local, which was computed and never
    read anywhere in the function.

    Args:
        result: OCR result object or dictionary
        use_segmentation: Whether image segmentation was used
        vision_enabled: Whether vision model was used (currently only
            forwarded on recursive list calls)
        preprocessing_options: Dictionary of preprocessing options

    Returns:
        Cleaned result object (falsy inputs are returned unchanged)
    """
    if not result:
        return result
    # Prefer the project's text cleaner; fall back to local regexes
    try:
        from utils.text_utils import clean_raw_text
        text_cleaner_available = True
    except ImportError:
        text_cleaner_available = False

    def clean_text(text):
        """Clean text content, removing markdown image references and base64 data"""
        if not text or not isinstance(text, str):
            return ""
        if text_cleaner_available:
            text = clean_raw_text(text)
        else:
            # Remove markdown images embedding data URIs
            text = re.sub(r'!\[.*?\]\(data:image/[^)]+\)', '', text)
            # Remove basic markdown image references
            text = re.sub(r'!\[[^\]]*\]\([^)]+\)', '', text)
            # Remove stray base64 encoded image data
            text = re.sub(r'data:image/[^;]+;base64,[a-zA-Z0-9+/=]+', '', text)
            # Clean up any JSON-like image object references
            text = re.sub(r'{"image(_data)?":("[^"]*"|null|true|false|\{[^}]*\}|\[[^\]]*\])}', '', text)
        # Collapse excessive whitespace and line breaks created by removals
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'\s{3,}', ' ', text)
        return text.strip()

    if isinstance(result, dict):
        # Ensure the ocr_contents container exists
        if 'ocr_contents' not in result:
            result['ocr_contents'] = {}
        # Clean raw_text if it exists but don't duplicate it
        if 'raw_text' in result:
            result['raw_text'] = clean_text(result['raw_text'])
        # Clean string fields in ocr_contents, skipping binary/image payloads
        for key, value in list(result['ocr_contents'].items()):
            if key in ['image_base64', 'images', 'binary_data'] and value:
                continue
            if isinstance(value, str):
                result['ocr_contents'][key] = clean_text(value)
        # Handle segmentation data
        if use_segmentation and preprocessing_options and 'segmentation_data' in preprocessing_options:
            result['segmentation_applied'] = True
            if 'combined_text' in preprocessing_options['segmentation_data']:
                segmentation_text = clean_text(preprocessing_options['segmentation_data']['combined_text'])
                result['ocr_contents']['segmentation_text'] = segmentation_text
                # IMPORTANT: for documents with overlapping regions the merged
                # segmentation text is more accurate than the raw OCR, so it
                # becomes the primary source for raw_text (and text, if set).
                result['ocr_contents']['raw_text'] = segmentation_text
                if 'text' in result['ocr_contents']:
                    result['ocr_contents']['text'] = segmentation_text
        # Clean pages_data if available (Mistral OCR format)
        if 'pages_data' in result:
            for page in result['pages_data']:
                if isinstance(page, dict):
                    if 'text' in page:
                        page['text'] = clean_text(page['text'])
                    if 'markdown' in page:
                        page['markdown'] = clean_text(page['markdown'])
    elif isinstance(result, list):
        # Clean each entry recursively, preserving order
        return [clean_ocr_result(item, use_segmentation, vision_enabled, preprocessing_options)
                for item in result]
    return result
def create_results_zip(results, output_dir=None, zip_name=None):
    """
    Create a zip file containing OCR results on disk.

    FIX: ``mkdir`` now passes ``parents=True`` so a nested output path
    (e.g. "out/batch1") no longer raises FileNotFoundError.

    Args:
        results: Dictionary or list of OCR results
        output_dir: Optional output directory (created if missing;
                    defaults to ./output)
        zip_name: Optional zip file name (auto-generated with a timestamp
                  when omitted)

    Returns:
        Path to the created zip file. On failure a stub archive containing
        only an info.txt note is written instead.
    """
    # Resolve/create the output directory
    if output_dir is None:
        output_dir = Path.cwd() / "output"
    else:
        output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Generate a descriptive zip name if not provided
    if zip_name is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if isinstance(results, list):
            # For a list of results, include the file count
            zip_name = f"ocr_results_{len(results)}_{timestamp}.zip"
        else:
            # For a single result, base the name on the document's file name
            base_name = results.get('file_name', 'document').split('.')[0]
            zip_name = f"{base_name}_{timestamp}.zip"
    zip_path = output_dir / zip_name
    try:
        # Build the archive in memory, then persist it
        zip_data = create_results_zip_in_memory(results)
        with open(zip_path, 'wb') as f:
            f.write(zip_data)
        return zip_path
    except Exception as e:
        # Best-effort fallback: write a stub archive so callers still get a file
        logger.error(f"Error creating zip file: {str(e)}")
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            zipf.writestr("info.txt", "Could not create complete archive")
        return zip_path
def create_results_zip_in_memory(results):
    """
    Create a zip file containing OCR results in memory.

    Packages markdown with embedded image tags, raw text, and JSON file
    in a contextually relevant structure. A list of results becomes one
    subdirectory per document; a single result goes at the archive root.

    Args:
        results: Dictionary or list of OCR results

    Returns:
        Binary zip file data (bytes)
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED) as archive:
        if isinstance(results, list):
            # One subdirectory per document, named after the file (sans extension)
            for position, entry in enumerate(results):
                if entry and isinstance(entry, dict):
                    subdir = Path(entry.get('file_name', f'document_{position+1}')).stem
                    add_result_files_to_zip(archive, entry, f"{subdir}/")
        else:
            # Single result: files go directly at the archive root
            add_result_files_to_zip(archive, results)
    # Rewind before extracting the raw bytes
    buffer.seek(0)
    return buffer.getvalue()
def truncate_base64_in_result(result, prefix_length=32, suffix_length=32):
    """
    Create a copy of the result dictionary with base64 image data truncated.

    This keeps the structure intact while making the JSON more readable.
    The input is never modified; a deep copy is edited and returned.

    Args:
        result: OCR result dictionary
        prefix_length: Number of characters to keep at the beginning
        suffix_length: Number of characters to keep at the end

    Returns:
        Dictionary with truncated base64 data (empty dict for falsy or
        non-dict input)
    """
    if not result or not isinstance(result, dict):
        return {}
    # Work on a deep copy so the caller's data is untouched
    import copy
    shortened = copy.deepcopy(result)
    min_truncatable = prefix_length + suffix_length + 10

    def _shorten(text):
        """Truncate one base64 string (raw or data-URI form)."""
        if not isinstance(text, str) or len(text) <= min_truncatable:
            return text
        if text.startswith('data:'):
            # Data URIs like 'data:image/jpeg;base64,/9j/4AAQ...': keep the header
            pieces = text.split(',', 1)
            if len(pieces) != 2:
                return text  # Unexpected format, return as is
            header = pieces[0] + ','
            payload = pieces[1]
            if len(payload) <= min_truncatable:
                return text  # Payload too short to bother truncating
            omitted = len(payload) - prefix_length - suffix_length
            return (f"{header}{payload[:prefix_length]}..."
                    f"[truncated {omitted} chars]..."
                    f"{payload[-suffix_length:]}")
        # Raw base64 string
        omitted = len(text) - prefix_length - suffix_length
        return (f"{text[:prefix_length]}..."
                f"[truncated {omitted} chars]..."
                f"{text[-suffix_length:]}")

    def _walk(node):
        """Recursively shorten base64-bearing keys in nested dicts/lists."""
        if isinstance(node, dict):
            for key in list(node.keys()):
                if key in ['image_base64', 'base64'] and isinstance(node[key], str):
                    node[key] = _shorten(node[key])
                elif isinstance(node[key], (dict, list)):
                    _walk(node[key])
        elif isinstance(node, list):
            for element in node:
                if isinstance(element, (dict, list)):
                    _walk(element)

    _walk(shortened)
    # Explicitly sweep the pages_data image structure as well
    if 'pages_data' in shortened:
        for page in shortened['pages_data']:
            if isinstance(page, dict) and 'images' in page:
                for img in page['images']:
                    if isinstance(img, dict) and 'image_base64' in img and isinstance(img['image_base64'], str):
                        img['image_base64'] = _shorten(img['image_base64'])
    # And the raw_response_data pages, when present
    if 'raw_response_data' in shortened and isinstance(shortened['raw_response_data'], dict):
        if 'pages' in shortened['raw_response_data']:
            for page in shortened['raw_response_data']['pages']:
                if isinstance(page, dict) and 'images' in page:
                    for img in page['images']:
                        if isinstance(img, dict) and 'base64' in img and isinstance(img['base64'], str):
                            img['base64'] = _shorten(img['base64'])
    return shortened
def clean_base64_from_result(result):
    """
    Create a clean copy of the result dictionary with base64 image data removed.

    This ensures JSON files don't contain large base64 strings. The input
    dictionary is never modified; a deep copy is scrubbed and returned.

    Args:
        result: OCR result dictionary

    Returns:
        Cleaned dictionary without base64 data (empty dict for falsy or
        non-dict input)
    """
    if not result or not isinstance(result, dict):
        return {}
    # Work on a deep copy so the caller's data is untouched
    import copy
    sanitized = copy.deepcopy(result)

    def _scrub(node):
        """Recursively blank out base64-bearing keys in nested dicts/lists."""
        if isinstance(node, dict):
            for key in list(node.keys()):
                if key in ['image_base64', 'base64']:
                    node[key] = "[BASE64_DATA_REMOVED]"
                elif isinstance(node[key], (dict, list)):
                    _scrub(node[key])
        elif isinstance(node, list):
            for element in node:
                if isinstance(element, (dict, list)):
                    _scrub(element)

    _scrub(sanitized)
    # Explicitly sweep the pages_data image structure as well
    if 'pages_data' in sanitized:
        for page in sanitized['pages_data']:
            if isinstance(page, dict) and 'images' in page:
                for img in page['images']:
                    if isinstance(img, dict) and 'image_base64' in img:
                        img['image_base64'] = "[BASE64_DATA_REMOVED]"
    # And the raw_response_data pages, when present
    if 'raw_response_data' in sanitized and isinstance(sanitized['raw_response_data'], dict):
        if 'pages' in sanitized['raw_response_data']:
            for page in sanitized['raw_response_data']['pages']:
                if isinstance(page, dict) and 'images' in page:
                    for img in page['images']:
                        if isinstance(img, dict) and 'base64' in img:
                            img['base64'] = "[BASE64_DATA_REMOVED]"
    return sanitized
def create_markdown_with_file_references(result, image_path_prefix="images/"):
"""
Create a markdown document with file references to images instead of base64 embedding.
Ideal for use in zip archives where images are stored as separate files.
Args:
result: OCR result dictionary
image_path_prefix: Path prefix for image references (e.g., "images/")
Returns:
Markdown content as string with file references
"""
# Similar to create_markdown_with_images but uses file references
# Import content utils to use classification functions
try:
from utils.content_utils import classify_document_content, extract_document_text, extract_image_description
content_utils_available = True
except ImportError:
content_utils_available = False
# Get content classification
has_text = True
has_images = False
if content_utils_available:
classification = classify_document_content(result)
has_text = classification['has_content']
has_images = result.get('has_images', False)
else:
# Minimal fallback detection
if 'has_images' in result:
has_images = result['has_images']
# Check for image data more thoroughly
if 'pages_data' in result and isinstance(result['pages_data'], list):
for page in result['pages_data']:
if isinstance(page, dict) and 'images' in page and page['images']:
has_images = True
break
# Start building the markdown document
md = []
# Add document title/header
md.append(f"# {result.get('file_name', 'Document')}\n")
# Add metadata section
md.append("## Document Metadata\n")
# Add timestamp
if 'timestamp' in result:
md.append(f"**Processed:** {result['timestamp']}\n")
# Add languages if available
if 'languages' in result and result['languages']:
languages = [lang for lang in result['languages'] if lang]
if languages:
md.append(f"**Languages:** {', '.join(languages)}\n")
# Add document type and topics
if 'detected_document_type' in result:
md.append(f"**Document Type:** {result['detected_document_type']}\n")
if 'topics' in result and result['topics']:
md.append(f"**Topics:** {', '.join(result['topics'])}\n")
md.append("\n---\n")
# Document title - extract from result if available
if 'ocr_contents' in result and 'title' in result['ocr_contents'] and result['ocr_contents']['title']:
title_content = result['ocr_contents']['title']
md.append(f"## {title_content}\n")
# Add images if present
if has_images and 'pages_data' in result:
md.append("## Images\n")
# Extract and display all images with file references
for page_idx, page in enumerate(result['pages_data']):
if 'images' in page and isinstance(page['images'], list):
for img_idx, img in enumerate(page['images']):
if 'image_base64' in img:
# Create image reference to file in the zip
image_filename = f"image_{page_idx+1}_{img_idx+1}.jpg"
image_path = f"{image_path_prefix}{image_filename}"
image_caption = f"Image {page_idx+1}-{img_idx+1}"
md.append(f"\n")
# Add image description if available through utils
if content_utils_available:
description = extract_image_description(result)
if description:
md.append(f"*{description}*\n")
md.append("\n---\n")
# Add document text section
md.append("## Text Content\n")
# Extract text content systematically
text_content = ""
structured_sections = {}
# Helper function to extract clean text from dictionary objects
def extract_clean_text(content):
    """Extract plain text from OCR content that may be a string, a stringified
    JSON/dict-repr object, or a dict.

    Returns the 'text' value when one can be found; otherwise returns the
    input unchanged.
    """
    if isinstance(content, str):
        stripped = content.strip()
        # Content may be a dict serialized to a string (JSON or Python repr)
        if stripped.startswith("{") and stripped.endswith("}"):
            # Try strict JSON first so valid JSON containing apostrophes is
            # not corrupted, then fall back to naive single-quote replacement
            # for Python-dict-repr strings like "{'text': 'hi'}".
            for candidate in (content, content.replace("'", '"')):
                try:
                    content_dict = json.loads(candidate)
                except Exception:
                    continue
                if isinstance(content_dict, dict) and 'text' in content_dict:
                    return content_dict['text']
                # Parsed but no 'text' key: keep the original string
                return content
            return content
        return content
    elif isinstance(content, dict):
        # If it's a dictionary with a plain-text 'text' key, return that value
        if 'text' in content and isinstance(content['text'], str):
            return content['text']
        return content
    return content
if content_utils_available:
# Use the systematic utility function for main text
text_content = extract_document_text(result)
text_content = extract_clean_text(text_content)
# Collect all available structured sections
if 'ocr_contents' in result:
for field, content in result['ocr_contents'].items():
# Skip certain fields that are handled separately
if field in ["raw_text", "error", "partial_text", "main_text"]:
continue
if content:
# Extract clean text from content if possible
clean_content = extract_clean_text(content)
# Add this as a structured section
structured_sections[field] = clean_content
else:
# Fallback extraction logic
if 'ocr_contents' in result:
# First find main text
for field in ["main_text", "content", "text", "transcript", "raw_text"]:
if field in result['ocr_contents'] and result['ocr_contents'][field]:
content = result['ocr_contents'][field]
if isinstance(content, str) and content.strip():
text_content = content
break
elif isinstance(content, dict):
# Try to convert complex objects to string
try:
text_content = json.dumps(content, indent=2)
break
except:
pass
# Then collect all structured sections
for field, content in result['ocr_contents'].items():
# Skip certain fields that are handled separately
if field in ["raw_text", "error", "partial_text", "main_text", "content", "text", "transcript"]:
continue
if content:
# Add this as a structured section
structured_sections[field] = content
# Add the main text content - display raw text without a field label
if text_content:
# Check if this is from raw_text (based on content match)
is_raw_text = False
if 'ocr_contents' in result and 'raw_text' in result['ocr_contents']:
if result['ocr_contents']['raw_text'] == text_content:
is_raw_text = True
# Display content without adding a "raw_text:" label
md.append(text_content + "\n\n")
# Add structured sections if available
if structured_sections:
for section_name, section_content in structured_sections.items():
# Use proper markdown header for sections - consistently capitalize all section names
display_name = section_name.replace("_", " ").capitalize()
# Handle different content types
if isinstance(section_content, str):
md.append(section_content + "\n\n")
elif isinstance(section_content, dict):
# Dictionary content - format as key-value pairs
for key, value in section_content.items():
# Treat all values as plain text to maintain content purity
# This prevents JSON-like structures from being formatted as code blocks
md.append(f"**{key}:** {value}\n\n")
elif isinstance(section_content, list):
# List content - create a markdown list
for item in section_content:
# Treat all items as plain text
md.append(f"- {item}\n")
md.append("\n")
# Join all markdown parts into a single string
return "\n".join(md)
def _decode_image_bytes(img_data):
    """Return raw image bytes from a payload that may be bytes, a bare base64
    string, or a full data URI ("data:image/jpeg;base64,...").

    Raises binascii.Error (via base64.b64decode) on undecodable input.
    """
    if not isinstance(img_data, str):
        # Already binary (e.g. bytes) - use it directly
        return img_data
    if ',' in img_data and ';base64,' in img_data:
        # Drop the data URI prefix; keep the complete data after the comma
        img_data = img_data.split(',', 1)[1]
    # Re-pad to a multiple of 4 to prevent truncation/padding decode errors
    missing_padding = len(img_data) % 4
    if missing_padding:
        img_data += '=' * (4 - missing_padding)
    return base64.b64decode(img_data)


def add_result_files_to_zip(zipf, result, prefix=""):
    """
    Add files for a single result to a zip file.

    Writes up to five artifacts per result: a cleaned JSON dump, a markdown
    document with embedded base64 images, the extracted images themselves,
    an alternative markdown referencing those image files, and a README.
    Any failure is logged rather than raised, so one bad result cannot
    abort a batch export.

    Args:
        zipf: ZipFile instance to add files to
        result: OCR result dictionary
        prefix: Optional prefix for file paths in the zip
    """
    if not result or not isinstance(result, dict):
        return
    # Create a timestamp for filename if not in result
    timestamp = result.get('timestamp', datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
    # Get base name for files
    file_name = result.get('file_name', 'document')
    base_name = Path(file_name).stem
    try:
        # 1. Add JSON file - with base64 data cleaned out
        clean_result = clean_base64_from_result(result)
        json_str = json.dumps(clean_result, indent=2)
        zipf.writestr(f"{prefix}{base_name}.json", json_str)
        # 2. Add markdown file that exactly matches Tab 1 display
        try:
            markdown_content = create_markdown_with_images(result)
            zipf.writestr(f"{prefix}{base_name}.md", markdown_content)
        except Exception as e:
            logger.error(f"Error creating markdown: {str(e)}")
            # Fallback to simpler markdown if error occurs
            zipf.writestr(f"{prefix}{base_name}.md", f"# {file_name}\n\nError generating complete markdown output.")
        has_images = result.get('has_images', False)
        # 3. Add individual images if available
        if has_images and 'pages_data' in result:
            img_folder = f"{prefix}images/"
            for page_idx, page in enumerate(result['pages_data']):
                if 'images' in page and isinstance(page['images'], list):
                    for img_idx, img in enumerate(page['images']):
                        if 'image_base64' in img and img['image_base64']:
                            try:
                                img_bytes = _decode_image_bytes(img['image_base64'])
                            except Exception as e:
                                logger.error(f"Base64 decoding error: {str(e)} for image {page_idx}-{img_idx}")
                                # Skip this image if we can't decode it
                                continue
                            try:
                                image_filename = f"image_{page_idx+1}_{img_idx+1}.jpg"
                                zipf.writestr(f"{img_folder}{image_filename}", img_bytes)
                            except Exception as e:
                                logger.warning(f"Could not add image to zip: {str(e)}")
        # 4. Add markdown with file references to images for offline viewing
        try:
            if has_images:
                # Create markdown with file references instead of base64
                file_ref_markdown = create_markdown_with_file_references(result, "images/")
                zipf.writestr(f"{prefix}{base_name}_with_files.md", file_ref_markdown)
        except Exception as e:
            logger.warning(f"Error creating markdown with file references: {str(e)}")
        # 5. Add README.txt with explanation of file contents
        readme_content = f"""
OCR RESULTS FOR: {file_name}
Processed: {timestamp}
This archive contains the following files:
- {base_name}.json: Complete JSON data with all extracted information
- {base_name}.md: Markdown document with embedded base64 images (exactly as shown in the app)
- {base_name}_with_files.md: Alternative markdown with file references instead of base64 (for offline viewing)
- images/ folder: Contains extracted images from the document (if present)
Generated by Historical OCR using Mistral AI
"""
        zipf.writestr(f"{prefix}README.txt", readme_content.strip())
    except Exception as e:
        logger.error(f"Error adding files to zip: {str(e)}")
def create_markdown_with_images(result):
    """
    Create a clean Markdown document from OCR results that properly preserves
    image references and text structure, following the principle of content purity.

    Unlike the file-reference variant, this embeds images directly as base64
    data URIs so the resulting markdown is self-contained.

    Args:
        result: OCR result dictionary

    Returns:
        Markdown content as string
    """
    # Import content utils to use classification functions; fall back to
    # minimal local logic when running without the utils package.
    try:
        from utils.content_utils import classify_document_content, extract_document_text, extract_image_description
        content_utils_available = True
    except ImportError:
        content_utils_available = False
    # Get content classification
    has_text = True
    has_images = False
    if content_utils_available:
        classification = classify_document_content(result)
        # NOTE(review): has_text is assigned but never consulted below - confirm intent
        has_text = classification['has_content']
        has_images = result.get('has_images', False)
    else:
        # Minimal fallback detection
        if 'has_images' in result:
            has_images = result['has_images']
        # Check for image data more thoroughly
        if 'pages_data' in result and isinstance(result['pages_data'], list):
            for page in result['pages_data']:
                if isinstance(page, dict) and 'images' in page and page['images']:
                    has_images = True
                    break
    # Start building the markdown document
    md = []
    # Add document title/header
    md.append(f"# {result.get('file_name', 'Document')}\n")
    # Add metadata section
    md.append("## Document Metadata\n")
    # Add timestamp
    if 'timestamp' in result:
        md.append(f"**Processed:** {result['timestamp']}\n")
    # Add languages if available (drop empty entries)
    if 'languages' in result and result['languages']:
        languages = [lang for lang in result['languages'] if lang]
        if languages:
            md.append(f"**Languages:** {', '.join(languages)}\n")
    # Add document type and topics
    if 'detected_document_type' in result:
        md.append(f"**Document Type:** {result['detected_document_type']}\n")
    if 'topics' in result and result['topics']:
        md.append(f"**Topics:** {', '.join(result['topics'])}\n")
    md.append("\n---\n")
    # Document title - extract from result if available
    if 'ocr_contents' in result and 'title' in result['ocr_contents'] and result['ocr_contents']['title']:
        title_content = result['ocr_contents']['title']
        md.append(f"## {title_content}\n")
    # Add images if present - with base64 embedding
    if has_images and 'pages_data' in result:
        md.append("## Images\n")
        # Extract and display all images with embedded base64
        for page_idx, page in enumerate(result['pages_data']):
            if 'images' in page and isinstance(page['images'], list):
                for img_idx, img in enumerate(page['images']):
                    if 'image_base64' in img:
                        # Use the base64 data directly
                        image_caption = f"Image {page_idx+1}-{img_idx+1}"
                        img_data = img['image_base64']
                        # Make sure it has proper data URI format
                        if isinstance(img_data, str) and not img_data.startswith('data:'):
                            img_data = f"data:image/jpeg;base64,{img_data}"
                        # FIX: emit the embedded image reference - the caption and
                        # data URI were previously computed but never written out
                        md.append(f"![{image_caption}]({img_data})\n")
                        # Add image description if available through utils
                        if content_utils_available:
                            description = extract_image_description(result)
                            if description:
                                md.append(f"*{description}*\n")
        md.append("\n---\n")
    # Add document text section
    md.append("## Text Content\n")
    # Extract text content systematically
    text_content = ""
    structured_sections = {}
    if content_utils_available:
        # Use the systematic utility function for main text
        text_content = extract_document_text(result)
        # Collect all available structured sections
        if 'ocr_contents' in result:
            for field, content in result['ocr_contents'].items():
                # Skip certain fields that are handled separately
                if field in ["raw_text", "error", "partial_text", "main_text"]:
                    continue
                if content:
                    # Add this as a structured section
                    structured_sections[field] = content
    else:
        # Fallback extraction logic
        if 'ocr_contents' in result:
            # First find main text, preferring the most specific field
            for field in ["main_text", "content", "text", "transcript", "raw_text"]:
                if field in result['ocr_contents'] and result['ocr_contents'][field]:
                    content = result['ocr_contents'][field]
                    if isinstance(content, str) and content.strip():
                        text_content = content
                        break
                    elif isinstance(content, dict):
                        # Try to convert complex objects to string
                        try:
                            text_content = json.dumps(content, indent=2)
                            break
                        except Exception:
                            pass
            # Then collect all structured sections
            for field, content in result['ocr_contents'].items():
                # Skip certain fields that are handled separately
                if field in ["raw_text", "error", "partial_text", "main_text", "content", "text", "transcript"]:
                    continue
                if content:
                    # Add this as a structured section
                    structured_sections[field] = content
    # Add the main text content
    if text_content:
        md.append(text_content + "\n\n")
    # Add structured sections if available
    if structured_sections:
        for section_name, section_content in structured_sections.items():
            # Use proper markdown header for sections - consistently capitalize all section names
            display_name = section_name.replace("_", " ").capitalize()
            md.append(f"### {display_name}\n")
            # Add a separator for clarity
            md.append("\n---\n\n")
            # Handle different content types
            if isinstance(section_content, str):
                md.append(section_content + "\n\n")
            elif isinstance(section_content, dict):
                # Dictionary content - format as key-value pairs
                for key, value in section_content.items():
                    # Treat all values as plain text to maintain content purity
                    md.append(f"**{key}:** {value}\n\n")
            elif isinstance(section_content, list):
                # List content - create a markdown list
                for item in section_content:
                    # Keep list items as plain text
                    md.append(f"- {item}\n")
                md.append("\n")
    # Join all markdown parts into a single string
    return "\n".join(md)
|