# (Page-scrape residue from the hosting page banner: "Spaces: Running")
import os | |
import uuid | |
import tempfile | |
import logging | |
import math | |
from typing import List, Optional | |
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks, Query | |
from fastapi.responses import FileResponse, JSONResponse | |
from pydub import AudioSegment | |
from pydub.exceptions import CouldntDecodeError | |
# --- Configuration & Setup ---
# Uploaded inputs and exported results are written to the system temp directory.
TEMP_DIR = tempfile.gettempdir()
os.makedirs(TEMP_DIR, exist_ok=True)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- FastAPI App Initialization ---
app = FastAPI(
    title="Enhanced Audio Editor API",
    description=(
        "API for various audio editing tasks including trim, concat, volume, "
        "convert, fade, reverse, normalize, overlay, info, silence, speed. "
        "Requires FFmpeg."
    ),
    version="2.0.0",
)
# --- Helper Functions (Slightly Enhanced) --- | |
def cleanup_file(file_path: str):
    """Delete *file_path* if it exists, without ever raising.

    Best-effort helper for temporary files: a successful removal is logged at
    INFO level; any exception raised while checking or deleting is logged with
    its traceback and suppressed.
    """
    try:
        if not os.path.exists(file_path):
            return
        os.remove(file_path)
        logger.info(f"Cleaned up temporary file: {file_path}")
    except Exception as exc:
        logger.error(f"Error cleaning up file {file_path}: {exc}", exc_info=True)
async def save_upload_file(upload_file: UploadFile) -> str:
    """Stream an uploaded file into TEMP_DIR and return the path written.

    The file is stored under a random UUID-based name that keeps the
    lower-cased extension of the client-supplied filename, or ``.tmp`` when
    the upload has no filename or no extension. Data is copied in 1 MiB
    chunks, and the upload is closed whether or not the copy succeeds.

    Raises:
        HTTPException: 500 if the file cannot be written; the partially
            written temp file is removed first.
    """
    # The client filename can be missing (None); os.path.splitext(None) would
    # raise TypeError before the try block, skipping both the error handling
    # and the close() below.
    original_name = upload_file.filename or ""
    file_extension = os.path.splitext(original_name)[1].lower() or '.tmp'
    temp_file_path = os.path.join(TEMP_DIR, f"{uuid.uuid4().hex}{file_extension}")
    try:
        with open(temp_file_path, "wb") as buffer:
            while content := await upload_file.read(1024 * 1024):
                buffer.write(content)
        logger.info(f"Saved uploaded file '{upload_file.filename}' to temp path: {temp_file_path}")
        return temp_file_path
    except Exception as e:
        logger.error(f"Failed to save uploaded file {upload_file.filename}: {e}", exc_info=True)
        cleanup_file(temp_file_path)
        raise HTTPException(status_code=500, detail=f"Could not save uploaded file: {upload_file.filename}") from e
    finally:
        await upload_file.close()
def load_audio(file_path: str) -> AudioSegment:
    """Decode the audio file at *file_path* into a pydub ``AudioSegment``.

    Failures are translated into HTTP errors:
      * ``CouldntDecodeError`` -> 415 (unsupported format or corrupted file)
      * ``FileNotFoundError``  -> 500 (temp file missing)
      * anything else          -> 500 (generic processing error)
    """
    try:
        segment = AudioSegment.from_file(file_path)
        logger.info(f"Loaded audio from: {file_path} ({len(segment)}ms)")
        return segment
    except CouldntDecodeError:
        logger.warning(f"pydub couldn't decode file: {file_path}. Check format/corruption/FFmpeg.")
        shown_name = os.path.basename(file_path)
        raise HTTPException(
            status_code=415,
            detail=f"Unsupported audio format or corrupted file: {shown_name}. Ensure FFmpeg is correctly installed and supports the format.",
        )
    except FileNotFoundError:
        logger.error(f"Audio file not found after saving: {file_path}")
        raise HTTPException(status_code=500, detail="Internal error: Temporary audio file missing.")
    except Exception as exc:
        logger.error(f"Error loading audio file {file_path}: {exc}", exc_info=True)
        shown_name = os.path.basename(file_path)
        raise HTTPException(status_code=500, detail=f"Error processing audio file: {shown_name}")
def export_audio(audio: AudioSegment, format: str, bitrate: Optional[str] = None) -> str:
    """Export *audio* to a new temp file in TEMP_DIR and return its path.

    Args:
        audio: Segment to write.
        format: Requested output format; also used as the file extension.
        bitrate: Optional encoder bitrate (e.g. ``"192k"``). Only forwarded
            for mp3, ogg, aac and m4a.

    Raises:
        HTTPException: 500 if the export fails; any partial output file is
            removed first.
    """
    # pydub hands `format` to FFmpeg's `-f` option, which expects a muxer
    # name rather than a file extension. FFmpeg has no output muxer called
    # "m4a" or "aac"; "ipod" and "adts" are the muxers that write those files.
    ffmpeg_muxer_names = {'m4a': 'ipod', 'aac': 'adts'}
    output_filename = f"edited_{uuid.uuid4().hex}.{format}"
    output_path = os.path.join(TEMP_DIR, output_filename)
    export_params = {}
    if bitrate and format in ['mp3', 'ogg', 'aac', 'm4a']:  # Add other formats if they support bitrate param in pydub/ffmpeg
        export_params['bitrate'] = bitrate
        logger.info(f"Using bitrate: {bitrate} for export.")
    try:
        logger.info(f"Exporting audio to format '{format}' at {output_path}")
        audio.export(output_path, format=ffmpeg_muxer_names.get(format, format), **export_params)
        return output_path
    except Exception as e:
        logger.error(f"Error exporting audio to format {format} (bitrate: {bitrate}): {e}", exc_info=True)
        cleanup_file(output_path)
        raise HTTPException(status_code=500, detail=f"Failed to export audio to format '{format}'. Check format support and parameters.") from e
# --- API Endpoints --- | |
# --- General --- | |
def read_root():
    """Return the welcome payload served by the API root."""
    welcome_text = (
        "Welcome to the Enhanced Audio Editor API. "
        "Use POST requests to the specific editing endpoints."
    )
    return {"message": welcome_text}
async def get_audio_info(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to analyze.")
):
    """Return basic properties of the uploaded audio file as JSON.

    The response contains duration, channel count, sample width, frame rate,
    frame count, peak sample value, RMS amplitude and loudness in dBFS.
    ``dBFS`` is ``null`` when the audio is completely silent or empty.

    Raises:
        HTTPException: whatever ``save_upload_file`` / ``load_audio`` raise,
            or 500 for any other failure.
    """
    logger.info(f"Info request: file='{file.filename}'")
    input_path = await save_upload_file(file)
    background_tasks.add_task(cleanup_file, input_path)  # Schedule cleanup
    try:
        audio = load_audio(input_path)
        # pydub reports -inf dBFS for all-zero audio; JSONResponse refuses
        # non-finite floats, so map that case to null instead of failing.
        loudness_dbfs = audio.dBFS
        info = {
            "filename": file.filename,
            "duration_ms": len(audio),
            "duration_seconds": len(audio) / 1000.0,
            "channels": audio.channels,
            "sample_width_bytes": audio.sample_width,
            "frame_rate_hz": audio.frame_rate,
            "frame_count": audio.frame_count(),
            "max_amplitude": audio.max,  # Max sample value (peak)
            "rms_amplitude": audio.rms,  # Root Mean Square amplitude (average loudness)
            "dBFS": loudness_dbfs if math.isfinite(loudness_dbfs) else None,  # RMS loudness relative to full scale
        }
        logger.info(f"Audio info retrieved for '{file.filename}': {info}")
        return JSONResponse(content=info)
    except Exception as e:
        logger.error(f"Error during info operation: {e}", exc_info=True)
        # Background tasks are attached to the response this endpoint returns;
        # when we raise instead they never run, so delete the upload here.
        cleanup_file(input_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred while getting audio info: {str(e)}") from e
# --- Basic Editing --- | |
async def trim_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to trim."),
    start_ms: int = Form(..., description="Start time in milliseconds."),
    end_ms: int = Form(..., description="End time in milliseconds.")
):
    """Trim the uploaded audio to ``[start_ms, end_ms)`` and return the result.

    ``end_ms`` is clamped to the audio length. The output keeps the upload's
    extension as its format, falling back to mp3 when there is none.

    Raises:
        HTTPException: 422 for an invalid range or a start beyond the audio,
            500 for unexpected failures.
    """
    if start_ms < 0 or end_ms <= start_ms:
        raise HTTPException(status_code=422, detail="Invalid start/end times. Ensure start_ms >= 0 and end_ms > start_ms.")
    logger.info(f"Trim request: file='{file.filename}', start={start_ms}ms, end={end_ms}ms")
    input_path = await save_upload_file(file)
    background_tasks.add_task(cleanup_file, input_path)
    output_path = None  # set once the export has produced a file
    try:
        audio = load_audio(input_path)
        if end_ms > len(audio):
            logger.warning(f"End time ({end_ms}ms) exceeds audio duration ({len(audio)}ms). Trimming to end.")
            end_ms = len(audio)
        if start_ms >= len(audio):
            raise HTTPException(status_code=422, detail=f"Start time ({start_ms}ms) is beyond audio duration ({len(audio)}ms).")
        trimmed_audio = audio[start_ms:end_ms]
        logger.info(f"Audio trimmed to {len(trimmed_audio)}ms")
        original_format = os.path.splitext(file.filename)[1][1:].lower()
        if original_format in ("", "tmp"):
            original_format = "mp3"
        output_path = export_audio(trimmed_audio, original_format)
        background_tasks.add_task(cleanup_file, output_path)
        return FileResponse(
            path=output_path,
            media_type=f"audio/{original_format}",
            filename=f"trimmed_{start_ms}-{end_ms}ms_{file.filename}"
        )
    except Exception as e:
        logger.error(f"Error during trim operation: {e}", exc_info=True)
        if output_path and os.path.exists(output_path):
            cleanup_file(output_path)
        # Background tasks only run with the response this endpoint returns;
        # on the error path the scheduled cleanup never fires, so remove the
        # uploaded temp file here.
        cleanup_file(input_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during trimming: {str(e)}") from e
async def concatenate_audio(
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = File(..., description="Two or more audio files to join in order."),
    output_format: str = Form("mp3", description="Desired output format (e.g., 'mp3', 'wav', 'ogg')."),
    crossfade_ms: int = Form(0, description="Duration of crossfade between segments in milliseconds (0 for no crossfade).")
):
    """Join two or more uploaded audio files in order, optionally crossfading.

    Raises:
        HTTPException: 422 for fewer than two files, a negative crossfade, or
            a crossfade longer than one of the segments being joined; 500 for
            unexpected failures.
    """
    if len(files) < 2:
        raise HTTPException(status_code=422, detail="Please upload at least two files to concatenate.")
    if crossfade_ms < 0:
        raise HTTPException(status_code=422, detail="Crossfade duration cannot be negative.")
    logger.info(f"Concatenate request: {len(files)} files, format='{output_format}', crossfade={crossfade_ms}ms")
    input_paths = []
    loaded_audios = []
    output_path = None  # Define to allow cleanup in the error handler
    try:
        for file in files:
            input_path = await save_upload_file(file)
            input_paths.append(input_path)
            background_tasks.add_task(cleanup_file, input_path)
            loaded_audios.append(load_audio(input_path))
        if not loaded_audios:
            raise HTTPException(status_code=500, detail="No audio segments were loaded successfully.")
        combined_audio = loaded_audios[0]
        logger.info(f"Starting concatenation with first segment ({len(combined_audio)}ms)")
        for position, segment in enumerate(loaded_audios[1:], start=2):
            logger.info(f"Adding segment {position} ({len(segment)}ms)")
            # pydub raises ValueError when the crossfade is longer than either
            # side of the join; report that as a client error, not a 500.
            if crossfade_ms > min(len(combined_audio), len(segment)):
                raise HTTPException(
                    status_code=422,
                    detail=f"Crossfade ({crossfade_ms}ms) is longer than one of the segments being joined at file {position}.",
                )
            combined_audio = combined_audio.append(segment, crossfade=crossfade_ms)
        logger.info(f"Concatenated audio length: {len(combined_audio)}ms")
        output_path = export_audio(combined_audio, output_format)
        background_tasks.add_task(cleanup_file, output_path)
        first_filename_base = os.path.splitext(files[0].filename)[0]
        output_filename = f"concat_{first_filename_base}_and_{len(files)-1}_others.{output_format}"
        return FileResponse(path=output_path, media_type=f"audio/{output_format}", filename=output_filename)
    except Exception as e:
        logger.error(f"Error during concat operation: {e}", exc_info=True)
        # Cleanup output if it exists and error happened after export
        if output_path and os.path.exists(output_path):
            cleanup_file(output_path)
        # Background tasks only run with the response this endpoint returns;
        # on the error path they never fire, so delete every upload saved so far.
        for saved_path in input_paths:
            cleanup_file(saved_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during concatenation: {str(e)}") from e
async def change_volume(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to adjust volume for."),
    change_db: float = Form(..., description="Volume change in decibels (dB). Positive increases, negative decreases.")
):
    """Apply a gain of ``change_db`` decibels to the uploaded audio.

    The output keeps the upload's extension as its format (mp3 when absent).

    Raises:
        HTTPException: errors from saving/loading/exporting, or 500 for any
            other failure.
    """
    logger.info(f"Volume request: file='{file.filename}', change_db={change_db}dB")
    input_path = await save_upload_file(file)
    background_tasks.add_task(cleanup_file, input_path)
    output_path = None
    try:
        audio = load_audio(input_path)
        adjusted_audio = audio + change_db  # AudioSegment + number applies gain in dB
        logger.info(f"Volume adjusted by {change_db}dB.")
        original_format = os.path.splitext(file.filename)[1][1:].lower()
        if original_format in ("", "tmp"):
            original_format = "mp3"
        output_path = export_audio(adjusted_audio, original_format)
        background_tasks.add_task(cleanup_file, output_path)
        return FileResponse(path=output_path, media_type=f"audio/{original_format}", filename=f"volume_{change_db}dB_{file.filename}")
    except Exception as e:
        logger.error(f"Error during volume operation: {e}", exc_info=True)
        if output_path and os.path.exists(output_path):
            cleanup_file(output_path)
        # Background tasks only run with the response this endpoint returns;
        # on the error path the scheduled cleanup never fires, so remove the
        # uploaded temp file here.
        cleanup_file(input_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during volume adjustment: {str(e)}") from e
async def convert_format(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to convert."),
    output_format: str = Form(..., description="Target audio format (e.g., 'mp3', 'wav', 'ogg', 'flac', 'aac')."),
    bitrate: Optional[str] = Form(None, description="Target bitrate (e.g., '192k', '320k'). Only applicable for certain formats like MP3, OGG, AAC.")
):
    """Convert the uploaded audio to ``output_format``, optionally at ``bitrate``.

    ``output_format`` is matched case-insensitively against a fixed allow-list.

    Raises:
        HTTPException: 422 for a format outside the allow-list; errors from
            saving/loading/exporting; 500 for any other failure.
    """
    allowed_formats = {'mp3', 'wav', 'ogg', 'flac', 'aac', 'm4a', 'opus'}  # Common formats
    output_format = output_format.lower()
    if output_format not in allowed_formats:
        # sorted() keeps the error message stable; set iteration order is not.
        raise HTTPException(status_code=422, detail=f"Invalid output format '{output_format}'. Allowed: {', '.join(sorted(allowed_formats))}")
    logger.info(f"Convert request: file='{file.filename}', format='{output_format}', bitrate='{bitrate}'")
    input_path = await save_upload_file(file)
    background_tasks.add_task(cleanup_file, input_path)
    output_path = None
    try:
        audio = load_audio(input_path)
        output_path = export_audio(audio, output_format, bitrate=bitrate)
        background_tasks.add_task(cleanup_file, output_path)
        filename_base = os.path.splitext(file.filename)[0]
        output_filename = f"{filename_base}_converted.{output_format}"
        return FileResponse(path=output_path, media_type=f"audio/{output_format}", filename=output_filename)
    except Exception as e:
        logger.error(f"Error during convert operation: {e}", exc_info=True)
        if output_path and os.path.exists(output_path):
            cleanup_file(output_path)
        # Background tasks only run with the response this endpoint returns;
        # on the error path the scheduled cleanup never fires, so remove the
        # uploaded temp file here.
        cleanup_file(input_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during format conversion: {str(e)}") from e
# --- Effects & Advanced Editing --- | |
async def apply_fade(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to apply fade."),
    fade_type: str = Form(..., description="Type of fade: 'in' or 'out'."),
    duration_ms: int = Form(..., description="Duration of the fade in milliseconds.")
):
    """Apply a fade-in or fade-out of ``duration_ms`` to the uploaded audio.

    A duration longer than the audio is clamped to the audio length. The
    output keeps the upload's extension as its format (mp3 when absent).

    Raises:
        HTTPException: 422 for an unknown ``fade_type`` or a non-positive
            duration; 500 for unexpected failures.
    """
    if fade_type not in ['in', 'out']:
        raise HTTPException(status_code=422, detail="Invalid fade_type. Must be 'in' or 'out'.")
    if duration_ms <= 0:
        raise HTTPException(status_code=422, detail="Fade duration must be positive.")
    logger.info(f"Fade request: file='{file.filename}', type='{fade_type}', duration={duration_ms}ms")
    input_path = await save_upload_file(file)
    background_tasks.add_task(cleanup_file, input_path)
    output_path = None
    try:
        audio = load_audio(input_path)
        if duration_ms > len(audio):
            logger.warning(f"Fade duration ({duration_ms}ms) exceeds audio length ({len(audio)}ms). Clamping.")
            duration_ms = len(audio)
        if fade_type == 'in':
            faded_audio = audio.fade_in(duration_ms)
        else:  # fade_type == 'out'
            faded_audio = audio.fade_out(duration_ms)
        logger.info(f"Fade-{fade_type} applied successfully.")
        original_format = os.path.splitext(file.filename)[1][1:].lower()
        if original_format in ("", "tmp"):
            original_format = "mp3"
        output_path = export_audio(faded_audio, original_format)
        background_tasks.add_task(cleanup_file, output_path)
        return FileResponse(path=output_path, media_type=f"audio/{original_format}", filename=f"fade_{fade_type}_{duration_ms}ms_{file.filename}")
    except Exception as e:
        logger.error(f"Error during fade operation: {e}", exc_info=True)
        if output_path and os.path.exists(output_path):
            cleanup_file(output_path)
        # Background tasks only run with the response this endpoint returns;
        # on the error path the scheduled cleanup never fires, so remove the
        # uploaded temp file here.
        cleanup_file(input_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during fade: {str(e)}") from e
async def reverse_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to reverse.")
):
    """Return the uploaded audio played backwards.

    The output keeps the upload's extension as its format (mp3 when absent).

    Raises:
        HTTPException: errors from saving/loading/exporting, or 500 for any
            other failure.
    """
    logger.info(f"Reverse request: file='{file.filename}'")
    input_path = await save_upload_file(file)
    background_tasks.add_task(cleanup_file, input_path)
    output_path = None
    try:
        audio = load_audio(input_path)
        reversed_audio = audio.reverse()
        logger.info("Audio reversed successfully.")
        original_format = os.path.splitext(file.filename)[1][1:].lower()
        if original_format in ("", "tmp"):
            original_format = "mp3"
        output_path = export_audio(reversed_audio, original_format)
        background_tasks.add_task(cleanup_file, output_path)
        return FileResponse(path=output_path, media_type=f"audio/{original_format}", filename=f"reversed_{file.filename}")
    except Exception as e:
        logger.error(f"Error during reverse operation: {e}", exc_info=True)
        if output_path and os.path.exists(output_path):
            cleanup_file(output_path)
        # Background tasks only run with the response this endpoint returns;
        # on the error path the scheduled cleanup never fires, so remove the
        # uploaded temp file here.
        cleanup_file(input_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during reverse: {str(e)}") from e
async def normalize_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to normalize."),
    headroom_db: float = Form(0.1, description="Target peak amplitude headroom in dB below 0 dBFS. Default is 0.1dB.")
):
    """Normalize the uploaded audio so its peak sits ``headroom_db`` below 0 dBFS.

    The output keeps the upload's extension as its format (mp3 when absent).

    Raises:
        HTTPException: 422 for negative headroom; errors from
            saving/loading/exporting; 500 for any other failure.
    """
    if headroom_db < 0:
        raise HTTPException(status_code=422, detail="Headroom must be non-negative.")
    logger.info(f"Normalize request: file='{file.filename}', headroom={headroom_db}dB")
    input_path = await save_upload_file(file)
    background_tasks.add_task(cleanup_file, input_path)
    output_path = None
    try:
        audio = load_audio(input_path)
        normalized_audio = audio.normalize(headroom=headroom_db)
        logger.info(f"Audio normalized with {headroom_db}dB headroom.")
        original_format = os.path.splitext(file.filename)[1][1:].lower()
        if original_format in ("", "tmp"):
            original_format = "mp3"
        output_path = export_audio(normalized_audio, original_format)
        background_tasks.add_task(cleanup_file, output_path)
        return FileResponse(path=output_path, media_type=f"audio/{original_format}", filename=f"normalized_{headroom_db}dB_{file.filename}")
    except Exception as e:
        logger.error(f"Error during normalize operation: {e}", exc_info=True)
        if output_path and os.path.exists(output_path):
            cleanup_file(output_path)
        # Background tasks only run with the response this endpoint returns;
        # on the error path the scheduled cleanup never fires, so remove the
        # uploaded temp file here.
        cleanup_file(input_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during normalization: {str(e)}") from e
async def overlay_audio(
    background_tasks: BackgroundTasks,
    file_base: UploadFile = File(..., description="The base audio track."),
    file_overlay: UploadFile = File(..., description="The audio track to overlay."),
    position_ms: int = Form(0, description="Position (in ms) in the base track where the overlay should start."),
    gain_during_overlay: Optional[float] = Form(None, description="Volume change (dB) applied to the base track *during* the overlay. E.g., -6 to lower base volume.")
):
    """Mix ``file_overlay`` onto ``file_base`` starting at ``position_ms``.

    The output keeps the base upload's extension as its format (mp3 when
    absent). Looping the overlay is intentionally not exposed by this endpoint.

    Raises:
        HTTPException: 422 for a negative position; errors from
            saving/loading/exporting; 500 for any other failure.
    """
    if position_ms < 0:
        raise HTTPException(status_code=422, detail="Overlay position cannot be negative.")
    logger.info(f"Overlay request: base='{file_base.filename}', overlay='{file_overlay.filename}', position={position_ms}ms, gain_during={gain_during_overlay}dB")
    input_path_base = None
    input_path_overlay = None
    output_path = None
    try:
        input_path_base = await save_upload_file(file_base)
        background_tasks.add_task(cleanup_file, input_path_base)
        input_path_overlay = await save_upload_file(file_overlay)
        background_tasks.add_task(cleanup_file, input_path_overlay)
        audio_base = load_audio(input_path_base)
        audio_overlay = load_audio(input_path_overlay)
        # gain_during_overlay is a relative change (dB) applied only to the
        # part of the base track that overlaps the overlay; 0 dB = no change.
        overlaid_audio = audio_base.overlay(
            audio_overlay,
            position=position_ms,
            gain_during_overlay=gain_during_overlay if gain_during_overlay is not None else 0
        )
        logger.info("Overlay applied successfully.")
        original_format = os.path.splitext(file_base.filename)[1][1:].lower()
        if original_format in ("", "tmp"):
            original_format = "mp3"
        output_path = export_audio(overlaid_audio, original_format)
        background_tasks.add_task(cleanup_file, output_path)
        base_name = os.path.splitext(file_base.filename)[0]
        overlay_name = os.path.splitext(file_overlay.filename)[0]
        return FileResponse(path=output_path, media_type=f"audio/{original_format}", filename=f"overlay_{base_name}_with_{overlay_name}.{original_format}")
    except Exception as e:
        logger.error(f"Error during overlay operation: {e}", exc_info=True)
        if output_path and os.path.exists(output_path):
            cleanup_file(output_path)
        # Background tasks only run with the response this endpoint returns;
        # on the error path they never fire, so delete whichever uploads were
        # already saved.
        for saved_path in (input_path_base, input_path_overlay):
            if saved_path:
                cleanup_file(saved_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during overlay: {str(e)}") from e
async def add_silence(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to add silence to."),
    duration_ms: int = Form(..., description="Duration of silence in milliseconds."),
    position: str = Form("end", description="Position to add silence: 'start' or 'end'.")
):
    """Prepend or append ``duration_ms`` of silence to the uploaded audio.

    The output keeps the upload's extension as its format (mp3 when absent).

    Raises:
        HTTPException: 422 for a non-positive duration or an unknown
            position; 500 for unexpected failures.
    """
    if duration_ms <= 0:
        raise HTTPException(status_code=422, detail="Silence duration must be positive.")
    if position not in ['start', 'end']:
        raise HTTPException(status_code=422, detail="Position must be 'start' or 'end'.")
    logger.info(f"Add silence request: file='{file.filename}', duration={duration_ms}ms, position='{position}'")
    input_path = await save_upload_file(file)
    background_tasks.add_task(cleanup_file, input_path)
    output_path = None
    try:
        audio = load_audio(input_path)
        silence = AudioSegment.silent(duration=duration_ms, frame_rate=audio.frame_rate)  # Match frame rate
        if position == 'start':
            modified_audio = silence + audio
        else:  # position == 'end'
            modified_audio = audio + silence
        logger.info(f"Silence added successfully to {position}.")
        original_format = os.path.splitext(file.filename)[1][1:].lower()
        if original_format in ("", "tmp"):
            original_format = "mp3"
        output_path = export_audio(modified_audio, original_format)
        background_tasks.add_task(cleanup_file, output_path)
        return FileResponse(path=output_path, media_type=f"audio/{original_format}", filename=f"silence_{position}_{duration_ms}ms_{file.filename}")
    except Exception as e:
        logger.error(f"Error during add silence operation: {e}", exc_info=True)
        if output_path and os.path.exists(output_path):
            cleanup_file(output_path)
        # Background tasks only run with the response this endpoint returns;
        # on the error path the scheduled cleanup never fires, so remove the
        # uploaded temp file here.
        cleanup_file(input_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred while adding silence: {str(e)}") from e
async def change_speed(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio file to change speed of."),
    playback_speed: float = Form(..., gt=0, description="Playback speed multiplier (e.g., 1.5 for 50% faster, 0.8 for 20% slower). Note: Affects pitch.")
):
    """Change playback speed by reinterpreting the samples at a scaled frame rate.

    WARNING: this basic method also changes the pitch. The exported audio
    keeps the scaled frame rate. The output keeps the upload's extension as
    its format (mp3 when absent).

    Raises:
        HTTPException: 422 for a non-positive speed, or one so small that the
            scaled frame rate rounds down to zero; 500 for unexpected failures.
    """
    if playback_speed <= 0:
        raise HTTPException(status_code=422, detail="Playback speed must be positive.")
    logger.info(f"Speed change request: file='{file.filename}', speed={playback_speed}x")
    input_path = await save_upload_file(file)
    background_tasks.add_task(cleanup_file, input_path)
    output_path = None
    try:
        audio = load_audio(input_path)
        # Same raw samples, different declared frame rate: this is what makes
        # the audio play faster/slower (and shifts the pitch with it).
        new_frame_rate = int(audio.frame_rate * playback_speed)
        if new_frame_rate < 1:
            # int() truncation can yield 0 for tiny multipliers, which is not
            # a usable frame rate.
            raise HTTPException(
                status_code=422,
                detail=f"Playback speed {playback_speed} is too small for this audio's frame rate ({audio.frame_rate} Hz).",
            )
        logger.info(f"Original frame rate: {audio.frame_rate}, New frame rate: {new_frame_rate}")
        speed_changed_audio = audio._spawn(audio.raw_data, overrides={'frame_rate': new_frame_rate})
        # Recalculate duration based on speed change
        new_duration = len(audio) / playback_speed
        logger.info(f"Speed changed by {playback_speed}x. New duration approx {new_duration:.2f}ms (pitch also changed).")
        original_format = os.path.splitext(file.filename)[1][1:].lower()
        if original_format in ("", "tmp"):
            original_format = "mp3"
        # The segment already carries new_frame_rate, so the former
        # set_frame_rate(new_frame_rate) call was a no-op and is omitted.
        output_path = export_audio(speed_changed_audio, original_format)
        background_tasks.add_task(cleanup_file, output_path)
        return FileResponse(path=output_path, media_type=f"audio/{original_format}", filename=f"speed_{playback_speed}x_{file.filename}")
    except Exception as e:
        logger.error(f"Error during speed change operation: {e}", exc_info=True)
        if output_path and os.path.exists(output_path):
            cleanup_file(output_path)
        # Background tasks only run with the response this endpoint returns;
        # on the error path the scheduled cleanup never fires, so remove the
        # uploaded temp file here.
        cleanup_file(input_path)
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during speed change: {str(e)}") from e
# --- How to Run --- | |
# 1. Ensure FFmpeg is installed and in PATH. | |
# 2. Save as `app.py`, create/update `requirements.txt`. | |
# 3. `pip install -r requirements.txt` | |
# 4. `uvicorn app:app --reload` | |
# | |
# --- Example Usage (New Endpoints with curl) --- | |
# | |
# **Fade In:** (Fade in input.wav over 500ms)
# curl -X POST "http://127.0.0.1:8000/fade" \
# -F "file=@input.wav" \
# -F "fade_type=in" \
# -F "duration_ms=500" \
# --output faded_in_output.wav
# | |
# **Reverse:** (Reverse input.mp3)
# curl -X POST "http://127.0.0.1:8000/reverse" \
# -F "file=@input.mp3" \
# --output reversed_output.mp3
# | |
# **Normalize:** (Normalize input.ogg to peak at -0.5 dBFS)
# curl -X POST "http://127.0.0.1:8000/normalize" \
# -F "file=@input.ogg" \
# -F "headroom_db=0.5" \
# --output normalized_output.ogg
# | |
# **Overlay:** (Overlay effect.wav onto base.mp3 starting at 2000ms)
# curl -X POST "http://127.0.0.1:8000/overlay" \
# -F "file_base=@base.mp3" \
# -F "file_overlay=@effect.wav" \
# -F "position_ms=2000" \
# --output overlay_output.mp3
# | |
# **Get Info:** (Get info about input.flac - returns JSON, not a file)
# curl -X POST "http://127.0.0.1:8000/info" \
# -F "file=@input.flac"
# | |
# **Add Silence:** (Add 1500ms silence to the start of input.m4a)
# curl -X POST "http://127.0.0.1:8000/add-silence" \
# -F "file=@input.m4a" \
# -F "duration_ms=1500" \
# -F "position=start" \
# --output silence_start_output.m4a
# | |
# **Change Speed:** (Make input.wav play 50% faster - pitch will increase)
# curl -X POST "http://127.0.0.1:8000/speedup" \
# -F "file=@input.wav" \
# -F "playback_speed=1.5" \
# --output speed_1.5x_output.wav
# | |
# **Convert with Bitrate:** (Convert input.wav to MP3 at 192kbps)
# curl -X POST "http://127.0.0.1:8000/convert" \
# -F "file=@input.wav" \
# -F "output_format=mp3" \
# -F "bitrate=192k" \
# --output converted_192k_output.mp3
# |