|
import pytest |
|
import os |
|
import shutil |
|
|
|
import os |
|
import sys |
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
from src.video_processing_tool import VideoProcessingTool |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Publicly available YouTube videos used as integration-test fixtures.
# NOTE(review): integration tests hit the live network; these URLs must stay reachable.
VIDEO_URL_TRANSCRIPT_DIALOGUE = "https://www.youtube.com/watch?v=1htKBjuUWec"  # has a transcript containing the "Isn't that hot?" exchange

VIDEO_URL_OBJECT_COUNT = "https://www.youtube.com/watch?v=L1vXCYZAYYM"  # target for CV object-counting tests ("bird" class)

VIDEO_URL_NO_TRANSCRIPT = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"  # queried with a bogus language code to force a transcript failure

VIDEO_URL_SHORT_GENERAL = "https://www.youtube.com/watch?v=jNQXAC9IVRw"  # short clip, keeps download tests quick
|
|
|
|
|
@pytest.fixture(scope="session")
def model_files():
    """Create dummy model files for testing CV functionality if real ones aren't provided.

    Yields:
        dict: Paths under keys "cfg", "weights", "names", plus the containing "dir".
    """
    test_model_dir = os.path.join(os.path.dirname(__file__), "test_cv_models")
    os.makedirs(test_model_dir, exist_ok=True)

    cfg_path = os.path.join(test_model_dir, "dummy-yolov3-tiny.cfg")
    weights_path = os.path.join(test_model_dir, "dummy-yolov3-tiny.weights")
    names_path = os.path.join(test_model_dir, "dummy-coco.names")

    # Use context managers: the original bare open(...).write(...) leaked the
    # file handle and relied on CPython refcounting to flush/close.
    if not os.path.exists(cfg_path):
        with open(cfg_path, 'w') as f:
            f.write("[net]\nwidth=416\nheight=416")
    if not os.path.exists(weights_path):
        with open(weights_path, 'wb') as f:
            f.write(b'dummyweights')
    if not os.path.exists(names_path):
        with open(names_path, 'w') as f:
            f.write("bird\ncat\ndog\nperson\n")

    yield {
        "cfg": cfg_path,
        "weights": weights_path,
        "names": names_path,
        "dir": test_model_dir
    }
|
|
|
|
|
|
|
@pytest.fixture
def video_tool(model_files):
    """Yield a VideoProcessingTool wired to the dummy model files; teardown calls cleanup()."""
    temp_base = os.path.join(os.path.dirname(__file__), "test_temp_videos")
    os.makedirs(temp_base, exist_ok=True)

    tool = VideoProcessingTool(
        model_cfg_path=model_files["cfg"],
        model_weights_path=model_files["weights"],
        class_names_path=model_files["names"],
        temp_dir_base=temp_base,
    )
    yield tool
    # Teardown: remove the tool's per-instance temp directory.
    tool.cleanup()
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_extract_video_id(video_tool):
    """_extract_video_id should parse the ID out of all common YouTube URL shapes."""
    cases = [
        ("https://www.youtube.com/watch?v=1htKBjuUWec", "1htKBjuUWec"),
        ("https://youtu.be/1htKBjuUWec", "1htKBjuUWec"),
        ("https://www.youtube.com/embed/1htKBjuUWec", "1htKBjuUWec"),
        ("https://www.youtube.com/watch?v=1htKBjuUWec&t=10s", "1htKBjuUWec"),
    ]
    for url, expected in cases:
        assert video_tool._extract_video_id(url) == expected
    # A string with no recognizable video ID must yield None.
    assert video_tool._extract_video_id("invalid_url") is None
|
|
|
@pytest.mark.integration
def test_download_video(video_tool):
    """Download a short clip and verify the file exists with a sensible name."""
    result = video_tool.download_video(VIDEO_URL_SHORT_GENERAL, resolution="240p")
    assert result.get("success"), f"Download failed: {result.get('error')}"
    assert "file_path" in result
    assert os.path.exists(result["file_path"])
    # Check the basename, not the full path: the path begins with the temp
    # directory, so the original startswith(video_id) test on the whole path
    # could never match.
    file_name = os.path.basename(result["file_path"])
    assert file_name.endswith(".mp4") or file_name.startswith(
        video_tool._extract_video_id(VIDEO_URL_SHORT_GENERAL)
    )
|
|
|
@pytest.mark.integration
def test_get_video_transcript_success(video_tool):
    """Fetching the transcript of a video known to have one should succeed."""
    result = video_tool.get_video_transcript(VIDEO_URL_TRANSCRIPT_DIALOGUE)
    assert result.get("success"), f"Transcript fetch failed: {result.get('error')}"

    assert "transcript" in result
    assert len(result["transcript"]) > 0
    assert "transcript_entries" in result
    assert len(result["transcript_entries"]) > 0

    # Spot-check a phrase known to occur in this video's dialogue.
    assert "isn't that hot" in result["transcript"].lower()
|
|
|
@pytest.mark.integration
def test_get_video_transcript_no_transcript(video_tool):
    """Requesting an unavailable transcript language must fail with a descriptive error."""
    result = video_tool.get_video_transcript(VIDEO_URL_NO_TRANSCRIPT, languages=['xx-YY'])

    assert not result.get("success")
    assert "error" in result
    message = result["error"]
    assert ("No transcript found" in message) or ("Transcripts are disabled" in message)
|
|
|
@pytest.mark.integration
def test_find_dialogue_response_success(video_tool):
    """The reply following a known query phrase should be located in the transcript."""
    transcript_data = video_tool.get_video_transcript(VIDEO_URL_TRANSCRIPT_DIALOGUE)
    assert transcript_data.get("success"), (
        f"Transcript fetch failed for dialogue test: {transcript_data.get('error')}"
    )

    result = video_tool.find_dialogue_response(
        transcript_data["transcript_entries"], "Isn't that hot?"
    )
    assert result.get("success"), f"Dialogue search failed: {result.get('error')}"
    assert "response_text" in result

    # The known reply to "Isn't that hot?" in this video is "Extremely".
    assert "extremely" in result["response_text"].lower()
|
|
|
@pytest.mark.integration
def test_find_dialogue_response_not_found(video_tool):
    """Searching for an absent phrase must fail with a 'not found' error."""
    transcript_data = video_tool.get_video_transcript(VIDEO_URL_TRANSCRIPT_DIALOGUE)
    assert transcript_data.get("success")

    result = video_tool.find_dialogue_response(
        transcript_data["transcript_entries"], "This phrase is not in the video"
    )
    assert not result.get("success")
    assert "not found in transcript" in result.get("error", "")
|
|
|
@pytest.mark.integration
@pytest.mark.cv_dependent
def test_object_counting_interface(video_tool):
    """Exercise the object-counting call; dummy models may legitimately report zero counts."""
    if not video_tool.object_detection_model:
        pytest.skip("CV model not loaded, skipping object count test.")

    download_result = video_tool.download_video(VIDEO_URL_OBJECT_COUNT, resolution="240p")
    assert download_result.get("success"), (
        f"Video download failed for object counting: {download_result.get('error')}"
    )

    result = video_tool.count_objects_in_video(
        download_result["file_path"],
        target_classes=["bird"],
        confidence_threshold=0.1,
        frame_skip=30,
    )

    assert result.get("success"), f"Object counting failed: {result.get('error')}"
    assert "max_simultaneous_bird" in result
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.cv_dependent
def test_process_video_object_count_flow(video_tool):
    """End-to-end object_count task through process_video."""
    if not video_tool.object_detection_model:
        pytest.skip("CV model not loaded, skipping process_video object count test.")

    params = {
        "target_classes": ["bird"],
        "resolution": "240p",
        "confidence_threshold": 0.1,
        "frame_skip": 30,
    }
    result = video_tool.process_video(
        VIDEO_URL_OBJECT_COUNT, "object_count", query_params=params
    )
    assert result.get("success"), f"process_video for object_count failed: {result.get('error')}"
    assert "max_simultaneous_bird" in result
|
|
|
@pytest.mark.integration
def test_process_video_dialogue_flow(video_tool):
    """End-to-end dialogue_response task through process_video."""
    result = video_tool.process_video(
        VIDEO_URL_TRANSCRIPT_DIALOGUE,
        "dialogue_response",
        query_params={"query_phrase": "Isn't that hot?"},
    )
    assert result.get("success"), f"process_video for dialogue_response failed: {result.get('error')}"
    assert "extremely" in result.get("response_text", "").lower()
|
|
|
@pytest.mark.integration
def test_process_video_transcript_flow(video_tool):
    """End-to-end transcript task through process_video."""
    result = video_tool.process_video(VIDEO_URL_TRANSCRIPT_DIALOGUE, "transcript")
    assert result.get("success"), f"process_video for transcript failed: {result.get('error')}"
    assert "transcript" in result
    assert len(result["transcript"]) > 0
|
|
|
def test_cleanup_removes_temp_dir(model_files):
    """cleanup() must delete the tool's own temp directory and everything in it."""
    test_temp_dir_base = os.path.join(os.path.dirname(__file__), "test_temp_cleanup")
    os.makedirs(test_temp_dir_base, exist_ok=True)
    tool = VideoProcessingTool(
        model_cfg_path=model_files["cfg"],
        model_weights_path=model_files["weights"],
        class_names_path=model_files["names"],
        temp_dir_base=test_temp_dir_base
    )

    try:
        # Close the handle before cleanup() deletes the file: the original bare
        # open(...).write(...) leaked the handle, which breaks deletion on Windows.
        temp_file_in_tool_dir = os.path.join(tool.temp_dir, "dummy.txt")
        with open(temp_file_in_tool_dir, 'w') as f:
            f.write("test")
        assert os.path.exists(tool.temp_dir)
        assert os.path.exists(temp_file_in_tool_dir)

        tool_temp_dir_path = tool.temp_dir
        tool.cleanup()
        assert not os.path.exists(tool_temp_dir_path), f"Temp directory {tool_temp_dir_path} was not removed."
    finally:
        # Remove the per-test base directory so repeated runs don't accumulate
        # leftovers (the original never cleaned up test_temp_cleanup).
        shutil.rmtree(test_temp_dir_base, ignore_errors=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|