"""Integration tests for performance and error scenario testing."""

import time
import pytest
import threading
import queue
import psutil
import os
from unittest.mock import Mock, patch, MagicMock
from typing import List, Dict, Any, Optional

from src.application.services.audio_processing_service import AudioProcessingApplicationService
from src.application.dtos.audio_upload_dto import AudioUploadDto
from src.application.dtos.processing_request_dto import ProcessingRequestDto
from src.application.dtos.processing_result_dto import ProcessingResultDto
from src.infrastructure.config.dependency_container import DependencyContainer
from src.infrastructure.config.app_config import AppConfig
from src.domain.models.audio_content import AudioContent
from src.domain.models.text_content import TextContent
from src.domain.exceptions import (
    SpeechRecognitionException,
    TranslationFailedException,
    SpeechSynthesisException,
    AudioProcessingException,
    ProviderNotAvailableException
)


class TestPerformanceAndErrors:
    """Integration tests for performance and error scenarios.

    Every test drives ``AudioProcessingApplicationService.process_audio_pipeline``
    against mocked STT, translation and TTS providers supplied by the
    fixtures below. Elapsed time is measured with ``time.perf_counter()`` so
    that wall-clock adjustments cannot skew the timing assertions.
    """

    @pytest.fixture
    def mock_config(self, tmp_path):
        """Create mock configuration for testing.

        Returns a ``Mock(spec=AppConfig)`` whose ``get_*_config`` methods
        return fixed dictionaries. The temp directory and log file path are
        placed under pytest's ``tmp_path``.
        """
        config = Mock(spec=AppConfig)

        # Processing configuration
        config.get_processing_config.return_value = {
            'max_file_size_mb': 100,
            'supported_audio_formats': ['wav', 'mp3', 'flac'],
            'temp_dir': str(tmp_path),
            'cleanup_temp_files': True,
            'processing_timeout': 300,  # 5 minutes
            'max_concurrent_requests': 10
        }

        # Logging configuration
        config.get_logging_config.return_value = {
            'level': 'INFO',
            'enable_file_logging': False,
            'log_file_path': str(tmp_path / 'test.log'),
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        }

        # STT configuration
        config.get_stt_config.return_value = {
            'preferred_providers': ['parakeet', 'whisper-small', 'whisper-medium'],
            'provider_timeout': 60.0,
            'max_retries': 2
        }

        # TTS configuration
        config.get_tts_config.return_value = {
            'preferred_providers': ['kokoro', 'dia', 'cosyvoice2', 'dummy'],
            'provider_timeout': 30.0,
            'max_retries': 3
        }

        # Translation configuration
        config.get_translation_config.return_value = {
            'provider_timeout': 45.0,
            'max_retries': 2,
            'chunk_size': 512
        }

        return config

    @pytest.fixture
    def mock_container(self, mock_config):
        """Create mock dependency container.

        ``resolve`` always returns ``mock_config``; the provider getters are
        wired up by ``_setup_mock_providers``.
        """
        container = Mock(spec=DependencyContainer)
        container.resolve.return_value = mock_config

        # Mock providers with configurable behavior
        self._setup_mock_providers(container)

        return container

    def _setup_mock_providers(self, container):
        """Setup mock providers with configurable behavior.

        Each provider is a plain ``Mock`` returning a canned successful
        result; individual tests override ``side_effect`` to inject failures.
        """
        # Mock STT provider
        mock_stt_provider = Mock()
        mock_stt_provider.transcribe.return_value = TextContent(
            text="Performance test transcription",
            language="en"
        )
        container.get_stt_provider.return_value = mock_stt_provider

        # Mock translation provider
        mock_translation_provider = Mock()
        mock_translation_provider.translate.return_value = TextContent(
            text="Transcripción de prueba de rendimiento",
            language="es"
        )
        container.get_translation_provider.return_value = mock_translation_provider

        # Mock TTS provider
        mock_tts_provider = Mock()
        mock_tts_provider.synthesize.return_value = AudioContent(
            data=b"performance_test_audio_data",
            format="wav",
            sample_rate=22050,
            duration=3.0
        )
        container.get_tts_provider.return_value = mock_tts_provider

    @staticmethod
    def _make_request(filename: str, content: bytes, content_type: str) -> ProcessingRequestDto:
        """Build the standard translation request around the given upload.

        Args:
            filename: Name reported for the uploaded file.
            content: Raw bytes of the upload; ``size`` is derived from it.
            content_type: MIME type reported for the upload.

        Returns:
            A request using the ``whisper-small`` ASR model, target language
            ``es``, voice ``kokoro``, speed 1.0 and translation enabled.
        """
        audio_upload = AudioUploadDto(
            filename=filename,
            content=content,
            content_type=content_type,
            size=len(content)
        )
        return ProcessingRequestDto(
            audio=audio_upload,
            asr_model="whisper-small",
            target_language="es",
            voice="kokoro",
            speed=1.0,
            requires_translation=True
        )

    @pytest.fixture
    def audio_service(self, mock_container, mock_config):
        """Create audio processing service."""
        return AudioProcessingApplicationService(mock_container, mock_config)

    @pytest.fixture
    def sample_request(self):
        """Create sample processing request."""
        return self._make_request(
            "performance_test.wav",
            b"performance_test_audio_data",
            "audio/wav"
        )

    def test_processing_time_performance(self, audio_service, sample_request):
        """Test processing time performance benchmarks."""
        # Warm up
        audio_service.process_audio_pipeline(sample_request)

        # Measure processing time
        started = time.perf_counter()
        result = audio_service.process_audio_pipeline(sample_request)
        processing_time = time.perf_counter() - started

        assert result.success is True
        assert result.processing_time > 0
        assert result.processing_time <= processing_time + 0.1  # Allow small margin

        # Performance benchmark: should complete within reasonable time
        assert processing_time < 5.0  # Should complete within 5 seconds for mock providers

    def test_memory_usage_performance(self, audio_service, sample_request):
        """Test memory usage during processing."""
        process = psutil.Process(os.getpid())

        # Measure initial memory
        initial_memory = process.memory_info().rss

        # Process multiple requests
        for _ in range(10):
            result = audio_service.process_audio_pipeline(sample_request)
            assert result.success is True

        # Measure final memory
        final_memory = process.memory_info().rss
        memory_increase = final_memory - initial_memory

        # Memory increase should be reasonable (less than 100MB for test data)
        assert memory_increase < 100 * 1024 * 1024

    def test_concurrent_processing_performance(self, audio_service, sample_request):
        """Test performance under concurrent load."""
        num_threads = 5
        results_queue = queue.Queue()

        def process_request():
            # Each worker reports either (result, elapsed) or the exception
            # it hit, so failures surface in the main thread.
            try:
                worker_started = time.perf_counter()
                result = audio_service.process_audio_pipeline(sample_request)
                results_queue.put((result, time.perf_counter() - worker_started))
            except Exception as e:
                results_queue.put(e)

        # Start concurrent processing
        threads = []
        started = time.perf_counter()

        for _ in range(num_threads):
            thread = threading.Thread(target=process_request)
            threads.append(thread)
            thread.start()

        # Wait for completion
        for thread in threads:
            thread.join()

        total_time = time.perf_counter() - started

        # Collect results (all workers have finished, so the queue is stable)
        results = []
        processing_times = []

        while not results_queue.empty():
            item = results_queue.get()
            if isinstance(item, Exception):
                pytest.fail(f"Concurrent processing failed: {item}")

            result, proc_time = item
            results.append(result)
            processing_times.append(proc_time)

        # Verify all succeeded
        assert len(results) == num_threads
        for result in results:
            assert result.success is True

        # Performance checks
        avg_processing_time = sum(processing_times) / len(processing_times)
        assert avg_processing_time < 10.0  # Average should be reasonable
        assert total_time < 15.0  # Total concurrent time should be reasonable

    def test_large_file_performance(self, audio_service):
        """Test performance with large audio files."""
        # Create large audio file (10MB)
        large_content = b"x" * (10 * 1024 * 1024)
        request = self._make_request(
            "large_performance_test.wav",
            large_content,
            "audio/wav"
        )

        started = time.perf_counter()
        result = audio_service.process_audio_pipeline(request)
        processing_time = time.perf_counter() - started

        assert result.success is True
        # Large files should still complete within reasonable time
        assert processing_time < 30.0

    def test_stt_provider_failure_recovery(self, audio_service, sample_request, mock_container):
        """Test recovery from STT provider failures."""
        mock_stt_provider = mock_container.get_stt_provider.return_value

        # Mock first call to fail, second to succeed.
        # NOTE(review): passing relies on the service retrying transcribe.
        mock_stt_provider.transcribe.side_effect = [
            SpeechRecognitionException("STT provider temporarily unavailable"),
            TextContent(text="Recovered transcription", language="en")
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert "Recovered transcription" in result.original_text

    def test_translation_provider_failure_recovery(self, audio_service, sample_request, mock_container):
        """Test recovery from translation provider failures."""
        mock_translation_provider = mock_container.get_translation_provider.return_value

        # Mock first call to fail, second to succeed
        mock_translation_provider.translate.side_effect = [
            TranslationFailedException("Translation service temporarily unavailable"),
            TextContent(text="Traducción recuperada", language="es")
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert "Traducción recuperada" in result.translated_text

    def test_tts_provider_failure_recovery(self, audio_service, sample_request, mock_container):
        """Test recovery from TTS provider failures."""
        mock_tts_provider = mock_container.get_tts_provider.return_value

        # Mock first call to fail, second to succeed
        mock_tts_provider.synthesize.side_effect = [
            SpeechSynthesisException("TTS provider temporarily unavailable"),
            AudioContent(
                data=b"recovered_audio_data",
                format="wav",
                sample_rate=22050,
                duration=2.5
            )
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert result.audio_path is not None

    def test_multiple_provider_failures(self, audio_service, sample_request, mock_container):
        """Test handling of multiple provider failures."""
        # Mock all providers to fail on every call
        mock_stt_provider = mock_container.get_stt_provider.return_value
        mock_translation_provider = mock_container.get_translation_provider.return_value
        mock_tts_provider = mock_container.get_tts_provider.return_value

        mock_stt_provider.transcribe.side_effect = SpeechRecognitionException("STT failed")
        mock_translation_provider.translate.side_effect = TranslationFailedException("Translation failed")
        mock_tts_provider.synthesize.side_effect = SpeechSynthesisException("TTS failed")

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is False
        assert result.error_message is not None
        assert result.error_code is not None

    def test_timeout_handling(self, audio_service, sample_request, mock_container):
        """Test handling of provider timeouts."""
        mock_stt_provider = mock_container.get_stt_provider.return_value

        def slow_transcribe(*args, **kwargs):
            time.sleep(2.0)  # Simulate slow processing
            return TextContent(text="Slow transcription", language="en")

        mock_stt_provider.transcribe.side_effect = slow_transcribe

        started = time.perf_counter()
        result = audio_service.process_audio_pipeline(sample_request)
        processing_time = time.perf_counter() - started

        # Should complete despite slow provider
        assert result.success is True
        assert processing_time >= 2.0  # Should include the delay

    def test_invalid_input_handling(self, audio_service):
        """Test handling of invalid input data."""
        # Test with invalid audio format
        request = self._make_request(
            "invalid.xyz",
            b"invalid_audio_data",
            "audio/xyz"
        )

        result = audio_service.process_audio_pipeline(request)

        assert result.success is False
        assert result.error_code is not None
        # Check for a message explicitly so a missing one fails as an
        # assertion rather than an AttributeError on ``.lower()``.
        assert result.error_message is not None
        message = result.error_message.lower()
        assert "format" in message or "unsupported" in message

    def test_oversized_file_handling(self, audio_service, mock_config):
        """Test handling of oversized files."""
        # Mock config to have small file size limit
        mock_config.get_processing_config.return_value['max_file_size_mb'] = 1

        # Create file larger than limit
        large_content = b"x" * (2 * 1024 * 1024)  # 2MB
        request = self._make_request(
            "oversized.wav",
            large_content,
            "audio/wav"
        )

        result = audio_service.process_audio_pipeline(request)

        assert result.success is False
        assert result.error_code is not None
        # Check for a message explicitly so a missing one fails as an
        # assertion rather than an AttributeError on ``.lower()``.
        assert result.error_message is not None
        message = result.error_message.lower()
        assert "size" in message or "large" in message

    def test_corrupted_audio_handling(self, audio_service):
        """Test handling of corrupted audio data."""
        request = self._make_request(
            "corrupted.wav",
            b"corrupted_data_not_audio",
            "audio/wav"
        )

        result = audio_service.process_audio_pipeline(request)

        # Should handle gracefully (success depends on implementation)
        assert result.error_message is None or "audio" in result.error_message.lower()

    def test_network_error_simulation(self, audio_service, sample_request, mock_container):
        """Test handling of network-related errors."""
        mock_translation_provider = mock_container.get_translation_provider.return_value

        # Simulate network errors: two failures, then a successful result.
        # NOTE(review): passing relies on the service retrying translate at
        # least twice after these built-in exceptions.
        mock_translation_provider.translate.side_effect = [
            ConnectionError("Network connection failed"),
            TimeoutError("Request timed out"),
            TextContent(text="Network recovered translation", language="es")
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        # Should recover from network errors
        assert result.success is True
        assert "Network recovered translation" in result.translated_text

    def test_resource_exhaustion_handling(self, audio_service, sample_request):
        """Test handling of resource exhaustion scenarios."""
        # Simulate pressure by processing many requests back to back
        results = []

        for _ in range(20):  # Process many requests
            result = audio_service.process_audio_pipeline(sample_request)
            results.append(result)

            # All should succeed despite resource pressure
            assert result.success is True

        # Verify all completed successfully
        assert len(results) == 20
        for result in results:
            assert result.success is True

    def test_error_correlation_tracking(self, audio_service, sample_request, mock_container):
        """Test error correlation tracking across pipeline stages."""
        mock_stt_provider = mock_container.get_stt_provider.return_value
        mock_stt_provider.transcribe.side_effect = SpeechRecognitionException("STT correlation test error")

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is False
        assert result.metadata is not None
        assert 'correlation_id' in result.metadata

        # Verify correlation ID is a non-empty string
        correlation_id = result.metadata['correlation_id']
        assert isinstance(correlation_id, str)
        assert len(correlation_id) > 0

    def test_graceful_degradation(self, audio_service, sample_request, mock_container):
        """Test graceful degradation when some features fail."""
        # Mock translation to fail but allow STT and TTS to succeed
        mock_translation_provider = mock_container.get_translation_provider.return_value
        mock_translation_provider.translate.side_effect = TranslationFailedException("Translation unavailable")

        # Modify request to not require translation
        sample_request.requires_translation = False
        sample_request.target_language = "en"  # Same as source

        result = audio_service.process_audio_pipeline(sample_request)

        # Should succeed without translation
        assert result.success is True
        assert result.translated_text is None  # No translation performed

    def test_circuit_breaker_behavior(self, audio_service, sample_request, mock_container):
        """Test circuit breaker behavior under repeated failures."""
        mock_tts_provider = mock_container.get_tts_provider.return_value

        # Mock repeated failures to trigger circuit breaker
        mock_tts_provider.synthesize.side_effect = SpeechSynthesisException("Repeated TTS failure")

        results = []
        for _ in range(5):  # Multiple attempts
            result = audio_service.process_audio_pipeline(sample_request)
            results.append(result)

        # All should fail.
        # NOTE(review): only the failure results are asserted here; the
        # number of retries a circuit breaker allows is not verified.
        for result in results:
            assert result.success is False
            assert result.error_code is not None

    def test_performance_metrics_collection(self, audio_service, sample_request):
        """Test collection of performance metrics."""
        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert result.processing_time > 0
        assert result.metadata is not None

        # Verify performance-related metadata
        metadata = result.metadata
        assert 'correlation_id' in metadata
        assert 'asr_model' in metadata
        assert 'target_language' in metadata
        assert 'voice' in metadata

    def test_stress_testing(self, audio_service, sample_request):
        """Test system behavior under stress conditions."""
        num_requests = 50
        results = []

        started = time.perf_counter()

        for _ in range(num_requests):
            result = audio_service.process_audio_pipeline(sample_request)
            results.append(result)

        total_time = time.perf_counter() - started

        # Verify all requests completed
        assert len(results) == num_requests

        # Calculate success rate
        successful_results = [r for r in results if r.success]
        success_rate = len(successful_results) / len(results)

        # Should maintain high success rate under stress
        assert success_rate >= 0.95  # At least 95% success rate

        # Performance should remain reasonable
        avg_time_per_request = total_time / num_requests
        assert avg_time_per_request < 1.0  # Average less than 1 second per request