# chipling's picture
# Upload 26 files
# 65d3b67 verified
from fastapi import FastAPI, HTTPException, Request, Query, Response
from fastapi.responses import StreamingResponse, HTMLResponse
from fastapi.templating import Jinja2Templates
from pytubefix import YouTube
from pytubefix.cli import on_progress
import os
import logging
import httpx
import hashlib
from functools import lru_cache
from encrypt import encrypt_video_id, decrypt_video_id
# ASGI application instance; the route decorators in this module register on it.
app = FastAPI()
# Maximum number of bytes read from a file, or requested per chunk when relaying
# an upstream stream, at a time.
CHUNK_SIZE = 1024 * 1024 # 1MB
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def open_file_range(file_path: str, start: int, end: int):
    """Yield the bytes of ``file_path`` from offset ``start`` to ``end`` inclusive.

    The file is read lazily in pieces of at most ``CHUNK_SIZE`` bytes.
    Iteration stops early if the file ends before ``end`` is reached, and
    nothing is yielded when the requested range is empty (``end < start``).
    """
    with open(file_path, "rb") as source:
        source.seek(start)
        remaining = end - start + 1
        while remaining > 0:
            want = remaining if remaining < CHUNK_SIZE else CHUNK_SIZE
            piece = source.read(want)
            if piece:
                remaining -= len(piece)
                yield piece
            else:
                # Hit end-of-file before the requested range was exhausted.
                return
def generate_etag(file_path):
    """Return the hexadecimal MD5 digest of the contents of ``file_path``.

    The file is hashed incrementally in 8192-byte reads, so it is never held
    in memory as a whole.
    """
    digest = hashlib.md5()
    with open(file_path, 'rb') as stream:
        # iter(callable, sentinel) keeps calling read() until it returns b"".
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
@lru_cache(maxsize=128)
def get_video_metadata(video_id: str):
    """Look up metadata for a YouTube video, memoised per ``video_id``.

    Returns a dict with ``title``, ``description``, ``author``, ``duration``,
    ``views`` and ``date``. When the video's length is 600 or more, the dict
    additionally carries ``video_url`` and ``audio_url`` taken directly from
    the highest-resolution and audio-only streams.

    NOTE(review): results (including any stream URLs) are kept in the LRU
    cache for the life of the process; if those URLs are time-limited, a
    cached entry could outlive them -- confirm.
    """
    yt = YouTube(f"https://www.youtube.com/watch?v={video_id}", client='WEB_EMBED')
    with_direct_urls = yt.length >= 600

    info = {
        "title": yt.title,
        "description": yt.description,
        "author": yt.author,
        "duration": yt.length,
        "views": yt.views,
        "date": yt.publish_date,
    }
    if with_direct_urls:
        info["video_url"] = yt.streams.get_highest_resolution().url
        info["audio_url"] = yt.streams.get_audio_only().url
    return info
@app.get("/api/video/{video_id}")
def get_video_info(video_id: str, request: Request):
    """Return metadata for ``video_id`` as a JSON-serialisable dict.

    If the metadata's ``duration`` is below 600, ``video_url`` and
    ``audio_url`` are added, pointing at this service's ``video/`` and
    ``audio/`` routes with the encrypted video id. Otherwise a copy of the
    metadata is returned unchanged.

    Raises:
        HTTPException: status 500 carrying the message of any error raised
            while building the response.
    """
    try:
        info = get_video_metadata(video_id)
        token = encrypt_video_id(video_id)
        root = request.base_url

        if metadata_is_short := info['duration'] < 600:
            # A new dict is built so the cached metadata is never mutated.
            return dict(
                info,
                video_url=f"{root}video/{token}",
                audio_url=f"{root}audio/{token}",
            )
        return dict(info)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
def _content_disposition(title: str, extension: str, download: bool) -> str:
    """Build a ``Content-Disposition`` header value that is safe for any title.

    The raw title cannot be placed in the header directly: header values must
    be Latin-1 encodable, and a double quote or backslash in the title would
    end the quoted ``filename`` early. The value therefore carries a
    sanitised ASCII ``filename`` for old clients plus a percent-encoded UTF-8
    ``filename*`` (RFC 6266 / RFC 5987) holding the full title.

    Args:
        title: Human-readable title used as the file's base name.
        extension: File extension without the leading dot.
        download: ``True`` for ``attachment``, ``False`` for ``inline``.

    Returns:
        A pure-ASCII header value.
    """
    # Function-scope import so this block does not depend on the file header.
    from urllib.parse import quote

    disposition = "attachment" if download else "inline"
    # Keep printable ASCII only and drop the two characters that are special
    # inside a quoted-string.
    fallback = "".join(
        ch for ch in title if " " <= ch <= "~" and ch not in '"\\'
    ).strip() or "download"
    # errors="ignore" drops code points that cannot be encoded as UTF-8
    # (lone surrogates), which is what the original encode/decode did.
    encoded = quote(f"{title}.{extension}", safe="", errors="ignore")
    return (
        f'{disposition}; filename="{fallback}.{extension}"; '
        f"filename*=UTF-8''{encoded}"
    )


async def _proxy_upstream(
    url: str,
    upstream_headers: dict,
    *,
    media_type: str,
    content_disposition: str,
) -> StreamingResponse:
    """Open ``url`` and relay its body to the client as a streaming response.

    The upstream request is sent *before* the response object is built so
    that (a) an upstream failure can still be reported as a real HTTP error
    and (b) the upstream status (200 or 206) and ``Content-Range`` can be
    mirrored, which is what makes client ``Range`` requests work.

    Args:
        url: Upstream media URL.
        upstream_headers: Headers to send upstream (e.g. ``Range``).
        media_type: ``Content-Type`` of the response.
        content_disposition: Ready-made ``Content-Disposition`` value.

    Raises:
        HTTPException: 502 if the upstream status is neither 200 nor 206.
    """
    client = httpx.AsyncClient(follow_redirects=True, timeout=60)
    try:
        upstream = await client.send(
            client.build_request("GET", url, headers=upstream_headers),
            stream=True,
        )
    except Exception:
        await client.aclose()
        raise

    if upstream.status_code not in (200, 206):
        upstream_status = upstream.status_code
        await upstream.aclose()
        await client.aclose()
        logger.error(f"Failed to stream: {upstream_status}")
        raise HTTPException(status_code=502, detail="Source stream error")

    async def relay():
        try:
            async for chunk in upstream.aiter_bytes(CHUNK_SIZE):
                yield chunk
        except Exception:
            # Headers are already sent; log and let the server abort the
            # connection so the client can tell the body was truncated.
            logger.exception("Streaming error")
            raise
        finally:
            await upstream.aclose()
            await client.aclose()

    response_headers = {
        "Accept-Ranges": "bytes",
        "Cache-Control": "public, max-age=3600",
        "Content-Disposition": content_disposition,
    }
    if "content-range" in upstream.headers:
        response_headers["Content-Range"] = upstream.headers["content-range"]
    # aiter_bytes() yields decoded content, so the upstream length is only
    # valid for the relayed body when no content-encoding was applied.
    if (
        "content-length" in upstream.headers
        and "content-encoding" not in upstream.headers
    ):
        response_headers["Content-Length"] = upstream.headers["content-length"]

    return StreamingResponse(
        relay(),
        status_code=upstream.status_code,
        media_type=media_type,
        headers=response_headers,
    )


@app.get("/video/{video_id}")
async def stream_video(video_id: str, request: Request, download: bool = Query(False)):
    """Proxy the highest-resolution stream of an encrypted video id.

    Args:
        video_id: Encrypted id, decoded with ``decrypt_video_id``.
        request: Incoming request; its ``Range`` header is forwarded upstream.
        download: Serve as ``attachment`` instead of ``inline``.

    Raises:
        HTTPException: 502 if the upstream source rejects the request, 500
            for any other failure while resolving or opening the stream.
    """
    # NOTE(review): the pytubefix calls below are synchronous inside an async
    # handler; if they perform network I/O they stall the event loop -- confirm.
    try:
        decrypted_video_id = decrypt_video_id(video_id)
        yt = YouTube(f"https://www.youtube.com/watch?v={decrypted_video_id}")
        stream = yt.streams.get_highest_resolution()

        upstream_headers = {}
        if range_header := request.headers.get("range"):
            upstream_headers["Range"] = range_header

        return await _proxy_upstream(
            stream.url,
            upstream_headers,
            media_type="video/mp4",
            content_disposition=_content_disposition(yt.title, "mp4", download),
        )
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 502 above) intact.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Could not fetch video URL: {str(e)}"
        ) from e


@app.get("/audio/{video_id}")
async def stream_audio(video_id: str, request: Request, download: bool = Query(False)):
    """Proxy the audio-only stream of an encrypted video id.

    Args:
        video_id: Encrypted id, decoded with ``decrypt_video_id``.
        request: Incoming request; its ``User-Agent`` and ``Range`` headers
            are forwarded upstream.
        download: Serve as ``attachment`` instead of ``inline``.

    Raises:
        HTTPException: 502 if the upstream source rejects the request, 500
            for any other failure while resolving or opening the stream.
    """
    # NOTE(review): the pytubefix calls below are synchronous inside an async
    # handler; if they perform network I/O they stall the event loop -- confirm.
    try:
        decrypted_video_id = decrypt_video_id(video_id)
        yt = YouTube(f"https://www.youtube.com/watch?v={decrypted_video_id}")
        stream = yt.streams.get_audio_only()

        upstream_headers = {
            "User-Agent": request.headers.get("user-agent", "Mozilla/5.0"),
        }
        if range_header := request.headers.get("range"):
            upstream_headers["Range"] = range_header

        # NOTE(review): the filename keeps the original ".mp3" suffix even
        # though the media type comes from the stream and defaults to
        # "audio/mp4" -- confirm whether the extension should follow it.
        return await _proxy_upstream(
            stream.url,
            upstream_headers,
            media_type=stream.mime_type or "audio/mp4",
            content_disposition=_content_disposition(yt.title, "mp3", download),
        )
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 502 above) intact.
        raise
    except Exception as e:
        logger.error(f"Streaming error: {e}")
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e