| | import torch |
| | from transformers import AutoProcessor, AutoModelForImageTextToText |
| | from typing import List, Dict |
| | import logging |
| | import os |
| | import subprocess |
| | import json |
| | import tempfile |
| | import time |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | def _grab_best_device(use_gpu=True): |
| | if torch.cuda.device_count() > 0 and use_gpu: |
| | device = "cuda" |
| | else: |
| | device = "cpu" |
| | return device |
| |
|
| | def get_video_duration_seconds(video_path: str) -> float: |
| | """Use ffprobe to get video duration in seconds.""" |
| | cmd = [ |
| | "ffprobe", |
| | "-v", "quiet", |
| | "-print_format", "json", |
| | "-show_format", |
| | video_path |
| | ] |
| | result = subprocess.run(cmd, capture_output=True, text=True) |
| | info = json.loads(result.stdout) |
| | return float(info["format"]["duration"]) |
| |
|
| | def format_duration(seconds: int) -> str: |
| | minutes = seconds // 60 |
| | secs = seconds % 60 |
| | return f"{minutes:02d}:{secs:02d}" |
| |
|
| | DEVICE = _grab_best_device() |
| |
|
| | logger.info(f"Using device: {DEVICE}") |
| |
|
| | class VideoAnalyzer: |
| | def __init__(self): |
| | if not torch.cuda.is_available(): |
| | raise RuntimeError("CUDA is required but not available!") |
| | |
| | logger.info("Initializing VideoAnalyzer") |
| | self.model_path = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct" |
| | logger.info(f"Loading model from {self.model_path} - Using device: {DEVICE}") |
| | |
| | |
| | self.processor = AutoProcessor.from_pretrained(self.model_path) |
| |
|
| | self.model = AutoModelForImageTextToText.from_pretrained( |
| | self.model_path, |
| | torch_dtype=torch.bfloat16, |
| | device_map=DEVICE, |
| | _attn_implementation="flash_attention_2", |
| | low_cpu_mem_usage=True, |
| | ).to(DEVICE) |
| | |
| | |
| | self.model = torch.compile(self.model, mode="reduce-overhead") |
| | logger.info(f"Model loaded and compiled on device: {self.model.device}") |
| | |
| | def analyze_segment(self, video_path: str, start_time: float) -> str: |
| | """Analyze a single video segment.""" |
| |
|
| | messages = [ |
| | { |
| | "role": "system", |
| | "content": [ |
| | { |
| | "type": "text", |
| | "text": ( |
| | "You are an AI specialized in video content analysis. " |
| | "Your task is to watch the provided video segment and generate a detailed, structured description focusing on the following elements:\n" |
| | "1. **People and Their Actions:** Identify all individuals, their appearances, and describe their activities or interactions.\n" |
| | "2. **Environment and Setting:** Describe the location, time of day, weather conditions, and any notable background details.\n" |
| | "3. **Objects and Their Positions:** List prominent objects, their attributes, and spatial relationships within the scene.\n" |
| | "4. **On-Screen Text:** Transcribe any visible text, including signs, labels, or subtitles, and specify their locations.\n" |
| | "5. **Key Events and Timing:** Outline significant events, actions, or changes, along with their timestamps.\n\n" |
| | "Provide the information in a clear and concise manner, using bullet points or numbered lists where appropriate." |
| | ) |
| | } |
| | ] |
| | }, |
| | { |
| | "role": "user", |
| | "content": [ |
| | {"type": "video", "path": video_path}, |
| | { |
| | "type": "text", |
| | "text": ( |
| | "Please analyze the attached video segment and provide a structured description as per the guidelines above. " |
| | "If certain elements are not present in the video, you may omit them from your response." |
| | ) |
| | } |
| | ] |
| | } |
| | ] |
| |
|
| |
|
| | inputs = self.processor.apply_chat_template( |
| | messages, |
| | add_generation_prompt=True, |
| | tokenize=True, |
| | return_dict=True, |
| | return_tensors="pt" |
| | ).to(DEVICE, dtype=torch.bfloat16) |
| | |
| | with torch.inference_mode(): |
| | outputs = self.model.generate( |
| | **inputs, |
| | do_sample=True, |
| | temperature=0.7, |
| | max_new_tokens=256, |
| | ) |
| | return self.processor.batch_decode(outputs, skip_special_tokens=True)[0].split("Assistant: ")[-1] |
| |
|
| | def process_video(self, video_path: str, segment_length: int = 10) -> List[Dict]: |
| | try: |
| | |
| | temp_dir = tempfile.mkdtemp() |
| | |
| | |
| | duration = get_video_duration_seconds(video_path) |
| | total_segments = (int(duration) + segment_length - 1) // segment_length |
| | logger.info(f"Processing {total_segments} segments for video of length {duration:.2f} seconds") |
| | |
| | |
| | for segment_idx in range(total_segments): |
| | segment_start_time = time.time() |
| | start_time = segment_idx * segment_length |
| | end_time = min(start_time + segment_length, duration) |
| | |
| | |
| | if start_time >= duration: |
| | break |
| | |
| | |
| | segment_path = os.path.join(temp_dir, f"segment_{start_time}.mp4") |
| | cmd = [ |
| | "ffmpeg", |
| | "-y", |
| | "-i", video_path, |
| | "-ss", str(start_time), |
| | "-t", str(segment_length), |
| | "-c:v", "libx264", |
| | "-preset", "ultrafast", |
| | "-pix_fmt", "yuv420p", |
| | segment_path |
| | ] |
| | |
| | ffmpeg_start = time.time() |
| | subprocess.run(cmd, check=True) |
| | ffmpeg_time = time.time() - ffmpeg_start |
| | |
| | |
| | inference_start = time.time() |
| | description = self.analyze_segment(segment_path, start_time) |
| | inference_time = time.time() - inference_start |
| | |
| | |
| | yield { |
| | "timestamp": format_duration(int(start_time)), |
| | "description": description, |
| | "processing_times": { |
| | "ffmpeg": ffmpeg_time, |
| | "inference": inference_time, |
| | "total": time.time() - segment_start_time |
| | } |
| | } |
| | |
| | |
| | os.remove(segment_path) |
| | |
| | logger.info( |
| | f"Segment {segment_idx + 1}/{total_segments} ({start_time}-{end_time}s) - " |
| | f"FFmpeg: {ffmpeg_time:.2f}s, Inference: {inference_time:.2f}s" |
| | ) |
| | |
| | |
| | os.rmdir(temp_dir) |
| | |
| | except Exception as e: |
| | logger.error(f"Error processing video: {str(e)}", exc_info=True) |
| | raise |