from fastapi import APIRouter, HTTPException, BackgroundTasks import tempfile import os import logging import asyncio from .Schema import YouTubeUploadTask # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) router = APIRouter(prefix="/youtube", tags=["youtube"]) async def download_file(url: str) -> str: """Download file from URL using aria2c.""" try: # Create temp directory for download temp_dir = tempfile.mkdtemp() temp_path = os.path.join( temp_dir, "video" ) # aria2c will add extension automatically # Build aria2c command command = [ "aria2c", "--allow-overwrite=true", "--auto-file-renaming=false", "--max-connection-per-server=16", "--split=16", "--dir", temp_dir, "--out", "video", url, ] logger.info(f"Starting download with aria2c: {url}") process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: raise Exception(f"Download failed: {stderr.decode()}") # Find the downloaded file (aria2c adds extension automatically) downloaded_files = os.listdir(temp_dir) if not downloaded_files: raise Exception("No file downloaded") downloaded_file = os.path.join(temp_dir, downloaded_files[0]) logger.info(f"Download completed: {downloaded_file}") return downloaded_file except Exception as e: logger.error(f"Error downloading file: {str(e)}") raise HTTPException(status_code=500, detail=f"File download failed: {str(e)}") async def upload_video_background(task: YouTubeUploadTask): """Background task to handle video upload.""" temp_dir = None try: # Convert HttpUrl to string url = str(task.filename) logger.info(f"Starting download for video: {url}") downloaded_file = await download_file(url) temp_dir = os.path.dirname(downloaded_file) logger.info(f"Download complete. Saved to: {downloaded_file}") # Build the command command = [ "/srv/youtube/youtubeuploader", "-filename", downloaded_file, "-title", task.title, "-description", task.description, "-categoryId", task.category_id, "-privacy", task.privacy, "-tags", task.tags, ] if task.thumbnail: command.extend(["-thumbnail", task.thumbnail]) logger.info("Executing upload command") process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: logger.error(f"Upload failed: {stderr.decode()}") raise Exception(f"Upload failed: {stderr.decode()}") logger.info("Upload completed successfully") logger.debug(f"Upload output: {stdout.decode()}") except Exception as e: logger.error(f"Error in upload process: {str(e)}") raise finally: # Clean up temp directory and its contents if temp_dir and os.path.exists(temp_dir): try: for file in os.listdir(temp_dir): os.remove(os.path.join(temp_dir, file)) os.rmdir(temp_dir) logger.info(f"Cleaned up temporary directory: {temp_dir}") except Exception as e: logger.error(f"Failed to clean up temp directory: {str(e)}") @router.post("/upload") async def upload_video_to_youtube( task: YouTubeUploadTask, background_tasks: BackgroundTasks ): """ Endpoint to handle YouTube video upload requests. The actual upload is performed as a background task. """ try: background_tasks.add_task(upload_video_background, task) return {"message": "Upload task started", "status": "processing"} except Exception as e: logger.error(f"Error initiating upload task: {str(e)}") raise HTTPException(status_code=500, detail=str(e))