# vid-test / app.py
# sdafd's picture
# Create app.py
# 5da8662 verified
# raw
# history blame
# 7.43 kB
import os
import subprocess
import shlex
import tempfile
import uuid
from typing import List
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import FileResponse, JSONResponse
import logging
# Configure logging: set the root logger threshold to INFO at import time.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by the FFmpeg helper and the API handlers below.
logger = logging.getLogger(__name__)
# --- Define the core ffmpeg function (adapted for API context) ---
# Note: We pass paths directly now, as they are managed within the API call.
# We also return stderr for debugging purposes in case of failure.
def run_ffmpeg_concatenation(input_files: List[str], output_file: str, ffmpeg_executable: str = "ffmpeg"):
    """
    Join several videos into one file by re-encoding them with FFmpeg.

    Each path in ``input_files`` is passed as an ``-i`` input; the video and
    audio stream of every input are fed, in order, into one ``concat`` filter
    whose two outputs are mapped to ``output_file``.

    Returns a ``(success, message, stderr)`` tuple. ``stderr`` holds FFmpeg's
    captured stderr on success, stderr plus stdout when FFmpeg exits non-zero,
    and an empty string when the list is empty or the executable is missing.
    """
    if not input_files:
        return False, "Error: Cannot concatenate, file list is empty.", ""

    clip_count = len(input_files)
    logger.info(f"Starting re-encode concatenation of {clip_count} videos into {output_file}...")
    logger.info("This will re-encode the video and may take some time.")

    # One "-i <path>" pair per input, in the order given.
    source_flags = [token for clip in input_files for token in ("-i", clip)]

    # "[0:v][0:a][1:v][1:a]..." followed by the concat filter itself.
    stream_labels = "".join(f"[{idx}:v][{idx}:a]" for idx in range(clip_count))
    concat_graph = f"{stream_labels}concat=n={clip_count}:v=1:a=1[outv][outa]"

    command = [ffmpeg_executable] + source_flags + [
        "-filter_complex", concat_graph,
        "-map", "[outv]",
        "-map", "[outa]",
        "-vsync", "vfr",
        "-movflags", "+faststart",
        "-y",  # Overwrite output without asking
        output_file,
    ]

    logger.info("\nRunning FFmpeg command:")
    try:
        rendered_command = shlex.join(command)
    except AttributeError:  # shlex.join is Python 3.8+
        rendered_command = " ".join(
            arg if " " not in arg else f'"{arg}"' for arg in command
        )
    logger.info(rendered_command)

    logger.info("\n--- FFmpeg Execution Start ---")
    captured = ""
    try:
        # check=True turns a non-zero exit status into CalledProcessError;
        # errors='replace' keeps undecodable bytes in FFmpeg's output from raising.
        completed = subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
        captured = completed.stderr
        logger.info("--- FFmpeg Execution End ---")
        logger.info(f"\nSTDERR (Progress/Info):\n{captured}")
        outcome = f"Successfully concatenated videos into {os.path.basename(output_file)}"
        logger.info(outcome)
        return True, outcome, captured
    except subprocess.CalledProcessError as failure:
        # Report both streams together so the caller sees everything FFmpeg printed.
        captured = "\n".join((failure.stderr, failure.stdout))
        logger.error("--- FFmpeg Execution End ---")
        logger.error(f"\nError during FFmpeg execution (return code {failure.returncode}):")
        logger.error(f"\nSTDERR/STDOUT:\n{captured}")
        return False, f"FFmpeg failed with return code {failure.returncode}.", captured
    except FileNotFoundError:
        missing = f"Error: '{ffmpeg_executable}' command not found on the server."
        logger.error(missing)
        return False, missing, ""
    except Exception as unexpected:
        problem = f"An unexpected server error occurred during ffmpeg processing: {unexpected}"
        logger.exception(problem)  # Log full traceback
        return False, problem, captured
# --- FastAPI App Definition ---
# Application instance; the route decorators below register their handlers on it.
app = FastAPI()
@app.post("/concatenate/")
async def concatenate_videos_api(
    files: List[UploadFile] = File(..., description="List of video files to concatenate"),
    output_filename: str = Form("concatenated_video.mp4", description="Desired output filename (e.g., final.mp4)")
):
    """
    API endpoint to concatenate uploaded video files using FFmpeg re-encoding.

    The uploads are written to a private temporary directory, passed to
    ``run_ffmpeg_concatenation`` in upload order, and the resulting file is
    returned as a download.

    Parameters:
        files: Uploaded videos, concatenated in the order received.
        output_filename: Name for the result. Only its final path component is
            used, and it must end in .mp4, .mov, .avi or .mkv.

    Returns:
        A ``FileResponse`` with the concatenated video on success, or a
        ``JSONResponse`` with status 500 carrying the failure message,
        FFmpeg's output and the original input filenames.

    Raises:
        HTTPException: 400 when no files are uploaded or the output extension
            is not accepted; 500 on any unexpected error while processing.
    """
    # Imported here so the handler does not depend on edits to the file's import block.
    import shutil
    from fastapi import BackgroundTasks

    upload_chunk_size = 1024 * 1024  # copy uploads 1 MiB at a time
    media_types = {
        '.mp4': 'video/mp4',
        '.mov': 'video/quicktime',
        '.avi': 'video/x-msvideo',
        '.mkv': 'video/x-matroska',
    }

    if not files:
        raise HTTPException(status_code=400, detail="No files were uploaded.")

    # The name comes from the client: drop any directory components so the
    # output can never be written outside the temporary directory.
    safe_output_name = os.path.basename(output_filename)
    lowered_name = safe_output_name.lower()
    media_type = next(
        (mime for suffix, mime in media_types.items() if lowered_name.endswith(suffix)),
        None,
    )
    if media_type is None:  # Basic validation
        raise HTTPException(status_code=400, detail="Output filename must have a common video extension (mp4, mov, avi, mkv).")

    logger.info(f"Received {len(files)} files for concatenation. Output name: {output_filename}")

    # The directory must outlive this function on success: FileResponse reads
    # the file only while the response is being sent, so a `with
    # TemporaryDirectory()` block would delete it too early. Cleanup is
    # therefore handed to a background task that runs after the response.
    temp_dir = tempfile.mkdtemp()
    remove_temp_dir_now = True
    input_file_paths = []
    original_filenames = []
    try:
        # Save uploaded files to the temporary directory
        for uploaded_file in files:
            original_filenames.append(uploaded_file.filename)
            # filename may be missing; fall back to no extension rather than crash.
            _, ext = os.path.splitext(uploaded_file.filename or "")
            # Random name avoids clashes between uploads; the extension is kept.
            temp_input_path = os.path.join(temp_dir, f"{uuid.uuid4()}{ext}")
            logger.info(f"Saving uploaded file '{uploaded_file.filename}' to '{temp_input_path}'")
            try:
                with open(temp_input_path, "wb") as buffer:
                    # Copy in chunks so a large video is never held in memory whole.
                    while True:
                        chunk = await uploaded_file.read(upload_chunk_size)
                        if not chunk:
                            break
                        buffer.write(chunk)
            finally:
                await uploaded_file.close()  # Close the file handle even if the copy failed
            input_file_paths.append(temp_input_path)
        logger.info(f"Saved files: {original_filenames}")

        # Define the output path within the temporary directory
        temp_output_path = os.path.join(temp_dir, safe_output_name)

        # Run the FFmpeg concatenation logic
        success, message, ffmpeg_stderr = run_ffmpeg_concatenation(input_file_paths, temp_output_path)

        if success:
            logger.info(f"Concatenation successful. Preparing file response for: {temp_output_path}")
            cleanup = BackgroundTasks()
            cleanup.add_task(shutil.rmtree, temp_dir, ignore_errors=True)
            response = FileResponse(
                path=temp_output_path,
                filename=safe_output_name,  # Send back with the desired name
                media_type=media_type,
                background=cleanup,
            )
            # From here on the background task owns the directory.
            remove_temp_dir_now = False
            return response

        logger.error(f"Concatenation failed: {message}")
        # Return a JSON error response including ffmpeg's stderr if available
        return JSONResponse(
            status_code=500,
            content={
                "detail": f"Video concatenation failed: {message}",
                "ffmpeg_stderr": ffmpeg_stderr,
                "input_files": original_filenames
            }
        )
    except Exception as e:
        logger.exception("An unexpected error occurred in the API endpoint.")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}")
    finally:
        # Every path except a successful FileResponse removes the directory immediately.
        if remove_temp_dir_now:
            shutil.rmtree(temp_dir, ignore_errors=True)
@app.get("/")
async def read_root():
    """Return a short status message pointing callers at the /concatenate/ endpoint."""
    status_payload = {
        "message": "Video Concatenation API is running. Use the /concatenate/ endpoint (POST) to process videos."
    }
    return status_payload
# Example of how to run locally (Hugging Face Spaces handles this automatically)
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)