# smollvm/src/video_processor/processor.py

import json
import logging
import os
import subprocess
import tempfile
import time
from typing import Dict, Iterator

import torch
from transformers import AutoProcessor, AutoModelForImageTextToText

logger = logging.getLogger(__name__)


def _grab_best_device(use_gpu=True):
    """Return 'cuda' when a GPU is available and requested, otherwise 'cpu'."""
    if torch.cuda.device_count() > 0 and use_gpu:
        device = "cuda"
    else:
        device = "cpu"
    return device


def get_video_duration_seconds(video_path: str) -> float:
    """Use ffprobe to get video duration in seconds."""
    cmd = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        video_path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    info = json.loads(result.stdout)
    return float(info["format"]["duration"])


def format_duration(seconds: int) -> str:
    minutes = seconds // 60
    secs = seconds % 60
    return f"{minutes:02d}:{secs:02d}"


DEVICE = _grab_best_device()
logger.info(f"Using device: {DEVICE}")


class VideoAnalyzer:
    """Describe video segments with the SmolVLM2-500M-Video-Instruct model."""

    def __init__(self):
        # flash_attention_2 and bfloat16 generation below assume a CUDA device.
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA is required but not available!")
        logger.info("Initializing VideoAnalyzer")
        self.model_path = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct"
        logger.info(f"Loading model from {self.model_path} - Using device: {DEVICE}")
        # Load processor and model
        self.processor = AutoProcessor.from_pretrained(self.model_path)
        self.model = AutoModelForImageTextToText.from_pretrained(
            self.model_path,
            torch_dtype=torch.bfloat16,
            device_map=DEVICE,
            _attn_implementation="flash_attention_2",
            low_cpu_mem_usage=True,
        ).to(DEVICE)
        # Compile model for faster inference
        self.model = torch.compile(self.model, mode="reduce-overhead")
        logger.info(f"Model loaded and compiled on device: {self.model.device}")

    def analyze_segment(self, video_path: str, start_time: float) -> str:
        """Analyze a single video segment."""
        messages = [
            {
                "role": "system",
                "content": [
                    {
                        "type": "text",
                        "text": (
                            "You are an AI specialized in video content analysis. "
                            "Your task is to watch the provided video segment and generate a detailed, structured description focusing on the following elements:\n"
                            "1. **People and Their Actions:** Identify all individuals, their appearances, and describe their activities or interactions.\n"
                            "2. **Environment and Setting:** Describe the location, time of day, weather conditions, and any notable background details.\n"
                            "3. **Objects and Their Positions:** List prominent objects, their attributes, and spatial relationships within the scene.\n"
                            "4. **On-Screen Text:** Transcribe any visible text, including signs, labels, or subtitles, and specify their locations.\n"
                            "5. **Key Events and Timing:** Outline significant events, actions, or changes, along with their timestamps.\n\n"
                            "Provide the information in a clear and concise manner, using bullet points or numbered lists where appropriate."
                        )
                    }
                ]
            },
            {
                "role": "user",
                "content": [
                    {"type": "video", "path": video_path},
                    {
                        "type": "text",
                        "text": (
                            "Please analyze the attached video segment and provide a structured description as per the guidelines above. "
                            "If certain elements are not present in the video, you may omit them from your response."
                        )
                    }
                ]
            }
        ]
        inputs = self.processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt"
        ).to(DEVICE, dtype=torch.bfloat16)
        with torch.inference_mode():
            outputs = self.model.generate(
                **inputs,
                do_sample=True,
                temperature=0.7,
                max_new_tokens=256,
            )
        return self.processor.batch_decode(outputs, skip_special_tokens=True)[0].split("Assistant: ")[-1]

    def process_video(self, video_path: str, segment_length: int = 10) -> Iterator[Dict]:
        """Split the video into fixed-length segments, analyze each, and yield one result dict per segment."""
        try:
            # Create temp directory for segments
            temp_dir = tempfile.mkdtemp()
            # Get video duration
            duration = get_video_duration_seconds(video_path)
            total_segments = (int(duration) + segment_length - 1) // segment_length
            logger.info(f"Processing {total_segments} segments for video of length {duration:.2f} seconds")
            # Process video in segments
            for segment_idx in range(total_segments):
                segment_start_time = time.time()
                start_time = segment_idx * segment_length
                end_time = min(start_time + segment_length, duration)
                # Skip if we've reached the end
                if start_time >= duration:
                    break
                # Create segment - Optimized ffmpeg settings
                segment_path = os.path.join(temp_dir, f"segment_{start_time}.mp4")
                cmd = [
                    "ffmpeg",
                    "-y",
                    "-i", video_path,
                    "-ss", str(start_time),
                    "-t", str(segment_length),
                    "-c:v", "libx264",
                    "-preset", "ultrafast",  # Use ultrafast preset for speed
                    "-pix_fmt", "yuv420p",  # Ensure compatible pixel format
                    segment_path
                ]
                ffmpeg_start = time.time()
                subprocess.run(cmd, check=True)
                ffmpeg_time = time.time() - ffmpeg_start
                # Analyze segment
                inference_start = time.time()
                description = self.analyze_segment(segment_path, start_time)
                inference_time = time.time() - inference_start
                # Add segment info with timestamp
                yield {
                    "timestamp": format_duration(int(start_time)),
                    "description": description,
                    "processing_times": {
                        "ffmpeg": ffmpeg_time,
                        "inference": inference_time,
                        "total": time.time() - segment_start_time
                    }
                }
                # Clean up segment file
                os.remove(segment_path)
                logger.info(
                    f"Segment {segment_idx + 1}/{total_segments} ({start_time}-{end_time}s) - "
                    f"FFmpeg: {ffmpeg_time:.2f}s, Inference: {inference_time:.2f}s"
                )
            # Clean up temp directory
            os.rmdir(temp_dir)
        except Exception as e:
            logger.error(f"Error processing video: {str(e)}", exc_info=True)
            raise
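

# Example usage (illustrative sketch, not part of the original module): run the
# analyzer over a local file and print each segment description as it is
# yielded. "sample.mp4" is a placeholder path; CUDA plus ffmpeg/ffprobe must be
# available for this to run.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    analyzer = VideoAnalyzer()
    for segment in analyzer.process_video("sample.mp4", segment_length=10):
        print(f"[{segment['timestamp']}] {segment['description']}")
        print(f"  processing times: {segment['processing_times']}")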