# Video2Poster_Agent / agent_tools.py
# -*- coding: utf-8 -*-
from phi.agent import Agent
from phi.model.google import Gemini
from phi.tools.duckduckgo import DuckDuckGo
import google.generativeai as genai
from google.generativeai import upload_file, get_file
import os
import numpy as np
import time
import uuid
import yt_dlp
import cv2
import mediapipe as mp
#==========================================================================================================
# Load a pre-trained face embedding model (nn4.small2, a FaceNet-style Torch model loaded with OpenCV's DNN module).
# It performs better here than the MediaPipe embedder.
face_embedder = cv2.dnn.readNetFromTorch("nn4.small2.v1.t7") # Download the model from OpenCV's GitHub
# MediaPipe image embedder (used below for object embeddings; gives worse results than the model above for faces)
# Download the model from https://storage.googleapis.com/mediapipe-tasks/image_embedder
BaseOptions = mp.tasks.BaseOptions
ImageEmbedder = mp.tasks.vision.ImageEmbedder
ImageEmbedderOptions = mp.tasks.vision.ImageEmbedderOptions
VisionRunningMode = mp.tasks.vision.RunningMode
options = ImageEmbedderOptions(
base_options=BaseOptions(model_asset_path='mobilenet_v3_small_075_224_embedder.tflite'),
quantize=True,
running_mode=VisionRunningMode.IMAGE)
mp_embedder = ImageEmbedder.create_from_options(options)
#================================================================================================================
def initialize_agent():
return Agent(
name="Video AI summarizer",
model=Gemini(id="gemini-2.0-flash-exp"),
tools=[DuckDuckGo()],
show_tool_calls=True,
markdown=True,
)
# Based on cv2 facenet embedder
def get_face_embedding(face_image):
"""
Generate a face embedding using the pre-trained OpenCV DNN model (returned as a flat numpy vector).
"""
# Preprocess the face image with cv2
blob = cv2.dnn.blobFromImage(face_image, 1.0 / 255, (96, 96), (0, 0, 0), swapRB=True, crop=False)
face_embedder.setInput(blob)
embedding = face_embedder.forward()
return embedding.flatten()
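# Illustrative sketch (not part of the pipeline below): two face crops can be compared
# by cosine similarity of their embeddings, which is the matching rule applied in
# face_detection_embed(). `face_a` and `face_b` are assumed to be BGR crops from cv2.
#
#   emb_a = get_face_embedding(face_a)
#   emb_b = get_face_embedding(face_b)
#   similarity = np.dot(emb_a, emb_b) / (np.linalg.norm(emb_a) * np.linalg.norm(emb_b))
#   same_person = similarity > 0.5  # same threshold as used below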
# Based on mediapipe embedder
def get_mp_embedding(face_image):
"""
Generate an image embedding using the MediaPipe image embedder (returns a MediaPipe Embedding object).
"""
# Load the input image from a numpy array.
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=np.array(face_image))
embedding_result = mp_embedder.embed(mp_image)
return embedding_result.embeddings[0]
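# Note: embeddings returned by get_mp_embedding() are MediaPipe Embedding objects and can be
# compared with ImageEmbedder.cosine_similarity(), as done in object_detection_embed() below.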
# Advanced Face Tracking with MediaPipe and Face Embeddings
def face_detection_embed(video_path):
# Initialize MediaPipe Face Detection
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils
# Load a pre-trained face embedding model (OpenCV's FaceNet)
#embedder = cv2.dnn.readNetFromTorch("nn4.small2.v1.t7") # Download the model from OpenCV's GitHub, move out from this function
# Open the video file
video_capture = cv2.VideoCapture(video_path)
# Dictionary to store face embeddings and their corresponding IDs, number of matched, normalized images
face_tracker = {} # Format: {face_id: {"embedding": face_embedding, "number_matched": number_matched, "image": normalized_face}}
face_id_counter = 0
similarity_threshold = 0.5 # Threshold for considering two faces the same
frame_number = 0
# Define the target size for normalization
target_width = 100 # Desired width for all faces
target_height = 100 # Desired height for all faces
with mp_face_detection.FaceDetection(min_detection_confidence=0.5) as face_detection:
while True:
# Grab a single frame of video
ret, frame = video_capture.read()
if not ret:
break
if frame_number % 30 == 0:
# Convert the frame to RGB for MediaPipe
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Process the frame with MediaPipe Face Detection
results = face_detection.process(rgb_frame)
if results.detections:
for detection in results.detections:
# Get the bounding box of the face
bboxC = detection.location_data.relative_bounding_box
ih, iw, _ = frame.shape
x = int(bboxC.xmin * iw)
y = int(bboxC.ymin * ih)
w = int(bboxC.width * iw)
h = int(bboxC.height * ih)
score = detection.score[0]
# Extract the face region
face_image = frame[y:y+h, x:x+w]
if face_image.size == 0:
continue # Skip empty face regions
#yield face_image # Yield the frame for streaming
# Generate the face embedding
face_embedding = get_face_embedding(face_image) #This model has better performance than mp embedder
#face_embedding = get_mp_embedding(face_image)
# Check if this face matches any previously tracked face, and find face_id with maximum similarity
matched_id = None
max_similarity = 0
for face_id, data in face_tracker.items():
# Cosine similarity between the FaceNet-style embeddings (performs better here than the MediaPipe embedder)
similarity = np.dot(face_embedding, data["embedding"]) / (
np.linalg.norm(face_embedding) * np.linalg.norm(data["embedding"])
)
'''
# MediaPipe cosine similarity (commented out: worse performance for faces)
similarity = ImageEmbedder.cosine_similarity(
face_embedding, data["embedding"])
'''
if similarity > max_similarity:
max_similarity = similarity
max_face_id = face_id
# Define a larger bounding box for the output face crop
xb = int(x * 0.8)
yb = int(y * 0.8)
xe = int(x * 1.2 + w)
ye = int(y * 1.2 + h)
if max_similarity > similarity_threshold:
matched_id = max_face_id
number_matched = face_tracker[matched_id]["number_matched"] + 1
face_tracker[matched_id]["number_matched"] = number_matched
if score > face_tracker[matched_id]["score"]: #switch to higher score image
face_image_b = frame[yb:ye, xb:xe]
normalized_face = cv2.resize(face_image_b, (target_width, target_height))
face_tracker[matched_id] = {"embedding": face_embedding, "number_matched": number_matched, "image": normalized_face, "score":score}
# If the face is not matched, assign a new ID
if matched_id is None:
face_id_counter += 1
matched_id = face_id_counter
# Register the new face with its embedding, crop, and detection score
face_image_b = frame[yb:ye, xb:xe]
normalized_face = cv2.resize(face_image_b, (target_width, target_height))
face_tracker[matched_id] = {"embedding": face_embedding, "number_matched": 0, "image": normalized_face, "score":score}
# Draw a larger bounding box and face ID
cv2.rectangle(frame, (xb, yb), (xe, ye), (0, 255, 0), 2)
cv2.putText(frame, f"ID: {matched_id}", (xb, yb - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
# Display the resulting frame, for debug purpose
#yield frame # Yield the frame for streaming
#time.sleep(2) #simulate a delay
# Increment frame number
frame_number += 1
# finished reading video
if len(face_tracker) == 0:
return None
sorted_data = sorted(face_tracker, key=lambda x: face_tracker[x]['number_matched'], reverse=True)
# find top N faces in all detected faces
number_faces = len(face_tracker)
if number_faces >= 3:
center_top1 = [sorted_data[1], sorted_data[0], sorted_data[2]] # Top 1 will take center position
else:
center_top1 = sorted_data
images = []
contents = []
for face_id in center_top1:
#yield face_tracker[face_id]["image"] # Yield the frame for streaming
#time.sleep(2) #simulate a delay
face_image = face_tracker[face_id]["image"]
images.append(face_image)
# Release the video capture object
video_capture.release()
cv2.destroyAllWindows()
return images
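# Example usage (a minimal sketch; "input.mp4" and the output file names are placeholders):
#
#   top_faces = face_detection_embed("input.mp4")
#   if top_faces:
#       for i, face in enumerate(top_faces):
#           cv2.imwrite(f"face_{i}.png", face)  # each crop is a 100x100 BGR image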
# Advanced object Tracking with MediaPipe object detection
def object_detection_embed(video_path):
# Initialize MediaPipe Object Detection
BaseOptions = mp.tasks.BaseOptions
ObjectDetector = mp.tasks.vision.ObjectDetector
ObjectDetectorOptions = mp.tasks.vision.ObjectDetectorOptions
VisionRunningMode = mp.tasks.vision.RunningMode
options = ObjectDetectorOptions(
base_options=BaseOptions(model_asset_path='efficientdet_lite0.tflite'),
max_results=3,
score_threshold=0.5,
running_mode=VisionRunningMode.IMAGE,
)
mp_drawing = mp.solutions.drawing_utils
# Load a pre-trained face embedding model (OpenCV's FaceNet)
#embedder = cv2.dnn.readNetFromTorch("nn4.small2.v1.t7") # Download the model from OpenCV's GitHub, move out from this function
# Open the video file
video_capture = cv2.VideoCapture(video_path)
# Dictionary to store object embeddings and their corresponding IDs, number of matches, normalized images
object_tracker = {} # Format: {object_id: {"embedding": obj_embedding, "number_matched": number_matched, "image": normalized_obj, "score":score, "category": category}}
object_id_counter = 0
similarity_threshold = 0.5 # Threshold for considering two objects the same
frame_number = 0
# Define the target size for normalization, only fix height
#target_width = 100 # Desired width for all faces
target_height = 100 # Desired height for all faces
with ObjectDetector.create_from_options(options) as obj_detection:
while True:
# Grab a single frame of video
ret, frame = video_capture.read()
if not ret:
break
if frame_number % 30 == 0:
# Convert the frame to RGB for MediaPipe
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Frame dimensions (kept for reference; the detector returns pixel coordinates for this image)
frame_height, frame_width, _ = rgb_frame.shape
# Load the input image from a numpy array.
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
# Process the frame with MediaPipe Object Detection
results = obj_detection.detect(mp_image)
if results.detections:
for detection in results.detections:
#print("line 297: detection:", detection)
'''
sample output:
Detection(bounding_box=BoundingBox(origin_x=84, origin_y=168, width=272, height=448),
categories=[Category(index=None, score=0.81640625, display_name=None, category_name='person')], keypoints=[])
'''
# Get the bounding box of the detected object (origin_x/width are horizontal, origin_y/height are vertical)
bboxC = detection.bounding_box
x = int(bboxC.origin_x)
y = int(bboxC.origin_y)
w = int(bboxC.width)
h = int(bboxC.height)
score = detection.categories[0].score
category = detection.categories[0].category_name
# Extract the object region (frame is indexed [rows, cols] = [y, x])
obj_image = frame[y:y+h, x:x+w]
if obj_image.size == 0:
continue # Skip empty object regions
#yield obj_image # Yield the frame for streaming
# Generate the object embedding
#face_embedding = get_face_embedding(face_image) #This model has better performance than mp embedder
obj_embedding = get_mp_embedding(obj_image)
# Check if this object matches any previously tracked object, and find the ID with maximum similarity
matched_id = None
max_similarity = 0
for obj_id, data in object_tracker.items():
'''
# Calculate the cosine similarity between embeddings. This model has better performance than mp embedder
similarity = np.dot(face_embedding, data["embedding"]) / (
np.linalg.norm(face_embedding) * np.linalg.norm(data["embedding"])
)
'''
# Compute cosine similarity with the MediaPipe embedder utility
similarity = ImageEmbedder.cosine_similarity(
obj_embedding, data["embedding"])
if similarity > max_similarity:
max_similarity = similarity
max_obj_id = obj_id
# Define a larger bounding box for the output object crop
xb = int(x * 0.8)
yb = int(y * 0.8)
xe = int(x * 1.2 + w)
ye = int(y * 1.2 + h)
# Keep the aspect ratio: fix the height and scale the width to match the crop
scale = target_height / (y * 0.4 + h)
target_width = int((x * 0.4 + w) * scale)
if max_similarity > similarity_threshold:
matched_id = max_obj_id
number_matched = object_tracker[matched_id]["number_matched"] + 1
object_tracker[matched_id]["number_matched"] = number_matched
if score > object_tracker[matched_id]["score"]: #switch to higher score image
obj_image_b = frame[yb:ye, xb:xe]
normalized_obj = cv2.resize(obj_image_b, (target_width, target_height))
object_tracker[matched_id] = {"embedding": obj_embedding, "number_matched": number_matched, "image": normalized_obj, "score":score, "category":category}
# If the object is not matched, assign a new ID
if matched_id is None:
object_id_counter += 1
matched_id = object_id_counter
# Register the new object with its embedding, crop, and detection score
obj_image_b = frame[yb:ye, xb:xe]
normalized_obj = cv2.resize(obj_image_b, (target_width, target_height))
object_tracker[matched_id] = {"embedding": obj_embedding, "number_matched": 0, "image": normalized_obj, "score":score, "category":category}
# Draw a larger bounding box and object ID (debug only; commented out below)
#cv2.rectangle(frame, (xb, yb), (xe, ye), (0, 255, 0), 2)
#cv2.putText(frame, f"ID: {matched_id}", (xb, yb - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
# Display the resulting frame, for debug purpose
#yield frame # Yield the frame for streaming
#time.sleep(2) #simulate a delay
# Increment frame number
frame_number += 1
# finished reading video
if len(object_tracker) == 0:
return None
sorted_data = sorted(object_tracker, key=lambda x: object_tracker[x]['number_matched'], reverse=True)
# find the top N objects among all detected objects
number_objs = len(object_tracker)
if number_objs >= 3:
center_top1 = [sorted_data[1], sorted_data[0], sorted_data[2]] # Top 1 will take center position
else:
center_top1 = sorted_data
images = []
contents = []
#center_top1 = [sorted_data[1], sorted_data[0], sorted_data[2]] # Top 1 will take center position
for obj_id in center_top1:
#yield object_tracker[obj_id]["image"] # Yield the frame for streaming
#time.sleep(2) #simulate a delay
obj_image = object_tracker[obj_id]["image"]
images.append(obj_image)
# Release the video capture object
video_capture.release()
cv2.destroyAllWindows()
return images
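# Example usage (a minimal sketch; assumes 'efficientdet_lite0.tflite' and the embedder model
# are in the working directory, and "input.mp4" is a placeholder path):
#
#   top_objects = object_detection_embed("input.mp4")
#   if top_objects:
#       for i, obj in enumerate(top_objects):
#           cv2.imwrite(f"object_{i}.png", obj)  # crops are resized to a height of 100 px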
#=========================================================================================================
# Summarize video using phi Agent
def summarize_video(video_path, user_prompt, out_lang = 'Original'):
# Upload and process the video
processed_video = upload_file(video_path)
# Parse the uploaded File's repr into a dictionary (for logging the display name and hash)
video_info = str(processed_video).split('File(')[1]
video_info = video_info.replace(")", "")
video_dic = eval(video_info)
print("display_name, sha256_hash:", video_dic['display_name'], video_dic['sha256_hash'])
while processed_video.state.name == "PROCESSING":
time.sleep(1)
processed_video = get_file(processed_video.name)
# detect language
lang_prompt = "Reply with only the name of the language spoken in this video."
lang_response = multimodal_Agent.run(lang_prompt, videos=[processed_video]).content
language = str(lang_response).split(' ')[-1]
print('Video language is:', language)
if out_lang == 'Original':
out_lang = language
# Analysis prompt
analysis_prompt = ( f'''
First analyze the video, then answer the following questions using that analysis:
{user_prompt}
Provide a comprehensive response focused on practical, actionable information, restating each original question.
Answer the questions in {out_lang}. Limit the response to a total of 30 lines.'''
)
# AI agent processing
response = multimodal_Agent.run(analysis_prompt, videos=[processed_video])
markdown_text = response.content
return out_lang, str(markdown_text)
#=======================================================================================
# Initialize the agent
multimodal_Agent = initialize_agent()
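# Minimal end-to-end sketch (assumptions: GOOGLE_API_KEY is set in the environment for the
# genai/Gemini calls, and "sample.mp4" is a placeholder path to a local video file):
if __name__ == "__main__":
    genai.configure(api_key=os.environ["GOOGLE_API_KEY"])  # assumed auth setup
    lang, summary = summarize_video("sample.mp4", "What is this video about?")
    print(f"Output language: {lang}\n{summary}")
    faces = face_detection_embed("sample.mp4")
    print(f"Top face crops returned: {0 if faces is None else len(faces)}")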