Svngoku committed on
Commit
3cd8625
·
verified ·
1 Parent(s): 2e2b7f9

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +183 -213
app.py CHANGED
@@ -7,170 +7,135 @@ from pathlib import Path
7
  import pycountry
8
  import json
9
  import logging
10
- from tenacity import retry, stop_after_attempt, wait_fixed
11
  import tempfile
12
- from typing import Union, Dict, List
13
  from contextlib import contextmanager
14
  import requests
15
  import shutil
 
 
16
 
17
  # Constants
18
  DEFAULT_LANGUAGE = "English"
19
- SUPPORTED_IMAGE_TYPES = [".jpg", ".png"]
20
  SUPPORTED_PDF_TYPES = [".pdf"]
21
  TEMP_FILE_EXPIRY = 7200 # 2 hours in seconds
22
- UPLOAD_FOLDER = "uploads" # Local storage folder
 
 
23
 
24
- # Create upload folder if it doesn't exist
25
  os.makedirs(UPLOAD_FOLDER, exist_ok=True)
26
-
27
- # Configure logging
28
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 
 
29
  logger = logging.getLogger(__name__)
30
 
31
  class OCRProcessor:
32
  def __init__(self, api_key: str):
33
- if not api_key:
34
- raise ValueError("API key must be provided")
35
- self.api_key = api_key
36
  self.client = Mistral(api_key=self.api_key)
 
 
 
 
 
 
 
 
 
37
  try:
38
- models = self.client.models.list() # Validate API key
39
  if not models:
40
  raise ValueError("No models available")
41
  except Exception as e:
42
- raise ValueError(f"Invalid API key: {str(e)}")
43
 
44
  @staticmethod
45
- def _encode_image(image_path: str) -> str:
46
- try:
47
- with open(image_path, "rb") as image_file:
48
- return base64.b64encode(image_file.read()).decode('utf-8')
49
- except FileNotFoundError:
50
- logger.error(f"Error: The file {image_path} was not found.")
51
- return None
52
- except Exception as e:
53
- logger.error(f"Error encoding image: {str(e)}")
54
- return None
 
 
 
 
 
55
 
56
  @staticmethod
57
  def _save_uploaded_file(file_input: Union[str, bytes], filename: str) -> str:
58
- """Save uploaded file to local storage and return path"""
59
- file_path = os.path.join(UPLOAD_FOLDER, filename)
60
- try:
61
- if isinstance(file_input, str):
62
- if file_input.startswith("http"):
63
- response = requests.get(file_input)
64
- response.raise_for_status()
65
- with open(file_path, 'wb') as f:
66
- f.write(response.content)
 
67
  else:
68
- # Copy file to new location if source and destination are different
69
- if os.path.abspath(file_input) != os.path.abspath(file_path):
70
- shutil.copy2(file_input, file_path)
71
- else:
72
- return file_input # Return original path if same file
73
- else:
74
- with open(file_path, 'wb') as f:
75
- if hasattr(file_input, 'read'):
76
- shutil.copyfileobj(file_input, f)
77
- else:
78
- f.write(file_input)
79
- return file_path
80
- except Exception as e:
81
- logger.error(f"Error saving file: {str(e)}")
82
- return None
83
 
84
  @staticmethod
85
  def _pdf_to_images(pdf_path: str) -> List[str]:
86
- """Convert PDF pages to images and return their paths"""
87
- image_paths = []
 
 
 
 
 
 
 
 
 
 
 
 
 
88
  try:
89
  pdf_document = fitz.open(pdf_path)
90
- for page_num in range(pdf_document.page_count):
91
- page = pdf_document[page_num]
92
- pix = page.get_pixmap()
93
- image_path = os.path.join(UPLOAD_FOLDER, f"page_{page_num + 1}.png")
94
- pix.save(image_path)
95
- image_paths.append(image_path)
96
  pdf_document.close()
97
- return image_paths
98
  except Exception as e:
99
- logger.error(f"Error converting PDF to images: {str(e)}")
100
- return []
101
 
102
- @staticmethod
103
- @contextmanager
104
- def _temp_file(content: bytes, suffix: str) -> str:
105
- temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
106
- try:
107
- temp_file.write(content)
108
- temp_file.close()
109
- yield temp_file.name
110
- finally:
111
- if os.path.exists(temp_file.name):
112
- os.unlink(temp_file.name)
113
-
114
- @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
115
  def _call_ocr_api(self, document: Union[DocumentURLChunk, ImageURLChunk]) -> OCRResponse:
116
- try:
117
- return self.client.ocr.process(model="mistral-ocr-latest", document=document, include_image_base64=True)
118
- except Exception as e:
119
- logger.error(f"OCR API call failed: {str(e)}")
120
- raise
121
 
122
- @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
123
  def _call_chat_complete(self, model: str, messages: List[Dict], **kwargs) -> Dict:
124
- try:
125
- return self.client.chat.complete(model=model, messages=messages, **kwargs)
126
- except Exception as e:
127
- logger.error(f"Chat complete API call failed: {str(e)}")
128
- raise
129
-
130
- def _get_file_content(self, file_input: Union[str, bytes]) -> bytes:
131
- if isinstance(file_input, str):
132
- if file_input.startswith("http"):
133
- response = requests.get(file_input)
134
- response.raise_for_status()
135
- return response.content
136
- else:
137
- with open(file_input, "rb") as f:
138
- return f.read()
139
- return file_input.read() if hasattr(file_input, 'read') else file_input
140
-
141
- def ocr_pdf_url(self, pdf_url: str) -> tuple[str, List[str]]:
142
- logger.info(f"Processing PDF URL: {pdf_url}")
143
- try:
144
- # Download and save PDF
145
- response = requests.get(pdf_url)
146
- response.raise_for_status()
147
- filename = pdf_url.split('/')[-1]
148
- pdf_path = self._save_uploaded_file(response.content, filename)
149
- if not pdf_path:
150
- return self._handle_error("PDF saving", Exception("Failed to save PDF")), []
151
-
152
- # Convert PDF to images for visualization
153
- image_paths = self._pdf_to_images(pdf_path)
154
-
155
- # Process with OCR
156
- response = self._call_ocr_api(DocumentURLChunk(document_url=pdf_url))
157
- return self._get_combined_markdown(response), image_paths
158
- except Exception as e:
159
- return self._handle_error("PDF URL processing", e), []
160
 
161
- def ocr_uploaded_pdf(self, pdf_file: Union[str, bytes]) -> tuple[str, List[str]]:
162
- file_name = getattr(pdf_file, 'name', 'unknown')
163
  logger.info(f"Processing uploaded PDF: {file_name}")
164
  try:
165
- # Save uploaded PDF
166
  pdf_path = self._save_uploaded_file(pdf_file, file_name)
167
- if not pdf_path:
168
- return self._handle_error("PDF saving", Exception("Failed to save PDF")), []
169
-
170
- # Convert PDF to images for visualization
171
  image_paths = self._pdf_to_images(pdf_path)
172
 
173
- # Process with OCR
174
  uploaded_file = self.client.files.upload(
175
  file={"file_name": pdf_path, "content": open(pdf_path, "rb")},
176
  purpose="ocr"
@@ -179,184 +144,189 @@ class OCRProcessor:
179
  response = self._call_ocr_api(DocumentURLChunk(document_url=signed_url.url))
180
  return self._get_combined_markdown(response), image_paths
181
  except Exception as e:
182
- return self._handle_error("uploaded PDF processing", e), []
183
 
184
- def ocr_image_url(self, image_url: str) -> tuple[str, str]:
185
- logger.info(f"Processing image URL: {image_url}")
186
- try:
187
- # Download and save image
188
- response = requests.get(image_url)
189
- response.raise_for_status()
190
- filename = image_url.split('/')[-1]
191
- image_path = self._save_uploaded_file(response.content, filename)
192
- if not image_path:
193
- return self._handle_error("image saving", Exception("Failed to save image")), None
194
-
195
- # Process with OCR
196
- response = self._call_ocr_api(ImageURLChunk(image_url=image_url))
197
- return self._get_combined_markdown(response), image_path
198
- except Exception as e:
199
- return self._handle_error("image URL processing", e), None
200
-
201
- def ocr_uploaded_image(self, image_file: Union[str, bytes]) -> tuple[str, str]:
202
- file_name = getattr(image_file, 'name', 'unknown')
203
  logger.info(f"Processing uploaded image: {file_name}")
204
  try:
205
- # Save uploaded image
206
  image_path = self._save_uploaded_file(image_file, file_name)
207
- if not image_path:
208
- return self._handle_error("image saving", Exception("Failed to save image")), None
209
-
210
- # Process with OCR
211
  encoded_image = self._encode_image(image_path)
212
- if encoded_image is None:
213
- return self._handle_error("image encoding", Exception("Failed to encode image")), None
214
  base64_url = f"data:image/jpeg;base64,{encoded_image}"
215
  response = self._call_ocr_api(ImageURLChunk(image_url=base64_url))
216
  return self._get_combined_markdown(response), image_path
217
  except Exception as e:
218
- return self._handle_error("uploaded image processing", e), None
219
 
220
  def document_understanding(self, doc_url: str, question: str) -> str:
221
- logger.info(f"Document understanding - URL: {doc_url}, Question: {question}")
222
  try:
223
  messages = [{"role": "user", "content": [
224
  TextChunk(text=question),
225
  DocumentURLChunk(document_url=doc_url)
226
  ]}]
227
- response = self._call_chat_complete(model="mistral-small-latest", messages=messages)
228
- return response.choices[0].message.content if response.choices else "No response received"
 
 
 
 
229
  except Exception as e:
230
  return self._handle_error("document understanding", e)
231
 
232
- def structured_ocr(self, image_file: Union[str, bytes]) -> tuple[str, str]:
233
- file_name = getattr(image_file, 'name', 'unknown')
234
- logger.info(f"Processing structured OCR for: {file_name}")
235
  try:
236
- # Save uploaded image
237
  image_path = self._save_uploaded_file(image_file, file_name)
238
- if not image_path:
239
- return self._handle_error("image saving", Exception("Failed to save image")), None
240
-
241
  encoded_image = self._encode_image(image_path)
242
- if encoded_image is None:
243
- return self._handle_error("image encoding", Exception("Failed to encode image")), None
244
  base64_url = f"data:image/jpeg;base64,{encoded_image}"
 
245
  ocr_response = self._call_ocr_api(ImageURLChunk(image_url=base64_url))
246
  markdown = self._get_combined_markdown(ocr_response)
247
 
248
  chat_response = self._call_chat_complete(
249
  model="pixtral-12b-latest",
250
  messages=[{
251
- "role": "user",
252
  "content": [
253
  ImageURLChunk(image_url=base64_url),
254
  TextChunk(text=(
255
  f"This is image's OCR in markdown:\n<BEGIN_IMAGE_OCR>\n{markdown}\n<END_IMAGE_OCR>.\n"
256
- "Convert this into a sensible structured json response with file_name, topics, languages, and ocr_contents fields"
257
  ))
258
  ]
259
  }],
260
  response_format={"type": "json_object"},
261
- temperature=0
262
  )
263
-
264
- response_content = chat_response.choices[0].message.content
265
- content = json.loads(response_content)
266
- return self._format_structured_response(image_path, content), image_path
267
  except Exception as e:
268
  return self._handle_error("structured OCR", e), None
269
 
270
- def _get_combined_markdown(self, response: OCRResponse) -> str:
271
- markdowns = []
272
- for page in response.pages:
273
- image_data = {}
274
- for img in page.images:
275
- image_data[img.id] = img.image_base64
276
- markdown = page.markdown
277
- for img_name, base64_str in image_data.items():
278
- markdown = markdown.replace(f"![{img_name}]({img_name})", f"![{img_name}]({base64_str})")
279
- markdowns.append(markdown)
280
- return "\n\n".join(markdowns)
281
 
282
  @staticmethod
283
  def _handle_error(context: str, error: Exception) -> str:
284
  logger.error(f"Error in {context}: {str(error)}")
285
- return f"**Error:** {str(error)}"
286
 
287
  @staticmethod
288
  def _format_structured_response(file_path: str, content: Dict) -> str:
289
  languages = {lang.alpha_2: lang.name for lang in pycountry.languages if hasattr(lang, 'alpha_2')}
290
- # Handle languages as a list instead of using .get()
291
- content_languages = content["languages"] if "languages" in content else [DEFAULT_LANGUAGE]
292
- valid_langs = [l for l in content_languages if l in languages.values()]
293
 
294
  response = {
295
  "file_name": Path(file_path).name,
296
- "topics": content["topics"] if "topics" in content else [],
297
- "languages": valid_langs or [DEFAULT_LANGUAGE],
298
- "ocr_contents": content["ocr_contents"] if "ocr_contents" in content else {}
299
  }
300
- return f"```json\n{json.dumps(response, indent=4)}\n```"
301
 
302
  def create_interface():
303
- with gr.Blocks(title="Mistral OCR App") as demo:
304
- gr.Markdown("# Mistral OCR App")
 
 
 
 
 
 
 
 
 
305
 
306
- api_key = gr.Textbox(label="API Key", type="password")
307
  processor_state = gr.State()
308
- status = gr.Markdown()
309
 
310
  def init_processor(key):
311
  try:
312
  processor = OCRProcessor(key)
313
- return processor, "API key validated!"
314
  except Exception as e:
315
- return None, f"Error: {str(e)}"
316
 
317
- gr.Button("Set API Key").click(
318
  fn=init_processor,
319
  inputs=api_key,
320
  outputs=[processor_state, status]
321
  )
322
 
323
  with gr.Tab("Image OCR"):
324
- image_input = gr.File(label="Upload Image", file_types=SUPPORTED_IMAGE_TYPES)
325
- image_preview = gr.Image(label="Image Preview")
326
- image_output = gr.Markdown()
 
 
 
 
 
327
 
328
  def process_image(processor, image):
329
- if not processor:
330
- return "Please set API key first", None
331
- ocr_result, image_path = processor.ocr_uploaded_image(image)
332
- return ocr_result, image_path
333
 
334
- gr.Button("Process Image").click(
335
  fn=process_image,
336
  inputs=[processor_state, image_input],
337
  outputs=[image_output, image_preview]
338
  )
339
 
340
  with gr.Tab("PDF OCR"):
341
- pdf_input = gr.File(label="Upload PDF", file_types=SUPPORTED_PDF_TYPES)
342
- pdf_gallery = gr.Gallery(label="PDF Pages")
343
- pdf_output = gr.Markdown()
 
 
 
 
 
344
 
345
  def process_pdf(processor, pdf):
346
- if not processor:
347
- return "Please set API key first", None
348
- ocr_result, image_paths = processor.ocr_uploaded_pdf(pdf)
349
- return ocr_result, image_paths
350
 
351
- gr.Button("Process PDF").click(
352
  fn=process_pdf,
353
  inputs=[processor_state, pdf_input],
354
  outputs=[pdf_output, pdf_gallery]
355
  )
356
 
357
- return demo
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
358
 
 
359
 
360
  if __name__ == "__main__":
361
- print(f"===== Application Startup at {os.environ.get('START_TIME', 'Unknown')} =====")
362
- create_interface().launch(share=True, debug=True)
 
 
 
 
 
7
  import pycountry
8
  import json
9
  import logging
10
+ from tenacity import retry, stop_after_attempt, wait_exponential
11
  import tempfile
12
+ from typing import Union, Dict, List, Optional, Tuple
13
  from contextlib import contextmanager
14
  import requests
15
  import shutil
16
+ from concurrent.futures import ThreadPoolExecutor
17
+ import time
18
 
19
  # Constants
20
  DEFAULT_LANGUAGE = "English"
21
+ SUPPORTED_IMAGE_TYPES = [".jpg", ".png", ".jpeg"]
22
  SUPPORTED_PDF_TYPES = [".pdf"]
23
  TEMP_FILE_EXPIRY = 7200 # 2 hours in seconds
24
+ UPLOAD_FOLDER = "uploads"
25
+ MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
26
+ MAX_PDF_PAGES = 50
27
 
28
+ # Configuration
29
  os.makedirs(UPLOAD_FOLDER, exist_ok=True)
30
+ logging.basicConfig(
31
+ level=logging.INFO,
32
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
33
+ handlers=[logging.StreamHandler()]
34
+ )
35
  logger = logging.getLogger(__name__)
36
 
37
  class OCRProcessor:
38
  def __init__(self, api_key: str):
39
+ self.api_key = self._validate_api_key(api_key)
 
 
40
  self.client = Mistral(api_key=self.api_key)
41
+ self._validate_client()
42
+
43
+ @staticmethod
44
+ def _validate_api_key(api_key: str) -> str:
45
+ if not api_key or not isinstance(api_key, str):
46
+ raise ValueError("Valid API key must be provided")
47
+ return api_key
48
+
49
+ def _validate_client(self) -> None:
50
  try:
51
+ models = self.client.models.list()
52
  if not models:
53
  raise ValueError("No models available")
54
  except Exception as e:
55
+ raise ValueError(f"API key validation failed: {str(e)}")
56
 
57
  @staticmethod
58
+ def _check_file_size(file_input: Union[str, bytes]) -> None:
59
+ if isinstance(file_input, str) and os.path.exists(file_input):
60
+ size = os.path.getsize(file_input)
61
+ elif hasattr(file_input, 'read'):
62
+ size = len(file_input.read())
63
+ file_input.seek(0) # Reset file pointer
64
+ else:
65
+ size = len(file_input)
66
+ if size > MAX_FILE_SIZE:
67
+ raise ValueError(f"File size exceeds {MAX_FILE_SIZE/1024/1024}MB limit")
68
+
69
+ @staticmethod
70
+ def _encode_image(image_path: str) -> Optional[str]:
71
+ with open(image_path, "rb") as image_file:
72
+ return base64.b64encode(image_file.read()).decode('utf-8')
73
 
74
  @staticmethod
75
  def _save_uploaded_file(file_input: Union[str, bytes], filename: str) -> str:
76
+ file_path = os.path.join(UPLOAD_FOLDER, f"{int(time.time())}_{filename}")
77
+ if isinstance(file_input, str) and file_input.startswith("http"):
78
+ response = requests.get(file_input, timeout=10)
79
+ response.raise_for_status()
80
+ with open(file_path, 'wb') as f:
81
+ f.write(response.content)
82
+ else:
83
+ with open(file_path, 'wb') as f:
84
+ if hasattr(file_input, 'read'):
85
+ shutil.copyfileobj(file_input, f)
86
  else:
87
+ f.write(file_input)
88
+ return file_path
 
 
 
 
 
 
 
 
 
 
 
 
 
89
 
90
  @staticmethod
91
  def _pdf_to_images(pdf_path: str) -> List[str]:
92
+ pdf_document = fitz.open(pdf_path)
93
+ if pdf_document.page_count > MAX_PDF_PAGES:
94
+ pdf_document.close()
95
+ raise ValueError(f"PDF exceeds maximum page limit of {MAX_PDF_PAGES}")
96
+
97
+ with ThreadPoolExecutor() as executor:
98
+ image_paths = list(executor.map(
99
+ lambda i: OCRProcessor._convert_page(pdf_path, i),
100
+ range(pdf_document.page_count)
101
+ ))
102
+ pdf_document.close()
103
+ return [path for path in image_paths if path]
104
+
105
@staticmethod
def _convert_page(pdf_path: str, page_num: int) -> Optional[str]:
    """Render one PDF page to a PNG under ``UPLOAD_FOLDER``.

    Args:
        pdf_path: Path to the PDF on disk.
        page_num: Zero-based page index.

    Returns:
        Path of the written PNG, or ``None`` if anything failed (the error
        is logged).
    """
    try:
        pdf_document = fitz.open(pdf_path)
        try:
            page = pdf_document[page_num]
            pix = page.get_pixmap(dpi=150)  # Improved resolution
            image_path = os.path.join(UPLOAD_FOLDER, f"page_{page_num + 1}_{int(time.time())}.png")
            pix.save(image_path)
        finally:
            # Close the handle even when rendering or saving raises.
            pdf_document.close()
        return image_path
    except Exception as e:
        logger.error(f"Error converting page {page_num}: {str(e)}")
        return None
118
 
119
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    # Surface the last API error itself instead of tenacity's RetryError
    # wrapper, so the message shown to the user names the real failure.
    reraise=True,
)
def _call_ocr_api(self, document: Union[DocumentURLChunk, ImageURLChunk]) -> OCRResponse:
    """Run ``mistral-ocr-latest`` on ``document``.

    The call is attempted up to three times with exponential backoff
    bounded between 2 and 10 seconds. After the last attempt the
    underlying exception is re-raised.

    Args:
        document: The document or image chunk to process.

    Returns:
        The OCR response, requested with embedded base64 image data.
    """
    return self.client.ocr.process(
        model="mistral-ocr-latest",
        document=document,
        include_image_base64=True
    )
126
 
127
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    # Surface the last API error itself instead of tenacity's RetryError
    # wrapper, so the message shown to the user names the real failure.
    reraise=True,
)
def _call_chat_complete(self, model: str, messages: List[Dict], **kwargs) -> Dict:
    """Send a chat completion request.

    The call is attempted up to three times with exponential backoff
    bounded between 2 and 10 seconds. After the last attempt the
    underlying exception is re-raised.

    Args:
        model: Name of the chat model.
        messages: Chat messages to send.
        **kwargs: Passed through unchanged to ``client.chat.complete``.

    Returns:
        Whatever ``client.chat.complete`` returns.
        NOTE(review): callers read ``.choices`` from the result, so the
        ``Dict`` annotation looks inaccurate - confirm.
    """
    return self.client.chat.complete(model=model, messages=messages, **kwargs)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
130
 
131
+ def ocr_uploaded_pdf(self, pdf_file: Union[str, bytes]) -> Tuple[str, List[str]]:
132
+ file_name = getattr(pdf_file, 'name', f"pdf_{int(time.time())}.pdf")
133
  logger.info(f"Processing uploaded PDF: {file_name}")
134
  try:
135
+ self._check_file_size(pdf_file)
136
  pdf_path = self._save_uploaded_file(pdf_file, file_name)
 
 
 
 
137
  image_paths = self._pdf_to_images(pdf_path)
138
 
 
139
  uploaded_file = self.client.files.upload(
140
  file={"file_name": pdf_path, "content": open(pdf_path, "rb")},
141
  purpose="ocr"
 
144
  response = self._call_ocr_api(DocumentURLChunk(document_url=signed_url.url))
145
  return self._get_combined_markdown(response), image_paths
146
  except Exception as e:
147
+ return self._handle_error("PDF processing", e), []
148
 
149
def ocr_uploaded_image(self, image_file: Union[str, bytes]) -> Tuple[str, Optional[str]]:
    """Run OCR on an uploaded image.

    Args:
        image_file: The upload. Its ``name`` attribute, when present, is
            used as the stored file name.

    Returns:
        ``(markdown, image_path)`` on success, or ``(error_message, None)``
        if any step fails.
    """
    import mimetypes

    file_name = getattr(image_file, 'name', f"image_{int(time.time())}.jpg")
    logger.info(f"Processing uploaded image: {file_name}")
    try:
        self._check_file_size(image_file)
        image_path = self._save_uploaded_file(image_file, file_name)
        encoded_image = self._encode_image(image_path)
        # Label the payload with the type implied by the file extension, so
        # a PNG is not sent as JPEG. Fall back to JPEG when the extension is
        # unknown or not an image type.
        mime_type = mimetypes.guess_type(image_path)[0]
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/jpeg"
        base64_url = f"data:{mime_type};base64,{encoded_image}"
        response = self._call_ocr_api(ImageURLChunk(image_url=base64_url))
        return self._get_combined_markdown(response), image_path
    except Exception as e:
        return self._handle_error("image processing", e), None
161
 
162
  def document_understanding(self, doc_url: str, question: str) -> str:
 
163
  try:
164
  messages = [{"role": "user", "content": [
165
  TextChunk(text=question),
166
  DocumentURLChunk(document_url=doc_url)
167
  ]}]
168
+ response = self._call_chat_complete(
169
+ model="mistral-small-latest",
170
+ messages=messages,
171
+ temperature=0.1
172
+ )
173
+ return response.choices[0].message.content
174
  except Exception as e:
175
  return self._handle_error("document understanding", e)
176
 
177
def structured_ocr(self, image_file: Union[str, bytes]) -> Tuple[str, Optional[str]]:
    """Run OCR on an image and have a chat model turn the result into JSON.

    Args:
        image_file: The upload. Its ``name`` attribute, when present, is
            used as the stored file name.

    Returns:
        ``(formatted_json_block, image_path)`` on success, or
        ``(error_message, None)`` if any step fails, including a chat
        reply that is not valid JSON.
    """
    import mimetypes

    file_name = getattr(image_file, 'name', f"image_{int(time.time())}.jpg")
    try:
        self._check_file_size(image_file)
        image_path = self._save_uploaded_file(image_file, file_name)
        encoded_image = self._encode_image(image_path)
        # Label the payload with the type implied by the file extension, so
        # a PNG is not sent as JPEG. Fall back to JPEG when the extension is
        # unknown or not an image type.
        mime_type = mimetypes.guess_type(image_path)[0]
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/jpeg"
        base64_url = f"data:{mime_type};base64,{encoded_image}"

        # Step 1: plain OCR of the image.
        ocr_response = self._call_ocr_api(ImageURLChunk(image_url=base64_url))
        markdown = self._get_combined_markdown(ocr_response)

        # Step 2: send the image and its OCR text to the chat model and
        # request a JSON object back.
        chat_response = self._call_chat_complete(
            model="pixtral-12b-latest",
            messages=[{
                "role": "user",
                "content": [
                    ImageURLChunk(image_url=base64_url),
                    TextChunk(text=(
                        f"This is image's OCR in markdown:\n<BEGIN_IMAGE_OCR>\n{markdown}\n<END_IMAGE_OCR>.\n"
                        "Convert this into a structured JSON response with file_name, topics, languages, and ocr_contents fields"
                    ))
                ]
            }],
            response_format={"type": "json_object"},
            temperature=0.1
        )
        parsed = json.loads(chat_response.choices[0].message.content)
        return self._format_structured_response(image_path, parsed), image_path
    except Exception as e:
        return self._handle_error("structured OCR", e), None
206
 
207
+ @staticmethod
208
+ def _get_combined_markdown(response: OCRResponse) -> str:
209
+ return "\n\n".join(
210
+ page.markdown for page in response.pages
211
+ if page.markdown.strip()
212
+ ) or "No text detected"
 
 
 
 
 
213
 
214
  @staticmethod
215
  def _handle_error(context: str, error: Exception) -> str:
216
  logger.error(f"Error in {context}: {str(error)}")
217
+ return f"**Error in {context}:** {str(error)}"
218
 
219
  @staticmethod
220
  def _format_structured_response(file_path: str, content: Dict) -> str:
221
  languages = {lang.alpha_2: lang.name for lang in pycountry.languages if hasattr(lang, 'alpha_2')}
222
+ content_languages = content.get("languages", [DEFAULT_LANGUAGE])
223
+ valid_langs = [l for l in content_languages if l in languages.values()] or [DEFAULT_LANGUAGE]
 
224
 
225
  response = {
226
  "file_name": Path(file_path).name,
227
+ "topics": content.get("topics", []),
228
+ "languages": valid_langs,
229
+ "ocr_contents": content.get("ocr_contents", {})
230
  }
231
+ return f"```json\n{json.dumps(response, indent=2, ensure_ascii=False)}\n```"
232
 
233
  def create_interface():
234
+ css = """
235
+ .output-markdown {font-size: 14px; max-height: 500px; overflow-y: auto;}
236
+ .status {color: #666; font-style: italic;}
237
+ """
238
+
239
+ with gr.Blocks(title="Mistral OCR App", css=css) as demo:
240
+ gr.Markdown("# Mistral OCR App\nUpload images or PDFs for OCR processing")
241
+
242
+ with gr.Row():
243
+ api_key = gr.Textbox(label="Mistral API Key", type="password", placeholder="Enter your API key")
244
+ set_key_btn = gr.Button("Set API Key", variant="primary")
245
 
 
246
  processor_state = gr.State()
247
+ status = gr.Markdown("Please enter API key", elem_classes="status")
248
 
249
  def init_processor(key):
250
  try:
251
  processor = OCRProcessor(key)
252
+ return processor, "API key validated successfully"
253
  except Exception as e:
254
+ return None, f"Error: {str(e)}"
255
 
256
+ set_key_btn.click(
257
  fn=init_processor,
258
  inputs=api_key,
259
  outputs=[processor_state, status]
260
  )
261
 
262
  with gr.Tab("Image OCR"):
263
+ with gr.Row():
264
+ image_input = gr.File(
265
+ label=f"Upload Image (max {MAX_FILE_SIZE/1024/1024}MB)",
266
+ file_types=SUPPORTED_IMAGE_TYPES
267
+ )
268
+ image_preview = gr.Image(label="Preview", height=300)
269
+ image_output = gr.Markdown(label="OCR Result", elem_classes="output-markdown")
270
+ process_image_btn = gr.Button("Process Image", variant="primary")
271
 
272
  def process_image(processor, image):
273
+ if not processor or not image:
274
+ return "Please set API key and upload an image", None
275
+ return processor.ocr_uploaded_image(image)
 
276
 
277
+ process_image_btn.click(
278
  fn=process_image,
279
  inputs=[processor_state, image_input],
280
  outputs=[image_output, image_preview]
281
  )
282
 
283
  with gr.Tab("PDF OCR"):
284
+ with gr.Row():
285
+ pdf_input = gr.File(
286
+ label=f"Upload PDF (max {MAX_FILE_SIZE/1024/1024}MB, {MAX_PDF_PAGES} pages)",
287
+ file_types=SUPPORTED_PDF_TYPES
288
+ )
289
+ pdf_gallery = gr.Gallery(label="PDF Pages", height=300)
290
+ pdf_output = gr.Markdown(label="OCR Result", elem_classes="output-markdown")
291
+ process_pdf_btn = gr.Button("Process PDF", variant="primary")
292
 
293
  def process_pdf(processor, pdf):
294
+ if not processor or not pdf:
295
+ return "Please set API key and upload a PDF", []
296
+ return processor.ocr_uploaded_pdf(pdf)
 
297
 
298
+ process_pdf_btn.click(
299
  fn=process_pdf,
300
  inputs=[processor_state, pdf_input],
301
  outputs=[pdf_output, pdf_gallery]
302
  )
303
 
304
+ with gr.Tab("Structured OCR"):
305
+ structured_input = gr.File(
306
+ label=f"Upload Image for Structured OCR (max {MAX_FILE_SIZE/1024/1024}MB)",
307
+ file_types=SUPPORTED_IMAGE_TYPES
308
+ )
309
+ structured_output = gr.Markdown(label="Structured Result", elem_classes="output-markdown")
310
+ structured_preview = gr.Image(label="Preview", height=300)
311
+ process_structured_btn = gr.Button("Process Structured OCR", variant="primary")
312
+
313
+ def process_structured(processor, image):
314
+ if not processor or not image:
315
+ return "Please set API key and upload an image", None
316
+ return processor.structured_ocr(image)
317
+
318
+ process_structured_btn.click(
319
+ fn=process_structured,
320
+ inputs=[processor_state, structured_input],
321
+ outputs=[structured_output, structured_preview]
322
+ )
323
 
324
+ return demo
325
 
326
  if __name__ == "__main__":
327
+ os.environ['START_TIME'] = time.strftime('%Y-%m-%d %H:%M:%S')
328
+ print(f"===== Application Startup at {os.environ['START_TIME']} =====")
329
+ create_interface().launch(
330
+ share=True,
331
+ debug=True,
332
+ )