Spaces:
Runtime error
Runtime error
import os | |
import subprocess | |
import importlib.util | |
import gradio as gr | |
import logging | |
from moviepy.editor import VideoFileClip | |
import json | |
import spaces | |
import torch | |
import random | |
import string | |
from datetime import datetime, timedelta | |
from collections import defaultdict | |
# Set up logging | |
logging.basicConfig(level=logging.INFO, filename="api_access.log", format="%(asctime)s - %(message)s") | |
torch.set_num_threads(1) | |
torch.set_num_interop_threads(1) | |
torch.use_deterministic_algorithms(True) | |
torch.backends.cudnn.deterministic = True | |
torch.backends.cudnn.benchmark = False | |
torch.backends.cuda.matmul.allow_tf32 = False | |
torch.backends.cudnn.allow_tf32 = False | |
# Data structures to track IP addresses | |
ip_access_records = defaultdict(lambda: { | |
'total_access_time': timedelta(), | |
'blocked_until': None, | |
'session_start': None, | |
'last_access_time': None | |
}) | |
def truncate_video(video_file): | |
"""Truncates video to 30 seconds and saves it as a temporary file.""" | |
clip = VideoFileClip(video_file) | |
truncated_clip = clip.subclip(0, min(30, clip.duration)) # Change 15 to 30 for 30 seconds | |
truncated_video_file = "temp_truncated_video.mp4" | |
truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac") | |
clip.close() | |
truncated_clip.close() | |
return truncated_video_file | |
''' | |
AUTHORIZED_API_KEY = os.getenv("HF_API_KEY") | |
def is_authorized(request: gr.Request): | |
"""Checks if the request includes the correct API key.""" | |
api_key = request.headers.get("API-Key") | |
print("API-Key received:", api_key) # Log the received API key for debugging | |
return api_key == AUTHORIZED_API_KEY | |
''' | |
SESSION_TIMEOUT = timedelta(hours=1) | |
# Function to log API access attempts | |
def log_api_access(successful_captcha, video_file=None, client_ip="unknown", blocked=False): | |
status = "Blocked" if blocked else "Allowed" | |
logging.info(f"Access Attempt - Status: {status}, Successful CAPTCHA: {successful_captcha}, " | |
f"Video File: {video_file if video_file else 'N/A'}, Client IP: {client_ip}") | |
# Function to get client IP address | |
def get_client_ip(request: gr.Request): | |
return request.client.host | |
# Function to check if IP is blocked | |
def is_ip_blocked(client_ip): | |
record = ip_access_records[client_ip] | |
blocked_until = record.get('blocked_until') | |
if blocked_until and datetime.now() < blocked_until: | |
return True | |
return False | |
# Function to update IP access time | |
def update_ip_access_time(client_ip): | |
record = ip_access_records[client_ip] | |
now = datetime.now() | |
session_start = record.get('session_start') | |
if session_start: | |
elapsed = now - session_start | |
record['total_access_time'] += elapsed | |
record['session_start'] = now | |
else: | |
record['session_start'] = now | |
# Remove access times older than 24 hours | |
last_access_time = record.get('last_access_time') | |
if last_access_time and now - last_access_time > timedelta(hours=24): | |
record['total_access_time'] = timedelta() | |
record['last_access_time'] = now | |
# Check if total access time in last 24 hours exceeds 1 hour | |
if record['total_access_time'] >= timedelta(hours=1): | |
# Block IP for 48 hours | |
record['blocked_until'] = now + timedelta(hours=48) | |
record['total_access_time'] = timedelta() # Reset total access time | |
# Function to truncate video | |
def truncate_video(video_file): | |
"""Truncates video to 15 seconds and saves it as a temporary file.""" | |
clip = VideoFileClip(video_file) | |
truncated_clip = clip.subclip(0, min(15, clip.duration)) | |
truncated_video_file = "temp_truncated_video.mp4" | |
truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac") | |
clip.close() | |
truncated_clip.close() | |
return truncated_video_file | |
# Clone repository | |
def clone_repo(): | |
"""Clone the GitHub repository containing the backend.""" | |
repo_url = "https://github.com/NeeravSood/AllMark-MVP.git" | |
repo_path = "./repository" | |
github_pat = os.getenv("GITHUB_PAT") | |
if not github_pat: | |
raise RuntimeError("GitHub Personal Access Token (GITHUB_PAT) not found in environment variables.") | |
authenticated_repo_url = f"https://{github_pat}@github.com/NeeravSood/AllMark-MVP.git" | |
if os.path.exists(repo_path): | |
print("Repository already cloned.") | |
else: | |
try: | |
subprocess.run( | |
["git", "clone", authenticated_repo_url, repo_path], | |
check=True, | |
text=True, | |
capture_output=True | |
) | |
print("Repository cloned successfully.") | |
except subprocess.CalledProcessError as e: | |
print("Output:", e.stdout) | |
print("Error:", e.stderr) | |
raise RuntimeError(f"Failed to clone repository: {e.stderr}") | |
# Import backend script | |
def import_backend_script(script_name): | |
"""Dynamically import the backend script.""" | |
try: | |
script_path = os.path.join("./repository", script_name) | |
if not os.path.exists(script_path): | |
raise FileNotFoundError(f"Script {script_name} not found in the repository.") | |
spec = importlib.util.spec_from_file_location("backend_module", script_path) | |
backend_module = importlib.util.module_from_spec(spec) | |
spec.loader.exec_module(backend_module) | |
return backend_module | |
except Exception as e: | |
logging.error(f"Error importing backend script: {str(e)}") | |
raise RuntimeError(f"Failed to import backend script: {str(e)}") | |
# Clone and import repository | |
clone_repo() | |
backend = import_backend_script("app.py") | |
analyzer = backend.DeepfakeAnalyzer() | |
# Generate a random CAPTCHA challenge | |
def generate_captcha(): | |
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=5)) | |
# Function to verify CAPTCHA per user session | |
def verify_captcha(user_input, captcha_solution): | |
if user_input.strip() == captcha_solution: | |
return "Captcha verified. Video analysis will proceed.", True | |
else: | |
return "Incorrect CAPTCHA. Please try again.", False | |
def analyze_video(video_file, captcha_input, captcha_solution, request: gr.Request): | |
# Authorization check | |
#if not is_authorized(request): | |
# return {"error": "Unauthorized access. Please provide a valid API key."} | |
client_ip = get_client_ip(request) | |
# Check if IP is blocked | |
if is_ip_blocked(client_ip): | |
log_api_access(successful_captcha=False, video_file=os.path.basename(video_file) if video_file else "N/A", client_ip=client_ip, blocked=True) | |
return {"error": "Your IP has been blocked due to excessive usage. Please try again later."} | |
# Verify CAPTCHA | |
message, success = verify_captcha(captcha_input, captcha_solution) | |
if not success: | |
log_api_access(successful_captcha=False, video_file=os.path.basename(video_file) if video_file else "N/A", client_ip=client_ip) | |
return {"error": message} | |
# Update IP access time | |
update_ip_access_time(client_ip) | |
# Log API access attempt with successful CAPTCHA | |
log_api_access(successful_captcha=True, video_file=os.path.basename(video_file) if video_file else "N/A", client_ip=client_ip) | |
try: | |
# Video truncation and analysis | |
truncated_video = truncate_video(video_file) | |
results = analyzer.analyze_media(truncated_video) | |
# Process combined assessment | |
combined_assessment = results.get('combined_assessment', 0) | |
if isinstance(combined_assessment, str) and combined_assessment.lower() == "deepfake": | |
analysis_result = "a deepfake" | |
else: | |
combined_assessment = int(combined_assessment) if str(combined_assessment).isdigit() else 0 | |
analysis_result = "genuine/original" if combined_assessment < 50 else "a deepfake" | |
output = { | |
"message": f"According to our analysis, the video you uploaded appears to be {analysis_result}. " | |
f"Made by Neerav Sood (LinkedIn - https://www.linkedin.com/in/neeravsood/). " | |
} | |
return output | |
except Exception as e: | |
logging.error(f"Error during analysis: {e}") | |
return {"error": "An error occurred during video analysis. Please check your input and try again."} | |
# Interface with CAPTCHA after video upload | |
def main_interface(): | |
with gr.Blocks() as interface: | |
gr.Markdown("# AllMark - Deepfake Analyzer") | |
gr.Markdown("Upload a video to proceed with analysis.") | |
# State variables for per-session data | |
captcha_solution = gr.State() | |
# Video input with display size restriction | |
video_input = gr.Video(label="Upload Video", height=300) # Adjust height as needed | |
captcha_text = gr.Textbox(label="CAPTCHA", interactive=False, visible=False) | |
captcha_input = gr.Textbox(label="Enter CAPTCHA Here", visible=False) | |
captcha_output = gr.Textbox(label="CAPTCHA Status", interactive=False, visible=False) | |
analyze_button = gr.Button("Analyze Video", visible=False) | |
analysis_output = gr.JSON(label="Analysis Result") | |
# Function to show CAPTCHA after video upload | |
def show_captcha(video): | |
if video is not None: | |
# Generate a new CAPTCHA for this session | |
new_captcha = generate_captcha() | |
return ( | |
gr.update(visible=True, value=new_captcha), | |
gr.update(visible=True), | |
gr.update(visible=True), | |
gr.update(visible=True), | |
new_captcha # Update the session state | |
) | |
else: | |
return ( | |
gr.update(visible=False), | |
gr.update(visible=False), | |
gr.update(visible=False), | |
gr.update(visible=False), | |
gr.no_update | |
) | |
video_input.change( | |
show_captcha, | |
inputs=video_input, | |
outputs=[captcha_text, captcha_input, captcha_output, analyze_button, captcha_solution] | |
) | |
# Handle analysis | |
analyze_button.click( | |
analyze_video, | |
inputs=[video_input, captcha_input, captcha_solution], | |
outputs=analysis_output | |
) | |
return interface | |
# Launch interface | |
if __name__ == "__main__": | |
main_interface().launch() | |