sdafd's picture
Create app.py
6f695bf verified
raw
history blame
7.82 kB
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import cv2
import uuid
import os
import requests
import random
import string
import json
import shutil
app = FastAPI()
def check_nsfw(img_url):
session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7))
data = {
'data': [
{'path': img_url},
"chen-convnext",
0.5,
True,
True
],
'session_hash': session_hash,
'fn_index': 0,
'trigger_id': 12
}
r = requests.post('https://yoinked-da-nsfw-checker.hf.space/queue/join', json=data)
r = requests.get(f'https://yoinked-da-nsfw-checker.hf.space/queue/data?session_hash={session_hash}', stream=True)
buffer = "" # Buffer to accumulate the chunks
for content in r.iter_content(100):
# Decode the byte content to a string
buffer += content.decode('utf-8')
return json.loads(buffer.split('data:')[len(buffer.split('data:'))-2])["output"]["data"][0]['label']
def check_nsfw2(img_url):
session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7))
data = {
'data': [
{'path': img_url}
],
'session_hash': session_hash,
'fn_index': 0,
'trigger_id': 9
}
r = requests.post('https://jamescookjr90-falconsai-nsfw-image-detection.hf.space/queue/join', json=data)
r = requests.get(f'https://jamescookjr90-falconsai-nsfw-image-detection.hf.space/queue/data?session_hash={session_hash}', stream=True)
buffer = "" # Buffer to accumulate the chunks
for content in r.iter_content(100):
# Decode the byte content to a string
buffer += content.decode('utf-8')
return json.loads(buffer.split('data:')[len(buffer.split('data:'))-2])["output"]["data"][0]['label']
def check_nsfw3(img_url):
data = {
'data': [
{'path': img_url}
]
}
r = requests.post('https://zanderlewis-xl-nsfw-detection.hf.space/call/predict',json=data)
json_data = r.json()
event_id = json_data['event_id']
r = requests.get(f'https://zanderlewis-xl-nsfw-detection.hf.space/call/predict/{event_id}', stream=True)
event_stream = ''
for chunk in r.iter_content(100):
event_stream += chunk.decode('utf-8')
return ast.literal_eval(event_stream.split('data:')[-1])[0]['label']
def get_replica_code(url):
try:
r = requests.get(url)
return r.text.split('replicas/')[1].split('"};')[0]
except:
return None
def check_nsfw4(img_url):
code = get_replica_code('https://error466-falconsai-nsfw-image-detection.hf.space')
session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7))
data = {
'data': [
{'path': img_url}
],
'session_hash': session_hash,
'fn_index': 0,
'trigger_id': 58
}
r = requests.post(f'https://error466-falconsai-nsfw-image-detection.hf.space/--replicas/{code}/queue/join', json=data)
r = requests.get(f'https://error466-falconsai-nsfw-image-detection.hf.space/--replicas/{code}/queue/data?session_hash={session_hash}', stream=True)
buffer = "" # Buffer to accumulate the chunks
for content in r.iter_content(100):
# Decode the byte content to a string
buffer += content.decode('utf-8')
return json.loads(buffer.split('data:')[-1])["output"]["data"][0]['label']
def check_nsfw5(img_url):
session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7))
data = {
'data': [
{'path': img_url}
],
'session_hash': session_hash,
'fn_index': 0,
'trigger_id': 9
}
r = requests.post('https://phelpsgg-falconsai-nsfw-image-detection.hf.space/queue/join', json=data)
r = requests.get(f'https://phelpsgg-falconsai-nsfw-image-detection.hf.space/queue/data?session_hash={session_hash}', stream=True)
buffer = "" # Buffer to accumulate the chunks
for content in r.iter_content(100):
# Decode the byte content to a string
buffer += content.decode('utf-8')
return json.loads(buffer.split('data:')[-1])["output"]["data"][0]['label']
def check_nsfw_final(img_url):
result = None
print(img_url)
try:
check = check_nsfw2(img_url)
if check == 'nsfw':
result = True
else:
result = False
except Exception as e:
print(e)
if result is None:
try:
check = check_nsfw3(img_url)
if check == 'nsfw':
result = True
else:
result = False
except Exception as e:
print(e)
if result is None:
try:
check = check_nsfw4(img_url)
if check == 'nsfw':
result = True
else:
result = False
except:
pass
if result is None:
try:
check = check_nsfw5(img_url)
if check == 'nsfw':
result = True
else:
result = False
except Exception as e:
print(e)
return result
# Frame extraction
def extract_frames(video_path, num_frames, temp_dir):
vidcap = cv2.VideoCapture(video_path)
total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
frames = []
for i in range(num_frames):
frame_number = int(i * total_frames / num_frames)
vidcap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
success, image = vidcap.read()
if success:
frame_filename = os.path.join(temp_dir, f"{uuid.uuid4().hex}.jpg")
cv2.imwrite(frame_filename, image)
frames.append(frame_filename)
else:
break
vidcap.release()
return frames
# Video processing
def process_video(video_path, num_frames):
temp_dir = os.path.join("/tmp", f"frames_{uuid.uuid4().hex}")
os.makedirs(temp_dir, exist_ok=True)
frames = extract_frames(video_path, num_frames, temp_dir)
nsfw_count = 0
total_frames = len(frames)
frame_results = []
for frame_path in frames:
img_url = f"file://{frame_path}"
nsfw_detected = check_nsfw_final(img_url)
frame_results.append({
"frame_path": frame_path,
"nsfw_detected": nsfw_detected
})
if nsfw_detected:
nsfw_count += 1
# Cleanup
shutil.rmtree(temp_dir, ignore_errors=True)
result = {
"nsfw_count": nsfw_count,
"total_frames": total_frames,
"frames": frame_results
}
return result
# Request/Response models
class VideoRequest(BaseModel):
video_url: str
num_frames: int = 10
class VideoResponse(BaseModel):
nsfw_count: int
total_frames: int
frames: list
# API Endpoints
@app.post("/process_video", response_model=VideoResponse)
async def process_video_endpoint(request: VideoRequest):
# Download video
try:
video_path = os.path.join("/tmp", f"{uuid.uuid4().hex}.mp4")
with requests.get(request.video_url, stream=True) as r:
r.raise_for_status()
with open(video_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Error downloading video: {e}")
try:
# Process video
result = process_video(video_path, request.num_frames)
finally:
# Cleanup video file
if os.path.exists(video_path):
os.remove(video_path)
return result