"""Detect Pepsi and Coca-Cola logos in a video and export the hits as JSON.

Every video frame is decoded with PyAV, passed through a YOLO model, and each
detection above ``threshold`` whose class name is ``PEPSI`` or ``COCA-COLA``
is recorded with its timestamp, bounding-box size and distance from the frame
centre.
"""
import json
import math
import os
import sys
from datetime import timedelta

import av
from PIL import Image
from ultralytics import YOLO

# Paths
VIDEOS_DIR = '.'
video_path = os.path.join(VIDEOS_DIR, 'sample_video.mp4')
output_json_path = 'output.json'
model_path = os.path.join('.', 'best.pt')

# Load YOLOv8 model.  This happens at import time, so ``model_path`` must be
# resolvable from the current working directory when the module is imported.
model = YOLO(model_path)  # Load a custom model
threshold = 0.5

# Values ``json.dump`` can already encode; they must not be stringified.
_JSON_PRIMITIVES = (str, int, float, bool, type(None))


def format_timestamp(seconds):
    """Return ``seconds`` formatted the way ``str(timedelta)`` renders it.

    The result looks like ``H:MM:SS`` and carries a ``.ffffff`` microsecond
    suffix whenever ``seconds`` has a fractional part (e.g. ``0:00:01.500000``).
    """
    return str(timedelta(seconds=seconds))


def iter_frames(video_path):
    """Yield ``(image, timestamp)`` pairs for the first video stream.

    Frames are produced lazily so a caller can process a video without holding
    every decoded image in memory at once.  The container is closed when the
    generator is exhausted or closed.

    Args:
        video_path: Path (or anything ``av.open`` accepts) of the video.

    Yields:
        Tuples of the frame converted with ``frame.to_image()`` and its
        timestamp in seconds as a float.
    """
    # Frames without a presentation timestamp reuse the previous frame's
    # timestamp (0.0 before any timestamp was seen) instead of crashing on
    # ``None * time_base``.
    last_timestamp = 0.0
    with av.open(video_path) as container:
        for frame in container.decode(video=0):
            if frame.pts is not None and frame.time_base is not None:
                last_timestamp = float(frame.pts * frame.time_base)
            yield frame.to_image(), last_timestamp


def extract_frames(video_path):
    """Decode the whole video and return a list of ``(image, timestamp)``.

    This materialises every frame; prefer ``iter_frames`` for long videos.
    """
    return list(iter_frames(video_path))


def detect_logos(frames):
    """Run the model on each frame and collect Pepsi / Coca-Cola detections.

    Args:
        frames: Iterable of ``(image, timestamp)`` pairs.  The image must
            expose ``width`` and ``height`` and be accepted by ``model``.

    Returns:
        A ``(pepsi_pts, cocacola_pts)`` tuple of lists.  Each entry is a dict
        with ``"timestamp"`` (formatted string), ``"size"`` (``width`` /
        ``height`` of the box) and ``"distance_from_center"`` (distance between
        the box centre and the frame centre).  Units are those of ``box.xyxy``
        -- presumably pixels of the input image; confirm against the model.
    """
    pepsi_pts = []
    cocacola_pts = []
    # Upper-cased class name -> list that collects its detections.
    buckets = {'PEPSI': pepsi_pts, 'COCA-COLA': cocacola_pts}

    for img, timestamp in frames:
        # Per-frame values hoisted out of the per-box loop.
        frame_center_x = img.width / 2
        frame_center_y = img.height / 2
        formatted_timestamp = format_timestamp(timestamp)

        for result in model(img):  # Run inference
            for box in result.boxes:
                score = box.conf[0].item()
                if score <= threshold:
                    continue

                class_id = int(box.cls[0].item())
                bucket = buckets.get(result.names[class_id].upper())
                if bucket is None:
                    # Some other class: skip before doing any geometry.
                    continue

                x1, y1, x2, y2 = box.xyxy[0].tolist()
                center_x = (x1 + x2) / 2
                center_y = (y1 + y2) / 2
                bucket.append({
                    "timestamp": formatted_timestamp,
                    "size": {"width": x2 - x1, "height": y2 - y1},
                    "distance_from_center": math.hypot(
                        center_x - frame_center_x,
                        center_y - frame_center_y,
                    ),
                })

    return pepsi_pts, cocacola_pts


def _to_serializable(obj):
    """Convert ``obj`` into something ``json.dump`` can encode.

    JSON-native scalars, lists and dicts pass through unchanged so numbers
    stay numbers in the output; objects exposing ``tolist()`` or ``item()``
    (e.g. arrays / tensors) are unwrapped; anything else falls back to
    ``str(obj)``.
    """
    if isinstance(obj, _JSON_PRIMITIVES) or isinstance(obj, (list, dict)):
        return obj
    if hasattr(obj, 'tolist'):
        return obj.tolist()  # Convert numpy arrays or tensors
    if hasattr(obj, 'item'):
        return obj.item()  # Convert single element tensors
    return str(obj)  # Last resort for otherwise non-serializable objects


def generate_output_json(pepsi_pts, cocacola_pts, output_path='output.json'):
    """Write the detections to ``output_path`` as indented JSON.

    The document has four keys: ``Pepsi_pts`` / ``CocaCola_pts`` hold only the
    timestamps, while ``Pepsi_details`` / ``CocaCola_details`` hold the full
    entries with every value passed through ``_to_serializable``.

    Args:
        pepsi_pts: Detection entries for Pepsi, as built by ``detect_logos``.
        cocacola_pts: Detection entries for Coca-Cola.
        output_path: Destination file; overwritten if it exists.
    """
    def details(entries):
        return [
            {key: _to_serializable(value) for key, value in entry.items()}
            for entry in entries
        ]

    output = {
        "Pepsi_pts": [entry["timestamp"] for entry in pepsi_pts],
        "CocaCola_pts": [entry["timestamp"] for entry in cocacola_pts],
        "Pepsi_details": details(pepsi_pts),
        "CocaCola_details": details(cocacola_pts),
    }

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, indent=4)


def main(video_path, output_path=output_json_path):
    """Detect logos in ``video_path`` and write the report to ``output_path``.

    Frames are streamed through the detector one at a time rather than being
    decoded into a list first, which keeps memory use independent of the
    video length.
    """
    pepsi_pts, cocacola_pts = detect_logos(iter_frames(video_path))
    generate_output_json(pepsi_pts, cocacola_pts, output_path)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python main.py <video_path>")
        sys.exit(1)
    main(sys.argv[1])