# Spaces:
# Build error
# Build error
"""File generation and management utilities for infrastructure providers."""
import hashlib
import logging
import os
import tempfile
import time
from pathlib import Path
from typing import Optional, Union

# Module-level logger shared by the utilities defined in this file.
logger = logging.getLogger(__name__)
class FileManager:
    """Utility class for managing temporary files and directories.

    All files are created inside ``base_dir``. Generated names combine a
    millisecond timestamp with a short random token, so two files requested
    within the same millisecond do not end up sharing a name.
    """

    def __init__(self, base_dir: Optional[Union[str, Path]] = None):
        """
        Initialize the file manager.

        Args:
            base_dir: Base directory for file operations (defaults to a
                ``tts_app`` folder inside the system temp directory)
        """
        if base_dir:
            self.base_dir = Path(base_dir)
        else:
            self.base_dir = Path(tempfile.gettempdir()) / "tts_app"
        # parents=True so a nested, not-yet-existing base_dir is usable.
        self.base_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"FileManager initialized with base directory: {self.base_dir}")

    @staticmethod
    def _unique_stamp() -> str:
        """Return ``<millisecond timestamp>_<8 random hex chars>``."""
        # The timestamp alone repeats for calls made in the same millisecond;
        # the random token keeps those names apart.
        return f"{int(time.time() * 1000)}_{os.urandom(4).hex()}"

    def create_temp_file(self, suffix: str = ".tmp", prefix: str = "temp", content: Optional[bytes] = None) -> Path:
        """
        Create a temporary file in the base directory.

        Args:
            suffix: File suffix/extension
            prefix: File prefix
            content: Optional content to write to the file; when omitted or
                empty an empty file is created

        Returns:
            Path: Path to the created temporary file
        """
        file_path = self.base_dir / f"{prefix}_{self._unique_stamp()}{suffix}"
        if content:
            with open(file_path, 'wb') as f:
                f.write(content)
        else:
            file_path.touch()
        logger.info(f"Created temporary file: {file_path}")
        return file_path

    def create_unique_filename(self, base_name: str, extension: str = "", content_hash: bool = False, content: Optional[bytes] = None) -> str:
        """
        Create a unique filename (the file itself is not created).

        Args:
            base_name: Base name for the file
            extension: File extension (with or without dot)
            content_hash: Whether to include content hash in filename
            content: Content to hash (required if content_hash=True; when it
                is None no hash is appended)

        Returns:
            str: Unique filename
        """
        if extension and not extension.startswith('.'):
            extension = '.' + extension
        filename = f"{base_name}_{self._unique_stamp()}"
        if content_hash and content is not None:
            # MD5 is used only as a short content fingerprint, not for security.
            filename += f"_{hashlib.md5(content).hexdigest()[:8]}"
        return filename + extension

    def save_audio_file(self, audio_data: bytes, format: str = "wav", prefix: str = "audio") -> Path:
        """
        Save audio data to a file in the base directory.

        Args:
            audio_data: Raw audio data
            format: Audio format (wav, mp3, etc.), with or without leading dot
            prefix: Filename prefix

        Returns:
            Path: Path to the saved audio file
        """
        # create_unique_filename normalizes the leading dot of the extension.
        filename = self.create_unique_filename(prefix, format, content_hash=True, content=audio_data)
        file_path = self.base_dir / filename
        with open(file_path, 'wb') as f:
            f.write(audio_data)
        logger.info(f"Saved audio file: {file_path} ({len(audio_data)} bytes)")
        return file_path

    def save_text_file(self, text_content: str, encoding: str = "utf-8", prefix: str = "text") -> Path:
        """
        Save text content to a ``.txt`` file in the base directory.

        Args:
            text_content: Text content to save
            encoding: Text encoding
            prefix: Filename prefix

        Returns:
            Path: Path to the saved text file
        """
        file_path = self.base_dir / self.create_unique_filename(prefix, ".txt")
        with open(file_path, 'w', encoding=encoding) as f:
            f.write(text_content)
        logger.info(f"Saved text file: {file_path} ({len(text_content)} characters)")
        return file_path

    def cleanup_file(self, file_path: Union[str, Path]) -> bool:
        """
        Clean up a single file.

        Args:
            file_path: Path to the file to clean up

        Returns:
            bool: True if file was successfully deleted, False otherwise
                (including when the path is missing or is not a regular file)
        """
        try:
            path = Path(file_path)
            if path.exists() and path.is_file():
                path.unlink()
                logger.info(f"Cleaned up file: {path}")
                return True
            return False
        except Exception as e:
            # Best-effort cleanup: report the problem instead of raising.
            logger.warning(f"Failed to cleanup file {file_path}: {str(e)}")
            return False

    def cleanup_old_files(self, max_age_hours: int = 24, pattern: str = "*") -> int:
        """
        Clean up old files in the base directory.

        Args:
            max_age_hours: Maximum age of files to keep in hours
            pattern: File pattern to match (glob pattern)

        Returns:
            int: Number of files cleaned up
        """
        cutoff = time.time() - max_age_hours * 3600
        cleaned_count = 0
        try:
            for file_path in self.base_dir.glob(pattern):
                try:
                    expired = file_path.is_file() and file_path.stat().st_mtime < cutoff
                except OSError as e:
                    # A file can vanish between glob() and stat(); skip it so
                    # one bad entry does not abort the sweep.
                    logger.warning(f"Failed to cleanup file {file_path}: {str(e)}")
                    continue
                if expired and self.cleanup_file(file_path):
                    cleaned_count += 1
        except Exception as e:
            logger.error(f"Failed to cleanup old files: {str(e)}")
        if cleaned_count > 0:
            logger.info(f"Cleaned up {cleaned_count} old files")
        # Report what was actually removed, even if the sweep ended early.
        return cleaned_count

    def get_file_info(self, file_path: Union[str, Path]) -> dict:
        """
        Get information about a file.

        Args:
            file_path: Path to the file

        Returns:
            dict: ``{'exists': False}`` when the path is missing, otherwise
                size, timestamps, type flags, extension, name and parent;
                on failure ``{'exists': False, 'error': <message>}``
        """
        try:
            path = Path(file_path)
            if not path.exists():
                return {'exists': False}
            stat = path.stat()
            return {
                'exists': True,
                'size_bytes': stat.st_size,
                'created_time': stat.st_ctime,
                'modified_time': stat.st_mtime,
                'is_file': path.is_file(),
                'is_directory': path.is_dir(),
                'extension': path.suffix,
                'name': path.name,
                'parent': str(path.parent),
            }
        except Exception as e:
            logger.error(f"Failed to get file info for {file_path}: {str(e)}")
            return {'exists': False, 'error': str(e)}

    def ensure_directory(self, dir_path: Union[str, Path]) -> Path:
        """
        Ensure a directory exists, creating it (and parents) if necessary.

        Args:
            dir_path: Path to the directory

        Returns:
            Path: Path to the directory
        """
        path = Path(dir_path)
        path.mkdir(parents=True, exist_ok=True)
        logger.info(f"Ensured directory exists: {path}")
        return path

    def get_disk_usage(self) -> dict:
        """
        Get disk usage information for the base directory (recursive).

        Returns:
            dict: Base directory, total size in bytes and MiB, and file
                count; on failure ``{'error': <message>}``
        """
        try:
            total_size = 0
            file_count = 0
            for file_path in self.base_dir.rglob('*'):
                if file_path.is_file():
                    total_size += file_path.stat().st_size
                    file_count += 1
            return {
                'base_directory': str(self.base_dir),
                'total_size_bytes': total_size,
                'total_size_mb': total_size / (1024 * 1024),
                'file_count': file_count,
            }
        except Exception as e:
            logger.error(f"Failed to get disk usage: {str(e)}")
            return {'error': str(e)}
class AudioFileGenerator:
    """Utility class for generating audio files from raw audio data.

    Both helpers are static: they can be called on the class or on an
    instance and do not use any instance state.
    """

    @staticmethod
    def save_wav_file(audio_data: bytes, sample_rate: int, file_path: Union[str, Path], channels: int = 1, sample_width: int = 2) -> Path:
        """
        Save raw audio data as a WAV file.

        Args:
            audio_data: Raw audio frames, written as-is after the WAV header
            sample_rate: Sample rate in Hz
            file_path: Output file path
            channels: Number of audio channels
            sample_width: Sample width in bytes

        Returns:
            Path: Path to the saved WAV file

        Raises:
            Exception: Any error raised while writing is logged and re-raised
        """
        try:
            import wave

            path = Path(file_path)
            with wave.open(str(path), 'wb') as wav_file:
                wav_file.setnchannels(channels)
                wav_file.setsampwidth(sample_width)
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)
            logger.info(f"Saved WAV file: {path} (sample_rate={sample_rate}, channels={channels})")
            return path
        except Exception as e:
            logger.error(f"Failed to save WAV file: {str(e)}")
            raise

    @staticmethod
    def convert_numpy_to_wav(audio_array, sample_rate: int, file_path: Union[str, Path]) -> Path:
        """
        Convert numpy array to WAV file.

        Args:
            audio_array: Numpy array (or array-like) containing audio data
            sample_rate: Sample rate in Hz
            file_path: Output file path

        Returns:
            Path: Path to the saved WAV file

        Raises:
            ImportError: If numpy or soundfile cannot be imported
            Exception: Any other error is logged and re-raised
        """
        try:
            import numpy as np
            import soundfile as sf

            path = Path(file_path)
            # Work in float32; asarray also accepts plain sequences and does
            # not copy when the input is already a float32 array.
            audio_array = np.asarray(audio_array, dtype=np.float32)
            # Scale down only when samples exceed the [-1.0, 1.0] range.
            peak = np.max(np.abs(audio_array))
            if peak > 1.0:
                audio_array = audio_array / peak
            sf.write(str(path), audio_array, sample_rate)
            logger.info(f"Converted numpy array to WAV: {path}")
            return path
        except ImportError:
            logger.error("soundfile library not available for numpy conversion")
            raise
        except Exception as e:
            logger.error(f"Failed to convert numpy array to WAV: {str(e)}")
            raise
class ErrorHandler:
    """Utility class for handling and logging errors in infrastructure providers."""

    def __init__(self, provider_name: str):
        """
        Initialize the error handler.

        Args:
            provider_name: Name of the provider for error context; also used
                as the last component of the logger name
        """
        self.provider_name = provider_name
        self.logger = logging.getLogger(f"{__name__}.{provider_name}")

    def _compose(self, message: str, context: str) -> str:
        """Build ``<provider>[ (<context>)]: <message>`` for the log helpers."""
        prefix = f"{self.provider_name} ({context})" if context else f"{self.provider_name}"
        return f"{prefix}: {message}"

    def handle_error(self, error: Exception, context: str = "", reraise_as: Optional[type] = None) -> None:
        """
        Handle an error with proper logging and re-raising.

        This method always raises; it never returns normally.

        Args:
            error: The original error
            context: Additional context about when the error occurred
            reraise_as: Exception type to re-raise as (if None, re-raises original)

        Raises:
            reraise_as: Built from the log message and chained to ``error``
            Exception: ``error`` itself when ``reraise_as`` is not given
        """
        error_msg = f"{self.provider_name} error"
        if context:
            error_msg += f" during {context}"
        error_msg += f": {str(error)}"
        # exc_info is the logging keyword that attaches the traceback.
        self.logger.error(error_msg, exc_info=error)
        if reraise_as:
            raise reraise_as(error_msg) from error
        # Raise the object explicitly so this also works when called outside
        # an ``except`` block, where a bare ``raise`` has nothing to re-raise.
        raise error

    def log_warning(self, message: str, context: str = "") -> None:
        """
        Log a warning message.

        Args:
            message: Warning message
            context: Additional context
        """
        self.logger.warning(self._compose(message, context))

    def log_info(self, message: str, context: str = "") -> None:
        """
        Log an info message.

        Args:
            message: Info message
            context: Additional context
        """
        self.logger.info(self._compose(message, context))

    def log_debug(self, message: str, context: str = "") -> None:
        """
        Log a debug message.

        Args:
            message: Debug message
            context: Additional context
        """
        self.logger.debug(self._compose(message, context))