# -*- coding: utf-8 -*-
from phi.agent import Agent
from phi.model.google import Gemini
from phi.tools.duckduckgo import DuckDuckGo
import google.generativeai as genai
from google.generativeai import upload_file, get_file

import os
import numpy as np
import time
import uuid

import yt_dlp
import cv2
import mediapipe as mp

#==========================================================================================================
# Load a pre-trained face embedding model (OpenCV's FaceNet). It performs better for faces than the MediaPipe embedder.
face_embedder = cv2.dnn.readNetFromTorch("nn4.small2.v1.t7")  # Download the model from OpenCV's GitHub

# MediaPipe image embedder, kept for generic object embeddings (it performs worse for face matching).
# Download the model from https://storage.googleapis.com/mediapipe-tasks/image_embedder
BaseOptions = mp.tasks.BaseOptions
ImageEmbedder = mp.tasks.vision.ImageEmbedder
ImageEmbedderOptions = mp.tasks.vision.ImageEmbedderOptions
VisionRunningMode = mp.tasks.vision.RunningMode

options = ImageEmbedderOptions(
    base_options=BaseOptions(model_asset_path='mobilenet_v3_small_075_224_embedder.tflite'),
    quantize=True,
    running_mode=VisionRunningMode.IMAGE)

mp_embedder = ImageEmbedder.create_from_options(options)

#================================================================================================================   
def initialize_agent():
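    """Create the phi Agent used for video analysis (Gemini model plus DuckDuckGo search tool)."""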
    return Agent(
        name="Video AI summarizer",
        model=Gemini(id="gemini-2.0-flash-exp"),
        tools=[DuckDuckGo()],
        show_tool_calls=True,
        markdown=True,
    )

# Based on cv2 facenet embedder
def get_face_embedding(face_image):
    """
    Generate a face embedding using the pre-trained OpenCV FaceNet model.
    """
    # Preprocess the face image with cv2
    blob = cv2.dnn.blobFromImage(face_image, 1.0 / 255, (96, 96), (0, 0, 0), swapRB=True, crop=False)
    face_embedder.setInput(blob)
    embedding = face_embedder.forward()
    
    return embedding.flatten()
    
#  Based on mediapipe embedder
def get_mp_embedding(face_image):
    """
    Generate an image embedding using the pre-trained MediaPipe image embedder.
    """
    # Load the input image from a numpy array.
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=np.array(face_image))
    embedding_result = mp_embedder.embed(mp_image)
    
    return embedding_result.embeddings[0]  

# Advanced Face Tracking with MediaPipe and Face Embeddings
def face_detection_embed(video_path):
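    """
    Detect and track faces in a video, sampling one frame out of every 30.

    Each detected face is embedded with the OpenCV FaceNet model and matched to
    previously seen faces by cosine similarity. Returns a list of up to three
    normalized face crops (the most frequently matched faces, with the top face
    in the center position), or None if no faces were detected.
    """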
    # Initialize MediaPipe Face Detection
    mp_face_detection = mp.solutions.face_detection
    mp_drawing = mp.solutions.drawing_utils
    
    # Load a pre-trained face embedding model (OpenCV's FaceNet)
    #embedder = cv2.dnn.readNetFromTorch("nn4.small2.v1.t7")  # Download the model from OpenCV's GitHub, move out from this function
    
    # Open the video file
    video_capture = cv2.VideoCapture(video_path)
    
    # Dictionary to store face embeddings and their corresponding IDs, number of matched, normalized images
    face_tracker = {}  # Format: {face_id: {"embedding": face_embedding,  "number_matched": number_matched, "image": normalized_face}}
    face_id_counter = 0
    similarity_threshold = 0.5  # Threshold for considering two faces the same
    frame_number = 0
    
    # Define the target size for normalization
    target_width = 100  # Desired width for all faces
    target_height = 100  # Desired height for all faces

    with mp_face_detection.FaceDetection(min_detection_confidence=0.5) as face_detection:
        while True:
            # Grab a single frame of video
            ret, frame = video_capture.read()
            if not ret:
                break

            if frame_number % 30 == 0:
                # Convert the frame to RGB for MediaPipe
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                
                # Process the frame with MediaPipe Face Detection
                results = face_detection.process(rgb_frame)

                if results.detections:
                    for detection in results.detections:
                        # Get the bounding box of the face
                        bboxC = detection.location_data.relative_bounding_box
                        ih, iw, _ = frame.shape
                        x = int(bboxC.xmin * iw)
                        y = int(bboxC.ymin * ih)
                        w = int(bboxC.width * iw)
                        h = int(bboxC.height * ih)
                        score = detection.score[0]
                        
                        # Extract the face region
                        face_image = frame[y:y+h, x:x+w]
                        if face_image.size == 0:
                            continue  # Skip empty face regions
                            
                        #yield face_image  # Yield the frame for streaming
                        
                        # Generate the face embedding
                        face_embedding = get_face_embedding(face_image) #This model has better performance than mp embedder
                        #face_embedding = get_mp_embedding(face_image)
                        
                        # Check if this face matches any previously tracked face, and find face_id with maximum similarity
                        matched_id = None
                        max_similarity = 0
                        for face_id, data in face_tracker.items():
                            # Calculate the cosine similarity between embeddings. This model has better performance than mp embedder
                            similarity = np.dot(face_embedding, data["embedding"]) / (
                                np.linalg.norm(face_embedding) * np.linalg.norm(data["embedding"])
                            )
                            '''
                            # Compute cosine similarity. comment off because of worse performance
                            similarity = ImageEmbedder.cosine_similarity(
                              face_embedding, data["embedding"])
                            '''
                            if similarity > max_similarity:
                                max_similarity = similarity
                                max_face_id = face_id 
                           
                        # Define a slightly enlarged bounding box around the detected face for the output crop
                        xb = int(x * 0.8)
                        yb = int(y * 0.8)
                        xe = int(x * 1.2 + w)
                        ye = int(y * 1.2 + h)
  
                        if max_similarity > similarity_threshold:
                            matched_id = max_face_id
                            number_matched = face_tracker[matched_id]["number_matched"] + 1
                            face_tracker[matched_id]["number_matched"] = number_matched
                            if score > face_tracker[matched_id]["score"]: #switch to higher score image
                                face_image_b = frame[yb:ye, xb:xe]
                                normalized_face = cv2.resize(face_image_b, (target_width, target_height))
                                face_tracker[matched_id] = {"embedding": face_embedding, "number_matched": number_matched, "image": normalized_face, "score":score}

                        # If the face is not matched, assign a new ID
                        if matched_id is None:
                            face_id_counter += 1
                            matched_id = face_id_counter
                            
                            # Update the face tracker with the new embedding and frame number
                            face_image_b = frame[yb:ye, xb:xe]
                            normalized_face = cv2.resize(face_image_b, (target_width, target_height))
                            face_tracker[matched_id] = {"embedding": face_embedding, "number_matched": 0, "image": normalized_face, "score":score}

                        # Draw a larger bounding box and face ID
                        cv2.rectangle(frame, (xb, yb), (xe, ye), (0, 255, 0), 2)
                        cv2.putText(frame, f"ID: {matched_id}", (xb, yb - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                # Display the resulting frame, for debug purpose
                #yield frame  # Yield the frame for streaming
                #time.sleep(2) #simulate a delay

            # Increment frame number             
            frame_number += 1

        # Finished reading the video
        if len(face_tracker) == 0:
            video_capture.release()
            return None

        # Sort face IDs by how often each face was matched, most frequent first
        sorted_data = sorted(face_tracker, key=lambda x: face_tracker[x]['number_matched'], reverse=True)

        # Keep the top N (up to 3) faces among all detected faces
        number_faces = len(face_tracker)
        if number_faces >= 3:
            center_top1 = [sorted_data[1], sorted_data[0], sorted_data[2]]  # Top 1 takes the center position
        else:
            center_top1 = sorted_data

        images = []
        for face_id in center_top1: 
            #yield  face_tracker[face_id]["image"]  # Yield the frame for streaming
            #time.sleep(2) #simulate a delay
            face_image = face_tracker[face_id]["image"]
            images.append(face_image)
                
    # Release the video capture object
    video_capture.release()
    cv2.destroyAllWindows()

    return images

# Advanced object tracking with MediaPipe Object Detection and the MediaPipe image embedder
def object_detection_embed(video_path):
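    """
    Detect and track objects in a video, sampling one frame out of every 30.

    Each detected object is embedded with the MediaPipe image embedder and matched
    to previously seen objects by cosine similarity. Returns a list of up to three
    normalized object crops (the most frequently matched objects, with the top
    object in the center position), or None if no objects were detected.
    """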
    # Initialize MediaPipe Object Detection
    BaseOptions = mp.tasks.BaseOptions
    ObjectDetector = mp.tasks.vision.ObjectDetector
    ObjectDetectorOptions = mp.tasks.vision.ObjectDetectorOptions
    VisionRunningMode = mp.tasks.vision.RunningMode

    options = ObjectDetectorOptions(
        base_options=BaseOptions(model_asset_path='efficientdet_lite0.tflite'),
        max_results=3,
        score_threshold=0.5,
        running_mode=VisionRunningMode.IMAGE,
        )
 
    mp_drawing = mp.solutions.drawing_utils
    
    # Load a pre-trained face embedding model (OpenCV's FaceNet)
    #embedder = cv2.dnn.readNetFromTorch("nn4.small2.v1.t7")  # Download the model from OpenCV's GitHub, move out from this function
    
    # Open the video file
    video_capture = cv2.VideoCapture(video_path)
    
    # Dictionary to store object embeddings and their corresponding IDs, match counts, and normalized images
    object_tracker = {}  # Format: {object_id: {"embedding": obj_embedding, "number_matched": number_matched, "image": normalized_obj, "score": score, "category": category}}
    object_id_counter = 0
    similarity_threshold = 0.5  # Threshold for considering two detections the same object
    frame_number = 0

    # Define the target size for normalization; only the height is fixed, the width preserves the aspect ratio
    target_height = 100  # Desired height for all object crops

    with ObjectDetector.create_from_options(options) as obj_detection:
        while True:
            # Grab a single frame of video
            ret, frame = video_capture.read()
            if not ret:
                break

            if frame_number % 30 == 0:
                # Convert the frame to RGB for MediaPipe
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Record the frame dimensions
                frame_height, frame_width, _ = rgb_frame.shape
                                
                # Load the input image from a numpy array.
                mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

                # Run MediaPipe Object Detection on the frame
                results = obj_detection.detect(mp_image)
                
                if results.detections:
                    for detection in results.detections:
                        #print("line 297: detection:", detection)
                        '''
                        sample output:
                        Detection(bounding_box=BoundingBox(origin_x=84, origin_y=168, width=272, height=448), 
                                  categories=[Category(index=None, score=0.81640625, display_name=None, category_name='person')], keypoints=[])
                        '''
                        # Get the bounding box of the detected object (origin_x/origin_y are the top-left pixel coordinates)
                        bboxC = detection.bounding_box
                        x = int(bboxC.origin_x)
                        y = int(bboxC.origin_y)
                        w = int(bboxC.width)
                        h = int(bboxC.height)
                        score = detection.categories[0].score
                        category = detection.categories[0].category_name

                        # Extract the object region (rows span the height, columns the width)
                        obj_image = frame[y:y+h, x:x+w]
                        if obj_image.size == 0:
                            continue  # Skip empty object regions
                            
                        #yield obj_image  # Yield the frame for streaming
                        
                        # Generate the object embedding with the MediaPipe image embedder
                        #face_embedding = get_face_embedding(face_image) #This model has better performance than mp embedder
                        obj_embedding = get_mp_embedding(obj_image)
                        
                        # Check if this object matches any previously tracked object, and find the obj_id with maximum similarity
                        matched_id = None
                        max_similarity = 0
                        for obj_id, data in object_tracker.items():
                            '''
                            # Calculate the cosine similarity between embeddings. This model has better performance than mp embedder
                            similarity = np.dot(face_embedding, data["embedding"]) / (
                                np.linalg.norm(face_embedding) * np.linalg.norm(data["embedding"])
                            )
                            '''
                            # Compute cosine similarity between the MediaPipe embeddings
                            similarity = ImageEmbedder.cosine_similarity(
                              obj_embedding, data["embedding"])
                            
                            if similarity > max_similarity:
                                max_similarity = similarity
                                max_obj_id = obj_id 

                        # Define a slightly enlarged bounding box around the detected object for the output crop
                        xb = int(x * 0.8)
                        yb = int(y * 0.8)
                        xe = int(x * 1.2 + w)
                        ye = int(y * 1.2 + h)

                        # Scale the enlarged crop so its height equals target_height while keeping the aspect ratio
                        scale = target_height / (y * 0.4 + h)
                        target_width = int((x * 0.4 + w) * scale)
                        
                        if max_similarity > similarity_threshold:
                            matched_id = max_obj_id
                            number_matched = object_tracker[matched_id]["number_matched"] + 1
                            object_tracker[matched_id]["number_matched"] = number_matched
                            if score > object_tracker[matched_id]["score"]: #switch to higher score image
                                obj_image_b = frame[yb:ye, xb:xe]
                                normalized_obj = cv2.resize(obj_image_b, (target_width, target_height))
                                object_tracker[matched_id] = {"embedding": obj_embedding, "number_matched": number_matched, "image": normalized_obj, "score":score, "category":category}

                        # If the object is not matched, assign a new ID
                        if matched_id is None:
                            object_id_counter += 1
                            matched_id = object_id_counter
                            
                            # Register the new object in the tracker with its embedding and normalized crop
                            obj_image_b = frame[yb:ye, xb:xe]
                            normalized_obj = cv2.resize(obj_image_b, (target_width, target_height))
                            object_tracker[matched_id] = {"embedding": obj_embedding, "number_matched": 0, "image": normalized_obj, "score":score, "category":category}

                        # Draw a larger bounding box and object ID (disabled here)
                        #cv2.rectangle(frame, (xb, yb), (xe, ye), (0, 255, 0), 2)
                        #cv2.putText(frame, f"ID: {matched_id}", (xb, yb - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                # Display the resulting frame, for debug purpose
                #yield frame  # Yield the frame for streaming
                #time.sleep(2) #simulate a delay

            # Increment frame number             
            frame_number += 1

        # Finished reading the video
        if len(object_tracker) == 0:
            video_capture.release()
            return None

        # Sort object IDs by how often each object was matched, most frequent first
        sorted_data = sorted(object_tracker, key=lambda x: object_tracker[x]['number_matched'], reverse=True)

        # Keep the top N (up to 3) objects among all detected objects
        number_objs = len(object_tracker)
        if number_objs >= 3:
            center_top1 = [sorted_data[1], sorted_data[0], sorted_data[2]]  # Top 1 takes the center position
        else:
            center_top1 = sorted_data

        images = []
        for obj_id in center_top1: 
            #yield  object_tracker[obj_id]["image"]  # Yield the frame for streaming
            #time.sleep(2) #simulate a delay
            obj_image = object_tracker[obj_id]["image"]
            images.append(obj_image)
                
    # Release the video capture object
    video_capture.release()
    cv2.destroyAllWindows()

    return images
   
#=========================================================================================================
# Summarize video using phi Agent

def summarize_video(video_path, user_prompt, out_lang = 'Original'):
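    """
    Upload a video to Gemini via the File API, detect its spoken language, and
    answer the user's questions with the multimodal agent.

    Returns a tuple (out_lang, markdown_text): the answer language and the
    agent's markdown response.
    """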
    # Upload and process the video
    processed_video = upload_file(video_path)

    # Log the uploaded file's display name and hash directly from the File object
    print("display_name, sha256_hash:", processed_video.display_name, processed_video.sha256_hash)
    
    while processed_video.state.name == "PROCESSING":
        time.sleep(1)
        processed_video = get_file(processed_video.name)
    
    # Detect the spoken language of the video
    lang_prompt = "What language is spoken in this video? Answer with the language name only."
    lang_response = multimodal_Agent.run(lang_prompt, videos=[processed_video]).content
    language = str(lang_response).strip().split(' ')[-1]
    print('Video language is:', language)  
    if out_lang == 'Original':
        out_lang = language
        
    # Analysis prompt
    analysis_prompt = (f'''
                        First analyze the video, then answer the following questions based on that analysis:
                        {user_prompt}
                        Provide a comprehensive response focusing on practical, actionable information, restating each original question.
                        Answer in {out_lang}. Limit the response to 30 lines.'''
                        )
    
    # AI agent processing
    response = multimodal_Agent.run(analysis_prompt, videos=[processed_video])
    
    markdown_text = response.content

    return out_lang, str(markdown_text)

#=======================================================================================

# Initialize the agent
multimodal_Agent = initialize_agent()
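
#=======================================================================================
# Minimal usage sketch (assumptions): "sample_video.mp4" is a hypothetical local file, and the
# FaceNet (nn4.small2.v1.t7) and MediaPipe (.tflite) model files referenced above have been
# downloaded next to this script. Illustrative only, not part of the original pipeline.
if __name__ == "__main__":
    sample_path = "sample_video.mp4"  # hypothetical input video
    faces = face_detection_embed(sample_path)
    if faces is not None:
        print(f"Extracted {len(faces)} representative face crops")
    out_lang, summary = summarize_video(sample_path, "What is this video about?")
    print(f"Summary ({out_lang}):\n{summary}")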