# tokenizers / app / routes.py
# (Hugging Face upload residue preserved as comments: uploaded by "bartar",
#  commit "Upload 26 files", d66ab65 verified)
import os
from flask import Blueprint, request, render_template, jsonify, current_app
# Import services
from .services.tokenizer_service import tokenizer_service
from .services.file_service import file_service
from .utils.validators import validators, ValidationError
# Create the Blueprint that all routes in this module attach to.
# It is registered elsewhere (presumably by the app factory) — confirm.
main_bp = Blueprint('main', __name__)
@main_bp.route('/tokenizer-info', methods=['GET'])
def tokenizer_info():
    """Return tokenizer metadata as JSON without tokenizing any text.

    Query parameters:
        model_id:  predefined model key or custom model path/ID.
        is_custom: 'true' when model_id is a user-supplied model path
                   (triggers path validation before loading).

    Returns:
        200 with the tokenizer info dict on success,
        400 on missing model_id, validation failure, or load error,
        500 on any unexpected exception.
    """
    model_id = request.args.get('model_id', '')
    is_custom = request.args.get('is_custom', 'false').lower() == 'true'

    if not model_id:
        return jsonify({"error": "No model ID provided"}), 400

    try:
        # Custom model identifiers come straight from the user, so validate
        # the path before handing it to the loader.
        if is_custom:
            try:
                validators.validate_model_path(model_id)
            except ValidationError as e:
                return jsonify({"error": str(e)}), 400

        # NOTE: the original code branched on is_predefined_model() but both
        # branches used model_id unchanged — the service resolves predefined
        # keys itself, so a single load call suffices.
        _tokenizer, info, error = tokenizer_service.load_tokenizer(model_id)
        if error:
            return jsonify({"error": error}), 400

        return jsonify(info)
    except Exception as e:
        return jsonify({"error": f"Failed to get tokenizer info: {str(e)}"}), 500
@main_bp.route('/', methods=['GET', 'POST'])
def index():
    """Main page: tokenize pasted text or an uploaded file.

    GET renders the empty form. POST handles either a file upload
    ('file' in request.files with a filename) or a plain-text submission
    ('text' form field). AJAX requests (X-Requested-With header) receive
    JSON; non-AJAX requests receive the rendered template.
    """
    text = ""
    token_data = None
    error_message = ""
    # Model selection may arrive via query string or form body; query-string
    # values take precedence because args is consulted first.
    selected_model = request.args.get('model', request.form.get('model', 'qwen3'))
    custom_model = request.args.get('custom_model', request.form.get('custom_model', ''))
    model_type = request.args.get('model_type', request.form.get('model_type', 'predefined'))

    # Determine which model to use based on model_type
    model_to_use = selected_model if model_type == 'predefined' else custom_model

    if request.method == 'POST':
        # --- File-upload branch ---
        if 'file' in request.files and request.files['file'].filename:
            uploaded_file = request.files['file']
            try:
                # Validate name and extension before anything touches disk.
                validators.validate_filename(uploaded_file.filename)
                validators.validate_file_extension(uploaded_file.filename, file_service.ALLOWED_EXTENSIONS)

                # Validate custom model if needed
                if model_type == 'custom' and custom_model:
                    validators.validate_model_path(custom_model)

                # Save file securely
                file_path = file_service.save_uploaded_file(uploaded_file, current_app.config['UPLOAD_FOLDER'])

                # Read a small preview of the file for display in the UI.
                preview_char_limit = current_app.config.get('PREVIEW_CHAR_LIMIT', 8096)
                # NOTE(review): no explicit encoding= here — relies on the
                # platform default; errors='replace' avoids UnicodeDecodeError.
                with open(file_path, 'r', errors='replace') as f:
                    text = f.read(preview_char_limit)

                try:
                    # Process the file using file service
                    token_data = file_service.process_file_for_tokenization(
                        file_path=file_path,
                        model_id_or_name=model_to_use,
                        preview_char_limit=preview_char_limit,
                        max_display_tokens=current_app.config.get('MAX_DISPLAY_TOKENS', 50000),
                        chunk_size=current_app.config.get('CHUNK_SIZE', 1024 * 1024)
                    )
                    # Clean up the file after processing
                    file_service.cleanup_file(file_path)

                    # If request is AJAX, return JSON
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify(token_data)
                except Exception as e:
                    error_message = str(e)
                    # Cleanup runs on the failure path too, so the upload is
                    # not left behind when tokenization raises.
                    file_service.cleanup_file(file_path)
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify({"error": error_message}), 400

                # Non-AJAX file upload always renders here with
                # token_data=None even on success — presumably the front end
                # fetches token data via AJAX; confirm against the template.
                # NOTE(review): a non-ValidationError raised before the inner
                # try (e.g. while reading the preview) propagates out of this
                # handler and leaves the saved upload on disk — verify.
                return render_template(
                    'index.html',
                    text=text,
                    token_data=None,
                    models=tokenizer_service.TOKENIZER_MODELS,
                    selected_model=selected_model,
                    custom_model=custom_model,
                    model_type=model_type,
                    error=error_message
                )
            except ValidationError as e:
                error_message = str(e)
                if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                    return jsonify({"error": error_message}), 400
                return render_template(
                    'index.html',
                    text="",
                    token_data=None,
                    models=tokenizer_service.TOKENIZER_MODELS,
                    selected_model=selected_model,
                    custom_model=custom_model,
                    model_type=model_type,
                    error=error_message
                )
        # --- Regular text-processing branch ---
        else:
            text = request.form.get('text', '')
            if text:
                try:
                    # Validate text input
                    validators.validate_text_input(text)

                    # Validate custom model if needed
                    if model_type == 'custom' and custom_model:
                        validators.validate_model_path(custom_model)

                    # Process text using file service
                    token_data = file_service.process_text_for_tokenization(
                        text=text,
                        model_id_or_name=model_to_use,
                        preview_char_limit=current_app.config.get('PREVIEW_CHAR_LIMIT', 8096),
                        max_display_tokens=current_app.config.get('MAX_DISPLAY_TOKENS', 50000)
                    )

                    # If request is AJAX, return JSON; non-AJAX success falls
                    # through to the final render with token_data populated.
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify(token_data)
                except ValidationError as e:
                    error_message = str(e)
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify({"error": error_message}), 400
                    return render_template(
                        'index.html',
                        text=text,
                        token_data=None,
                        models=tokenizer_service.TOKENIZER_MODELS,
                        selected_model=selected_model,
                        custom_model=custom_model,
                        model_type=model_type,
                        error=error_message
                    )
                except Exception as e:
                    error_message = str(e)
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify({"error": error_message}), 400
                    return render_template(
                        'index.html',
                        text=text,
                        token_data=None,
                        models=tokenizer_service.TOKENIZER_MODELS,
                        selected_model=selected_model,
                        custom_model=custom_model,
                        model_type=model_type,
                        error=error_message
                    )

    # GET requests and successful non-AJAX text POSTs render here.
    return render_template(
        'index.html',
        text=text,
        token_data=token_data,
        models=tokenizer_service.TOKENIZER_MODELS,
        selected_model=selected_model,
        custom_model=custom_model,
        model_type=model_type,
        error=error_message
    )
@main_bp.route('/health', methods=['GET'])
def health_check():
    """Basic health (liveness) check endpoint.

    Returns a small static JSON payload without touching the tokenizer or
    file services, so it stays cheap enough for frequent probing.

    Returns:
        200 with status JSON, or 500 with an error payload.
    """
    # Local imports keep probe-only dependencies out of module import time.
    # (Removed the unused `import psutil` the original carried — this
    # endpoint never used it, and it is a needless third-party import.)
    import time
    from flask import __version__ as flask_version

    try:
        status = {
            "status": "healthy",
            "timestamp": int(time.time()),
            "version": "1.0.0",
            "flask_version": flask_version,
            # NOTE(review): this is just the current timestamp, not a real
            # uptime; kept as-is so the response schema does not change.
            "uptime": int(time.time()),
        }
        return jsonify(status), 200
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e),
            "timestamp": int(time.time())
        }), 500
@main_bp.route('/health/detailed', methods=['GET'])
def detailed_health_check():
    """Detailed health check: system metrics plus per-service status.

    Probes CPU/memory/disk via psutil, test-loads the 'gpt2' tokenizer,
    and checks that the upload directory exists and is writable.

    Returns:
        200 when everything is healthy, 503 when degraded or unhealthy,
        500 if the check itself blows up.
    """
    import time
    import psutil
    import os
    from flask import __version__ as flask_version

    try:
        # Host metrics first; cpu_percent blocks for its 1-second interval.
        cpu_pct = psutil.cpu_percent(interval=1)
        mem = psutil.virtual_memory()
        disk_usage = psutil.disk_usage('/')

        # Probe the tokenizer service by loading a known baseline model.
        tok_state = "healthy"
        cached_count = len(tokenizer_service.tokenizers) + len(tokenizer_service.custom_tokenizers)
        try:
            _tok, _info, load_err = tokenizer_service.load_tokenizer('gpt2')
            if load_err:
                tok_state = f"warning: {load_err}"
        except Exception as probe_err:
            tok_state = f"error: {str(probe_err)}"

        # Upload directory must exist and be writable for file uploads.
        uploads = current_app.config.get('UPLOAD_FOLDER', '/tmp')
        uploads_exist = os.path.exists(uploads)
        uploads_writable = os.access(uploads, os.W_OK) if uploads_exist else False

        payload = {
            "status": "healthy",
            "timestamp": int(time.time()),
            "version": "1.0.0",
            "flask_version": flask_version,
            "system": {
                "cpu_percent": round(cpu_pct, 1),
                "memory": {
                    "total": mem.total,
                    "available": mem.available,
                    "percent": mem.percent,
                    "used": mem.used
                },
                "disk": {
                    "total": disk_usage.total,
                    "used": disk_usage.used,
                    "free": disk_usage.free,
                    "percent": round((disk_usage.used / disk_usage.total) * 100, 1)
                }
            },
            "services": {
                "tokenizer_service": {
                    "status": tok_state,
                    "cached_tokenizers": cached_count,
                    "available_models": len(tokenizer_service.TOKENIZER_MODELS)
                },
                "file_service": {
                    "upload_directory": uploads,
                    "directory_exists": uploads_exist,
                    "directory_writable": uploads_writable,
                    "allowed_extensions": list(file_service.ALLOWED_EXTENSIONS)
                }
            },
            "configuration": {
                "max_content_length": current_app.config.get('MAX_CONTENT_LENGTH'),
                "cache_expiration": current_app.config.get('CACHE_EXPIRATION', 3600),
                "max_display_tokens": current_app.config.get('MAX_DISPLAY_TOKENS', 50000),
                "preview_char_limit": current_app.config.get('PREVIEW_CHAR_LIMIT', 8096)
            }
        }

        # Fold probe results into a single overall status: tokenizer errors
        # are fatal; tokenizer warnings or an unwritable upload dir degrade.
        if tok_state.startswith("error"):
            overall = "unhealthy"
        elif tok_state.startswith("warning") or not uploads_writable:
            overall = "degraded"
        else:
            overall = "healthy"

        payload["status"] = overall
        return jsonify(payload), (200 if overall == "healthy" else 503)
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e),
            "timestamp": int(time.time())
        }), 500
@main_bp.route('/health/ready', methods=['GET'])
def readiness_check():
    """Readiness check - determines if the application is ready to serve requests.

    Probes the tokenizer service (baseline model load), the file service
    (upload directory writable), and required configuration keys.

    Returns:
        200 when all checks pass, 503 when any fail, 500 on internal error.
    """
    # BUG FIX: `time` was never imported in this function (the module top
    # level does not import it either), so int(time.time()) raised
    # NameError on every call — and again inside the except handler,
    # turning every request into an unhandled 500.
    import time

    try:
        # Check if core services are ready
        checks = {
            "tokenizer_service": False,
            "file_service": False,
            "configuration": False
        }

        # Tokenizer service is ready when the baseline model loads cleanly.
        try:
            _tokenizer, _, error = tokenizer_service.load_tokenizer('gpt2')
            checks["tokenizer_service"] = error is None
        except Exception:  # narrowed from bare `except:`
            checks["tokenizer_service"] = False

        # File service is ready when the upload directory exists and is writable.
        try:
            upload_folder = current_app.config.get('UPLOAD_FOLDER', '/tmp')
            checks["file_service"] = os.path.exists(upload_folder) and os.access(upload_folder, os.W_OK)
        except Exception:  # narrowed from bare `except:`
            checks["file_service"] = False

        # All required configuration keys must be set (non-None).
        required_configs = ['MAX_CONTENT_LENGTH', 'UPLOAD_FOLDER']
        checks["configuration"] = all(current_app.config.get(config) is not None for config in required_configs)

        all_ready = all(checks.values())
        return jsonify({
            "ready": all_ready,
            "checks": checks,
            "timestamp": int(time.time())
        }), 200 if all_ready else 503
    except Exception as e:
        return jsonify({
            "ready": False,
            "error": str(e),
            "timestamp": int(time.time())
        }), 500