""" | |
File Service - Handles file processing and chunked text analysis | |
""" | |
import os | |
import uuid | |
from typing import Dict, Any, List, Tuple | |
from werkzeug.utils import secure_filename | |
from flask import current_app | |
from .tokenizer_service import tokenizer_service | |
from .stats_service import stats_service | |
class FileService:
    """Service for handling file uploads and processing."""

    # Allowed file extensions for security
    ALLOWED_EXTENSIONS = {'.txt', '.md', '.py', '.js', '.html', '.css', '.json', '.csv', '.log'}

    @staticmethod
    def is_allowed_file(filename: str) -> bool:
        """Check if the uploaded file has an allowed extension."""
        if not filename:
            return False
        _, ext = os.path.splitext(filename.lower())
        return ext in FileService.ALLOWED_EXTENSIONS

    @staticmethod
    def generate_secure_filename(original_filename: str) -> str:
        """Generate a secure filename with a UUID prefix."""
        if not original_filename:
            return f"{uuid.uuid4().hex}.txt"
        # Secure the filename and add a UUID prefix to avoid collisions
        secure_name = secure_filename(original_filename)
        name, ext = os.path.splitext(secure_name)
        return f"{uuid.uuid4().hex}_{name}{ext}"

    @staticmethod
    def save_uploaded_file(uploaded_file, upload_folder: str) -> str:
        """
        Save an uploaded file to the upload folder under a secure filename.

        Returns:
            str: Path to the saved file
        """
        # Ensure the upload folder exists
        os.makedirs(upload_folder, exist_ok=True)
        # Generate a secure filename
        secure_filename_str = FileService.generate_secure_filename(uploaded_file.filename)
        file_path = os.path.join(upload_folder, secure_filename_str)
        # Save the file
        uploaded_file.save(file_path)
        return file_path

    @staticmethod
    def process_file_for_tokenization(
        file_path: str,
        model_id_or_name: str,
        preview_char_limit: int = 8096,
        max_display_tokens: int = 50000,
        chunk_size: int = 1024 * 1024
    ) -> Dict[str, Any]:
        """
        Process a file for tokenization, reading large files in chunks.

        Args:
            file_path: Path to the file to process
            model_id_or_name: Tokenizer model to use
            preview_char_limit: Character limit for the preview display
            max_display_tokens: Maximum number of tokens to display
            chunk_size: Chunk size (in characters) for reading large files

        Returns:
            Dict containing tokenization results
        """
        # Load the tokenizer
        tokenizer, tokenizer_info, error = tokenizer_service.load_tokenizer(model_id_or_name)
        if error:
            raise Exception(error)

        # Read the preview for display
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            preview_text = f.read(preview_char_limit)

        # Tokenize the preview for display
        preview_tokens = tokenizer.tokenize(preview_text)
        display_tokens = preview_tokens[:max_display_tokens]

        # Process the full file for stats in chunks to avoid memory issues
        total_tokens = []
        token_set = set()
        total_length = 0
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                total_length += len(chunk)
                chunk_tokens = tokenizer.tokenize(chunk)
                total_tokens.extend(chunk_tokens)
                token_set.update(chunk_tokens)

        # Calculate stats; the original text is approximated by a string of the same length
        stats = stats_service.get_token_stats(total_tokens, ' ' * total_length)

        # Format tokens for display
        token_data = stats_service.format_tokens_for_display(display_tokens, tokenizer)

        return {
            'tokens': token_data,
            'stats': stats,
            'display_limit_reached': len(total_tokens) > max_display_tokens,
            'total_tokens': len(total_tokens),
            'is_full_file': True,
            'preview_only': True,
            'tokenizer_info': tokenizer_info
        }
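
    # Illustrative call (a sketch, not part of the service): the file path and
    # model name below are placeholders chosen for the example.
    #
    #     result = FileService.process_file_for_tokenization(
    #         file_path='/tmp/uploads/abc123_report.txt',
    #         model_id_or_name='gpt2',
    #     )
    #     print(result['total_tokens'], result['display_limit_reached'])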

    @staticmethod
    def process_text_for_tokenization(
        text: str,
        model_id_or_name: str,
        is_preview: bool = False,
        preview_char_limit: int = 8096,
        max_display_tokens: int = 50000
    ) -> Dict[str, Any]:
        """
        Process plain text input for tokenization.

        Args:
            text: Input text to tokenize
            model_id_or_name: Tokenizer model to use
            is_preview: Whether this is a preview of a larger text
            preview_char_limit: Character limit for the preview
            max_display_tokens: Maximum number of tokens to display

        Returns:
            Dict containing tokenization results
        """
        # Load the tokenizer
        tokenizer, tokenizer_info, error = tokenizer_service.load_tokenizer(model_id_or_name)
        if error:
            raise Exception(error)

        # Tokenize the full text for stats
        all_tokens = tokenizer.tokenize(text)
        total_token_count = len(all_tokens)

        # For display: if this is a preview, only take the first preview_char_limit characters
        preview_text = text[:preview_char_limit] if is_preview else text
        preview_tokens = tokenizer.tokenize(preview_text)
        display_tokens = preview_tokens[:max_display_tokens]

        # Calculate stats on the full text
        stats = stats_service.get_token_stats(all_tokens, text)

        # Format tokens for display
        token_data = stats_service.format_tokens_for_display(display_tokens, tokenizer)

        return {
            'tokens': token_data,
            'stats': stats,
            'display_limit_reached': total_token_count > max_display_tokens and not is_preview,
            'total_tokens': total_token_count,
            'is_full_file': False,
            'preview_only': is_preview,
            'tokenizer_info': tokenizer_info
        }

    @staticmethod
    def cleanup_file(file_path: str):
        """Safely remove a file if it exists."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except OSError:
            pass  # Ignore errors during cleanup


# Global instance
file_service = FileService()
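
# Example wiring (hypothetical sketch, not part of this module): how a Flask
# upload route might use file_service end to end. The endpoint path, config
# key, form field name, and model name are assumptions for illustration.
#
#     from flask import Flask, request, jsonify
#
#     app = Flask(__name__)
#     app.config['UPLOAD_FOLDER'] = '/tmp/uploads'  # assumed upload location
#
#     @app.route('/tokenize-file', methods=['POST'])
#     def tokenize_file():
#         uploaded = request.files['file']  # assumed form field name
#         if not file_service.is_allowed_file(uploaded.filename):
#             return jsonify({'error': 'Unsupported file type'}), 400
#         path = file_service.save_uploaded_file(uploaded, app.config['UPLOAD_FOLDER'])
#         try:
#             result = file_service.process_file_for_tokenization(path, 'gpt2')
#         finally:
#             file_service.cleanup_file(path)  # remove the temporary upload
#         return jsonify(result)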