Spaces:
Running
Running
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| import cv2 | |
| import uuid | |
| import os | |
| import requests | |
| import random | |
| import string | |
| import json | |
| import shutil | |
| import ast | |
| import tempfile | |
| import base64 | |
| app = FastAPI() | |
| def check_nsfw(img_url): | |
| session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7)) | |
| data = { | |
| 'data': [ | |
| {'path': img_url}, | |
| "chen-convnext", | |
| 0.5, | |
| True, | |
| True | |
| ], | |
| 'session_hash': session_hash, | |
| 'fn_index': 0, | |
| 'trigger_id': 12 | |
| } | |
| r = requests.post('https://yoinked-da-nsfw-checker.hf.space/queue/join', json=data) | |
| r = requests.get(f'https://yoinked-da-nsfw-checker.hf.space/queue/data?session_hash={session_hash}', stream=True) | |
| buffer = "" # Buffer to accumulate the chunks | |
| for content in r.iter_content(100): | |
| # Decode the byte content to a string | |
| buffer += content.decode('utf-8') | |
| return json.loads(buffer.split('data:')[len(buffer.split('data:'))-2])["output"]["data"][0]['label'] | |
| def check_nsfw2(img_url): | |
| session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7)) | |
| data = { | |
| 'data': [ | |
| {'path': img_url} | |
| ], | |
| 'session_hash': session_hash, | |
| 'fn_index': 0, | |
| 'trigger_id': 9 | |
| } | |
| r = requests.post('https://jamescookjr90-falconsai-nsfw-image-detection.hf.space/queue/join', json=data) | |
| r = requests.get(f'https://jamescookjr90-falconsai-nsfw-image-detection.hf.space/queue/data?session_hash={session_hash}', stream=True) | |
| buffer = "" # Buffer to accumulate the chunks | |
| for content in r.iter_content(100): | |
| # Decode the byte content to a string | |
| buffer += content.decode('utf-8') | |
| return json.loads(buffer.split('data:')[len(buffer.split('data:'))-2])["output"]["data"][0]['label'] | |
| def check_nsfw3(img_url): | |
| data = { | |
| 'data': [ | |
| {'path': img_url} | |
| ] | |
| } | |
| r = requests.post('https://zanderlewis-xl-nsfw-detection.hf.space/call/predict',json=data) | |
| json_data = r.json() | |
| event_id = json_data['event_id'] | |
| r = requests.get(f'https://zanderlewis-xl-nsfw-detection.hf.space/call/predict/{event_id}', stream=True) | |
| event_stream = '' | |
| for chunk in r.iter_content(100): | |
| event_stream += chunk.decode('utf-8') | |
| return ast.literal_eval(event_stream.split('data:')[-1])[0]['label'] | |
| def get_replica_code(url): | |
| try: | |
| r = requests.get(url) | |
| return r.text.split('replicas/')[1].split('"};')[0] | |
| except: | |
| return None | |
| def check_nsfw4(img_url): | |
| code = get_replica_code('https://error466-falconsai-nsfw-image-detection.hf.space') | |
| session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7)) | |
| data = { | |
| 'data': [ | |
| {'path': img_url} | |
| ], | |
| 'session_hash': session_hash, | |
| 'fn_index': 0, | |
| 'trigger_id': 58 | |
| } | |
| r = requests.post(f'https://error466-falconsai-nsfw-image-detection.hf.space/--replicas/{code}/queue/join', json=data) | |
| r = requests.get(f'https://error466-falconsai-nsfw-image-detection.hf.space/--replicas/{code}/queue/data?session_hash={session_hash}', stream=True) | |
| buffer = "" # Buffer to accumulate the chunks | |
| for content in r.iter_content(100): | |
| # Decode the byte content to a string | |
| buffer += content.decode('utf-8') | |
| return json.loads(buffer.split('data:')[-1])["output"]["data"][0]['label'] | |
| def check_nsfw5(img_url): | |
| session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7)) | |
| data = { | |
| 'data': [ | |
| {'path': img_url} | |
| ], | |
| 'session_hash': session_hash, | |
| 'fn_index': 0, | |
| 'trigger_id': 9 | |
| } | |
| r = requests.post('https://phelpsgg-falconsai-nsfw-image-detection.hf.space/queue/join', json=data) | |
| r = requests.get(f'https://phelpsgg-falconsai-nsfw-image-detection.hf.space/queue/data?session_hash={session_hash}', stream=True) | |
| buffer = "" # Buffer to accumulate the chunks | |
| for content in r.iter_content(100): | |
| # Decode the byte content to a string | |
| buffer += content.decode('utf-8') | |
| return json.loads(buffer.split('data:')[-1])["output"]["data"][0]['label'] | |
| def check_nsfw_final(img_url): | |
| result = None | |
| print(img_url) | |
| try: | |
| check = check_nsfw2(img_url) | |
| if check == 'nsfw': | |
| result = True | |
| else: | |
| result = False | |
| except Exception as e: | |
| print(e) | |
| if result is None: | |
| try: | |
| check = check_nsfw3(img_url) | |
| if check == 'nsfw': | |
| result = True | |
| else: | |
| result = False | |
| except Exception as e: | |
| print(e) | |
| if result is None: | |
| try: | |
| check = check_nsfw4(img_url) | |
| if check == 'nsfw': | |
| result = True | |
| else: | |
| result = False | |
| except: | |
| pass | |
| if result is None: | |
| try: | |
| check = check_nsfw5(img_url) | |
| if check == 'nsfw': | |
| result = True | |
| else: | |
| result = False | |
| except Exception as e: | |
| print(e) | |
| return result | |
| # Frame extraction | |
| def extract_frames(video_path, num_frames, temp_dir): | |
| print(video_path) | |
| vidcap = cv2.VideoCapture(video_path) | |
| total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| frames = [] | |
| for i in range(num_frames): | |
| frame_number = int(i * total_frames / num_frames) | |
| vidcap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) | |
| success, image = vidcap.read() | |
| if success: | |
| frame_filename = os.path.join(temp_dir, f"{uuid.uuid4().hex}.jpg") | |
| cv2.imwrite(frame_filename, image) | |
| frames.append(frame_filename) | |
| else: | |
| break | |
| vidcap.release() | |
| return frames | |
| # Video processing | |
| def process_video(video_path, num_frames): | |
| temp_dir = os.path.join("/tmp", f"frames_{uuid.uuid4().hex}") | |
| os.makedirs(temp_dir, exist_ok=True) | |
| frames = extract_frames(video_path, num_frames, temp_dir) | |
| nsfw_count = 0 | |
| total_frames = len(frames) | |
| frame_results = [] | |
| for frame_path in frames: | |
| img_url = f"file://{frame_path}" | |
| print(img_url) | |
| nsfw_detected = check_nsfw_final(img_url) | |
| frame_results.append({ | |
| "frame_path": frame_path, | |
| "nsfw_detected": nsfw_detected | |
| }) | |
| if nsfw_detected: | |
| nsfw_count += 1 | |
| # Cleanup | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| result = { | |
| "nsfw_count": nsfw_count, | |
| "total_frames": total_frames, | |
| "frames": frame_results | |
| } | |
| return result | |
| # Request/Response models | |
| class VideoRequest(BaseModel): | |
| video_url: str | |
| num_frames: int = 10 | |
| class VideoResponse(BaseModel): | |
| nsfw_count: int | |
| total_frames: int | |
| frames: list | |
| # API Endpoints | |
| async def process_video_endpoint(request: VideoRequest): | |
| # Download video | |
| try: | |
| video_path = os.path.join("/tmp", f"{uuid.uuid4().hex}.mp4") | |
| with requests.get(request.video_url, stream=True) as r: | |
| r.raise_for_status() | |
| with open(video_path, "wb") as f: | |
| for chunk in r.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=f"Error downloading video: {e}") | |
| try: | |
| # Process video | |
| result = process_video(video_path, request.num_frames) | |
| finally: | |
| # Cleanup video file | |
| if os.path.exists(video_path): | |
| os.remove(video_path) | |
| return result | |