# app/services/file_service.py
"""
File Service - Handles file processing and chunked text analysis
"""
import os
import uuid
from typing import Dict, Any, List, Tuple
from werkzeug.utils import secure_filename
from .tokenizer_service import tokenizer_service
from .stats_service import stats_service


class FileService:
"""Service for handling file uploads and processing."""
# Allowed file extensions for security
ALLOWED_EXTENSIONS = {'.txt', '.md', '.py', '.js', '.html', '.css', '.json', '.csv', '.log'}

    @staticmethod
def is_allowed_file(filename: str) -> bool:
"""Check if the uploaded file has an allowed extension."""
if not filename:
return False
_, ext = os.path.splitext(filename.lower())
return ext in FileService.ALLOWED_EXTENSIONS

    @staticmethod
def generate_secure_filename(original_filename: str) -> str:
"""Generate a secure filename with UUID prefix."""
if not original_filename:
return f"{uuid.uuid4().hex}.txt"
# Secure the filename and add UUID prefix to avoid conflicts
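        # e.g. "my report.txt" -> "<32-char uuid hex>_my_report.txt"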
secure_name = secure_filename(original_filename)
name, ext = os.path.splitext(secure_name)
return f"{uuid.uuid4().hex}_{name}{ext}"

    @staticmethod
def save_uploaded_file(uploaded_file, upload_folder: str) -> str:
"""
Save uploaded file to the upload folder with a secure filename.
Returns:
str: Path to the saved file
"""
# Ensure upload folder exists
os.makedirs(upload_folder, exist_ok=True)
# Generate secure filename
secure_filename_str = FileService.generate_secure_filename(uploaded_file.filename)
file_path = os.path.join(upload_folder, secure_filename_str)
# Save the file
uploaded_file.save(file_path)
return file_path

    @staticmethod
def process_file_for_tokenization(
file_path: str,
model_id_or_name: str,
preview_char_limit: int = 8096,
max_display_tokens: int = 50000,
chunk_size: int = 1024 * 1024
) -> Dict[str, Any]:
"""
Process a file for tokenization with chunked processing for large files.
Args:
file_path: Path to the file to process
model_id_or_name: Tokenizer model to use
preview_char_limit: Character limit for preview display
max_display_tokens: Maximum tokens to display
chunk_size: Size of chunks for processing large files
Returns:
Dict containing tokenization results
"""
# Load tokenizer
tokenizer, tokenizer_info, error = tokenizer_service.load_tokenizer(model_id_or_name)
if error:
            raise RuntimeError(error)
# Read the preview for display
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
preview_text = f.read(preview_char_limit)
# Tokenize preview for display
preview_tokens = tokenizer.tokenize(preview_text)
display_tokens = preview_tokens[:max_display_tokens]
# Process full file for stats in chunks to avoid memory issues
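        # NOTE: chunking bounds how much raw text is held at once, but the token
        # list and set below still grow with the file's total and unique token counts.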
total_tokens = []
token_set = set()
total_length = 0
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
total_length += len(chunk)
chunk_tokens = tokenizer.tokenize(chunk)
total_tokens.extend(chunk_tokens)
token_set.update(chunk_tokens)
# Calculate stats using approximation for original text
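        # A same-length run of spaces stands in for the original text, assuming
        # get_token_stats only uses it for character/length-based metrics.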
stats = stats_service.get_token_stats(total_tokens, ' ' * total_length)
# Format tokens for display
token_data = stats_service.format_tokens_for_display(display_tokens, tokenizer)
return {
'tokens': token_data,
'stats': stats,
'display_limit_reached': len(total_tokens) > max_display_tokens,
'total_tokens': len(total_tokens),
'is_full_file': True,
'preview_only': True,
'tokenizer_info': tokenizer_info
}

    @staticmethod
def process_text_for_tokenization(
text: str,
model_id_or_name: str,
is_preview: bool = False,
preview_char_limit: int = 8096,
max_display_tokens: int = 50000
) -> Dict[str, Any]:
"""
Process regular text input for tokenization.
Args:
text: Input text to tokenize
model_id_or_name: Tokenizer model to use
is_preview: Whether this is a preview of a larger text
preview_char_limit: Character limit for preview
max_display_tokens: Maximum tokens to display
Returns:
Dict containing tokenization results
"""
# Load tokenizer
tokenizer, tokenizer_info, error = tokenizer_service.load_tokenizer(model_id_or_name)
if error:
            raise RuntimeError(error)
# Tokenize full text for stats
all_tokens = tokenizer.tokenize(text)
total_token_count = len(all_tokens)
# For display: if it's a preview, only take first preview_char_limit chars
preview_text = text[:preview_char_limit] if is_preview else text
preview_tokens = tokenizer.tokenize(preview_text)
display_tokens = preview_tokens[:max_display_tokens]
# Calculate stats on full text
stats = stats_service.get_token_stats(all_tokens, text)
# Format tokens for display
token_data = stats_service.format_tokens_for_display(display_tokens, tokenizer)
return {
'tokens': token_data,
'stats': stats,
'display_limit_reached': total_token_count > max_display_tokens and not is_preview,
'total_tokens': total_token_count,
'is_full_file': False,
'preview_only': is_preview,
'tokenizer_info': tokenizer_info
}

    @staticmethod
def cleanup_file(file_path: str):
"""Safely remove a file if it exists."""
try:
if os.path.exists(file_path):
os.remove(file_path)
except OSError:
pass # Ignore errors during cleanup


# Global instance
file_service = FileService()
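

if __name__ == "__main__":
    # Minimal usage sketch (not part of the service itself). It exercises the
    # pure helpers, which need no tokenizer. The tokenization entry points
    # expect a model id that tokenizer_service can resolve; "gpt2" below is
    # only an illustrative placeholder, not a guaranteed-available model.
    print(FileService.is_allowed_file("notes.md"))        # True
    print(FileService.is_allowed_file("payload.exe"))     # False
    print(FileService.generate_secure_filename("my report.txt"))

    # Requires a resolvable tokenizer, so it is left commented out here:
    # result = file_service.process_text_for_tokenization("Hello world", "gpt2")
    # print(result['total_tokens'], result['tokenizer_info'])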