# -*- coding: utf-8 -*-
from phi.agent import Agent
from phi.model.google import Gemini
from phi.tools.duckduckgo import DuckDuckGo
import google.generativeai as genai
from google.generativeai import upload_file, get_file
import os
import numpy as np
import time
import uuid
import yt_dlp
import cv2
import mediapipe as mp

#==========================================================================================================
# Load a pre-trained face embedding model (OpenCV's FaceNet). It performs better than the MediaPipe embedder for faces.
face_embedder = cv2.dnn.readNetFromTorch("nn4.small2.v1.t7")  # Download the model from OpenCV's GitHub

# Define an embedder with MediaPipe. It performs worse for face matching (not used there),
# but it is used for generic object tracking below.
# Download the model from https://storage.googleapis.com/mediapipe-tasks/image_embedder
BaseOptions = mp.tasks.BaseOptions
ImageEmbedder = mp.tasks.vision.ImageEmbedder
ImageEmbedderOptions = mp.tasks.vision.ImageEmbedderOptions
VisionRunningMode = mp.tasks.vision.RunningMode

options = ImageEmbedderOptions(
    base_options=BaseOptions(model_asset_path='mobilenet_v3_small_075_224_embedder.tflite'),
    quantize=True,
    running_mode=VisionRunningMode.IMAGE)
mp_embedder = ImageEmbedder.create_from_options(options)

#================================================================================================================
def initialize_agent():
    return Agent(
        name="Video AI summarizer",
        model=Gemini(id="gemini-2.0-flash-exp"),
        tools=[DuckDuckGo()],
        show_tool_calls=True,
        markdown=True,
    )


# Based on the OpenCV FaceNet embedder
def get_face_embedding(face_image):
    """Generate a face embedding using the pre-trained FaceNet model."""
    # Preprocess the face image with cv2
    blob = cv2.dnn.blobFromImage(face_image, 1.0 / 255, (96, 96), (0, 0, 0), swapRB=True, crop=False)
    face_embedder.setInput(blob)
    embedding = face_embedder.forward()
    return embedding.flatten()
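# --- Example (sketch): comparing two face crops with the FaceNet embedder ---
# The file names below are hypothetical placeholders; any two face crops will do. The
# cosine similarity computed here is the same matching criterion used in
# face_detection_embed() with its 0.5 threshold.
#
#   face_a = cv2.imread("face_a.jpg")
#   face_b = cv2.imread("face_b.jpg")
#   emb_a = get_face_embedding(face_a)
#   emb_b = get_face_embedding(face_b)
#   similarity = np.dot(emb_a, emb_b) / (np.linalg.norm(emb_a) * np.linalg.norm(emb_b))
#   print(f"cosine similarity: {similarity:.3f}")  # values above ~0.5 are treated as the same face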
# Based on the MediaPipe embedder
def get_mp_embedding(face_image):
    """Generate an image embedding using the pre-trained MediaPipe model."""
    # Load the input image from a numpy array.
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=np.array(face_image))
    embedding_result = mp_embedder.embed(mp_image)
    return embedding_result.embeddings[0]


# Advanced face tracking with MediaPipe face detection and face embeddings
def face_detection_embed(video_path):
    # Initialize MediaPipe Face Detection
    mp_face_detection = mp.solutions.face_detection
    mp_drawing = mp.solutions.drawing_utils

    # The pre-trained face embedding model (OpenCV's FaceNet) is loaded at module level as face_embedder.

    # Open the video file
    video_capture = cv2.VideoCapture(video_path)

    # Dictionary to store face embeddings and their corresponding IDs, match counts and normalized images.
    # Format: {face_id: {"embedding": face_embedding, "number_matched": number_matched, "image": normalized_face, "score": score}}
    face_tracker = {}
    face_id_counter = 0
    similarity_threshold = 0.5  # Threshold for considering two faces the same
    frame_number = 0

    # Define the target size for normalization
    target_width = 100   # Desired width for all faces
    target_height = 100  # Desired height for all faces

    with mp_face_detection.FaceDetection(min_detection_confidence=0.5) as face_detection:
        while True:
            # Grab a single frame of video
            ret, frame = video_capture.read()
            if not ret:
                break

            if frame_number % 30 == 0:
                # Convert the frame to RGB for MediaPipe
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Process the frame with MediaPipe Face Detection
                results = face_detection.process(rgb_frame)

                if results.detections:
                    for detection in results.detections:
                        # Get the bounding box of the face
                        bboxC = detection.location_data.relative_bounding_box
                        ih, iw, _ = frame.shape
                        x = int(bboxC.xmin * iw)
                        y = int(bboxC.ymin * ih)
                        w = int(bboxC.width * iw)
                        h = int(bboxC.height * ih)
                        score = detection.score[0]

                        # Extract the face region
                        face_image = frame[y:y+h, x:x+w]
                        if face_image.size == 0:
                            continue  # Skip empty face regions
                        #yield face_image  # Yield the frame for streaming

                        # Generate the face embedding (FaceNet performs better than the MediaPipe embedder here)
                        face_embedding = get_face_embedding(face_image)
                        #face_embedding = get_mp_embedding(face_image)

                        # Check if this face matches any previously tracked face, and find the face_id with maximum similarity
                        matched_id = None
                        max_similarity = 0
                        for face_id, data in face_tracker.items():
                            # Calculate the cosine similarity between embeddings
                            similarity = np.dot(face_embedding, data["embedding"]) / (
                                np.linalg.norm(face_embedding) * np.linalg.norm(data["embedding"])
                            )
                            # Alternative (commented out, worse performance): MediaPipe cosine similarity
                            #similarity = ImageEmbedder.cosine_similarity(
                            #    face_embedding, data["embedding"])

                            if similarity > max_similarity:
                                max_similarity = similarity
                                max_face_id = face_id

                        # Define a larger bounding box for the output face
                        xb = int(x * 0.8)
                        yb = int(y * 0.8)
                        xe = int(x * 1.2 + w)
                        ye = int(y * 1.2 + h)

                        if max_similarity > similarity_threshold:
                            matched_id = max_face_id
                            number_matched = face_tracker[matched_id]["number_matched"] + 1
                            face_tracker[matched_id]["number_matched"] = number_matched
                            if score > face_tracker[matched_id]["score"]:
                                # Switch to the higher-score image
                                face_image_b = frame[yb:ye, xb:xe]
                                normalized_face = cv2.resize(face_image_b, (target_width, target_height))
                                face_tracker[matched_id] = {"embedding": face_embedding,
                                                            "number_matched": number_matched,
                                                            "image": normalized_face,
                                                            "score": score}

                        # If the face is not matched, assign a new ID
                        if matched_id is None:
                            face_id_counter += 1
                            matched_id = face_id_counter
                            # Add the new embedding and normalized image to the face tracker
                            face_image_b = frame[yb:ye, xb:xe]
                            normalized_face = cv2.resize(face_image_b, (target_width, target_height))
                            face_tracker[matched_id] = {"embedding": face_embedding,
                                                        "number_matched": 0,
                                                        "image": normalized_face,
                                                        "score": score}

                        # Draw a larger bounding box and face ID
                        cv2.rectangle(frame, (xb, yb), (xe, ye), (0, 255, 0), 2)
                        cv2.putText(frame, f"ID: {matched_id}", (xb, yb - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                # Display the resulting frame, for debug purposes
                #yield frame  # Yield the frame for streaming
                #time.sleep(2)  # simulate a delay

            # Increment frame number
            frame_number += 1

    # Finished reading the video
    if len(face_tracker) == 0:
        return None

    sorted_data = sorted(face_tracker, key=lambda x: face_tracker[x]['number_matched'], reverse=True)

    # Find the top N faces among all detected faces
    number_faces = len(face_tracker)
    if number_faces >= 3:
        center_top1 = [sorted_data[1], sorted_data[0], sorted_data[2]]  # Top 1 will take the center position
    else:
        center_top1 = sorted_data

    images = []
    contents = []
    for face_id in center_top1:
        #yield face_tracker[face_id]["image"]  # Yield the frame for streaming
        #time.sleep(2)  # simulate a delay
        face_image = face_tracker[face_id]["image"]
        images.append(face_image)

    # Release the video capture object
    video_capture.release()
    cv2.destroyAllWindows()

    return images
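# --- Example (sketch): saving the top faces returned by face_detection_embed ---
# 'sample.mp4' and 'top_faces.jpg' are hypothetical file names. All returned crops are
# resized to 100x100, so they can be stacked side by side directly.
#
#   top_faces = face_detection_embed("sample.mp4")
#   if top_faces:
#       cv2.imwrite("top_faces.jpg", np.hstack(top_faces))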
# Advanced object tracking with MediaPipe object detection
def object_detection_embed(video_path):
    # Initialize MediaPipe Object Detection
    BaseOptions = mp.tasks.BaseOptions
    ObjectDetector = mp.tasks.vision.ObjectDetector
    ObjectDetectorOptions = mp.tasks.vision.ObjectDetectorOptions
    VisionRunningMode = mp.tasks.vision.RunningMode

    options = ObjectDetectorOptions(
        base_options=BaseOptions(model_asset_path='efficientdet_lite0.tflite'),
        max_results=3,
        score_threshold=0.5,
        running_mode=VisionRunningMode.IMAGE,
    )
    mp_drawing = mp.solutions.drawing_utils

    # Open the video file
    video_capture = cv2.VideoCapture(video_path)

    # Dictionary to store object embeddings and their corresponding IDs, match counts and normalized images.
    # Format: {object_id: {"embedding": obj_embedding, "number_matched": number_matched, "image": normalized_obj, "score": score, "category": category}}
    object_tracker = {}
    object_id_counter = 0
    similarity_threshold = 0.5  # Threshold for considering two objects the same
    frame_number = 0

    # Define the target size for normalization; only the height is fixed
    #target_width = 100  # Desired width for all objects
    target_height = 100  # Desired height for all objects

    with ObjectDetector.create_from_options(options) as obj_detection:
        while True:
            # Grab a single frame of video
            ret, frame = video_capture.read()
            if not ret:
                break

            if frame_number % 30 == 0:
                # Convert the frame to RGB for MediaPipe
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_height, frame_width, _ = rgb_frame.shape

                # Wrap the RGB frame as a MediaPipe Image (loaded from a numpy array)
                mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

                # Process the frame with MediaPipe Object Detection
                results = obj_detection.detect(mp_image)

                if results.detections:
                    for detection in results.detections:
                        #print("detection:", detection)
                        '''
                        Sample output:
                        Detection(bounding_box=BoundingBox(origin_x=84, origin_y=168, width=272, height=448),
                                  categories=[Category(index=None, score=0.81640625, display_name=None, category_name='person')],
                                  keypoints=[])
                        '''
                        # Get the bounding box of the object; note x is used along the height direction (h) here
                        bboxC = detection.bounding_box
                        x = int(bboxC.origin_x)
                        y = int(bboxC.origin_y)
                        w = int(bboxC.width)
                        h = int(bboxC.height)
                        score = detection.categories[0].score
                        category = detection.categories[0].category_name

                        # Extract the object region
                        obj_image = frame[y:y+w, x:x+h]
                        if obj_image.size == 0:
                            continue  # Skip empty object regions
                        #yield obj_image  # Yield the frame for streaming

                        # Generate the object embedding with the MediaPipe embedder
                        #obj_embedding = get_face_embedding(obj_image)  # FaceNet embedder performs better for faces, but is not used for generic objects
                        obj_embedding = get_mp_embedding(obj_image)

                        # Check if this object matches any previously tracked object, and find the obj_id with maximum similarity
                        matched_id = None
                        max_similarity = 0
                        for obj_id, data in object_tracker.items():
                            # Alternative (commented out): cosine similarity on FaceNet-style embeddings
                            #similarity = np.dot(obj_embedding, data["embedding"]) / (
                            #    np.linalg.norm(obj_embedding) * np.linalg.norm(data["embedding"])
                            #)
                            # Compute cosine similarity with the MediaPipe embedder
                            similarity = ImageEmbedder.cosine_similarity(
                                obj_embedding, data["embedding"])

                            if similarity > max_similarity:
                                max_similarity = similarity
                                max_obj_id = obj_id

                        # Define a larger bounding box for the output object
                        xb = int(x * 0.8)
                        yb = int(y * 0.8)
                        xe = int(x * 1.2 + h)
                        ye = int(y * 1.2 + w)
                        scale = target_height / (x * 0.4 + w)
                        target_width = int((y * 0.4 + w) * scale)

                        if max_similarity > similarity_threshold:
                            matched_id = max_obj_id
                            number_matched = object_tracker[matched_id]["number_matched"] + 1
                            object_tracker[matched_id]["number_matched"] = number_matched
                            if score > object_tracker[matched_id]["score"]:
                                # Switch to the higher-score image
                                obj_image_b = frame[yb:ye, xb:xe]
                                normalized_obj = cv2.resize(obj_image_b, (target_width, target_height))
                                object_tracker[matched_id] = {"embedding": obj_embedding,
                                                              "number_matched": number_matched,
                                                              "image": normalized_obj,
                                                              "score": score,
                                                              "category": category}

                        # If the object is not matched, assign a new ID
                        if matched_id is None:
                            object_id_counter += 1
                            matched_id = object_id_counter
                            # Add the new embedding and normalized image to the object tracker
                            obj_image_b = frame[yb:ye, xb:xe]
                            normalized_obj = cv2.resize(obj_image_b, (target_width, target_height))
                            object_tracker[matched_id] = {"embedding": obj_embedding,
                                                          "number_matched": 0,
                                                          "image": normalized_obj,
                                                          "score": score,
                                                          "category": category}

                        # Draw a larger bounding box and object ID (disabled)
                        #cv2.rectangle(frame, (xb, yb), (xe, ye), (0, 255, 0), 2)
                        #cv2.putText(frame, f"ID: {matched_id}", (xb, yb - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                # Display the resulting frame, for debug purposes
                #yield frame  # Yield the frame for streaming
                #time.sleep(2)  # simulate a delay

            # Increment frame number
            frame_number += 1

    # Finished reading the video
    if len(object_tracker) == 0:
        return None

    sorted_data = sorted(object_tracker, key=lambda x: object_tracker[x]['number_matched'], reverse=True)

    # Find the top N objects among all detected objects
    number_objs = len(object_tracker)
    if number_objs >= 3:
        center_top1 = [sorted_data[1], sorted_data[0], sorted_data[2]]  # Top 1 will take the center position
    else:
        center_top1 = sorted_data

    images = []
    contents = []
    for obj_id in center_top1:
        #yield object_tracker[obj_id]["image"]  # Yield the frame for streaming
        #time.sleep(2)  # simulate a delay
        obj_image = object_tracker[obj_id]["image"]
        images.append(obj_image)

    # Release the video capture object
    video_capture.release()
    cv2.destroyAllWindows()

    return images
#=========================================================================================================
# Summarize the video using a phi Agent
def summarize_video(video_path, user_prompt, out_lang='Original'):
    # Upload and process the video
    processed_video = upload_file(video_path)

    # Extract the video info into a dictionary
    video_info = str(processed_video).split('File(')[1]
    video_info = video_info.replace(")", "")
    video_dic = eval(video_info)
    print("display_name, sha256_hash:", video_dic['display_name'], video_dic['sha256_hash'])

    while processed_video.state.name == "PROCESSING":
        time.sleep(1)
        processed_video = get_file(processed_video.name)

    # Detect the video language
    lang_prompt = "Give language name"
    lang_response = multimodal_Agent.run(lang_prompt, videos=[processed_video]).content
    language = str(lang_response).split(' ')[-1]
    print('Video language is:', language)
    if out_lang == 'Original':
        out_lang = language

    # Analysis prompt
    analysis_prompt = (
        f'''First analyze the video, then answer the following questions using that analysis.
        Questions: {user_prompt}
        Provide a comprehensive response focusing on practical, actionable information tied to the original questions.
        Answer the questions in {out_lang}. Limit the total response to 30 lines.'''
    )

    # AI agent processing
    response = multimodal_Agent.run(analysis_prompt, videos=[processed_video])
    markdown_text = response.content

    return out_lang, str(markdown_text)


#=======================================================================================
# Initialize the agent
multimodal_Agent = initialize_agent()
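#=======================================================================================
# Example entry point (sketch). The YouTube URL, local file name, and question below are
# hypothetical placeholders, and GOOGLE_API_KEY is assumed to be set in the environment
# for the google.generativeai upload used by summarize_video().
if __name__ == "__main__":
    genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))

    url = "https://www.youtube.com/watch?v=example"  # hypothetical URL
    local_path = f"{uuid.uuid4().hex}.mp4"           # unique local file name
    with yt_dlp.YoutubeDL({"format": "mp4", "outtmpl": local_path}) as ydl:
        ydl.download([url])

    # Extract representative faces and objects from the downloaded clip
    top_faces = face_detection_embed(local_path)
    top_objects = object_detection_embed(local_path)

    # Summarize the video with the Gemini-backed agent
    language, summary = summarize_video(local_path, "What is this video about?")
    print(f"Answer language: {language}\n{summary}")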