Spaces:
Sleeping
Sleeping
import gradio as gr | |
import os | |
import shutil | |
import pandas as pd | |
from datetime import datetime | |
from scripts.document_processor import * | |
from scripts.rag_engine import * | |
import json | |
import tempfile | |
from scripts.config import * | |
if not os.path.exists(UPLOAD_FOLDER): | |
os.makedirs(UPLOAD_FOLDER) | |
if not os.path.exists("processed_data"): | |
os.makedirs("processed_data") | |
if not os.path.exists(RAG_FILES_DIR): | |
os.makedirs(RAG_FILES_DIR) | |
def initialize_system(): | |
global query_engine | |
query_engine = None | |
try: | |
query_engine = load_rag_system() | |
if query_engine is not None: | |
chunk_count = 0 | |
if os.path.exists(PROCESSED_DATA_FILE): | |
processed_chunks = load_processed_chunks(PROCESSED_DATA_FILE) | |
chunk_count = len(processed_chunks) | |
else: | |
try: | |
import pickle | |
with open(os.path.join("processed_data", 'documents.pkl'), 'rb') as f: | |
documents = pickle.load(f) | |
chunk_count = len(documents) | |
except: | |
chunk_count = "неизвестно" | |
return f"AIEXP система инициализирована с {chunk_count} фрагментами нормативных документов (загружена из сохраненного индекса)" | |
except Exception as e: | |
print(f"Не удалось загрузить сохраненную систему: {str(e)}") | |
if os.path.exists(PROCESSED_DATA_FILE): | |
try: | |
processed_chunks_df = load_processed_chunks(PROCESSED_DATA_FILE) | |
# Проверяем наличие нужных столбцов | |
required_columns = {'file_link', 'chunk_text', 'chunk_id', 'document_id'} | |
if not required_columns.issubset(set(processed_chunks_df.columns)): | |
return f"Ошибка при инициализации из CSV: отсутствуют необходимые столбцы: {required_columns - set(processed_chunks_df.columns)}" | |
processed_chunks = processed_chunks_df.to_dict('records') | |
if processed_chunks: | |
query_engine = build_rag_system(processed_chunks) | |
return f"AIEXP система инициализирована с {len(processed_chunks)} фрагментами нормативных документов (построена из CSV)" | |
except Exception as e: | |
return f"Ошибка при инициализации из CSV: {str(e)}" | |
return "AIEXP система готова к работе. Загрузите нормативные документы для создания базы знаний." | |
def get_uploaded_files_info(): | |
if not os.path.exists(UPLOAD_FOLDER): | |
return "Нет загруженных файлов в базе знаний" | |
files = os.listdir(UPLOAD_FOLDER) | |
if not files: | |
return "Нет загруженных файлов в базе знаний" | |
file_info = [] | |
file_count = len(files) | |
for file in files: | |
file_path = os.path.join(UPLOAD_FOLDER, file) | |
size = os.path.getsize(file_path) | |
modified = datetime.fromtimestamp(os.path.getmtime(file_path)).strftime("%Y-%m-%d %H:%M") | |
file_info.append(f"📄 {file} ({size} байт, добавлен: {modified})") | |
return f"Всего нормативных документов в базе: {file_count}\n\n" + "\n".join(file_info) | |
def upload_files(files): | |
global query_engine | |
if not files: | |
return "Файлы не выбраны", get_uploaded_files_info() | |
uploaded_count = 0 | |
errors = [] | |
for file in files: | |
try: | |
filename = os.path.basename(file.name) | |
destination = os.path.join(UPLOAD_FOLDER, filename) | |
shutil.copy2(file.name, destination) | |
uploaded_count += 1 | |
if query_engine is not None: | |
try: | |
query_engine = add_new_document_to_system(destination, query_engine) | |
except Exception as e: | |
errors.append(f"Ошибка добавления {filename} в систему: {str(e)}") | |
except Exception as e: | |
errors.append(f"Ошибка загрузки {file.name}: {str(e)}") | |
result_message = f"Загружено нормативных документов: {uploaded_count}" | |
if errors: | |
result_message += f"\nОшибки:\n" + "\n".join(errors) | |
else: | |
result_message += f"\nДокументы автоматически добавлены в базу знаний" | |
return result_message, get_uploaded_files_info() | |
def process_all_documents(): | |
global query_engine | |
if not os.path.exists(UPLOAD_FOLDER): | |
return "Папка с нормативными документами не найдена" | |
files = os.listdir(UPLOAD_FOLDER) | |
if not files: | |
return "Нет нормативных документов для обработки" | |
file_paths = [os.path.join(UPLOAD_FOLDER, f) for f in files] | |
try: | |
processed_chunks = process_multiple_documents(file_paths) | |
if not processed_chunks: | |
return "Не удалось создать фрагменты нормативных документов" | |
save_processed_chunks(processed_chunks, PROCESSED_DATA_FILE) | |
query_engine = build_rag_system(processed_chunks) | |
with open(INDEX_STATE_FILE, 'w', encoding='utf-8') as f: | |
json.dump({ | |
'processed_files': files, | |
'chunks_count': len(processed_chunks), | |
'last_update': datetime.now().isoformat() | |
}, f, ensure_ascii=False, indent=2) | |
return f"Обработка базы знаний завершена успешно!\nОбработано нормативных документов: {len(files)}\nСоздано фрагментов: {len(processed_chunks)}\nAIEXP система готова для работы с нормативной документацией." | |
except Exception as e: | |
return f"Ошибка при обработке нормативных документов: {str(e)}" | |
def get_system_status(): | |
status_info = [] | |
files_count = len(os.listdir(UPLOAD_FOLDER)) if os.path.exists(UPLOAD_FOLDER) else 0 | |
if os.path.exists(INDEX_STATE_FILE): | |
with open(INDEX_STATE_FILE, 'r', encoding='utf-8') as f: | |
state = json.load(f) | |
status_info.append(f"🟢 AIEXP система активна") | |
status_info.append(f"📊 Нормативных документов в базе: {files_count}") | |
status_info.append(f"📝 Фрагментов в индексе: {state.get('chunks_count', 0)}") | |
status_info.append(f"🕒 Последнее обновление: {state.get('last_update', 'Неизвестно')}") | |
if state.get('processed_files'): | |
status_info.append(f"📋 Обработанные документы:") | |
for file in state['processed_files'][:10]: | |
status_info.append(f" • {file}") | |
if len(state['processed_files']) > 10: | |
status_info.append(f" ... и еще {len(state['processed_files']) - 10} документов") | |
else: | |
status_info.append("🔴 AIEXP система не инициализирована") | |
status_info.append(f"📊 Нормативных документов загружено: {files_count}") | |
status_info.append("Обработайте документы для создания базы знаний") | |
return "\n".join(status_info) | |
def answer_question(question): | |
global query_engine | |
if not question.strip(): | |
return "Пожалуйста, введите вопрос по нормативной документации", "" | |
if query_engine is None: | |
return "База знаний не готова. Сначала загрузите и обработайте нормативные документы.", "" | |
try: | |
response = query_documents(query_engine, question) | |
formatted_response = format_response_with_sources(response) | |
answer = formatted_response['answer'] | |
sources_info = [] | |
sources_info.append("📚 Источники из нормативной документации:") | |
for i, source in enumerate(formatted_response['sources'][:5], 1): | |
sources_info.append(f"\n{i}. Документ: {source['document_id']}") | |
if source['section']: | |
sources_info.append(f" Раздел: {source['section']}") | |
if source['subsection']: | |
sources_info.append(f" Подраздел: {source['subsection']}") | |
sources_info.append(f" Фрагмент: ...{source['text_preview'][:150]}...") | |
return answer, "\n".join(sources_info) | |
except Exception as e: | |
return f"Ошибка при обработке вопроса: {str(e)}", "" | |
def clear_all_data(): | |
global query_engine | |
try: | |
if os.path.exists(UPLOAD_FOLDER): | |
shutil.rmtree(UPLOAD_FOLDER) | |
os.makedirs(UPLOAD_FOLDER) | |
if os.path.exists("processed_data"): | |
shutil.rmtree("processed_data") | |
os.makedirs("processed_data") | |
if os.path.exists(RAG_FILES_DIR): | |
shutil.rmtree(RAG_FILES_DIR) | |
os.makedirs(RAG_FILES_DIR) | |
query_engine = None | |
return "Вся база знаний успешно очищена", get_uploaded_files_info(), get_system_status() | |
except Exception as e: | |
return f"Ошибка при очистке базы знаний: {str(e)}", get_uploaded_files_info(), get_system_status() | |
chat_history = [] | |
def add_to_chat_history(user_query, assistant_response): | |
"""Add exchange to chat history""" | |
global chat_history | |
chat_history.append({ | |
'user': user_query, | |
'assistant': assistant_response, | |
'timestamp': datetime.now().isoformat() | |
}) | |
# Keep only last 10 exchanges to prevent memory issues | |
if len(chat_history) > 10: | |
chat_history = chat_history[-10:] | |
def get_chat_context(): | |
"""Get formatted chat history""" | |
if not chat_history: | |
return "Новый разговор" | |
context = "История разговора:\n" | |
for i, exchange in enumerate(chat_history[-3:], 1): # Show last 3 | |
context += f"{i}. Пользователь: {exchange['user'][:100]}...\n" | |
context += f" Ответ: {exchange['assistant'][:100]}...\n" | |
return context | |
def enhanced_answer_question(question): | |
"""Enhanced version with preprocessing and chat history""" | |
global query_engine, chat_history | |
if not question.strip(): | |
return "Пожалуйста, введите вопрос по нормативной документации", "", get_chat_context() | |
if query_engine is None: | |
return "База знаний не готова. Сначала загрузите и обработайте нормативные документы.", "", get_chat_context() | |
try: | |
# Step 1: Preprocess query with chat history | |
improved_query = preprocess_query_with_context(question, chat_history) | |
# Step 2: Query with enhanced scoring | |
enhanced_response = query_documents_with_scores(query_engine, improved_query) | |
# Step 3: Format response with sources | |
formatted_response = format_enhanced_response_with_sources(enhanced_response) | |
# Step 4: Add conversational context | |
final_answer = create_chat_context_prompt(formatted_response['answer'], chat_history) | |
# Step 5: Add to chat history | |
add_to_chat_history(question, final_answer) | |
# Add query info to sources if preprocessing was used | |
sources_text = formatted_response['sources'] | |
if improved_query != question: | |
sources_text = f"🔄 Улучшенный запрос: '{improved_query}'\n\n" + sources_text | |
return final_answer, sources_text, get_chat_context() | |
except Exception as e: | |
error_msg = f"Ошибка при обработке вопроса: {str(e)}" | |
add_to_chat_history(question, error_msg) | |
return error_msg, "", get_chat_context() | |
def clear_chat_history(): | |
"""Clear chat history""" | |
global chat_history | |
chat_history = [] | |
return "История чата очищена", get_chat_context() | |
def create_demo_interface(): | |
with gr.Blocks(title="AIEXP - AI Expert для нормативной документации", theme=gr.themes.Soft()) as demo: | |
gr.Markdown(""" | |
# 🤖 AIEXP - Artificial Intelligence Expert | |
## Инструмент для работы с нормативной документацией | |
**Возможности системы:** | |
- 🔍 Поиск информации по запросу с указанием источников среди нормативной документации | |
- 📋 Цитирование пунктов нормативной документации из базы знаний | |
- 📝 Краткий пересказ содержания разделов или целых нормативных документов | |
- 🔎 Семантический анализ соответствия информации требованиям НД | |
- 📋 Формирование пошаговых планов действий на основании требований НД | |
**Поддерживаемые форматы:** PDF, DOCX, TXT, CSV, XLSX, JSON | |
""") | |
with gr.Tab("🏠 Поиск по нормативным документам"): | |
gr.Markdown("### Задайте вопрос по нормативной документации") | |
with gr.Row(): | |
with gr.Column(scale=3): | |
question_input = gr.Textbox( | |
label="Ваш вопрос к базе знаний", | |
placeholder="Введите вопрос по нормативным документам...", | |
lines=3 | |
) | |
ask_btn = gr.Button("🔍 Найти ответ", variant="primary", size="lg") | |
clear_chat_btn = gr.Button("🗑️ Очистить историю чата", variant="secondary") | |
chat_context = gr.Textbox( | |
label="Контекст разговора", | |
lines=4, | |
interactive=False, | |
value=get_chat_context() | |
) | |
gr.Examples( | |
examples=[ | |
"Какой стандарт устанавливает порядок признания протоколов испытаний продукции в области использования атомной энергии?", | |
"Кто несет ответственность за организацию и проведение признания протоколов испытаний продукции?", | |
"В каких случаях могут быть признаны протоколы испытаний, проведенные лабораториями, не включенными в перечисления?", | |
"Какие критерии используются органом по сертификации для анализа документации на втором этапе признания протоколов испытаний?" | |
], | |
inputs=question_input, | |
label="Примеры вопросов по нормативным документам" | |
) | |
with gr.Column(scale=4): | |
answer_output = gr.Textbox( | |
label="Ответ на основе нормативных документов", | |
lines=8, | |
interactive=False | |
) | |
sources_output = gr.Textbox( | |
label="Источники из нормативной документации", | |
lines=10, | |
interactive=False | |
) | |
# Event handlers | |
ask_btn.click( | |
fn=enhanced_answer_question, | |
inputs=[question_input], | |
outputs=[answer_output, sources_output, chat_context] | |
) | |
clear_chat_btn.click( | |
fn=clear_chat_history, | |
outputs=[chat_context] | |
) | |
with gr.Tab("📤 Управление базой знаний (Администратор)"): | |
gr.Markdown("### Загрузка и обработка нормативных документов") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
file_upload = gr.File( | |
label="Выберите нормативные документы для загрузки", | |
file_count="multiple", | |
file_types=[".pdf", ".docx", ".txt", ".csv", ".xlsx", ".json"] | |
) | |
with gr.Row(): | |
upload_btn = gr.Button("📤 Загрузить в базу знаний", variant="primary") | |
process_btn = gr.Button("⚙️ Переобработать всю базу", variant="secondary") | |
clear_btn = gr.Button("🗑️ Очистить базу знаний", variant="stop") | |
with gr.Column(scale=2): | |
upload_status = gr.Textbox( | |
label="Статус загрузки", | |
lines=3, | |
interactive=False | |
) | |
processing_status = gr.Textbox( | |
label="Статус обработки базы знаний", | |
lines=5, | |
interactive=False | |
) | |
gr.Markdown("### База нормативных документов") | |
files_info = gr.Textbox( | |
label="Документы в базе знаний", | |
lines=8, | |
interactive=False, | |
value=get_uploaded_files_info() | |
) | |
with gr.Tab("📊 Статус AIEXP системы"): | |
gr.Markdown("### Информация о состоянии базы знаний") | |
system_status = gr.Textbox( | |
label="Статус AIEXP системы", | |
lines=10, | |
interactive=False, | |
value=get_system_status() | |
) | |
refresh_status_btn = gr.Button("🔄 Обновить статус системы") | |
upload_btn.click( | |
fn=upload_files, | |
inputs=[file_upload], | |
outputs=[upload_status, files_info] | |
) | |
process_btn.click( | |
fn=process_all_documents, | |
outputs=[processing_status] | |
) | |
ask_btn.click( | |
fn=answer_question, | |
inputs=[question_input], | |
outputs=[answer_output, sources_output] | |
) | |
question_input.submit( | |
fn=answer_question, | |
inputs=[question_input], | |
outputs=[answer_output, sources_output] | |
) | |
clear_btn.click( | |
fn=clear_all_data, | |
outputs=[processing_status, files_info, system_status] | |
) | |
refresh_status_btn.click( | |
fn=get_system_status, | |
outputs=[system_status] | |
) | |
return demo | |
if __name__ == "__main__": | |
print("Инициализация AIEXP системы...") | |
init_message = initialize_system() | |
print(init_message) | |
demo = create_demo_interface() | |
demo.launch( | |
share=True, | |
server_name="0.0.0.0", | |
server_port=7860, | |
show_error=True | |
) |