"""
OCR (Optical Character Recognition) tools for TutorX with Mistral OCR integration.
"""
import os
from typing import Dict, Any, Optional
from mcp_server.mcp_instance import mcp
from mcp_server.model.gemini_flash import GeminiFlash
from mistralai import Mistral
# Initialize models
MODEL = GeminiFlash()
client = Mistral(api_key="5oHGQTYDGD3ecQZSqdLsr5ZL4nOsfGYj")


async def mistral_ocr_request(document_url: str) -> dict:
    """
    Send OCR request to Mistral OCR service using document URL.

    Args:
        document_url: URL of the document to process

    Returns:
        OCR response from Mistral
    """
    try:
        # Process document with Mistral OCR
        ocr_response = client.ocr.process(
            model="mistral-ocr-latest",
            document={
                "type": "document_url",
                "document_url": document_url
            },
            include_image_base64=True
        )
        # Convert the response object to a plain dictionary
        if hasattr(ocr_response, 'model_dump'):
            return ocr_response.model_dump()
        return ocr_response or {}
    except Exception as e:
        raise RuntimeError(f"Error processing document with Mistral OCR: {str(e)}")
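
# For reference: mistral_document_ocr below relies only on this slice of the
# OCR response. This is an illustrative sketch inferred from the fields the
# code reads, not the full Mistral schema:
#
#     {
#         "pages": [
#             {"markdown": "...page text as markdown...", ...},
#             ...
#         ],
#         ...
#     }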


@mcp.tool()
async def mistral_document_ocr(document_url: str) -> dict:
    """
    Extract text from any document (PDF, image, etc.) using the Mistral OCR
    service with a document URL, then use Gemini to summarize it and extract
    key points as JSON.

    Args:
        document_url (str): URL of the document to process

    Returns:
        Dictionary with OCR results and AI analysis
    """
    try:
        if not document_url:
            return {"error": "Document URL is required"}

        # Extract filename from URL
        filename = document_url.split('/')[-1] if '/' in document_url else "document"

        # Call Mistral OCR API
        ocr_response = await mistral_ocr_request(document_url)

        # Extract text from Mistral response
        extracted_text = ""
        page_count = 0
        if "pages" in ocr_response and isinstance(ocr_response["pages"], list):
            # Extract text from each page's markdown field
            extracted_text = "\n\n".join(
                page.get("markdown", "")
                for page in ocr_response["pages"]
                if isinstance(page, dict) and "markdown" in page
            )
            page_count = len(ocr_response["pages"])

        # Count words and characters
        word_count = len(extracted_text.split())
        char_count = len(extracted_text)

        # Build result
        result = {
            "success": True,
            "filename": filename,
            "document_url": document_url,
            "extracted_text": extracted_text,
            "character_count": char_count,
            "word_count": word_count,
            "page_count": page_count,
            "mistral_response": ocr_response,
            "processing_service": "Mistral OCR",
            "llm_analysis": {
                "error": None,
                "summary": "",
                "key_points": [],
                "document_type": "unknown"
            }
        }

        # If we have text, try to analyze it with the LLM
        if extracted_text.strip():
            try:
                # Ask explicitly for JSON so the parsing below can succeed;
                # limit input to the first 4000 chars to avoid context window issues
                llm_prompt = f"""Analyze the following document and respond with a JSON object containing:
- "summary": a brief summary of the document
- "key_points": a list of 3-5 key points
- "document_type": the type of document

Document:
{extracted_text[:4000]}
"""
                # Await the coroutine
                llm_response = await MODEL.generate_text(llm_prompt)

                # Parse the LLM response with the shared helper below, which
                # strips code fences and trailing commas before calling json.loads
                if llm_response:
                    llm_data = extract_json_from_text(llm_response)
                    if isinstance(llm_data, dict):
                        result["llm_analysis"].update({
                            "summary": llm_data.get("summary", ""),
                            "key_points": llm_data.get("key_points", []),
                            "document_type": llm_data.get("document_type", "document")
                        })
                    else:
                        # If not JSON, use the raw response as summary
                        result["llm_analysis"].update({
                            "summary": str(llm_response),
                            "document_type": "document"
                        })
            except Exception as e:
                result["llm_analysis"]["error"] = f"LLM analysis error: {str(e)}"

        return result

    except Exception as e:
        return {
            "success": False,
            "error": f"Error processing document with Mistral OCR: {str(e)}",
            "document_url": document_url
        }


def clean_json_trailing_commas(json_text: str) -> str:
    """Remove trailing commas before closing braces/brackets so the text can parse as JSON."""
    return re.sub(r',([ \t\r\n]*[}\]])', r'\1', json_text)
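
# A quick illustration of what the cleaner fixes (hypothetical input):
#     clean_json_trailing_commas('{"a": 1, "b": [1, 2,],}')
#     -> '{"a": 1, "b": [1, 2]}'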


def extract_json_from_text(text: str) -> Optional[Any]:
    """Extract parsed JSON from LLM output, tolerating code fences and trailing commas."""
    if not text or not isinstance(text, str):
        return None

    # Remove code fences
    text = re.sub(r'^\s*```(?:json)?\s*', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\s*```\s*$', '', text, flags=re.IGNORECASE)
    text = text.strip()

    # Remove trailing commas, then parse; return None instead of raising
    # if the text still is not valid JSON
    cleaned = clean_json_trailing_commas(text)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return None
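

if __name__ == "__main__":
    # Minimal offline smoke test for the JSON helpers (no network or API key
    # needed). The sample is hypothetical LLM output showing the two artifacts
    # the helpers are built to handle: a ```json code fence and a trailing comma.
    sample = """```json
    {
        "summary": "A short test document.",
        "key_points": ["point one", "point two"],
        "document_type": "document",
    }
    ```"""
    print(extract_json_from_text(sample))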