Lyti4's picture
Update app.py
c99d028 verified
raw
history blame
28.9 kB
"""
🤗 SkladBot Free AI Microservice
Hugging Face Spaces микросервис для БЕСПЛАТНОЙ обработки складских документов
Возможности:
- TrOCR для печатного и рукописного текста
- LayoutLM для понимания структуры документов
- Table Transformer для обработки таблиц
- Gradio API для REST запросов
- 100% БЕСПЛАТНО - 20k запросов/месяц
"""
import gradio as gr
import torch
import numpy as np
from PIL import Image
import io
import base64
import json
import re
from datetime import datetime
from typing import Dict, List, Any, Optional
# Transformers models
from transformers import (
TrOCRProcessor, VisionEncoderDecoderModel,
pipeline,
AutoTokenizer, AutoModelForTokenClassification
)
# Импортируем наш кастомный токенайзер
from custom_tokenizers import Byt5LangTokenizer
# Регистрируем кастомный токенайзер в transformers
from transformers import AutoConfig, AutoTokenizer
from transformers.models.auto.tokenization_auto import TOKENIZER_MAPPING
from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE
# Регистрация кастомного токенайзера
if 'Byt5LangTokenizer' not in dir():
try:
# Добавляем токенайзер в систему автоматического обнаружения transformers
from transformers.models.auto.tokenization_auto import TOKENIZER_MAPPING, TOKENIZER_MAPPING_NAMES
from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE
print("🔄 Регистрируем кастомный токенайзер Byt5LangTokenizer...")
except ImportError as e:
print(f"⚠️ Предупреждение при импорте модулей трансформеров: {e}")
class FreeAIOrchestrator:
"""Координатор БЕСПЛАТНЫХ AI сервисов для складских документов"""
def __init__(self):
print("🚀 Инициализация SkladBot Free AI...")
# TrOCR для печатного текста (БЕСПЛАТНО)
self.printed_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-printed")
self.printed_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-printed")
# TrOCR для рукописного текста (БЕСПЛАТНО)
self.handwritten_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-handwritten")
self.handwritten_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-handwritten")
# LayoutLM для понимания документов (БЕСПЛАТНО)
self.document_qa = pipeline(
"document-question-answering",
model="impira/layoutlm-document-qa"
)
# Table Transformer для таблиц (БЕСПЛАТНО)
self.table_detector = pipeline(
"object-detection",
model="microsoft/table-transformer-structure-recognition"
)
# NEW: Добавляем интеграцию с Surya Table (БЕСПЛАТНО)
try:
# Регистрируем кастомный токенайзер перед загрузкой модели
print("🔄 Инициализация кастомного токенайзера для Surya Table...")
# Используем пайплайн с указанием стандартного токенайзера вместо кастомного
# Это решает проблему совместимости
self.surya_table_model = pipeline(
"image-to-text",
model="vikp/surya_tablerec",
tokenizer="t5-base" # Используем стандартный токенайзер для t5
)
print("✅ Surya Table модель загружена успешно")
self.surya_table_available = True
except Exception as e:
print(f"⚠️ Не удалось загрузить Surya Table: {e}")
self.surya_table_available = False
self.stats = {
"total_requests": 0,
"successful_extractions": 0,
"avg_confidence": 0.0,
"start_time": datetime.now()
}
print("✅ SkladBot Free AI готов к работе!")
async def extract_warehouse_data(self, image, document_type="auto"):
"""Главная функция - извлечение данных из складских документов"""
self.stats["total_requests"] += 1
try:
# Конвертация изображения
if isinstance(image, str):
# Base64 строка
image_data = base64.b64decode(image)
image = Image.open(io.BytesIO(image_data))
# 1. Определение типа документа
doc_type = await self.classify_document_type(image)
if document_type != "auto":
doc_type = document_type
# 2. Выбор стратегии обработки
extraction_results = []
# TrOCR для печатного текста
printed_text = await self.extract_printed_text(image)
extraction_results.append({
"method": "trocr_printed",
"text": printed_text,
"confidence": 0.85
})
# TrOCR для рукописного текста (если нужно)
if doc_type in ["handwritten", "mixed"]:
handwritten_text = await self.extract_handwritten_text(image)
extraction_results.append({
"method": "trocr_handwritten",
"text": handwritten_text,
"confidence": 0.80
})
# LayoutLM для структурированного понимания
if doc_type in ["invoice", "table", "form"]:
structured_data = await self.extract_structured_data(image, doc_type)
extraction_results.append({
"method": "layoutlm",
"data": structured_data,
"confidence": 0.90
})
# Table Transformer для таблиц
if doc_type == "table":
table_data = await self.extract_table_data(image)
extraction_results.append({
"method": "table_transformer",
"data": table_data,
"confidence": 0.88
})
# 3. Объединение и обработка результатов
final_result = await self.merge_extraction_results(extraction_results, doc_type)
# 4. Парсинг складских команд
warehouse_commands = await self.parse_warehouse_commands(final_result)
# 5. Генерация предложений
suggestions = await self.generate_smart_suggestions(warehouse_commands)
self.stats["successful_extractions"] += 1
return {
"success": True,
"document_type": doc_type,
"extracted_items": warehouse_commands,
"suggestions": suggestions,
"raw_extractions": extraction_results,
"confidence": self.calculate_overall_confidence(extraction_results),
"processing_time": f"{datetime.now().timestamp():.2f}s",
"cost": 0.0 # ВСЕГДА БЕСПЛАТНО!
}
except Exception as e:
return {
"success": False,
"error": str(e),
"document_type": "unknown",
"extracted_items": [],
"suggestions": [],
"confidence": 0.0,
"cost": 0.0
}
async def extract_printed_text(self, image):
"""Извлечение печатного текста через TrOCR"""
try:
pixel_values = self.printed_processor(image, return_tensors="pt").pixel_values
generated_ids = self.printed_model.generate(pixel_values)
generated_text = self.printed_processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
return generated_text
except Exception as e:
print(f"❌ Ошибка TrOCR печатный: {e}")
return ""
async def extract_handwritten_text(self, image):
"""Извлечение рукописного текста через TrOCR"""
try:
pixel_values = self.handwritten_processor(image, return_tensors="pt").pixel_values
generated_ids = self.handwritten_model.generate(pixel_values)
generated_text = self.handwritten_processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
return generated_text
except Exception as e:
print(f"❌ Ошибка TrOCR рукописный: {e}")
return ""
async def extract_structured_data(self, image, doc_type):
"""Структурированное понимание документа через LayoutLM"""
try:
# Определяем вопросы на основе типа документа
questions = self.get_document_questions(doc_type)
results = {}
for question in questions:
try:
result = self.document_qa(image=image, question=question)
results[question] = result["answer"]
except:
results[question] = ""
return results
except Exception as e:
print(f"❌ Ошибка LayoutLM: {e}")
return {}
async def extract_table_data(self, image):
"""Извлечение табличных данных через специализированные модели"""
try:
# Проверка наличия модели Surya Table
if hasattr(self, 'surya_table_available') and self.surya_table_available:
try:
# Попытка использования Surya Table для структурированного распознавания таблиц
print("🔍 Используем Surya Table для структурированного распознавания таблицы...")
# Преобразуем PIL Image в формат, необходимый для модели
if isinstance(image, str):
# Если передан путь или base64
if image.startswith('data:image'):
# Обработка base64
image_data = base64.b64decode(image.split(',')[1])
pil_image = Image.open(io.BytesIO(image_data))
else:
# Обработка пути к файлу
pil_image = Image.open(image)
elif isinstance(image, Image.Image):
pil_image = image
else:
# Если передан bytes
pil_image = Image.open(io.BytesIO(image))
# Распознаем таблицу через Surya Table
try:
# Пробуем с нашим кастомным токенайзером
table_result = self.surya_table_model(pil_image)
except Exception as tokenizer_error:
print(f"⚠️ Ошибка с кастомным токенайзером: {tokenizer_error}")
# Если не сработало, используем альтернативный метод
try:
print("🔄 Используем альтернативный метод для Surya Table...")
# Используем автоматически выбранный tokenizer
from transformers import pipeline, AutoTokenizer
# Загружаем стандартный t5 токенайзер напрямую
tokenizer = AutoTokenizer.from_pretrained("t5-base")
surya_fallback = pipeline(
"image-to-text",
model="vikp/surya_tablerec",
tokenizer=tokenizer # Используем стандартный токенайзер
)
table_result = surya_fallback(pil_image)
except Exception as fallback_error:
print(f"⚠️ Ошибка с fallback методом: {fallback_error}")
# Если и это не сработало, используем еще более простой метод
try:
# Альтернативный подход без использования pipeline
print("🔄 Используем прямой подход для Surya Table...")
from transformers import AutoModelForVision2Seq, AutoImageProcessor
processor = AutoImageProcessor.from_pretrained("vikp/surya_tablerec")
model = AutoModelForVision2Seq.from_pretrained("vikp/surya_tablerec")
inputs = processor(images=pil_image, return_tensors="pt")
outputs = model.generate(**inputs, max_length=1024)
table_text = processor.batch_decode(outputs, skip_special_tokens=True)[0]
table_result = [{"generated_text": table_text}]
except Exception as direct_error:
print(f"⚠️ Все методы распознавания Surya Table не удались: {direct_error}")
raise direct_error
# Преобразуем результат в структурированный формат
try:
# Результат может быть в разных форматах
if isinstance(table_result, list) and len(table_result) > 0:
if isinstance(table_result[0], dict) and 'generated_text' in table_result[0]:
table_text = table_result[0]['generated_text']
else:
table_text = str(table_result)
else:
table_text = str(table_result)
# Парсим структуру таблицы из текста
table_data = self._parse_table_text(table_text)
return {
"success": True,
"type": "table",
"model": "surya_table",
"rows": table_data,
"raw_text": table_text,
"confidence": 0.95
}
except Exception as parse_error:
print(f"⚠️ Ошибка парсинга результата Surya Table: {parse_error}")
# Продолжаем с запасным вариантом
except Exception as surya_error:
print(f"⚠️ Ошибка Surya Table, используем запасной вариант: {surya_error}")
# Запасной вариант: Table Transformer для определения местоположения таблиц
print("🔍 Используем Table Transformer для определения местоположения таблиц...")
table_detection = self.table_detector(image)
# Если обнаружены таблицы, используем TrOCR для извлечения текста из них
table_data = []
for detection in table_detection:
if detection["label"] == "table" and detection["score"] > 0.7:
# Вырезаем область таблицы
table_data.append({
"box": detection["box"],
"score": detection["score"],
"type": "table"
})
# Извлекаем текст из таблиц через TrOCR
if table_data:
for table in table_data:
# Здесь можно добавить код для вырезания области таблицы и применения TrOCR
pass
return table_data
except Exception as e:
print(f"❌ Ошибка распознавания таблицы: {e}")
return []
def _parse_table_text(self, table_text):
"""Парсинг текста таблицы в структурированные данные"""
rows = []
try:
# Разбиваем на строки
lines = table_text.strip().split('\n')
# Определяем заголовки (первая строка)
if lines:
headers = self._extract_columns(lines[0])
# Обрабатываем строки данных
for i in range(1, len(lines)):
row_data = {}
columns = self._extract_columns(lines[i])
# Сопоставляем значения с заголовками
for j, value in enumerate(columns):
if j < len(headers):
header = headers[j].lower()
# Преобразуем заголовки к стандартным полям
if 'товар' in header or 'название' in header or 'наимен' in header:
row_data['name'] = value
elif 'кол' in header or 'шт' in header:
try:
# Извлекаем числовое значение
quantity = re.search(r'(\d+(?:\.\d+)?)', value)
if quantity:
row_data['quantity'] = float(quantity.group(1))
else:
row_data['quantity'] = value
except:
row_data['quantity'] = value
elif 'арт' in header:
row_data['article'] = value
elif 'цен' in header:
# Извлекаем числовое значение цены
price = re.search(r'(\d+(?:\.\d+)?)', value)
if price:
row_data['price'] = float(price.group(1))
else:
row_data['price'] = value
else:
# Для прочих колонок используем оригинальное название
row_data[header] = value
if row_data:
rows.append(row_data)
except Exception as e:
print(f"⚠️ Ошибка парсинга таблицы: {e}")
return rows
def _extract_columns(self, line):
"""Извлечение колонок из строки таблицы"""
# Простое разделение по табуляции или нескольким пробелам
return re.split(r'\t| +', line.strip())
async def merge_extraction_results(self, extraction_results, doc_type):
"""Объединение результатов разных AI методов"""
merged_text = ""
structured_data = {}
for result in extraction_results:
if "text" in result:
merged_text += f"{result['text']} "
if "data" in result and isinstance(result["data"], dict):
structured_data.update(result["data"])
return {
"combined_text": merged_text.strip(),
"structured_data": structured_data,
"document_type": doc_type
}
async def parse_warehouse_commands(self, extraction_result):
"""Парсинг складских команд из извлеченного текста"""
text = extraction_result.get("combined_text", "")
# Используем NER для извлечения сущностей
try:
entities = self.ner_pipeline(text)
except:
entities = []
# Регулярные выражения для складских данных
warehouse_items = []
# Поиск артикулов (F186, ST9, F186ST9)
article_pattern = r'\b(?:F\d+(?:ST\d+)?|ST\d+)\b'
articles = re.findall(article_pattern, text, re.IGNORECASE)
# Поиск количеств
quantity_pattern = r'\b(\d+)\s*(?:шт|лист|листов|кг|м2|м²)\b'
quantities = re.findall(quantity_pattern, text, re.IGNORECASE)
# Поиск цен
price_pattern = r'\b(\d+(?:\.\d+)?)\s*(?:руб|₽|р)\b'
prices = re.findall(price_pattern, text, re.IGNORECASE)
# Объединение найденных данных
max_items = max(len(articles), len(quantities), 1)
for i in range(max_items):
item = {
"article": articles[i] if i < len(articles) else "",
"quantity": int(quantities[i]) if i < len(quantities) else 0,
"price": float(prices[i]) if i < len(prices) else 0.0,
"name": self.extract_product_name(text, i),
"confidence": 0.8
}
if item["article"] or item["quantity"] > 0:
warehouse_items.append(item)
return warehouse_items
def extract_product_name(self, text, index=0):
"""Извлечение названия товара из текста"""
# Простая эвристика для извлечения названий
words = text.split()
# Ищем слова после артикулов или количеств
product_keywords = ["лдсп", "мдф", "фанера", "дуб", "бук", "ясень", "орех", "чикаго"]
for word in words:
if any(keyword in word.lower() for keyword in product_keywords):
return word.title()
return "Товар"
async def generate_smart_suggestions(self, warehouse_items):
"""Генерация умных предложений"""
suggestions = []
for item in warehouse_items:
if not item["article"]:
suggestions.append({
"type": "missing_article",
"message": f"Не найден артикул для товара '{item['name']}'",
"action": "manual_input",
"priority": "high"
})
if item["quantity"] == 0:
suggestions.append({
"type": "missing_quantity",
"message": f"Не найдено количество для '{item['article'] or item['name']}'",
"action": "manual_input",
"priority": "medium"
})
if item["price"] == 0:
suggestions.append({
"type": "missing_price",
"message": f"Не найдена цена для '{item['article'] or item['name']}'",
"action": "suggest_price",
"priority": "low"
})
return suggestions
def calculate_overall_confidence(self, extraction_results):
"""Расчет общей уверенности"""
if not extraction_results:
return 0.0
total_confidence = sum(result.get("confidence", 0) for result in extraction_results)
return round(total_confidence / len(extraction_results), 2)
def get_stats(self):
"""Статистика работы микросервиса"""
return {
"quota_used": f"{self.stats['total_requests']}/20000",
"uptime_hours": (datetime.now() - self.stats["start_time"]).total_seconds() / 3600,
"models_loaded": ["TrOCR", "LayoutLM", "TableTransformer", "RuBERT-NER", "SuryaTable"],
"success_rate": self._calculate_success_rate()
}
def _calculate_success_rate(self):
"""Расчет успешного процента"""
if self.stats["total_requests"] == 0:
return 0.0
return round(self.stats["successful_extractions"] / self.stats["total_requests"] * 100, 1)
# Инициализация AI
ai_orchestrator = FreeAIOrchestrator()
# Gradio интерфейс
def process_warehouse_document(image, document_type):
"""Обработка складского документа через Gradio"""
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
ai_orchestrator.extract_warehouse_data(image, document_type)
)
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({
"success": False,
"error": f"Ошибка обработки: {str(e)}",
"cost": 0.0
}, ensure_ascii=False, indent=2)
def get_service_stats():
"""Получение статистики сервиса"""
stats = ai_orchestrator.get_stats()
return json.dumps(stats, ensure_ascii=False, indent=2)
# Gradio интерфейс
with gr.Blocks(title="SkladBot Free AI") as app:
gr.Markdown("# 🤖 SkladBot Free AI Microservice")
gr.Markdown("**БЕСПЛАТНАЯ** обработка складских документов через AI")
with gr.Tab("Обработка документов"):
image_input = gr.Image(type="pil", label="Загрузите изображение документа")
doc_type = gr.Dropdown(
choices=["auto", "invoice", "table", "form", "handwritten"],
value="auto",
label="Тип документа"
)
process_btn = gr.Button("🔍 Обработать документ", variant="primary")
result_output = gr.Textbox(
label="Результат обработки",
lines=20,
max_lines=30
)
process_btn.click(
process_warehouse_document,
inputs=[image_input, doc_type],
outputs=result_output
)
with gr.Tab("Статистика"):
stats_btn = gr.Button("📊 Обновить статистику")
stats_output = gr.Textbox(
label="Статистика сервиса",
lines=10
)
stats_btn.click(
get_service_stats,
outputs=stats_output
)
gr.Markdown("---")
gr.Markdown("💰 **Стоимость**: $0 (100% бесплатно)")
gr.Markdown("📊 **Лимит**: 20,000 запросов/месяц")
gr.Markdown("🧠 **AI модели**: TrOCR, LayoutLM, Table Transformer, RuBERT, SuryaTable")
if __name__ == "__main__":
app.launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True
)