"""
OCR (Optical Character Recognition) tools for TutorX with Mistral OCR integration.
"""
import os
from typing import Dict, Any, Optional
from mcp_server.mcp_instance import mcp
from mcp_server.model.gemini_flash import GeminiFlash
from mistralai import Mistral
# Initialize models
MODEL = GeminiFlash()
# Read the API key from the environment rather than embedding a secret in source
client = Mistral(api_key=os.getenv("MISTRAL_API_KEY", ""))


async def mistral_ocr_request(document_url: str) -> dict:
    """
    Send OCR request to Mistral OCR service using document URL.

    Args:
        document_url: URL of the document to process

    Returns:
        OCR response from Mistral
    """
    try:
        # Process document with Mistral OCR
        ocr_response = client.ocr.process(
            model="mistral-ocr-latest",
            document={
                "type": "document_url",
                "document_url": document_url
            },
            include_image_base64=True
        )
        # Convert the response to a dictionary
        if hasattr(ocr_response, 'model_dump'):
            return ocr_response.model_dump()
        return ocr_response or {}
    except Exception as e:
        raise RuntimeError(f"Error processing document with Mistral OCR: {str(e)}")
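
# Usage sketch (not executed here): mistral_ocr_request is a coroutine, so it has to be
# awaited from an event loop. The URL below is a placeholder, not a real document.
#
#     import asyncio
#
#     async def _demo_ocr_request():
#         response = await mistral_ocr_request("https://example.com/sample.pdf")
#         for page in response.get("pages", []):
#             print(page.get("markdown", ""))
#
#     asyncio.run(_demo_ocr_request())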


@mcp.tool()
async def mistral_document_ocr(document_url: str) -> dict:
    """
    Extract text from any document (PDF, image, etc.) using Mistral OCR service with document URL,
    then use Gemini to summarize and extract key points as JSON.

    Args:
        document_url (str): URL of the document to process

    Returns:
        Dictionary with OCR results and AI analysis
    """
    try:
        if not document_url:
            return {"error": "Document URL is required"}

        # Extract filename from URL
        filename = document_url.split('/')[-1] if '/' in document_url else "document"

        # Call Mistral OCR API
        ocr_response = await mistral_ocr_request(document_url)

        # Extract text from Mistral response
        extracted_text = ""
        page_count = 0
        if "pages" in ocr_response and isinstance(ocr_response["pages"], list):
            # Extract text from each page's markdown field
            extracted_text = "\n\n".join(
                page.get("markdown", "")
                for page in ocr_response["pages"]
                if isinstance(page, dict) and "markdown" in page
            )
            page_count = len(ocr_response["pages"])

        # Count words and characters
        word_count = len(extracted_text.split())
        char_count = len(extracted_text)

        # Build result
        result = {
            "success": True,
            "filename": filename,
            "document_url": document_url,
            "extracted_text": extracted_text,
            "character_count": char_count,
            "word_count": word_count,
            "page_count": page_count,
            "mistral_response": ocr_response,
            "processing_service": "Mistral OCR",
            "llm_analysis": {
                "error": None,
                "summary": "",
                "key_points": [],
                "document_type": "unknown"
            }
        }

        # If we have text, try to analyze it with the LLM
        if extracted_text.strip():
            try:
                # Use the LLM to analyze the extracted text
                # Ask for JSON so the response can be parsed below; limit the document to the
                # first 4000 characters to avoid context window issues
                llm_prompt = f"""Analyze the following document and respond with a JSON object containing
"summary" (a brief summary), "key_points" (3-5 key points), and "document_type".

Document:
{extracted_text[:4000]}
"""
                # Await the coroutine
                llm_response = await MODEL.generate_text(llm_prompt)

                # Parse the LLM response
                if llm_response:
                    # Try to parse as JSON if the response is in JSON format
                    try:
                        import json
                        llm_data = json.loads(llm_response)
                        result["llm_analysis"].update({
                            "summary": llm_data.get("summary", ""),
                            "key_points": llm_data.get("key_points", []),
                            "document_type": llm_data.get("document_type", "document")
                        })
                    except (json.JSONDecodeError, AttributeError):
                        # If not JSON, use the raw response as summary
                        result["llm_analysis"].update({
                            "summary": str(llm_response),
                            "document_type": "document"
                        })
            except Exception as e:
                result["llm_analysis"]["error"] = f"LLM analysis error: {str(e)}"

        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"Error processing document with Mistral OCR: {str(e)}",
            "document_url": document_url
        }
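
# Usage sketch (not executed here): the tool can also be awaited directly for local testing,
# outside of the MCP server. The URL is a placeholder.
#
#     import asyncio
#
#     async def _demo_document_ocr():
#         result = await mistral_document_ocr("https://example.com/sample.pdf")
#         if result.get("success"):
#             print(result["word_count"], "words extracted")
#             print(result["llm_analysis"]["summary"])
#         else:
#             print("OCR failed:", result.get("error"))
#
#     asyncio.run(_demo_document_ocr())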


def clean_json_trailing_commas(json_text: str) -> str:
    """Remove trailing commas before closing braces/brackets so the text parses as JSON."""
    import re
    return re.sub(r',([ \t\r\n]*[}\]])', r'\1', json_text)
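
# Example (illustrative values): clean_json_trailing_commas('{"a": 1, "b": [1, 2,],}')
# returns '{"a": 1, "b": [1, 2]}', which json.loads can then parse.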


def extract_json_from_text(text: str):
    """Strip Markdown code fences and trailing commas from text, then parse it as JSON.

    Returns None when text is empty or not a string; otherwise returns the parsed object
    (json.loads raises if the cleaned text still is not valid JSON).
    """
    import re, json
    if not text or not isinstance(text, str):
        return None
    # Remove code fences
    text = re.sub(r'^\s*```(?:json)?\s*', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\s*```\s*$', '', text, flags=re.IGNORECASE)
    text = text.strip()
    # Remove trailing commas
    cleaned = clean_json_trailing_commas(text)
    return json.loads(cleaned)
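

# Minimal local check of the JSON helpers (no network call or API key needed). The sample
# string is illustrative, mimicking a fenced LLM reply that contains a trailing comma.
if __name__ == "__main__":
    sample_reply = '```json\n{"summary": "A short demo", "key_points": ["a", "b"],}\n```'
    parsed = extract_json_from_text(sample_reply)
    print(parsed)  # {'summary': 'A short demo', 'key_points': ['a', 'b']}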