File size: 4,567 Bytes
3a3c2be
2dc47c6
 
cb82c24
2dc47c6
589bf82
 
 
 
 
 
 
 
 
 
3a3c2be
2dc47c6
 
 
 
 
3a3c2be
2dc47c6
 
 
 
cb82c24
2dc47c6
 
cb82c24
2dc47c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c4c84f3
 
1ae13cc
 
cb82c24
7b57752
 
 
 
 
 
 
1ae13cc
7b57752
 
 
1ae13cc
 
 
 
 
 
7b57752
1ae13cc
7b57752
 
1ae13cc
7b57752
1ae13cc
589bf82
1ae13cc
cb82c24
 
 
 
589bf82
c09a33a
cb82c24
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import os
import subprocess
import importlib.util
import gradio as gr
import logging
from moviepy.editor import VideoFileClip

# Function to truncate video to 15 seconds
def truncate_video(video_file):
    """Truncates video to 15 seconds and saves it as a temporary file."""
    clip = VideoFileClip(video_file)
    truncated_clip = clip.subclip(0, min(15, clip.duration))
    truncated_video_file = "temp_truncated_video.mp4"
    truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
    return truncated_video_file

# Clone the GitHub repository containing the backend
def clone_repo():
    """Clone the GitHub repository containing the backend."""
    repo_url = "https://github.com/NeeravSood/AllMark-MVP.git"  # Update if necessary
    repo_path = "./repository"

    # Retrieve the GitHub Personal Access Token (GITHUB_PAT) from environment variables
    github_pat = os.getenv("GITHUB_PAT")
    if not github_pat:
        raise RuntimeError("GitHub Personal Access Token (GITHUB_PAT) not found in environment variables.")

    # Modify the repository URL to include the token for authentication
    authenticated_repo_url = f"https://{github_pat}@github.com/NeeravSood/AllMark-MVP.git"

    if os.path.exists(repo_path):
        print("Repository already cloned.")
    else:
        try:
            # Clone the repository using the authenticated URL
            subprocess.run(
                ["git", "clone", authenticated_repo_url, repo_path],
                check=True,
                text=True,
                capture_output=True
            )
            print("Repository cloned successfully.")
        except subprocess.CalledProcessError as e:
            print("Output:", e.stdout)
            print("Error:", e.stderr)
            raise RuntimeError(f"Failed to clone repository: {e.stderr}")

def import_backend_script(script_name):
    """Dynamically import the backend script."""
    try:
        script_path = os.path.join("./repository", script_name)
        if not os.path.exists(script_path):
            raise FileNotFoundError(f"Script {script_name} not found in the repository.")
        
        spec = importlib.util.spec_from_file_location("backend_module", script_path)
        backend_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(backend_module)
        return backend_module
    except Exception as e:
        logging.error(f"Error importing backend script: {str(e)}")
        raise RuntimeError(f"Failed to import backend script: {str(e)}")

# Clone the repository and import the backend module
clone_repo()
backend = import_backend_script("app.py")  # Import app.py from the cloned repository

# Initialize the analyzer instance from the imported module
analyzer = backend.DeepfakeAnalyzer()  # Use the imported module's class or function

# Define the Gradio function to analyze the video
import json  # Ensure imports include json for dictionary-to-JSON string handling

def analyze_video(video_file):
    try:
        # Truncate the video to 15 seconds
        truncated_video = truncate_video(video_file)
        
        # Pass the truncated video to the analyzer
        results = analyzer.analyze_media(truncated_video)
        
        # Get the combined probability and interpret the result
        combined_probability = results.get('combined_assessment', 0)
        analysis_result = "genuine/original" if combined_probability < 50 else "a deepfake"
        
        # Prepare JSON output
        output = {
            "message": f"According to our analysis, the video you uploaded appears to be {analysis_result} "
                       f"with a {combined_probability:.2f}% probability. "
                       f"{len(results['video_analysis']['frame_results'])} frames were analyzed in total."
        }
        
        return output

    except Exception as e:
        # Log the error and return a JSON-compatible error message
        logging.error(f"Error during analysis: {e}")
        return {"error": "An error occurred during video analysis. Please check your input and try again."}

# Define the Gradio interface with json output to handle dictionaries
interface = gr.Interface(
    fn=analyze_video,
    inputs=gr.Video(label="Upload Video"),
    outputs="json",
    title="AllMark - Deepfake Analyzer",
    description="Upload a video to analyze for deepfake content. N.B. - Only mp4 files supported for now. Analysis usually takes between 1 to 5 minutes."
)

# Launch Gradio app
if __name__ == "__main__":
    interface.launch()