# Michael Hu: add more logs (fdc056d)
"""File generation and management utilities for infrastructure providers."""
import logging
import os
import tempfile
import time
from pathlib import Path
from typing import Optional, Union
import hashlib
logger = logging.getLogger(__name__)
class FileManager:
    """Utility class for managing temporary files and directories.

    Every file this class creates lives under ``base_dir``. Generated names
    embed a millisecond timestamp; when that name is already taken the file
    is created under the same name with a numeric ``_<n>`` suffix inserted
    before the extension, so an existing file is never overwritten.
    """

    def __init__(self, base_dir: Optional[Union[str, Path]] = None):
        """
        Initialize the file manager.

        Args:
            base_dir: Base directory for file operations (defaults to a
                ``tts_app`` directory inside the system temp directory)
        """
        if base_dir:
            self.base_dir = Path(base_dir)
        else:
            self.base_dir = Path(tempfile.gettempdir()) / "tts_app"

        # parents=True: a caller-supplied nested directory whose parents do
        # not exist yet must not fail with FileNotFoundError.
        self.base_dir.mkdir(parents=True, exist_ok=True)
        logger.info("FileManager initialized with base directory: %s", self.base_dir)

    def _claim_path(self, filename: str) -> Path:
        """
        Atomically create an empty file in the base directory.

        If ``filename`` is already taken, ``_1``, ``_2``, ... is inserted
        before the extension until an unused name is found.

        Args:
            filename: Desired file name, relative to the base directory

        Returns:
            Path: Path to the newly created, empty file
        """
        candidate = self.base_dir / filename
        stem, suffix = candidate.stem, candidate.suffix
        attempt = 0
        while True:
            try:
                # exist_ok=False makes creation exclusive, so two callers can
                # never be handed the same path.
                candidate.touch(exist_ok=False)
                return candidate
            except FileExistsError:
                attempt += 1
                candidate = candidate.with_name(f"{stem}_{attempt}{suffix}")

    def create_temp_file(self, suffix: str = ".tmp", prefix: str = "temp", content: Optional[bytes] = None) -> Path:
        """
        Create a temporary file.

        Args:
            suffix: File suffix/extension
            prefix: File prefix
            content: Optional content to write to the file; when omitted or
                empty, an empty file is created

        Returns:
            Path: Path to the created temporary file
        """
        timestamp = int(time.time() * 1000)
        file_path = self._claim_path(f"{prefix}_{timestamp}{suffix}")

        # The file already exists (empty) at this point; only write when
        # there is something to write.
        if content:
            file_path.write_bytes(content)

        logger.info("Created temporary file: %s", file_path)
        return file_path

    def create_unique_filename(self, base_name: str, extension: str = "", content_hash: bool = False, content: Optional[bytes] = None) -> str:
        """
        Create a filename from a base name and the current time.

        The name has the form ``<base_name>_<ms timestamp>[_<hash>]<ext>``.
        It is only as unique as the millisecond timestamp (plus the optional
        content hash); no check against the file system is made here.

        Args:
            base_name: Base name for the file
            extension: File extension (with or without dot)
            content_hash: Whether to include content hash in filename
            content: Content to hash (required if content_hash=True; the
                hash is omitted when content is missing or empty)

        Returns:
            str: Generated filename
        """
        timestamp = int(time.time() * 1000)

        if extension and not extension.startswith('.'):
            extension = '.' + extension

        filename = f"{base_name}_{timestamp}"

        if content_hash and content:
            # MD5 is used purely as a short fingerprint, not for security.
            digest = hashlib.md5(content).hexdigest()[:8]
            filename += f"_{digest}"

        return filename + extension

    def save_audio_file(self, audio_data: bytes, format: str = "wav", prefix: str = "audio") -> Path:
        """
        Save audio data to a file.

        Args:
            audio_data: Raw audio data
            format: Audio format (wav, mp3, etc.), with or without dot
            prefix: Filename prefix

        Returns:
            Path: Path to the saved audio file
        """
        if not format.startswith('.'):
            format = '.' + format

        filename = self.create_unique_filename(prefix, format, content_hash=True, content=audio_data)
        # Claim the name exclusively so a same-millisecond save of identical
        # data gets its own file instead of sharing one.
        file_path = self._claim_path(filename)
        file_path.write_bytes(audio_data)

        logger.info("Saved audio file: %s (%s bytes)", file_path, len(audio_data))
        return file_path

    def save_text_file(self, text_content: str, encoding: str = "utf-8", prefix: str = "text") -> Path:
        """
        Save text content to a file.

        Args:
            text_content: Text content to save
            encoding: Text encoding
            prefix: Filename prefix

        Returns:
            Path: Path to the saved text file
        """
        filename = self.create_unique_filename(prefix, ".txt")
        # Claim the name exclusively so two saves in the same millisecond do
        # not overwrite each other.
        file_path = self._claim_path(filename)

        with open(file_path, 'w', encoding=encoding) as f:
            f.write(text_content)

        logger.info("Saved text file: %s (%s characters)", file_path, len(text_content))
        return file_path

    def cleanup_file(self, file_path: Union[str, Path]) -> bool:
        """
        Clean up a single file.

        Best-effort: any failure is logged as a warning and reported as
        False rather than raised.

        Args:
            file_path: Path to the file to clean up

        Returns:
            bool: True if file was successfully deleted, False otherwise
                (including when the path does not exist or is not a file)
        """
        try:
            path = Path(file_path)
            if not path.is_file():
                return False
            path.unlink()
            logger.info("Cleaned up file: %s", path)
            return True
        except Exception as e:
            logger.warning("Failed to cleanup file %s: %s", file_path, e)
            return False

    def cleanup_old_files(self, max_age_hours: int = 24, pattern: str = "*") -> int:
        """
        Clean up old files in the base directory.

        A file that cannot be inspected (for example because it disappeared
        during the sweep) is skipped; the remaining files are still
        processed.

        Args:
            max_age_hours: Maximum age of files to keep in hours
            pattern: File pattern to match (glob pattern)

        Returns:
            int: Number of files cleaned up
        """
        cleaned_count = 0
        try:
            current_time = time.time()
            max_age_seconds = max_age_hours * 3600

            for file_path in self.base_dir.glob(pattern):
                try:
                    if not file_path.is_file():
                        continue
                    file_age = current_time - file_path.stat().st_mtime
                except OSError as e:
                    # One unreadable entry must not abort the whole sweep.
                    logger.warning("Skipping %s during cleanup: %s", file_path, e)
                    continue

                if file_age > max_age_seconds and self.cleanup_file(file_path):
                    cleaned_count += 1

            if cleaned_count > 0:
                logger.info("Cleaned up %s old files", cleaned_count)
        except Exception as e:
            logger.error("Failed to cleanup old files: %s", e)

        # Report what was actually deleted, even if the sweep ended early.
        return cleaned_count

    def get_file_info(self, file_path: Union[str, Path]) -> dict:
        """
        Get information about a file.

        Args:
            file_path: Path to the file

        Returns:
            dict: ``{'exists': False}`` when the path does not exist;
                ``{'exists': False, 'error': <message>}`` when inspection
                fails; otherwise size, ctime/mtime (as returned by
                ``os.stat``), file/directory flags, extension, name and
                parent directory
        """
        try:
            path = Path(file_path)

            if not path.exists():
                return {'exists': False}

            stat = path.stat()
            return {
                'exists': True,
                'size_bytes': stat.st_size,
                'created_time': stat.st_ctime,
                'modified_time': stat.st_mtime,
                'is_file': path.is_file(),
                'is_directory': path.is_dir(),
                'extension': path.suffix,
                'name': path.name,
                'parent': str(path.parent),
            }
        except Exception as e:
            logger.error("Failed to get file info for %s: %s", file_path, e)
            return {'exists': False, 'error': str(e)}

    def ensure_directory(self, dir_path: Union[str, Path]) -> Path:
        """
        Ensure a directory exists, creating it (and any parents) if necessary.

        Args:
            dir_path: Path to the directory

        Returns:
            Path: Path to the directory
        """
        path = Path(dir_path)
        path.mkdir(parents=True, exist_ok=True)
        logger.info("Ensured directory exists: %s", path)
        return path

    def get_disk_usage(self) -> dict:
        """
        Get disk usage information for the base directory.

        Walks the base directory recursively. Files that cannot be inspected
        (for example because they were removed during the walk) are left out
        of the totals.

        Returns:
            dict: Base directory, total size in bytes and MiB, and file
                count; ``{'error': <message>}`` if the walk itself fails
        """
        try:
            total_size = 0
            file_count = 0

            for file_path in self.base_dir.rglob('*'):
                try:
                    if not file_path.is_file():
                        continue
                    size = file_path.stat().st_size
                except OSError:
                    # Entry vanished or is unreadable; skip it.
                    continue
                total_size += size
                file_count += 1

            return {
                'base_directory': str(self.base_dir),
                'total_size_bytes': total_size,
                'total_size_mb': total_size / (1024 * 1024),
                'file_count': file_count,
            }
        except Exception as e:
            logger.error("Failed to get disk usage: %s", e)
            return {'error': str(e)}
class AudioFileGenerator:
    """Utility class for generating audio files from raw audio data."""

    @staticmethod
    def save_wav_file(audio_data: bytes, sample_rate: int, file_path: Union[str, Path], channels: int = 1, sample_width: int = 2) -> Path:
        """
        Save raw audio data as a WAV file.

        Args:
            audio_data: Raw audio frames, written as-is after the WAV header
            sample_rate: Sample rate in Hz
            file_path: Output file path
            channels: Number of audio channels
            sample_width: Sample width in bytes

        Returns:
            Path: Path to the saved WAV file

        Raises:
            Exception: Any error raised while writing is logged and re-raised
        """
        try:
            import wave

            path = Path(file_path)
            with wave.open(str(path), 'wb') as wav_file:
                wav_file.setnchannels(channels)
                wav_file.setsampwidth(sample_width)
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)

            logger.info(f"Saved WAV file: {path} (sample_rate={sample_rate}, channels={channels})")
            return path
        except Exception as e:
            logger.error(f"Failed to save WAV file: {str(e)}")
            raise

    @staticmethod
    def convert_numpy_to_wav(audio_array, sample_rate: int, file_path: Union[str, Path]) -> Path:
        """
        Convert numpy array to WAV file.

        The data is converted to float32 and, if its absolute peak exceeds
        1.0, scaled down so the peak becomes 1.0. The caller's array is not
        modified. An empty array is written without normalization.

        Args:
            audio_array: Numpy array (or array-like) containing audio data
            sample_rate: Sample rate in Hz
            file_path: Output file path

        Returns:
            Path: Path to the saved WAV file

        Raises:
            ImportError: If numpy or soundfile is not installed
            Exception: Any other error raised while converting or writing is
                logged and re-raised
        """
        # Import separately from the conversion so that the "library not
        # available" message names the module that is actually missing and
        # is not triggered by unrelated ImportErrors raised later on.
        try:
            import numpy as np
            import soundfile as sf
        except ImportError as e:
            logger.error(f"{e.name or 'required'} library not available for numpy conversion")
            raise

        try:
            path = Path(file_path)

            # Ensure audio is in the correct format (no copy when the input
            # is already a float32 array).
            samples = np.asarray(audio_array, dtype=np.float32)

            # Normalize if needed. np.max raises on a zero-size array, so
            # empty input skips this step.
            if samples.size:
                peak = np.max(np.abs(samples))
                if peak > 1.0:
                    samples = samples / peak

            sf.write(str(path), samples, sample_rate)
            logger.info(f"Converted numpy array to WAV: {path}")
            return path
        except Exception as e:
            logger.error(f"Failed to convert numpy array to WAV: {str(e)}")
            raise
class ErrorHandler:
    """Utility class for handling and logging errors in infrastructure providers."""

    def __init__(self, provider_name: str):
        """
        Initialize the error handler.

        Args:
            provider_name: Name of the provider for error context; also used
                as the last component of this handler's logger name
        """
        self.provider_name = provider_name
        self.logger = logging.getLogger(f"{__name__}.{provider_name}")

    def _with_context(self, message: str, context: str) -> str:
        """Prefix *message* with the provider name and optional context."""
        prefix = self.provider_name
        if context:
            prefix += f" ({context})"
        return f"{prefix}: {message}"

    def handle_error(self, error: Exception, context: str = "", reraise_as: Optional[type] = None) -> None:
        """
        Handle an error with proper logging and optional re-raising.

        This method always raises; it never returns normally.

        Args:
            error: The original error
            context: Additional context about when the error occurred
            reraise_as: Exception type to re-raise as (if None, re-raises original)

        Raises:
            reraise_as: Built from the logged message and chained to
                ``error``, when ``reraise_as`` is given
            Exception: The original ``error`` otherwise
        """
        error_msg = f"{self.provider_name} error"
        if context:
            error_msg += f" during {context}"
        error_msg += f": {str(error)}"

        # exc_info (not "exception") is the logging keyword that attaches
        # the exception's traceback to the record.
        self.logger.error(error_msg, exc_info=error)

        if reraise_as:
            raise reraise_as(error_msg) from error
        # Raise the object explicitly: a bare ``raise`` only works while an
        # exception is being handled, which is not guaranteed for callers.
        raise error

    def log_warning(self, message: str, context: str = "") -> None:
        """
        Log a warning message.

        Args:
            message: Warning message
            context: Additional context
        """
        self.logger.warning(self._with_context(message, context))

    def log_info(self, message: str, context: str = "") -> None:
        """
        Log an info message.

        Args:
            message: Info message
            context: Additional context
        """
        self.logger.info(self._with_context(message, context))

    def log_debug(self, message: str, context: str = "") -> None:
        """
        Log a debug message.

        Args:
            message: Debug message
            context: Additional context
        """
        self.logger.debug(self._with_context(message, context))