"""FastAPI service that samples frames from a video and classifies them.

Each sampled frame is submitted to a chain of hosted NSFW-detection
services; the first service that answers decides the verdict for the frame.
"""

import ast
import json
import os
import random
import shutil
import string
import uuid

import cv2
import requests
from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

app = FastAPI()

# (connect, read) timeout in seconds applied to every outbound request so a
# stalled remote host cannot hang a worker forever.
_REQUEST_TIMEOUT = (10, 120)

_SESSION_ALPHABET = string.ascii_lowercase + string.digits
_STREAM_CHUNK_SIZE = 8192


def _new_session_hash():
    """Return a random 7-character lowercase/digit session identifier."""
    return ''.join(random.choices(_SESSION_ALPHABET, k=7))


def _read_stream(url):
    """GET *url* as a stream and return the complete body as text.

    The raw bytes are joined before decoding: decoding each chunk separately
    raises ``UnicodeDecodeError`` whenever a multi-byte UTF-8 character
    straddles a chunk boundary. The response is closed on exit so the
    connection is returned to the pool.
    """
    with requests.get(url, stream=True, timeout=_REQUEST_TIMEOUT) as response:
        raw = b"".join(response.iter_content(_STREAM_CHUNK_SIZE))
    return raw.decode('utf-8')


def _queue_events(base_url, inputs, trigger_id):
    """Submit *inputs* to the queue API under *base_url* and read the result.

    Posts the job to ``{base_url}/queue/join``, reads the whole stream from
    ``{base_url}/queue/data`` for the same session, and returns the stream
    text split on ``'data:'``. The list always has at least one element.
    """
    session_hash = _new_session_hash()
    payload = {
        'data': inputs,
        'session_hash': session_hash,
        'fn_index': 0,
        'trigger_id': trigger_id,
    }
    requests.post(f'{base_url}/queue/join', json=payload, timeout=_REQUEST_TIMEOUT)
    stream_text = _read_stream(f'{base_url}/queue/data?session_hash={session_hash}')
    return stream_text.split('data:')


def _label_from_event(event_text):
    """Parse one JSON event and return the label of its first output item."""
    return json.loads(event_text)["output"]["data"][0]['label']


def check_nsfw(img_url):
    """Classify *img_url* with the ``yoinked-da-nsfw-checker`` space.

    Returns the label string found in the second-to-last ``data:`` segment of
    the queue stream. Raises on network errors or an unexpected payload.
    """
    events = _queue_events(
        'https://yoinked-da-nsfw-checker.hf.space',
        [{'path': img_url}, "chen-convnext", 0.5, True, True],
        12,
    )
    # The original indexing is kept: len - 2 wraps to the only element when
    # the stream contains no 'data:' separator at all.
    return _label_from_event(events[len(events) - 2])


def check_nsfw2(img_url):
    """Classify *img_url* with the ``jamescookjr90`` falconsai space.

    Returns the label string found in the second-to-last ``data:`` segment of
    the queue stream. Raises on network errors or an unexpected payload.
    """
    events = _queue_events(
        'https://jamescookjr90-falconsai-nsfw-image-detection.hf.space',
        [{'path': img_url}],
        9,
    )
    return _label_from_event(events[len(events) - 2])


def check_nsfw3(img_url):
    """Classify *img_url* with the ``zanderlewis-xl-nsfw-detection`` space.

    Uses the ``/call/predict`` API: the POST returns an ``event_id`` and the
    result is read from the event stream for that id. Returns the label of
    the first item in the last ``data:`` segment.
    """
    base_url = 'https://zanderlewis-xl-nsfw-detection.hf.space/call/predict'
    payload = {'data': [{'path': img_url}]}
    submitted = requests.post(base_url, json=payload, timeout=_REQUEST_TIMEOUT)
    event_id = submitted.json()['event_id']
    event_stream = _read_stream(f'{base_url}/{event_id}')
    last_event = event_stream.split('data:')[-1]
    try:
        # The payload is presumably JSON; json handles null/true/false.
        items = json.loads(last_event)
    except ValueError:
        # Fall back to the original parser for Python-literal style payloads.
        items = ast.literal_eval(last_event)
    return items[0]['label']


def get_replica_code(url):
    """Return the replica code embedded in the page at *url*, or ``None``.

    The code is the text between ``'replicas/'`` and ``'"};'`` in the page
    body. ``None`` is returned when the request fails or the marker is absent.
    """
    try:
        page = requests.get(url, timeout=_REQUEST_TIMEOUT)
        return page.text.split('replicas/')[1].split('"};')[0]
    except (requests.RequestException, IndexError):
        return None


def check_nsfw4(img_url):
    """Classify *img_url* with the ``error466`` falconsai space.

    The space is addressed through a replica-specific path that is looked up
    first. Returns the label from the last ``data:`` segment of the stream.

    Raises:
        RuntimeError: if the replica code cannot be determined.
    """
    space_url = 'https://error466-falconsai-nsfw-image-detection.hf.space'
    code = get_replica_code(space_url)
    if code is None:
        # Fail early instead of requesting a URL containing the text "None".
        raise RuntimeError(f'Could not resolve replica code for {space_url}')
    events = _queue_events(f'{space_url}/--replicas/{code}', [{'path': img_url}], 58)
    return _label_from_event(events[-1])


def check_nsfw5(img_url):
    """Classify *img_url* with the ``phelpsgg`` falconsai space.

    Returns the label from the last ``data:`` segment of the queue stream.
    Raises on network errors or an unexpected payload.
    """
    events = _queue_events(
        'https://phelpsgg-falconsai-nsfw-image-detection.hf.space',
        [{'path': img_url}],
        9,
    )
    return _label_from_event(events[-1])


def check_nsfw_final(img_url):
    """Return whether *img_url* is NSFW, trying several services in order.

    ``check_nsfw2``, ``check_nsfw3``, ``check_nsfw4`` and ``check_nsfw5`` are
    tried in that order; the first one that answers without raising decides.

    Returns:
        ``True`` if the deciding label is ``'nsfw'``, ``False`` for any other
        label, or ``None`` when every service failed.
    """
    print(img_url)
    # Built at call time so the module-level names are resolved on each call.
    checkers = (check_nsfw2, check_nsfw3, check_nsfw4, check_nsfw5)
    for checker in checkers:
        try:
            return checker(img_url) == 'nsfw'
        except Exception as e:
            # Best-effort chain: report the failure and try the next service.
            print(e)
    return None


# Frame extraction
def extract_frames(video_path, num_frames, temp_dir):
    """Save up to *num_frames* evenly spaced frames of a video as JPEG files.

    Args:
        video_path: path of the video file to read.
        num_frames: number of frames to sample; values <= 0 yield no frames.
        temp_dir: existing directory that receives the ``.jpg`` files.

    Returns:
        List of written frame file paths, in sampling order. Extraction stops
        at the first frame that cannot be read.
    """
    vidcap = cv2.VideoCapture(video_path)
    frames = []
    try:
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        for i in range(num_frames):
            frame_number = int(i * total_frames / num_frames)
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            success, image = vidcap.read()
            if not success:
                break
            frame_filename = os.path.join(temp_dir, f"{uuid.uuid4().hex}.jpg")
            # Only report frames that were actually written to disk.
            if cv2.imwrite(frame_filename, image):
                frames.append(frame_filename)
    finally:
        # Release the capture even when reading or writing raises.
        vidcap.release()
    return frames


# Video processing
def process_video(video_path, num_frames):
    """Sample frames from *video_path* and classify each one.

    Returns:
        Dict with ``nsfw_count`` (frames judged NSFW), ``total_frames``
        (frames extracted) and ``frames`` (one dict per frame holding
        ``frame_path`` and ``nsfw_detected``). The frame files are deleted
        before returning, so ``frame_path`` is informational only.
    """
    temp_dir = os.path.join("/tmp", f"frames_{uuid.uuid4().hex}")
    os.makedirs(temp_dir, exist_ok=True)
    try:
        frames = extract_frames(video_path, num_frames, temp_dir)
        nsfw_count = 0
        frame_results = []
        for frame_path in frames:
            # NOTE(review): the classifiers are remote services; a file://
            # path on this host is presumably not readable by them — confirm
            # whether frames must be uploaded or served over HTTP instead.
            img_url = f"file://{frame_path}"
            nsfw_detected = check_nsfw_final(img_url)
            frame_results.append({
                "frame_path": frame_path,
                "nsfw_detected": nsfw_detected
            })
            if nsfw_detected:
                nsfw_count += 1
    finally:
        # Cleanup, also when extraction or classification raises.
        shutil.rmtree(temp_dir, ignore_errors=True)

    return {
        "nsfw_count": nsfw_count,
        "total_frames": len(frames),
        "frames": frame_results
    }


# Request/Response models
class VideoRequest(BaseModel):
    """Request body: the video to fetch and how many frames to sample."""

    video_url: str
    num_frames: int = 10


class VideoResponse(BaseModel):
    """Response body mirroring the dict returned by ``process_video``."""

    nsfw_count: int
    total_frames: int
    frames: list


def _download_video(video_url, video_path):
    """Stream the resource at *video_url* into the file *video_path*."""
    with requests.get(video_url, stream=True, timeout=_REQUEST_TIMEOUT) as r:
        r.raise_for_status()
        with open(video_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)


# API Endpoints
@app.post("/process_video", response_model=VideoResponse)
async def process_video_endpoint(request: VideoRequest):
    """Download the requested video, classify sampled frames, return counts.

    Raises:
        HTTPException: status 400 when the video cannot be downloaded.
    """
    # NOTE(review): video_url is caller-supplied and fetched as-is; consider
    # restricting schemes/hosts to avoid server-side request forgery.
    video_path = os.path.join("/tmp", f"{uuid.uuid4().hex}.mp4")
    try:
        # The download and the processing are blocking (requests, OpenCV), so
        # they run in the thread pool instead of stalling the event loop.
        try:
            await run_in_threadpool(_download_video, request.video_url, video_path)
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Error downloading video: {e}") from e
        return await run_in_threadpool(process_video, video_path, request.num_frames)
    finally:
        # Cleanup video file, including a partial file from a failed download.
        if os.path.exists(video_path):
            os.remove(video_path)