Spaces:
Runtime error
Runtime error
Update app.py
Browse files
app.py
CHANGED
@@ -9,9 +9,11 @@ import spaces
|
|
9 |
import torch
|
10 |
import random
|
11 |
import string
|
|
|
|
|
12 |
|
13 |
# Set up logging
|
14 |
-
logging.basicConfig(level=logging.
|
15 |
|
16 |
torch.set_num_threads(1)
|
17 |
torch.set_num_interop_threads(1)
|
@@ -21,6 +23,58 @@ torch.backends.cudnn.benchmark = False
|
|
21 |
torch.backends.cuda.matmul.allow_tf32 = False
|
22 |
torch.backends.cudnn.allow_tf32 = False
|
23 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
24 |
# Function to truncate video
|
25 |
def truncate_video(video_file):
|
26 |
"""Truncates video to 15 seconds and saves it as a temporary file."""
|
@@ -66,7 +120,7 @@ def import_backend_script(script_name):
|
|
66 |
script_path = os.path.join("./repository", script_name)
|
67 |
if not os.path.exists(script_path):
|
68 |
raise FileNotFoundError(f"Script {script_name} not found in the repository.")
|
69 |
-
|
70 |
spec = importlib.util.spec_from_file_location("backend_module", script_path)
|
71 |
backend_module = importlib.util.module_from_spec(spec)
|
72 |
spec.loader.exec_module(backend_module)
|
@@ -90,13 +144,32 @@ captcha_solution = generate_captcha()
|
|
90 |
def verify_captcha(user_input):
|
91 |
global captcha_solution
|
92 |
if user_input.strip() == captcha_solution:
|
93 |
-
return "Captcha verified.
|
94 |
else:
|
95 |
captcha_solution = generate_captcha() # Reset CAPTCHA on failure
|
96 |
return f"Incorrect CAPTCHA. Please try again.", False, captcha_solution
|
97 |
|
98 |
@spaces.GPU(duration=1000)
|
99 |
-
def analyze_video(video_file):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
100 |
try:
|
101 |
# Video truncation and analysis
|
102 |
truncated_video = truncate_video(video_file)
|
@@ -120,38 +193,35 @@ def analyze_video(video_file):
|
|
120 |
logging.error(f"Error during analysis: {e}")
|
121 |
return {"error": "An error occurred during video analysis. Please check your input and try again."}
|
122 |
|
123 |
-
# Interface with CAPTCHA
|
124 |
def main_interface():
|
125 |
with gr.Blocks() as interface:
|
126 |
gr.Markdown("# AllMark - Deepfake Analyzer")
|
127 |
-
gr.Markdown("
|
128 |
-
|
129 |
-
#
|
130 |
-
|
131 |
-
|
132 |
-
|
133 |
-
|
134 |
-
|
135 |
-
|
136 |
-
|
137 |
-
#
|
138 |
-
|
139 |
-
|
140 |
-
|
141 |
-
|
142 |
-
|
143 |
-
|
144 |
-
|
145 |
-
|
146 |
-
# Handle
|
147 |
-
|
148 |
-
|
149 |
-
|
150 |
-
|
151 |
-
|
152 |
-
|
153 |
-
# Update both CAPTCHA message and CAPTCHA text dynamically
|
154 |
-
verify_button.click(verify_captcha_click, inputs=captcha_input, outputs=[captcha_output, captcha_text])
|
155 |
|
156 |
return interface
|
157 |
|
|
|
9 |
import torch
|
10 |
import random
|
11 |
import string
|
12 |
+
from datetime import datetime, timedelta
|
13 |
+
from collections import defaultdict
|
14 |
|
15 |
# Set up logging
|
16 |
+
logging.basicConfig(level=logging.INFO, filename="api_access.log", format="%(asctime)s - %(message)s")
|
17 |
|
18 |
torch.set_num_threads(1)
|
19 |
torch.set_num_interop_threads(1)
|
|
|
23 |
torch.backends.cuda.matmul.allow_tf32 = False
|
24 |
torch.backends.cudnn.allow_tf32 = False
|
25 |
|
26 |
+
# Data structures to track IP access
|
27 |
+
ip_access_records = defaultdict(lambda: {
|
28 |
+
'total_access_time': timedelta(),
|
29 |
+
'blocked_until': None,
|
30 |
+
'session_start': None
|
31 |
+
})
|
32 |
+
|
33 |
+
SESSION_TIMEOUT = timedelta(hours=1)
|
34 |
+
|
35 |
+
# Function to log API access attempts
|
36 |
+
def log_api_access(successful_captcha, video_file=None, client_ip="unknown", blocked=False):
|
37 |
+
status = "Blocked" if blocked else "Allowed"
|
38 |
+
logging.info(f"Access Attempt - Status: {status}, Successful CAPTCHA: {successful_captcha}, "
|
39 |
+
f"Video File: {video_file if video_file else 'N/A'}, Client IP: {client_ip}")
|
40 |
+
|
41 |
+
# Function to get client IP address
|
42 |
+
def get_client_ip(request: gr.Request):
|
43 |
+
return request.client.host
|
44 |
+
|
45 |
+
# Function to check if IP is blocked
|
46 |
+
def is_ip_blocked(client_ip):
|
47 |
+
record = ip_access_records[client_ip]
|
48 |
+
blocked_until = record.get('blocked_until')
|
49 |
+
if blocked_until and datetime.now() < blocked_until:
|
50 |
+
return True
|
51 |
+
return False
|
52 |
+
|
53 |
+
# Function to update IP access time
|
54 |
+
def update_ip_access_time(client_ip):
|
55 |
+
record = ip_access_records[client_ip]
|
56 |
+
now = datetime.now()
|
57 |
+
session_start = record.get('session_start')
|
58 |
+
|
59 |
+
if session_start:
|
60 |
+
elapsed = now - session_start
|
61 |
+
record['total_access_time'] += elapsed
|
62 |
+
record['session_start'] = now
|
63 |
+
else:
|
64 |
+
record['session_start'] = now
|
65 |
+
|
66 |
+
# Remove access times older than 24 hours
|
67 |
+
if 'last_access_time' in record and now - record['last_access_time'] > timedelta(hours=24):
|
68 |
+
record['total_access_time'] = timedelta()
|
69 |
+
|
70 |
+
record['last_access_time'] = now
|
71 |
+
|
72 |
+
# Check if total access time in last 24 hours exceeds 1 hour
|
73 |
+
if record['total_access_time'] >= timedelta(hours=1):
|
74 |
+
# Block IP for 48 hours
|
75 |
+
record['blocked_until'] = now + timedelta(hours=48)
|
76 |
+
record['total_access_time'] = timedelta() # Reset total access time
|
77 |
+
|
78 |
# Function to truncate video
|
79 |
def truncate_video(video_file):
|
80 |
"""Truncates video to 15 seconds and saves it as a temporary file."""
|
|
|
120 |
script_path = os.path.join("./repository", script_name)
|
121 |
if not os.path.exists(script_path):
|
122 |
raise FileNotFoundError(f"Script {script_name} not found in the repository.")
|
123 |
+
|
124 |
spec = importlib.util.spec_from_file_location("backend_module", script_path)
|
125 |
backend_module = importlib.util.module_from_spec(spec)
|
126 |
spec.loader.exec_module(backend_module)
|
|
|
144 |
def verify_captcha(user_input):
|
145 |
global captcha_solution
|
146 |
if user_input.strip() == captcha_solution:
|
147 |
+
return "Captcha verified. Video analysis will proceed.", True, captcha_solution
|
148 |
else:
|
149 |
captcha_solution = generate_captcha() # Reset CAPTCHA on failure
|
150 |
return f"Incorrect CAPTCHA. Please try again.", False, captcha_solution
|
151 |
|
152 |
@spaces.GPU(duration=1000)
|
153 |
+
def analyze_video(video_file, captcha_input, request: gr.Request):
|
154 |
+
client_ip = get_client_ip(request)
|
155 |
+
|
156 |
+
# Check if IP is blocked
|
157 |
+
if is_ip_blocked(client_ip):
|
158 |
+
log_api_access(successful_captcha=False, video_file=video_file.name if video_file else "N/A", client_ip=client_ip, blocked=True)
|
159 |
+
return {"error": "Your IP has been blocked due to excessive usage. Please try again later."}
|
160 |
+
|
161 |
+
# Verify CAPTCHA
|
162 |
+
message, success, _ = verify_captcha(captcha_input)
|
163 |
+
if not success:
|
164 |
+
log_api_access(successful_captcha=False, video_file=video_file.name if video_file else "N/A", client_ip=client_ip)
|
165 |
+
return {"error": message}
|
166 |
+
|
167 |
+
# Update IP access time
|
168 |
+
update_ip_access_time(client_ip)
|
169 |
+
|
170 |
+
# Log API access attempt with successful CAPTCHA
|
171 |
+
log_api_access(successful_captcha=True, video_file=video_file.name if video_file else "N/A", client_ip=client_ip)
|
172 |
+
|
173 |
try:
|
174 |
# Video truncation and analysis
|
175 |
truncated_video = truncate_video(video_file)
|
|
|
193 |
logging.error(f"Error during analysis: {e}")
|
194 |
return {"error": "An error occurred during video analysis. Please check your input and try again."}
|
195 |
|
196 |
+
# Interface with CAPTCHA after video upload
|
197 |
def main_interface():
|
198 |
with gr.Blocks() as interface:
|
199 |
gr.Markdown("# AllMark - Deepfake Analyzer")
|
200 |
+
gr.Markdown("Upload a video to proceed with analysis.")
|
201 |
+
|
202 |
+
# Video input
|
203 |
+
video_input = gr.Video(label="Upload Video")
|
204 |
+
captcha_text = gr.Textbox(label="CAPTCHA", value=captcha_solution, interactive=False, visible=False)
|
205 |
+
captcha_input = gr.Textbox(label="Enter CAPTCHA Here", visible=False)
|
206 |
+
captcha_output = gr.Textbox(label="CAPTCHA Status", interactive=False, visible=False)
|
207 |
+
analyze_button = gr.Button("Analyze Video", visible=False)
|
208 |
+
analysis_output = gr.JSON(label="Analysis Result")
|
209 |
+
|
210 |
+
# Function to show CAPTCHA after video upload
|
211 |
+
def show_captcha(video):
|
212 |
+
if video is not None:
|
213 |
+
return gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True)
|
214 |
+
else:
|
215 |
+
return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
|
216 |
+
|
217 |
+
video_input.change(show_captcha, inputs=video_input, outputs=[captcha_text, captcha_input, captcha_output, analyze_button])
|
218 |
+
|
219 |
+
# Handle analysis
|
220 |
+
analyze_button.click(
|
221 |
+
analyze_video,
|
222 |
+
inputs=[video_input, captcha_input],
|
223 |
+
outputs=analysis_output
|
224 |
+
)
|
|
|
|
|
|
|
225 |
|
226 |
return interface
|
227 |
|