"""Gradio front end for the AllMark deepfake analyzer.

On import this module clones the backend repository, loads its
``DeepfakeAnalyzer`` and builds a Gradio UI that gates video analysis behind
a text CAPTCHA and a per-IP usage limit.
"""

import importlib.util
import json
import logging
import os
import random
import secrets
import string
import subprocess
import tempfile
from collections import defaultdict
from datetime import datetime, timedelta

# NOTE: `spaces` is deliberately kept ahead of `torch`, as in the original
# import order.
import gradio as gr
import spaces
import torch
from moviepy.editor import VideoFileClip

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    filename="api_access.log",
    format="%(asctime)s - %(message)s",
)

# Single-threaded, deterministic torch configuration (TF32 and cuDNN
# autotuning disabled) so repeated runs use the same kernels.
torch.set_num_threads(1)
torch.set_num_interop_threads(1)
torch.use_deterministic_algorithms(True)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False

# Usage-limit policy.
SESSION_TIMEOUT = timedelta(hours=1)   # gaps at least this long start a new session
USAGE_LIMIT = timedelta(hours=1)       # accumulated usage that triggers a block
USAGE_WINDOW = timedelta(hours=24)     # inactivity after which usage is forgotten
BLOCK_DURATION = timedelta(hours=48)   # how long a block lasts

MAX_VIDEO_SECONDS = 15                 # default length analysed per upload
REPO_PATH = "./repository"             # where the backend repository is cloned

# Data structures to track IP addresses
ip_access_records = defaultdict(lambda: {
    'total_access_time': timedelta(),
    'blocked_until': None,
    'session_start': None,
    'last_access_time': None,
})

# Disabled API-key authorization, kept for reference.
# WARNING: this snippet prints the received API key; remove that line before
# re-enabling it.
#
# AUTHORIZED_API_KEY = os.getenv("HF_API_KEY")
#
# def is_authorized(request: gr.Request):
#     """Checks if the request includes the correct API key."""
#     api_key = request.headers.get("API-Key")
#     print("API-Key received:", api_key)  # Log the received API key for debugging
#     return api_key == AUTHORIZED_API_KEY


def log_api_access(successful_captcha, video_file=None, client_ip="unknown", blocked=False):
    """Write one access-attempt record to the log.

    Args:
        successful_captcha: Whether the caller solved the CAPTCHA.
        video_file: Name of the uploaded file; logged as ``N/A`` when falsy.
        client_ip: Address the request was attributed to.
        blocked: True when the request was rejected because the IP is blocked.
    """
    status = "Blocked" if blocked else "Allowed"
    logging.info(
        "Access Attempt - Status: %s, Successful CAPTCHA: %s, "
        "Video File: %s, Client IP: %s",
        status,
        successful_captcha,
        video_file if video_file else 'N/A',
        client_ip,
    )


def get_client_ip(request: gr.Request):
    """Return the client host of *request*, or ``"unknown"`` if unavailable.

    NOTE(review): behind a reverse proxy ``request.client.host`` may be the
    proxy's address rather than the end user's - confirm for the deployment.
    All callers without client information share the ``"unknown"`` record.
    """
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return host or "unknown"


def is_ip_blocked(client_ip):
    """Return True while *client_ip* is inside an active block period."""
    # Use .get() so that merely checking an address does not create a record.
    record = ip_access_records.get(client_ip)
    if record is None:
        return False
    blocked_until = record.get('blocked_until')
    return blocked_until is not None and datetime.now() < blocked_until


def update_ip_access_time(client_ip):
    """Accumulate usage time for *client_ip* and block it when over the limit.

    The time between two consecutive requests counts as usage only when the
    gap is shorter than ``SESSION_TIMEOUT``; a longer gap starts a new
    session. (Previously every gap was counted, so two requests a few hours
    apart triggered an immediate block.) Usage is forgotten after
    ``USAGE_WINDOW`` of inactivity. Reaching ``USAGE_LIMIT`` blocks the
    address for ``BLOCK_DURATION`` and resets the counter.
    """
    record = ip_access_records[client_ip]
    now = datetime.now()

    # Forget stale usage before accumulating anything new.
    last_access_time = record.get('last_access_time')
    if last_access_time and now - last_access_time > USAGE_WINDOW:
        record['total_access_time'] = timedelta()
        record['session_start'] = None

    session_start = record.get('session_start')
    if session_start is not None:
        elapsed = now - session_start
        if elapsed < SESSION_TIMEOUT:
            record['total_access_time'] += elapsed
    record['session_start'] = now
    record['last_access_time'] = now

    if record['total_access_time'] >= USAGE_LIMIT:
        record['blocked_until'] = now + BLOCK_DURATION
        record['total_access_time'] = timedelta()  # Reset total access time


def truncate_video(video_file, max_seconds=MAX_VIDEO_SECONDS):
    """Write the first *max_seconds* of *video_file* to a new temporary file.

    This file used to define ``truncate_video`` twice (30 s, then 15 s); the
    later definition shadowed the first, so the effective limit was 15
    seconds. That remains the default here.

    Each call writes to its own uniquely named file so concurrent requests
    cannot overwrite each other's output. The caller is responsible for
    deleting the returned file.

    Args:
        video_file: Path of the source video.
        max_seconds: Maximum duration to keep, in seconds.

    Returns:
        Path of the truncated ``.mp4`` file.
    """
    fd, truncated_video_file = tempfile.mkstemp(
        prefix="temp_truncated_video_", suffix=".mp4"
    )
    os.close(fd)  # moviepy opens the path itself; only the unique name is needed

    try:
        clip = VideoFileClip(video_file)
        truncated_clip = None
        try:
            truncated_clip = clip.subclip(0, min(max_seconds, clip.duration))
            truncated_clip.write_videofile(
                truncated_video_file, codec="libx264", audio_codec="aac"
            )
        finally:
            # Release the readers even when encoding fails.
            clip.close()
            if truncated_clip is not None:
                truncated_clip.close()
    except Exception:
        # Do not leave a half-written file behind on failure.
        if os.path.exists(truncated_video_file):
            os.remove(truncated_video_file)
        raise
    return truncated_video_file


def _redact(text, secret):
    """Return *text* with every occurrence of *secret* masked."""
    if not text or not secret:
        return text
    return text.replace(secret, "***")


def clone_repo():
    """Clone the GitHub repository containing the backend.

    The clone is skipped when ``REPO_PATH`` already exists.

    Raises:
        RuntimeError: If ``GITHUB_PAT`` is not set or ``git clone`` fails.
    """
    repo_url = "https://github.com/NeeravSood/AllMark-MVP.git"
    repo_path = REPO_PATH
    github_pat = os.getenv("GITHUB_PAT")
    if not github_pat:
        raise RuntimeError("GitHub Personal Access Token (GITHUB_PAT) not found in environment variables.")

    authenticated_repo_url = repo_url.replace("https://", f"https://{github_pat}@", 1)

    if os.path.exists(repo_path):
        print("Repository already cloned.")
        return

    try:
        subprocess.run(
            ["git", "clone", authenticated_repo_url, repo_path],
            check=True,
            text=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        # git may echo the remote URL, which embeds the token: mask it
        # before it reaches stdout or the exception message.
        stdout = _redact(e.stdout, github_pat)
        stderr = _redact(e.stderr, github_pat)
        print("Output:", stdout)
        print("Error:", stderr)
        # `from None`: the CalledProcessError message contains the full
        # command line (and therefore the token), so it must not be chained.
        raise RuntimeError(f"Failed to clone repository: {stderr}") from None
    print("Repository cloned successfully.")


def import_backend_script(script_name):
    """Dynamically import *script_name* from the cloned repository.

    Returns:
        The loaded module object.

    Raises:
        RuntimeError: If the script is missing or fails to import.
    """
    try:
        script_path = os.path.join(REPO_PATH, script_name)
        if not os.path.exists(script_path):
            raise FileNotFoundError(f"Script {script_name} not found in the repository.")
        spec = importlib.util.spec_from_file_location("backend_module", script_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot create an import spec for {script_path}")
        backend_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(backend_module)
        return backend_module
    except Exception as e:
        logging.error(f"Error importing backend script: {str(e)}")
        raise RuntimeError(f"Failed to import backend script: {str(e)}") from e


# Clone and import repository
clone_repo()
backend = import_backend_script("app.py")
analyzer = backend.DeepfakeAnalyzer()


def generate_captcha():
    """Return a random 5-character challenge of uppercase letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    # `secrets` draws from the OS entropy source, so the challenge is not
    # affected by any seeding of the global `random` generator.
    return ''.join(secrets.choice(alphabet) for _ in range(5))


def verify_captcha(user_input, captcha_solution):
    """Compare the user's answer with the session's CAPTCHA solution.

    A missing answer or a missing/empty solution never verifies.

    Returns:
        A ``(message, success)`` tuple.
    """
    if (
        captcha_solution
        and isinstance(user_input, str)
        and user_input.strip() == captcha_solution
    ):
        return "Captcha verified. Video analysis will proceed.", True
    return "Incorrect CAPTCHA. Please try again.", False


def _classify_assessment(combined_assessment):
    """Map the backend's combined assessment to the user-facing verdict.

    The string ``"deepfake"`` (any case) is a deepfake. Otherwise the value
    is read as a number, with scores of 50 or more meaning deepfake.
    Fractional scores are honoured (they were previously treated as 0), and
    values that cannot be parsed count as 0.
    """
    if isinstance(combined_assessment, str) and combined_assessment.lower() == "deepfake":
        return "a deepfake"
    try:
        score = float(combined_assessment)
    except (TypeError, ValueError):
        score = 0.0
    # Written as `>= 50` so that NaN falls on the "genuine" side, as before.
    return "a deepfake" if score >= 50 else "genuine/original"


@spaces.GPU(duration=1000)
def analyze_video(video_file, captcha_input, captcha_solution, request: gr.Request):
    """Run the deepfake analysis for one upload.

    Rejects blocked IPs and wrong CAPTCHA answers, records usage for the
    caller's IP, then analyses a truncated copy of the video.

    Returns:
        ``{"message": ...}`` on success or ``{"error": ...}`` otherwise.
    """
    # Authorization check
    # if not is_authorized(request):
    #     return {"error": "Unauthorized access. Please provide a valid API key."}

    client_ip = get_client_ip(request)
    video_name = os.path.basename(video_file) if video_file else "N/A"

    # Check if IP is blocked
    if is_ip_blocked(client_ip):
        log_api_access(successful_captcha=False, video_file=video_name,
                       client_ip=client_ip, blocked=True)
        return {"error": "Your IP has been blocked due to excessive usage. Please try again later."}

    # Verify CAPTCHA
    message, success = verify_captcha(captcha_input, captcha_solution)
    if not success:
        log_api_access(successful_captcha=False, video_file=video_name, client_ip=client_ip)
        return {"error": message}

    # Update IP access time
    update_ip_access_time(client_ip)

    # Log API access attempt with successful CAPTCHA
    log_api_access(successful_captcha=True, video_file=video_name, client_ip=client_ip)

    truncated_video = None
    try:
        # Video truncation and analysis
        truncated_video = truncate_video(video_file)
        results = analyzer.analyze_media(truncated_video)

        analysis_result = _classify_assessment(results.get('combined_assessment', 0))
        return {
            "message": f"According to our analysis, the video you uploaded appears to be {analysis_result}. "
                       f"Made by Neerav Sood (LinkedIn - https://www.linkedin.com/in/neeravsood/). "
        }
    except Exception as e:
        # logging.exception keeps the traceback alongside the message.
        logging.exception("Error during analysis: %s", e)
        return {"error": "An error occurred during video analysis. Please check your input and try again."}
    finally:
        # The truncated copy is only needed for the duration of the analysis.
        if truncated_video and os.path.exists(truncated_video):
            try:
                os.remove(truncated_video)
            except OSError:
                logging.warning("Could not remove temporary file %s", truncated_video)


def main_interface():
    """Build the Gradio Blocks UI; the CAPTCHA appears once a video is uploaded."""
    with gr.Blocks() as interface:
        gr.Markdown("# AllMark - Deepfake Analyzer")
        gr.Markdown("Upload a video to proceed with analysis.")

        # State variables for per-session data
        captcha_solution = gr.State()

        # Video input with display size restriction
        video_input = gr.Video(label="Upload Video", height=300)  # Adjust height as needed

        captcha_text = gr.Textbox(label="CAPTCHA", interactive=False, visible=False)
        captcha_input = gr.Textbox(label="Enter CAPTCHA Here", visible=False)
        captcha_output = gr.Textbox(label="CAPTCHA Status", interactive=False, visible=False)
        analyze_button = gr.Button("Analyze Video", visible=False)
        analysis_output = gr.JSON(label="Analysis Result")

        def show_captcha(video):
            """Show a fresh CAPTCHA when a video is present, hide it otherwise."""
            if video is None:
                hidden = gr.update(visible=False)
                # Clearing the video also discards the stored solution so a
                # stale CAPTCHA cannot be reused.
                return hidden, hidden, hidden, hidden, None

            # Generate a new CAPTCHA for this session
            new_captcha = generate_captcha()
            return (
                gr.update(visible=True, value=new_captcha),
                gr.update(visible=True, value=""),  # drop any previous answer
                gr.update(visible=True),
                gr.update(visible=True),
                new_captcha,  # Update the session state
            )

        video_input.change(
            show_captcha,
            inputs=video_input,
            outputs=[captcha_text, captcha_input, captcha_output, analyze_button, captcha_solution],
        )

        # Handle analysis
        analyze_button.click(
            analyze_video,
            inputs=[video_input, captcha_input, captcha_solution],
            outputs=analysis_output,
        )

    return interface


# Launch interface
if __name__ == "__main__":
    main_interface().launch()