NeeravS commited on
Commit
8cb4ac6
·
verified ·
1 Parent(s): 2c7b51d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +19 -180
app.py CHANGED
@@ -7,13 +7,6 @@ from moviepy.editor import VideoFileClip
7
  import json
8
  import spaces
9
  import torch
10
- import random
11
- import string
12
- from datetime import datetime, timedelta
13
- from collections import defaultdict
14
-
15
- # Set up logging
16
- logging.basicConfig(level=logging.INFO, filename="api_access.log", format="%(asctime)s - %(message)s")
17
 
18
  torch.set_num_threads(1)
19
  torch.set_num_interop_threads(1)
@@ -23,95 +16,19 @@ torch.backends.cudnn.benchmark = False
23
  torch.backends.cuda.matmul.allow_tf32 = False
24
  torch.backends.cudnn.allow_tf32 = False
25
 
26
- # Data structures to track IP addresses
27
- ip_access_records = defaultdict(lambda: {
28
- 'total_access_time': timedelta(),
29
- 'blocked_until': None,
30
- 'session_start': None,
31
- 'last_access_time': None
32
- })
33
 
34
- def truncate_video(video_file):
35
- """Truncates video to 30 seconds and saves it as a temporary file."""
36
- clip = VideoFileClip(video_file)
37
- truncated_clip = clip.subclip(0, min(30, clip.duration)) # Change 15 to 30 for 30 seconds
38
- truncated_video_file = "temp_truncated_video.mp4"
39
- truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
40
- clip.close()
41
- truncated_clip.close()
42
- return truncated_video_file
43
-
44
- '''
45
- AUTHORIZED_API_KEY = os.getenv("HF_API_KEY")
46
-
47
- def is_authorized(request: gr.Request):
48
- """Checks if the request includes the correct API key."""
49
- api_key = request.headers.get("API-Key")
50
- print("API-Key received:", api_key) # Log the received API key for debugging
51
- return api_key == AUTHORIZED_API_KEY
52
- '''
53
-
54
- SESSION_TIMEOUT = timedelta(hours=1)
55
-
56
- # Function to log API access attempts
57
- def log_api_access(successful_captcha, video_file=None, client_ip="unknown", blocked=False):
58
- status = "Blocked" if blocked else "Allowed"
59
- logging.info(f"Access Attempt - Status: {status}, Successful CAPTCHA: {successful_captcha}, "
60
- f"Video File: {video_file if video_file else 'N/A'}, Client IP: {client_ip}")
61
-
62
- # Function to get client IP address
63
- def get_client_ip(request: gr.Request):
64
- return request.client.host
65
-
66
- # Function to check if IP is blocked
67
- def is_ip_blocked(client_ip):
68
- record = ip_access_records[client_ip]
69
- blocked_until = record.get('blocked_until')
70
- if blocked_until and datetime.now() < blocked_until:
71
- return True
72
- return False
73
-
74
- # Function to update IP access time
75
- def update_ip_access_time(client_ip):
76
- record = ip_access_records[client_ip]
77
- now = datetime.now()
78
- session_start = record.get('session_start')
79
-
80
- if session_start:
81
- elapsed = now - session_start
82
- record['total_access_time'] += elapsed
83
- record['session_start'] = now
84
- else:
85
- record['session_start'] = now
86
-
87
- # Remove access times older than 24 hours
88
- last_access_time = record.get('last_access_time')
89
- if last_access_time and now - last_access_time > timedelta(hours=24):
90
- record['total_access_time'] = timedelta()
91
-
92
- record['last_access_time'] = now
93
-
94
- # Check if total access time in last 24 hours exceeds 1 hour
95
- if record['total_access_time'] >= timedelta(hours=1):
96
- # Block IP for 48 hours
97
- record['blocked_until'] = now + timedelta(hours=48)
98
- record['total_access_time'] = timedelta() # Reset total access time
99
-
100
- # Function to truncate video
101
  def truncate_video(video_file):
102
  """Truncates video to 15 seconds and saves it as a temporary file."""
103
  clip = VideoFileClip(video_file)
104
  truncated_clip = clip.subclip(0, min(15, clip.duration))
105
  truncated_video_file = "temp_truncated_video.mp4"
106
  truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
107
- clip.close()
108
- truncated_clip.close()
109
  return truncated_video_file
110
 
111
- # Clone repository
112
  def clone_repo():
113
  """Clone the GitHub repository containing the backend."""
114
- repo_url = "https://github.com/NeeravSood/AllMark-MVP.git"
115
  repo_path = "./repository"
116
 
117
  github_pat = os.getenv("GITHUB_PAT")
@@ -135,14 +52,13 @@ def clone_repo():
135
  print("Error:", e.stderr)
136
  raise RuntimeError(f"Failed to clone repository: {e.stderr}")
137
 
138
- # Import backend script
139
  def import_backend_script(script_name):
140
  """Dynamically import the backend script."""
141
  try:
142
  script_path = os.path.join("./repository", script_name)
143
  if not os.path.exists(script_path):
144
  raise FileNotFoundError(f"Script {script_name} not found in the repository.")
145
-
146
  spec = importlib.util.spec_from_file_location("backend_module", script_path)
147
  backend_module = importlib.util.module_from_spec(spec)
148
  spec.loader.exec_module(backend_module)
@@ -151,63 +67,31 @@ def import_backend_script(script_name):
151
  logging.error(f"Error importing backend script: {str(e)}")
152
  raise RuntimeError(f"Failed to import backend script: {str(e)}")
153
 
154
- # Clone and import repository
155
  clone_repo()
156
- backend = import_backend_script("app.py")
157
  analyzer = backend.DeepfakeAnalyzer()
158
 
159
- # Generate a random CAPTCHA challenge
160
- def generate_captcha():
161
- return ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))
162
-
163
- # Function to verify CAPTCHA per user session
164
- def verify_captcha(user_input, captcha_solution):
165
- if user_input.strip() == captcha_solution:
166
- return "Captcha verified. Video analysis will proceed.", True
167
- else:
168
- return "Incorrect CAPTCHA. Please try again.", False
169
-
170
  @spaces.GPU(duration=1000)
171
- def analyze_video(video_file, captcha_input, captcha_solution, request: gr.Request):
172
- # Authorization check
173
- #if not is_authorized(request):
174
- # return {"error": "Unauthorized access. Please provide a valid API key."}
175
-
176
- client_ip = get_client_ip(request)
177
-
178
- # Check if IP is blocked
179
- if is_ip_blocked(client_ip):
180
- log_api_access(successful_captcha=False, video_file=os.path.basename(video_file) if video_file else "N/A", client_ip=client_ip, blocked=True)
181
- return {"error": "Your IP has been blocked due to excessive usage. Please try again later."}
182
-
183
- # Verify CAPTCHA
184
- message, success = verify_captcha(captcha_input, captcha_solution)
185
- if not success:
186
- log_api_access(successful_captcha=False, video_file=os.path.basename(video_file) if video_file else "N/A", client_ip=client_ip)
187
- return {"error": message}
188
-
189
- # Update IP access time
190
- update_ip_access_time(client_ip)
191
-
192
- # Log API access attempt with successful CAPTCHA
193
- log_api_access(successful_captcha=True, video_file=os.path.basename(video_file) if video_file else "N/A", client_ip=client_ip)
194
 
195
  try:
196
- # Video truncation and analysis
197
  truncated_video = truncate_video(video_file)
198
  results = analyzer.analyze_media(truncated_video)
199
 
200
- # Process combined assessment
201
  combined_assessment = results.get('combined_assessment', 0)
202
  if isinstance(combined_assessment, str) and combined_assessment.lower() == "deepfake":
203
  analysis_result = "a deepfake"
204
  else:
205
  combined_assessment = int(combined_assessment) if str(combined_assessment).isdigit() else 0
206
  analysis_result = "genuine/original" if combined_assessment < 50 else "a deepfake"
207
-
208
  output = {
209
  "message": f"According to our analysis, the video you uploaded appears to be {analysis_result}. "
210
- f"Made by Neerav Sood (LinkedIn - https://www.linkedin.com/in/neeravsood/). "
211
  }
212
  return output
213
 
@@ -215,59 +99,14 @@ def analyze_video(video_file, captcha_input, captcha_solution, request: gr.Reque
215
  logging.error(f"Error during analysis: {e}")
216
  return {"error": "An error occurred during video analysis. Please check your input and try again."}
217
 
218
- # Interface with CAPTCHA after video upload
219
- def main_interface():
220
- with gr.Blocks() as interface:
221
- gr.Markdown("# AllMark - Deepfake Analyzer")
222
- gr.Markdown("Upload a video to proceed with analysis.")
223
-
224
- # State variables for per-session data
225
- captcha_solution = gr.State()
226
-
227
- # Video input with display size restriction
228
- video_input = gr.Video(label="Upload Video", height=300) # Adjust height as needed
229
- captcha_text = gr.Textbox(label="CAPTCHA", interactive=False, visible=False)
230
- captcha_input = gr.Textbox(label="Enter CAPTCHA Here", visible=False)
231
- captcha_output = gr.Textbox(label="CAPTCHA Status", interactive=False, visible=False)
232
- analyze_button = gr.Button("Analyze Video", visible=False)
233
- analysis_output = gr.JSON(label="Analysis Result")
234
-
235
- # Function to show CAPTCHA after video upload
236
- def show_captcha(video):
237
- if video is not None:
238
- # Generate a new CAPTCHA for this session
239
- new_captcha = generate_captcha()
240
- return (
241
- gr.update(visible=True, value=new_captcha),
242
- gr.update(visible=True),
243
- gr.update(visible=True),
244
- gr.update(visible=True),
245
- new_captcha # Update the session state
246
- )
247
- else:
248
- return (
249
- gr.update(visible=False),
250
- gr.update(visible=False),
251
- gr.update(visible=False),
252
- gr.update(visible=False),
253
- gr.no_update
254
- )
255
-
256
- video_input.change(
257
- show_captcha,
258
- inputs=video_input,
259
- outputs=[captcha_text, captcha_input, captcha_output, analyze_button, captcha_solution]
260
- )
261
-
262
- # Handle analysis
263
- analyze_button.click(
264
- analyze_video,
265
- inputs=[video_input, captcha_input, captcha_solution],
266
- outputs=analysis_output
267
- )
268
 
269
- return interface
 
 
 
 
 
 
270
 
271
- # Launch interface
272
  if __name__ == "__main__":
273
- main_interface().launch()
 
7
  import json
8
  import spaces
9
  import torch
 
 
 
 
 
 
 
10
 
11
  torch.set_num_threads(1)
12
  torch.set_num_interop_threads(1)
 
16
  torch.backends.cuda.matmul.allow_tf32 = False
17
  torch.backends.cudnn.allow_tf32 = False
18
 
19
+ ACCESS_KEY = os.getenv("ACCESS_KEY")
 
 
 
 
 
 
20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  def truncate_video(video_file):
22
  """Truncates video to 15 seconds and saves it as a temporary file."""
23
  clip = VideoFileClip(video_file)
24
  truncated_clip = clip.subclip(0, min(15, clip.duration))
25
  truncated_video_file = "temp_truncated_video.mp4"
26
  truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
 
 
27
  return truncated_video_file
28
 
 
29
  def clone_repo():
30
  """Clone the GitHub repository containing the backend."""
31
+ repo_url = "https://github.com/NeeravSood/AllMark-MVP.git" # Update when changing
32
  repo_path = "./repository"
33
 
34
  github_pat = os.getenv("GITHUB_PAT")
 
52
  print("Error:", e.stderr)
53
  raise RuntimeError(f"Failed to clone repository: {e.stderr}")
54
 
 
55
  def import_backend_script(script_name):
56
  """Dynamically import the backend script."""
57
  try:
58
  script_path = os.path.join("./repository", script_name)
59
  if not os.path.exists(script_path):
60
  raise FileNotFoundError(f"Script {script_name} not found in the repository.")
61
+
62
  spec = importlib.util.spec_from_file_location("backend_module", script_path)
63
  backend_module = importlib.util.module_from_spec(spec)
64
  spec.loader.exec_module(backend_module)
 
67
  logging.error(f"Error importing backend script: {str(e)}")
68
  raise RuntimeError(f"Failed to import backend script: {str(e)}")
69
 
 
70
  clone_repo()
71
+ backend = import_backend_script("app.py")
72
  analyzer = backend.DeepfakeAnalyzer()
73
 
 
 
 
 
 
 
 
 
 
 
 
74
  @spaces.GPU(duration=1000)
75
+ def analyze_video(video_file):
76
+ if access_key != ACCESS_KEY:
77
+ logging.error("Invalid access key provided.")
78
+ return {"error": "Unauthorized access. Invalid access key."}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
 
80
  try:
 
81
  truncated_video = truncate_video(video_file)
82
  results = analyzer.analyze_media(truncated_video)
83
 
84
+ # Get combined_assessment and handle non-numeric values
85
  combined_assessment = results.get('combined_assessment', 0)
86
  if isinstance(combined_assessment, str) and combined_assessment.lower() == "deepfake":
87
  analysis_result = "a deepfake"
88
  else:
89
  combined_assessment = int(combined_assessment) if str(combined_assessment).isdigit() else 0
90
  analysis_result = "genuine/original" if combined_assessment < 50 else "a deepfake"
91
+
92
  output = {
93
  "message": f"According to our analysis, the video you uploaded appears to be {analysis_result}. "
94
+ f"{len(results['video_analysis']['frame_results'])} frames were analyzed in total."
95
  }
96
  return output
97
 
 
99
  logging.error(f"Error during analysis: {e}")
100
  return {"error": "An error occurred during video analysis. Please check your input and try again."}
101
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
102
 
103
+ interface = gr.Interface(
104
+ fn=analyze_video,
105
+ inputs=gr.Video(label="Upload Video"),
106
+ outputs="json",
107
+ title="AllMark - Deepfake Analyzer",
108
+ description="Upload a video to analyze. N.B. - Only mp4 files. Processing time 1-10 minutes. For any false negatives, please contact the publisher for verification. Incognito Mode Recommended"
109
+ )
110
 
 
111
  if __name__ == "__main__":
112
+ interface.launch()