Michael Hu
initial check in
05b45a5
"""Temporary file writer for audio downloads"""
import os
import tempfile
from typing import List, Optional
import aiofiles
from fastapi import HTTPException
from loguru import logger
from ..core.config import settings
async def cleanup_temp_files() -> None:
"""Clean up old temp files"""
try:
if not await aiofiles.os.path.exists(settings.temp_file_dir):
await aiofiles.os.makedirs(settings.temp_file_dir, exist_ok=True)
return
# Get all temp files with stats
files = []
total_size = 0
# Use os.scandir for sync iteration, but aiofiles.os.stat for async stats
for entry in os.scandir(settings.temp_file_dir):
if entry.is_file():
stat = await aiofiles.os.stat(entry.path)
files.append((entry.path, stat.st_mtime, stat.st_size))
total_size += stat.st_size
# Sort by modification time (oldest first)
files.sort(key=lambda x: x[1])
# Remove files if:
# 1. They're too old
# 2. We have too many files
# 3. Directory is too large
current_time = (await aiofiles.os.stat(settings.temp_file_dir)).st_mtime
max_age = settings.max_temp_dir_age_hours * 3600
for path, mtime, size in files:
should_delete = False
# Check age
if current_time - mtime > max_age:
should_delete = True
logger.info(f"Deleting old temp file: {path}")
# Check count limit
elif len(files) > settings.max_temp_dir_count:
should_delete = True
logger.info(f"Deleting excess temp file: {path}")
# Check size limit
elif total_size > settings.max_temp_dir_size_mb * 1024 * 1024:
should_delete = True
logger.info(f"Deleting to reduce directory size: {path}")
if should_delete:
try:
await aiofiles.os.remove(path)
total_size -= size
logger.info(f"Deleted temp file: {path}")
except Exception as e:
logger.warning(f"Failed to delete temp file {path}: {e}")
except Exception as e:
logger.warning(f"Error during temp file cleanup: {e}")
class TempFileWriter:
"""Handles writing audio chunks to a temp file"""
def __init__(self, format: str):
"""Initialize temp file writer
Args:
format: Audio format extension (mp3, wav, etc)
"""
self.format = format
self.temp_file = None
self._finalized = False
self._write_error = False # Flag to track if we've had a write error
async def __aenter__(self):
"""Async context manager entry"""
try:
# Clean up old files first
await cleanup_temp_files()
# Create temp file with proper extension
await aiofiles.os.makedirs(settings.temp_file_dir, exist_ok=True)
temp = tempfile.NamedTemporaryFile(
dir=settings.temp_file_dir,
delete=False,
suffix=f".{self.format}",
mode="wb",
)
self.temp_file = await aiofiles.open(temp.name, mode="wb")
self.temp_path = temp.name
temp.close() # Close sync file, we'll use async version
# Generate download path immediately
self.download_path = f"/download/{os.path.basename(self.temp_path)}"
except Exception as e:
# Handle permission issues or other errors gracefully
logger.error(f"Failed to create temp file: {e}")
self._write_error = True
# Set a placeholder path so the API can still function
self.temp_path = f"unavailable_{self.format}"
self.download_path = f"/download/{self.temp_path}"
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
try:
if self.temp_file and not self._finalized:
await self.temp_file.close()
self._finalized = True
except Exception as e:
logger.error(f"Error closing temp file: {e}")
self._write_error = True
async def write(self, chunk: bytes) -> None:
"""Write a chunk of audio data
Args:
chunk: Audio data bytes to write
"""
if self._finalized:
raise RuntimeError("Cannot write to finalized temp file")
# Skip writing if we've already encountered an error
if self._write_error or not self.temp_file:
return
try:
await self.temp_file.write(chunk)
await self.temp_file.flush()
except Exception as e:
# Handle permission issues or other errors gracefully
logger.error(f"Failed to write to temp file: {e}")
self._write_error = True
async def finalize(self) -> str:
"""Close temp file and return download path
Returns:
Path to use for downloading the temp file
"""
if self._finalized:
raise RuntimeError("Temp file already finalized")
# Skip finalizing if we've already encountered an error
if self._write_error or not self.temp_file:
self._finalized = True
return self.download_path
try:
await self.temp_file.close()
self._finalized = True
except Exception as e:
# Handle permission issues or other errors gracefully
logger.error(f"Failed to finalize temp file: {e}")
self._write_error = True
self._finalized = True
return self.download_path