Spaces:
Build error
Build error
| """File generation and management utilities for infrastructure providers.""" | |
| import logging | |
| import os | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| from typing import Optional, Union | |
| import hashlib | |
| logger = logging.getLogger(__name__) | |
class FileManager:
    """Utility class for managing temporary files and directories.

    Files produced by the ``create_*`` / ``save_*`` helpers are placed inside
    ``self.base_dir`` and are never written over an already existing file.
    """

    def __init__(self, base_dir: Optional[Union[str, Path]] = None):
        """
        Initialize the file manager.

        Args:
            base_dir: Base directory for file operations (defaults to a
                ``tts_app`` folder inside the system temp directory)
        """
        if base_dir:
            self.base_dir = Path(base_dir)
        else:
            self.base_dir = Path(tempfile.gettempdir()) / "tts_app"
        # parents=True: a nested base_dir whose ancestors do not exist yet
        # must not make construction fail.
        self.base_dir.mkdir(parents=True, exist_ok=True)
        logger.debug(f"FileManager initialized with base directory: {self.base_dir}")

    def _reserve_path(self, stem: str, suffix: str) -> Path:
        """
        Atomically create a new, empty file inside the base directory.

        The preferred name is ``stem + suffix``. When that name is already
        taken, ``_1``, ``_2``, ... is appended to the stem until an unused
        name is found.

        Args:
            stem: Filename without extension
            suffix: Extension including the leading dot (may be empty)

        Returns:
            Path: Path of the freshly created empty file
        """
        candidate = self.base_dir / f"{stem}{suffix}"
        attempt = 0
        while True:
            try:
                # Mode "x" fails when the file exists, so the existence check
                # and the creation happen as one atomic step.
                with open(candidate, "xb"):
                    return candidate
            except FileExistsError:
                attempt += 1
                candidate = self.base_dir / f"{stem}_{attempt}{suffix}"

    def create_temp_file(self, suffix: str = ".tmp", prefix: str = "temp", content: Optional[bytes] = None) -> Path:
        """
        Create a temporary file.

        The name is ``<prefix>_<millisecond timestamp><suffix>``; a numeric
        counter is added when that name is already in use, so two calls
        within the same millisecond never share a file.

        Args:
            suffix: File suffix/extension
            prefix: File prefix
            content: Optional content to write to the file

        Returns:
            Path: Path to the created temporary file
        """
        timestamp = int(time.time() * 1000)
        file_path = self._reserve_path(f"{prefix}_{timestamp}", suffix)
        if content:
            file_path.write_bytes(content)
        logger.debug(f"Created temporary file: {file_path}")
        return file_path

    def create_unique_filename(self, base_name: str, extension: str = "", content_hash: bool = False, content: Optional[bytes] = None) -> str:
        """
        Create a unique filename.

        The name is built from a millisecond timestamp (and optionally a
        content hash), so it is only as unique as those two values; nothing
        is checked against the file system here.

        Args:
            base_name: Base name for the file
            extension: File extension (with or without dot)
            content_hash: Whether to include content hash in filename
            content: Content to hash (required if content_hash=True)

        Returns:
            str: Unique filename
        """
        timestamp = int(time.time() * 1000)
        if extension and not extension.startswith('.'):
            extension = '.' + extension
        filename = f"{base_name}_{timestamp}"
        if content_hash and content:
            # The digest only disambiguates names; it is not a security feature.
            content_hash_str = hashlib.md5(content).hexdigest()[:8]
            filename += f"_{content_hash_str}"
        return filename + extension

    def save_audio_file(self, audio_data: bytes, format: str = "wav", prefix: str = "audio") -> Path:
        """
        Save audio data to a file.

        Args:
            audio_data: Raw audio data
            format: Audio format (wav, mp3, etc.), with or without leading dot
            prefix: Filename prefix

        Returns:
            Path: Path to the saved audio file
        """
        suffix = format if format.startswith('.') else '.' + format
        stem = self.create_unique_filename(prefix, content_hash=True, content=audio_data)
        # Reserve first so an existing file with the same name is not overwritten.
        file_path = self._reserve_path(stem, suffix)
        with open(file_path, 'wb') as f:
            f.write(audio_data)
        logger.debug(f"Saved audio file: {file_path} ({len(audio_data)} bytes)")
        return file_path

    def save_text_file(self, text_content: str, encoding: str = "utf-8", prefix: str = "text") -> Path:
        """
        Save text content to a file.

        Args:
            text_content: Text content to save
            encoding: Text encoding
            prefix: Filename prefix

        Returns:
            Path: Path to the saved text file
        """
        stem = self.create_unique_filename(prefix)
        # Reserve first so an existing file with the same name is not overwritten.
        file_path = self._reserve_path(stem, ".txt")
        with open(file_path, 'w', encoding=encoding) as f:
            f.write(text_content)
        logger.debug(f"Saved text file: {file_path} ({len(text_content)} characters)")
        return file_path

    def cleanup_file(self, file_path: Union[str, Path]) -> bool:
        """
        Clean up a single file.

        Args:
            file_path: Path to the file to clean up

        Returns:
            bool: True if file was successfully deleted, False otherwise
        """
        # Deliberately best-effort: any failure is logged and reported as False.
        try:
            path = Path(file_path)
            if path.exists() and path.is_file():
                path.unlink()
                logger.debug(f"Cleaned up file: {path}")
                return True
            return False
        except Exception as e:
            logger.warning(f"Failed to cleanup file {file_path}: {e}")
            return False

    def cleanup_old_files(self, max_age_hours: int = 24, pattern: str = "*") -> int:
        """
        Clean up old files in the base directory.

        Args:
            max_age_hours: Maximum age of files to keep in hours
            pattern: File pattern to match (glob pattern)

        Returns:
            int: Number of files cleaned up
        """
        cleaned_count = 0
        try:
            current_time = time.time()
            max_age_seconds = max_age_hours * 3600
            for file_path in self.base_dir.glob(pattern):
                try:
                    if not file_path.is_file():
                        continue
                    file_age = current_time - file_path.stat().st_mtime
                except OSError as e:
                    # A file that vanished or cannot be read must not abort
                    # the whole sweep.
                    logger.warning(f"Failed to cleanup file {file_path}: {e}")
                    continue
                if file_age > max_age_seconds and self.cleanup_file(file_path):
                    cleaned_count += 1
        except Exception as e:
            logger.error(f"Failed to cleanup old files: {e}")
        if cleaned_count > 0:
            logger.info(f"Cleaned up {cleaned_count} old files")
        # Report what was actually deleted, even if the sweep ended early.
        return cleaned_count

    def get_file_info(self, file_path: Union[str, Path]) -> dict:
        """
        Get information about a file.

        Args:
            file_path: Path to the file

        Returns:
            dict: File information. ``{'exists': False}`` when the path does
            not exist; an additional ``'error'`` key is set when the lookup
            itself failed.
        """
        try:
            path = Path(file_path)
            if not path.exists():
                return {'exists': False}
            stat = path.stat()
            return {
                'exists': True,
                'size_bytes': stat.st_size,
                # NOTE: st_ctime is the creation time on Windows but the last
                # metadata change on Unix.
                'created_time': stat.st_ctime,
                'modified_time': stat.st_mtime,
                'is_file': path.is_file(),
                'is_directory': path.is_dir(),
                'extension': path.suffix,
                'name': path.name,
                'parent': str(path.parent)
            }
        except Exception as e:
            logger.error(f"Failed to get file info for {file_path}: {e}")
            return {'exists': False, 'error': str(e)}

    def ensure_directory(self, dir_path: Union[str, Path]) -> Path:
        """
        Ensure a directory exists, creating it if necessary.

        Args:
            dir_path: Path to the directory

        Returns:
            Path: Path to the directory
        """
        path = Path(dir_path)
        path.mkdir(parents=True, exist_ok=True)
        logger.debug(f"Ensured directory exists: {path}")
        return path

    def get_disk_usage(self) -> dict:
        """
        Get disk usage information for the base directory.

        Returns:
            dict: Disk usage information (recursive file count and total
            size), or ``{'error': ...}`` when the directory cannot be scanned
        """
        try:
            total_size = 0
            file_count = 0
            for file_path in self.base_dir.rglob('*'):
                try:
                    if file_path.is_file():
                        total_size += file_path.stat().st_size
                        file_count += 1
                except OSError:
                    # File disappeared between listing and stat; skip it.
                    continue
            return {
                'base_directory': str(self.base_dir),
                'total_size_bytes': total_size,
                'total_size_mb': total_size / (1024 * 1024),
                'file_count': file_count
            }
        except Exception as e:
            logger.error(f"Failed to get disk usage: {e}")
            return {'error': str(e)}
class AudioFileGenerator:
    """Utility class for generating audio files from raw audio data.

    Both helpers are static: they can be called on the class or on an
    instance and do not use any instance state.
    """

    @staticmethod
    def save_wav_file(audio_data: bytes, sample_rate: int, file_path: Union[str, Path], channels: int = 1, sample_width: int = 2) -> Path:
        """
        Save raw audio data as a WAV file.

        Args:
            audio_data: Raw audio data
            sample_rate: Sample rate in Hz
            file_path: Output file path
            channels: Number of audio channels
            sample_width: Sample width in bytes

        Returns:
            Path: Path to the saved WAV file

        Raises:
            Exception: Any error raised while writing is logged and re-raised
        """
        try:
            import wave
            path = Path(file_path)
            with wave.open(str(path), 'wb') as wav_file:
                wav_file.setnchannels(channels)
                wav_file.setsampwidth(sample_width)
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)
            logger.debug(f"Saved WAV file: {path} (sample_rate={sample_rate}, channels={channels})")
            return path
        except Exception as e:
            logger.error(f"Failed to save WAV file: {e}")
            raise

    @staticmethod
    def convert_numpy_to_wav(audio_array, sample_rate: int, file_path: Union[str, Path]) -> Path:
        """
        Convert numpy array to WAV file.

        The data is converted to float32 and, when its peak magnitude exceeds
        1.0, scaled down so that the peak becomes 1.0. Requires the ``numpy``
        and ``soundfile`` packages, which are imported on first use.

        Args:
            audio_array: Numpy array containing audio data
            sample_rate: Sample rate in Hz
            file_path: Output file path

        Returns:
            Path: Path to the saved WAV file

        Raises:
            ImportError: If numpy or soundfile is not installed
            Exception: Any other error is logged and re-raised
        """
        try:
            import numpy as np
            import soundfile as sf
            path = Path(file_path)
            # Ensure audio is in the correct format
            samples = np.asarray(audio_array, dtype=np.float32)
            # Normalize if needed; np.max would raise on an empty array, so
            # only look at the peak when there is at least one sample.
            if samples.size:
                peak = np.max(np.abs(samples))
                if peak > 1.0:
                    samples = samples / peak
            sf.write(str(path), samples, sample_rate)
            logger.debug(f"Converted numpy array to WAV: {path}")
            return path
        except ImportError as e:
            # Name the module that is actually missing (numpy or soundfile).
            missing = e.name or "soundfile"
            logger.error(f"{missing} library not available for numpy conversion")
            raise
        except Exception as e:
            logger.error(f"Failed to convert numpy array to WAV: {e}")
            raise
class ErrorHandler:
    """Utility class for handling and logging errors in infrastructure providers."""

    def __init__(self, provider_name: str):
        """
        Initialize the error handler.

        Args:
            provider_name: Name of the provider for error context
        """
        self.provider_name = provider_name
        self.logger = logging.getLogger(f"{__name__}.{provider_name}")

    def _compose(self, message: str, context: str = "") -> str:
        """Build ``"<provider> (<context>): <message>"``, omitting an empty context."""
        text = f"{self.provider_name}"
        if context:
            text += f" ({context})"
        return text + f": {message}"

    def handle_error(self, error: Exception, context: str = "", reraise_as: Optional[type] = None) -> None:
        """
        Handle an error with proper logging and optional re-raising.

        This method always raises; it works both inside and outside an
        ``except`` block.

        Args:
            error: The original error
            context: Additional context about when the error occurred
            reraise_as: Exception type to re-raise as (if None, re-raises original)

        Raises:
            Exception: ``reraise_as(message)`` chained from ``error`` when
                ``reraise_as`` is given, otherwise ``error`` itself
        """
        error_msg = f"{self.provider_name} error"
        if context:
            error_msg += f" during {context}"
        error_msg += f": {str(error)}"
        # Pass the exception itself so its traceback is logged even when no
        # exception is currently being handled.
        self.logger.error(error_msg, exc_info=error)
        if reraise_as:
            raise reraise_as(error_msg) from error
        # A bare ``raise`` needs an active exception; raising the object
        # works regardless of where the caller invoked us.
        raise error

    def log_warning(self, message: str, context: str = "") -> None:
        """
        Log a warning message.

        Args:
            message: Warning message
            context: Additional context
        """
        self.logger.warning(self._compose(message, context))

    def log_info(self, message: str, context: str = "") -> None:
        """
        Log an info message.

        Args:
            message: Info message
            context: Additional context
        """
        self.logger.info(self._compose(message, context))

    def log_debug(self, message: str, context: str = "") -> None:
        """
        Log a debug message.

        Args:
            message: Debug message
            context: Additional context
        """
        self.logger.debug(self._compose(message, context))