"""
File Service - Handles file processing and chunked text analysis
"""
import os
import uuid
from typing import Dict, Any, List, Tuple
from werkzeug.utils import secure_filename
from flask import current_app
from .tokenizer_service import tokenizer_service
from .stats_service import stats_service


class FileService:
    """Service for handling file uploads and processing."""

    # Allowed file extensions for security
    ALLOWED_EXTENSIONS = {'.txt', '.md', '.py', '.js', '.html', '.css', '.json', '.csv', '.log'}

    @staticmethod
    def is_allowed_file(filename: str) -> bool:
        """Check if the uploaded file has an allowed extension."""
        if not filename:
            return False
        _, ext = os.path.splitext(filename.lower())
        return ext in FileService.ALLOWED_EXTENSIONS

    @staticmethod
    def generate_secure_filename(original_filename: str) -> str:
        """Generate a secure filename with UUID prefix."""
        if not original_filename:
            return f"{uuid.uuid4().hex}.txt"
        # Secure the filename and add UUID prefix to avoid conflicts
        secure_name = secure_filename(original_filename)
        name, ext = os.path.splitext(secure_name)
        return f"{uuid.uuid4().hex}_{name}{ext}"

    @staticmethod
    def save_uploaded_file(uploaded_file, upload_folder: str) -> str:
        """
        Save the uploaded file to the upload folder with a secure filename.

        Returns:
            str: Path to the saved file
        """
        # Ensure the upload folder exists
        os.makedirs(upload_folder, exist_ok=True)

        # Generate a secure filename
        secure_filename_str = FileService.generate_secure_filename(uploaded_file.filename)
        file_path = os.path.join(upload_folder, secure_filename_str)

        # Save the file
        uploaded_file.save(file_path)

        return file_path

    @staticmethod
    def process_file_for_tokenization(
        file_path: str,
        model_id_or_name: str,
        preview_char_limit: int = 8096,
        max_display_tokens: int = 50000,
        chunk_size: int = 1024 * 1024
    ) -> Dict[str, Any]:
        """
        Process a file for tokenization, reading large files in chunks.

        Args:
            file_path: Path to the file to process
            model_id_or_name: Tokenizer model to use
            preview_char_limit: Character limit for the preview display
            max_display_tokens: Maximum number of tokens to display
            chunk_size: Size (in characters) of each chunk when reading large files

        Returns:
            Dict containing tokenization results
        """
        # Load tokenizer
        tokenizer, tokenizer_info, error = tokenizer_service.load_tokenizer(model_id_or_name)
        if error:
            raise Exception(error)

        # Read only the beginning of the file for the preview display
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            preview_text = f.read(preview_char_limit)

        # Tokenize the preview for display
        preview_tokens = tokenizer.tokenize(preview_text)
        display_tokens = preview_tokens[:max_display_tokens]

        # Read the full file in chunks so the raw text is never held in memory at
        # once (the token list is still accumulated in full for the stats below)
        total_tokens = []
        token_set = set()
        total_length = 0

        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                total_length += len(chunk)
                chunk_tokens = tokenizer.tokenize(chunk)
                total_tokens.extend(chunk_tokens)
                token_set.update(chunk_tokens)

        # Character-level stats use a placeholder string of the same length as the
        # original text, so the full file contents never need to be kept in memory
        stats = stats_service.get_token_stats(total_tokens, ' ' * total_length)

        # Format tokens for display
        token_data = stats_service.format_tokens_for_display(display_tokens, tokenizer)

        return {
            'tokens': token_data,
            'stats': stats,
            'display_limit_reached': len(total_tokens) > max_display_tokens,
            'total_tokens': len(total_tokens),
            'is_full_file': True,
            'preview_only': True,
            'tokenizer_info': tokenizer_info
        }

    @staticmethod
    def process_text_for_tokenization(
        text: str,
        model_id_or_name: str,
        is_preview: bool = False,
        preview_char_limit: int = 8096,
        max_display_tokens: int = 50000
    ) -> Dict[str, Any]:
        """
        Process regular text input for tokenization.

        Args:
            text: Input text to tokenize
            model_id_or_name: Tokenizer model to use
            is_preview: Whether this is a preview of a larger text
            preview_char_limit: Character limit for the preview
            max_display_tokens: Maximum number of tokens to display

        Returns:
            Dict containing tokenization results
        """
        # Load tokenizer
        tokenizer, tokenizer_info, error = tokenizer_service.load_tokenizer(model_id_or_name)
        if error:
            raise Exception(error)

        # Tokenize the full text for stats
        all_tokens = tokenizer.tokenize(text)
        total_token_count = len(all_tokens)

        # For display: if this is a preview, only take the first preview_char_limit characters
        preview_text = text[:preview_char_limit] if is_preview else text
        preview_tokens = tokenizer.tokenize(preview_text)
        display_tokens = preview_tokens[:max_display_tokens]

        # Calculate stats on the full text
        stats = stats_service.get_token_stats(all_tokens, text)

        # Format tokens for display
        token_data = stats_service.format_tokens_for_display(display_tokens, tokenizer)

        return {
            'tokens': token_data,
            'stats': stats,
            'display_limit_reached': total_token_count > max_display_tokens and not is_preview,
            'total_tokens': total_token_count,
            'is_full_file': False,
            'preview_only': is_preview,
            'tokenizer_info': tokenizer_info
        }
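
    # Illustrative call shape for the method above ('gpt2' is only a stand-in
    # model name; any model accepted by tokenizer_service works the same way):
    #   result = file_service.process_text_for_tokenization("Hello world", "gpt2")
    #   result['total_tokens'], result['stats'], result['tokens']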

    @staticmethod
    def cleanup_file(file_path: str):
        """Safely remove a file if it exists."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except OSError:
            pass  # Ignore errors during cleanup


# Global instance
file_service = FileService()
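

# Minimal usage sketch (assumptions: called from a Flask upload route with a
# werkzeug FileStorage object and a writable upload folder; the helper name
# below is illustrative and not part of the service API).
def _example_handle_upload(uploaded_file, model_id_or_name: str, upload_folder: str) -> Dict[str, Any]:
    """Illustrative save -> tokenize -> cleanup flow using the service above."""
    if not FileService.is_allowed_file(uploaded_file.filename):
        raise ValueError('Unsupported file type')
    saved_path = file_service.save_uploaded_file(uploaded_file, upload_folder)
    try:
        return file_service.process_file_for_tokenization(saved_path, model_id_or_name)
    finally:
        # Remove the temporary upload whether or not tokenization succeeded
        file_service.cleanup_file(saved_path)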