""" File Service - Handles file processing and chunked text analysis """ import os import uuid from typing import Dict, Any, List, Tuple from werkzeug.utils import secure_filename from flask import current_app from .tokenizer_service import tokenizer_service from .stats_service import stats_service class FileService: """Service for handling file uploads and processing.""" # Allowed file extensions for security ALLOWED_EXTENSIONS = {'.txt', '.md', '.py', '.js', '.html', '.css', '.json', '.csv', '.log'} @staticmethod def is_allowed_file(filename: str) -> bool: """Check if the uploaded file has an allowed extension.""" if not filename: return False _, ext = os.path.splitext(filename.lower()) return ext in FileService.ALLOWED_EXTENSIONS @staticmethod def generate_secure_filename(original_filename: str) -> str: """Generate a secure filename with UUID prefix.""" if not original_filename: return f"{uuid.uuid4().hex}.txt" # Secure the filename and add UUID prefix to avoid conflicts secure_name = secure_filename(original_filename) name, ext = os.path.splitext(secure_name) return f"{uuid.uuid4().hex}_{name}{ext}" @staticmethod def save_uploaded_file(uploaded_file, upload_folder: str) -> str: """ Save uploaded file to the upload folder with a secure filename. Returns: str: Path to the saved file """ # Ensure upload folder exists os.makedirs(upload_folder, exist_ok=True) # Generate secure filename secure_filename_str = FileService.generate_secure_filename(uploaded_file.filename) file_path = os.path.join(upload_folder, secure_filename_str) # Save the file uploaded_file.save(file_path) return file_path @staticmethod def process_file_for_tokenization( file_path: str, model_id_or_name: str, preview_char_limit: int = 8096, max_display_tokens: int = 50000, chunk_size: int = 1024 * 1024 ) -> Dict[str, Any]: """ Process a file for tokenization with chunked processing for large files. 
Args: file_path: Path to the file to process model_id_or_name: Tokenizer model to use preview_char_limit: Character limit for preview display max_display_tokens: Maximum tokens to display chunk_size: Size of chunks for processing large files Returns: Dict containing tokenization results """ # Load tokenizer tokenizer, tokenizer_info, error = tokenizer_service.load_tokenizer(model_id_or_name) if error: raise Exception(error) # Read the preview for display with open(file_path, 'r', errors='replace') as f: preview_text = f.read(preview_char_limit) # Tokenize preview for display preview_tokens = tokenizer.tokenize(preview_text) display_tokens = preview_tokens[:max_display_tokens] # Process full file for stats in chunks to avoid memory issues total_tokens = [] token_set = set() total_length = 0 with open(file_path, 'r', errors='replace') as f: while True: chunk = f.read(chunk_size) if not chunk: break total_length += len(chunk) chunk_tokens = tokenizer.tokenize(chunk) total_tokens.extend(chunk_tokens) token_set.update(chunk_tokens) # Calculate stats using approximation for original text stats = stats_service.get_token_stats(total_tokens, ' ' * total_length) # Format tokens for display token_data = stats_service.format_tokens_for_display(display_tokens, tokenizer) return { 'tokens': token_data, 'stats': stats, 'display_limit_reached': len(total_tokens) > max_display_tokens, 'total_tokens': len(total_tokens), 'is_full_file': True, 'preview_only': True, 'tokenizer_info': tokenizer_info } @staticmethod def process_text_for_tokenization( text: str, model_id_or_name: str, is_preview: bool = False, preview_char_limit: int = 8096, max_display_tokens: int = 50000 ) -> Dict[str, Any]: """ Process regular text input for tokenization. Args: text: Input text to tokenize model_id_or_name: Tokenizer model to use is_preview: Whether this is a preview of a larger text preview_char_limit: Character limit for preview max_display_tokens: Maximum tokens to display Returns: Dict containing tokenization results """ # Load tokenizer tokenizer, tokenizer_info, error = tokenizer_service.load_tokenizer(model_id_or_name) if error: raise Exception(error) # Tokenize full text for stats all_tokens = tokenizer.tokenize(text) total_token_count = len(all_tokens) # For display: if it's a preview, only take first preview_char_limit chars preview_text = text[:preview_char_limit] if is_preview else text preview_tokens = tokenizer.tokenize(preview_text) display_tokens = preview_tokens[:max_display_tokens] # Calculate stats on full text stats = stats_service.get_token_stats(all_tokens, text) # Format tokens for display token_data = stats_service.format_tokens_for_display(display_tokens, tokenizer) return { 'tokens': token_data, 'stats': stats, 'display_limit_reached': total_token_count > max_display_tokens and not is_preview, 'total_tokens': total_token_count, 'is_full_file': False, 'preview_only': is_preview, 'tokenizer_info': tokenizer_info } @staticmethod def cleanup_file(file_path: str): """Safely remove a file if it exists.""" try: if os.path.exists(file_path): os.remove(file_path) except OSError: pass # Ignore errors during cleanup # Global instance file_service = FileService()
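# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, kept commented out so it has no effect on
# this module): how a Flask upload route might wire the service together.
# The blueprint name, route path, "UPLOAD_FOLDER" config key, form field
# names, and the JSON error shape below are assumptions, not part of this
# module; only file_service and its methods come from the code above.
# ---------------------------------------------------------------------------
#
# from flask import Blueprint, current_app, jsonify, request
#
# tokenize_bp = Blueprint("tokenize", __name__)  # hypothetical blueprint
#
# @tokenize_bp.route("/tokenize/file", methods=["POST"])
# def tokenize_file():
#     uploaded = request.files.get("file")
#     if uploaded is None or not FileService.is_allowed_file(uploaded.filename):
#         return jsonify({"error": "unsupported or missing file"}), 400
#
#     upload_folder = current_app.config["UPLOAD_FOLDER"]  # assumed config key
#     file_path = file_service.save_uploaded_file(uploaded, upload_folder)
#     try:
#         result = file_service.process_file_for_tokenization(
#             file_path,
#             request.form.get("model", ""),  # model chosen by the client
#         )
#         return jsonify(result)
#     except Exception as exc:  # tokenizer load errors are re-raised above
#         return jsonify({"error": str(exc)}), 500
#     finally:
#         # Remove the temporary upload regardless of success or failure
#         file_service.cleanup_file(file_path)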