import gradio as gr import os import shutil import pandas as pd from datetime import datetime from scripts.document_processor import * from scripts.rag_engine import * import json import tempfile from scripts.config import * if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) if not os.path.exists("processed_data"): os.makedirs("processed_data") if not os.path.exists(RAG_FILES_DIR): os.makedirs(RAG_FILES_DIR) def initialize_system(): global query_engine query_engine = None try: query_engine = load_rag_system() if query_engine is not None: chunk_count = 0 if os.path.exists(PROCESSED_DATA_FILE): processed_chunks = load_processed_chunks(PROCESSED_DATA_FILE) chunk_count = len(processed_chunks) else: try: import pickle with open(os.path.join("processed_data", 'documents.pkl'), 'rb') as f: documents = pickle.load(f) chunk_count = len(documents) except: chunk_count = "неизвестно" return f"AIEXP система инициализирована с {chunk_count} фрагментами нормативных документов (загружена из сохраненного индекса)" except Exception as e: print(f"Не удалось загрузить сохраненную систему: {str(e)}") if os.path.exists(PROCESSED_DATA_FILE): try: processed_chunks_df = load_processed_chunks(PROCESSED_DATA_FILE) # Проверяем наличие нужных столбцов required_columns = {'file_link', 'chunk_text', 'chunk_id', 'document_id'} if not required_columns.issubset(set(processed_chunks_df.columns)): return f"Ошибка при инициализации из CSV: отсутствуют необходимые столбцы: {required_columns - set(processed_chunks_df.columns)}" processed_chunks = processed_chunks_df.to_dict('records') if processed_chunks: query_engine = build_rag_system(processed_chunks) return f"AIEXP система инициализирована с {len(processed_chunks)} фрагментами нормативных документов (построена из CSV)" except Exception as e: return f"Ошибка при инициализации из CSV: {str(e)}" return "AIEXP система готова к работе. Загрузите нормативные документы для создания базы знаний." def get_uploaded_files_info(): if not os.path.exists(UPLOAD_FOLDER): return "Нет загруженных файлов в базе знаний" files = os.listdir(UPLOAD_FOLDER) if not files: return "Нет загруженных файлов в базе знаний" file_info = [] file_count = len(files) for file in files: file_path = os.path.join(UPLOAD_FOLDER, file) size = os.path.getsize(file_path) modified = datetime.fromtimestamp(os.path.getmtime(file_path)).strftime("%Y-%m-%d %H:%M") file_info.append(f"📄 {file} ({size} байт, добавлен: {modified})") return f"Всего нормативных документов в базе: {file_count}\n\n" + "\n".join(file_info) def upload_files(files): global query_engine if not files: return "Файлы не выбраны", get_uploaded_files_info() uploaded_count = 0 errors = [] for file in files: try: filename = os.path.basename(file.name) destination = os.path.join(UPLOAD_FOLDER, filename) shutil.copy2(file.name, destination) uploaded_count += 1 if query_engine is not None: try: query_engine = add_new_document_to_system(destination, query_engine) except Exception as e: errors.append(f"Ошибка добавления {filename} в систему: {str(e)}") except Exception as e: errors.append(f"Ошибка загрузки {file.name}: {str(e)}") result_message = f"Загружено нормативных документов: {uploaded_count}" if errors: result_message += f"\nОшибки:\n" + "\n".join(errors) else: result_message += f"\nДокументы автоматически добавлены в базу знаний" return result_message, get_uploaded_files_info() def process_all_documents(): global query_engine if not os.path.exists(UPLOAD_FOLDER): return "Папка с нормативными документами не найдена" files = os.listdir(UPLOAD_FOLDER) if not files: return "Нет нормативных документов для обработки" file_paths = [os.path.join(UPLOAD_FOLDER, f) for f in files] try: processed_chunks = process_multiple_documents(file_paths) if not processed_chunks: return "Не удалось создать фрагменты нормативных документов" save_processed_chunks(processed_chunks, PROCESSED_DATA_FILE) query_engine = build_rag_system(processed_chunks) with open(INDEX_STATE_FILE, 'w', encoding='utf-8') as f: json.dump({ 'processed_files': files, 'chunks_count': len(processed_chunks), 'last_update': datetime.now().isoformat() }, f, ensure_ascii=False, indent=2) return f"Обработка базы знаний завершена успешно!\nОбработано нормативных документов: {len(files)}\nСоздано фрагментов: {len(processed_chunks)}\nAIEXP система готова для работы с нормативной документацией." except Exception as e: return f"Ошибка при обработке нормативных документов: {str(e)}" def get_system_status(): status_info = [] files_count = len(os.listdir(UPLOAD_FOLDER)) if os.path.exists(UPLOAD_FOLDER) else 0 if os.path.exists(INDEX_STATE_FILE): with open(INDEX_STATE_FILE, 'r', encoding='utf-8') as f: state = json.load(f) status_info.append(f"🟢 AIEXP система активна") status_info.append(f"📊 Нормативных документов в базе: {files_count}") status_info.append(f"📝 Фрагментов в индексе: {state.get('chunks_count', 0)}") status_info.append(f"🕒 Последнее обновление: {state.get('last_update', 'Неизвестно')}") if state.get('processed_files'): status_info.append(f"📋 Обработанные документы:") for file in state['processed_files'][:10]: status_info.append(f" • {file}") if len(state['processed_files']) > 10: status_info.append(f" ... и еще {len(state['processed_files']) - 10} документов") else: status_info.append("🔴 AIEXP система не инициализирована") status_info.append(f"📊 Нормативных документов загружено: {files_count}") status_info.append("Обработайте документы для создания базы знаний") return "\n".join(status_info) def answer_question(question): global query_engine if not question.strip(): return "Пожалуйста, введите вопрос по нормативной документации", "" if query_engine is None: return "База знаний не готова. Сначала загрузите и обработайте нормативные документы.", "" try: response = query_documents(query_engine, question) formatted_response = format_response_with_sources(response) answer = formatted_response['answer'] sources_info = [] sources_info.append("📚 Источники из нормативной документации:") for i, source in enumerate(formatted_response['sources'][:5], 1): sources_info.append(f"\n{i}. Документ: {source['document_id']}") if source['section']: sources_info.append(f" Раздел: {source['section']}") if source['subsection']: sources_info.append(f" Подраздел: {source['subsection']}") sources_info.append(f" Фрагмент: ...{source['text_preview'][:150]}...") return answer, "\n".join(sources_info) except Exception as e: return f"Ошибка при обработке вопроса: {str(e)}", "" def clear_all_data(): global query_engine try: if os.path.exists(UPLOAD_FOLDER): shutil.rmtree(UPLOAD_FOLDER) os.makedirs(UPLOAD_FOLDER) if os.path.exists("processed_data"): shutil.rmtree("processed_data") os.makedirs("processed_data") if os.path.exists(RAG_FILES_DIR): shutil.rmtree(RAG_FILES_DIR) os.makedirs(RAG_FILES_DIR) query_engine = None return "Вся база знаний успешно очищена", get_uploaded_files_info(), get_system_status() except Exception as e: return f"Ошибка при очистке базы знаний: {str(e)}", get_uploaded_files_info(), get_system_status() chat_history = [] def add_to_chat_history(user_query, assistant_response): """Add exchange to chat history""" global chat_history chat_history.append({ 'user': user_query, 'assistant': assistant_response, 'timestamp': datetime.now().isoformat() }) # Keep only last 10 exchanges to prevent memory issues if len(chat_history) > 10: chat_history = chat_history[-10:] def get_chat_context(): """Get formatted chat history""" if not chat_history: return "Новый разговор" context = "История разговора:\n" for i, exchange in enumerate(chat_history[-3:], 1): # Show last 3 context += f"{i}. Пользователь: {exchange['user'][:100]}...\n" context += f" Ответ: {exchange['assistant'][:100]}...\n" return context def enhanced_answer_question(question): """Enhanced version with preprocessing and chat history""" global query_engine, chat_history if not question.strip(): return "Пожалуйста, введите вопрос по нормативной документации", "", get_chat_context() if query_engine is None: return "База знаний не готова. Сначала загрузите и обработайте нормативные документы.", "", get_chat_context() try: # Step 1: Preprocess query with chat history improved_query = preprocess_query_with_context(question, chat_history) # Step 2: Query with enhanced scoring enhanced_response = query_documents_with_scores(query_engine, improved_query) # Step 3: Format response with sources formatted_response = format_enhanced_response_with_sources(enhanced_response) # Step 4: Add conversational context final_answer = create_chat_context_prompt(formatted_response['answer'], chat_history) # Step 5: Add to chat history add_to_chat_history(question, final_answer) # Add query info to sources if preprocessing was used sources_text = formatted_response['sources'] if improved_query != question: sources_text = f"🔄 Улучшенный запрос: '{improved_query}'\n\n" + sources_text return final_answer, sources_text, get_chat_context() except Exception as e: error_msg = f"Ошибка при обработке вопроса: {str(e)}" add_to_chat_history(question, error_msg) return error_msg, "", get_chat_context() def clear_chat_history(): """Clear chat history""" global chat_history chat_history = [] return "История чата очищена", get_chat_context() def create_demo_interface(): with gr.Blocks(title="AIEXP - AI Expert для нормативной документации", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🤖 AIEXP - Artificial Intelligence Expert ## Инструмент для работы с нормативной документацией **Возможности системы:** - 🔍 Поиск информации по запросу с указанием источников среди нормативной документации - 📋 Цитирование пунктов нормативной документации из базы знаний - 📝 Краткий пересказ содержания разделов или целых нормативных документов - 🔎 Семантический анализ соответствия информации требованиям НД - 📋 Формирование пошаговых планов действий на основании требований НД **Поддерживаемые форматы:** PDF, DOCX, TXT, CSV, XLSX, JSON """) with gr.Tab("🏠 Поиск по нормативным документам"): gr.Markdown("### Задайте вопрос по нормативной документации") with gr.Row(): with gr.Column(scale=3): question_input = gr.Textbox( label="Ваш вопрос к базе знаний", placeholder="Введите вопрос по нормативным документам...", lines=3 ) ask_btn = gr.Button("🔍 Найти ответ", variant="primary", size="lg") clear_chat_btn = gr.Button("🗑️ Очистить историю чата", variant="secondary") chat_context = gr.Textbox( label="Контекст разговора", lines=4, interactive=False, value=get_chat_context() ) gr.Examples( examples=[ "Какой стандарт устанавливает порядок признания протоколов испытаний продукции в области использования атомной энергии?", "Кто несет ответственность за организацию и проведение признания протоколов испытаний продукции?", "В каких случаях могут быть признаны протоколы испытаний, проведенные лабораториями, не включенными в перечисления?", "Какие критерии используются органом по сертификации для анализа документации на втором этапе признания протоколов испытаний?" ], inputs=question_input, label="Примеры вопросов по нормативным документам" ) with gr.Column(scale=4): answer_output = gr.Textbox( label="Ответ на основе нормативных документов", lines=8, interactive=False ) sources_output = gr.Textbox( label="Источники из нормативной документации", lines=10, interactive=False ) # Event handlers ask_btn.click( fn=enhanced_answer_question, inputs=[question_input], outputs=[answer_output, sources_output, chat_context] ) clear_chat_btn.click( fn=clear_chat_history, outputs=[chat_context] ) with gr.Tab("📤 Управление базой знаний (Администратор)"): gr.Markdown("### Загрузка и обработка нормативных документов") with gr.Row(): with gr.Column(scale=2): file_upload = gr.File( label="Выберите нормативные документы для загрузки", file_count="multiple", file_types=[".pdf", ".docx", ".txt", ".csv", ".xlsx", ".json"] ) with gr.Row(): upload_btn = gr.Button("📤 Загрузить в базу знаний", variant="primary") process_btn = gr.Button("⚙️ Переобработать всю базу", variant="secondary") clear_btn = gr.Button("🗑️ Очистить базу знаний", variant="stop") with gr.Column(scale=2): upload_status = gr.Textbox( label="Статус загрузки", lines=3, interactive=False ) processing_status = gr.Textbox( label="Статус обработки базы знаний", lines=5, interactive=False ) gr.Markdown("### База нормативных документов") files_info = gr.Textbox( label="Документы в базе знаний", lines=8, interactive=False, value=get_uploaded_files_info() ) with gr.Tab("📊 Статус AIEXP системы"): gr.Markdown("### Информация о состоянии базы знаний") system_status = gr.Textbox( label="Статус AIEXP системы", lines=10, interactive=False, value=get_system_status() ) refresh_status_btn = gr.Button("🔄 Обновить статус системы") upload_btn.click( fn=upload_files, inputs=[file_upload], outputs=[upload_status, files_info] ) process_btn.click( fn=process_all_documents, outputs=[processing_status] ) ask_btn.click( fn=answer_question, inputs=[question_input], outputs=[answer_output, sources_output] ) question_input.submit( fn=answer_question, inputs=[question_input], outputs=[answer_output, sources_output] ) clear_btn.click( fn=clear_all_data, outputs=[processing_status, files_info, system_status] ) refresh_status_btn.click( fn=get_system_status, outputs=[system_status] ) return demo if __name__ == "__main__": print("Инициализация AIEXP системы...") init_message = initialize_system() print(init_message) demo = create_demo_interface() demo.launch( share=True, server_name="0.0.0.0", server_port=7860, show_error=True )