# AllMark / app.py
# Page header captured together with this file: uploaded by NeeravS,
# commit "Update app.py" (2c7b51d, verified), file size 10.5 kB.
# The Python source starts below.
import importlib.util
import json
import logging
import math
import os
import random
import secrets
import string
import subprocess
from collections import defaultdict
from datetime import datetime, timedelta

import gradio as gr
import spaces
import torch
from moviepy.editor import VideoFileClip
# Set up logging: records at INFO level and above go to the file
# "api_access.log", each line prefixed with a timestamp.
logging.basicConfig(level=logging.INFO, filename="api_access.log", format="%(asctime)s - %(message)s")
# Restrict PyTorch to a single intra-op and a single inter-op CPU thread.
torch.set_num_threads(1)
torch.set_num_interop_threads(1)
# Ask PyTorch/cuDNN for deterministic algorithms, and switch off the cuDNN
# benchmark mode and TF32 math for CUDA matmul and cuDNN.
# NOTE(review): presumably done so repeated analyses of the same video give
# the same result - confirm with the backend's requirements.
torch.use_deterministic_algorithms(True)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False
# Per-IP usage bookkeeping, keyed by client IP address
def _blank_ip_record():
    """Build the initial tracking record for an IP address seen for the first time."""
    return dict(
        total_access_time=timedelta(),
        blocked_until=None,
        session_start=None,
        last_access_time=None,
    )


# Looking up an unknown IP creates (and stores) a fresh blank record for it.
ip_access_records = defaultdict(_blank_ip_record)
def truncate_video(video_file):
    """Truncate a video to at most 30 seconds and save it as a temporary file.

    NOTE(review): a second ``truncate_video`` is defined further down in this
    file; because it is bound later, that later definition is the one callers
    actually get. This one is kept for reference only.

    Args:
        video_file: Path of the source video, handed to ``VideoFileClip``.

    Returns:
        The path of the written file, ``"temp_truncated_video.mp4"``.
    """
    truncated_video_file = "temp_truncated_video.mp4"
    clip = VideoFileClip(video_file)
    try:
        truncated_clip = clip.subclip(0, min(30, clip.duration))
        try:
            truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
        finally:
            # Release the sub-clip even when encoding fails.
            truncated_clip.close()
    finally:
        # Always release the source clip's reader resources.
        clip.close()
    return truncated_video_file
# Disabled API-key authorization check. The code below is wrapped in a bare
# string literal, so it is evaluated and discarded at import time; neither
# AUTHORIZED_API_KEY nor is_authorized exists at runtime.
# NOTE(review): if this is ever re-enabled, the print() of the received
# API key would write the key to the process output.
'''
AUTHORIZED_API_KEY = os.getenv("HF_API_KEY")
def is_authorized(request: gr.Request):
"""Checks if the request includes the correct API key."""
api_key = request.headers.get("API-Key")
print("API-Key received:", api_key) # Log the received API key for debugging
return api_key == AUTHORIZED_API_KEY
'''
# One-hour session timeout.
# NOTE(review): presumably the idle gap after which a client's session is
# considered over - confirm against the code that reads it.
SESSION_TIMEOUT = timedelta(hours=1)
# Function to log API access attempts
def _sanitize_log_value(value):
    """Return *value* as text with CR/LF escaped so it cannot start a new log line."""
    return str(value).replace("\r", "\\r").replace("\n", "\\n")


def log_api_access(successful_captcha, video_file=None, client_ip="unknown", blocked=False):
    """Write one access-attempt line to the root logger at INFO level.

    Args:
        successful_captcha: Whether the CAPTCHA was solved for this attempt.
        video_file: Name of the uploaded video; logged as ``N/A`` when falsy.
        client_ip: Address of the caller.
        blocked: ``True`` when the attempt was rejected because the IP is blocked.
    """
    status = "Blocked" if blocked else "Allowed"
    # The file name and IP come from the client, so line breaks are escaped to
    # keep a single attempt on a single log line. Lazy %-formatting defers
    # building the message until a handler actually emits it.
    logging.info(
        "Access Attempt - Status: %s, Successful CAPTCHA: %s, Video File: %s, Client IP: %s",
        status,
        successful_captcha,
        _sanitize_log_value(video_file or 'N/A'),
        _sanitize_log_value(client_ip),
    )
# Function to get client IP address
def get_client_ip(request: gr.Request):
    """Return the caller's host address, or ``"unknown"`` when it is unavailable.

    Args:
        request: The request object injected by Gradio. It may be ``None``
            (for example when the handler is called directly), and its
            ``client`` attribute may be missing or ``None``.

    Returns:
        ``request.client.host`` when present, otherwise ``"unknown"`` (the
        same placeholder ``log_api_access`` uses by default).
    """
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return host or "unknown"
# Function to check if IP is blocked
def is_ip_blocked(client_ip):
    """Tell whether *client_ip* is currently inside a block period.

    The lookup uses ``.get`` rather than indexing so that merely checking an
    address does not create a tracking record for it in ``ip_access_records``.

    Args:
        client_ip: Address to check.

    Returns:
        ``True`` when the record has a ``blocked_until`` time that is still in
        the future, ``False`` otherwise (including for unknown addresses).
    """
    record = ip_access_records.get(client_ip)
    if record is None:
        return False
    blocked_until = record.get('blocked_until')
    return blocked_until is not None and datetime.now() < blocked_until
# Function to update IP access time
def update_ip_access_time(client_ip):
    """Account one more access for *client_ip* and block it on excessive usage.

    The time since the previous access is added to the address's running
    total only when it is shorter than ``SESSION_TIMEOUT``; a longer gap means
    the previous session ended, so the idle period is not billed as usage.
    (Without this, two requests a couple of hours apart would count as hours
    of usage and trigger an immediate block.)

    The running total is cleared when the previous access is more than
    24 hours old. Once the total reaches one hour, the address is blocked for
    48 hours and the total is reset.

    Args:
        client_ip: Address whose record in ``ip_access_records`` is updated
            (the record is created on first use).
    """
    record = ip_access_records[client_ip]
    now = datetime.now()

    session_start = record.get('session_start')
    if session_start is not None:
        elapsed = now - session_start
        # Count only gaps that fall inside a live session.
        if elapsed < SESSION_TIMEOUT:
            record['total_access_time'] += elapsed
    record['session_start'] = now

    # Remove access times older than 24 hours
    last_access_time = record.get('last_access_time')
    if last_access_time is not None and now - last_access_time > timedelta(hours=24):
        record['total_access_time'] = timedelta()
    record['last_access_time'] = now

    # Check if total access time in last 24 hours exceeds 1 hour
    if record['total_access_time'] >= timedelta(hours=1):
        # Block IP for 48 hours
        record['blocked_until'] = now + timedelta(hours=48)
        record['total_access_time'] = timedelta()  # Reset total access time
# Function to truncate video
def truncate_video(video_file, max_seconds=15):
    """Truncate a video to its first *max_seconds* seconds and save it as a temporary file.

    Both clips are closed even when encoding fails, so a bad upload does not
    leave reader resources open.

    Args:
        video_file: Path of the source video, handed to ``VideoFileClip``.
        max_seconds: Upper bound on the length of the output, in seconds.
            Shorter videos are kept at their full duration. Defaults to 15.

    Returns:
        The path of the written file, ``"temp_truncated_video.mp4"``.
        NOTE(review): the output name is fixed, so overlapping calls write to
        the same file.
    """
    truncated_video_file = "temp_truncated_video.mp4"
    clip = VideoFileClip(video_file)
    try:
        truncated_clip = clip.subclip(0, min(max_seconds, clip.duration))
        try:
            truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
        finally:
            truncated_clip.close()
    finally:
        clip.close()
    return truncated_video_file
# Clone repository
def clone_repo():
    """Clone the GitHub repository containing the backend into ``./repository``.

    The clone is authenticated with the token from the ``GITHUB_PAT``
    environment variable. Nothing is cloned when ``./repository`` already
    exists.

    Raises:
        RuntimeError: If ``GITHUB_PAT`` is not set, or if ``git clone`` exits
            with a non-zero status. The token is masked in everything that is
            printed or placed in the error message.
    """
    repo_path = "./repository"
    github_pat = os.getenv("GITHUB_PAT")
    if not github_pat:
        raise RuntimeError("GitHub Personal Access Token (GITHUB_PAT) not found in environment variables.")

    if os.path.exists(repo_path):
        print("Repository already cloned.")
        return

    # NOTE(review): the token is part of the remote URL, so git also stores it
    # in the clone's configuration.
    authenticated_repo_url = f"https://{github_pat}@github.com/NeeravSood/AllMark-MVP.git"
    try:
        subprocess.run(
            ["git", "clone", authenticated_repo_url, repo_path],
            check=True,
            text=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        # git may echo the remote URL (token included) in its output.
        stdout = (e.stdout or "").replace(github_pat, "***")
        stderr = (e.stderr or "").replace(github_pat, "***")
        print("Output:", stdout)
        print("Error:", stderr)
        # "from None" drops the CalledProcessError from the traceback: its
        # text contains the full command line, i.e. the token.
        raise RuntimeError(f"Failed to clone repository: {stderr}") from None
    print("Repository cloned successfully.")
# Import backend script
def import_backend_script(script_name, repo_path="./repository"):
    """Dynamically import a script from the cloned repository as a module.

    Args:
        script_name: File name of the script, relative to *repo_path*.
        repo_path: Directory that holds the script. Defaults to
            ``"./repository"``.

    Returns:
        The executed module object (created under the name ``backend_module``).

    Raises:
        RuntimeError: If the script is missing, cannot be loaded as a Python
            module, or raises while executing. The original exception is
            attached as ``__cause__``.
    """
    script_path = os.path.join(repo_path, script_name)
    try:
        if not os.path.exists(script_path):
            raise FileNotFoundError(f"Script {script_name} not found in the repository.")
        spec = importlib.util.spec_from_file_location("backend_module", script_path)
        # No spec/loader is produced for files the import system does not
        # recognise (e.g. an unsupported extension).
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot load {script_name} as a Python module.")
        backend_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(backend_module)
        return backend_module
    except Exception as e:
        # Boundary handler: record the traceback, then report one error type.
        logging.exception("Error importing backend script: %s", e)
        raise RuntimeError(f"Failed to import backend script: {str(e)}") from e
# Clone and import repository. These statements run when this module is
# imported, so a failure here prevents the app from starting.
clone_repo()
# Load "app.py" from the cloned repository as the backend module.
backend = import_backend_script("app.py")
# Shared analyzer instance; analyze_video calls its analyze_media() method.
analyzer = backend.DeepfakeAnalyzer()
# Generate a random CAPTCHA challenge
def generate_captcha(length=5):
    """Return a random challenge made of uppercase ASCII letters and digits.

    Characters are drawn with ``secrets`` so the challenge cannot be predicted
    from the state of the shared ``random`` generator.

    Args:
        length: Number of characters in the challenge. Defaults to 5.

    Returns:
        The challenge string.
    """
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
# Function to verify CAPTCHA per user session
def verify_captcha(user_input, captcha_solution):
    """Compare the user's answer with the expected CAPTCHA solution.

    Surrounding whitespace in the answer is ignored; the comparison is
    case-sensitive. A missing answer or a missing/empty solution never
    verifies, so an unset challenge cannot be passed with blank input.

    Args:
        user_input: Text typed by the user; may be ``None``.
        captcha_solution: Expected challenge for this session; may be ``None``
            when no challenge has been generated yet.

    Returns:
        A ``(message, success)`` tuple: a status message and whether the
        answer matched.
    """
    if (
        captcha_solution
        and isinstance(user_input, str)
        and user_input.strip() == captcha_solution
    ):
        return "Captcha verified. Video analysis will proceed.", True
    return "Incorrect CAPTCHA. Please try again.", False
def _describe_assessment(combined_assessment):
    """Turn the backend's combined assessment into the verdict phrase shown to the user.

    The string ``"deepfake"`` (any case) maps to "a deepfake". Anything else
    is read as a numeric score - integers, floats and numeric strings alike -
    where a score below 50 means "genuine/original". Values that cannot be
    read as a finite number count as 0.
    """
    if isinstance(combined_assessment, str) and combined_assessment.strip().lower() == "deepfake":
        return "a deepfake"
    try:
        score = float(combined_assessment)
    except (TypeError, ValueError):
        score = 0.0
    if not math.isfinite(score):
        score = 0.0
    return "genuine/original" if score < 50 else "a deepfake"


@spaces.GPU(duration=1000)
def analyze_video(video_file, captcha_input, captcha_solution, request: gr.Request):
    """Gate an uploaded video behind the IP block list and CAPTCHA, then analyze it.

    Args:
        video_file: Path of the uploaded video, or a falsy value when nothing
            was uploaded.
        captcha_input: CAPTCHA answer typed by the user.
        captcha_solution: Expected CAPTCHA for this session.
        request: Request object injected by Gradio; used to find the client IP.

    Returns:
        A dict with a ``"message"`` key holding the verdict on success, or an
        ``"error"`` key when the IP is blocked, the CAPTCHA is wrong, or the
        analysis fails.
    """
    # NOTE: the API-key authorization check is currently disabled.
    client_ip = get_client_ip(request)
    video_name = os.path.basename(video_file) if video_file else "N/A"

    # Check if IP is blocked
    if is_ip_blocked(client_ip):
        log_api_access(successful_captcha=False, video_file=video_name, client_ip=client_ip, blocked=True)
        return {"error": "Your IP has been blocked due to excessive usage. Please try again later."}

    # Verify CAPTCHA
    message, success = verify_captcha(captcha_input, captcha_solution)
    if not success:
        log_api_access(successful_captcha=False, video_file=video_name, client_ip=client_ip)
        return {"error": message}

    # Usage is only billed to the IP once the CAPTCHA has been solved.
    update_ip_access_time(client_ip)
    log_api_access(successful_captcha=True, video_file=video_name, client_ip=client_ip)

    try:
        # Video truncation and analysis
        truncated_video = truncate_video(video_file)
        results = analyzer.analyze_media(truncated_video)
        analysis_result = _describe_assessment(results.get('combined_assessment', 0))
        return {
            "message": f"According to our analysis, the video you uploaded appears to be {analysis_result}. "
                       f"Made by Neerav Sood (LinkedIn - https://www.linkedin.com/in/neeravsood/). "
        }
    except Exception as e:
        # Boundary handler: keep the traceback in the log, show a generic
        # message to the user.
        logging.exception("Error during analysis: %s", e)
        return {"error": "An error occurred during video analysis. Please check your input and try again."}
# Interface with CAPTCHA after video upload
def main_interface():
    """Build the Gradio Blocks UI.

    The CAPTCHA widgets and the analyze button stay hidden until a video is
    uploaded. Each upload generates a new CAPTCHA, which is shown to the user
    and stored in per-session state for ``analyze_video`` to check.

    Returns:
        The assembled ``gr.Blocks`` interface (not yet launched).
    """
    with gr.Blocks() as interface:
        gr.Markdown("# AllMark - Deepfake Analyzer")
        gr.Markdown("Upload a video to proceed with analysis.")

        # State variables for per-session data
        captcha_solution = gr.State()

        # Video input with display size restriction
        video_input = gr.Video(label="Upload Video", height=300)  # Adjust height as needed
        captcha_text = gr.Textbox(label="CAPTCHA", interactive=False, visible=False)
        captcha_input = gr.Textbox(label="Enter CAPTCHA Here", visible=False)
        captcha_output = gr.Textbox(label="CAPTCHA Status", interactive=False, visible=False)
        analyze_button = gr.Button("Analyze Video", visible=False)
        analysis_output = gr.JSON(label="Analysis Result")

        # Function to show CAPTCHA after video upload
        def show_captcha(video):
            """Reveal the CAPTCHA controls for an uploaded video, hide them otherwise."""
            if video is None:
                return (
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.update(visible=False),
                    # An update with no arguments leaves the stored CAPTCHA
                    # solution as it is.
                    gr.update(),
                )
            # Generate a new CAPTCHA for this session
            new_captcha = generate_captcha()
            return (
                gr.update(visible=True, value=new_captcha),
                gr.update(visible=True),
                gr.update(visible=True),
                gr.update(visible=True),
                new_captcha,  # Update the session state
            )

        video_input.change(
            show_captcha,
            inputs=video_input,
            outputs=[captcha_text, captcha_input, captcha_output, analyze_button, captcha_solution],
        )

        # Handle analysis
        analyze_button.click(
            analyze_video,
            inputs=[video_input, captcha_input, captcha_solution],
            outputs=analysis_output,
        )
    return interface
# Build and launch the interface only when this file is run as a script
if __name__ == "__main__":
    app = main_interface()
    app.launch()