""" | |
Gradio app for document OCR processing with Mistral OCR. | |
Features: | |
- File upload to storage API | |
- Document processing using Mistral OCR | |
- Display of OCR results | |
""" | |
import os
import requests
import gradio as gr
import asyncio
import json
import tempfile
from typing import Dict, Any, Optional
from pathlib import Path

# Mistral AI
from mistralai import Mistral
# API Configuration
STORAGE_API_URL = "https://storage-bucket-api.vercel.app/upload"
# Prefer setting the key via environment variable; the hardcoded fallback
# should not be used in production
MISTRAL_API_KEY = os.environ.get("MISTRAL_API_KEY", "5oHGQTYDGD3ecQZSqdLsr5ZL4nOsfGYj")

# Initialize Mistral client
client = Mistral(api_key=MISTRAL_API_KEY)
class MistralOCRProcessor:
    """Handles document OCR processing using Mistral AI"""

    def __init__(self, client: Mistral = None):
        self.client = client or Mistral(api_key=MISTRAL_API_KEY)

    async def process_document(self, document_path: str) -> Dict[str, Any]:
        """
        Process a document using Mistral OCR

        Args:
            document_path: Local path to the document to process

        Returns:
            Dict containing OCR results or error information
        """
        try:
            # For local files, we need to upload to a temporary URL first
            upload_result = await StorageManager().upload_file(document_path)
            if not upload_result.get("success"):
                return {
                    "success": False,
                    "result": None,
                    "error": f"Upload failed: {upload_result.get('error')}"
                }

            document_url = upload_result.get("storage_url")
            if not document_url:
                return {
                    "success": False,
                    "result": None,
                    "error": "No storage URL returned from upload"
                }

            # Process with Mistral OCR
            ocr_response = self.client.ocr.process(
                model="mistral-ocr-latest",
                document={
                    "type": "document_url",
                    "document_url": document_url
                },
                include_image_base64=True
            )

            # Convert response to dict if it's a Pydantic model
            if hasattr(ocr_response, 'model_dump'):
                result = ocr_response.model_dump()
            else:
                result = ocr_response

            return {
                "success": True,
                "result": result,
                "document_url": document_url,
                "error": None
            }
        except Exception as e:
            return {
                "success": False,
                "result": None,
                "error": f"OCR processing error: {str(e)}"
            }
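# Example usage (a sketch, not part of the app flow; assumes a local
# "sample.pdf" exists and a valid MISTRAL_API_KEY is configured):
#
#   processor = MistralOCRProcessor()
#   result = asyncio.run(processor.process_document("sample.pdf"))
#   if result["success"]:
#       print(result["document_url"])   # URL the file was uploaded to
#       print(result["result"])         # raw OCR payload from Mistral
#   else:
#       print(result["error"])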
class StorageManager:
    """Handles file uploads to the storage service"""

    def __init__(self, api_url: str = STORAGE_API_URL):
        self.api_url = api_url

    async def upload_file(self, file_path: str) -> Dict[str, Any]:
        """
        Upload a file to the storage service

        Args:
            file_path: Path to the file to upload

        Returns:
            Dict containing upload result or error information
        """
        try:
            with open(file_path, 'rb') as f:
                files = {'file': (os.path.basename(file_path), f)}
                response = requests.post(self.api_url, files=files)
                response.raise_for_status()

            result = response.json()
            if not result.get('success'):
                raise Exception(result.get('message', 'Upload failed'))

            return {
                "success": True,
                "storage_url": result.get('storage_url'),
                "original_filename": result.get('original_filename'),
                "file_size": result.get('file_size'),
                "error": None
            }
        except Exception as e:
            return {
                "success": False,
                "storage_url": None,
                "original_filename": os.path.basename(file_path),
                "file_size": os.path.getsize(file_path) if os.path.exists(file_path) else 0,
                "error": f"Upload failed: {str(e)}"
            }
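# Example usage (a sketch; the storage API's response fields beyond "success"
# and "storage_url" are taken from the code above, not from API documentation):
#
#   upload = asyncio.run(StorageManager().upload_file("sample.pdf"))
#   print(upload["storage_url"] if upload["success"] else upload["error"])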
# Initialize processors
ocr_processor = MistralOCRProcessor()
storage_manager = StorageManager()


async def process_document_ocr(file_path: str) -> Dict[str, Any]:
    """
    Process a document through the complete OCR pipeline

    Args:
        file_path: Path to the document file

    Returns:
        Dict containing processing results
    """
    # Process with Mistral OCR (handles upload internally)
    result = await ocr_processor.process_document(file_path)
    if not result.get("success"):
        return {
            "success": False,
            "upload": {"success": False},
            "ocr": None,
            "error": result.get("error", "Unknown error")
        }

    # Get the original filename and size from the file path
    original_filename = Path(file_path).name
    file_size = os.path.getsize(file_path)

    return {
        "success": True,
        "upload": {
            "success": True,
            "storage_url": result.get("document_url"),
            "original_filename": original_filename,
            "file_size": file_size
        },
        "ocr": result.get("result"),
        "error": None,
        "storage_url": result.get("document_url")
    }
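# Shape of the dict returned above on success (inferred from the code; shown
# here only as a sketch):
#
#   {
#       "success": True,
#       "upload": {"success": True, "storage_url": "...",
#                  "original_filename": "...", "file_size": 12345},
#       "ocr": {...},          # raw Mistral OCR payload, typically per-page markdown
#       "error": None,
#       "storage_url": "..."
#   }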
# Gradio Interface
def create_gradio_interface():
    """Create and return the Gradio interface"""
    with gr.Blocks(title="Document OCR Processor", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Document OCR Processor")
        gr.Markdown("Upload a document (PDF, JPG, JPEG, PNG) to process with Mistral OCR")

        with gr.Row():
            with gr.Column(scale=2):
                file_input = gr.File(label="Upload Document", type="filepath")
                process_btn = gr.Button("Process Document", variant="primary")
                with gr.Accordion("Debug Info", open=False):
                    status_text = gr.Textbox(label="Status", interactive=False)

            with gr.Column(scale=3):
                with gr.Tabs():
                    with gr.TabItem("OCR Results"):
                        ocr_output = gr.JSON(label="OCR Output")
                    with gr.TabItem("Extracted Text"):
                        text_output = gr.Textbox(label="Extracted Text", lines=20, max_lines=50)
                    with gr.TabItem("Upload Info"):
                        upload_info = gr.JSON(label="Upload Information")

        def update_status(message):
            return message
        async def process_file(file_path):
            try:
                # Guard against the button being clicked with no file selected
                if not file_path:
                    yield {
                        status_text: update_status("❌ Please upload a document first"),
                        ocr_output: None,
                        text_output: "",
                        upload_info: None
                    }
                    return

                status = "Starting document processing..."
                yield {status_text: update_status(status)}

                # Process the document
                result = await process_document_ocr(file_path)

                if not result["success"]:
                    error_msg = result.get('error', 'Unknown error')
                    yield {
                        status_text: update_status(f"❌ {error_msg}"),
                        ocr_output: None,
                        text_output: "",
                        upload_info: None
                    }
                    return

                # Extract text from OCR result
                extracted_text = ""
                ocr_data = result.get("ocr", {})

                # Handle different OCR result formats: mistral-ocr-latest returns
                # per-page "markdown"; fall back to a top-level or per-page "text"
                if isinstance(ocr_data, dict):
                    if "text" in ocr_data:
                        extracted_text = ocr_data["text"]
                    elif "pages" in ocr_data and isinstance(ocr_data["pages"], list):
                        extracted_text = "\n\n".join(
                            page.get("markdown") or page.get("text", "")
                            for page in ocr_data["pages"]
                            if isinstance(page, dict)
                        )

                # Prepare upload info
                upload_info_data = {
                    "original_filename": result["upload"].get("original_filename"),
                    "file_size": result["upload"].get("file_size"),
                    "storage_url": result["upload"].get("storage_url"),
                }

                yield {
                    status_text: update_status("✅ Document processed successfully"),
                    ocr_output: ocr_data,
                    text_output: extracted_text,
                    upload_info: upload_info_data
                }
            except Exception as e:
                import traceback
                print(traceback.format_exc())  # log the full traceback to the console
                error_msg = f"Unexpected error: {str(e)}"
                yield {
                    status_text: update_status(f"❌ {error_msg}"),
                    ocr_output: None,
                    text_output: "",
                    upload_info: None
                }
        # Connect the process button to the processing function
        process_btn.click(
            fn=process_file,
            inputs=file_input,
            outputs=[status_text, ocr_output, text_output, upload_info]
        )

        # Update the status message when a new file is uploaded
        file_input.change(
            fn=lambda x: "Ready to process. Click 'Process Document' to continue.",
            inputs=file_input,
            outputs=status_text
        )

    return demo


if __name__ == "__main__":
    # Create and launch the interface
    demo = create_gradio_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860)
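# To run this script locally (a sketch; the filename "app.py" is an assumption):
#
#   export MISTRAL_API_KEY="your-key"
#   python app.py
#
# The interface is then served on http://0.0.0.0:7860.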