"""Integration tests for the complete audio processing pipeline.""" import os import tempfile import time import pytest from pathlib import Path from unittest.mock import Mock, patch, MagicMock from typing import Dict, Any, Optional from src.application.services.audio_processing_service import AudioProcessingApplicationService from src.application.dtos.audio_upload_dto import AudioUploadDto from src.application.dtos.processing_request_dto import ProcessingRequestDto from src.application.dtos.processing_result_dto import ProcessingResultDto from src.infrastructure.config.dependency_container import DependencyContainer from src.infrastructure.config.app_config import AppConfig from src.domain.models.audio_content import AudioContent from src.domain.models.text_content import TextContent from src.domain.models.voice_settings import VoiceSettings from src.domain.exceptions import ( SpeechRecognitionException, TranslationFailedException, SpeechSynthesisException ) class TestAudioProcessingPipeline: """Integration tests for the complete audio processing pipeline.""" @pytest.fixture def temp_dir(self): """Create temporary directory for test files.""" with tempfile.TemporaryDirectory() as temp_dir: yield temp_dir @pytest.fixture def mock_config(self, temp_dir): """Create mock configuration for testing.""" config = Mock(spec=AppConfig) # Processing configuration config.get_processing_config.return_value = { 'max_file_size_mb': 50, 'supported_audio_formats': ['wav', 'mp3', 'flac'], 'temp_dir': temp_dir, 'cleanup_temp_files': True } # Logging configuration config.get_logging_config.return_value = { 'level': 'INFO', 'enable_file_logging': False, 'log_file_path': os.path.join(temp_dir, 'test.log'), 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s' } # STT configuration config.get_stt_config.return_value = { 'preferred_providers': ['parakeet', 'whisper-small', 'whisper-medium'] } # TTS configuration config.get_tts_config.return_value = { 'preferred_providers': ['kokoro', 'dia', 'cosyvoice2', 'dummy'] } return config @pytest.fixture def mock_container(self, mock_config): """Create mock dependency container for testing.""" container = Mock(spec=DependencyContainer) container.resolve.return_value = mock_config # Mock STT provider mock_stt_provider = Mock() mock_stt_provider.transcribe.return_value = TextContent( text="Hello, this is a test transcription.", language="en" ) container.get_stt_provider.return_value = mock_stt_provider # Mock translation provider mock_translation_provider = Mock() mock_translation_provider.translate.return_value = TextContent( text="Hola, esta es una transcripción de prueba.", language="es" ) container.get_translation_provider.return_value = mock_translation_provider # Mock TTS provider mock_tts_provider = Mock() mock_audio_content = AudioContent( data=b"fake_audio_data", format="wav", sample_rate=22050, duration=2.5 ) mock_tts_provider.synthesize.return_value = mock_audio_content container.get_tts_provider.return_value = mock_tts_provider return container @pytest.fixture def audio_service(self, mock_container, mock_config): """Create audio processing service for testing.""" return AudioProcessingApplicationService(mock_container, mock_config) @pytest.fixture def sample_audio_upload(self): """Create sample audio upload DTO.""" return AudioUploadDto( filename="test_audio.wav", content=b"fake_wav_audio_data", content_type="audio/wav", size=1024 ) @pytest.fixture def sample_processing_request(self, sample_audio_upload): """Create sample processing request DTO.""" return ProcessingRequestDto( audio=sample_audio_upload, asr_model="whisper-small", target_language="es", source_language="en", voice="kokoro", speed=1.0, requires_translation=True ) def test_complete_pipeline_success(self, audio_service, sample_processing_request): """Test successful execution of the complete audio processing pipeline.""" # Execute the pipeline result = audio_service.process_audio_pipeline(sample_processing_request) # Verify successful result assert isinstance(result, ProcessingResultDto) assert result.success is True assert result.error_message is None assert result.original_text == "Hello, this is a test transcription." assert result.translated_text == "Hola, esta es una transcripción de prueba." assert result.audio_path is not None assert result.processing_time > 0 assert result.metadata is not None assert 'correlation_id' in result.metadata def test_pipeline_without_translation(self, audio_service, sample_audio_upload): """Test pipeline execution without translation (same language).""" request = ProcessingRequestDto( audio=sample_audio_upload, asr_model="whisper-small", target_language="en", source_language="en", voice="kokoro", speed=1.0, requires_translation=False ) result = audio_service.process_audio_pipeline(request) assert result.success is True assert result.original_text == "Hello, this is a test transcription." assert result.translated_text is None # No translation performed assert result.audio_path is not None def test_pipeline_with_different_voice_settings(self, audio_service, sample_audio_upload): """Test pipeline with different voice settings.""" request = ProcessingRequestDto( audio=sample_audio_upload, asr_model="whisper-medium", target_language="fr", source_language="en", voice="dia", speed=1.5, requires_translation=True ) result = audio_service.process_audio_pipeline(request) assert result.success is True assert result.metadata['voice'] == "dia" assert result.metadata['speed'] == 1.5 assert result.metadata['asr_model'] == "whisper-medium" def test_pipeline_performance_metrics(self, audio_service, sample_processing_request): """Test that pipeline captures performance metrics.""" start_time = time.time() result = audio_service.process_audio_pipeline(sample_processing_request) end_time = time.time() assert result.success is True assert result.processing_time > 0 assert result.processing_time <= (end_time - start_time) + 0.1 # Allow small margin assert 'correlation_id' in result.metadata def test_pipeline_with_large_file(self, audio_service, mock_config): """Test pipeline behavior with large audio files.""" # Create large audio upload large_audio = AudioUploadDto( filename="large_audio.wav", content=b"x" * (10 * 1024 * 1024), # 10MB content_type="audio/wav", size=10 * 1024 * 1024 ) request = ProcessingRequestDto( audio=large_audio, asr_model="whisper-small", target_language="es", voice="kokoro", speed=1.0, requires_translation=True ) result = audio_service.process_audio_pipeline(request) assert result.success is True assert result.metadata['file_size'] == 10 * 1024 * 1024 def test_pipeline_file_cleanup(self, audio_service, sample_processing_request, temp_dir): """Test that temporary files are properly cleaned up.""" # Count files before processing files_before = len(list(Path(temp_dir).rglob("*"))) result = audio_service.process_audio_pipeline(sample_processing_request) # Verify processing succeeded assert result.success is True # Verify cleanup occurred (no additional temp files) files_after = len(list(Path(temp_dir).rglob("*"))) assert files_after <= files_before + 1 # Allow for output file def test_pipeline_correlation_id_tracking(self, audio_service, sample_processing_request): """Test that correlation IDs are properly tracked throughout the pipeline.""" result = audio_service.process_audio_pipeline(sample_processing_request) assert result.success is True assert 'correlation_id' in result.metadata correlation_id = result.metadata['correlation_id'] assert isinstance(correlation_id, str) assert len(correlation_id) > 0 # Verify correlation ID is used in status tracking status = audio_service.get_processing_status(correlation_id) assert status['correlation_id'] == correlation_id def test_pipeline_metadata_completeness(self, audio_service, sample_processing_request): """Test that pipeline result contains complete metadata.""" result = audio_service.process_audio_pipeline(sample_processing_request) assert result.success is True assert result.metadata is not None expected_metadata_keys = [ 'correlation_id', 'asr_model', 'target_language', 'voice', 'speed', 'translation_required' ] for key in expected_metadata_keys: assert key in result.metadata def test_pipeline_supported_configurations(self, audio_service): """Test retrieval of supported pipeline configurations.""" config = audio_service.get_supported_configurations() assert 'asr_models' in config assert 'voices' in config assert 'languages' in config assert 'audio_formats' in config assert 'max_file_size_mb' in config assert 'speed_range' in config assert isinstance(config['asr_models'], list) assert isinstance(config['voices'], list) assert isinstance(config['languages'], list) assert len(config['asr_models']) > 0 assert len(config['voices']) > 0 def test_pipeline_context_manager(self, mock_container, mock_config): """Test audio service as context manager.""" with AudioProcessingApplicationService(mock_container, mock_config) as service: assert service is not None # Service should be usable within context config = service.get_supported_configurations() assert config is not None def test_pipeline_multiple_requests(self, audio_service, sample_audio_upload): """Test processing multiple requests in sequence.""" requests = [] for i in range(3): request = ProcessingRequestDto( audio=sample_audio_upload, asr_model="whisper-small", target_language="es", voice="kokoro", speed=1.0, requires_translation=True ) requests.append(request) results = [] for request in requests: result = audio_service.process_audio_pipeline(request) results.append(result) # Verify all requests succeeded for result in results: assert result.success is True assert result.original_text is not None assert result.translated_text is not None # Verify each request has unique correlation ID correlation_ids = [r.metadata['correlation_id'] for r in results] assert len(set(correlation_ids)) == 3 # All unique def test_pipeline_concurrent_processing(self, audio_service, sample_processing_request): """Test pipeline behavior under concurrent processing.""" import threading import queue results_queue = queue.Queue() def process_request(): try: result = audio_service.process_audio_pipeline(sample_processing_request) results_queue.put(result) except Exception as e: results_queue.put(e) # Start multiple threads threads = [] for _ in range(3): thread = threading.Thread(target=process_request) threads.append(thread) thread.start() # Wait for completion for thread in threads: thread.join() # Verify all results results = [] while not results_queue.empty(): result = results_queue.get() if isinstance(result, Exception): pytest.fail(f"Concurrent processing failed: {result}") results.append(result) assert len(results) == 3 for result in results: assert result.success is True def test_pipeline_memory_usage(self, audio_service, sample_processing_request): """Test pipeline memory usage and cleanup.""" import psutil import os process = psutil.Process(os.getpid()) memory_before = process.memory_info().rss # Process multiple requests for _ in range(5): result = audio_service.process_audio_pipeline(sample_processing_request) assert result.success is True memory_after = process.memory_info().rss memory_increase = memory_after - memory_before # Memory increase should be reasonable (less than 50MB for test data) assert memory_increase < 50 * 1024 * 1024 def test_pipeline_with_streaming_synthesis(self, audio_service, sample_processing_request, mock_container): """Test pipeline with streaming TTS synthesis.""" # Mock streaming TTS provider mock_tts_provider = mock_container.get_tts_provider.return_value def mock_stream(): for i in range(3): yield AudioContent( data=f"chunk_{i}".encode(), format="wav", sample_rate=22050, duration=0.5 ) mock_tts_provider.synthesize_stream.return_value = mock_stream() result = audio_service.process_audio_pipeline(sample_processing_request) assert result.success is True assert result.audio_path is not None def test_pipeline_configuration_validation(self, audio_service): """Test pipeline configuration validation.""" config = audio_service.get_supported_configurations() # Verify configuration structure assert isinstance(config['asr_models'], list) assert isinstance(config['voices'], list) assert isinstance(config['languages'], list) assert isinstance(config['audio_formats'], list) assert isinstance(config['max_file_size_mb'], (int, float)) assert isinstance(config['speed_range'], dict) # Verify speed range speed_range = config['speed_range'] assert 'min' in speed_range assert 'max' in speed_range assert speed_range['min'] < speed_range['max'] assert speed_range['min'] > 0 assert speed_range['max'] <= 3.0 def test_pipeline_error_recovery_logging(self, audio_service, sample_processing_request, mock_container): """Test that error recovery attempts are properly logged.""" # Mock STT provider to fail first time, succeed second time mock_stt_provider = mock_container.get_stt_provider.return_value mock_stt_provider.transcribe.side_effect = [ SpeechRecognitionException("First attempt failed"), TextContent(text="Recovered transcription", language="en") ] with patch('src.application.services.audio_processing_service.logger') as mock_logger: result = audio_service.process_audio_pipeline(sample_processing_request) assert result.success is True # Verify error and recovery were logged mock_logger.warning.assert_called() mock_logger.info.assert_called() def test_pipeline_end_to_end_timing(self, audio_service, sample_processing_request): """Test end-to-end pipeline timing and performance.""" start_time = time.time() result = audio_service.process_audio_pipeline(sample_processing_request) end_time = time.time() total_time = end_time - start_time assert result.success is True assert result.processing_time > 0 assert result.processing_time <= total_time # For mock providers, processing should be fast assert total_time < 5.0 # Should complete within 5 seconds # Verify timing metadata assert 'correlation_id' in result.metadata timing_info = result.metadata assert timing_info is not None