Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -1,441 +1,95 @@
|
|
1 |
import os
|
2 |
import uuid
|
3 |
-
import tempfile
|
4 |
-
import logging
|
5 |
import shutil
|
6 |
-
|
7 |
-
|
8 |
-
from fastapi import
|
9 |
-
from
|
10 |
from pydub import AudioSegment
|
11 |
-
from
|
12 |
|
13 |
-
#
|
14 |
-
|
15 |
-
try:
|
16 |
-
from spleeter.separator import Separator
|
17 |
-
from spleeter.utils import logging as spleeter_logging
|
18 |
-
spleeter_available = True
|
19 |
-
# Optional: Configure Spleeter logging level (e.g., ERROR to reduce noise)
|
20 |
-
# spleeter_logging.set_level(spleeter_logging.ERROR)
|
21 |
-
except ImportError:
|
22 |
-
spleeter_available = False
|
23 |
-
Separator = None # Define Separator as None if import fails
|
24 |
-
logging.warning("Spleeter library not found or failed to import.")
|
25 |
-
logging.warning("AI Vocal Removal endpoint (/ai/remove-vocals) will be disabled.")
|
26 |
-
logging.warning("Install spleeter: pip install spleeter")
|
27 |
|
28 |
-
|
29 |
-
|
30 |
-
TEMP_DIR = tempfile.gettempdir()
|
31 |
os.makedirs(TEMP_DIR, exist_ok=True)
|
32 |
|
33 |
-
#
|
34 |
-
logging.basicConfig(level=logging.INFO
|
35 |
-
logger = logging.getLogger(
|
36 |
-
|
37 |
-
# --- Global Spleeter Separator Initialization ---
|
38 |
-
# Load the model once on startup for better request performance.
|
39 |
-
# This increases startup time and initial memory usage significantly.
|
40 |
-
# Choose the model: 2stems (vocals/accompaniment), 4stems (v/drums/bass/other), 5stems (v/d/b/piano/other)
|
41 |
-
# Using 'spleeter:2stems' - downloads model on first use if not cached.
|
42 |
-
spleeter_separator: Optional[Separator] = None
|
43 |
-
if spleeter_available:
|
44 |
-
try:
|
45 |
-
logger.info("Initializing Spleeter Separator (Model: spleeter:2stems)... This may download model files.")
|
46 |
-
# MWF = Multi-channel Wiener Filtering (can improve quality but slower)
|
47 |
-
spleeter_separator = Separator('spleeter:2stems', mwf=False)
|
48 |
-
logger.info("Spleeter Separator initialized successfully.")
|
49 |
-
except Exception as e:
|
50 |
-
logger.error(f"FATAL: Failed to initialize Spleeter Separator: {e}", exc_info=True)
|
51 |
-
logger.error("AI Vocal Removal endpoint will likely fail.")
|
52 |
-
spleeter_separator = None # Ensure it's None if init failed
|
53 |
|
54 |
-
#
|
55 |
-
app
|
56 |
-
|
57 |
-
|
58 |
-
|
|
|
59 |
)
|
60 |
|
61 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
62 |
|
63 |
-
def
|
64 |
-
"""Safely remove a file or directory."""
|
65 |
try:
|
66 |
-
|
67 |
-
|
68 |
-
return
|
69 |
-
|
70 |
-
if os.path.isfile(path):
|
71 |
-
os.remove(path)
|
72 |
-
logger.info(f"Cleaned up temporary file: {path}")
|
73 |
-
elif os.path.isdir(path):
|
74 |
-
shutil.rmtree(path)
|
75 |
-
logger.info(f"Cleaned up temporary directory: {path}")
|
76 |
-
else:
|
77 |
-
logger.warning(f"Cleanup attempted on non-file/dir path: {path}")
|
78 |
-
|
79 |
except Exception as e:
|
80 |
-
logger.error(f"
|
81 |
|
82 |
-
|
83 |
-
|
84 |
-
|
85 |
-
|
86 |
-
request_temp_dir = os.path.join(TEMP_DIR, f"audio_api_upload_{uuid.uuid4().hex}")
|
87 |
-
os.makedirs(request_temp_dir, exist_ok=True)
|
88 |
-
temp_file_path = os.path.join(request_temp_dir, f"input{file_extension}")
|
89 |
|
90 |
-
|
91 |
-
|
92 |
-
while content := await upload_file.read(1024 * 1024):
|
93 |
-
buffer.write(content)
|
94 |
-
logger.info(f"Saved uploaded file '{upload_file.filename}' to temp path: {temp_file_path}")
|
95 |
-
return temp_file_path
|
96 |
-
except Exception as e:
|
97 |
-
logger.error(f"Failed to save uploaded file {upload_file.filename}: {e}", exc_info=True)
|
98 |
-
cleanup_path(request_temp_dir) # Cleanup directory if save fails
|
99 |
-
raise HTTPException(status_code=500, detail=f"Could not save uploaded file: {upload_file.filename}")
|
100 |
-
finally:
|
101 |
-
await upload_file.close()
|
102 |
-
|
103 |
-
def load_audio(file_path: str) -> AudioSegment:
|
104 |
-
"""Loads an audio file using pydub."""
|
105 |
-
# (Implementation unchanged)
|
106 |
-
try:
|
107 |
-
audio = AudioSegment.from_file(file_path)
|
108 |
-
logger.info(f"Loaded audio from: {file_path} (Duration: {len(audio)}ms)")
|
109 |
-
return audio
|
110 |
-
except CouldntDecodeError:
|
111 |
-
logger.warning(f"pydub couldn't decode file: {file_path}. Unsupported format or corrupted?")
|
112 |
-
raise HTTPException(status_code=415, detail=f"Unsupported audio format or corrupted file: {os.path.basename(file_path)}")
|
113 |
-
except FileNotFoundError:
|
114 |
-
logger.error(f"Audio file not found after saving: {file_path}")
|
115 |
-
raise HTTPException(status_code=500, detail="Internal error: Audio file disappeared.")
|
116 |
-
except Exception as e:
|
117 |
-
logger.error(f"Error loading audio file {file_path}: {e}", exc_info=True)
|
118 |
-
raise HTTPException(status_code=500, detail=f"Error processing audio file: {os.path.basename(file_path)}")
|
119 |
-
|
120 |
-
def export_audio(audio: AudioSegment, desired_format: str, base_filename: str = "edited_audio") -> str:
|
121 |
-
"""Exports an AudioSegment to a temporary file with specified format and returns the path."""
|
122 |
-
# (Slight modification to allow base filename)
|
123 |
-
output_filename = f"{base_filename}_{uuid.uuid4().hex}.{desired_format.lower()}"
|
124 |
-
# Place export in main TEMP_DIR, not necessarily the upload sub-dir
|
125 |
-
output_path = os.path.join(TEMP_DIR, output_filename)
|
126 |
-
try:
|
127 |
-
logger.info(f"Exporting audio to format '{desired_format}' at {output_path}")
|
128 |
-
# Add bitrate argument for common formats if desired (e.g., "192k" for mp3)
|
129 |
-
export_params = {}
|
130 |
-
if desired_format.lower() == "mp3":
|
131 |
-
export_params['bitrate'] = "192k" # Example bitrate
|
132 |
-
|
133 |
-
audio.export(output_path, format=desired_format.lower(), **export_params)
|
134 |
-
return output_path
|
135 |
-
except Exception as e:
|
136 |
-
logger.error(f"Error exporting audio to format {desired_format}: {e}", exc_info=True)
|
137 |
-
cleanup_path(output_path)
|
138 |
-
raise HTTPException(status_code=500, detail=f"Failed to export audio to format '{desired_format}'.")
|
139 |
-
|
140 |
-
|
141 |
-
# --- API Endpoints ---
|
142 |
-
|
143 |
-
@app.get("/", tags=["General"])
|
144 |
def read_root():
|
145 |
-
"""
|
146 |
-
features = ["Trim (/trim)", "Concatenate (/concat)", "Volume (/volume)", "Convert (/convert)"]
|
147 |
-
if spleeter_separator:
|
148 |
-
features.append("AI Vocal Removal (/ai/remove-vocals)")
|
149 |
-
else:
|
150 |
-
features.append("AI Vocal Removal (Disabled - Spleeter not available)")
|
151 |
-
return {
|
152 |
-
"message": "Welcome to the Advanced Audio Editor API.",
|
153 |
-
"available_features": features,
|
154 |
-
"important": "AI Vocal Removal is computationally intensive and may take significant time."
|
155 |
-
}
|
156 |
-
|
157 |
-
# --- Existing Endpoints (Trim, Concat, Volume, Convert) ---
|
158 |
-
# Minor changes: Use updated cleanup_path, ensure input cleanup uses the directory
|
159 |
-
# Use updated export_audio
|
160 |
-
|
161 |
-
@app.post("/trim", tags=["Editing - Pydub"])
|
162 |
-
async def trim_audio(
|
163 |
-
background_tasks: BackgroundTasks,
|
164 |
-
file: UploadFile = File(..., description="Audio file to trim."),
|
165 |
-
start_ms: int = Form(..., description="Start time in milliseconds."),
|
166 |
-
end_ms: int = Form(..., description="End time in milliseconds.")
|
167 |
-
):
|
168 |
-
"""Trims an audio file (uses pydub)."""
|
169 |
-
if start_ms < 0 or end_ms <= start_ms:
|
170 |
-
raise HTTPException(status_code=422, detail="Invalid start/end times.")
|
171 |
-
|
172 |
-
logger.info(f"Trim request: file='{file.filename}', start={start_ms}ms, end={end_ms}ms")
|
173 |
-
input_path = await save_upload_file(file)
|
174 |
-
input_dir = os.path.dirname(input_path)
|
175 |
-
background_tasks.add_task(cleanup_path, input_dir) # Schedule input dir cleanup
|
176 |
-
|
177 |
-
output_path = None # Define output_path before try block
|
178 |
-
try:
|
179 |
-
audio = load_audio(input_path)
|
180 |
-
trimmed_audio = audio[start_ms:end_ms]
|
181 |
-
logger.info(f"Audio trimmed to {len(trimmed_audio)}ms")
|
182 |
-
|
183 |
-
original_format = os.path.splitext(file.filename)[1][1:].lower() or "mp3"
|
184 |
-
if original_format in ["tmp", ""]: original_format = "mp3"
|
185 |
-
|
186 |
-
output_path = export_audio(trimmed_audio, original_format, base_filename=f"trimmed_{os.path.splitext(file.filename)[0]}")
|
187 |
-
background_tasks.add_task(cleanup_path, output_path) # Schedule output cleanup
|
188 |
-
|
189 |
-
return FileResponse(
|
190 |
-
path=output_path,
|
191 |
-
media_type=f"audio/{original_format}",
|
192 |
-
filename=f"trimmed_{file.filename}"
|
193 |
-
)
|
194 |
-
except Exception as e:
|
195 |
-
logger.error(f"Error during trim operation: {e}", exc_info=True)
|
196 |
-
# Ensure immediate cleanup on error if possible
|
197 |
-
if output_path: cleanup_path(output_path)
|
198 |
-
# Input dir cleanup is handled by background task unless error is critical before scheduling
|
199 |
-
if isinstance(e, HTTPException): raise e
|
200 |
-
else: raise HTTPException(status_code=500, detail=f"An unexpected error occurred during trimming: {str(e)}")
|
201 |
-
|
202 |
-
|
203 |
-
@app.post("/concat", tags=["Editing - Pydub"])
|
204 |
-
async def concatenate_audio(
|
205 |
-
background_tasks: BackgroundTasks,
|
206 |
-
files: List[UploadFile] = File(..., description="Two or more audio files to join in order."),
|
207 |
-
output_format: str = Form("mp3", description="Desired output format (e.g., 'mp3', 'wav', 'ogg').")
|
208 |
-
):
|
209 |
-
"""Concatenates two or more audio files sequentially (uses pydub)."""
|
210 |
-
if len(files) < 2:
|
211 |
-
raise HTTPException(status_code=422, detail="Please upload at least two files to concatenate.")
|
212 |
|
213 |
-
|
214 |
-
|
215 |
-
|
216 |
-
output_path = None
|
217 |
-
|
218 |
-
try:
|
219 |
-
for file in files:
|
220 |
-
input_path = await save_upload_file(file)
|
221 |
-
input_dir = os.path.dirname(input_path)
|
222 |
-
input_dirs.append(input_dir)
|
223 |
-
background_tasks.add_task(cleanup_path, input_dir)
|
224 |
-
audio = load_audio(input_path)
|
225 |
-
loaded_audios.append(audio)
|
226 |
-
|
227 |
-
if not loaded_audios: raise ValueError("No audio segments loaded.")
|
228 |
-
|
229 |
-
combined_audio = loaded_audios[0]
|
230 |
-
for i in range(1, len(loaded_audios)):
|
231 |
-
combined_audio += loaded_audios[i]
|
232 |
-
logger.info(f"Concatenated audio length: {len(combined_audio)}ms")
|
233 |
-
|
234 |
-
first_filename_base = os.path.splitext(files[0].filename)[0]
|
235 |
-
output_base = f"concat_{first_filename_base}_and_{len(files)-1}_others"
|
236 |
-
output_path = export_audio(combined_audio, output_format, base_filename=output_base)
|
237 |
-
background_tasks.add_task(cleanup_path, output_path)
|
238 |
-
|
239 |
-
return FileResponse(
|
240 |
-
path=output_path,
|
241 |
-
media_type=f"audio/{output_format}",
|
242 |
-
filename=f"{output_base}.{output_format}"
|
243 |
-
)
|
244 |
-
except Exception as e:
|
245 |
-
logger.error(f"Error during concat operation: {e}", exc_info=True)
|
246 |
-
if output_path: cleanup_path(output_path)
|
247 |
-
# Input dirs cleanup handled by background tasks
|
248 |
-
if isinstance(e, HTTPException): raise e
|
249 |
-
else: raise HTTPException(status_code=500, detail=f"An unexpected error occurred during concatenation: {str(e)}")
|
250 |
-
|
251 |
-
|
252 |
-
@app.post("/volume", tags=["Editing - Pydub"])
|
253 |
-
async def change_volume(
|
254 |
-
background_tasks: BackgroundTasks,
|
255 |
-
file: UploadFile = File(..., description="Audio file to adjust volume for."),
|
256 |
-
change_db: float = Form(..., description="Volume change in decibels (dB). +/- values.")
|
257 |
-
):
|
258 |
-
"""Adjusts audio volume (uses pydub)."""
|
259 |
-
logger.info(f"Volume request: file='{file.filename}', change_db={change_db}dB")
|
260 |
-
input_path = await save_upload_file(file)
|
261 |
-
input_dir = os.path.dirname(input_path)
|
262 |
-
background_tasks.add_task(cleanup_path, input_dir)
|
263 |
-
output_path = None
|
264 |
-
|
265 |
-
try:
|
266 |
-
audio = load_audio(input_path)
|
267 |
-
adjusted_audio = audio + change_db
|
268 |
-
logger.info(f"Volume adjusted by {change_db}dB.")
|
269 |
-
|
270 |
-
original_format = os.path.splitext(file.filename)[1][1:].lower() or "mp3"
|
271 |
-
if original_format in ["tmp", ""]: original_format = "mp3"
|
272 |
-
|
273 |
-
output_base = f"volume_{change_db}dB_{os.path.splitext(file.filename)[0]}"
|
274 |
-
output_path = export_audio(adjusted_audio, original_format, base_filename=output_base)
|
275 |
-
background_tasks.add_task(cleanup_path, output_path)
|
276 |
-
|
277 |
-
return FileResponse(
|
278 |
-
path=output_path,
|
279 |
-
media_type=f"audio/{original_format}",
|
280 |
-
filename=f"{output_base}.{original_format}" # Use correct extension
|
281 |
-
)
|
282 |
-
except Exception as e:
|
283 |
-
logger.error(f"Error during volume operation: {e}", exc_info=True)
|
284 |
-
if output_path: cleanup_path(output_path)
|
285 |
-
if isinstance(e, HTTPException): raise e
|
286 |
-
else: raise HTTPException(status_code=500, detail=f"An unexpected error occurred during volume adjustment: {str(e)}")
|
287 |
-
|
288 |
-
|
289 |
-
@app.post("/convert", tags=["Editing - Pydub"])
|
290 |
-
async def convert_format(
|
291 |
background_tasks: BackgroundTasks,
|
292 |
-
file: UploadFile = File(..., description="Audio file
|
293 |
-
output_format: str = Form(
|
294 |
):
|
295 |
-
"
|
296 |
-
allowed_formats = {'mp3', 'wav', 'ogg', 'flac', 'aac', 'm4a'}
|
297 |
-
safe_output_format = output_format.lower()
|
298 |
-
if safe_output_format not in allowed_formats:
|
299 |
-
raise HTTPException(status_code=422, detail=f"Invalid output format. Allowed: {', '.join(allowed_formats)}")
|
300 |
|
301 |
-
|
302 |
-
input_path
|
303 |
-
input_dir = os.path.dirname(input_path)
|
304 |
-
background_tasks.add_task(cleanup_path, input_dir)
|
305 |
-
output_path = None
|
306 |
|
307 |
try:
|
308 |
-
|
309 |
-
|
310 |
-
|
311 |
|
312 |
-
|
313 |
-
|
|
|
314 |
|
315 |
-
|
316 |
-
|
317 |
-
|
318 |
-
filename=f"{output_base}.{safe_output_format}"
|
319 |
-
)
|
320 |
-
except Exception as e:
|
321 |
-
logger.error(f"Error during convert operation: {e}", exc_info=True)
|
322 |
-
if output_path: cleanup_path(output_path)
|
323 |
-
if isinstance(e, HTTPException): raise e
|
324 |
-
else: raise HTTPException(status_code=500, detail=f"An unexpected error occurred during format conversion: {str(e)}")
|
325 |
-
|
326 |
-
|
327 |
-
# --- AI Vocal Removal Endpoint ---
|
328 |
-
|
329 |
-
@app.post("/ai/remove-vocals", tags=["Editing - AI"])
|
330 |
-
async def ai_remove_vocals(
|
331 |
-
background_tasks: BackgroundTasks,
|
332 |
-
file: UploadFile = File(..., description="Audio file containing mixed vocals and accompaniment."),
|
333 |
-
stem_to_return: Literal['accompaniment', 'vocals'] = Form("accompaniment", description="Which stem to return: 'accompaniment' (default) or 'vocals'."),
|
334 |
-
output_format: str = Form("wav", description="Output format for the separated stem (e.g., 'wav', 'mp3'). WAV recommended for quality.")
|
335 |
-
):
|
336 |
-
"""
|
337 |
-
Separates vocals from accompaniment using Spleeter (AI model).
|
338 |
-
NOTE: This is computationally intensive and can take significant time.
|
339 |
-
"""
|
340 |
-
if not spleeter_separator:
|
341 |
-
logger.warning("Vocal removal endpoint called, but Spleeter is not available.")
|
342 |
-
raise HTTPException(status_code=503, detail="AI Vocal Removal service is unavailable (Spleeter not loaded).")
|
343 |
-
|
344 |
-
logger.info(f"AI Vocal Removal request: file='{file.filename}', return='{stem_to_return}', format='{output_format}'")
|
345 |
-
|
346 |
-
input_path = await save_upload_file(file)
|
347 |
-
input_dir = os.path.dirname(input_path) # Directory where input was saved
|
348 |
-
spleeter_output_dir = os.path.join(TEMP_DIR, f"spleeter_out_{uuid.uuid4().hex}") # Unique output dir for Spleeter
|
349 |
-
final_output_path = None # Path to the file that will be returned
|
350 |
-
|
351 |
-
# Schedule cleanup for both input dir and potential Spleeter output dir
|
352 |
-
background_tasks.add_task(cleanup_path, input_dir)
|
353 |
-
background_tasks.add_task(cleanup_path, spleeter_output_dir) # This will be created by Spleeter
|
354 |
|
355 |
-
|
356 |
-
|
357 |
-
# Spleeter separates into the specified directory, creating <filename>/vocals.wav and <filename>/accompaniment.wav
|
358 |
-
# We pass the input *file* path and the desired *output directory* path.
|
359 |
-
spleeter_separator.separate_to_file(
|
360 |
-
input_path,
|
361 |
-
spleeter_output_dir,
|
362 |
-
codec='wav' # Spleeter defaults to WAV, ensuring consistent intermediate format
|
363 |
-
)
|
364 |
-
logger.info(f"Spleeter separation completed.")
|
365 |
-
|
366 |
-
# Spleeter creates a subdirectory named after the input file (without extension)
|
367 |
-
input_filename_base = os.path.splitext(os.path.basename(input_path))[0]
|
368 |
-
stem_output_folder = os.path.join(spleeter_output_dir, input_filename_base)
|
369 |
-
|
370 |
-
# Determine the path to the requested stem file (always WAV from Spleeter)
|
371 |
-
target_stem_filename = f"{stem_to_return}.wav"
|
372 |
-
raw_stem_path = os.path.join(stem_output_folder, target_stem_filename)
|
373 |
-
|
374 |
-
if not os.path.exists(raw_stem_path):
|
375 |
-
logger.error(f"Spleeter output stem not found: {raw_stem_path}")
|
376 |
-
raise HTTPException(status_code=500, detail=f"AI separation failed: Could not find the '{stem_to_return}' stem.")
|
377 |
-
|
378 |
-
# --- Optional Conversion ---
|
379 |
-
safe_output_format = output_format.lower()
|
380 |
-
if safe_output_format == "wav":
|
381 |
-
# No conversion needed, return the direct Spleeter output
|
382 |
-
# We need to move/copy it out of the spleeter dir *or* just return it directly
|
383 |
-
# For simplicity and better cleanup, let's return it directly.
|
384 |
-
# BUT FileResponse needs the final path, and background task cleans the whole spleeter_output_dir.
|
385 |
-
# SAFER: Copy the desired file out to the main TEMP_DIR before returning.
|
386 |
-
final_output_path = os.path.join(TEMP_DIR, f"{input_filename_base}_{stem_to_return}_{uuid.uuid4().hex}.wav")
|
387 |
-
shutil.copyfile(raw_stem_path, final_output_path)
|
388 |
-
logger.info(f"Copied requested WAV stem to final output path: {final_output_path}")
|
389 |
-
background_tasks.add_task(cleanup_path, final_output_path) # Schedule cleanup for the copy
|
390 |
|
391 |
-
|
392 |
-
|
393 |
-
|
394 |
-
|
395 |
-
output_base = f"{input_filename_base}_{stem_to_return}"
|
396 |
-
final_output_path = export_audio(audio_stem, safe_output_format, base_filename=output_base)
|
397 |
-
logger.info(f"Converted stem saved to: {final_output_path}")
|
398 |
-
background_tasks.add_task(cleanup_path, final_output_path) # Schedule cleanup for converted file
|
399 |
-
|
400 |
-
# --- Return Result ---
|
401 |
-
if not final_output_path or not os.path.exists(final_output_path):
|
402 |
-
raise HTTPException(status_code=500, detail="Failed to prepare final output file after separation.")
|
403 |
-
|
404 |
-
return FileResponse(
|
405 |
-
path=final_output_path,
|
406 |
-
media_type=f"audio/{safe_output_format}", # Use the final format's media type
|
407 |
-
filename=os.path.basename(final_output_path) # Use the actual generated filename
|
408 |
-
)
|
409 |
|
|
|
|
|
410 |
except Exception as e:
|
411 |
-
logger.error(f"Error
|
412 |
-
|
413 |
-
# Input/Spleeter dir cleanup handled by background tasks
|
414 |
-
if isinstance(e, HTTPException): raise e
|
415 |
-
else: raise HTTPException(status_code=500, detail=f"An unexpected error occurred during AI processing: {str(e)}")
|
416 |
|
417 |
-
|
418 |
-
|
419 |
-
# 1. Make sure FFmpeg is installed and accessible in your PATH.
|
420 |
-
# 2. Save this code as `app.py`.
|
421 |
-
# 3. Create `requirements.txt` (as shown above).
|
422 |
-
# 4. Install dependencies: `pip install -r requirements.txt` (THIS MAY TAKE A WHILE!)
|
423 |
-
# 5. Run the FastAPI server: `uvicorn app:app --reload`
|
424 |
-
#
|
425 |
-
# --- Example Usage (using curl) ---
|
426 |
-
#
|
427 |
-
# **AI Remove Vocals (Get Accompaniment as WAV):**
|
428 |
-
# curl -X POST "http://127.0.0.1:8000/ai/remove-vocals" \
|
429 |
-
# -F "file=@my_song_mix.mp3" \
|
430 |
-
# -F "stem_to_return=accompaniment" \
|
431 |
-
# -F "output_format=wav" \
|
432 |
-
# --output accompaniment_output.wav
|
433 |
-
#
|
434 |
-
# **AI Remove Vocals (Get Vocals as MP3):**
|
435 |
-
# curl -X POST "http://127.0.0.1:8000/ai/remove-vocals" \
|
436 |
-
# -F "file=@another_track.wav" \
|
437 |
-
# -F "stem_to_return=vocals" \
|
438 |
-
# -F "output_format=mp3" \
|
439 |
-
# --output vocals_only_output.mp3
|
440 |
-
#
|
441 |
-
# (Other examples for /trim, /concat, /volume, /convert remain the same as before)
|
|
|
1 |
import os
|
2 |
import uuid
|
|
|
|
|
3 |
import shutil
|
4 |
+
import logging
|
5 |
+
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, BackgroundTasks
|
6 |
+
from fastapi.responses import FileResponse
|
7 |
+
from spleeter.separator import Separator
|
8 |
from pydub import AudioSegment
|
9 |
+
from starlette.middleware.cors import CORSMiddleware
|
10 |
|
11 |
+
# Setup
|
12 |
+
app = FastAPI(title="AI Audio Editor API", description="FastAPI audio editor with vocal remover", version="1.0")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
13 |
|
14 |
+
# Directories
|
15 |
+
TEMP_DIR = "temp"
|
|
|
16 |
os.makedirs(TEMP_DIR, exist_ok=True)
|
17 |
|
18 |
+
# Logger
|
19 |
+
logging.basicConfig(level=logging.INFO)
|
20 |
+
logger = logging.getLogger("audio_editor")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
21 |
|
22 |
+
# CORS (optional for web frontend support)
|
23 |
+
app.add_middleware(
|
24 |
+
CORSMiddleware,
|
25 |
+
allow_origins=["*"],
|
26 |
+
allow_methods=["*"],
|
27 |
+
allow_headers=["*"],
|
28 |
)
|
29 |
|
30 |
+
# Helper functions
|
31 |
+
def save_upload_file(upload_file: UploadFile) -> str:
|
32 |
+
extension = os.path.splitext(upload_file.filename)[-1]
|
33 |
+
temp_path = os.path.join(TEMP_DIR, f"{uuid.uuid4().hex}{extension}")
|
34 |
+
with open(temp_path, "wb") as buffer:
|
35 |
+
shutil.copyfileobj(upload_file.file, buffer)
|
36 |
+
return temp_path
|
37 |
|
38 |
+
def cleanup_file(path: str):
|
|
|
39 |
try:
|
40 |
+
os.remove(path)
|
41 |
+
logger.info(f"Deleted temp file: {path}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
42 |
except Exception as e:
|
43 |
+
logger.error(f"Cleanup failed: {e}")
|
44 |
|
45 |
+
def export_audio(audio: AudioSegment, output_format: str = "mp3") -> str:
|
46 |
+
output_path = os.path.join(TEMP_DIR, f"{uuid.uuid4().hex}.{output_format}")
|
47 |
+
audio.export(output_path, format=output_format)
|
48 |
+
return output_path
|
|
|
|
|
|
|
49 |
|
50 |
+
# Root endpoint
|
51 |
+
@app.get("/", tags=["Root"])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
52 |
def read_root():
|
53 |
+
return {"message": "Welcome to the AI Audio Editor API!"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
54 |
|
55 |
+
# AI vocal remover endpoint
|
56 |
+
@app.post("/remove_vocals", tags=["AI"])
|
57 |
+
async def remove_vocals(
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
58 |
background_tasks: BackgroundTasks,
|
59 |
+
file: UploadFile = File(..., description="Audio file for AI vocal removal."),
|
60 |
+
output_format: str = Form("mp3", description="Output format (mp3, wav, etc.)")
|
61 |
):
|
62 |
+
logger.info(f"Processing file for vocal removal: {file.filename}")
|
|
|
|
|
|
|
|
|
63 |
|
64 |
+
input_path = save_upload_file(file)
|
65 |
+
background_tasks.add_task(cleanup_file, input_path)
|
|
|
|
|
|
|
66 |
|
67 |
try:
|
68 |
+
# Output folder for spleeter
|
69 |
+
out_dir = os.path.join(TEMP_DIR, uuid.uuid4().hex)
|
70 |
+
os.makedirs(out_dir, exist_ok=True)
|
71 |
|
72 |
+
# Use spleeter
|
73 |
+
separator = Separator("spleeter:2stems")
|
74 |
+
separator.separate_to_file(input_path, out_dir)
|
75 |
|
76 |
+
# Locate instrumental file
|
77 |
+
base_name = os.path.splitext(os.path.basename(input_path))[0]
|
78 |
+
instrumental_path = os.path.join(out_dir, base_name, "accompaniment.wav")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
79 |
|
80 |
+
if not os.path.exists(instrumental_path):
|
81 |
+
raise FileNotFoundError("Instrumental not generated.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
82 |
|
83 |
+
# Convert to desired format
|
84 |
+
instrumental_audio = AudioSegment.from_file(instrumental_path)
|
85 |
+
output_path = export_audio(instrumental_audio, output_format)
|
86 |
+
background_tasks.add_task(cleanup_file, output_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
87 |
|
88 |
+
return FileResponse(path=output_path, filename=f"instrumental_{file.filename}", media_type=f"audio/{output_format}")
|
89 |
+
|
90 |
except Exception as e:
|
91 |
+
logger.error(f"Error: {e}", exc_info=True)
|
92 |
+
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
93 |
|
94 |
+
finally:
|
95 |
+
shutil.rmtree(out_dir, ignore_errors=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|