import hashlib
import json
import os
import re
import subprocess
import time
import uuid
from pathlib import Path
from typing import List, Set

import psutil
import webvtt
from tqdm import tqdm
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, VectorParams, Distance
from qdrant_client.http.exceptions import UnexpectedResponse

from llm_engineering.domain.video_chunks import EmbeddedVideoChunk
from .multimodal_dispatcher import TextEmbedder, ImageEmbedder

# Optional dependencies: degrade gracefully when they are missing.
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False

try:
    from PIL import Image, ImageDraw
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
    print("PIL not available, placeholder frames will be plain text files")

try:
    import spacy
    SPACY_AVAILABLE = True
except ImportError:
    SPACY_AVAILABLE = False
    print("spaCy not available, using simplified text processing")

try:
    from bertopic import BERTopic
    BERTOPIC_AVAILABLE = True
except ImportError:
    BERTOPIC_AVAILABLE = False
    print("BERTopic not available, using simplified topic modeling")

class VideoIngester:
    """Ingest a library of videos: parse subtitles, extract frames, embed and store chunks."""

    def __init__(self, video_root: str):
        self.video_root = Path(video_root)
        self.checkpoint_file = self.video_root / ".processed_videos.json"
        self.processed_frames_file = self.video_root / ".processed_frames.json"
        self.processed_frames = self._load_processed_frames()
        self.processed_videos = self._load_checkpoint()
        self.nlp = None
        self.text_embedder = None
        self.image_embedder = None

        # Use multi-threaded OpenCV execution when available
        if CV2_AVAILABLE:
            try:
                cv2.setNumThreads(4)
            except Exception:
                pass

        # Initialize the text and image embedders
        try:
            self.text_embedder = TextEmbedder()
            self.image_embedder = ImageEmbedder()
            print("Initialized embedders")
        except Exception as e:
            print("Failed to load embedders: {}".format(e))

        # Load the NLP pipeline if spaCy is available
        if SPACY_AVAILABLE:
            try:
                # Use the small English model for efficiency
                try:
                    self.nlp = spacy.load("en_core_web_sm")
                except OSError:
                    # Download the model if it is not installed
                    spacy.cli.download("en_core_web_sm")
                    self.nlp = spacy.load("en_core_web_sm")
                print("Loaded NLP model")
            except Exception as e:
                print("NLP model unavailable: {}. Using fallbacks.".format(e))

        # Try to load BERTopic
        self.topic_model = None
        if BERTOPIC_AVAILABLE:
            try:
                self.topic_model = BERTopic(verbose=True)
                print("Loaded BERTopic")
            except Exception as e:
                print("BERTopic unavailable: {}".format(e))

        # Use the CLIP-based text encoder for consistent embedding dimensions,
        # instead of sentence-transformers, whose models embed to other sizes.
        self.sentence_model = self.image_embedder

    def _merge_subtitles(self, subtitles: List[dict]) -> List[dict]:
        """Merge adjacent subtitles into larger chunks for better context."""
        merged = []
        if not subtitles:
            return merged
        current_text = [subtitles[0]["text"]]
        current_start = subtitles[0]["start"]
        current_end = subtitles[0]["end"]
        max_duration = 30.0  # Maximum duration of a merged chunk, in seconds
        for sub in subtitles[1:]:
            # Keep merging while the gap to the previous subtitle is small
            # (<= 2 seconds) and the merged chunk has not grown too long
            time_gap = sub["start"] - current_end
            duration_so_far = current_end - current_start
            if time_gap <= 2.0 and duration_so_far < max_duration:
                current_text.append(sub["text"])
                current_end = sub["end"]
            else:
                # Close the current chunk and start a new one
                merged.append({
                    "start": current_start,
                    "end": current_end,
                    "text": " ".join(current_text)
                })
                current_text = [sub["text"]]
                current_start = sub["start"]
                current_end = sub["end"]
        # Don't forget the last segment
        if current_text:
            merged.append({
                "start": current_start,
                "end": current_end,
                "text": " ".join(current_text)
            })
        print("Merged {} subtitle entries into {} chunks".format(len(subtitles), len(merged)))
        return merged

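    # Illustration (hypothetical values, assuming the {start, end, text} dicts
    # produced by _parse_subtitles below): two captions 1s apart merge into
    # one chunk:
    #   subs = [{"start": 0.0, "end": 2.0, "text": "hello"},
    #           {"start": 3.0, "end": 5.0, "text": "world"}]
    #   ingester._merge_subtitles(subs)
    #   # -> [{"start": 0.0, "end": 5.0, "text": "hello world"}]
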
    def process_video_library(self, force_reprocess: bool = False):
        """Process all videos in the root directory."""
        if not self.video_root.exists():
            print("Error: Video root directory does not exist: {}".format(self.video_root))
            return
        print("Processing videos from: {}".format(self.video_root))
        # Load the checkpoint if it exists
        self.processed_videos = self._load_checkpoint()
        print("Already processed {} videos".format(len(self.processed_videos)))
        # Debug output showing which videos were already processed
        if self.processed_videos:
            print("Previously processed videos:")
            for vid in sorted(self.processed_videos):
                print("  - {}".format(vid))
        # Collect the folders that contain mp4 files
        folders = []
        for path in self.video_root.glob("*"):
            if path.is_dir() and list(path.glob("*.mp4")):
                folders.append(path)
        print("Found {} video folders".format(len(folders)))
        # Count how many will actually be processed
        to_process = [f for f in folders if force_reprocess or f.name not in self.processed_videos]
        print("Will process {} videos ({} skipped)".format(
            len(to_process), len(folders) - len(to_process)))
        # Process each folder
        start_time = time.time()
        videos_processed = 0
        for folder in folders:
            folder_id = folder.name
            # Skip if already processed and not forced to reprocess
            if folder_id in self.processed_videos and not force_reprocess:
                print("Skipping {} (already processed)".format(folder_id))
                continue
            try:
                print("\n[{}/{}] Processing {}".format(
                    videos_processed + 1, len(to_process), folder_id))
                # Log resource utilization
                self._log_resources()
                # Process the folder
                self._process_video_folder(folder)
                # Add to the processed set and update the checkpoint
                self.processed_videos.add(folder_id)
                self._save_checkpoint()
                videos_processed += 1
                # Estimate the remaining time from the average so far. A separate
                # counter is used because the loop index also advances over
                # skipped folders and would distort the average.
                elapsed = time.time() - start_time
                videos_left = len(to_process) - videos_processed
                if videos_processed > 0:
                    avg_time_per_video = elapsed / videos_processed
                    eta = avg_time_per_video * videos_left
                    eta_str = self._format_eta(eta)
                    print("\nProgress: {}/{} videos ({:.1f}%)".format(
                        videos_processed, len(to_process),
                        100.0 * videos_processed / len(to_process)
                    ))
                    print("Elapsed: {}, Avg: {:.1f}s/video, ETA: {}".format(
                        self._format_eta(elapsed),
                        avg_time_per_video,
                        eta_str
                    ))
            except Exception as e:
                print("Error processing {}: {}".format(folder_id, str(e)))
        # Save the checkpoint so finished videos are not reprocessed next run
        self._save_checkpoint()
        print("\nAll videos processed!")
        print("Total processed videos: {}".format(len(self.processed_videos)))

    def _accelerated_frame_extraction(self, mp4_path: Path, subtitles: List[dict]) -> List[Path]:
        """Hardware-optimized frame extraction via ffmpeg, with a manual fallback."""
        frame_dir = mp4_path.parent / "frames"
        frame_dir.mkdir(exist_ok=True)
        # Reuse frames from a previous run if they exist
        existing_frames = sorted(frame_dir.glob("*.jpg"))
        if existing_frames:
            print("Found {} existing frames, skipping extraction".format(len(existing_frames)))
            return existing_frames
        total_duration = sum(sub["end"] - sub["start"] for sub in subtitles)
        with tqdm(total=total_duration, desc="Extracting frames", leave=False) as pbar:
            # Try to find ffmpeg in common locations
            ffmpeg_cmd = None
            for cmd in ["/opt/homebrew/bin/ffmpeg", "ffmpeg", "/usr/local/bin/ffmpeg", "/usr/bin/ffmpeg"]:
                try:
                    # Check whether the command exists
                    subprocess.run([cmd, "-version"], capture_output=True, check=True)
                    ffmpeg_cmd = cmd
                    print("Found ffmpeg at: {}".format(ffmpeg_cmd))
                    break
                except (subprocess.SubprocessError, FileNotFoundError):
                    continue
            if not ffmpeg_cmd:
                print("WARNING: ffmpeg not found, using manual frame extraction")
                return self._manual_frame_extraction(mp4_path, subtitles)
            cmd = [
                ffmpeg_cmd,
                "-y",  # Overwrite output files without asking
                "-i", str(mp4_path),
                "-vf", "fps=1",  # One frame per second of video
                "-vsync", "0",
                str(frame_dir / "frame_%04d.jpg")
            ]
            try:
                result = subprocess.run(cmd, capture_output=True, text=True, check=True)
                # ffmpeg reports progress on stderr; since run() blocks until
                # completion, this only advances the bar after the fact
                for line in result.stderr.split('\n'):
                    if "frame=" in line:
                        try:
                            int(line.split("frame=")[1].split()[0])
                            pbar.update(1)
                        except (ValueError, IndexError):
                            pass
            except subprocess.CalledProcessError as e:
                print("FFmpeg error: {}".format(e.stderr))
                print("Falling back to manual frame extraction")
                return self._manual_frame_extraction(mp4_path, subtitles)
            except FileNotFoundError:
                print("FFmpeg not found, falling back to manual frame extraction")
                return self._manual_frame_extraction(mp4_path, subtitles)
        return sorted(frame_dir.glob("*.jpg"))

    def _manual_frame_extraction(self, mp4_path: Path, subtitles: List[dict]) -> List[Path]:
        """Fallback for when ffmpeg is unavailable: create placeholder image files."""
        print("Using manual frame extraction (FALLBACK MODE)")
        frame_dir = mp4_path.parent / "frames"
        frame_dir.mkdir(exist_ok=True)
        # Use PIL for image creation when it was importable at module load
        can_create_images = PIL_AVAILABLE
        if can_create_images:
            print("PIL is available for image creation")
        else:
            print("PIL not available, will create empty placeholder files")
        frame_paths = []
        # Ensure at least one frame is created even if there are no subtitles
        if not subtitles:
            print("No subtitles provided, creating a single frame")
            subtitles = [{"start": 0, "end": 1, "text": "No subtitle data"}]
        with tqdm(total=len(subtitles), desc="Creating placeholder frames", leave=False) as pbar:
            for subtitle in subtitles:
                try:
                    start_time = int(subtitle["start"])
                    end_time = int(subtitle["end"])
                    # Create one frame per second, capped at 5 frames per segment
                    frame_count = min(end_time - start_time + 1, 5)
                    seconds = list(range(start_time, end_time + 1))
                    if frame_count < 5 and seconds:
                        # Short segment: use every second
                        seconds_to_use = seconds
                    else:
                        # Long segment: sample evenly across the range
                        step = max(1, len(seconds) // 5)
                        seconds_to_use = seconds[::step][:5]  # Take at most 5
                    # Always ensure at least one frame
                    if not seconds_to_use and start_time <= end_time:
                        seconds_to_use = [start_time]
                    print("Creating {} placeholder frames for segment {}-{}".format(
                        len(seconds_to_use), start_time, end_time))
                    for second in seconds_to_use:
                        frame_path = frame_dir / "frame_{:04d}.jpg".format(second)
                        # If the frame already exists, skip creation
                        if frame_path.exists():
                            print("Frame already exists: {}".format(frame_path))
                            frame_paths.append(frame_path)
                            continue
                        if can_create_images:
                            try:
                                # Create a white background
                                img = Image.new('RGB', (224, 224), color='white')
                                draw = ImageDraw.Draw(img)
                                # Add the timestamp
                                draw.text((10, 10), "Timestamp: {}s".format(second), fill="black")
                                # Add the subtitle text, wrapped to ~30 characters per line
                                text = subtitle.get("text", "No text")
                                if len(text) > 30:
                                    text = "\n".join(text[i:i + 30] for i in range(0, len(text), 30))
                                draw.text((10, 40), text, fill="black")
                                img.save(str(frame_path), quality=85)
                                print("Created image frame: {}".format(frame_path))
                            except Exception as e:
                                print("Failed to create image: {}".format(e))
                                # Create a plain text file as a last resort
                                with open(frame_path, 'w') as f:
                                    f.write("Placeholder for timestamp: {}s".format(second))
                        else:
                            # Create a plain text file as a last resort
                            with open(frame_path, 'w') as f:
                                f.write("Placeholder for timestamp: {}s".format(second))
                        frame_paths.append(frame_path)
                except Exception as e:
                    print("Error in manual frame extraction: {}".format(e))
                    # Ensure at least one frame exists even on error
                    timestamp = int(subtitle.get("start", 0))
                    frame_path = frame_dir / "frame_{:04d}.jpg".format(timestamp)
                    with open(frame_path, 'w') as f:
                        f.write("Error placeholder for timestamp: {}s".format(timestamp))
                    frame_paths.append(frame_path)
                pbar.update(1)
        if not frame_paths:
            # Last resort: create at least one empty frame
            print("No frames created, adding emergency placeholder")
            frame_path = frame_dir / "frame_0000.jpg"
            with open(frame_path, 'w') as f:
                f.write("Emergency placeholder frame")
            frame_paths.append(frame_path)
        print("Created {} placeholder frames".format(len(frame_paths)))
        return sorted(frame_paths)

    def _create_frame_subtitle_map(self, frame_paths: List[Path], subtitles: List[dict]) -> dict:
        """Map each one-second frame filename to the index of the subtitle covering it."""
        frame_to_subtitle = {}
        for i, sub in enumerate(subtitles):
            start_frame = int(sub["start"])
            end_frame = int(sub["end"])
            for fn in range(start_frame, end_frame + 1):
                frame_name = "frame_{:04d}.jpg".format(fn)
                # Store the subtitle index rather than the text itself
                frame_to_subtitle[frame_name] = i
        return frame_to_subtitle

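    # Illustration (hypothetical values): a subtitle at index 0 spanning
    # seconds 3..5 maps each covered frame filename to that index:
    #   {"frame_0003.jpg": 0, "frame_0004.jpg": 0, "frame_0005.jpg": 0}
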
    def _clean_text_for_embedding(self, text):
        """Clean and normalize text before embedding."""
        if not text:
            return ""
        try:
            # Collapse runs of whitespace and newlines into single spaces
            text = re.sub(r'\s+', ' ', text).strip()
            # Drop characters outside a conservative whitelist
            text = re.sub(r'[^\w\s.,!?\'"-]', '', text)
            return text
        except Exception as e:
            print("Error cleaning text: {}".format(e))
            return text.strip() if text else ""

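    # Illustration: _clean_text_for_embedding("what's  up?\n(test)") collapses
    # the whitespace and drops the parentheses, yielding "what's up? test".
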
    def _optimized_embedding_processing(self, video_id: str, frame_paths: List[Path],
                                        subtitles: List[dict], metadata: dict):
        """Embed subtitle chunks in batches to keep memory usage bounded.

        Frames are extracted per subtitle segment, so the frame_paths argument
        is currently unused. Returns only the chunks whose batch failed to
        store, so the caller can retry them.
        """
        chunks = []
        if len(subtitles) == 0:
            print("No subtitles found for {}".format(video_id))
            return []
        # Ensure the embedders are initialized
        try:
            print("Initializing embedders")
            if self.text_embedder is None:
                print("Creating new text embedder")
                self.text_embedder = TextEmbedder()
            if self.image_embedder is None:
                print("Creating new image embedder")
                self.image_embedder = ImageEmbedder()
        except Exception as e:
            print("Error initializing embedders: {}. Using simple TextEmbedder.".format(e))
            if self.text_embedder is None:
                self.text_embedder = TextEmbedder()
        # Use a modest batch size for stability
        batch_size = 64
        print("Processing video {} with {} subtitle segments".format(video_id, len(subtitles)))
        # Zero vector reused as a last-resort embedding (CLIP uses 512 dimensions)
        zero_vector = [0.0] * 512
        # Process the subtitles in batches
        for i in range(0, len(subtitles), batch_size):
            batch_subtitles = subtitles[i:i+batch_size]
            print("Processing subtitle batch {}/{} (segments {}-{})".format(
                i//batch_size + 1,
                (len(subtitles)-1)//batch_size + 1,
                i, min(i+batch_size, len(subtitles))
            ))
            current_batch_chunks = []
            for subtitle in batch_subtitles:
                try:
                    # Extract the frames for this segment
                    frame_paths_for_segment = self._extract_frames(
                        Path(metadata["mp4_path"]),
                        subtitle["start"],
                        subtitle["end"]
                    )
                    # Limit to at most 3 frames per subtitle: first, middle and last
                    if frame_paths_for_segment and len(frame_paths_for_segment) > 3:
                        print("Limiting frames from {} to 3 for subtitle at {}".format(
                            len(frame_paths_for_segment), subtitle['start']))
                        indices = [0, len(frame_paths_for_segment) // 2, len(frame_paths_for_segment) - 1]
                        frame_paths_for_segment = [frame_paths_for_segment[j] for j in indices]
                    # Clean the subtitle text for better embedding
                    original_content = subtitle["text"]
                    content = self._clean_text_for_embedding(original_content)
                    # Skip empty content
                    if not content or not content.strip():
                        print("Skipping empty content at time {}".format(subtitle["start"]))
                        continue
                    # Create a unique ID for this chunk
                    chunk_id = "{}_{}".format(video_id, int(subtitle["start"]))
                    # Create the text embedding, with layered fallbacks
                    text_embedding = None
                    # First try the CLIP text encoder if the text is short enough
                    clip_succeeded = False
                    if self.image_embedder is not None and len(content) < 500:
                        try:
                            print("Encoding text with CLIP: {}...".format(content[:50]))
                            text_embedding = self.image_embedder.encode_text(content)
                            if text_embedding:
                                print("Text embedding done, dimension: {}".format(len(text_embedding)))
                                clip_succeeded = True
                            else:
                                print("Image embedder returned None, falling back")
                                text_embedding = None
                        except Exception as e:
                            print("Failed to embed text with CLIP: {}".format(e))
                            text_embedding = None
                    elif self.image_embedder is not None:
                        print("Text too long for CLIP ({} chars), using fallback embedder".format(len(content)))
                    # Fall back to the sentence-transformer text embedder if needed
                    if text_embedding is None:
                        try:
                            print("Using sentence transformer for text")
                            text_embedding = self.text_embedder.encode(content)
                            # Pad or truncate to 512 dimensions for CLIP compatibility
                            if text_embedding and len(text_embedding) != 512:
                                print("Adjusting dimensions from {} to 512".format(len(text_embedding)))
                                if len(text_embedding) < 512:
                                    text_embedding = text_embedding + [0.0] * (512 - len(text_embedding))
                                else:
                                    text_embedding = text_embedding[:512]
                            print("Created text embedding with sentence transformer, dim: {}".format(
                                len(text_embedding) if text_embedding else "None"))
                        except Exception as e:
                            print("Text embedding fallback failed: {}".format(e))
                            # Last resort: a zero embedding
                            text_embedding = list(zero_vector)
                    # Ensure the embedding is valid
                    if not text_embedding or len(text_embedding) != 512:
                        print("Invalid embedding, using zeros")
                        text_embedding = list(zero_vector)
                    # Create frame embeddings only if CLIP handled the text,
                    # so text and frame vectors share the same embedding space
                    frame_embeddings = []
                    if clip_succeeded:
                        for frame_idx, frame in enumerate(frame_paths_for_segment):
                            try:
                                if self.image_embedder is not None:
                                    print("Encoding frame {}/{}".format(frame_idx + 1, len(frame_paths_for_segment)))
                                    embedding = self.image_embedder.encode(str(frame))
                                    if embedding is not None:
                                        frame_embeddings.append(embedding)
                            except Exception as e:
                                print("Error embedding frame {}: {}".format(frame, e))
                    else:
                        print("Skipping frame embeddings since CLIP failed on the text")
                    # Assemble the chunk
                    try:
                        chunk = EmbeddedVideoChunk(
                            video_id=video_id,
                            document_id=chunk_id,
                            start_time=subtitle["start"],
                            end_time=subtitle["end"],
                            content=content,
                            embedding=text_embedding,
                            frame_paths=[str(p) for p in frame_paths_for_segment] if frame_paths_for_segment else [],
                            frame_embeddings=frame_embeddings if frame_embeddings else [[0.0] * 512],  # Match the CLIP dimension
                            author_id=metadata.get("uploader", "unknown").replace(" ", "_").lower(),
                            author_full_name=metadata.get("uploader", "unknown")
                        )
                        current_batch_chunks.append(chunk)
                        print("Created chunk for segment {}-{}, content: {}...".format(
                            subtitle["start"], subtitle["end"], content[:50]))
                    except Exception as e:
                        print("Failed to create chunk object: {}".format(e))
                except Exception as e:
                    print("Error processing segment {}-{}: {}".format(
                        subtitle["start"], subtitle["end"], e))
            # Store the current batch immediately to avoid memory buildup
            if current_batch_chunks:
                try:
                    print("Storing batch of {} chunks to Qdrant".format(len(current_batch_chunks)))
                    self._store_chunks(current_batch_chunks)
                    print("Memory cleared after storing batch")
                except Exception as e:
                    print("Error storing chunks: {}".format(e))
                    # Keep unstored chunks so the caller can retry them
                    chunks.extend(current_batch_chunks)
                current_batch_chunks = []
        return chunks  # Only the chunks that could not be stored above

    def _log_resources(self):
        """Log system resource utilization."""
        mem = psutil.virtual_memory()
        print("\nSystem Resources | CPU: {}% | "
              "Memory: {:.1f}/{:.1f}GB | "
              "GPU Memory: {:.1f}GB".format(
                  psutil.cpu_percent(),
                  mem.used / 1e9,
                  mem.total / 1e9,
                  self._get_gpu_memory()
              ))

    def _get_gpu_memory(self) -> float:
        """Approximate GPU memory via system RAM (Apple Silicon unified memory)."""
        return psutil.virtual_memory().used / 1e9

    def _format_eta(self, seconds: float) -> str:
        return time.strftime("%H:%M:%S", time.gmtime(seconds))

    def _load_checkpoint(self) -> Set[str]:
        if self.checkpoint_file.exists():
            try:
                with open(self.checkpoint_file) as f:
                    return set(json.load(f))
            except (json.JSONDecodeError, IOError):
                print("Corrupted checkpoint file, resetting...")
                return set()
        return set()

    def _save_checkpoint(self):
        """Save the set of processed video IDs to the checkpoint file."""
        with open(self.checkpoint_file, "w") as f:
            # Serialize the in-memory set rather than reloading the file
            json.dump(list(self.processed_videos), f)
        print("Saved checkpoint with {} processed videos".format(len(self.processed_videos)))

    def _process_video_folder(self, folder: Path):
        """Process a single video folder."""
        video_id = folder.name
        print("Processing video folder: {}".format(video_id))
        try:
            # Phase 1: load metadata and subtitles
            print("Phase 1: Loading metadata and subtitles")
            metadata = self._load_metadata(folder)
            vtt_files = list(folder.glob("*.vtt"))
            if not vtt_files:
                print("No VTT subtitle file found for {}".format(video_id))
                return
            subtitles = self._parse_subtitles(vtt_files[0])
            print("Loaded {} subtitle entries".format(len(subtitles)))
            # Merge adjacent subtitles for better context
            merged_subtitles = self._merge_subtitles(subtitles)
            print("Merged to {} subtitle entries".format(len(merged_subtitles)))
            # Phase 2: find the MP4 file
            mp4_files = list(folder.glob("*.mp4"))
            if not mp4_files:
                print("No MP4 file found for {}".format(video_id))
                return
            mp4_path = mp4_files[0]
            metadata["mp4_path"] = str(mp4_path)  # Store the MP4 path for frame extraction
            print("Using video file: {}".format(mp4_path))
            # Phase 3: embed and store the video chunks batch by batch
            print("Phase 3: Processing video chunks with optimized method")
            remaining_chunks = self._optimized_embedding_processing(video_id, [], merged_subtitles, metadata)
            # Store any chunks whose batch failed to store
            if remaining_chunks:
                print("Storing {} remaining chunks".format(len(remaining_chunks)))
                self._store_chunks(remaining_chunks)
            print("Successfully processed video {}".format(video_id))
        except Exception as e:
            print("Error in _process_video_folder for {}: {}".format(video_id, e))
            raise

    def _load_metadata(self, folder: Path) -> dict:
        """Load the .info.json metadata file for a video folder."""
        info_json = next(folder.glob("*.info.json"), None)
        if info_json is None:
            print("No .info.json found in {}, using empty metadata".format(folder))
            return {"uploader": "unknown_author"}
        with open(info_json) as f:
            metadata = json.load(f)
        metadata.setdefault("uploader", "unknown_author")
        return metadata

    def _parse_subtitles(self, vtt_path: Path) -> List[dict]:
        """Parse a WebVTT file into {start, end, text} dicts with valid timings."""
        captions = webvtt.read(vtt_path)
        print("Raw subtitles found: {}".format(len(captions)))
        valid_captions = []
        for caption in captions:
            print("Caption: {} -> {}: {}...".format(caption.start, caption.end, caption.text[:50]))
            if caption.end_in_seconds > caption.start_in_seconds:
                valid_captions.append({
                    "start": caption.start_in_seconds,
                    "end": caption.end_in_seconds,
                    "text": caption.text
                })
        print("Valid subtitles: {}".format(len(valid_captions)))
        return valid_captions

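    # Illustration (hypothetical cue, not from a real file): a WebVTT caption
    # "00:00:01.000 --> 00:00:03.500" with text "hello world" parses to
    #   {"start": 1.0, "end": 3.5, "text": "hello world"}
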
    def _create_chunks(self, video_id: str, mp4_path: Path, subtitles: List[dict], metadata: dict):
        """Process subtitles and extract frames for each chunk (non-batched variant)."""
        if len(subtitles) == 0:
            print("No subtitles found for {}".format(video_id))
            return []
        chunks = []
        # Extract sentences with spaCy when available (currently informational
        # only; chunk boundaries still follow the subtitle segments below)
        sentences = []
        if self.nlp is not None:
            try:
                # Join all subtitles and process them as one document
                full_text = " ".join([s["text"] for s in subtitles])
                doc = self.nlp(full_text)
                sentences = [str(sent) for sent in doc.sents]
            except Exception as e:
                print("Error in NLP processing: {}".format(e))
                sentences = [s["text"] for s in subtitles]
        else:
            # Fallback: treat each subtitle as a sentence
            sentences = [s["text"] for s in subtitles]
        # Create one chunk per subtitle segment
        for subtitle in subtitles:
            try:
                # Extract the frames for this segment
                frame_paths = self._extract_frames(mp4_path, subtitle["start"], subtitle["end"])
                content = subtitle["text"]
                # Skip empty content
                if not content.strip():
                    continue
                # Create a unique ID for this chunk
                chunk_id = "{}_{}".format(video_id, int(subtitle["start"]))
                # Create the text embedding
                text_embedding = None
                if self.image_embedder is not None:
                    try:
                        text_embedding = self.image_embedder.encode_text(content)
                    except Exception as e:
                        print("Failed to embed text: {}".format(e))
                if text_embedding is None:
                    # Fall back to the text embedder
                    try:
                        text_embedding = self.text_embedder.encode(content)
                    except Exception:
                        # Last resort: a zero vector matching the CLIP dimension
                        text_embedding = [0.0] * 512
                # Create frame embeddings if possible
                frame_embeddings = []
                for frame in frame_paths:
                    try:
                        if self.image_embedder is not None:
                            embedding = self.image_embedder.encode(str(frame))
                            frame_embeddings.append(embedding)
                    except Exception as e:
                        print("Error embedding frame {}: {}".format(frame, e))
                # Assemble the chunk
                chunk = EmbeddedVideoChunk(
                    video_id=video_id,
                    document_id=chunk_id,
                    start_time=subtitle["start"],
                    end_time=subtitle["end"],
                    content=content,
                    embedding=text_embedding,
                    frame_paths=[str(p) for p in frame_paths],
                    frame_embeddings=frame_embeddings if frame_embeddings else [[0.0] * 512],  # Fallback vector matching the CLIP dimension
                    author_id=metadata.get("uploader", "unknown").replace(" ", "_").lower(),
                    author_full_name=metadata.get("uploader", "unknown")
                )
                chunks.append(chunk)
            except Exception as e:
                print("Error creating chunk for segment {}-{}: {}".format(
                    subtitle["start"], subtitle["end"], e))
        return chunks

    def _extract_frames(self, video_path: Path, start: float, end: float) -> List[Path]:
        """Extract one frame per second for the [start, end] segment with ffmpeg."""
        frame_dir = video_path.parent / "frames"
        print("Extracting frames to: {}".format(frame_dir))
        frame_dir.mkdir(exist_ok=True)
        # Try to find ffmpeg in common locations on macOS
        ffmpeg_locations = [
            "ffmpeg",                    # if it's in PATH
            "/opt/homebrew/bin/ffmpeg",  # Homebrew on Apple Silicon
            "/usr/local/bin/ffmpeg",     # Homebrew on Intel Macs
            "/usr/bin/ffmpeg",           # System-installed
            "/opt/local/bin/ffmpeg"      # MacPorts
        ]
        ffmpeg_cmd = None
        for cmd in ffmpeg_locations:
            try:
                # Test whether the command is available
                result = subprocess.run([cmd, "-version"],
                                        capture_output=True,
                                        text=True,
                                        check=False)
                if result.returncode == 0:
                    ffmpeg_cmd = cmd
                    print("Found ffmpeg at: {}".format(ffmpeg_cmd))
                    break
            except FileNotFoundError:
                continue
        if ffmpeg_cmd is None:
            print("WARNING: ffmpeg not found in any location. Using fallback method.")
            return self._manual_frame_extraction(video_path, [{"start": start, "end": end, "text": ""}])
        # Prefix output files with the segment start so successive segments do
        # not overwrite each other's frame_0001.jpg, frame_0002.jpg, ...
        segment_prefix = "seg_{:06d}_".format(int(start))
        cmd = [
            ffmpeg_cmd,
            "-y",  # Overwrite output files without asking
            "-ss", str(start),
            "-to", str(end),
            "-i", str(video_path),
            "-vf", "fps=1",  # One frame per second
            str(frame_dir / (segment_prefix + "frame_%04d.jpg"))
        ]
        print("Running ffmpeg command: {}".format(" ".join(cmd)))
        try:
            result = subprocess.run(cmd, capture_output=True, check=True, text=True)
            if result.stderr:
                print("FFmpeg output: {}".format(result.stderr))
        except subprocess.CalledProcessError as e:
            print("FFmpeg error: {}".format(e.stderr))
            print("Falling back to manual frame extraction")
            return self._manual_frame_extraction(video_path, [{"start": start, "end": end, "text": ""}])
        # Return only this segment's frames, not everything in the directory
        frames = sorted(frame_dir.glob(segment_prefix + "frame_*.jpg"))
        print("Extracted {} frames".format(len(frames)))
        return frames

    def _store_chunks(self, chunks: List[EmbeddedVideoChunk]):
        """Upsert embedded chunks into the local Qdrant collection."""
        # Connect directly to Qdrant at the configured storage path
        qdrant_storage_path = "/Users/yufeizhen/Desktop/project/qdrant_storage"
        # Ensure the storage directory exists
        os.makedirs(qdrant_storage_path, exist_ok=True)
        try:
            client = QdrantClient(path=qdrant_storage_path)
            print("Established direct connection to Qdrant storage at: {}".format(qdrant_storage_path))
        except Exception as e:
            print("Error connecting to Qdrant storage, falling back to connection singleton: {}".format(e))
            # Fall back to the connection singleton if the direct connection fails
            from llm_engineering.infrastructure.db.qdrant import connection
            client = connection
        collection_name = "video_chunks"
        if not chunks:
            print("Warning: No chunks to store")
            return
        # Build the point payloads first
        points = []
        skipped_chunks = 0
        print("Processing {} chunks for storage".format(len(chunks)))
        for chunk in chunks:
            try:
                print("Processing chunk with ID: {}, video_id: {}, start_time: {}".format(
                    chunk.document_id, chunk.video_id, chunk.start_time))
                # Ensure the embedding is exactly 512 dimensions, matching CLIP
                embedding = chunk.embedding
                if embedding is None:
                    print("Warning: Chunk has None embedding, skipping")
                    skipped_chunks += 1
                    continue
                if not isinstance(embedding, list):
                    print("Warning: Embedding is not a list, converting")
                    try:
                        embedding = embedding.tolist()
                    except Exception:
                        print("Failed to convert embedding to list, skipping chunk")
                        skipped_chunks += 1
                        continue
                if len(embedding) != 512:
                    print("Embedding dimension mismatch: {} (should be 512)".format(len(embedding)))
                    # Pad or truncate to fit the collection
                    if len(embedding) < 512:
                        print("Padding embedding from {} to 512 dimensions".format(len(embedding)))
                        embedding = embedding + [0.0] * (512 - len(embedding))
                    else:
                        print("Truncating embedding from {} to 512 dimensions".format(len(embedding)))
                        embedding = embedding[:512]
                # Derive a deterministic UUID from the video ID and timestamp, so
                # re-ingesting the same segment overwrites rather than duplicates
                unique_str = "{}_{}".format(chunk.video_id, chunk.start_time)
                hash_obj = hashlib.sha256(unique_str.encode()).hexdigest()
                point_uuid = uuid.UUID(hash_obj[:32])
                # Validate that the chunk content is not empty
                if not chunk.content or not chunk.content.strip():
                    print("Warning: Empty content in chunk, using placeholder")
                    content = "Empty content at timestamp {}".format(chunk.start_time)
                else:
                    content = chunk.content
                points.append(PointStruct(
                    id=str(point_uuid),
                    vector=embedding,
                    payload={
                        "text": content,
                        "start": chunk.start_time,
                        "end": chunk.end_time,
                        "video_id": chunk.video_id,
                        "metadata": {
                            "topics": [],
                            "sentence_hash": hashlib.md5(content.encode()).hexdigest()
                        }
                    }
                ))
            except Exception as e:
                print("Error processing chunk: {}".format(e))
                skipped_chunks += 1
        if skipped_chunks > 0:
            print("Skipped {} chunks due to errors".format(skipped_chunks))
        if not points:
            print("No valid points to store after processing")
            return
        print("Prepared {} valid points for storage".format(len(points)))
        try:
            # Check that the Qdrant client is properly initialized
            if client is None:
                raise ValueError("Qdrant client is None, check connection setup")
            # Create the collection if it does not exist
            try:
                if not client.collection_exists(collection_name):
                    print("Creating collection '{}' with 512-dimensional vectors".format(collection_name))
                    client.create_collection(
                        collection_name=collection_name,
                        vectors_config=VectorParams(
                            size=512,
                            distance=Distance.COSINE
                        )
                    )
                else:
                    print("Collection '{}' already exists".format(collection_name))
            except Exception as e:
                print("Error checking/creating collection: {}".format(e))
                raise
            # Batch insert with progress reporting and retries
            batch_size = 64
            max_retries = 3
            for i in range(0, len(points), batch_size):
                batch = points[i:i+batch_size]
                retry_count = 0
                while retry_count < max_retries:
                    try:
                        print("Storing batch {} of {} ({} points)".format(
                            i//batch_size + 1,
                            (len(points)-1)//batch_size + 1,
                            len(batch)
                        ))
                        client.upsert(
                            collection_name=collection_name,
                            points=batch,
                            wait=True  # Wait for the operation to complete
                        )
                        print("Successfully stored batch {} of {}".format(
                            i//batch_size + 1,
                            (len(points)-1)//batch_size + 1
                        ))
                        break  # Stored successfully, leave the retry loop
                    except UnexpectedResponse as e:
                        # Qdrant API errors: back off and retry
                        retry_count += 1
                        print("Qdrant API error: {} - retrying batch {} (attempt {}/{})...".format(
                            str(e), i//batch_size + 1, retry_count, max_retries))
                        time.sleep(3 * retry_count)  # Linear backoff
                    except Exception as e:
                        if "Connection reset by peer" in str(e) and retry_count < max_retries - 1:
                            retry_count += 1
                            print("Connection reset, retrying batch {} (attempt {}/{})...".format(
                                i//batch_size + 1, retry_count, max_retries))
                            time.sleep(3 * retry_count)  # Linear backoff
                        else:
                            # Not a connection reset, or retries exhausted: re-raise
                            print("Fatal error storing batch: {}".format(str(e)))
                            raise
            # Verify storage by counting points
            try:
                count = client.count(collection_name=collection_name)
                print("Successfully stored {} chunks. Collection now contains {} points".format(
                    len(points), count.count))
            except Exception as e:
                print("Note: Stored points but couldn't verify count: {}".format(e))
        except Exception as e:
            print("Storage error: {}".format(str(e)))
            import traceback
            traceback.print_exc()
            raise

    def _load_processed_frames(self) -> dict:
        if self.processed_frames_file.exists():
            try:
                with open(self.processed_frames_file) as f:
                    return json.load(f)
            except (json.JSONDecodeError, IOError):
                print("Corrupted processed-frames file, resetting...")
                return {}
        return {}

    def _save_processed_frames(self):
        with open(self.processed_frames_file, "w") as f:
            json.dump(self.processed_frames, f)
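
# Minimal usage sketch. Assumptions (not confirmed by this module): the script
# is run directly, and "/path/to/video_library" is a placeholder for a root
# directory containing one subfolder per video, each holding an .mp4 file, a
# .vtt subtitle file and an .info.json, matching the globs used above.
if __name__ == "__main__":
    ingester = VideoIngester("/path/to/video_library")
    ingester.process_video_library(force_reprocess=False)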