Spaces:
Runtime error
Runtime error
"""Temporary file writer for audio downloads""" | |
import os
import tempfile
import time
from typing import List, Optional

import aiofiles
import aiofiles.os
from fastapi import HTTPException
from loguru import logger

from ..core.config import settings
async def cleanup_temp_files() -> None:
    """Prune ``settings.temp_file_dir`` by age, file count and total size.

    If the directory does not exist it is created and nothing else is done.
    Otherwise the regular files in it are visited oldest-first and a file is
    deleted when any of the following holds at the moment it is visited:

    1. it is older than ``settings.max_temp_dir_age_hours``;
    2. more than ``settings.max_temp_dir_count`` files still remain;
    3. the remaining files exceed ``settings.max_temp_dir_size_mb``.

    This is best-effort: every error is logged as a warning and swallowed, so
    the coroutine does not propagate ``Exception`` subclasses to the caller.
    """
    try:
        if not await aiofiles.os.path.exists(settings.temp_file_dir):
            await aiofiles.os.makedirs(settings.temp_file_dir, exist_ok=True)
            return

        # Collect (path, mtime, size) for every regular file.
        files = []
        total_size = 0
        # Sync iteration via os.scandir, async stat via aiofiles; the context
        # manager releases the directory handle even if a stat call fails.
        with os.scandir(settings.temp_file_dir) as entries:
            for entry in entries:
                if not entry.is_file():
                    continue
                try:
                    stat = await aiofiles.os.stat(entry.path)
                except FileNotFoundError:
                    # File disappeared between listing and stat; skip it
                    # instead of aborting the whole cleanup pass.
                    continue
                files.append((entry.path, stat.st_mtime, stat.st_size))
                total_size += stat.st_size

        # Oldest first, so the count/size limits evict the oldest files.
        files.sort(key=lambda item: item[1])

        # Use the wall clock: the directory's own mtime only changes when
        # entries are added or removed, so it is not a reliable "now".
        now = time.time()
        max_age = settings.max_temp_dir_age_hours * 3600
        max_bytes = settings.max_temp_dir_size_mb * 1024 * 1024
        # Track how many files are left so the count limit stops deleting
        # once the directory is back under the limit.
        remaining = len(files)

        for path, mtime, size in files:
            if now - mtime > max_age:
                logger.info(f"Deleting old temp file: {path}")
            elif remaining > settings.max_temp_dir_count:
                logger.info(f"Deleting excess temp file: {path}")
            elif total_size > max_bytes:
                logger.info(f"Deleting to reduce directory size: {path}")
            else:
                continue

            try:
                await aiofiles.os.remove(path)
            except Exception as e:
                logger.warning(f"Failed to delete temp file {path}: {e}")
            else:
                # Only account for files that were actually removed.
                total_size -= size
                remaining -= 1
                logger.info(f"Deleted temp file: {path}")

    except Exception as e:
        logger.warning(f"Error during temp file cleanup: {e}")
class TempFileWriter:
    """Handles writing audio chunks to a temp file.

    Intended to be used as an async context manager. ``__aenter__`` creates
    the file and sets ``temp_path`` and ``download_path``; ``write`` appends
    chunks; ``finalize`` closes the file and returns ``download_path``.

    Failures while creating, writing or closing the file are logged and
    recorded in ``_write_error`` instead of being raised, after which
    further writes are silently skipped.
    """

    def __init__(self, format: str):
        """Initialize temp file writer

        Args:
            format: Audio format extension (mp3, wav, etc)
        """
        self.format = format
        self.temp_file = None  # async file handle, set by __aenter__
        self._finalized = False
        self._write_error = False  # Flag to track if we've had a write error

    async def __aenter__(self):
        """Async context manager entry

        Runs temp-dir cleanup, creates a uniquely named file in
        ``settings.temp_file_dir`` and opens it for async binary writing.
        On any failure a placeholder path is set and the writer is put
        into the error state rather than raising.

        Returns:
            This writer instance.
        """
        try:
            # Clean up old files first
            await cleanup_temp_files()

            # Create temp file with proper extension
            await aiofiles.os.makedirs(settings.temp_file_dir, exist_ok=True)
            # The sync handle is only needed to reserve a unique name; the
            # ``with`` closes it right away (delete=False keeps the file on
            # disk) so it cannot leak if the async open below fails.
            with tempfile.NamedTemporaryFile(
                dir=settings.temp_file_dir,
                delete=False,
                suffix=f".{self.format}",
                mode="wb",
            ) as temp:
                temp_path = temp.name

            self.temp_file = await aiofiles.open(temp_path, mode="wb")
            self.temp_path = temp_path

            # Generate download path immediately
            self.download_path = f"/download/{os.path.basename(self.temp_path)}"
        except Exception as e:
            # Handle permission issues or other errors gracefully
            logger.error(f"Failed to create temp file: {e}")
            self._write_error = True
            # Set a placeholder path so the API can still function
            self.temp_path = f"unavailable_{self.format}"
            self.download_path = f"/download/{self.temp_path}"

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit

        Closes the file if ``finalize`` has not already done so. Exceptions
        from the ``async with`` body are not suppressed.
        """
        try:
            if self.temp_file and not self._finalized:
                await self.temp_file.close()
                self._finalized = True
        except Exception as e:
            logger.error(f"Error closing temp file: {e}")
            self._write_error = True

    async def write(self, chunk: bytes) -> None:
        """Write a chunk of audio data

        Args:
            chunk: Audio data bytes to write

        Raises:
            RuntimeError: If the writer has already been finalized.
        """
        if self._finalized:
            raise RuntimeError("Cannot write to finalized temp file")

        # Skip writing if we've already encountered an error
        if self._write_error or not self.temp_file:
            return

        try:
            await self.temp_file.write(chunk)
            await self.temp_file.flush()
        except Exception as e:
            # Handle permission issues or other errors gracefully
            logger.error(f"Failed to write to temp file: {e}")
            self._write_error = True

    async def finalize(self) -> str:
        """Close temp file and return download path

        Returns:
            Path to use for downloading the temp file

        Raises:
            RuntimeError: If the writer has already been finalized.
        """
        if self._finalized:
            raise RuntimeError("Temp file already finalized")

        # Close the handle whenever one is open, even after a write error.
        # __aexit__ skips closing once _finalized is set, so bailing out
        # here without closing would leak the file handle.
        if self.temp_file:
            try:
                await self.temp_file.close()
            except Exception as e:
                # Handle permission issues or other errors gracefully
                logger.error(f"Failed to finalize temp file: {e}")
                self._write_error = True

        self._finalized = True
        return self.download_path