import os
from flask import Blueprint, request, render_template, jsonify, current_app

# Import services
from .services.tokenizer_service import tokenizer_service
from .services.file_service import file_service
from .utils.validators import validators, ValidationError

# Create Blueprint
main_bp = Blueprint('main', __name__)


@main_bp.route('/tokenizer-info', methods=['GET'])
def tokenizer_info():
    """Endpoint to get tokenizer information without processing text."""
    model_id = request.args.get('model_id', '')
    is_custom = request.args.get('is_custom', 'false').lower() == 'true'

    if not model_id:
        return jsonify({"error": "No model ID provided"}), 400

    try:
        # Validate custom model path if it's a custom model
        if is_custom:
            try:
                validators.validate_model_path(model_id)
            except ValidationError as e:
                return jsonify({"error": str(e)}), 400

        # For predefined models, use the model name from the dictionary
        if not is_custom and tokenizer_service.is_predefined_model(model_id):
            model_id_or_name = model_id
        else:
            # For custom models, use the model ID directly
            model_id_or_name = model_id

        # Load the tokenizer and get info
        tokenizer, info, error = tokenizer_service.load_tokenizer(model_id_or_name)
        if error:
            return jsonify({"error": error}), 400

        return jsonify(info)
    except Exception as e:
        return jsonify({"error": f"Failed to get tokenizer info: {str(e)}"}), 500


@main_bp.route('/', methods=['GET', 'POST'])
def index():
    text = ""
    token_data = None
    error_message = ""
    selected_model = request.args.get('model', request.form.get('model', 'qwen3'))
    custom_model = request.args.get('custom_model', request.form.get('custom_model', ''))
    model_type = request.args.get('model_type', request.form.get('model_type', 'predefined'))

    # Determine which model to use based on model_type
    model_to_use = selected_model if model_type == 'predefined' else custom_model

    if request.method == 'POST':
        # Check if file upload
        if 'file' in request.files and request.files['file'].filename:
            uploaded_file = request.files['file']

            try:
                # Validate file
                validators.validate_filename(uploaded_file.filename)
                validators.validate_file_extension(uploaded_file.filename, file_service.ALLOWED_EXTENSIONS)

                # Validate custom model if needed
                if model_type == 'custom' and custom_model:
                    validators.validate_model_path(custom_model)

                # Save file securely
                file_path = file_service.save_uploaded_file(uploaded_file, current_app.config['UPLOAD_FOLDER'])

                # Read a small preview of the file
                preview_char_limit = current_app.config.get('PREVIEW_CHAR_LIMIT', 8096)
                with open(file_path, 'r', errors='replace') as f:
                    text = f.read(preview_char_limit)

                try:
                    # Process the file using file service
                    token_data = file_service.process_file_for_tokenization(
                        file_path=file_path,
                        model_id_or_name=model_to_use,
                        preview_char_limit=preview_char_limit,
                        max_display_tokens=current_app.config.get('MAX_DISPLAY_TOKENS', 50000),
                        chunk_size=current_app.config.get('CHUNK_SIZE', 1024 * 1024)
                    )

                    # Clean up the file after processing
                    file_service.cleanup_file(file_path)

                    # If request is AJAX, return JSON
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify(token_data)
                except Exception as e:
                    error_message = str(e)
                    file_service.cleanup_file(file_path)
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify({"error": error_message}), 400
                    return render_template(
                        'index.html',
                        text=text,
                        token_data=None,
                        models=tokenizer_service.TOKENIZER_MODELS,
                        selected_model=selected_model,
                        custom_model=custom_model,
                        model_type=model_type,
                        error=error_message
                    )
            except ValidationError as e:
                error_message = str(e)
                if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                    return jsonify({"error": error_message}), 400
                return render_template(
                    'index.html',
                    text="",
                    token_data=None,
                    models=tokenizer_service.TOKENIZER_MODELS,
                    selected_model=selected_model,
                    custom_model=custom_model,
                    model_type=model_type,
                    error=error_message
                )

        # Regular text processing
        else:
            text = request.form.get('text', '')
            if text:
                try:
                    # Validate text input
                    validators.validate_text_input(text)

                    # Validate custom model if needed
                    if model_type == 'custom' and custom_model:
                        validators.validate_model_path(custom_model)

                    # Process text using file service
                    token_data = file_service.process_text_for_tokenization(
                        text=text,
                        model_id_or_name=model_to_use,
                        preview_char_limit=current_app.config.get('PREVIEW_CHAR_LIMIT', 8096),
                        max_display_tokens=current_app.config.get('MAX_DISPLAY_TOKENS', 50000)
                    )

                    # If request is AJAX, return JSON
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify(token_data)
                except ValidationError as e:
                    error_message = str(e)
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify({"error": error_message}), 400
                    return render_template(
                        'index.html',
                        text=text,
                        token_data=None,
                        models=tokenizer_service.TOKENIZER_MODELS,
                        selected_model=selected_model,
                        custom_model=custom_model,
                        model_type=model_type,
                        error=error_message
                    )
                except Exception as e:
                    error_message = str(e)
                    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                        return jsonify({"error": error_message}), 400
                    return render_template(
                        'index.html',
                        text=text,
                        token_data=None,
                        models=tokenizer_service.TOKENIZER_MODELS,
                        selected_model=selected_model,
                        custom_model=custom_model,
                        model_type=model_type,
                        error=error_message
                    )

    return render_template(
        'index.html',
        text=text,
        token_data=token_data,
        models=tokenizer_service.TOKENIZER_MODELS,
        selected_model=selected_model,
        custom_model=custom_model,
        model_type=model_type,
        error=error_message
    )


@main_bp.route('/health', methods=['GET'])
def health_check():
    """Basic health check endpoint."""
    import time
    from flask import __version__ as flask_version

    try:
        # Basic application status
        status = {
            "status": "healthy",
            "timestamp": int(time.time()),
            "version": "1.0.0",
            "flask_version": flask_version,
            "uptime": int(time.time()),  # Note: current timestamp, not true process uptime
        }

        return jsonify(status), 200
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e),
            "timestamp": int(time.time())
        }), 500


@main_bp.route('/health/detailed', methods=['GET'])
def detailed_health_check():
    """Detailed health check with system and service status."""
    import time
    import psutil
    from flask import __version__ as flask_version

    try:
        # System information
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        # Check tokenizer service
        tokenizer_status = "healthy"
        tokenizer_cache_size = len(tokenizer_service.tokenizers) + len(tokenizer_service.custom_tokenizers)

        # Test basic tokenizer loading
        try:
            test_tokenizer, _, error = tokenizer_service.load_tokenizer('gpt2')
            if error:
                tokenizer_status = f"warning: {error}"
        except Exception as e:
            tokenizer_status = f"error: {str(e)}"

        # Check upload directory
        upload_folder = current_app.config.get('UPLOAD_FOLDER', '/tmp')
        upload_dir_exists = os.path.exists(upload_folder)
        upload_dir_writable = os.access(upload_folder, os.W_OK) if upload_dir_exists else False

        status = {
            "status": "healthy",
            "timestamp": int(time.time()),
            "version": "1.0.0",
            "flask_version": flask_version,
flask_version, "system": { "cpu_percent": round(cpu_percent, 1), "memory": { "total": memory.total, "available": memory.available, "percent": memory.percent, "used": memory.used }, "disk": { "total": disk.total, "used": disk.used, "free": disk.free, "percent": round((disk.used / disk.total) * 100, 1) } }, "services": { "tokenizer_service": { "status": tokenizer_status, "cached_tokenizers": tokenizer_cache_size, "available_models": len(tokenizer_service.TOKENIZER_MODELS) }, "file_service": { "upload_directory": upload_folder, "directory_exists": upload_dir_exists, "directory_writable": upload_dir_writable, "allowed_extensions": list(file_service.ALLOWED_EXTENSIONS) } }, "configuration": { "max_content_length": current_app.config.get('MAX_CONTENT_LENGTH'), "cache_expiration": current_app.config.get('CACHE_EXPIRATION', 3600), "max_display_tokens": current_app.config.get('MAX_DISPLAY_TOKENS', 50000), "preview_char_limit": current_app.config.get('PREVIEW_CHAR_LIMIT', 8096) } } # Determine overall status overall_status = "healthy" if tokenizer_status.startswith("error"): overall_status = "unhealthy" elif tokenizer_status.startswith("warning") or not upload_dir_writable: overall_status = "degraded" status["status"] = overall_status return jsonify(status), 200 if overall_status == "healthy" else 503 except Exception as e: return jsonify({ "status": "unhealthy", "error": str(e), "timestamp": int(time.time()) }), 500 @main_bp.route('/health/ready', methods=['GET']) def readiness_check(): """Readiness check - determines if the application is ready to serve requests.""" try: # Check if core services are ready checks = { "tokenizer_service": False, "file_service": False, "configuration": False } # Test tokenizer service try: test_tokenizer, _, error = tokenizer_service.load_tokenizer('gpt2') checks["tokenizer_service"] = error is None except: checks["tokenizer_service"] = False # Test file service try: upload_folder = current_app.config.get('UPLOAD_FOLDER', '/tmp') checks["file_service"] = os.path.exists(upload_folder) and os.access(upload_folder, os.W_OK) except: checks["file_service"] = False # Check configuration required_configs = ['MAX_CONTENT_LENGTH', 'UPLOAD_FOLDER'] checks["configuration"] = all(current_app.config.get(config) is not None for config in required_configs) all_ready = all(checks.values()) return jsonify({ "ready": all_ready, "checks": checks, "timestamp": int(time.time()) }), 200 if all_ready else 503 except Exception as e: return jsonify({ "ready": False, "error": str(e), "timestamp": int(time.time()) }), 500