"""File generation and management utilities for infrastructure providers.""" import logging import os import tempfile import time from pathlib import Path from typing import Optional, Union import hashlib logger = logging.getLogger(__name__) class FileManager: """Utility class for managing temporary files and directories.""" def __init__(self, base_dir: Optional[Union[str, Path]] = None): """ Initialize the file manager. Args: base_dir: Base directory for file operations (defaults to system temp) """ if base_dir: self.base_dir = Path(base_dir) else: self.base_dir = Path(tempfile.gettempdir()) / "tts_app" self.base_dir.mkdir(exist_ok=True) logger.info(f"FileManager initialized with base directory: {self.base_dir}") def create_temp_file(self, suffix: str = ".tmp", prefix: str = "temp", content: bytes = None) -> Path: """ Create a temporary file. Args: suffix: File suffix/extension prefix: File prefix content: Optional content to write to the file Returns: Path: Path to the created temporary file """ timestamp = int(time.time() * 1000) filename = f"{prefix}_{timestamp}{suffix}" file_path = self.base_dir / filename if content: with open(file_path, 'wb') as f: f.write(content) else: file_path.touch() logger.info(f"Created temporary file: {file_path}") return file_path def create_unique_filename(self, base_name: str, extension: str = "", content_hash: bool = False, content: bytes = None) -> str: """ Create a unique filename. Args: base_name: Base name for the file extension: File extension (with or without dot) content_hash: Whether to include content hash in filename content: Content to hash (required if content_hash=True) Returns: str: Unique filename """ timestamp = int(time.time() * 1000) if not extension.startswith('.') and extension: extension = '.' + extension filename = f"{base_name}_{timestamp}" if content_hash and content: hash_obj = hashlib.md5(content) content_hash_str = hash_obj.hexdigest()[:8] filename += f"_{content_hash_str}" filename += extension return filename def save_audio_file(self, audio_data: bytes, format: str = "wav", prefix: str = "audio") -> Path: """ Save audio data to a file. Args: audio_data: Raw audio data format: Audio format (wav, mp3, etc.) prefix: Filename prefix Returns: Path: Path to the saved audio file """ if not format.startswith('.'): format = '.' + format filename = self.create_unique_filename(prefix, format, content_hash=True, content=audio_data) file_path = self.base_dir / filename with open(file_path, 'wb') as f: f.write(audio_data) logger.info(f"Saved audio file: {file_path} ({len(audio_data)} bytes)") return file_path def save_text_file(self, text_content: str, encoding: str = "utf-8", prefix: str = "text") -> Path: """ Save text content to a file. Args: text_content: Text content to save encoding: Text encoding prefix: Filename prefix Returns: Path: Path to the saved text file """ filename = self.create_unique_filename(prefix, ".txt") file_path = self.base_dir / filename with open(file_path, 'w', encoding=encoding) as f: f.write(text_content) logger.info(f"Saved text file: {file_path} ({len(text_content)} characters)") return file_path def cleanup_file(self, file_path: Union[str, Path]) -> bool: """ Clean up a single file. Args: file_path: Path to the file to clean up Returns: bool: True if file was successfully deleted, False otherwise """ try: path = Path(file_path) if path.exists() and path.is_file(): path.unlink() logger.info(f"Cleaned up file: {path}") return True return False except Exception as e: logger.warning(f"Failed to cleanup file {file_path}: {str(e)}") return False def cleanup_old_files(self, max_age_hours: int = 24, pattern: str = "*") -> int: """ Clean up old files in the base directory. Args: max_age_hours: Maximum age of files to keep in hours pattern: File pattern to match (glob pattern) Returns: int: Number of files cleaned up """ try: current_time = time.time() max_age_seconds = max_age_hours * 3600 cleaned_count = 0 for file_path in self.base_dir.glob(pattern): if file_path.is_file(): file_age = current_time - file_path.stat().st_mtime if file_age > max_age_seconds: if self.cleanup_file(file_path): cleaned_count += 1 if cleaned_count > 0: logger.info(f"Cleaned up {cleaned_count} old files") return cleaned_count except Exception as e: logger.error(f"Failed to cleanup old files: {str(e)}") return 0 def get_file_info(self, file_path: Union[str, Path]) -> dict: """ Get information about a file. Args: file_path: Path to the file Returns: dict: File information """ try: path = Path(file_path) if not path.exists(): return {'exists': False} stat = path.stat() return { 'exists': True, 'size_bytes': stat.st_size, 'created_time': stat.st_ctime, 'modified_time': stat.st_mtime, 'is_file': path.is_file(), 'is_directory': path.is_dir(), 'extension': path.suffix, 'name': path.name, 'parent': str(path.parent) } except Exception as e: logger.error(f"Failed to get file info for {file_path}: {str(e)}") return {'exists': False, 'error': str(e)} def ensure_directory(self, dir_path: Union[str, Path]) -> Path: """ Ensure a directory exists, creating it if necessary. Args: dir_path: Path to the directory Returns: Path: Path to the directory """ path = Path(dir_path) path.mkdir(parents=True, exist_ok=True) logger.info(f"Ensured directory exists: {path}") return path def get_disk_usage(self) -> dict: """ Get disk usage information for the base directory. Returns: dict: Disk usage information """ try: total_size = 0 file_count = 0 for file_path in self.base_dir.rglob('*'): if file_path.is_file(): total_size += file_path.stat().st_size file_count += 1 return { 'base_directory': str(self.base_dir), 'total_size_bytes': total_size, 'total_size_mb': total_size / (1024 * 1024), 'file_count': file_count } except Exception as e: logger.error(f"Failed to get disk usage: {str(e)}") return {'error': str(e)} class AudioFileGenerator: """Utility class for generating audio files from raw audio data.""" @staticmethod def save_wav_file(audio_data: bytes, sample_rate: int, file_path: Union[str, Path], channels: int = 1, sample_width: int = 2) -> Path: """ Save raw audio data as a WAV file. Args: audio_data: Raw audio data sample_rate: Sample rate in Hz file_path: Output file path channels: Number of audio channels sample_width: Sample width in bytes Returns: Path: Path to the saved WAV file """ try: import wave path = Path(file_path) with wave.open(str(path), 'wb') as wav_file: wav_file.setnchannels(channels) wav_file.setsampwidth(sample_width) wav_file.setframerate(sample_rate) wav_file.writeframes(audio_data) logger.info(f"Saved WAV file: {path} (sample_rate={sample_rate}, channels={channels})") return path except Exception as e: logger.error(f"Failed to save WAV file: {str(e)}") raise @staticmethod def convert_numpy_to_wav(audio_array, sample_rate: int, file_path: Union[str, Path]) -> Path: """ Convert numpy array to WAV file. Args: audio_array: Numpy array containing audio data sample_rate: Sample rate in Hz file_path: Output file path Returns: Path: Path to the saved WAV file """ try: import numpy as np import soundfile as sf path = Path(file_path) # Ensure audio is in the correct format if audio_array.dtype != np.float32: audio_array = audio_array.astype(np.float32) # Normalize if needed if np.max(np.abs(audio_array)) > 1.0: audio_array = audio_array / np.max(np.abs(audio_array)) sf.write(str(path), audio_array, sample_rate) logger.info(f"Converted numpy array to WAV: {path}") return path except ImportError: logger.error("soundfile library not available for numpy conversion") raise except Exception as e: logger.error(f"Failed to convert numpy array to WAV: {str(e)}") raise class ErrorHandler: """Utility class for handling and logging errors in infrastructure providers.""" def __init__(self, provider_name: str): """ Initialize the error handler. Args: provider_name: Name of the provider for error context """ self.provider_name = provider_name self.logger = logging.getLogger(f"{__name__}.{provider_name}") def handle_error(self, error: Exception, context: str = "", reraise_as: type = None) -> None: """ Handle an error with proper logging and optional re-raising. Args: error: The original error context: Additional context about when the error occurred reraise_as: Exception type to re-raise as (if None, re-raises original) """ error_msg = f"{self.provider_name} error" if context: error_msg += f" during {context}" error_msg += f": {str(error)}" self.logger.error(error_msg, exception=error) if reraise_as: raise reraise_as(error_msg) from error else: raise def log_warning(self, message: str, context: str = "") -> None: """ Log a warning message. Args: message: Warning message context: Additional context """ warning_msg = f"{self.provider_name}" if context: warning_msg += f" ({context})" warning_msg += f": {message}" self.logger.warning(warning_msg) def log_info(self, message: str, context: str = "") -> None: """ Log an info message. Args: message: Info message context: Additional context """ info_msg = f"{self.provider_name}" if context: info_msg += f" ({context})" info_msg += f": {message}" self.logger.info(info_msg) def log_debug(self, message: str, context: str = "") -> None: """ Log a debug message. Args: message: Debug message context: Additional context """ debug_msg = f"{self.provider_name}" if context: debug_msg += f" ({context})" debug_msg += f": {message}" self.logger.info(debug_msg)