# teachingAssistant / tests / integration / test_performance_and_errors.py
# Michael Hu
# set parakeet model to default asr
# f7aaf3b
"""Integration tests for performance and error scenario testing."""
import time
import pytest
import threading
import queue
import psutil
import os
from unittest.mock import Mock, patch, MagicMock
from typing import List, Dict, Any, Optional
from src.application.services.audio_processing_service import AudioProcessingApplicationService
from src.application.dtos.audio_upload_dto import AudioUploadDto
from src.application.dtos.processing_request_dto import ProcessingRequestDto
from src.application.dtos.processing_result_dto import ProcessingResultDto
from src.infrastructure.config.dependency_container import DependencyContainer
from src.infrastructure.config.app_config import AppConfig
from src.domain.models.audio_content import AudioContent
from src.domain.models.text_content import TextContent
from src.domain.exceptions import (
SpeechRecognitionException,
TranslationFailedException,
SpeechSynthesisException,
AudioProcessingException,
ProviderNotAvailableException
)
class TestPerformanceAndErrors:
    """Integration tests for performance and error scenarios.

    The service under test is built on a mocked ``DependencyContainer`` whose
    STT, translation and TTS providers are ``Mock`` objects, so every timing
    and failure seen by the pipeline is controlled by the individual test.
    """

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _build_request(filename, content, content_type="audio/wav"):
        """Build a processing request around an in-memory upload.

        Args:
            filename: Name reported for the uploaded file.
            content: Raw bytes of the upload; ``size`` is derived from it.
            content_type: MIME type reported for the upload.

        Returns:
            A ``ProcessingRequestDto`` using the defaults shared by every test
            in this class (whisper-small, Spanish target, kokoro voice,
            speed 1.0, translation required).
        """
        upload = AudioUploadDto(
            filename=filename,
            content=content,
            content_type=content_type,
            size=len(content),
        )
        return ProcessingRequestDto(
            audio=upload,
            asr_model="whisper-small",
            target_language="es",
            voice="kokoro",
            speed=1.0,
            requires_translation=True,
        )

    @staticmethod
    def _timed_call(func, *args):
        """Call ``func(*args)`` and return ``(result, elapsed_seconds)``.

        ``time.perf_counter`` is used instead of ``time.time`` because the
        wall clock may be adjusted while a test runs, which would corrupt
        the measured duration.
        """
        started = time.perf_counter()
        outcome = func(*args)
        return outcome, time.perf_counter() - started

    def _setup_mock_providers(self, container):
        """Attach mock STT, translation and TTS providers to ``container``.

        Each provider returns a fixed, successful value by default; tests
        override ``side_effect`` on the relevant method to inject failures.
        """
        # Mock STT provider
        stt_provider = Mock()
        stt_provider.transcribe.return_value = TextContent(
            text="Performance test transcription",
            language="en",
        )
        container.get_stt_provider.return_value = stt_provider

        # Mock translation provider
        translation_provider = Mock()
        translation_provider.translate.return_value = TextContent(
            text="Transcripción de prueba de rendimiento",
            language="es",
        )
        container.get_translation_provider.return_value = translation_provider

        # Mock TTS provider
        tts_provider = Mock()
        tts_provider.synthesize.return_value = AudioContent(
            data=b"performance_test_audio_data",
            format="wav",
            sample_rate=22050,
            duration=3.0,
        )
        container.get_tts_provider.return_value = tts_provider

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------

    @pytest.fixture
    def mock_config(self, tmp_path):
        """Create a mock ``AppConfig`` whose getters return fixed dictionaries."""
        config = Mock(spec=AppConfig)

        # Processing configuration
        config.get_processing_config.return_value = {
            'max_file_size_mb': 100,
            'supported_audio_formats': ['wav', 'mp3', 'flac'],
            'temp_dir': str(tmp_path),
            'cleanup_temp_files': True,
            'processing_timeout': 300,  # 5 minutes
            'max_concurrent_requests': 10,
        }

        # Logging configuration
        config.get_logging_config.return_value = {
            'level': 'INFO',
            'enable_file_logging': False,
            'log_file_path': str(tmp_path / 'test.log'),
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        }

        # STT configuration
        config.get_stt_config.return_value = {
            'preferred_providers': ['parakeet', 'whisper-small', 'whisper-medium'],
            'provider_timeout': 60.0,
            'max_retries': 2,
        }

        # TTS configuration
        config.get_tts_config.return_value = {
            'preferred_providers': ['kokoro', 'dia', 'cosyvoice2', 'dummy'],
            'provider_timeout': 30.0,
            'max_retries': 3,
        }

        # Translation configuration
        config.get_translation_config.return_value = {
            'provider_timeout': 45.0,
            'max_retries': 2,
            'chunk_size': 512,
        }
        return config

    @pytest.fixture
    def mock_container(self, mock_config):
        """Create a mock dependency container that resolves to ``mock_config``."""
        container = Mock(spec=DependencyContainer)
        container.resolve.return_value = mock_config
        # Mock providers with configurable behavior
        self._setup_mock_providers(container)
        return container

    @pytest.fixture
    def audio_service(self, mock_container, mock_config):
        """Create the audio processing service on top of the mocked container."""
        return AudioProcessingApplicationService(mock_container, mock_config)

    @pytest.fixture
    def sample_request(self):
        """Create a small, valid processing request shared by most tests."""
        return self._build_request(
            "performance_test.wav",
            b"performance_test_audio_data",
        )

    # ------------------------------------------------------------------
    # Performance tests
    # ------------------------------------------------------------------

    def test_processing_time_performance(self, audio_service, sample_request) -> None:
        """Check that a single pipeline run is fast and reports a sane duration."""
        # Warm up so one-off initialisation cost is not part of the measurement.
        audio_service.process_audio_pipeline(sample_request)

        result, elapsed = self._timed_call(
            audio_service.process_audio_pipeline, sample_request
        )

        assert result.success is True
        assert result.processing_time > 0
        # The self-reported time cannot exceed what was observed from outside.
        assert result.processing_time <= elapsed + 0.1  # Allow small margin
        # Performance benchmark: mock providers should finish within 5 seconds.
        assert elapsed < 5.0

    def test_memory_usage_performance(self, audio_service, sample_request) -> None:
        """Check that ten sequential runs grow the process RSS by under 100MB."""
        process = psutil.Process(os.getpid())
        rss_before = process.memory_info().rss

        for _ in range(10):
            result = audio_service.process_audio_pipeline(sample_request)
            assert result.success is True

        rss_growth = process.memory_info().rss - rss_before
        # Memory increase should be reasonable (less than 100MB for test data)
        assert rss_growth < 100 * 1024 * 1024

    def test_concurrent_processing_performance(self, audio_service, sample_request) -> None:
        """Run the pipeline from five threads at once and check all succeed in time."""
        num_threads = 5
        outcomes = queue.Queue()

        def worker() -> None:
            # Exceptions are forwarded through the queue because an exception
            # raised inside a thread would not fail the test on its own.
            try:
                outcomes.put(
                    self._timed_call(audio_service.process_audio_pipeline, sample_request)
                )
            except Exception as exc:
                outcomes.put(exc)

        threads = [threading.Thread(target=worker) for _ in range(num_threads)]
        started = time.perf_counter()
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        total_time = time.perf_counter() - started

        # All workers have finished, so the queue can be drained without blocking.
        results = []
        processing_times = []
        while not outcomes.empty():
            item = outcomes.get()
            if isinstance(item, Exception):
                pytest.fail(f"Concurrent processing failed: {item}")
            result, elapsed = item
            results.append(result)
            processing_times.append(elapsed)

        # Verify all succeeded
        assert len(results) == num_threads
        for result in results:
            assert result.success is True

        # Performance checks
        avg_processing_time = sum(processing_times) / len(processing_times)
        assert avg_processing_time < 10.0  # Average should be reasonable
        assert total_time < 15.0  # Total concurrent time should be reasonable

    def test_large_file_performance(self, audio_service) -> None:
        """Check that a 10MB upload is processed successfully within 30 seconds."""
        large_content = b"x" * (10 * 1024 * 1024)
        request = self._build_request("large_performance_test.wav", large_content)

        result, elapsed = self._timed_call(
            audio_service.process_audio_pipeline, request
        )

        assert result.success is True
        # Large files should still complete within reasonable time
        assert elapsed < 30.0

    # ------------------------------------------------------------------
    # Provider failure and recovery tests
    # ------------------------------------------------------------------

    def test_stt_provider_failure_recovery(self, audio_service, sample_request, mock_container) -> None:
        """Expect success when STT fails on the first call and succeeds on the second."""
        stt_provider = mock_container.get_stt_provider.return_value
        stt_provider.transcribe.side_effect = [
            SpeechRecognitionException("STT provider temporarily unavailable"),
            TextContent(text="Recovered transcription", language="en"),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert "Recovered transcription" in result.original_text

    def test_translation_provider_failure_recovery(self, audio_service, sample_request, mock_container) -> None:
        """Expect success when translation fails once and then succeeds."""
        translation_provider = mock_container.get_translation_provider.return_value
        translation_provider.translate.side_effect = [
            TranslationFailedException("Translation service temporarily unavailable"),
            TextContent(text="Traducción recuperada", language="es"),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert "Traducción recuperada" in result.translated_text

    def test_tts_provider_failure_recovery(self, audio_service, sample_request, mock_container) -> None:
        """Expect success when TTS fails once and then succeeds."""
        tts_provider = mock_container.get_tts_provider.return_value
        tts_provider.synthesize.side_effect = [
            SpeechSynthesisException("TTS provider temporarily unavailable"),
            AudioContent(
                data=b"recovered_audio_data",
                format="wav",
                sample_rate=22050,
                duration=2.5,
            ),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert result.audio_path is not None

    def test_multiple_provider_failures(self, audio_service, sample_request, mock_container) -> None:
        """Expect a failed result with error details when every provider always fails."""
        stt_provider = mock_container.get_stt_provider.return_value
        translation_provider = mock_container.get_translation_provider.return_value
        tts_provider = mock_container.get_tts_provider.return_value

        stt_provider.transcribe.side_effect = SpeechRecognitionException("STT failed")
        translation_provider.translate.side_effect = TranslationFailedException("Translation failed")
        tts_provider.synthesize.side_effect = SpeechSynthesisException("TTS failed")

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is False
        assert result.error_message is not None
        assert result.error_code is not None

    def test_timeout_handling(self, audio_service, sample_request, mock_container) -> None:
        """Check that a slow (but not failing) STT provider still yields success.

        Despite the name, no timeout is expected to fire here: the simulated
        delay is well below the configured ``provider_timeout`` of 60 seconds.
        """
        simulated_delay = 2.0
        stt_provider = mock_container.get_stt_provider.return_value

        def slow_transcribe(*args, **kwargs):
            time.sleep(simulated_delay)  # Simulate slow processing
            return TextContent(text="Slow transcription", language="en")

        stt_provider.transcribe.side_effect = slow_transcribe

        result, elapsed = self._timed_call(
            audio_service.process_audio_pipeline, sample_request
        )

        # Should complete despite slow provider
        assert result.success is True
        assert elapsed >= simulated_delay  # Should include the delay

    # ------------------------------------------------------------------
    # Invalid input tests
    # ------------------------------------------------------------------

    def test_invalid_input_handling(self, audio_service) -> None:
        """Expect a format-related error for an upload with an unsupported extension."""
        request = self._build_request(
            "invalid.xyz",
            b"invalid_audio_data",
            content_type="audio/xyz",
        )

        result = audio_service.process_audio_pipeline(request)

        assert result.success is False
        assert result.error_code is not None
        # Guard first so a missing message fails as an assertion, not AttributeError.
        assert result.error_message is not None
        message = result.error_message.lower()
        assert "format" in message or "unsupported" in message

    def test_oversized_file_handling(self, audio_service, mock_config) -> None:
        """Expect a size-related error for a 2MB upload when the limit is 1MB."""
        # NOTE(review): the service fixture is already constructed at this point;
        # this override only matters if the service reads the limit from the
        # shared config dict at request time - confirm against the service.
        mock_config.get_processing_config.return_value['max_file_size_mb'] = 1

        large_content = b"x" * (2 * 1024 * 1024)  # 2MB
        request = self._build_request("oversized.wav", large_content)

        result = audio_service.process_audio_pipeline(request)

        assert result.success is False
        assert result.error_code is not None
        # Guard first so a missing message fails as an assertion, not AttributeError.
        assert result.error_message is not None
        message = result.error_message.lower()
        assert "size" in message or "large" in message

    def test_corrupted_audio_handling(self, audio_service) -> None:
        """Check that non-audio bytes in a .wav upload are handled without raising.

        Either outcome is accepted: no error at all, or an error whose message
        mentions "audio".
        """
        request = self._build_request("corrupted.wav", b"corrupted_data_not_audio")

        result = audio_service.process_audio_pipeline(request)

        # Should handle gracefully (success depends on implementation)
        assert result.error_message is None or "audio" in result.error_message.lower()

    def test_network_error_simulation(self, audio_service, sample_request, mock_container) -> None:
        """Expect success after translation raises two network errors in a row."""
        translation_provider = mock_container.get_translation_provider.return_value
        translation_provider.translate.side_effect = [
            ConnectionError("Network connection failed"),
            TimeoutError("Request timed out"),
            TextContent(text="Network recovered translation", language="es"),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        # Should recover from network errors
        assert result.success is True
        assert "Network recovered translation" in result.translated_text

    # ------------------------------------------------------------------
    # Resilience tests
    # ------------------------------------------------------------------

    def test_resource_exhaustion_handling(self, audio_service, sample_request) -> None:
        """Check that twenty back-to-back requests all succeed."""
        total_requests = 20
        results = []
        for index in range(total_requests):
            result = audio_service.process_audio_pipeline(sample_request)
            # Fail fast and report which request broke.
            assert result.success is True, f"request {index} failed"
            results.append(result)

        assert len(results) == total_requests

    def test_error_correlation_tracking(self, audio_service, sample_request, mock_container) -> None:
        """Check that a failed result still carries a non-empty correlation id."""
        stt_provider = mock_container.get_stt_provider.return_value
        stt_provider.transcribe.side_effect = SpeechRecognitionException("STT correlation test error")

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is False
        assert result.metadata is not None
        assert 'correlation_id' in result.metadata

        correlation_id = result.metadata['correlation_id']
        assert isinstance(correlation_id, str)
        assert len(correlation_id) > 0

    def test_graceful_degradation(self, audio_service, sample_request, mock_container) -> None:
        """Check that a broken translator is harmless when translation is not requested."""
        # Translation would fail if it were ever called...
        translation_provider = mock_container.get_translation_provider.return_value
        translation_provider.translate.side_effect = TranslationFailedException("Translation unavailable")

        # ...but the request no longer asks for it.
        sample_request.requires_translation = False
        sample_request.target_language = "en"  # Same as source

        result = audio_service.process_audio_pipeline(sample_request)

        # Should succeed without translation
        assert result.success is True
        assert result.translated_text is None  # No translation performed

    def test_circuit_breaker_behavior(self, audio_service, sample_request, mock_container) -> None:
        """Check that five runs against an always-failing TTS provider all fail cleanly.

        NOTE(review): only the failed outcome is asserted; whether a circuit
        breaker actually limits retries is not verified here.
        """
        tts_provider = mock_container.get_tts_provider.return_value
        tts_provider.synthesize.side_effect = SpeechSynthesisException("Repeated TTS failure")

        results = [
            audio_service.process_audio_pipeline(sample_request)
            for _ in range(5)  # Multiple attempts
        ]

        for result in results:
            assert result.success is False
            assert result.error_code is not None

    def test_performance_metrics_collection(self, audio_service, sample_request) -> None:
        """Check that a successful result exposes timing and request metadata."""
        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert result.processing_time > 0
        assert result.metadata is not None

        # Verify performance-related metadata
        for key in ('correlation_id', 'asr_model', 'target_language', 'voice'):
            assert key in result.metadata

    def test_stress_testing(self, audio_service, sample_request) -> None:
        """Run fifty sequential requests and check success rate and average latency."""
        num_requests = 50

        started = time.perf_counter()
        results = [
            audio_service.process_audio_pipeline(sample_request)
            for _ in range(num_requests)
        ]
        total_time = time.perf_counter() - started

        # Verify all requests completed
        assert len(results) == num_requests

        # Should maintain high success rate under stress
        successful = [r for r in results if r.success]
        success_rate = len(successful) / len(results)
        assert success_rate >= 0.95  # At least 95% success rate

        # Performance should remain reasonable
        avg_time_per_request = total_time / num_requests
        assert avg_time_per_request < 1.0  # Average less than 1 second per request