# Spaces: Build error (page-header residue captured with this file; not part of the module)
"""Integration tests for performance and error scenario testing."""
import time | |
import pytest | |
import threading | |
import queue | |
import psutil | |
import os | |
from unittest.mock import Mock, patch, MagicMock | |
from typing import List, Dict, Any, Optional | |
from src.application.services.audio_processing_service import AudioProcessingApplicationService | |
from src.application.dtos.audio_upload_dto import AudioUploadDto | |
from src.application.dtos.processing_request_dto import ProcessingRequestDto | |
from src.application.dtos.processing_result_dto import ProcessingResultDto | |
from src.infrastructure.config.dependency_container import DependencyContainer | |
from src.infrastructure.config.app_config import AppConfig | |
from src.domain.models.audio_content import AudioContent | |
from src.domain.models.text_content import TextContent | |
from src.domain.exceptions import ( | |
SpeechRecognitionException, | |
TranslationFailedException, | |
SpeechSynthesisException, | |
AudioProcessingException, | |
ProviderNotAvailableException | |
) | |
class TestPerformanceAndErrors:
    """Integration tests for performance and error scenarios.

    Every test drives ``AudioProcessingApplicationService.process_audio_pipeline``
    against a mocked ``DependencyContainer`` whose STT, translation and TTS
    providers are ``Mock`` objects, so the timings measured here reflect the
    service's own orchestration rather than real model inference.
    """

    # ------------------------------------------------------------------
    # Helpers and fixtures
    # ------------------------------------------------------------------

    @staticmethod
    def _build_request(
        filename: str,
        content: bytes,
        content_type: str = "audio/wav",
    ) -> ProcessingRequestDto:
        """Build a processing request around an in-memory audio upload.

        Args:
            filename: Name reported for the uploaded file.
            content: Raw bytes of the upload; ``size`` is derived from it.
            content_type: MIME type reported for the upload.

        Returns:
            A request using the settings shared by every test in this class
            (``whisper-small`` ASR, Spanish target, ``kokoro`` voice, speed
            1.0, translation required).
        """
        audio_upload = AudioUploadDto(
            filename=filename,
            content=content,
            content_type=content_type,
            size=len(content),
        )
        return ProcessingRequestDto(
            audio=audio_upload,
            asr_model="whisper-small",
            target_language="es",
            voice="kokoro",
            speed=1.0,
            requires_translation=True,
        )

    @pytest.fixture
    def mock_config(self, tmp_path):
        """Create mock configuration for testing.

        Temp and log paths point into pytest's per-test ``tmp_path``.
        """
        config = Mock(spec=AppConfig)

        # Processing configuration
        config.get_processing_config.return_value = {
            'max_file_size_mb': 100,
            'supported_audio_formats': ['wav', 'mp3', 'flac'],
            'temp_dir': str(tmp_path),
            'cleanup_temp_files': True,
            'processing_timeout': 300,  # 5 minutes
            'max_concurrent_requests': 10,
        }

        # Logging configuration
        config.get_logging_config.return_value = {
            'level': 'INFO',
            'enable_file_logging': False,
            'log_file_path': str(tmp_path / 'test.log'),
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        }

        # STT configuration
        config.get_stt_config.return_value = {
            'preferred_providers': ['parakeet', 'whisper-small', 'whisper-medium'],
            'provider_timeout': 60.0,
            'max_retries': 2,
        }

        # TTS configuration
        config.get_tts_config.return_value = {
            'preferred_providers': ['kokoro', 'dia', 'cosyvoice2', 'dummy'],
            'provider_timeout': 30.0,
            'max_retries': 3,
        }

        # Translation configuration
        config.get_translation_config.return_value = {
            'provider_timeout': 45.0,
            'max_retries': 2,
            'chunk_size': 512,
        }

        return config

    @pytest.fixture
    def mock_container(self, mock_config):
        """Create mock dependency container whose ``resolve`` returns the mock config."""
        container = Mock(spec=DependencyContainer)
        container.resolve.return_value = mock_config

        # Mock providers with configurable behavior
        self._setup_mock_providers(container)

        return container

    def _setup_mock_providers(self, container):
        """Attach STT, translation and TTS mock providers to ``container``.

        Each provider returns a fixed successful result by default; individual
        tests override ``side_effect`` on the provider methods to inject
        failures or delays.
        """
        # Mock STT provider
        mock_stt_provider = Mock()
        mock_stt_provider.transcribe.return_value = TextContent(
            text="Performance test transcription",
            language="en",
        )
        container.get_stt_provider.return_value = mock_stt_provider

        # Mock translation provider
        mock_translation_provider = Mock()
        mock_translation_provider.translate.return_value = TextContent(
            text="Transcripción de prueba de rendimiento",
            language="es",
        )
        container.get_translation_provider.return_value = mock_translation_provider

        # Mock TTS provider
        mock_tts_provider = Mock()
        mock_tts_provider.synthesize.return_value = AudioContent(
            data=b"performance_test_audio_data",
            format="wav",
            sample_rate=22050,
            duration=3.0,
        )
        container.get_tts_provider.return_value = mock_tts_provider

    @pytest.fixture
    def audio_service(self, mock_container, mock_config):
        """Create audio processing service wired to the mocked container and config."""
        return AudioProcessingApplicationService(mock_container, mock_config)

    @pytest.fixture
    def sample_request(self):
        """Create sample processing request with a small in-memory WAV payload."""
        return self._build_request(
            "performance_test.wav",
            b"performance_test_audio_data",
        )

    # ------------------------------------------------------------------
    # Performance tests
    # ------------------------------------------------------------------

    def test_processing_time_performance(self, audio_service, sample_request):
        """Test processing time performance benchmarks."""
        # Warm up so one-time initialisation is not counted in the measurement.
        audio_service.process_audio_pipeline(sample_request)

        # perf_counter is monotonic, so the interval cannot be distorted by
        # wall-clock adjustments.
        started = time.perf_counter()
        result = audio_service.process_audio_pipeline(sample_request)
        processing_time = time.perf_counter() - started

        assert result.success is True
        assert result.processing_time > 0
        assert result.processing_time <= processing_time + 0.1  # Allow small margin

        # Performance benchmark: should complete within 5 seconds for mock providers
        assert processing_time < 5.0

    def test_memory_usage_performance(self, audio_service, sample_request):
        """Test memory usage during processing."""
        process = psutil.Process(os.getpid())

        # Measure initial resident memory
        initial_memory = process.memory_info().rss

        # Process multiple requests
        for _ in range(10):
            result = audio_service.process_audio_pipeline(sample_request)
            assert result.success is True

        # Measure final resident memory
        memory_increase = process.memory_info().rss - initial_memory

        # Memory increase should be reasonable (less than 100MB for test data)
        assert memory_increase < 100 * 1024 * 1024

    def test_concurrent_processing_performance(self, audio_service, sample_request):
        """Test performance under concurrent load."""
        num_threads = 5
        results_queue = queue.Queue()

        def process_request():
            # Exceptions are forwarded through the queue because an exception
            # raised inside a worker thread would not fail the test by itself.
            try:
                started = time.perf_counter()
                result = audio_service.process_audio_pipeline(sample_request)
                results_queue.put((result, time.perf_counter() - started))
            except Exception as e:
                results_queue.put(e)

        # Start concurrent processing
        threads = [
            threading.Thread(target=process_request) for _ in range(num_threads)
        ]
        wall_started = time.perf_counter()
        for thread in threads:
            thread.start()

        # Wait for completion
        for thread in threads:
            thread.join()
        total_time = time.perf_counter() - wall_started

        # Collect results; all workers have been joined, so the queue is final.
        results = []
        processing_times = []
        while not results_queue.empty():
            item = results_queue.get()
            if isinstance(item, Exception):
                pytest.fail(f"Concurrent processing failed: {item}")
            result, proc_time = item
            results.append(result)
            processing_times.append(proc_time)

        # Verify all succeeded
        assert len(results) == num_threads
        for result in results:
            assert result.success is True

        # Performance checks
        avg_processing_time = sum(processing_times) / len(processing_times)
        assert avg_processing_time < 10.0  # Average should be reasonable
        assert total_time < 15.0  # Total concurrent time should be reasonable

    def test_large_file_performance(self, audio_service):
        """Test performance with large audio files."""
        # Create large audio file (10MB)
        large_content = b"x" * (10 * 1024 * 1024)
        request = self._build_request("large_performance_test.wav", large_content)

        started = time.perf_counter()
        result = audio_service.process_audio_pipeline(request)
        processing_time = time.perf_counter() - started

        assert result.success is True
        # Large files should still complete within reasonable time
        assert processing_time < 30.0

    # ------------------------------------------------------------------
    # Provider failure and recovery tests
    # ------------------------------------------------------------------

    def test_stt_provider_failure_recovery(self, audio_service, sample_request, mock_container):
        """Test recovery from STT provider failures."""
        mock_stt_provider = mock_container.get_stt_provider.return_value

        # Mock first call to fail, second to succeed
        mock_stt_provider.transcribe.side_effect = [
            SpeechRecognitionException("STT provider temporarily unavailable"),
            TextContent(text="Recovered transcription", language="en"),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert "Recovered transcription" in result.original_text

    def test_translation_provider_failure_recovery(self, audio_service, sample_request, mock_container):
        """Test recovery from translation provider failures."""
        mock_translation_provider = mock_container.get_translation_provider.return_value

        # Mock first call to fail, second to succeed
        mock_translation_provider.translate.side_effect = [
            TranslationFailedException("Translation service temporarily unavailable"),
            TextContent(text="Traducción recuperada", language="es"),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert "Traducción recuperada" in result.translated_text

    def test_tts_provider_failure_recovery(self, audio_service, sample_request, mock_container):
        """Test recovery from TTS provider failures."""
        mock_tts_provider = mock_container.get_tts_provider.return_value

        # Mock first call to fail, second to succeed
        mock_tts_provider.synthesize.side_effect = [
            SpeechSynthesisException("TTS provider temporarily unavailable"),
            AudioContent(
                data=b"recovered_audio_data",
                format="wav",
                sample_rate=22050,
                duration=2.5,
            ),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert result.audio_path is not None

    def test_multiple_provider_failures(self, audio_service, sample_request, mock_container):
        """Test handling of multiple provider failures."""
        # Mock all providers to fail on every call
        mock_stt_provider = mock_container.get_stt_provider.return_value
        mock_translation_provider = mock_container.get_translation_provider.return_value
        mock_tts_provider = mock_container.get_tts_provider.return_value

        mock_stt_provider.transcribe.side_effect = SpeechRecognitionException("STT failed")
        mock_translation_provider.translate.side_effect = TranslationFailedException("Translation failed")
        mock_tts_provider.synthesize.side_effect = SpeechSynthesisException("TTS failed")

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is False
        assert result.error_message is not None
        assert result.error_code is not None

    def test_timeout_handling(self, audio_service, sample_request, mock_container):
        """Test handling of provider timeouts."""
        mock_stt_provider = mock_container.get_stt_provider.return_value
        simulated_delay = 2.0

        def slow_transcribe(*args, **kwargs):
            time.sleep(simulated_delay)  # Simulate slow processing
            return TextContent(text="Slow transcription", language="en")

        mock_stt_provider.transcribe.side_effect = slow_transcribe

        started = time.perf_counter()
        result = audio_service.process_audio_pipeline(sample_request)
        processing_time = time.perf_counter() - started

        # Should complete despite slow provider
        assert result.success is True
        assert processing_time >= simulated_delay  # Should include the delay

    # ------------------------------------------------------------------
    # Invalid input tests
    # ------------------------------------------------------------------

    def test_invalid_input_handling(self, audio_service):
        """Test handling of invalid input data."""
        # Test with invalid audio format
        request = self._build_request(
            "invalid.xyz",
            b"invalid_audio_data",
            content_type="audio/xyz",
        )

        result = audio_service.process_audio_pipeline(request)

        assert result.success is False
        assert result.error_code is not None
        # Fail with a clear assertion instead of AttributeError on None.lower().
        assert result.error_message is not None
        message = result.error_message.lower()
        assert "format" in message or "unsupported" in message

    def test_oversized_file_handling(self, audio_service, mock_config):
        """Test handling of oversized files."""
        # Mock config to have small file size limit
        mock_config.get_processing_config.return_value['max_file_size_mb'] = 1

        # Create file larger than limit
        large_content = b"x" * (2 * 1024 * 1024)  # 2MB
        request = self._build_request("oversized.wav", large_content)

        result = audio_service.process_audio_pipeline(request)

        assert result.success is False
        assert result.error_code is not None
        # Fail with a clear assertion instead of AttributeError on None.lower().
        assert result.error_message is not None
        message = result.error_message.lower()
        assert "size" in message or "large" in message

    def test_corrupted_audio_handling(self, audio_service):
        """Test handling of corrupted audio data."""
        request = self._build_request("corrupted.wav", b"corrupted_data_not_audio")

        result = audio_service.process_audio_pipeline(request)

        # Should handle gracefully (success depends on implementation)
        assert result.error_message is None or "audio" in result.error_message.lower()

    def test_network_error_simulation(self, audio_service, sample_request, mock_container):
        """Test handling of network-related errors."""
        mock_translation_provider = mock_container.get_translation_provider.return_value

        # Simulate two network errors followed by a successful translation
        mock_translation_provider.translate.side_effect = [
            ConnectionError("Network connection failed"),
            TimeoutError("Request timed out"),
            TextContent(text="Network recovered translation", language="es"),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        # Should recover from network errors
        assert result.success is True
        assert "Network recovered translation" in result.translated_text

    # ------------------------------------------------------------------
    # Resilience tests
    # ------------------------------------------------------------------

    def test_resource_exhaustion_handling(self, audio_service, sample_request):
        """Test handling of resource exhaustion scenarios."""
        num_requests = 20

        # Simulate memory pressure by processing many requests
        results = []
        for _ in range(num_requests):
            result = audio_service.process_audio_pipeline(sample_request)
            # All should succeed despite resource pressure; fail on the first miss.
            assert result.success is True
            results.append(result)

        # Verify all completed
        assert len(results) == num_requests

    def test_error_correlation_tracking(self, audio_service, sample_request, mock_container):
        """Test error correlation tracking across pipeline stages."""
        mock_stt_provider = mock_container.get_stt_provider.return_value
        mock_stt_provider.transcribe.side_effect = SpeechRecognitionException("STT correlation test error")

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is False
        assert result.metadata is not None
        assert 'correlation_id' in result.metadata

        # Verify correlation ID is a non-empty string
        correlation_id = result.metadata['correlation_id']
        assert isinstance(correlation_id, str)
        assert len(correlation_id) > 0

    def test_graceful_degradation(self, audio_service, sample_request, mock_container):
        """Test graceful degradation when some features fail."""
        # Mock translation to fail but allow STT and TTS to succeed
        mock_translation_provider = mock_container.get_translation_provider.return_value
        mock_translation_provider.translate.side_effect = TranslationFailedException("Translation unavailable")

        # Modify request to not require translation
        sample_request.requires_translation = False
        sample_request.target_language = "en"  # Same as source

        result = audio_service.process_audio_pipeline(sample_request)

        # Should succeed without translation
        assert result.success is True
        assert result.translated_text is None  # No translation performed

    def test_circuit_breaker_behavior(self, audio_service, sample_request, mock_container):
        """Test circuit breaker behavior under repeated failures."""
        mock_tts_provider = mock_container.get_tts_provider.return_value

        # Mock repeated failures to trigger circuit breaker
        mock_tts_provider.synthesize.side_effect = SpeechSynthesisException("Repeated TTS failure")

        # Multiple attempts against the permanently failing provider
        results = [
            audio_service.process_audio_pipeline(sample_request) for _ in range(5)
        ]

        # All should fail, but circuit breaker should prevent excessive retries
        for result in results:
            assert result.success is False
            assert result.error_code is not None

    def test_performance_metrics_collection(self, audio_service, sample_request):
        """Test collection of performance metrics."""
        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert result.processing_time > 0
        assert result.metadata is not None

        # Verify performance-related metadata
        metadata = result.metadata
        for expected_key in ('correlation_id', 'asr_model', 'target_language', 'voice'):
            assert expected_key in metadata

    def test_stress_testing(self, audio_service, sample_request):
        """Test system behavior under stress conditions."""
        num_requests = 50

        started = time.perf_counter()
        results = [
            audio_service.process_audio_pipeline(sample_request)
            for _ in range(num_requests)
        ]
        total_time = time.perf_counter() - started

        # Verify all requests completed
        assert len(results) == num_requests

        # Calculate success rate
        successful_results = [r for r in results if r.success]
        success_rate = len(successful_results) / len(results)

        # Should maintain high success rate under stress
        assert success_rate >= 0.95  # At least 95% success rate

        # Performance should remain reasonable
        avg_time_per_request = total_time / num_requests
        assert avg_time_per_request < 1.0  # Average less than 1 second per request