File size: 6,549 Bytes
f8aaa9d
7c2c622
f8aaa9d
7c2c622
f8aaa9d
c43728b
f8aaa9d
 
0f96bc2
f8aaa9d
 
 
 
0f96bc2
f8aaa9d
 
0f96bc2
830c9fb
f8aaa9d
 
c43728b
f8aaa9d
001b623
cba459f
5e2d98d
f8aaa9d
001b623
 
f8aaa9d
 
 
001b623
4938676
c43728b
f8aaa9d
 
 
7c2c622
f8aaa9d
7c2c622
0f96bc2
7c2c622
 
 
cba459f
7c2c622
cba459f
7c2c622
 
 
001b623
7c2c622
4938676
 
 
 
7c2c622
 
 
 
4938676
 
7c2c622
4938676
7c2c622
 
 
 
c43728b
 
 
 
 
 
 
 
 
7c2c622
03c6357
7c2c622
0f96bc2
7c2c622
001b623
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c2c622
 
001b623
0f96bc2
4938676
 
7c2c622
 
4938676
7c2c622
f8aaa9d
 
 
 
 
4938676
 
 
 
 
 
 
 
 
 
 
f8aaa9d
001b623
f8aaa9d
 
0f96bc2
 
 
001b623
7c2c622
 
 
 
 
 
 
 
 
001b623
f8aaa9d
001b623
4938676
f8aaa9d
001b623
 
 
f8aaa9d
 
 
0f96bc2
03c6357
0f96bc2
 
 
 
3f2c22a
0f96bc2
f8aaa9d
 
 
4938676
 
f8aaa9d
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import os
import json
import gradio as gr
import cv2
from google import genai
from google.genai.types import Part
from tenacity import retry, stop_after_attempt, wait_random_exponential

# Retrieve API key from environment variables.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("Please set the GOOGLE_API_KEY environment variable.")

# Initialize the Gemini API client via AI Studio using the API key.
client = genai.Client(api_key=GOOGLE_API_KEY)

# Use the Gemini 2.0 Flash model.
MODEL_NAME = "gemini-2.0-flash-001"

@retry(wait=wait_random_exponential(multiplier=1, max=60), stop=stop_after_attempt(3))
def call_gemini(video_file: str, prompt: str) -> str:
    """
    Call the Gemini model with the provided video file and prompt.
    The video file is read as bytes and passed with MIME type "video/mp4".
    The prompt is passed as a plain string.
    """
    with open(video_file, "rb") as f:
        file_bytes = f.read()
    response = client.models.generate_content(
        model=MODEL_NAME,
        contents=[
            Part(file_data=file_bytes, mime_type="video/mp4"),
            prompt
        ]
    )
    return response.text

def hhmmss_to_seconds(time_str: str) -> float:
    """
    Convert a HH:MM:SS formatted string into seconds.
    """
    parts = time_str.strip().split(":")
    parts = [float(p) for p in parts]
    if len(parts) == 3:
        return parts[0] * 3600 + parts[1] * 60 + parts[2]
    elif len(parts) == 2:
        return parts[0] * 60 + parts[1]
    else:
        return parts[0]

def get_key_frames(video_file: str, analysis: str, user_query: str) -> list:
    """
    Ask Gemini to list key timestamps and descriptions for the video.
    The model is instructed to output one line per event in the format:
    HH:MM:SS - description
    We then parse these lines and extract the corresponding frames using OpenCV.
    
    Returns a list of tuples: (image_array, caption)
    """
    prompt = (
        "List the key timestamps in the video and a brief description of the important event at that time. "
        "Output one line per event in the following format: HH:MM:SS - description. Do not include any extra text."
    )
    prompt += f" Video Summary: {analysis}"
    if user_query:
        prompt += f" Additional focus: {user_query}"
    
    try:
        key_frames_response = call_gemini(video_file, prompt)
        lines = key_frames_response.strip().split("\n")
        key_frames = []
        for line in lines:
            if " - " in line:
                parts = line.split(" - ", 1)
                timestamp = parts[0].strip()
                description = parts[1].strip()
                key_frames.append({"timestamp": timestamp, "description": description})
    except Exception as e:
        print("Error in key frame extraction:", e)
        key_frames = []
    
    extracted_frames = []
    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        print("Error: Could not open the uploaded video file.")
        return extracted_frames

    for frame_obj in key_frames:
        ts = frame_obj.get("timestamp")
        description = frame_obj.get("description", "")
        try:
            seconds = hhmmss_to_seconds(ts)
        except Exception:
            continue
        cap.set(cv2.CAP_PROP_POS_MSEC, seconds * 1000)
        ret, frame = cap.read()
        if ret:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            caption = f"{ts}: {description}"
            extracted_frames.append((frame_rgb, caption))
    cap.release()
    return extracted_frames

def analyze_video(video_file: str, user_query: str) -> (str, list):
    """
    Perform iterative video analysis on the uploaded file.
    Iteratively refine the summary with simpler prompts, then ask for key timestamps.
    
    Returns:
      - A Markdown report (string) summarizing the video.
      - A gallery list of key frames (each as a tuple of (image, caption)).
    """
    analysis = ""
    num_iterations = 3

    for i in range(num_iterations):
        if i == 0:
            prompt = "Give a detailed summary of the video."
            if user_query:
                prompt += f" Also focus on: {user_query}"
        elif i == 1:
            prompt = f"Based on the summary: \"{analysis}\", provide additional details about important events and anomalies in the video."
            if user_query:
                prompt += f" Also focus on: {user_query}"
        else:
            prompt = f"Refine and consolidate the analysis: \"{analysis}\" into a final summary."
        
        try:
            analysis = call_gemini(video_file, prompt)
        except Exception as e:
            analysis += f"\n[Error during iteration {i+1}: {e}]"
            break

    markdown_report = f"## Video Analysis Report\n\n**Summary:**\n\n{analysis}\n"
    key_frames_gallery = get_key_frames(video_file, analysis, user_query)
    if not key_frames_gallery:
        markdown_report += "\n*No key frames were extracted.*\n"
    else:
        markdown_report += "\n**Key Frames Extracted:**\n"
        for idx, (img, caption) in enumerate(key_frames_gallery, start=1):
            markdown_report += f"- **Frame {idx}:** {caption}\n"

    return markdown_report, key_frames_gallery

def gradio_interface(video_file, user_query: str) -> (str, list):
    """
    Gradio interface function that accepts an uploaded video file and an optional query,
    then returns a Markdown report and a gallery of extracted key frames with captions.
    """
    if not video_file:
        return "Please upload a valid video file.", []
    return analyze_video(video_file, user_query)

iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Video(label="Upload Video File"),
        gr.Textbox(label="Analysis Query (optional): guide the focus of the analysis", placeholder="e.g., focus on unusual movements near the entrance")
    ],
    outputs=[
        gr.Markdown(label="Security & Surveillance Analysis Report"),
        gr.Gallery(label="Extracted Key Frames", columns=2)
    ],
    title="AI Video Analysis and Summariser Agent",
    description=(
        "This agentic video analysis tool uses Google's Gemini 2.0 Flash model via AI Studio "
        "to iteratively analyze an uploaded video for insights. Provide a video file and, optionally, "
        "a query to guide the analysis. The tool returns a Markdown report along with a gallery of key frame images."
    )
)

if __name__ == "__main__":
    iface.launch()