samlax12's picture
Upload 23 files
1625bb7 verified
raw
history blame
10.3 kB
# modules/knowledge_base/routes.py
from flask import Blueprint, request, jsonify
import os
import time
import threading
import uuid
from werkzeug.utils import secure_filename
# Import existing components
from modules.knowledge_base.processor import DocumentProcessor
from modules.knowledge_base.vector_store import VectorStore
from modules.knowledge_base.retriever import Retriever
from modules.knowledge_base.reranker import Reranker
knowledge_bp = Blueprint('knowledge', __name__)
# Initialize components
doc_processor = DocumentProcessor()
vector_store = VectorStore()
retriever = Retriever()
reranker = Reranker()
# Store progress information
processing_tasks = {}
# Upload folder configuration
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
@knowledge_bp.route('/', methods=['GET'])
def get_all_knowledge():
"""Get all knowledge base information"""
try:
indices = retriever.get_all_indices()
result = []
for index in indices:
display_name = index[4:] if index.startswith('rag_') else index
files = vector_store.get_files_in_index(index)
result.append({
"id": index,
"name": display_name,
"files": files,
"fileCount": len(files)
})
return jsonify({"success": True, "data": result})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"success": False, "message": str(e)}), 500
@knowledge_bp.route('/', methods=['POST'])
def create_knowledge():
"""Create a new knowledge base"""
try:
data = request.form
name = data.get('name')
if not name:
return jsonify({"success": False, "message": "Knowledge base name cannot be empty"}), 400
# Check if knowledge base already exists
indices = retriever.get_all_indices()
if f"rag_{name}" in indices:
return jsonify({"success": False, "message": f"Knowledge base '{name}' already exists"}), 400
# Process uploaded file
if 'file' not in request.files:
return jsonify({"success": False, "message": "No file uploaded"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"success": False, "message": "No file selected"}), 400
# Save file
filename = secure_filename(file.filename)
file_path = os.path.join(UPLOAD_FOLDER, filename)
file.save(file_path)
# Create task ID
task_id = f"task_{int(time.time())}_{name}"
# Initialize task status
processing_tasks[task_id] = {
"progress": 0,
"status": "Starting document processing...",
"index_name": name,
"file_path": file_path,
"error": False,
"docCount": 0
}
# Process documents in a separate thread
def process_in_thread():
try:
# Update task status
processing_tasks[task_id]["progress"] = 10
processing_tasks[task_id]["status"] = "Loading document..."
# Process document with progress tracking
def update_progress(progress, status):
processing_tasks[task_id]["progress"] = min(95, progress)
processing_tasks[task_id]["status"] = status
# Process the document
processed_docs = doc_processor.process(file_path, progress_callback=update_progress)
# Update task status
processing_tasks[task_id]["progress"] = 95
processing_tasks[task_id]["status"] = "Creating vector store..."
processing_tasks[task_id]["docCount"] = len(processed_docs)
# Store vectors
vector_store.store(processed_docs, f"rag_{name}")
# Complete task
processing_tasks[task_id]["progress"] = 100
processing_tasks[task_id]["status"] = "Processing complete"
except Exception as e:
# Record error
processing_tasks[task_id]["error"] = True
processing_tasks[task_id]["status"] = f"Processing failed: {str(e)}"
import traceback
traceback.print_exc()
threading.Thread(target=process_in_thread).start()
return jsonify({
"success": True,
"message": "Started processing document",
"task_id": task_id
}), 202
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"success": False, "message": str(e)}), 500
@knowledge_bp.route('/progress/<task_id>', methods=['GET'])
def get_progress(task_id):
"""Get document processing progress"""
try:
task_data = processing_tasks.get(task_id, {
"progress": 0,
"status": "Task not found",
"error": True
})
return jsonify({"success": True, "data": task_data})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"success": False, "message": str(e)}), 500
@knowledge_bp.route('/<index_id>/documents', methods=['POST'])
def add_documents(index_id):
"""Add documents to a knowledge base"""
try:
# Check if knowledge base exists
indices = retriever.get_all_indices()
if index_id not in indices:
return jsonify({"success": False, "message": "Knowledge base does not exist"}), 404
# Process uploaded file
if 'file' not in request.files:
return jsonify({"success": False, "message": "No file uploaded"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"success": False, "message": "No file selected"}), 400
# Save file
filename = secure_filename(file.filename)
file_path = os.path.join(UPLOAD_FOLDER, filename)
file.save(file_path)
# Extract knowledge base name from index ID
kb_name = index_id[4:] if index_id.startswith('rag_') else index_id
# Create task ID
task_id = f"task_{int(time.time())}_{kb_name}_{filename}"
# Initialize task status
processing_tasks[task_id] = {
"progress": 0,
"status": "Starting document processing...",
"index_name": kb_name,
"file_path": file_path,
"error": False,
"docCount": 0
}
# Process documents in a separate thread
def process_in_thread():
try:
# Update task status
processing_tasks[task_id]["progress"] = 10
processing_tasks[task_id]["status"] = "Loading document..."
# Process document with progress tracking
def update_progress(progress, status):
processing_tasks[task_id]["progress"] = min(95, progress)
processing_tasks[task_id]["status"] = status
# Process the document
processed_docs = doc_processor.process(file_path, progress_callback=update_progress)
# Update task status
processing_tasks[task_id]["progress"] = 95
processing_tasks[task_id]["status"] = "Creating vector store..."
processing_tasks[task_id]["docCount"] = len(processed_docs)
# Store vectors
vector_store.store(processed_docs, index_id)
# Complete task
processing_tasks[task_id]["progress"] = 100
processing_tasks[task_id]["status"] = "Processing complete"
except Exception as e:
# Record error
processing_tasks[task_id]["error"] = True
processing_tasks[task_id]["status"] = f"Processing failed: {str(e)}"
import traceback
traceback.print_exc()
threading.Thread(target=process_in_thread).start()
return jsonify({
"success": True,
"message": "Started processing document",
"task_id": task_id
}), 202
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"success": False, "message": str(e)}), 500
@knowledge_bp.route('/<index_id>', methods=['DELETE'])
def delete_knowledge(index_id):
"""Delete a knowledge base"""
try:
result = vector_store.delete_index(index_id)
if result:
return jsonify({"success": True, "message": "Knowledge base deleted successfully"})
else:
return jsonify({"success": False, "message": "Failed to delete knowledge base"})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"success": False, "message": str(e)}), 500
@knowledge_bp.route('/<index_id>/documents/<path:file_name>', methods=['DELETE'])
def delete_document(index_id, file_name):
"""Delete a document from a knowledge base"""
try:
result = vector_store.delete_document(index_id, file_name)
if result:
return jsonify({"success": True, "message": "Document deleted successfully"})
else:
return jsonify({"success": False, "message": "Failed to delete document"})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"success": False, "message": str(e)}), 500