purpleriann's picture
Upload folder using huggingface_hub
a22e84b verified
from llm_engineering.domain.queries import Query
from .topic_retriever import TopicAwareRetriever
from .video_processor import VideoClipper
from pathlib import Path
import subprocess
import os
import traceback
import time
class VideoQAEngine:
def __init__(self, video_root: str, qdrant_storage_path="/Users/yufeizhen/Desktop/project/qdrant_storage"):
print("Initializing VideoQAEngine")
print("Video root: {}".format(video_root))
print("Qdrant storage path: {}".format(qdrant_storage_path))
# Verify video root exists
if not os.path.exists(video_root):
print("WARNING: Video root directory not found: {}".format(video_root))
# Ensure Qdrant storage path exists
os.makedirs(os.path.dirname(qdrant_storage_path), exist_ok=True)
# Initialize components with retry logic
retry_count = 0
max_retries = 3
while retry_count < max_retries:
try:
self.retriever = TopicAwareRetriever(qdrant_storage_path=qdrant_storage_path)
self.clipper = VideoClipper()
self.video_root = Path(video_root)
print("VideoQAEngine initialized successfully")
break
except Exception as e:
retry_count += 1
print("Error initializing components (attempt {}/{}): {}".format(
retry_count, max_retries, e))
if retry_count >= max_retries:
print("Failed to initialize components after {} attempts".format(max_retries))
raise
time.sleep(2)
def ask(self, question: str, output_dir: str = "clips"):
print("\n--- Processing query: '{}' ---".format(question))
try:
# Create a Query object
query = Query.from_str(question)
# Perform retrieval with diagnostics
print("Retrieving relevant video segments...")
start_time = time.time()
results = self.retriever.retrieve(query.content)
retrieval_time = time.time() - start_time
print("Retrieval completed in {:.2f} seconds".format(retrieval_time))
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Handle empty results
if not results:
print("No results found for query: '{}'".format(question))
return []
print("Found {} relevant video segments".format(len(results)))
# Process each result to create clips
clips = []
for i, result in enumerate(results):
print("\nProcessing result {}/{}:".format(i+1, len(results)))
print(" Video ID: {}".format(result["video_id"]))
print(" Timestamps: {:.1f}s - {:.1f}s".format(result["start"], result["end"]))
print(" Score: {:.4f}".format(result["score"]))
# Check if video file exists
video_path = self.video_root / result["video_id"] / "video.mp4"
if not video_path.exists():
# Try alternative filename patterns
alt_paths = list(self.video_root.glob("{}/*.mp4".format(result["video_id"])))
if alt_paths:
video_path = alt_paths[0]
print(" Found alternative video path: {}".format(video_path))
else:
print(" ERROR: Video file not found at {}".format(video_path))
continue
# Create unique output path
output_path = Path(output_dir) / "clip_{}_{}_{:.3f}.mp4".format(
result['video_id'],
int(result["start"]),
result["score"]
)
try:
print(" Creating clip to: {}".format(output_path))
self.clipper.create_clip(video_path, result["start"], result["end"], output_path)
print(" Clip created successfully")
# If clip was created successfully, add to results
clips.append({
"path": output_path,
"timestamps": (result["start"], result["end"]),
"score": result["score"],
"text": result.get("text", ""), # Include text for context
"video_id": result["video_id"]
})
except (subprocess.SubprocessError, FileNotFoundError) as e:
print(" ERROR: Could not create video clip: {}".format(e))
# Create a placeholder info file instead of a video clip
info_path = output_path.with_suffix('.txt')
with open(info_path, 'w') as f:
f.write("Video: {}\n".format(result['video_id']))
f.write("Time: {:.1f}s - {:.1f}s\n".format(result['start'], result['end']))
f.write("Text: {}\n".format(result.get('text', '')))
f.write("Score: {:.4f}\n".format(result['score']))
f.write("Error: {}\n".format(str(e)))
print(" Created info file instead: {}".format(info_path))
# Add text-only result
clips.append({
"path": info_path,
"timestamps": (result["start"], result["end"]),
"score": result["score"],
"text": result.get("text", ""),
"video_id": result["video_id"]
})
print("\nProcessed {} clips successfully".format(len(clips)))
return clips
except Exception as e:
print("Error in VideoQAEngine.ask: {}".format(e))
traceback.print_exc()
return []