Spaces:
Runtime error
Runtime error
File size: 6,311 Bytes
a22e84b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
from llm_engineering.domain.queries import Query
from .topic_retriever import TopicAwareRetriever
from .video_processor import VideoClipper
from pathlib import Path
import subprocess
import os
import traceback
import time
class VideoQAEngine:
    """Answer a question by retrieving matching video segments and cutting clips.

    The engine wires together a ``TopicAwareRetriever`` (used to look up
    segments for a query) and a ``VideoClipper`` (used to cut each segment out
    of its source video). Source videos are looked up under
    ``<video_root>/<video_id>/``.
    """

    def __init__(self, video_root: str,
                 qdrant_storage_path="/Users/yufeizhen/Desktop/project/qdrant_storage",
                 max_retries: int = 3,
                 retry_delay: float = 2.0):
        """Create the retriever and clipper, retrying on failure.

        Args:
            video_root: Directory containing one sub-directory per video id.
                A missing directory only produces a warning.
            qdrant_storage_path: Path handed to ``TopicAwareRetriever``. Only
                its parent directory is created here.
            max_retries: Total number of initialization attempts (at least one
                attempt is always made).
            retry_delay: Seconds to sleep between failed attempts.

        Raises:
            Exception: Whatever the last initialization attempt raised, once
                all attempts are exhausted.
        """
        print("Initializing VideoQAEngine")
        print("Video root: {}".format(video_root))
        print("Qdrant storage path: {}".format(qdrant_storage_path))

        # Verify video root exists (warn only; lookups fail later per result).
        if not os.path.exists(video_root):
            print("WARNING: Video root directory not found: {}".format(video_root))

        # Ensure the parent of the Qdrant storage path exists. A bare relative
        # name such as "qdrant_storage" has an empty dirname, and
        # os.makedirs("") raises FileNotFoundError, so skip it in that case.
        storage_parent = os.path.dirname(qdrant_storage_path)
        if storage_parent:
            os.makedirs(storage_parent, exist_ok=True)

        # Initialize components with retry logic.
        attempts = max(1, max_retries)
        for attempt in range(1, attempts + 1):
            try:
                self.retriever = TopicAwareRetriever(qdrant_storage_path=qdrant_storage_path)
                self.clipper = VideoClipper()
                self.video_root = Path(video_root)
                print("VideoQAEngine initialized successfully")
                break
            except Exception as e:
                print("Error initializing components (attempt {}/{}): {}".format(
                    attempt, attempts, e))
                if attempt >= attempts:
                    print("Failed to initialize components after {} attempts".format(attempts))
                    raise
                time.sleep(retry_delay)

    def _find_video(self, video_id):
        """Return the path of the source video for *video_id*, or ``None``.

        Prefers ``<video_root>/<video_id>/video.mp4``; otherwise falls back to
        the alphabetically first ``*.mp4`` in that directory so the choice is
        deterministic across filesystems.
        """
        video_path = self.video_root / video_id / "video.mp4"
        if video_path.exists():
            return video_path

        # Try alternative filename patterns.
        alt_paths = sorted(self.video_root.glob("{}/*.mp4".format(video_id)))
        if alt_paths:
            print(" Found alternative video path: {}".format(alt_paths[0]))
            return alt_paths[0]

        print(" ERROR: Video file not found at {}".format(video_path))
        return None

    @staticmethod
    def _clip_output_path(output_dir, result):
        """Build the clip file path from the video id, start time and score."""
        return Path(output_dir) / "clip_{}_{}_{:.3f}.mp4".format(
            result["video_id"],
            int(result["start"]),
            result["score"],
        )

    @staticmethod
    def _write_info_file(output_path, result, error):
        """Write a ``.txt`` placeholder describing *result* and return its path.

        The file is written as UTF-8 so that non-ASCII segment text does not
        depend on the platform's default encoding.
        """
        info_path = output_path.with_suffix('.txt')
        with open(info_path, 'w', encoding="utf-8") as f:
            f.write("Video: {}\n".format(result['video_id']))
            f.write("Time: {:.1f}s - {:.1f}s\n".format(result['start'], result['end']))
            f.write("Text: {}\n".format(result.get('text', '')))
            f.write("Score: {:.4f}\n".format(result['score']))
            f.write("Error: {}\n".format(str(error)))
        return info_path

    @staticmethod
    def _clip_record(path, result):
        """Build the dict returned to callers for one processed result."""
        return {
            "path": path,
            "timestamps": (result["start"], result["end"]),
            "score": result["score"],
            "text": result.get("text", ""),  # Include text for context
            "video_id": result["video_id"],
        }

    def ask(self, question: str, output_dir: str = "clips"):
        """Retrieve segments for *question* and cut one clip per segment.

        Args:
            question: Free-text query; converted with ``Query.from_str``.
            output_dir: Directory the clips are written to (created if needed).

        Returns:
            A list of dicts with keys ``path``, ``timestamps``, ``score``,
            ``text`` and ``video_id``. ``path`` points at the clip, or at a
            ``.txt`` info file when clipping failed with a subprocess or
            file-not-found error. Results whose source video cannot be found
            are skipped. An empty list is returned when nothing was retrieved
            or when any other exception occurred (it is printed, not raised).
        """
        print("\n--- Processing query: '{}' ---".format(question))
        try:
            # Create a Query object
            query = Query.from_str(question)

            # Perform retrieval with diagnostics; perf_counter is monotonic,
            # so the reported duration is immune to wall-clock adjustments.
            print("Retrieving relevant video segments...")
            start_time = time.perf_counter()
            results = self.retriever.retrieve(query.content)
            retrieval_time = time.perf_counter() - start_time
            print("Retrieval completed in {:.2f} seconds".format(retrieval_time))

            # Create output directory if it doesn't exist
            os.makedirs(output_dir, exist_ok=True)

            # Handle empty results
            if not results:
                print("No results found for query: '{}'".format(question))
                return []
            print("Found {} relevant video segments".format(len(results)))

            # Process each result to create clips
            clips = []
            for position, result in enumerate(results, start=1):
                print("\nProcessing result {}/{}:".format(position, len(results)))
                print(" Video ID: {}".format(result["video_id"]))
                print(" Timestamps: {:.1f}s - {:.1f}s".format(result["start"], result["end"]))
                print(" Score: {:.4f}".format(result["score"]))

                video_path = self._find_video(result["video_id"])
                if video_path is None:
                    continue

                output_path = self._clip_output_path(output_dir, result)
                try:
                    print(" Creating clip to: {}".format(output_path))
                    self.clipper.create_clip(video_path, result["start"], result["end"], output_path)
                    print(" Clip created successfully")
                    clip_path = output_path
                except (subprocess.SubprocessError, FileNotFoundError) as e:
                    print(" ERROR: Could not create video clip: {}".format(e))
                    # Create a placeholder info file instead of a video clip
                    clip_path = self._write_info_file(output_path, result, e)
                    print(" Created info file instead: {}".format(clip_path))

                clips.append(self._clip_record(clip_path, result))

            print("\nProcessed {} clips successfully".format(len(clips)))
            return clips
        except Exception as e:
            # Top-level boundary: report the failure and degrade to "no clips".
            print("Error in VideoQAEngine.ask: {}".format(e))
            traceback.print_exc()
            return []
|