File size: 10,460 Bytes
3a3c2be
2dc47c6
 
cb82c24
2dc47c6
589bf82
514b638
c975f81
0ef1091
6853273
 
9042fbb
 
6853273
 
9042fbb
589bf82
5e8743b
 
dae402a
 
 
 
 
 
97eb128
9042fbb
 
 
97eb128
 
9042fbb
 
2c7b51d
 
 
 
 
 
 
 
 
 
b23d160
f7b5887
 
 
 
 
c2422af
f7b5887
b23d160
 
9042fbb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97eb128
 
9042fbb
 
 
 
 
 
 
 
 
 
6853273
589bf82
 
 
 
 
 
6853273
 
589bf82
3a3c2be
6853273
2dc47c6
 
6853273
2dc47c6
3a3c2be
2dc47c6
 
 
 
cb82c24
2dc47c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6853273
2dc47c6
 
 
 
 
 
9042fbb
2dc47c6
 
 
 
 
 
 
 
6853273
2dc47c6
6853273
514b638
1ae13cc
6853273
 
 
 
97eb128
 
6853273
97eb128
6853273
97eb128
6853273
37b013a
97eb128
b2ebbba
e53ae5a
 
b2ebbba
9042fbb
 
 
 
97eb128
9042fbb
 
 
97eb128
9042fbb
97eb128
9042fbb
 
 
 
 
 
97eb128
9042fbb
7b57752
6853273
7b57752
 
564bb7d
6853273
f755699
 
 
 
 
 
6853273
1ae13cc
b2619fa
5aa6ebe
1ae13cc
 
7b57752
 
 
1ae13cc
b2619fa
9042fbb
6853273
 
 
9042fbb
 
97eb128
 
 
 
 
 
9042fbb
 
 
 
 
 
 
 
97eb128
 
 
 
 
 
 
 
 
9042fbb
97eb128
 
 
 
 
 
 
 
 
 
 
 
 
9042fbb
 
 
 
97eb128
9042fbb
 
564bb7d
6853273
cb82c24
6853273
cb82c24
6853273
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import os
import subprocess
import importlib.util
import gradio as gr
import logging
from moviepy.editor import VideoFileClip
import json
import spaces
import torch
import random
import string
from datetime import datetime, timedelta
from collections import defaultdict

# Set up logging
logging.basicConfig(level=logging.INFO, filename="api_access.log", format="%(asctime)s - %(message)s")

torch.set_num_threads(1)
torch.set_num_interop_threads(1)
torch.use_deterministic_algorithms(True)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False

# Data structures to track IP addresses
ip_access_records = defaultdict(lambda: {
    'total_access_time': timedelta(),
    'blocked_until': None,
    'session_start': None,
    'last_access_time': None
})

def truncate_video(video_file):
    """Truncates video to 30 seconds and saves it as a temporary file."""
    clip = VideoFileClip(video_file)
    truncated_clip = clip.subclip(0, min(30, clip.duration))  # Change 15 to 30 for 30 seconds
    truncated_video_file = "temp_truncated_video.mp4"
    truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
    clip.close()
    truncated_clip.close()
    return truncated_video_file
    
'''
AUTHORIZED_API_KEY = os.getenv("HF_API_KEY")

def is_authorized(request: gr.Request):
    """Checks if the request includes the correct API key."""
    api_key = request.headers.get("API-Key")
    print("API-Key received:", api_key)  # Log the received API key for debugging
    return api_key == AUTHORIZED_API_KEY
'''

SESSION_TIMEOUT = timedelta(hours=1)

# Function to log API access attempts
def log_api_access(successful_captcha, video_file=None, client_ip="unknown", blocked=False):
    status = "Blocked" if blocked else "Allowed"
    logging.info(f"Access Attempt - Status: {status}, Successful CAPTCHA: {successful_captcha}, "
                 f"Video File: {video_file if video_file else 'N/A'}, Client IP: {client_ip}")

# Function to get client IP address
def get_client_ip(request: gr.Request):
    return request.client.host

# Function to check if IP is blocked
def is_ip_blocked(client_ip):
    record = ip_access_records[client_ip]
    blocked_until = record.get('blocked_until')
    if blocked_until and datetime.now() < blocked_until:
        return True
    return False

# Function to update IP access time
def update_ip_access_time(client_ip):
    record = ip_access_records[client_ip]
    now = datetime.now()
    session_start = record.get('session_start')

    if session_start:
        elapsed = now - session_start
        record['total_access_time'] += elapsed
        record['session_start'] = now
    else:
        record['session_start'] = now

    # Remove access times older than 24 hours
    last_access_time = record.get('last_access_time')
    if last_access_time and now - last_access_time > timedelta(hours=24):
        record['total_access_time'] = timedelta()

    record['last_access_time'] = now

    # Check if total access time in last 24 hours exceeds 1 hour
    if record['total_access_time'] >= timedelta(hours=1):
        # Block IP for 48 hours
        record['blocked_until'] = now + timedelta(hours=48)
        record['total_access_time'] = timedelta()  # Reset total access time

# Function to truncate video
def truncate_video(video_file):
    """Truncates video to 15 seconds and saves it as a temporary file."""
    clip = VideoFileClip(video_file)
    truncated_clip = clip.subclip(0, min(15, clip.duration))
    truncated_video_file = "temp_truncated_video.mp4"
    truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
    clip.close()
    truncated_clip.close()
    return truncated_video_file

# Clone repository
def clone_repo():
    """Clone the GitHub repository containing the backend."""
    repo_url = "https://github.com/NeeravSood/AllMark-MVP.git"
    repo_path = "./repository"

    github_pat = os.getenv("GITHUB_PAT")
    if not github_pat:
        raise RuntimeError("GitHub Personal Access Token (GITHUB_PAT) not found in environment variables.")
    authenticated_repo_url = f"https://{github_pat}@github.com/NeeravSood/AllMark-MVP.git"

    if os.path.exists(repo_path):
        print("Repository already cloned.")
    else:
        try:
            subprocess.run(
                ["git", "clone", authenticated_repo_url, repo_path],
                check=True,
                text=True,
                capture_output=True
            )
            print("Repository cloned successfully.")
        except subprocess.CalledProcessError as e:
            print("Output:", e.stdout)
            print("Error:", e.stderr)
            raise RuntimeError(f"Failed to clone repository: {e.stderr}")

# Import backend script
def import_backend_script(script_name):
    """Dynamically import the backend script."""
    try:
        script_path = os.path.join("./repository", script_name)
        if not os.path.exists(script_path):
            raise FileNotFoundError(f"Script {script_name} not found in the repository.")

        spec = importlib.util.spec_from_file_location("backend_module", script_path)
        backend_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(backend_module)
        return backend_module
    except Exception as e:
        logging.error(f"Error importing backend script: {str(e)}")
        raise RuntimeError(f"Failed to import backend script: {str(e)}")

# Clone and import repository
clone_repo()
backend = import_backend_script("app.py")
analyzer = backend.DeepfakeAnalyzer()

# Generate a random CAPTCHA challenge
def generate_captcha():
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))

# Function to verify CAPTCHA per user session
def verify_captcha(user_input, captcha_solution):
    if user_input.strip() == captcha_solution:
        return "Captcha verified. Video analysis will proceed.", True
    else:
        return "Incorrect CAPTCHA. Please try again.", False

@spaces.GPU(duration=1000)
def analyze_video(video_file, captcha_input, captcha_solution, request: gr.Request):
    # Authorization check
    #if not is_authorized(request):
     #   return {"error": "Unauthorized access. Please provide a valid API key."}

    client_ip = get_client_ip(request)

    # Check if IP is blocked
    if is_ip_blocked(client_ip):
        log_api_access(successful_captcha=False, video_file=os.path.basename(video_file) if video_file else "N/A", client_ip=client_ip, blocked=True)
        return {"error": "Your IP has been blocked due to excessive usage. Please try again later."}

    # Verify CAPTCHA
    message, success = verify_captcha(captcha_input, captcha_solution)
    if not success:
        log_api_access(successful_captcha=False, video_file=os.path.basename(video_file) if video_file else "N/A", client_ip=client_ip)
        return {"error": message}

    # Update IP access time
    update_ip_access_time(client_ip)

    # Log API access attempt with successful CAPTCHA
    log_api_access(successful_captcha=True, video_file=os.path.basename(video_file) if video_file else "N/A", client_ip=client_ip)

    try:
        # Video truncation and analysis
        truncated_video = truncate_video(video_file)
        results = analyzer.analyze_media(truncated_video)

        # Process combined assessment
        combined_assessment = results.get('combined_assessment', 0)
        if isinstance(combined_assessment, str) and combined_assessment.lower() == "deepfake":
            analysis_result = "a deepfake"
        else:
            combined_assessment = int(combined_assessment) if str(combined_assessment).isdigit() else 0
            analysis_result = "genuine/original" if combined_assessment < 50 else "a deepfake"

        output = {
            "message": f"According to our analysis, the video you uploaded appears to be {analysis_result}. "
                       f"Made by Neerav Sood (LinkedIn - https://www.linkedin.com/in/neeravsood/). "
        }
        return output

    except Exception as e:
        logging.error(f"Error during analysis: {e}")
        return {"error": "An error occurred during video analysis. Please check your input and try again."}

# Interface with CAPTCHA after video upload
def main_interface():
    with gr.Blocks() as interface:
        gr.Markdown("# AllMark - Deepfake Analyzer")
        gr.Markdown("Upload a video to proceed with analysis.")

        # State variables for per-session data
        captcha_solution = gr.State()

        # Video input with display size restriction
        video_input = gr.Video(label="Upload Video", height=300)  # Adjust height as needed
        captcha_text = gr.Textbox(label="CAPTCHA", interactive=False, visible=False)
        captcha_input = gr.Textbox(label="Enter CAPTCHA Here", visible=False)
        captcha_output = gr.Textbox(label="CAPTCHA Status", interactive=False, visible=False)
        analyze_button = gr.Button("Analyze Video", visible=False)
        analysis_output = gr.JSON(label="Analysis Result")

        # Function to show CAPTCHA after video upload
        def show_captcha(video):
            if video is not None:
                # Generate a new CAPTCHA for this session
                new_captcha = generate_captcha()
                return (
                    gr.update(visible=True, value=new_captcha),
                    gr.update(visible=True),
                    gr.update(visible=True),
                    gr.update(visible=True),
                    new_captcha  # Update the session state
                )
            else:
                return (
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.no_update
                )

        video_input.change(
            show_captcha,
            inputs=video_input,
            outputs=[captcha_text, captcha_input, captcha_output, analyze_button, captcha_solution]
        )

        # Handle analysis
        analyze_button.click(
            analyze_video,
            inputs=[video_input, captcha_input, captcha_solution],
            outputs=analysis_output
        )

    return interface

# Launch interface
if __name__ == "__main__":
    main_interface().launch()