# samlax12's picture
# Upload 29 files
# 9e00cc6 verified
# modules/knowledge_base/routes.py
from flask import Blueprint, request, jsonify
import os
import time
import threading
import uuid
from werkzeug.utils import secure_filename
# Import existing components
from modules.knowledge_base.processor import DocumentProcessor
from modules.knowledge_base.vector_store import VectorStore
from modules.knowledge_base.retriever import Retriever
from modules.knowledge_base.reranker import Reranker
# Blueprint under which every route in this module is registered.
knowledge_bp = Blueprint('knowledge', __name__)
# Initialize components
# These instances are created once, at import time. The routes visible in this
# file use doc_processor, vector_store and retriever; reranker is instantiated
# here but not referenced by any of those routes.
doc_processor = DocumentProcessor()
vector_store = VectorStore()
retriever = Retriever()
reranker = Reranker()
# Store progress information
# Plain in-memory dict mapping task_id -> status dict. The upload handlers and
# their worker threads write to it; the progress endpoint reads from it.
processing_tasks = {}
# Upload folder configuration
# Relative path, so it resolves against the process's current working
# directory. The folder is created at import time if it does not exist yet.
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
@knowledge_bp.route('/', methods=['GET'])
def get_all_knowledge():
    """List every knowledge base together with the files it contains.

    Returns:
        A JSON response ``{"success": True, "data": [...]}`` in which each
        entry carries ``id``, ``name``, ``files`` and ``fileCount``. If any
        exception is raised, the traceback is printed and a
        ``{"success": False, "message": ...}`` response is returned with
        HTTP 500.
    """
    try:
        summaries = []
        for index_id in retriever.get_all_indices():
            # The display name is the index name without its "rag_" prefix.
            if index_id.startswith('rag_'):
                shown_name = index_id[4:]
            else:
                shown_name = index_id
            index_files = vector_store.get_files_in_index(index_id)
            summaries.append(dict(
                id=index_id,
                name=shown_name,
                files=index_files,
                fileCount=len(index_files),
            ))
        return jsonify({"success": True, "data": summaries})
    except Exception as exc:
        import traceback
        traceback.print_exc()
        return jsonify({"success": False, "message": str(exc)}), 500
@knowledge_bp.route('/', methods=['POST'])
def create_knowledge():
    """Create a new knowledge base from an uploaded file.

    Expects a form field ``name`` and a file part named ``file``. The upload
    is saved under ``UPLOAD_FOLDER`` with a UUID-based file name; the document
    is then processed and stored in the index ``rag_<name>`` by a background
    thread, whose progress is recorded in ``processing_tasks``.

    Returns:
        HTTP 202 with a ``task_id`` once background processing has started;
        HTTP 400 when the name is empty, the knowledge base already exists,
        or no file was supplied; HTTP 500 (with the exception message) if an
        unexpected exception is raised while handling the request.
    """
    try:
        data = request.form
        name = data.get('name')
        # A whitespace-only name is treated the same as a missing one.
        if not name or not name.strip():
            return jsonify({"success": False, "message": "Knowledge base name cannot be empty"}), 400

        # Check whether the knowledge base already exists.
        indices = retriever.get_all_indices()
        if f"rag_{name}" in indices:
            return jsonify({"success": False, "message": f"Knowledge base '{name}' already exists"}), 400

        # Validate the uploaded file.
        if 'file' not in request.files:
            return jsonify({"success": False, "message": "No file uploaded"}), 400
        upload = request.files['file']
        if upload.filename == '':
            return jsonify({"success": False, "message": "No file selected"}), 400

        # Remember the original file name; it is passed on to the processor.
        original_filename = upload.filename
        # Take the extension from the original name so the file type is still
        # recognised for non-ASCII (e.g. Chinese) file names.
        file_ext = os.path.splitext(original_filename)[1].lower()
        # Save under a UUID-based name so every stored file name is unique.
        unique_filename = f"{uuid.uuid4().hex}{file_ext}"
        file_path = os.path.join(UPLOAD_FOLDER, unique_filename)
        upload.save(file_path)

        # Build the task ID. The timestamp only has one-second resolution and
        # the index is not created until the worker thread runs, so two
        # requests for the same name can pass the existence check within the
        # same second. The random suffix keeps their progress entries apart
        # (same ID format as add_documents).
        task_id = f"task_{int(time.time())}_{name}_{uuid.uuid4().hex[:8]}"

        # Initial task state, polled through the progress endpoint.
        processing_tasks[task_id] = {
            "progress": 0,
            "status": "Starting document processing...",
            "index_name": name,
            "file_path": file_path,
            "original_filename": original_filename,
            "error": False,
            "docCount": 0
        }

        def process_in_thread():
            """Process the saved file and store its vectors, updating task state."""
            try:
                processing_tasks[task_id]["progress"] = 10
                processing_tasks[task_id]["status"] = "Loading document..."

                def update_progress(progress, status):
                    # Cap at 95: 100 is only reported after the vectors
                    # have been stored.
                    processing_tasks[task_id]["progress"] = min(95, progress)
                    processing_tasks[task_id]["status"] = status

                # Process the document, forwarding the original file name.
                processed_docs = doc_processor.process(
                    file_path,
                    progress_callback=update_progress,
                    original_filename=original_filename
                )

                processing_tasks[task_id]["progress"] = 95
                processing_tasks[task_id]["status"] = "Creating vector store..."
                processing_tasks[task_id]["docCount"] = len(processed_docs)

                # Store the vectors in the new index.
                vector_store.store(processed_docs, f"rag_{name}")

                processing_tasks[task_id]["progress"] = 100
                processing_tasks[task_id]["status"] = "Processing complete"
            except Exception as e:
                # Record the failure so the progress endpoint can report it.
                processing_tasks[task_id]["error"] = True
                processing_tasks[task_id]["status"] = f"Processing failed: {str(e)}"
                import traceback
                traceback.print_exc()

        threading.Thread(target=process_in_thread).start()

        return jsonify({
            "success": True,
            "message": "Started processing document",
            "task_id": task_id
        }), 202
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({"success": False, "message": str(e)}), 500
@knowledge_bp.route('/progress/<task_id>', methods=['GET'])
def get_progress(task_id):
    """Report the current state of a document-processing task.

    Args:
        task_id: Identifier handed out when processing was started.

    Returns:
        A JSON response ``{"success": True, "data": <task state>}``. For an
        unknown ``task_id`` the data is a placeholder with ``progress`` 0,
        status ``"Task not found"`` and ``error`` set to True (the HTTP
        status is still 200). If an exception is raised, the traceback is
        printed and a failure payload is returned with HTTP 500.
    """
    try:
        # Placeholder returned when no entry exists for this task ID.
        missing_task = {
            "progress": 0,
            "status": "Task not found",
            "error": True
        }
        task_state = processing_tasks.get(task_id, missing_task)
        return jsonify({"success": True, "data": task_state})
    except Exception as exc:
        import traceback
        traceback.print_exc()
        return jsonify({"success": False, "message": str(exc)}), 500
@knowledge_bp.route('/<index_id>/documents', methods=['POST'])
def add_documents(index_id):
    """Upload a file and add its contents to an existing knowledge base.

    Args:
        index_id: Name of the target index, as listed by
            ``retriever.get_all_indices()``.

    Returns:
        HTTP 202 with a ``task_id`` once background processing has started;
        HTTP 404 when the index does not exist; HTTP 400 when no file was
        supplied; HTTP 500 (with the exception message) if an unexpected
        exception is raised while handling the request.
    """
    try:
        # The target knowledge base must already exist.
        if index_id not in retriever.get_all_indices():
            return jsonify({"success": False, "message": "Knowledge base does not exist"}), 404

        # Validate the uploaded file.
        if 'file' not in request.files:
            return jsonify({"success": False, "message": "No file uploaded"}), 400
        upload = request.files['file']
        if upload.filename == '':
            return jsonify({"success": False, "message": "No file selected"}), 400

        # Keep the original file name and save the upload under a
        # UUID-based name that retains the (lower-cased) extension.
        original_filename = upload.filename
        _, extension = os.path.splitext(original_filename)
        stored_name = uuid.uuid4().hex + extension.lower()
        file_path = os.path.join(UPLOAD_FOLDER, stored_name)
        upload.save(file_path)

        # Knowledge-base name: the index ID without its "rag_" prefix.
        if index_id.startswith('rag_'):
            kb_name = index_id[4:]
        else:
            kb_name = index_id

        # Task ID: timestamp, knowledge-base name and a short random suffix.
        started_at = int(time.time())
        task_id = f"task_{started_at}_{kb_name}_{uuid.uuid4().hex[:8]}"

        # Initial task state.
        processing_tasks[task_id] = dict(
            progress=0,
            status="Starting document processing...",
            index_name=kb_name,
            file_path=file_path,
            original_filename=original_filename,
            error=False,
            docCount=0,
        )

        def process_in_thread():
            """Process the saved file and store its vectors, updating task state."""
            try:
                processing_tasks[task_id].update(
                    progress=10,
                    status="Loading document...",
                )

                def update_progress(progress, status):
                    # Cap at 95: 100 is only reported after the vectors
                    # have been stored.
                    processing_tasks[task_id].update(
                        progress=min(95, progress),
                        status=status,
                    )

                # Process the document, forwarding the original file name.
                processed_docs = doc_processor.process(
                    file_path,
                    progress_callback=update_progress,
                    original_filename=original_filename,
                )

                processing_tasks[task_id].update(
                    progress=95,
                    status="Creating vector store...",
                )
                processing_tasks[task_id]["docCount"] = len(processed_docs)

                # Store the vectors in the existing index.
                vector_store.store(processed_docs, index_id)

                processing_tasks[task_id].update(
                    progress=100,
                    status="Processing complete",
                )
            except Exception as e:
                # Record the failure so the progress endpoint can report it.
                processing_tasks[task_id]["error"] = True
                processing_tasks[task_id]["status"] = f"Processing failed: {str(e)}"
                import traceback
                traceback.print_exc()

        worker = threading.Thread(target=process_in_thread)
        worker.start()

        accepted = {
            "success": True,
            "message": "Started processing document",
            "task_id": task_id
        }
        return jsonify(accepted), 202
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({"success": False, "message": str(e)}), 500
@knowledge_bp.route('/<index_id>', methods=['DELETE'])
def delete_knowledge(index_id):
    """Delete a whole knowledge base.

    Args:
        index_id: Name of the index passed to ``vector_store.delete_index``.

    Returns:
        A JSON response whose ``success`` flag reflects the truthiness of the
        value returned by ``vector_store.delete_index``. If an exception is
        raised, the traceback is printed and a failure payload is returned
        with HTTP 500.
    """
    try:
        if not vector_store.delete_index(index_id):
            # NOTE(review): this failure path carries no explicit status
            # code, so the response is sent with HTTP 200.
            return jsonify({"success": False, "message": "Failed to delete knowledge base"})
        return jsonify({"success": True, "message": "Knowledge base deleted successfully"})
    except Exception as exc:
        import traceback
        traceback.print_exc()
        return jsonify({"success": False, "message": str(exc)}), 500
@knowledge_bp.route('/<index_id>/documents/<path:file_name>', methods=['DELETE'])
def delete_document(index_id, file_name):
    """Delete a single document from a knowledge base.

    Args:
        index_id: Name of the index that holds the document.
        file_name: Document name; the ``path`` converter lets it contain
            slashes.

    Returns:
        A JSON response whose ``success`` flag reflects the truthiness of the
        value returned by ``vector_store.delete_document``. If an exception
        is raised, the traceback is printed and a failure payload is returned
        with HTTP 500.
    """
    try:
        deleted = vector_store.delete_document(index_id, file_name)
        # NOTE(review): both outcomes are sent with the default HTTP 200;
        # only the "success" flag in the body distinguishes them.
        if deleted:
            outcome = {"success": True, "message": "Document deleted successfully"}
        else:
            outcome = {"success": False, "message": "Failed to delete document"}
        return jsonify(outcome)
    except Exception as exc:
        import traceback
        traceback.print_exc()
        return jsonify({"success": False, "message": str(exc)}), 500