# NOTE: web-page header residue captured with this file ("Spaces: Sleeping Sleeping"); not part of the program.
# -*- coding: utf-8 -*- | |
from phi.agent import Agent | |
from phi.model.google import Gemini | |
from phi.tools.duckduckgo import DuckDuckGo | |
import google.generativeai as genai | |
from google.generativeai import upload_file, get_file | |
import os | |
import numpy as np | |
import time | |
import uuid | |
import yt_dlp | |
import cv2 | |
import mediapipe as mp | |
# ==========================================================================================================
# Pre-trained Torch face-embedding network, loaded through OpenCV's DNN module
# from the local file "nn4.small2.v1.t7" (downloadable from OpenCV's GitHub).
# Per the original author's note it separates faces better than the MediaPipe
# embedder, so face tracking uses this one.
face_embedder = cv2.dnn.readNetFromTorch("nn4.small2.v1.t7")

# General-purpose MediaPipe image embedder. The author found it worse for
# faces, but it is still used for object tracking further down in this file.
# Model download: https://storage.googleapis.com/mediapipe-tasks/image_embedder
BaseOptions = mp.tasks.BaseOptions
ImageEmbedder = mp.tasks.vision.ImageEmbedder
ImageEmbedderOptions = mp.tasks.vision.ImageEmbedderOptions
VisionRunningMode = mp.tasks.vision.RunningMode

options = ImageEmbedderOptions(
    running_mode=VisionRunningMode.IMAGE,
    quantize=True,
    base_options=BaseOptions(
        model_asset_path='mobilenet_v3_small_075_224_embedder.tflite',
    ),
)
mp_embedder = ImageEmbedder.create_from_options(options)
# ================================================================================================================
def initialize_agent():
    """Create the phi ``Agent`` used for video analysis.

    The agent is named "Video AI summarizer", runs on the Gemini model
    ``gemini-2.0-flash-exp``, is given the DuckDuckGo tool, shows its tool
    calls, and has markdown output enabled.

    Returns:
        The configured ``Agent`` instance.
    """
    agent_settings = {
        "name": "Video AI summarizer",
        "model": Gemini(id="gemini-2.0-flash-exp"),
        "tools": [DuckDuckGo()],
        "show_tool_calls": True,
        "markdown": True,
    }
    return Agent(**agent_settings)
# Face embedding via the OpenCV DNN network held in ``face_embedder``
def get_face_embedding(face_image):
    """Return a flat embedding vector for a face crop.

    Args:
        face_image: Face crop as an image array. ``swapRB=True`` swaps the
            first and last channels, so a BGR crop (OpenCV's default
            ordering) reaches the network as RGB.

    Returns:
        The network's output for the crop, flattened to one dimension.
    """
    # Scale pixels by 1/255, resize to 96x96, no mean subtraction, no crop.
    input_blob = cv2.dnn.blobFromImage(
        face_image,
        scalefactor=1.0 / 255,
        size=(96, 96),
        mean=(0, 0, 0),
        swapRB=True,
        crop=False,
    )
    face_embedder.setInput(input_blob)
    return face_embedder.forward().flatten()
# Image embedding via the MediaPipe embedder held in ``mp_embedder``
def get_mp_embedding(face_image):
    """Return the MediaPipe embedding of an image crop.

    Args:
        face_image: Image crop (array-like). It is copied into a fresh numpy
            array and handed to MediaPipe tagged as SRGB.

    Returns:
        The first entry of the embedder result's ``embeddings`` list.
    """
    pixels = np.array(face_image)
    wrapped_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=pixels)
    embed_result = mp_embedder.embed(wrapped_image)
    return embed_result.embeddings[0]
# Advanced Face Tracking with MediaPipe and Face Embeddings
def face_detection_embed(video_path):
    """Return thumbnails of the faces that appear most often in a video.

    Every 30th frame is run through MediaPipe face detection. Each detected
    face is embedded with ``get_face_embedding`` and compared (cosine
    similarity) with the faces tracked so far. A similarity above 0.5 counts
    as a re-sighting of the best-matching face; otherwise the face gets a new
    ID. For each tracked face the crop with the highest detection score is
    kept as its thumbnail.

    Args:
        video_path: Path of the video, as accepted by ``cv2.VideoCapture``.

    Returns:
        ``None`` when no face was found. Otherwise a list of 100x100 crops
        taken from the BGR video frames: with three or more tracked faces the
        list is ``[second, first, third]`` by number of re-sightings (so the
        most frequent face sits in the middle); with fewer it is simply
        sorted by descending number of re-sightings.
    """
    mp_face_detection = mp.solutions.face_detection

    similarity_threshold = 0.5  # minimum cosine similarity to call two faces the same
    frame_step = 30             # only every 30th frame is analysed
    thumbnail_size = (100, 100)  # (width, height) of every returned crop

    # {face_id: {"embedding", "number_matched", "image", "score"}}
    face_tracker = {}
    face_id_counter = 0
    frame_number = 0

    video_capture = cv2.VideoCapture(video_path)
    try:
        with mp_face_detection.FaceDetection(min_detection_confidence=0.5) as face_detection:
            while True:
                ret, frame = video_capture.read()
                if not ret:
                    break

                sample_this_frame = frame_number % frame_step == 0
                frame_number += 1
                if not sample_this_frame:
                    continue

                # MediaPipe expects RGB; OpenCV delivers BGR.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = face_detection.process(rgb_frame)
                if not results.detections:
                    continue

                ih, iw, _ = frame.shape
                for detection in results.detections:
                    bbox = detection.location_data.relative_bounding_box
                    raw_x = int(bbox.xmin * iw)
                    raw_y = int(bbox.ymin * ih)

                    # Clamp the box to the frame. A negative origin would
                    # otherwise be read by numpy as "count from the end" and
                    # produce an empty or wrong crop.
                    x0 = max(raw_x, 0)
                    y0 = max(raw_y, 0)
                    x1 = min(raw_x + int(bbox.width * iw), iw)
                    y1 = min(raw_y + int(bbox.height * ih), ih)
                    if x1 <= x0 or y1 <= y0:
                        continue  # nothing of the face lies inside the frame
                    w = x1 - x0
                    h = y1 - y0
                    score = detection.score[0]

                    face_image = frame[y0:y1, x0:x1]
                    face_embedding = get_face_embedding(face_image)
                    embedding_norm = np.linalg.norm(face_embedding)

                    # Find the tracked face most similar to this one.
                    best_id = None
                    max_similarity = 0
                    for face_id, data in face_tracker.items():
                        denominator = embedding_norm * np.linalg.norm(data["embedding"])
                        if denominator == 0:
                            continue  # similarity undefined for a zero vector
                        similarity = np.dot(face_embedding, data["embedding"]) / denominator
                        if similarity > max_similarity:
                            max_similarity = similarity
                            best_id = face_id

                    # Enlarged box for the thumbnail.
                    # NOTE(review): the margin scales with the box position
                    # (x, y), not with its size, so faces near the top-left
                    # get little padding - formula kept from the original.
                    xb = int(x0 * 0.8)
                    yb = int(y0 * 0.8)
                    xe = min(int(x0 * 1.2 + w), iw)
                    ye = min(int(y0 * 1.2 + h), ih)

                    if best_id is not None and max_similarity > similarity_threshold:
                        entry = face_tracker[best_id]
                        entry["number_matched"] += 1
                        if score > entry["score"]:
                            # Better detection of a known face: switch to it.
                            entry["embedding"] = face_embedding
                            entry["image"] = cv2.resize(frame[yb:ye, xb:xe], thumbnail_size)
                            entry["score"] = score
                    else:
                        face_id_counter += 1
                        face_tracker[face_id_counter] = {
                            "embedding": face_embedding,
                            "number_matched": 0,
                            "image": cv2.resize(frame[yb:ye, xb:xe], thumbnail_size),
                            "score": score,
                        }
                    # No debug rectangle/ID is drawn on the frame: the frame
                    # is never displayed, and drawing on it would leak into
                    # the crops of later detections in the same frame.
    finally:
        # Release the capture on every path, including the "no faces" return
        # and any exception raised while processing.
        video_capture.release()

    if not face_tracker:
        return None

    ranked_ids = sorted(
        face_tracker,
        key=lambda fid: face_tracker[fid]["number_matched"],
        reverse=True,
    )
    if len(ranked_ids) >= 3:
        # Most frequent face goes in the middle of the three.
        ranked_ids = [ranked_ids[1], ranked_ids[0], ranked_ids[2]]
    return [face_tracker[fid]["image"] for fid in ranked_ids]
# Advanced object tracking with MediaPipe object detection
def _resize_to_height(image, target_height):
    """Resize ``image`` to ``target_height`` rows, keeping its aspect ratio."""
    src_height, src_width = image.shape[:2]
    target_width = max(1, round(src_width * target_height / src_height))
    return cv2.resize(image, (target_width, target_height))


def object_detection_embed(video_path):
    """Return thumbnails of the objects that appear most often in a video.

    Every 30th frame is run through the MediaPipe object detector
    (``efficientdet_lite0.tflite``, at most 3 results per frame, score
    threshold 0.5). Each detection is embedded with ``get_mp_embedding`` and
    compared with the objects tracked so far using
    ``ImageEmbedder.cosine_similarity``. A similarity above 0.5 counts as a
    re-sighting of the best-matching object; otherwise the detection gets a
    new ID. For each tracked object the crop with the highest detection score
    is kept as its thumbnail.

    Args:
        video_path: Path of the video, as accepted by ``cv2.VideoCapture``.

    Returns:
        ``None`` when nothing was detected. Otherwise a list of crops taken
        from the BGR video frames, each 100 pixels high with a width that
        preserves the crop's aspect ratio: with three or more tracked objects
        the list is ``[second, first, third]`` by number of re-sightings;
        with fewer it is sorted by descending number of re-sightings.
    """
    detector_options = mp.tasks.vision.ObjectDetectorOptions(
        base_options=mp.tasks.BaseOptions(model_asset_path='efficientdet_lite0.tflite'),
        max_results=3,
        score_threshold=0.5,
        running_mode=mp.tasks.vision.RunningMode.IMAGE,
    )

    similarity_threshold = 0.5  # minimum similarity to call two objects the same
    frame_step = 30             # only every 30th frame is analysed
    target_height = 100         # thumbnail height; width follows the aspect ratio

    # {object_id: {"embedding", "number_matched", "image", "score", "category"}}
    object_tracker = {}
    object_id_counter = 0
    frame_number = 0

    video_capture = cv2.VideoCapture(video_path)
    try:
        with mp.tasks.vision.ObjectDetector.create_from_options(detector_options) as obj_detection:
            while True:
                ret, frame = video_capture.read()
                if not ret:
                    break

                sample_this_frame = frame_number % frame_step == 0
                frame_number += 1
                if not sample_this_frame:
                    continue

                # MediaPipe expects RGB; OpenCV delivers BGR.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_height, frame_width, _ = rgb_frame.shape
                mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
                results = obj_detection.detect(mp_image)
                if not results.detections:
                    continue

                for detection in results.detections:
                    # Sample detection:
                    #   Detection(bounding_box=BoundingBox(origin_x=84, origin_y=168,
                    #             width=272, height=448),
                    #             categories=[Category(score=0.816, category_name='person')])
                    # origin_x/width run along the columns and origin_y/height
                    # along the rows (pixel units), so the numpy slice is
                    # [y:y+height, x:x+width].
                    bbox = detection.bounding_box
                    raw_x = int(bbox.origin_x)
                    raw_y = int(bbox.origin_y)

                    # Clamp the box to the frame so a negative origin cannot
                    # turn into a wrap-around slice.
                    x0 = max(raw_x, 0)
                    y0 = max(raw_y, 0)
                    x1 = min(raw_x + int(bbox.width), frame_width)
                    y1 = min(raw_y + int(bbox.height), frame_height)
                    if x1 <= x0 or y1 <= y0:
                        continue  # nothing of the object lies inside the frame
                    w = x1 - x0
                    h = y1 - y0

                    score = detection.categories[0].score
                    category = detection.categories[0].category_name

                    # The embedder input is declared as SRGB, so crop from the
                    # RGB frame rather than the BGR one.
                    obj_embedding = get_mp_embedding(rgb_frame[y0:y1, x0:x1])

                    # Find the tracked object most similar to this one.
                    best_id = None
                    max_similarity = 0
                    for obj_id, data in object_tracker.items():
                        similarity = ImageEmbedder.cosine_similarity(
                            obj_embedding, data["embedding"])
                        if similarity > max_similarity:
                            max_similarity = similarity
                            best_id = obj_id

                    # Enlarged box for the thumbnail.
                    # NOTE(review): the margin scales with the box position
                    # (x, y), not with its size - formula kept from the original.
                    xb = int(x0 * 0.8)
                    yb = int(y0 * 0.8)
                    xe = min(int(x0 * 1.2 + w), frame_width)
                    ye = min(int(y0 * 1.2 + h), frame_height)

                    if best_id is not None and max_similarity > similarity_threshold:
                        entry = object_tracker[best_id]
                        entry["number_matched"] += 1
                        if score > entry["score"]:
                            # Better detection of a known object: switch to it.
                            entry["embedding"] = obj_embedding
                            entry["image"] = _resize_to_height(frame[yb:ye, xb:xe], target_height)
                            entry["score"] = score
                            entry["category"] = category
                    else:
                        object_id_counter += 1
                        object_tracker[object_id_counter] = {
                            "embedding": obj_embedding,
                            "number_matched": 0,
                            "image": _resize_to_height(frame[yb:ye, xb:xe], target_height),
                            "score": score,
                            "category": category,
                        }
    finally:
        # Release the capture on every path, including the "nothing detected"
        # return and any exception raised while processing.
        video_capture.release()

    if not object_tracker:
        return None

    ranked_ids = sorted(
        object_tracker,
        key=lambda oid: object_tracker[oid]["number_matched"],
        reverse=True,
    )
    if len(ranked_ids) >= 3:
        # Most frequent object goes in the middle of the three.
        ranked_ids = [ranked_ids[1], ranked_ids[0], ranked_ids[2]]
    return [object_tracker[oid]["image"] for oid in ranked_ids]
# =========================================================================================================
# Summarize video using phi Agent
def summarize_video(video_path, user_prompt, out_lang='Original'):
    """Upload a video, detect its language and answer questions about it.

    Args:
        video_path: Path of the video file to upload.
        user_prompt: The questions to answer from the video.
        out_lang: Language for the answer. The default ``'Original'`` means
            "use the language detected in the video".

    Returns:
        A tuple ``(out_lang, text)``: the language actually requested from
        the model and the agent's response content converted to ``str``.

    Raises:
        ValueError: If the service reports that processing the uploaded
            video failed.
    """
    processed_video = upload_file(video_path)

    # Read the metadata straight from the file object. The previous approach
    # eval()'d a hand-trimmed repr string, which breaks (or executes text) as
    # soon as a field such as the display name contains ")" or "File(".
    video_dic = processed_video.to_dict()
    print("display_name, sha256_hash:", video_dic.get('display_name'), video_dic.get('sha256_hash'))

    # Poll until the service has finished processing the upload.
    while processed_video.state.name == "PROCESSING":
        time.sleep(1)
        processed_video = get_file(processed_video.name)
    if processed_video.state.name == "FAILED":
        raise ValueError(f"Video processing failed for {video_path!r}")

    # Detect the language: the last word of the reply is taken as its name.
    lang_prompt = 'Give language name'
    lang_response = multimodal_Agent.run(lang_prompt, videos=[processed_video]).content
    lang_tokens = str(lang_response).split()
    # Drop trailing punctuation/markdown so "English." or "**English**" work.
    language = lang_tokens[-1].strip(".,;:!*`'\"") if lang_tokens else ''
    print('Video language is:', language)
    if out_lang == 'Original':
        out_lang = language

    analysis_prompt = (
        "\n"
        "First analyze the video and then answer following questions using the video analysis, questions:\n"
        f"{user_prompt}\n"
        "Provide a comprehensive response focusing on practical, actionable information with original questions.\n"
        f"Answer questions in {out_lang}. limit the total lines to 30 lines."
    )

    response = multimodal_Agent.run(analysis_prompt, videos=[processed_video])
    return out_lang, str(response.content)
#=======================================================================================
# Build the shared agent once, when this module is loaded. summarize_video()
# looks up this module-level name each time it runs.
multimodal_Agent = initialize_agent()