"""Gradio app that samples frames from an uploaded video and asks remote
NSFW classifiers (hosted Hugging Face Spaces) to label each frame."""

import ast
import base64
import json
import os
import random
import string
import tempfile
import uuid

import cv2
import gradio as gr
import requests

# Directory the sampled frames are written to, and the public URL prefix
# under which the remote classifiers fetch those same files.
FRAME_DIR = '/tmp/gradio/'
FRAME_URL_PREFIX = 'https://sdafd-video-nsfw-detect.hf.space/file=/tmp/gradio/'

# (connect, read) timeout in seconds so an unresponsive Space cannot hang a
# request forever; a timeout raises and the next classifier is tried.
REQUEST_TIMEOUT = (10, 120)


def _new_session_hash(length=7):
    """Return a random lowercase-alphanumeric session identifier."""
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))


def _read_stream_text(url):
    """Fetch *url* as a streamed response and return its body as text.

    The raw bytes are joined before decoding: decoding fixed-size chunks
    one by one can fail when a multi-byte UTF-8 character straddles a chunk
    boundary. The response is closed when the ``with`` block exits.
    """
    with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) as response:
        response.raise_for_status()
        raw = b''.join(response.iter_content(1024))
    return raw.decode('utf-8')


def _sse_payloads(stream_text):
    """Return the payload of every ``data:`` line in *stream_text*, in order."""
    payloads = []
    for line in stream_text.split('\n'):
        line = line.rstrip('\r')
        if line.startswith('data:'):
            payloads.append(line[len('data:'):].strip())
    return payloads


def _queue_predict(base_url, inputs, trigger_id, event_index):
    """Submit *inputs* to a Space's ``/queue/join`` endpoint and return the label.

    Args:
        base_url: Root URL of the Space (without a trailing slash).
        inputs: List sent as the ``data`` field of the join request.
        trigger_id: Value sent as ``trigger_id``; the numbers are taken
            verbatim from the original per-Space payloads.
        event_index: Which ``data:`` event of the result stream carries the
            output (``-1`` = last, ``-2`` = second to last).

    Returns:
        The ``label`` of the first output item.

    Raises:
        requests.RequestException: On HTTP/network failure or timeout.
        IndexError, KeyError, ValueError: If the stream does not have the
            expected shape.
    """
    session_hash = _new_session_hash()
    payload = {
        'data': inputs,
        'session_hash': session_hash,
        'fn_index': 0,
        'trigger_id': trigger_id,
    }
    join_response = requests.post(
        f'{base_url}/queue/join', json=payload, timeout=REQUEST_TIMEOUT
    )
    join_response.raise_for_status()
    stream_text = _read_stream_text(
        f'{base_url}/queue/data?session_hash={session_hash}'
    )
    event = json.loads(_sse_payloads(stream_text)[event_index])
    return event["output"]["data"][0]['label']


def check_nsfw(img_url):
    """Classify *img_url* with the yoinked-da-nsfw-checker Space; return its label."""
    # The result is read from the second-to-last event, as in the original
    # code — presumably this Space emits one more event after the result;
    # TODO confirm against the Space's current Gradio version.
    return _queue_predict(
        'https://yoinked-da-nsfw-checker.hf.space',
        [{'path': img_url}, "chen-convnext", 0.5, True, True],
        trigger_id=12,
        event_index=-2,
    )


def check_nsfw2(img_url):
    """Classify *img_url* with the jamescookjr90 Falconsai Space; return its label."""
    # Second-to-last event, as for check_nsfw (same caveat applies).
    return _queue_predict(
        'https://jamescookjr90-falconsai-nsfw-image-detection.hf.space',
        [{'path': img_url}],
        trigger_id=9,
        event_index=-2,
    )


def check_nsfw3(img_url):
    """Classify *img_url* via the zanderlewis ``/call/predict`` API; return its label.

    The call is two-step: a POST returns an ``event_id``, then a GET on that
    id streams the result, whose last ``data:`` event holds the output list.
    """
    endpoint = 'https://zanderlewis-xl-nsfw-detection.hf.space/call/predict'
    submit_response = requests.post(
        endpoint, json={'data': [{'path': img_url}]}, timeout=REQUEST_TIMEOUT
    )
    submit_response.raise_for_status()
    event_id = submit_response.json()['event_id']
    stream_text = _read_stream_text(f'{endpoint}/{event_id}')
    # The payload is JSON, so parse it as JSON: ast.literal_eval rejects the
    # JSON literals null/true/false.
    return json.loads(_sse_payloads(stream_text)[-1])[0]['label']


def get_replica_code(url):
    """Return the replica id embedded in the page at *url*, or None on failure.

    The id is the text between ``replicas/`` and ``"};`` in the page source.
    """
    try:
        page = requests.get(url, timeout=REQUEST_TIMEOUT)
        return page.text.split('replicas/')[1].split('"};')[0]
    except Exception as e:  # best effort: any failure means "no replica id"
        print(e)
        return None


def check_nsfw4(img_url):
    """Classify *img_url* with the error466 Falconsai Space; return its label.

    Raises:
        RuntimeError: If the replica id of the Space cannot be determined.
    """
    space_url = 'https://error466-falconsai-nsfw-image-detection.hf.space'
    code = get_replica_code(space_url)
    if code is None:
        # Without this guard the request URL would contain the text "None".
        raise RuntimeError(f'could not determine replica id for {space_url}')
    return _queue_predict(
        f'{space_url}/--replicas/{code}',
        [{'path': img_url}],
        trigger_id=58,
        event_index=-1,
    )


def check_nsfw5(img_url):
    """Classify *img_url* with the phelpsgg Falconsai Space; return its label."""
    return _queue_predict(
        'https://phelpsgg-falconsai-nsfw-image-detection.hf.space',
        [{'path': img_url}],
        trigger_id=9,
        event_index=-1,
    )


def check_nsfw_final(img_url):
    """Return whether *img_url* is NSFW, trying several classifiers in turn.

    Classifiers are tried in the order check_nsfw2, check_nsfw3, check_nsfw4,
    check_nsfw5; the first one that answers decides the result.

    Returns:
        True if the label is ``'nsfw'``, False for any other label, or None
        when every classifier raised.
    """
    print(img_url)
    for checker in (check_nsfw2, check_nsfw3, check_nsfw4, check_nsfw5):
        try:
            return checker(img_url) == 'nsfw'
        except Exception as e:
            # Log the failure and fall through to the next classifier.
            print(e)
    return None


def extract_frames(video_path, num_frames):
    """Write up to *num_frames* evenly spaced frames of a video as JPEG files.

    Args:
        video_path: Path of the video file to sample.
        num_frames: Number of frames to sample; coerced to int, and a
            missing or non-positive value yields no frames.

    Returns:
        List of the file names (not full paths) written into ``FRAME_DIR``.
        Sampling stops at the first frame that cannot be read; a frame that
        cannot be written is skipped.
    """
    num_frames = int(num_frames or 0)
    frames = []
    if num_frames <= 0:
        return frames

    os.makedirs(FRAME_DIR, exist_ok=True)
    vidcap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        for i in range(num_frames):
            frame_number = int(i * total_frames / num_frames)
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            success, image = vidcap.read()
            if not success:
                break
            random_filename = f"{uuid.uuid4().hex}.jpg"
            # Only report frames that were actually written to disk.
            if cv2.imwrite(os.path.join(FRAME_DIR, random_filename), image):
                frames.append(random_filename)
    finally:
        # Release the capture even if reading or writing raised.
        vidcap.release()
    return frames


def process_video(video, num_frames):
    """Sample frames from *video*, classify each one and summarise the results.

    Args:
        video: Path of the uploaded video, as passed by the Gradio input.
        num_frames: Number of frames to sample.

    Returns:
        Dict with ``nsfw_count``, ``total_frames`` and ``frames`` (one entry
        per frame holding ``frame_path`` and ``nsfw_detected``, which is
        None when no classifier answered).

    Raises:
        gr.Error: If no video was provided.
    """
    if not video:
        raise gr.Error("Please upload a video first.")

    frames = extract_frames(video, num_frames)
    nsfw_count = 0
    frame_results = []
    try:
        for frame_path in frames:
            nsfw_detected = check_nsfw_final(f"{FRAME_URL_PREFIX}{frame_path}")
            frame_results.append({
                "frame_path": frame_path,
                "nsfw_detected": nsfw_detected
            })
            if nsfw_detected:
                nsfw_count += 1
    finally:
        # Cleanup: always delete the temporary frame files, even on failure.
        for frame_path in frames:
            try:
                os.remove(os.path.join(FRAME_DIR, frame_path))
            except FileNotFoundError:
                pass

    return {
        "nsfw_count": nsfw_count,
        "total_frames": len(frames),
        "frames": frame_results
    }


with gr.Blocks() as app:
    gr.Markdown("## NSFW Frame Detection in Video")
    with gr.Row():
        video_input = gr.Video(label="Upload Video")
        num_frames_input = gr.Number(label="Number of Frames to Check", value=10, precision=0)
    output_json = gr.JSON(label="Result")
    submit_button = gr.Button("Submit")
    submit_button.click(
        fn=process_video,
        inputs=[video_input, num_frames_input],
        outputs=output_json
    )

# Launched at module level, as in the original script.
app.launch()