import os

from flask import Blueprint, request, render_template, jsonify, current_app

# Import services
from .services.tokenizer_service import tokenizer_service
from .services.file_service import file_service
from .utils.validators import validators, ValidationError

# Create Blueprint
main_bp = Blueprint('main', __name__)

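# A minimal registration sketch (an assumption; this module does not show where the
# blueprint is mounted). In an application factory, wiring it up typically looks like:
#
#     from flask import Flask
#
#     def create_app():
#         app = Flask(__name__)
#         app.config['UPLOAD_FOLDER'] = '/tmp/uploads'   # used by index() below
#         app.register_blueprint(main_bp)
#         return app
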
def tokenizer_info():
    """Endpoint to get tokenizer information without processing text."""
    model_id = request.args.get('model_id', '')
    is_custom = request.args.get('is_custom', 'false').lower() == 'true'

    if not model_id:
        return jsonify({"error": "No model ID provided"}), 400

    try:
        # Validate the model path if a custom model was requested
        if is_custom:
            try:
                validators.validate_model_path(model_id)
            except ValidationError as e:
                return jsonify({"error": str(e)}), 400

        # Predefined and custom models are both passed to the tokenizer service by their ID
        model_id_or_name = model_id

        # Load the tokenizer and get its info
        tokenizer, info, error = tokenizer_service.load_tokenizer(model_id_or_name)
        if error:
            return jsonify({"error": error}), 400

        return jsonify(info)
    except Exception as e:
        return jsonify({"error": f"Failed to get tokenizer info: {str(e)}"}), 500

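# Illustrative client call (the URL path is an assumption; the query parameters
# model_id and is_custom are the ones read above):
#
#     import requests  # third-party HTTP client, shown only for illustration
#
#     resp = requests.get(
#         "http://localhost:5000/tokenizer-info",             # hypothetical route
#         params={"model_id": "gpt2", "is_custom": "false"},
#     )
#     print(resp.json())
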
def index():
    text = ""
    token_data = None
    error_message = ""
    selected_model = request.args.get('model', request.form.get('model', 'qwen3'))
    custom_model = request.args.get('custom_model', request.form.get('custom_model', ''))
    model_type = request.args.get('model_type', request.form.get('model_type', 'predefined'))

    # Determine which model to use based on model_type
    model_to_use = selected_model if model_type == 'predefined' else custom_model

    if request.method == 'POST':
        # Check if file upload
        if 'file' in request.files and request.files['file'].filename:
            uploaded_file = request.files['file']
            try:
                # Validate file
                validators.validate_filename(uploaded_file.filename)
                validators.validate_file_extension(uploaded_file.filename, file_service.ALLOWED_EXTENSIONS)

                # Validate custom model if needed
                if model_type == 'custom' and custom_model:
                    validators.validate_model_path(custom_model)

                # Save file securely
                file_path = file_service.save_uploaded_file(uploaded_file, current_app.config['UPLOAD_FOLDER'])

                # Read a small preview of the file
                preview_char_limit = current_app.config.get('PREVIEW_CHAR_LIMIT', 8096)
                with open(file_path, 'r', errors='replace') as f:
                    text = f.read(preview_char_limit)

                try:
                    # Process the file using file service
                    token_data = file_service.process_file_for_tokenization(
                        file_path=file_path,
                        model_id_or_name=model_to_use,
                        preview_char_limit=preview_char_limit,
                        max_display_tokens=current_app.config.get('MAX_DISPLAY_TOKENS', 50000),
                        chunk_size=current_app.config.get('CHUNK_SIZE', 1024 * 1024)
                    )
                    # Clean up the file after processing
                    file_service.cleanup_file(file_path)

                    # If request is AJAX, return JSON
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify(token_data)
                except Exception as e:
                    error_message = str(e)
                    file_service.cleanup_file(file_path)
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify({"error": error_message}), 400
                return render_template(
                    'index.html',
                    text=text,
                    token_data=token_data,
                    models=tokenizer_service.TOKENIZER_MODELS,
                    selected_model=selected_model,
                    custom_model=custom_model,
                    model_type=model_type,
                    error=error_message
                )
            except ValidationError as e:
                error_message = str(e)
                if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                    return jsonify({"error": error_message}), 400
                return render_template(
                    'index.html',
                    text="",
                    token_data=None,
                    models=tokenizer_service.TOKENIZER_MODELS,
                    selected_model=selected_model,
                    custom_model=custom_model,
                    model_type=model_type,
                    error=error_message
                )
        # Regular text processing
        else:
            text = request.form.get('text', '')
            if text:
                try:
                    # Validate text input
                    validators.validate_text_input(text)

                    # Validate custom model if needed
                    if model_type == 'custom' and custom_model:
                        validators.validate_model_path(custom_model)

                    # Process text using file service
                    token_data = file_service.process_text_for_tokenization(
                        text=text,
                        model_id_or_name=model_to_use,
                        preview_char_limit=current_app.config.get('PREVIEW_CHAR_LIMIT', 8096),
                        max_display_tokens=current_app.config.get('MAX_DISPLAY_TOKENS', 50000)
                    )

                    # If request is AJAX, return JSON
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify(token_data)
                except ValidationError as e:
                    error_message = str(e)
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify({"error": error_message}), 400
                    return render_template(
                        'index.html',
                        text=text,
                        token_data=None,
                        models=tokenizer_service.TOKENIZER_MODELS,
                        selected_model=selected_model,
                        custom_model=custom_model,
                        model_type=model_type,
                        error=error_message
                    )
                except Exception as e:
                    error_message = str(e)
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify({"error": error_message}), 400
                    return render_template(
                        'index.html',
                        text=text,
                        token_data=None,
                        models=tokenizer_service.TOKENIZER_MODELS,
                        selected_model=selected_model,
                        custom_model=custom_model,
                        model_type=model_type,
                        error=error_message
                    )

    return render_template(
        'index.html',
        text=text,
        token_data=token_data,
        models=tokenizer_service.TOKENIZER_MODELS,
        selected_model=selected_model,
        custom_model=custom_model,
        model_type=model_type,
        error=error_message
    )

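# Illustrative AJAX-style client call (the URL path is an assumption; the form fields
# and the X-Requested-With header match what index() reads above):
#
#     import requests  # third-party HTTP client, shown only for illustration
#
#     resp = requests.post(
#         "http://localhost:5000/",                        # hypothetical route for index()
#         data={"text": "Hello world", "model": "qwen3", "model_type": "predefined"},
#         headers={"X-Requested-With": "XMLHttpRequest"},  # triggers the JSON branch
#     )
#     print(resp.json())
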
def health_check():
    """Basic health check endpoint."""
    import time
    import psutil
    from flask import __version__ as flask_version

    try:
        # Basic application status
        status = {
            "status": "healthy",
            "timestamp": int(time.time()),
            "version": "1.0.0",
            "flask_version": flask_version,
            "uptime": int(time.time() - psutil.Process().create_time()),  # seconds since this process started
        }
        return jsonify(status), 200
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e),
            "timestamp": int(time.time())
        }), 500

def detailed_health_check():
    """Detailed health check with system and service status."""
    import time
    import psutil
    from flask import __version__ as flask_version

    try:
        # System information
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        # Check tokenizer service
        tokenizer_status = "healthy"
        tokenizer_cache_size = len(tokenizer_service.tokenizers) + len(tokenizer_service.custom_tokenizers)

        # Test basic tokenizer loading
        try:
            test_tokenizer, _, error = tokenizer_service.load_tokenizer('gpt2')
            if error:
                tokenizer_status = f"warning: {error}"
        except Exception as e:
            tokenizer_status = f"error: {str(e)}"

        # Check upload directory
        upload_folder = current_app.config.get('UPLOAD_FOLDER', '/tmp')
        upload_dir_exists = os.path.exists(upload_folder)
        upload_dir_writable = os.access(upload_folder, os.W_OK) if upload_dir_exists else False

        status = {
            "status": "healthy",
            "timestamp": int(time.time()),
            "version": "1.0.0",
            "flask_version": flask_version,
            "system": {
                "cpu_percent": round(cpu_percent, 1),
                "memory": {
                    "total": memory.total,
                    "available": memory.available,
                    "percent": memory.percent,
                    "used": memory.used
                },
                "disk": {
                    "total": disk.total,
                    "used": disk.used,
                    "free": disk.free,
                    "percent": round((disk.used / disk.total) * 100, 1)
                }
            },
            "services": {
                "tokenizer_service": {
                    "status": tokenizer_status,
                    "cached_tokenizers": tokenizer_cache_size,
                    "available_models": len(tokenizer_service.TOKENIZER_MODELS)
                },
                "file_service": {
                    "upload_directory": upload_folder,
                    "directory_exists": upload_dir_exists,
                    "directory_writable": upload_dir_writable,
                    "allowed_extensions": list(file_service.ALLOWED_EXTENSIONS)
                }
            },
            "configuration": {
                "max_content_length": current_app.config.get('MAX_CONTENT_LENGTH'),
                "cache_expiration": current_app.config.get('CACHE_EXPIRATION', 3600),
                "max_display_tokens": current_app.config.get('MAX_DISPLAY_TOKENS', 50000),
                "preview_char_limit": current_app.config.get('PREVIEW_CHAR_LIMIT', 8096)
            }
        }

        # Determine overall status
        overall_status = "healthy"
        if tokenizer_status.startswith("error"):
            overall_status = "unhealthy"
        elif tokenizer_status.startswith("warning") or not upload_dir_writable:
            overall_status = "degraded"
        status["status"] = overall_status

        return jsonify(status), 200 if overall_status == "healthy" else 503
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e),
            "timestamp": int(time.time())
        }), 500

def readiness_check():
    """Readiness check - determines if the application is ready to serve requests."""
    import time

    try:
        # Check if core services are ready
        checks = {
            "tokenizer_service": False,
            "file_service": False,
            "configuration": False
        }

        # Test tokenizer service
        try:
            test_tokenizer, _, error = tokenizer_service.load_tokenizer('gpt2')
            checks["tokenizer_service"] = error is None
        except Exception:
            checks["tokenizer_service"] = False

        # Test file service
        try:
            upload_folder = current_app.config.get('UPLOAD_FOLDER', '/tmp')
            checks["file_service"] = os.path.exists(upload_folder) and os.access(upload_folder, os.W_OK)
        except Exception:
            checks["file_service"] = False

        # Check configuration
        required_configs = ['MAX_CONTENT_LENGTH', 'UPLOAD_FOLDER']
        checks["configuration"] = all(current_app.config.get(config) is not None for config in required_configs)

        all_ready = all(checks.values())
        return jsonify({
            "ready": all_ready,
            "checks": checks,
            "timestamp": int(time.time())
        }), 200 if all_ready else 503
    except Exception as e:
        return jsonify({
            "ready": False,
            "error": str(e),
            "timestamp": int(time.time())
        }), 500

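# Wiring sketch (an assumption; the URL paths below are hypothetical and not defined
# anywhere in this module). The view functions above would typically be attached to
# main_bp before the blueprint is registered on the app, e.g.:
#
#     main_bp.add_url_rule('/', view_func=index, methods=['GET', 'POST'])
#     main_bp.add_url_rule('/tokenizer-info', view_func=tokenizer_info)
#     main_bp.add_url_rule('/health', view_func=health_check)
#     main_bp.add_url_rule('/health/detailed', view_func=detailed_health_check)
#     main_bp.add_url_rule('/health/ready', view_func=readiness_check)
#
# The health/readiness split follows the usual liveness vs. readiness probe pattern:
# a 200 from the basic health check keeps the process alive, while a 503 from the
# readiness check tells an orchestrator to stop routing traffic until the checks pass.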