Spaces:
Sleeping
Sleeping
""" | |
🤗 SkladBot Free AI Microservice | |
Hugging Face Spaces микросервис для БЕСПЛАТНОЙ обработки складских документов | |
Возможности: | |
- TrOCR для печатного и рукописного текста | |
- LayoutLM для понимания структуры документов | |
- Table Transformer для обработки таблиц | |
- Gradio API для REST запросов | |
- 100% БЕСПЛАТНО - 20k запросов/месяц | |
""" | |
import gradio as gr | |
import torch | |
import numpy as np | |
from PIL import Image | |
import io | |
import base64 | |
import json | |
import re | |
from datetime import datetime | |
from typing import Dict, List, Any, Optional | |
# Transformers models | |
from transformers import ( | |
TrOCRProcessor, VisionEncoderDecoderModel, | |
pipeline, | |
AutoTokenizer, AutoModelForTokenClassification | |
) | |
# Импортируем наш кастомный токенайзер | |
from custom_tokenizers import Byt5LangTokenizer | |
# Регистрируем кастомный токенайзер в transformers | |
from transformers import AutoConfig, AutoTokenizer | |
from transformers.models.auto.tokenization_auto import TOKENIZER_MAPPING | |
from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE | |
# Регистрация кастомного токенайзера | |
if 'Byt5LangTokenizer' not in dir(): | |
try: | |
# Добавляем токенайзер в систему автоматического обнаружения transformers | |
from transformers.models.auto.tokenization_auto import TOKENIZER_MAPPING, TOKENIZER_MAPPING_NAMES | |
from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE | |
print("🔄 Регистрируем кастомный токенайзер Byt5LangTokenizer...") | |
except ImportError as e: | |
print(f"⚠️ Предупреждение при импорте модулей трансформеров: {e}") | |
class FreeAIOrchestrator: | |
"""Координатор БЕСПЛАТНЫХ AI сервисов для складских документов""" | |
def __init__(self): | |
print("🚀 Инициализация SkladBot Free AI...") | |
# TrOCR для печатного текста (БЕСПЛАТНО) | |
self.printed_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-printed") | |
self.printed_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-printed") | |
# TrOCR для рукописного текста (БЕСПЛАТНО) | |
self.handwritten_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-handwritten") | |
self.handwritten_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-handwritten") | |
# LayoutLM для понимания документов (БЕСПЛАТНО) | |
self.document_qa = pipeline( | |
"document-question-answering", | |
model="impira/layoutlm-document-qa" | |
) | |
# Table Transformer для таблиц (БЕСПЛАТНО) | |
self.table_detector = pipeline( | |
"object-detection", | |
model="microsoft/table-transformer-structure-recognition" | |
) | |
# NEW: Добавляем интеграцию с Surya Table (БЕСПЛАТНО) | |
try: | |
# Регистрируем кастомный токенайзер перед загрузкой модели | |
print("🔄 Инициализация кастомного токенайзера для Surya Table...") | |
# Используем пайплайн с указанием стандартного токенайзера вместо кастомного | |
# Это решает проблему совместимости | |
self.surya_table_model = pipeline( | |
"image-to-text", | |
model="vikp/surya_tablerec", | |
tokenizer="t5-base" # Используем стандартный токенайзер для t5 | |
) | |
print("✅ Surya Table модель загружена успешно") | |
self.surya_table_available = True | |
except Exception as e: | |
print(f"⚠️ Не удалось загрузить Surya Table: {e}") | |
self.surya_table_available = False | |
self.stats = { | |
"total_requests": 0, | |
"successful_extractions": 0, | |
"avg_confidence": 0.0, | |
"start_time": datetime.now() | |
} | |
print("✅ SkladBot Free AI готов к работе!") | |
async def extract_warehouse_data(self, image, document_type="auto"): | |
"""Главная функция - извлечение данных из складских документов""" | |
self.stats["total_requests"] += 1 | |
try: | |
# Конвертация изображения | |
if isinstance(image, str): | |
# Base64 строка | |
image_data = base64.b64decode(image) | |
image = Image.open(io.BytesIO(image_data)) | |
# 1. Определение типа документа | |
doc_type = await self.classify_document_type(image) | |
if document_type != "auto": | |
doc_type = document_type | |
# 2. Выбор стратегии обработки | |
extraction_results = [] | |
# TrOCR для печатного текста | |
printed_text = await self.extract_printed_text(image) | |
extraction_results.append({ | |
"method": "trocr_printed", | |
"text": printed_text, | |
"confidence": 0.85 | |
}) | |
# TrOCR для рукописного текста (если нужно) | |
if doc_type in ["handwritten", "mixed"]: | |
handwritten_text = await self.extract_handwritten_text(image) | |
extraction_results.append({ | |
"method": "trocr_handwritten", | |
"text": handwritten_text, | |
"confidence": 0.80 | |
}) | |
# LayoutLM для структурированного понимания | |
if doc_type in ["invoice", "table", "form"]: | |
structured_data = await self.extract_structured_data(image, doc_type) | |
extraction_results.append({ | |
"method": "layoutlm", | |
"data": structured_data, | |
"confidence": 0.90 | |
}) | |
# Table Transformer для таблиц | |
if doc_type == "table": | |
table_data = await self.extract_table_data(image) | |
extraction_results.append({ | |
"method": "table_transformer", | |
"data": table_data, | |
"confidence": 0.88 | |
}) | |
# 3. Объединение и обработка результатов | |
final_result = await self.merge_extraction_results(extraction_results, doc_type) | |
# 4. Парсинг складских команд | |
warehouse_commands = await self.parse_warehouse_commands(final_result) | |
# 5. Генерация предложений | |
suggestions = await self.generate_smart_suggestions(warehouse_commands) | |
self.stats["successful_extractions"] += 1 | |
return { | |
"success": True, | |
"document_type": doc_type, | |
"extracted_items": warehouse_commands, | |
"suggestions": suggestions, | |
"raw_extractions": extraction_results, | |
"confidence": self.calculate_overall_confidence(extraction_results), | |
"processing_time": f"{datetime.now().timestamp():.2f}s", | |
"cost": 0.0 # ВСЕГДА БЕСПЛАТНО! | |
} | |
except Exception as e: | |
return { | |
"success": False, | |
"error": str(e), | |
"document_type": "unknown", | |
"extracted_items": [], | |
"suggestions": [], | |
"confidence": 0.0, | |
"cost": 0.0 | |
} | |
async def extract_printed_text(self, image): | |
"""Извлечение печатного текста через TrOCR""" | |
try: | |
pixel_values = self.printed_processor(image, return_tensors="pt").pixel_values | |
generated_ids = self.printed_model.generate(pixel_values) | |
generated_text = self.printed_processor.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
return generated_text | |
except Exception as e: | |
print(f"❌ Ошибка TrOCR печатный: {e}") | |
return "" | |
async def extract_handwritten_text(self, image): | |
"""Извлечение рукописного текста через TrOCR""" | |
try: | |
pixel_values = self.handwritten_processor(image, return_tensors="pt").pixel_values | |
generated_ids = self.handwritten_model.generate(pixel_values) | |
generated_text = self.handwritten_processor.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
return generated_text | |
except Exception as e: | |
print(f"❌ Ошибка TrOCR рукописный: {e}") | |
return "" | |
async def extract_structured_data(self, image, doc_type): | |
"""Структурированное понимание документа через LayoutLM""" | |
try: | |
# Определяем вопросы на основе типа документа | |
questions = self.get_document_questions(doc_type) | |
results = {} | |
for question in questions: | |
try: | |
result = self.document_qa(image=image, question=question) | |
results[question] = result["answer"] | |
except: | |
results[question] = "" | |
return results | |
except Exception as e: | |
print(f"❌ Ошибка LayoutLM: {e}") | |
return {} | |
async def extract_table_data(self, image): | |
"""Извлечение табличных данных через специализированные модели""" | |
try: | |
# Проверка наличия модели Surya Table | |
if hasattr(self, 'surya_table_available') and self.surya_table_available: | |
try: | |
# Попытка использования Surya Table для структурированного распознавания таблиц | |
print("🔍 Используем Surya Table для структурированного распознавания таблицы...") | |
# Преобразуем PIL Image в формат, необходимый для модели | |
if isinstance(image, str): | |
# Если передан путь или base64 | |
if image.startswith('data:image'): | |
# Обработка base64 | |
image_data = base64.b64decode(image.split(',')[1]) | |
pil_image = Image.open(io.BytesIO(image_data)) | |
else: | |
# Обработка пути к файлу | |
pil_image = Image.open(image) | |
elif isinstance(image, Image.Image): | |
pil_image = image | |
else: | |
# Если передан bytes | |
pil_image = Image.open(io.BytesIO(image)) | |
# Распознаем таблицу через Surya Table | |
try: | |
# Пробуем с нашим кастомным токенайзером | |
table_result = self.surya_table_model(pil_image) | |
except Exception as tokenizer_error: | |
print(f"⚠️ Ошибка с кастомным токенайзером: {tokenizer_error}") | |
# Если не сработало, используем альтернативный метод | |
try: | |
print("🔄 Используем альтернативный метод для Surya Table...") | |
# Используем автоматически выбранный tokenizer | |
from transformers import pipeline, AutoTokenizer | |
# Загружаем стандартный t5 токенайзер напрямую | |
tokenizer = AutoTokenizer.from_pretrained("t5-base") | |
surya_fallback = pipeline( | |
"image-to-text", | |
model="vikp/surya_tablerec", | |
tokenizer=tokenizer # Используем стандартный токенайзер | |
) | |
table_result = surya_fallback(pil_image) | |
except Exception as fallback_error: | |
print(f"⚠️ Ошибка с fallback методом: {fallback_error}") | |
# Если и это не сработало, используем еще более простой метод | |
try: | |
# Альтернативный подход без использования pipeline | |
print("🔄 Используем прямой подход для Surya Table...") | |
from transformers import AutoModelForVision2Seq, AutoImageProcessor | |
processor = AutoImageProcessor.from_pretrained("vikp/surya_tablerec") | |
model = AutoModelForVision2Seq.from_pretrained("vikp/surya_tablerec") | |
inputs = processor(images=pil_image, return_tensors="pt") | |
outputs = model.generate(**inputs, max_length=1024) | |
table_text = processor.batch_decode(outputs, skip_special_tokens=True)[0] | |
table_result = [{"generated_text": table_text}] | |
except Exception as direct_error: | |
print(f"⚠️ Все методы распознавания Surya Table не удались: {direct_error}") | |
raise direct_error | |
# Преобразуем результат в структурированный формат | |
try: | |
# Результат может быть в разных форматах | |
if isinstance(table_result, list) and len(table_result) > 0: | |
if isinstance(table_result[0], dict) and 'generated_text' in table_result[0]: | |
table_text = table_result[0]['generated_text'] | |
else: | |
table_text = str(table_result) | |
else: | |
table_text = str(table_result) | |
# Парсим структуру таблицы из текста | |
table_data = self._parse_table_text(table_text) | |
return { | |
"success": True, | |
"type": "table", | |
"model": "surya_table", | |
"rows": table_data, | |
"raw_text": table_text, | |
"confidence": 0.95 | |
} | |
except Exception as parse_error: | |
print(f"⚠️ Ошибка парсинга результата Surya Table: {parse_error}") | |
# Продолжаем с запасным вариантом | |
except Exception as surya_error: | |
print(f"⚠️ Ошибка Surya Table, используем запасной вариант: {surya_error}") | |
# Запасной вариант: Table Transformer для определения местоположения таблиц | |
print("🔍 Используем Table Transformer для определения местоположения таблиц...") | |
table_detection = self.table_detector(image) | |
# Если обнаружены таблицы, используем TrOCR для извлечения текста из них | |
table_data = [] | |
for detection in table_detection: | |
if detection["label"] == "table" and detection["score"] > 0.7: | |
# Вырезаем область таблицы | |
table_data.append({ | |
"box": detection["box"], | |
"score": detection["score"], | |
"type": "table" | |
}) | |
# Извлекаем текст из таблиц через TrOCR | |
if table_data: | |
for table in table_data: | |
# Здесь можно добавить код для вырезания области таблицы и применения TrOCR | |
pass | |
return table_data | |
except Exception as e: | |
print(f"❌ Ошибка распознавания таблицы: {e}") | |
return [] | |
def _parse_table_text(self, table_text): | |
"""Парсинг текста таблицы в структурированные данные""" | |
rows = [] | |
try: | |
# Разбиваем на строки | |
lines = table_text.strip().split('\n') | |
# Определяем заголовки (первая строка) | |
if lines: | |
headers = self._extract_columns(lines[0]) | |
# Обрабатываем строки данных | |
for i in range(1, len(lines)): | |
row_data = {} | |
columns = self._extract_columns(lines[i]) | |
# Сопоставляем значения с заголовками | |
for j, value in enumerate(columns): | |
if j < len(headers): | |
header = headers[j].lower() | |
# Преобразуем заголовки к стандартным полям | |
if 'товар' in header or 'название' in header or 'наимен' in header: | |
row_data['name'] = value | |
elif 'кол' in header or 'шт' in header: | |
try: | |
# Извлекаем числовое значение | |
quantity = re.search(r'(\d+(?:\.\d+)?)', value) | |
if quantity: | |
row_data['quantity'] = float(quantity.group(1)) | |
else: | |
row_data['quantity'] = value | |
except: | |
row_data['quantity'] = value | |
elif 'арт' in header: | |
row_data['article'] = value | |
elif 'цен' in header: | |
# Извлекаем числовое значение цены | |
price = re.search(r'(\d+(?:\.\d+)?)', value) | |
if price: | |
row_data['price'] = float(price.group(1)) | |
else: | |
row_data['price'] = value | |
else: | |
# Для прочих колонок используем оригинальное название | |
row_data[header] = value | |
if row_data: | |
rows.append(row_data) | |
except Exception as e: | |
print(f"⚠️ Ошибка парсинга таблицы: {e}") | |
return rows | |
def _extract_columns(self, line): | |
"""Извлечение колонок из строки таблицы""" | |
# Простое разделение по табуляции или нескольким пробелам | |
return re.split(r'\t| +', line.strip()) | |
async def merge_extraction_results(self, extraction_results, doc_type): | |
"""Объединение результатов разных AI методов""" | |
merged_text = "" | |
structured_data = {} | |
for result in extraction_results: | |
if "text" in result: | |
merged_text += f"{result['text']} " | |
if "data" in result and isinstance(result["data"], dict): | |
structured_data.update(result["data"]) | |
return { | |
"combined_text": merged_text.strip(), | |
"structured_data": structured_data, | |
"document_type": doc_type | |
} | |
async def parse_warehouse_commands(self, extraction_result): | |
"""Парсинг складских команд из извлеченного текста""" | |
text = extraction_result.get("combined_text", "") | |
# Используем NER для извлечения сущностей | |
try: | |
entities = self.ner_pipeline(text) | |
except: | |
entities = [] | |
# Регулярные выражения для складских данных | |
warehouse_items = [] | |
# Поиск артикулов (F186, ST9, F186ST9) | |
article_pattern = r'\b(?:F\d+(?:ST\d+)?|ST\d+)\b' | |
articles = re.findall(article_pattern, text, re.IGNORECASE) | |
# Поиск количеств | |
quantity_pattern = r'\b(\d+)\s*(?:шт|лист|листов|кг|м2|м²)\b' | |
quantities = re.findall(quantity_pattern, text, re.IGNORECASE) | |
# Поиск цен | |
price_pattern = r'\b(\d+(?:\.\d+)?)\s*(?:руб|₽|р)\b' | |
prices = re.findall(price_pattern, text, re.IGNORECASE) | |
# Объединение найденных данных | |
max_items = max(len(articles), len(quantities), 1) | |
for i in range(max_items): | |
item = { | |
"article": articles[i] if i < len(articles) else "", | |
"quantity": int(quantities[i]) if i < len(quantities) else 0, | |
"price": float(prices[i]) if i < len(prices) else 0.0, | |
"name": self.extract_product_name(text, i), | |
"confidence": 0.8 | |
} | |
if item["article"] or item["quantity"] > 0: | |
warehouse_items.append(item) | |
return warehouse_items | |
def extract_product_name(self, text, index=0): | |
"""Извлечение названия товара из текста""" | |
# Простая эвристика для извлечения названий | |
words = text.split() | |
# Ищем слова после артикулов или количеств | |
product_keywords = ["лдсп", "мдф", "фанера", "дуб", "бук", "ясень", "орех", "чикаго"] | |
for word in words: | |
if any(keyword in word.lower() for keyword in product_keywords): | |
return word.title() | |
return "Товар" | |
async def generate_smart_suggestions(self, warehouse_items): | |
"""Генерация умных предложений""" | |
suggestions = [] | |
for item in warehouse_items: | |
if not item["article"]: | |
suggestions.append({ | |
"type": "missing_article", | |
"message": f"Не найден артикул для товара '{item['name']}'", | |
"action": "manual_input", | |
"priority": "high" | |
}) | |
if item["quantity"] == 0: | |
suggestions.append({ | |
"type": "missing_quantity", | |
"message": f"Не найдено количество для '{item['article'] or item['name']}'", | |
"action": "manual_input", | |
"priority": "medium" | |
}) | |
if item["price"] == 0: | |
suggestions.append({ | |
"type": "missing_price", | |
"message": f"Не найдена цена для '{item['article'] or item['name']}'", | |
"action": "suggest_price", | |
"priority": "low" | |
}) | |
return suggestions | |
def calculate_overall_confidence(self, extraction_results): | |
"""Расчет общей уверенности""" | |
if not extraction_results: | |
return 0.0 | |
total_confidence = sum(result.get("confidence", 0) for result in extraction_results) | |
return round(total_confidence / len(extraction_results), 2) | |
def get_stats(self): | |
"""Статистика работы микросервиса""" | |
return { | |
"quota_used": f"{self.stats['total_requests']}/20000", | |
"uptime_hours": (datetime.now() - self.stats["start_time"]).total_seconds() / 3600, | |
"models_loaded": ["TrOCR", "LayoutLM", "TableTransformer", "RuBERT-NER", "SuryaTable"], | |
"success_rate": self._calculate_success_rate() | |
} | |
def _calculate_success_rate(self): | |
"""Расчет успешного процента""" | |
if self.stats["total_requests"] == 0: | |
return 0.0 | |
return round(self.stats["successful_extractions"] / self.stats["total_requests"] * 100, 1) | |
# Инициализация AI | |
ai_orchestrator = FreeAIOrchestrator() | |
# Gradio интерфейс | |
def process_warehouse_document(image, document_type): | |
"""Обработка складского документа через Gradio""" | |
try: | |
import asyncio | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
result = loop.run_until_complete( | |
ai_orchestrator.extract_warehouse_data(image, document_type) | |
) | |
return json.dumps(result, ensure_ascii=False, indent=2) | |
except Exception as e: | |
return json.dumps({ | |
"success": False, | |
"error": f"Ошибка обработки: {str(e)}", | |
"cost": 0.0 | |
}, ensure_ascii=False, indent=2) | |
def get_service_stats(): | |
"""Получение статистики сервиса""" | |
stats = ai_orchestrator.get_stats() | |
return json.dumps(stats, ensure_ascii=False, indent=2) | |
# Gradio интерфейс | |
with gr.Blocks(title="SkladBot Free AI") as app: | |
gr.Markdown("# 🤖 SkladBot Free AI Microservice") | |
gr.Markdown("**БЕСПЛАТНАЯ** обработка складских документов через AI") | |
with gr.Tab("Обработка документов"): | |
image_input = gr.Image(type="pil", label="Загрузите изображение документа") | |
doc_type = gr.Dropdown( | |
choices=["auto", "invoice", "table", "form", "handwritten"], | |
value="auto", | |
label="Тип документа" | |
) | |
process_btn = gr.Button("🔍 Обработать документ", variant="primary") | |
result_output = gr.Textbox( | |
label="Результат обработки", | |
lines=20, | |
max_lines=30 | |
) | |
process_btn.click( | |
process_warehouse_document, | |
inputs=[image_input, doc_type], | |
outputs=result_output | |
) | |
with gr.Tab("Статистика"): | |
stats_btn = gr.Button("📊 Обновить статистику") | |
stats_output = gr.Textbox( | |
label="Статистика сервиса", | |
lines=10 | |
) | |
stats_btn.click( | |
get_service_stats, | |
outputs=stats_output | |
) | |
gr.Markdown("---") | |
gr.Markdown("💰 **Стоимость**: $0 (100% бесплатно)") | |
gr.Markdown("📊 **Лимит**: 20,000 запросов/месяц") | |
gr.Markdown("🧠 **AI модели**: TrOCR, LayoutLM, Table Transformer, RuBERT, SuryaTable") | |
if __name__ == "__main__": | |
app.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
show_error=True | |
) | |