# NOTE: scraped page header, not part of the module: "Spaces: Runtime error / Runtime error".
import os
import subprocess
import time
import traceback
from pathlib import Path

from llm_engineering.domain.queries import Query

from .topic_retriever import TopicAwareRetriever
from .video_processor import VideoClipper
class VideoQAEngine:
    """Retrieve video segments relevant to a question and cut them into clips.

    Combines a ``TopicAwareRetriever`` (segment lookup) with a ``VideoClipper``
    (clip extraction). Videos are looked up as
    ``<video_root>/<video_id>/video.mp4``, falling back to any other ``*.mp4``
    file in the same ``<video_id>`` directory.
    """

    def __init__(
        self,
        video_root: str,
        qdrant_storage_path="/Users/yufeizhen/Desktop/project/qdrant_storage",
        *,
        max_retries: int = 3,
        retry_delay: float = 2.0
    ):
        """Create the retriever and clipper, retrying when construction fails.

        Args:
            video_root: Directory containing one sub-directory per video id.
                A missing directory only produces a warning.
            qdrant_storage_path: Forwarded to ``TopicAwareRetriever``. Its
                parent directory is created when it has one.
                NOTE(review): the default is a machine-specific absolute path
                kept only for backward compatibility; pass an explicit path.
            max_retries: Total number of initialization attempts (>= 1).
            retry_delay: Seconds to wait between failed attempts.

        Raises:
            ValueError: If ``max_retries`` is smaller than 1.
            Exception: Whatever the final failed attempt raised.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        print("Initializing VideoQAEngine")
        print("Video root: {}".format(video_root))
        print("Qdrant storage path: {}".format(qdrant_storage_path))

        # Verify video root exists (best effort: warn, do not fail).
        if not os.path.exists(video_root):
            print("WARNING: Video root directory not found: {}".format(video_root))

        # Only the parent is created here; the storage directory itself is
        # left to the retriever.
        self._ensure_parent_dir(qdrant_storage_path)

        self.video_root = Path(video_root)

        # Initialize components with retry logic.
        for attempt in range(1, max_retries + 1):
            try:
                self.retriever = TopicAwareRetriever(qdrant_storage_path=qdrant_storage_path)
                self.clipper = VideoClipper()
            except Exception as e:
                print("Error initializing components (attempt {}/{}): {}".format(
                    attempt, max_retries, e))
                if attempt >= max_retries:
                    print("Failed to initialize components after {} attempts".format(max_retries))
                    raise
                time.sleep(retry_delay)
            else:
                print("VideoQAEngine initialized successfully")
                break

    @staticmethod
    def _ensure_parent_dir(path) -> None:
        """Create the parent directory of *path*, if *path* has one."""
        parent = os.path.dirname(path)
        # A bare name such as "qdrant_storage" has an empty dirname, and
        # os.makedirs("") raises FileNotFoundError.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def _find_video(self, video_id):
        """Return the video file for *video_id*, or ``None`` when none exists."""
        video_dir = self.video_root / video_id
        preferred = video_dir / "video.mp4"
        if preferred.exists():
            return preferred

        # Try alternative filenames. Globbing inside the directory keeps
        # video_id from being interpreted as a pattern; sorting makes the
        # choice deterministic because glob order is OS-dependent.
        candidates = sorted(video_dir.glob("*.mp4"))
        if candidates:
            print(" Found alternative video path: {}".format(candidates[0]))
            return candidates[0]

        print(" ERROR: Video file not found at {}".format(preferred))
        return None

    @staticmethod
    def _build_entry(path, result):
        """Build the dict returned to callers for one retrieval result."""
        return {
            "path": path,
            "timestamps": (result["start"], result["end"]),
            "score": result["score"],
            "text": result.get("text", ""),  # Include text for context
            "video_id": result["video_id"],
        }

    @staticmethod
    def _write_placeholder(info_path, result, error) -> None:
        """Write a text file describing a segment whose clip could not be cut."""
        # Explicit UTF-8: the segment text may contain characters that the
        # platform's default encoding cannot represent.
        with open(info_path, "w", encoding="utf-8") as f:
            f.write("Video: {}\n".format(result["video_id"]))
            f.write("Time: {:.1f}s - {:.1f}s\n".format(result["start"], result["end"]))
            f.write("Text: {}\n".format(result.get("text", "")))
            f.write("Score: {:.4f}\n".format(result["score"]))
            f.write("Error: {}\n".format(str(error)))

    def ask(self, question: str, output_dir: str = "clips"):
        """Retrieve segments for *question* and write one clip per segment.

        Args:
            question: Natural-language question, converted with
                ``Query.from_str``.
            output_dir: Directory that receives the clips; created if missing.

        Returns:
            A list of dicts with keys ``path``, ``timestamps``
            (``(start, end)``), ``score``, ``text`` and ``video_id``.
            ``path`` points at the ``.mp4`` clip, or at a ``.txt`` placeholder
            when cutting the clip failed. Results whose video file cannot be
            found are skipped. Any unexpected error is printed with its
            traceback and an empty list is returned.
        """
        print("\n--- Processing query: '{}' ---".format(question))
        try:
            # Create a Query object
            query = Query.from_str(question)

            # Perform retrieval with diagnostics
            print("Retrieving relevant video segments...")
            start_time = time.perf_counter()
            results = self.retriever.retrieve(query.content)
            retrieval_time = time.perf_counter() - start_time
            print("Retrieval completed in {:.2f} seconds".format(retrieval_time))

            # Create output directory if it doesn't exist
            os.makedirs(output_dir, exist_ok=True)

            # Handle empty results
            if not results:
                print("No results found for query: '{}'".format(question))
                return []

            total = len(results)
            print("Found {} relevant video segments".format(total))

            # Process each result to create clips
            clips = []
            for index, result in enumerate(results, start=1):
                print("\nProcessing result {}/{}:".format(index, total))
                print(" Video ID: {}".format(result["video_id"]))
                print(" Timestamps: {:.1f}s - {:.1f}s".format(result["start"], result["end"]))
                print(" Score: {:.4f}".format(result["score"]))

                video_path = self._find_video(result["video_id"])
                if video_path is None:
                    continue

                # Create unique output path
                output_path = Path(output_dir) / "clip_{}_{}_{:.3f}.mp4".format(
                    result["video_id"],
                    int(result["start"]),
                    result["score"],
                )

                try:
                    print(" Creating clip to: {}".format(output_path))
                    self.clipper.create_clip(video_path, result["start"], result["end"], output_path)
                    print(" Clip created successfully")
                except (subprocess.SubprocessError, OSError) as e:
                    # OSError (which includes FileNotFoundError) is handled
                    # per result so that one failing clip does not discard
                    # the clips already produced.
                    print(" ERROR: Could not create video clip: {}".format(e))
                    # Create a placeholder info file instead of a video clip
                    info_path = output_path.with_suffix(".txt")
                    self._write_placeholder(info_path, result, e)
                    print(" Created info file instead: {}".format(info_path))
                    # Add text-only result
                    clips.append(self._build_entry(info_path, result))
                else:
                    clips.append(self._build_entry(output_path, result))

            print("\nProcessed {} clips successfully".format(len(clips)))
            return clips
        except Exception as e:
            # Top-level boundary: report the error and degrade to "no results".
            print("Error in VideoQAEngine.ask: {}".format(e))
            traceback.print_exc()
            return []