Spaces:
Runtime error
Runtime error
File size: 10,460 Bytes
3a3c2be 2dc47c6 cb82c24 2dc47c6 589bf82 514b638 c975f81 0ef1091 6853273 9042fbb 6853273 9042fbb 589bf82 5e8743b dae402a 97eb128 9042fbb 97eb128 9042fbb 2c7b51d b23d160 f7b5887 c2422af f7b5887 b23d160 9042fbb 97eb128 9042fbb 6853273 589bf82 6853273 589bf82 3a3c2be 6853273 2dc47c6 6853273 2dc47c6 3a3c2be 2dc47c6 cb82c24 2dc47c6 6853273 2dc47c6 9042fbb 2dc47c6 6853273 2dc47c6 6853273 514b638 1ae13cc 6853273 97eb128 6853273 97eb128 6853273 97eb128 6853273 37b013a 97eb128 b2ebbba e53ae5a b2ebbba 9042fbb 97eb128 9042fbb 97eb128 9042fbb 97eb128 9042fbb 97eb128 9042fbb 7b57752 6853273 7b57752 564bb7d 6853273 f755699 6853273 1ae13cc b2619fa 5aa6ebe 1ae13cc 7b57752 1ae13cc b2619fa 9042fbb 6853273 9042fbb 97eb128 9042fbb 97eb128 9042fbb 97eb128 9042fbb 97eb128 9042fbb 564bb7d 6853273 cb82c24 6853273 cb82c24 6853273 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
import importlib.util
import json
import logging
import os
import random
import secrets
import string
import subprocess
from collections import defaultdict
from datetime import datetime, timedelta

import gradio as gr
from moviepy.editor import VideoFileClip
import spaces
import torch
# Set up logging
# Root logger: INFO and above go to the file api_access.log, one record per
# line formatted as "<timestamp> - <message>".
logging.basicConfig(level=logging.INFO, filename="api_access.log", format="%(asctime)s - %(message)s")
# Restrict torch to one intra-op thread and one inter-op thread.
torch.set_num_threads(1)
torch.set_num_interop_threads(1)
# Ask torch and cuDNN for deterministic behaviour and turn cuDNN autotuning off.
# NOTE(review): per the PyTorch docs, use_deterministic_algorithms(True) makes
# operations that lack a deterministic implementation raise at runtime -
# confirm the backend model only uses supported operations.
torch.use_deterministic_algorithms(True)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
# Disable TF32 for CUDA matmuls and for cuDNN.
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False
# Data structures to track IP addresses
# Module-level, in-memory table indexed by client IP (see is_ip_blocked and
# update_ip_access_time). Looking up an unseen IP creates a fresh record via
# the defaultdict factory below. Nothing here is persisted, so the table is
# empty again whenever the process restarts.
ip_access_records = defaultdict(lambda: {
# timedelta accumulated and reset by update_ip_access_time
'total_access_time': timedelta(),
# datetime until which is_ip_blocked reports the IP as blocked; None if never blocked
'blocked_until': None,
# datetime stored by the previous update_ip_access_time call; None before the first call
'session_start': None,
# datetime of the most recent update_ip_access_time call, used for its 24-hour reset
'last_access_time': None
})
def truncate_video(video_file, max_duration=30):
    """Write the first ``max_duration`` seconds of a video to a temporary file.

    NOTE(review): a second ``truncate_video`` is defined later in this module
    and rebinds the name, so this definition is not the one callers reach at
    runtime. Confirm which duration cap is intended and remove the other
    definition.

    Args:
        video_file: Path of the source video, as accepted by ``VideoFileClip``.
        max_duration: Maximum number of seconds to keep (default 30). Shorter
            videos are kept in full.

    Returns:
        The output path, always ``"temp_truncated_video.mp4"``. The file is
        overwritten on every call.
    """
    truncated_video_file = "temp_truncated_video.mp4"
    clip = VideoFileClip(video_file)
    try:
        truncated_clip = clip.subclip(0, min(max_duration, clip.duration))
        try:
            truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
        finally:
            truncated_clip.close()
    finally:
        # Release the source clip even when trimming or encoding fails.
        clip.close()
    return truncated_video_file
# Disabled API-key authorization check. Everything between the triple quotes
# is a bare string literal, so none of it is executed and neither
# AUTHORIZED_API_KEY nor is_authorized exists at runtime.
# NOTE(review): if this is ever re-enabled, drop the print() first - it writes
# the received API key to stdout.
'''
AUTHORIZED_API_KEY = os.getenv("HF_API_KEY")
def is_authorized(request: gr.Request):
"""Checks if the request includes the correct API key."""
api_key = request.headers.get("API-Key")
print("API-Key received:", api_key) # Log the received API key for debugging
return api_key == AUTHORIZED_API_KEY
'''
# Session timeout window of one hour.
# NOTE(review): presumably meant for the per-IP usage accounting - confirm
# where it is consumed.
SESSION_TIMEOUT = timedelta(hours=1)
# Function to log API access attempts
def log_api_access(successful_captcha, video_file=None, client_ip="unknown", blocked=False):
    """Write one INFO record describing an access attempt.

    Args:
        successful_captcha: Whether the CAPTCHA was solved for this attempt.
        video_file: Name of the submitted video; a falsy value is logged as
            ``N/A``.
        client_ip: Address of the caller.
        blocked: True when the attempt was rejected because the IP is blocked.
    """
    if blocked:
        status = "Blocked"
    else:
        status = "Allowed"
    video_label = video_file or 'N/A'
    entry = (
        f"Access Attempt - Status: {status}, Successful CAPTCHA: {successful_captcha}, "
        f"Video File: {video_label}, Client IP: {client_ip}"
    )
    logging.info(entry)
# Function to get client IP address
def get_client_ip(request: gr.Request):
    """Return the host address of the client behind ``request``.

    Falls back to ``"unknown"`` (the same placeholder ``log_api_access`` uses
    by default) when no request is available or it carries no client
    information, instead of raising ``AttributeError``.

    NOTE(review): behind a reverse proxy this is presumably the proxy's
    address rather than the end user's, which would make every visitor share
    one usage record. Confirm whether a forwarded-for header should be used.

    Args:
        request: The incoming request, or ``None``.

    Returns:
        The client host string, or ``"unknown"``.
    """
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return host if host else "unknown"
# Function to check if IP is blocked
def is_ip_blocked(client_ip):
    """Report whether ``client_ip`` is currently inside a block window.

    Args:
        client_ip: Key into ``ip_access_records``.

    Returns:
        True while the record's ``blocked_until`` lies in the future, False
        when it is unset or has already passed.
    """
    blocked_until = ip_access_records[client_ip].get('blocked_until')
    if not blocked_until:
        return False
    return datetime.now() < blocked_until
# Function to update IP access time
def update_ip_access_time(client_ip):
    """Account one access for ``client_ip`` and block the IP on excessive use.

    The time since the previous access is added to the IP's running total only
    when it is shorter than ``SESSION_TIMEOUT``. A longer gap means the client
    was idle, so it starts a new session and contributes nothing. Previously
    the whole gap was counted, so two requests a few hours apart were enough
    to trigger a block.

    The running total is cleared when the previous access is more than 24
    hours old. Once the total reaches one hour the IP is blocked for 48 hours
    and the total is reset.

    Args:
        client_ip: Key into ``ip_access_records``.
    """
    record = ip_access_records[client_ip]
    now = datetime.now()

    # Forget usage that is more than 24 hours old before accumulating.
    last_access_time = record.get('last_access_time')
    if last_access_time is not None and now - last_access_time > timedelta(hours=24):
        record['total_access_time'] = timedelta()

    # Count the gap since the previous access only within an active session.
    # The comparison is strict so a single gap can never reach the one-hour
    # limit on its own.
    session_start = record.get('session_start')
    if session_start is not None and now - session_start < SESSION_TIMEOUT:
        record['total_access_time'] += now - session_start

    record['session_start'] = now
    record['last_access_time'] = now

    # Block the IP for 48 hours once it has accumulated an hour of usage.
    if record['total_access_time'] >= timedelta(hours=1):
        record['blocked_until'] = now + timedelta(hours=48)
        record['total_access_time'] = timedelta()
# Function to truncate video
def truncate_video(video_file, max_duration=15):
    """Write the first ``max_duration`` seconds of a video to a temporary file.

    This definition comes after an earlier ``truncate_video`` in the module,
    so it is the one bound to the name at runtime.
    NOTE(review): the earlier definition uses a different duration cap -
    confirm which limit is intended and remove the duplicate.

    Args:
        video_file: Path of the source video, as accepted by ``VideoFileClip``.
        max_duration: Maximum number of seconds to keep (default 15). Shorter
            videos are kept in full.

    Returns:
        The output path, always ``"temp_truncated_video.mp4"``. The file is
        overwritten on every call.
    """
    truncated_video_file = "temp_truncated_video.mp4"
    clip = VideoFileClip(video_file)
    try:
        truncated_clip = clip.subclip(0, min(max_duration, clip.duration))
        try:
            truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
        finally:
            truncated_clip.close()
    finally:
        # Release the source clip even when trimming or encoding fails.
        clip.close()
    return truncated_video_file
# Clone repository
def clone_repo():
    """Clone the GitHub repository containing the backend into ``./repository``.

    The clone authenticates with the personal access token in the
    ``GITHUB_PAT`` environment variable. If ``./repository`` already exists
    nothing is cloned.

    The token is embedded in the clone URL, so it is scrubbed from everything
    this function prints or raises. The original ``CalledProcessError`` is
    deliberately not chained because its ``cmd`` attribute contains the
    authenticated URL and would show up in tracebacks.

    Raises:
        RuntimeError: If ``GITHUB_PAT`` is not set, or if ``git clone`` exits
            with a non-zero status.
    """
    repo_url = "https://github.com/NeeravSood/AllMark-MVP.git"
    repo_path = "./repository"
    github_pat = os.getenv("GITHUB_PAT")
    if not github_pat:
        raise RuntimeError("GitHub Personal Access Token (GITHUB_PAT) not found in environment variables.")

    if os.path.exists(repo_path):
        print("Repository already cloned.")
        return

    # Same URL as repo_url with the token inserted as the user-info part.
    authenticated_repo_url = repo_url.replace("https://", f"https://{github_pat}@", 1)
    try:
        subprocess.run(
            ["git", "clone", authenticated_repo_url, repo_path],
            check=True,
            text=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        safe_stdout = (e.stdout or "").replace(github_pat, "***")
        safe_stderr = (e.stderr or "").replace(github_pat, "***")
        print("Output:", safe_stdout)
        print("Error:", safe_stderr)
        raise RuntimeError(f"Failed to clone repository: {safe_stderr}") from None
    print("Repository cloned successfully.")
# Import backend script
def import_backend_script(script_name):
    """Dynamically import a script from ``./repository`` as ``backend_module``.

    Args:
        script_name: File name of the script, relative to ``./repository``.

    Returns:
        The loaded module object.

    Raises:
        RuntimeError: If the file is missing, no import spec can be built for
            it, or executing the module fails. The underlying exception is
            attached as ``__cause__``.
    """
    try:
        script_path = os.path.join("./repository", script_name)
        if not os.path.exists(script_path):
            raise FileNotFoundError(f"Script {script_name} not found in the repository.")
        spec = importlib.util.spec_from_file_location("backend_module", script_path)
        # spec_from_file_location returns None when it cannot pick a loader
        # for the path (for example an unrecognised file suffix).
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot build an import spec for {script_path}.")
        backend_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(backend_module)
        return backend_module
    except Exception as e:
        # Single boundary: log, then surface every failure as RuntimeError.
        logging.error(f"Error importing backend script: {str(e)}")
        raise RuntimeError(f"Failed to import backend script: {str(e)}") from e
# Clone and import repository
# These three statements run at import time, in this order: fetch the backend
# repository, load its app.py as a module, and build the single module-level
# analyzer instance that analyze_video uses. clone_repo and
# import_backend_script raise RuntimeError on failure, which aborts the import
# of this module.
clone_repo()
backend = import_backend_script("app.py")
analyzer = backend.DeepfakeAnalyzer()
# Generate a random CAPTCHA challenge
def generate_captcha(length=5):
    """Return a random CAPTCHA challenge of uppercase letters and digits.

    Characters are drawn with ``secrets`` rather than ``random`` so the
    challenge cannot be predicted from earlier outputs.

    Args:
        length: Number of characters in the challenge (default 5).

    Returns:
        A string of ``length`` characters from ``A-Z`` and ``0-9``.
    """
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
# Function to verify CAPTCHA per user session
def verify_captcha(user_input, captcha_solution):
    """Compare the user's CAPTCHA answer with the expected solution.

    Surrounding whitespace in the answer is ignored; the comparison is
    otherwise exact. A missing answer (``None``) counts as incorrect instead
    of raising. A missing or empty solution never verifies, so an empty
    answer cannot pass before a challenge has been issued.

    Args:
        user_input: Text typed by the user, or ``None``.
        captcha_solution: Expected CAPTCHA text for this session, or ``None``
            when no challenge has been generated yet.

    Returns:
        A ``(message, success)`` tuple.
    """
    answer = "" if user_input is None else str(user_input).strip()
    if captcha_solution and answer == captcha_solution:
        return "Captcha verified. Video analysis will proceed.", True
    return "Incorrect CAPTCHA. Please try again.", False
def _is_deepfake_assessment(combined_assessment):
    """Return True when a combined assessment value indicates a deepfake.

    The literal string ``"deepfake"`` (any letter case) counts as a deepfake.
    Any other value is read as a number and compared against the threshold of
    50; values that are not numeric count as genuine.
    """
    if isinstance(combined_assessment, str) and combined_assessment.lower() == "deepfake":
        return True
    try:
        # float() also accepts fractional scores such as 87.3 or "87.3",
        # which a str.isdigit() check silently turns into 0.
        score = float(combined_assessment)
    except (TypeError, ValueError):
        return False
    return score >= 50


@spaces.GPU(duration=1000)
def analyze_video(video_file, captcha_input, captcha_solution, request: gr.Request):
    """Run the deepfake analysis for one request after the access checks pass.

    Order of checks: IP block list, then CAPTCHA, then usage accounting for
    the client IP. Only then is the video truncated and analyzed. The API-key
    authorization check is currently disabled.

    Args:
        video_file: Path of the uploaded video, or ``None``.
        captcha_input: Text the user typed for the CAPTCHA.
        captcha_solution: Expected CAPTCHA text for this session.
        request: Incoming request, used to determine the client IP.

    Returns:
        ``{"message": ...}`` with the verdict on success, or ``{"error": ...}``
        when the IP is blocked, the CAPTCHA is wrong, or the analysis fails.
    """
    client_ip = get_client_ip(request)
    video_name = os.path.basename(video_file) if video_file else "N/A"

    # Check if IP is blocked
    if is_ip_blocked(client_ip):
        log_api_access(successful_captcha=False, video_file=video_name, client_ip=client_ip, blocked=True)
        return {"error": "Your IP has been blocked due to excessive usage. Please try again later."}

    # Verify CAPTCHA
    message, success = verify_captcha(captcha_input, captcha_solution)
    if not success:
        log_api_access(successful_captcha=False, video_file=video_name, client_ip=client_ip)
        return {"error": message}

    # Only requests that pass the CAPTCHA count towards the IP's usage.
    update_ip_access_time(client_ip)
    log_api_access(successful_captcha=True, video_file=video_name, client_ip=client_ip)

    try:
        # Video truncation and analysis
        truncated_video = truncate_video(video_file)
        results = analyzer.analyze_media(truncated_video)
        # NOTE(review): assumes combined_assessment is either the string
        # "deepfake" or a score on a 0-100 scale - confirm against the backend.
        combined_assessment = results.get('combined_assessment', 0)
        analysis_result = "a deepfake" if _is_deepfake_assessment(combined_assessment) else "genuine/original"
        return {
            "message": f"According to our analysis, the video you uploaded appears to be {analysis_result}. "
                       f"Made by Neerav Sood (LinkedIn - https://www.linkedin.com/in/neeravsood/). "
        }
    except Exception as e:
        # Boundary handler: keep the traceback in the log, hide details from the user.
        logging.exception(f"Error during analysis: {e}")
        return {"error": "An error occurred during video analysis. Please check your input and try again."}
# Interface with CAPTCHA after video upload
def main_interface():
    """Build the Gradio UI: video upload first, then CAPTCHA, then analysis.

    The CAPTCHA widgets and the analyze button start hidden and are revealed
    once a video has been uploaded. The expected CAPTCHA text is kept in a
    ``gr.State`` so each session has its own challenge.

    Returns:
        The constructed ``gr.Blocks`` interface; the caller launches it.
    """
    with gr.Blocks() as interface:
        gr.Markdown("# AllMark - Deepfake Analyzer")
        gr.Markdown("Upload a video to proceed with analysis.")

        # State variables for per-session data
        captcha_solution = gr.State()

        # Video input with display size restriction
        video_input = gr.Video(label="Upload Video", height=300)
        captcha_text = gr.Textbox(label="CAPTCHA", interactive=False, visible=False)
        captcha_input = gr.Textbox(label="Enter CAPTCHA Here", visible=False)
        captcha_output = gr.Textbox(label="CAPTCHA Status", interactive=False, visible=False)
        analyze_button = gr.Button("Analyze Video", visible=False)
        analysis_output = gr.JSON(label="Analysis Result")

        def show_captcha(video):
            """Reveal a fresh CAPTCHA when a video is present, hide it otherwise."""
            if video is None:
                # gr.no_update is not part of Gradio's API (it is a Dash
                # idiom) and would raise AttributeError here. An empty
                # gr.update() is used to leave the stored solution untouched.
                return (
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.update(),
                )
            # Generate a new CAPTCHA for this session
            new_captcha = generate_captcha()
            return (
                gr.update(visible=True, value=new_captcha),
                gr.update(visible=True),
                gr.update(visible=True),
                gr.update(visible=True),
                new_captcha,  # Update the session state
            )

        video_input.change(
            show_captcha,
            inputs=video_input,
            outputs=[captcha_text, captcha_input, captcha_output, analyze_button, captcha_solution],
        )

        # Handle analysis
        analyze_button.click(
            analyze_video,
            inputs=[video_input, captcha_input, captcha_solution],
            outputs=analysis_output,
        )
    return interface
# Launch interface
if __name__ == "__main__":
    # Build the UI and start serving it only when this file is run as a script.
    interface = main_interface()
    interface.launch()
|