"""Gradio app that analyzes an uploaded video with Google's Gemini 2.0 Flash model.

The app asks the model for a short summary and a list of key timestamps, then
uses OpenCV to grab the frame at each timestamp and shows the frames in a gallery.
"""

import os
import json
import gradio as gr
import cv2
from google import genai
from google.genai.types import Part
from tenacity import retry, stop_after_attempt, wait_random_exponential

# The Gemini API key is supplied through the environment; refuse to start without it.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if GOOGLE_API_KEY is None or GOOGLE_API_KEY == "":
    raise ValueError("Please set the GOOGLE_API_KEY environment variable.")

# Shared Gemini client (AI Studio, API-key authentication) used by every request.
client = genai.Client(api_key=GOOGLE_API_KEY)

# Model identifier passed to every generate_content call (Gemini 2.0 Flash).
MODEL_NAME = "gemini-2.0-flash-001"

@retry(wait=wait_random_exponential(multiplier=1, max=60), stop=stop_after_attempt(3))
def call_gemini(video_file: str, prompt: str) -> str:
    """
    Send a video file plus a text prompt to the Gemini model and return its text reply.

    The whole video is read into memory and attached inline with MIME type
    "video/mp4"; the prompt is passed as a plain string.

    The call (including the file read) is retried by tenacity: up to 3 attempts
    with randomized exponential back-off capped at 60 seconds. Because
    ``reraise`` is not set, a final failure surfaces as tenacity's ``RetryError``.

    Args:
        video_file: Path to the video on disk.
        prompt: Instruction text for the model.

    Returns:
        The text of the model response, or an empty string when the response
        carries no text.
    """
    with open(video_file, "rb") as f:
        file_bytes = f.read()
    # NOTE(review): inline uploads are size-limited by the API; large videos
    # probably need the Files API instead — confirm against the SDK docs.
    response = client.models.generate_content(
        model=MODEL_NAME,
        contents=[
            # Part.from_bytes builds an inline-data part. Part(file_data=...) expects
            # a FileData URI reference, and Part itself has no mime_type field.
            Part.from_bytes(data=file_bytes, mime_type="video/mp4"),
            prompt,
        ],
    )
    # response.text can be None (e.g. no candidates); keep the declared str contract.
    return response.text or ""

def hhmmss_to_seconds(time_str: str) -> float:
    """
    Convert a colon-separated time string into seconds.

    Accepted shapes are ``HH:MM:SS``, ``MM:SS`` and plain ``SS``; each field
    may be fractional (e.g. ``00:00:01.5``). Surrounding whitespace is ignored.

    Args:
        time_str: The timestamp text to convert.

    Returns:
        The total number of seconds as a float.

    Raises:
        ValueError: If a field is not numeric, or if there are more than
            three ``:``-separated fields.
    """
    fields = [float(field) for field in time_str.strip().split(":")]
    if len(fields) > 3:
        # Previously such input silently returned only the first field as seconds.
        raise ValueError(f"Timestamp has too many ':'-separated fields: {time_str!r}")
    if len(fields) == 3:
        return fields[0] * 3600 + fields[1] * 60 + fields[2]
    if len(fields) == 2:
        return fields[0] * 60 + fields[1]
    return fields[0]

def _parse_key_frame_lines(response_text: str) -> list:
    """Parse 'timestamp - description' lines into (timestamp, description) pairs."""
    pairs = []
    for line in response_text.strip().split("\n"):
        # Split on the first " - " only, so descriptions may contain dashes.
        timestamp, separator, description = line.partition(" - ")
        if separator:
            pairs.append((timestamp.strip(), description.strip()))
    return pairs


def get_key_frames(video_file: str, summary: str, user_query: str) -> list:
    """
    Ask Gemini for key timestamps and grab the matching frames with OpenCV.

    The prompt instructs the model to output one line per event in the format:
    HH:MM:SS - description
    Lines without the " - " separator are ignored, as are timestamps that
    cannot be converted to seconds and seeks that yield no frame.

    Args:
        video_file: Path to the video on disk.
        summary: Previously generated summary, added to the prompt when non-empty.
        user_query: Optional focus text, added to the prompt when non-empty.

    Returns:
        A list of tuples (image_array, caption). The image is the frame
        converted from BGR to RGB and the caption is "<timestamp>: <description>".
        The list is empty when the model call fails, no timestamps are parsed,
        or the video cannot be opened.
    """
    prompt = (
        "List the key timestamps in the video and a brief description of the important event at that time. "
        "Output one line per event in the following format: HH:MM:SS - description. Do not include any extra text."
    )
    if summary:
        prompt += f" Video Summary: {summary}"
    if user_query:
        prompt += f" Focus on: {user_query}"

    # Best effort: a failed model call degrades to "no key frames", not a crash.
    try:
        key_frames = _parse_key_frame_lines(call_gemini(video_file, prompt))
    except Exception as e:
        print("Error in key frame extraction:", e)
        return []

    if not key_frames:
        # Nothing to seek to, so do not open the video at all.
        return []

    extracted_frames = []
    cap = cv2.VideoCapture(video_file)
    try:
        if not cap.isOpened():
            print("Error: Could not open the uploaded video file.")
            return extracted_frames

        for ts, description in key_frames:
            try:
                seconds = hhmmss_to_seconds(ts)
            except ValueError:
                # Malformed timestamp from the model; skip this entry.
                continue
            # Seek by time (milliseconds) and read the frame at that position.
            cap.set(cv2.CAP_PROP_POS_MSEC, seconds * 1000)
            ret, frame = cap.read()
            if ret:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                extracted_frames.append((frame_rgb, f"{ts}: {description}"))
    finally:
        # Release the capture even if frame processing raises.
        cap.release()
    return extracted_frames

def analyze_video(video_file: str, user_query: str) -> tuple[str, list]:
    """
    Analyze an uploaded video: summarize it, then extract captioned key frames.

    First, call Gemini to get a brief summary of the video.
    Then, ask Gemini for key timestamps and descriptions via get_key_frames.

    Args:
        video_file: Path to the video on disk.
        user_query: Optional focus text appended to the summary prompt.

    Returns:
      - A Markdown report as a string.
      - A gallery list of key frames (each as a tuple of (image, caption)).
    """
    summary_prompt = "Summarize this video in a few sentences, focusing on any security or surveillance insights."
    if user_query:
        summary_prompt += f" Also focus on: {user_query}"

    try:
        summary = call_gemini(video_file, summary_prompt)
        summary_for_prompt = summary or ""
        if not summary:
            summary = "*No summary was returned by the model.*"
    except Exception as e:
        # Show the failure in the report, but do not pass the error text to the
        # model as if it were a real summary of the video.
        summary = f"[Error in summary extraction: {e}]"
        summary_for_prompt = ""

    key_frames_gallery = get_key_frames(video_file, summary_for_prompt, user_query)

    report_parts = [f"## Video Analysis Report\n\n**Summary:**\n\n{summary}\n"]
    if not key_frames_gallery:
        report_parts.append("\n*No key frames were extracted.*\n")
    else:
        report_parts.append("\n**Key Frames Extracted:**\n")
        report_parts.extend(
            f"- **Frame {idx}:** {caption}\n"
            for idx, (_, caption) in enumerate(key_frames_gallery, start=1)
        )
    return "".join(report_parts), key_frames_gallery

def gradio_interface(video_file, user_query: str) -> tuple[str, list]:
    """
    Gradio callback: validate the inputs and run the video analysis.

    Args:
        video_file: The value of the video input; a falsy value means no video
            was provided.
        user_query: Optional text guiding the analysis. ``None`` and
            whitespace-only input are treated as "no query".

    Returns:
        A Markdown report and a gallery list of (image, caption) tuples. When
        no video is provided, a prompt to upload one and an empty gallery.
    """
    if not video_file:
        return "Please upload a valid video file.", []
    # Normalize so a blank query does not add an empty "focus on" clause downstream.
    query = (user_query or "").strip()
    return analyze_video(video_file, query)

# Gradio UI: a video upload and an optional free-text query go in;
# a Markdown report and a two-column gallery of captioned frames come out.
iface = gr.Interface(
    title="AI Video Analysis and Summariser Agent",
    description=" ".join(
        (
            "This tool uses Google's Gemini 2.0 Flash model via AI Studio to analyze an uploaded video.",
            "It returns a brief summary and extracts key frames based on that summary.",
            "Provide a video file and, optionally, a query to guide the analysis.",
        )
    ),
    fn=gradio_interface,
    inputs=[
        gr.Video(label="Upload Video File"),
        gr.Textbox(
            label="Analysis Query (optional): guide the focus of the analysis",
            placeholder="e.g., focus on unusual movements near the entrance",
        ),
    ],
    outputs=[
        gr.Markdown(label="Security & Surveillance Analysis Report"),
        gr.Gallery(label="Extracted Key Frames", columns=2),
    ],
)

# Start the web UI only when this file is run as a script, not when imported.
if __name__ == "__main__":
    iface.launch()