AIEXP_RAG_1 / app_1.py
MrSimple07's picture
fixed file_path problem + added app_1.py + added possible relevancy check first
648d16e
raw
history blame
21 kB
import gradio as gr
import os
import shutil
import pandas as pd
from datetime import datetime
from scripts.document_processor import *
from scripts.rag_engine import *
import json
import tempfile
from scripts.config import *
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
if not os.path.exists("processed_data"):
os.makedirs("processed_data")
if not os.path.exists(RAG_FILES_DIR):
os.makedirs(RAG_FILES_DIR)
def initialize_system():
global query_engine
query_engine = None
try:
query_engine = load_rag_system()
if query_engine is not None:
chunk_count = 0
if os.path.exists(PROCESSED_DATA_FILE):
processed_chunks = load_processed_chunks(PROCESSED_DATA_FILE)
chunk_count = len(processed_chunks)
else:
try:
import pickle
with open(os.path.join("processed_data", 'documents.pkl'), 'rb') as f:
documents = pickle.load(f)
chunk_count = len(documents)
except:
chunk_count = "неизвестно"
return f"AIEXP система инициализирована с {chunk_count} фрагментами нормативных документов (загружена из сохраненного индекса)"
except Exception as e:
print(f"Не удалось загрузить сохраненную систему: {str(e)}")
if os.path.exists(PROCESSED_DATA_FILE):
try:
processed_chunks_df = load_processed_chunks(PROCESSED_DATA_FILE)
# Проверяем наличие нужных столбцов
required_columns = {'file_link', 'chunk_text', 'chunk_id', 'document_id'}
if not required_columns.issubset(set(processed_chunks_df.columns)):
return f"Ошибка при инициализации из CSV: отсутствуют необходимые столбцы: {required_columns - set(processed_chunks_df.columns)}"
processed_chunks = processed_chunks_df.to_dict('records')
if processed_chunks:
query_engine = build_rag_system(processed_chunks)
return f"AIEXP система инициализирована с {len(processed_chunks)} фрагментами нормативных документов (построена из CSV)"
except Exception as e:
return f"Ошибка при инициализации из CSV: {str(e)}"
return "AIEXP система готова к работе. Загрузите нормативные документы для создания базы знаний."
def get_uploaded_files_info():
if not os.path.exists(UPLOAD_FOLDER):
return "Нет загруженных файлов в базе знаний"
files = os.listdir(UPLOAD_FOLDER)
if not files:
return "Нет загруженных файлов в базе знаний"
file_info = []
file_count = len(files)
for file in files:
file_path = os.path.join(UPLOAD_FOLDER, file)
size = os.path.getsize(file_path)
modified = datetime.fromtimestamp(os.path.getmtime(file_path)).strftime("%Y-%m-%d %H:%M")
file_info.append(f"📄 {file} ({size} байт, добавлен: {modified})")
return f"Всего нормативных документов в базе: {file_count}\n\n" + "\n".join(file_info)
def upload_files(files):
global query_engine
if not files:
return "Файлы не выбраны", get_uploaded_files_info()
uploaded_count = 0
errors = []
for file in files:
try:
filename = os.path.basename(file.name)
destination = os.path.join(UPLOAD_FOLDER, filename)
shutil.copy2(file.name, destination)
uploaded_count += 1
if query_engine is not None:
try:
query_engine = add_new_document_to_system(destination, query_engine)
except Exception as e:
errors.append(f"Ошибка добавления {filename} в систему: {str(e)}")
except Exception as e:
errors.append(f"Ошибка загрузки {file.name}: {str(e)}")
result_message = f"Загружено нормативных документов: {uploaded_count}"
if errors:
result_message += f"\nОшибки:\n" + "\n".join(errors)
else:
result_message += f"\nДокументы автоматически добавлены в базу знаний"
return result_message, get_uploaded_files_info()
def process_all_documents():
global query_engine
if not os.path.exists(UPLOAD_FOLDER):
return "Папка с нормативными документами не найдена"
files = os.listdir(UPLOAD_FOLDER)
if not files:
return "Нет нормативных документов для обработки"
file_paths = [os.path.join(UPLOAD_FOLDER, f) for f in files]
try:
processed_chunks = process_multiple_documents(file_paths)
if not processed_chunks:
return "Не удалось создать фрагменты нормативных документов"
save_processed_chunks(processed_chunks, PROCESSED_DATA_FILE)
query_engine = build_rag_system(processed_chunks)
with open(INDEX_STATE_FILE, 'w', encoding='utf-8') as f:
json.dump({
'processed_files': files,
'chunks_count': len(processed_chunks),
'last_update': datetime.now().isoformat()
}, f, ensure_ascii=False, indent=2)
return f"Обработка базы знаний завершена успешно!\nОбработано нормативных документов: {len(files)}\nСоздано фрагментов: {len(processed_chunks)}\nAIEXP система готова для работы с нормативной документацией."
except Exception as e:
return f"Ошибка при обработке нормативных документов: {str(e)}"
def get_system_status():
status_info = []
files_count = len(os.listdir(UPLOAD_FOLDER)) if os.path.exists(UPLOAD_FOLDER) else 0
if os.path.exists(INDEX_STATE_FILE):
with open(INDEX_STATE_FILE, 'r', encoding='utf-8') as f:
state = json.load(f)
status_info.append(f"🟢 AIEXP система активна")
status_info.append(f"📊 Нормативных документов в базе: {files_count}")
status_info.append(f"📝 Фрагментов в индексе: {state.get('chunks_count', 0)}")
status_info.append(f"🕒 Последнее обновление: {state.get('last_update', 'Неизвестно')}")
if state.get('processed_files'):
status_info.append(f"📋 Обработанные документы:")
for file in state['processed_files'][:10]:
status_info.append(f" • {file}")
if len(state['processed_files']) > 10:
status_info.append(f" ... и еще {len(state['processed_files']) - 10} документов")
else:
status_info.append("🔴 AIEXP система не инициализирована")
status_info.append(f"📊 Нормативных документов загружено: {files_count}")
status_info.append("Обработайте документы для создания базы знаний")
return "\n".join(status_info)
def answer_question(question):
global query_engine
if not question.strip():
return "Пожалуйста, введите вопрос по нормативной документации", ""
if query_engine is None:
return "База знаний не готова. Сначала загрузите и обработайте нормативные документы.", ""
try:
response = query_documents(query_engine, question)
formatted_response = format_response_with_sources(response)
answer = formatted_response['answer']
sources_info = []
sources_info.append("📚 Источники из нормативной документации:")
for i, source in enumerate(formatted_response['sources'][:5], 1):
sources_info.append(f"\n{i}. Документ: {source['document_id']}")
if source['section']:
sources_info.append(f" Раздел: {source['section']}")
if source['subsection']:
sources_info.append(f" Подраздел: {source['subsection']}")
sources_info.append(f" Фрагмент: ...{source['text_preview'][:150]}...")
return answer, "\n".join(sources_info)
except Exception as e:
return f"Ошибка при обработке вопроса: {str(e)}", ""
def clear_all_data():
global query_engine
try:
if os.path.exists(UPLOAD_FOLDER):
shutil.rmtree(UPLOAD_FOLDER)
os.makedirs(UPLOAD_FOLDER)
if os.path.exists("processed_data"):
shutil.rmtree("processed_data")
os.makedirs("processed_data")
if os.path.exists(RAG_FILES_DIR):
shutil.rmtree(RAG_FILES_DIR)
os.makedirs(RAG_FILES_DIR)
query_engine = None
return "Вся база знаний успешно очищена", get_uploaded_files_info(), get_system_status()
except Exception as e:
return f"Ошибка при очистке базы знаний: {str(e)}", get_uploaded_files_info(), get_system_status()
chat_history = []
def add_to_chat_history(user_query, assistant_response):
"""Add exchange to chat history"""
global chat_history
chat_history.append({
'user': user_query,
'assistant': assistant_response,
'timestamp': datetime.now().isoformat()
})
# Keep only last 10 exchanges to prevent memory issues
if len(chat_history) > 10:
chat_history = chat_history[-10:]
def get_chat_context():
"""Get formatted chat history"""
if not chat_history:
return "Новый разговор"
context = "История разговора:\n"
for i, exchange in enumerate(chat_history[-3:], 1): # Show last 3
context += f"{i}. Пользователь: {exchange['user'][:100]}...\n"
context += f" Ответ: {exchange['assistant'][:100]}...\n"
return context
def enhanced_answer_question(question):
"""Enhanced version with preprocessing and chat history"""
global query_engine, chat_history
if not question.strip():
return "Пожалуйста, введите вопрос по нормативной документации", "", get_chat_context()
if query_engine is None:
return "База знаний не готова. Сначала загрузите и обработайте нормативные документы.", "", get_chat_context()
try:
# Step 1: Preprocess query with chat history
improved_query = preprocess_query_with_context(question, chat_history)
# Step 2: Query with enhanced scoring
enhanced_response = query_documents_with_scores(query_engine, improved_query)
# Step 3: Format response with sources
formatted_response = format_enhanced_response_with_sources(enhanced_response)
# Step 4: Add conversational context
final_answer = create_chat_context_prompt(formatted_response['answer'], chat_history)
# Step 5: Add to chat history
add_to_chat_history(question, final_answer)
# Add query info to sources if preprocessing was used
sources_text = formatted_response['sources']
if improved_query != question:
sources_text = f"🔄 Улучшенный запрос: '{improved_query}'\n\n" + sources_text
return final_answer, sources_text, get_chat_context()
except Exception as e:
error_msg = f"Ошибка при обработке вопроса: {str(e)}"
add_to_chat_history(question, error_msg)
return error_msg, "", get_chat_context()
def clear_chat_history():
"""Clear chat history"""
global chat_history
chat_history = []
return "История чата очищена", get_chat_context()
def create_demo_interface():
with gr.Blocks(title="AIEXP - AI Expert для нормативной документации", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🤖 AIEXP - Artificial Intelligence Expert
## Инструмент для работы с нормативной документацией
**Возможности системы:**
- 🔍 Поиск информации по запросу с указанием источников среди нормативной документации
- 📋 Цитирование пунктов нормативной документации из базы знаний
- 📝 Краткий пересказ содержания разделов или целых нормативных документов
- 🔎 Семантический анализ соответствия информации требованиям НД
- 📋 Формирование пошаговых планов действий на основании требований НД
**Поддерживаемые форматы:** PDF, DOCX, TXT, CSV, XLSX, JSON
""")
with gr.Tab("🏠 Поиск по нормативным документам"):
gr.Markdown("### Задайте вопрос по нормативной документации")
with gr.Row():
with gr.Column(scale=3):
question_input = gr.Textbox(
label="Ваш вопрос к базе знаний",
placeholder="Введите вопрос по нормативным документам...",
lines=3
)
ask_btn = gr.Button("🔍 Найти ответ", variant="primary", size="lg")
clear_chat_btn = gr.Button("🗑️ Очистить историю чата", variant="secondary")
chat_context = gr.Textbox(
label="Контекст разговора",
lines=4,
interactive=False,
value=get_chat_context()
)
gr.Examples(
examples=[
"Какой стандарт устанавливает порядок признания протоколов испытаний продукции в области использования атомной энергии?",
"Кто несет ответственность за организацию и проведение признания протоколов испытаний продукции?",
"В каких случаях могут быть признаны протоколы испытаний, проведенные лабораториями, не включенными в перечисления?",
"Какие критерии используются органом по сертификации для анализа документации на втором этапе признания протоколов испытаний?"
],
inputs=question_input,
label="Примеры вопросов по нормативным документам"
)
with gr.Column(scale=4):
answer_output = gr.Textbox(
label="Ответ на основе нормативных документов",
lines=8,
interactive=False
)
sources_output = gr.Textbox(
label="Источники из нормативной документации",
lines=10,
interactive=False
)
# Event handlers
ask_btn.click(
fn=enhanced_answer_question,
inputs=[question_input],
outputs=[answer_output, sources_output, chat_context]
)
clear_chat_btn.click(
fn=clear_chat_history,
outputs=[chat_context]
)
with gr.Tab("📤 Управление базой знаний (Администратор)"):
gr.Markdown("### Загрузка и обработка нормативных документов")
with gr.Row():
with gr.Column(scale=2):
file_upload = gr.File(
label="Выберите нормативные документы для загрузки",
file_count="multiple",
file_types=[".pdf", ".docx", ".txt", ".csv", ".xlsx", ".json"]
)
with gr.Row():
upload_btn = gr.Button("📤 Загрузить в базу знаний", variant="primary")
process_btn = gr.Button("⚙️ Переобработать всю базу", variant="secondary")
clear_btn = gr.Button("🗑️ Очистить базу знаний", variant="stop")
with gr.Column(scale=2):
upload_status = gr.Textbox(
label="Статус загрузки",
lines=3,
interactive=False
)
processing_status = gr.Textbox(
label="Статус обработки базы знаний",
lines=5,
interactive=False
)
gr.Markdown("### База нормативных документов")
files_info = gr.Textbox(
label="Документы в базе знаний",
lines=8,
interactive=False,
value=get_uploaded_files_info()
)
with gr.Tab("📊 Статус AIEXP системы"):
gr.Markdown("### Информация о состоянии базы знаний")
system_status = gr.Textbox(
label="Статус AIEXP системы",
lines=10,
interactive=False,
value=get_system_status()
)
refresh_status_btn = gr.Button("🔄 Обновить статус системы")
upload_btn.click(
fn=upload_files,
inputs=[file_upload],
outputs=[upload_status, files_info]
)
process_btn.click(
fn=process_all_documents,
outputs=[processing_status]
)
ask_btn.click(
fn=answer_question,
inputs=[question_input],
outputs=[answer_output, sources_output]
)
question_input.submit(
fn=answer_question,
inputs=[question_input],
outputs=[answer_output, sources_output]
)
clear_btn.click(
fn=clear_all_data,
outputs=[processing_status, files_info, system_status]
)
refresh_status_btn.click(
fn=get_system_status,
outputs=[system_status]
)
return demo
if __name__ == "__main__":
print("Инициализация AIEXP системы...")
init_message = initialize_system()
print(init_message)
demo = create_demo_interface()
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)