Spaces:
Sleeping
Sleeping
"""Integration tests for file handling and cleanup."""

import os
import queue
import shutil
import tempfile
import threading
import time
from pathlib import Path
from typing import Any, Dict, List
from unittest.mock import MagicMock, Mock, patch

import pytest

from src.application.dtos.audio_upload_dto import AudioUploadDto
from src.application.dtos.processing_request_dto import ProcessingRequestDto
from src.application.services.audio_processing_service import AudioProcessingApplicationService
from src.domain.models.audio_content import AudioContent
from src.domain.models.text_content import TextContent
from src.infrastructure.config.app_config import AppConfig
from src.infrastructure.config.dependency_container import DependencyContainer


class TestFileHandling:
    """Integration tests for file handling and cleanup.

    Every test drives ``AudioProcessingApplicationService`` with mocked
    STT / translation / TTS providers and a configuration whose temp
    directory is a per-test ``tempfile.TemporaryDirectory``.
    """

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _build_request(
        filename: str,
        content: bytes,
        *,
        content_type: str = "audio/wav",
        target_language: str = "es",
        requires_translation: bool = True,
    ) -> ProcessingRequestDto:
        """Build a processing request around an in-memory audio upload.

        Args:
            filename: Name reported for the uploaded file.
            content: Raw bytes of the upload; ``size`` is derived from it.
            content_type: MIME type reported for the upload.
            target_language: Language code passed as ``target_language``.
            requires_translation: Value of the ``requires_translation`` flag.

        Returns:
            A ``ProcessingRequestDto`` using the ``whisper-small`` ASR model,
            the ``dummy`` voice and a speed of ``1.0``.
        """
        upload = AudioUploadDto(
            filename=filename,
            content=content,
            content_type=content_type,
            size=len(content),
        )
        return ProcessingRequestDto(
            audio=upload,
            asr_model="whisper-small",
            target_language=target_language,
            voice="dummy",
            speed=1.0,
            requires_translation=requires_translation,
        )

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------

    @pytest.fixture
    def temp_base_dir(self):
        """Yield a temporary directory that is removed after the test."""
        with tempfile.TemporaryDirectory() as temp_dir:
            yield temp_dir

    @pytest.fixture
    def mock_config(self, temp_base_dir):
        """Return a mocked ``AppConfig`` whose paths live in ``temp_base_dir``."""
        config = Mock(spec=AppConfig)

        # Processing configuration with temp directory
        config.get_processing_config.return_value = {
            'max_file_size_mb': 50,
            'supported_audio_formats': ['wav', 'mp3', 'flac', 'ogg'],
            'temp_dir': temp_base_dir,
            'cleanup_temp_files': True,
            'max_temp_file_age_hours': 24,
            'temp_file_prefix': 'audio_processing_',
        }

        # Logging configuration
        config.get_logging_config.return_value = {
            'level': 'INFO',
            'enable_file_logging': True,
            'log_file_path': os.path.join(temp_base_dir, 'processing.log'),
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        }

        # STT configuration
        config.get_stt_config.return_value = {
            'preferred_providers': ['whisper-small'],
        }

        # TTS configuration
        config.get_tts_config.return_value = {
            'preferred_providers': ['dummy'],
        }

        return config

    @pytest.fixture
    def mock_container(self, mock_config):
        """Return a mocked ``DependencyContainer`` with canned providers.

        ``resolve`` returns ``mock_config``; the STT, translation and TTS
        providers each return a fixed result object.
        """
        container = Mock(spec=DependencyContainer)
        container.resolve.return_value = mock_config

        # Mock providers
        mock_stt_provider = Mock()
        mock_stt_provider.transcribe.return_value = TextContent(
            text="Test transcription",
            language="en",
        )
        container.get_stt_provider.return_value = mock_stt_provider

        mock_translation_provider = Mock()
        mock_translation_provider.translate.return_value = TextContent(
            text="Prueba de transcripción",
            language="es",
        )
        container.get_translation_provider.return_value = mock_translation_provider

        mock_tts_provider = Mock()
        mock_tts_provider.synthesize.return_value = AudioContent(
            data=b"synthesized_audio_data",
            format="wav",
            sample_rate=22050,
            duration=2.0,
        )
        container.get_tts_provider.return_value = mock_tts_provider

        return container

    @pytest.fixture
    def audio_service(self, mock_container, mock_config):
        """Return the service under test wired to the mocked collaborators."""
        return AudioProcessingApplicationService(mock_container, mock_config)

    @pytest.fixture
    def sample_audio_files(self, temp_base_dir):
        """Yield ``{format: path}`` for small sample files in ``temp_base_dir``.

        Each file is a short format-specific header followed by 1000 zero
        bytes. The files are removed again after the test.
        """
        audio_formats = {
            'wav': b'RIFF\x24\x00\x00\x00WAVEfmt \x10\x00\x00\x00',
            'mp3': b'\xff\xfb\x90\x00\x00\x00\x00\x00\x00\x00\x00\x00',
            'flac': b'fLaC\x00\x00\x00\x22\x10\x00\x10\x00',
            'ogg': b'OggS\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00',
        }

        files = {}
        for format_name, header in audio_formats.items():
            file_path = os.path.join(temp_base_dir, f'test_audio.{format_name}')
            with open(file_path, 'wb') as f:
                f.write(header + b'\x00' * 1000)  # Add some padding
            files[format_name] = file_path

        yield files

        # Cleanup
        for file_path in files.values():
            if os.path.exists(file_path):
                os.remove(file_path)

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_temp_directory_creation(self, audio_service, temp_base_dir):
        """Test temporary directory creation and structure."""
        # Create a processing request to trigger temp directory creation
        request = self._build_request("test.wav", b"fake_audio_data")

        # Process and check temp directory creation
        result = audio_service.process_audio_pipeline(request)
        assert result.success is True

        # Verify base temp directory exists
        assert os.path.exists(temp_base_dir)
        assert os.path.isdir(temp_base_dir)

    def test_input_file_handling(self, audio_service, sample_audio_files):
        """Test handling of different input audio file formats."""
        for format_name, file_path in sample_audio_files.items():
            with open(file_path, 'rb') as f:
                content = f.read()

            request = self._build_request(
                f"test.{format_name}",
                content,
                content_type=f"audio/{format_name}",
                target_language="en",
                requires_translation=False,
            )

            result = audio_service.process_audio_pipeline(request)

            assert result.success is True, f"Failed to process {format_name} file"
            assert result.audio_path is not None
            assert os.path.exists(result.audio_path)

    def test_output_file_generation(self, audio_service, temp_base_dir):
        """Test output audio file generation."""
        request = self._build_request("input.wav", b"input_audio_data")

        result = audio_service.process_audio_pipeline(request)

        assert result.success is True
        assert result.audio_path is not None

        # Verify output file exists and has content
        assert os.path.exists(result.audio_path)
        assert os.path.getsize(result.audio_path) > 0

        # Verify file is in expected location
        assert temp_base_dir in result.audio_path

    def test_temp_file_cleanup_success(self, audio_service, temp_base_dir):
        """Test temporary file cleanup after successful processing."""
        initial_files = set(os.listdir(temp_base_dir))

        request = self._build_request("cleanup_test.wav", b"cleanup_test_data")
        result = audio_service.process_audio_pipeline(request)
        assert result.success is True

        # Check that temporary processing files are cleaned up
        # (output file should remain)
        final_files = set(os.listdir(temp_base_dir))
        new_files = final_files - initial_files

        # Should only have the output file and possibly log files
        assert len(new_files) <= 2  # output file + possible log file

    def test_temp_file_cleanup_on_error(self, audio_service, temp_base_dir, mock_container):
        """Test temporary file cleanup when processing fails."""
        # Mock STT provider to fail
        mock_stt_provider = mock_container.get_stt_provider.return_value
        mock_stt_provider.transcribe.side_effect = Exception("STT failed")

        initial_files = set(os.listdir(temp_base_dir))

        request = self._build_request("error_test.wav", b"error_test_data")
        result = audio_service.process_audio_pipeline(request)
        assert result.success is False

        # Verify cleanup occurred even on error
        final_files = set(os.listdir(temp_base_dir))
        new_files = final_files - initial_files

        # Should have minimal new files (possibly just log files)
        assert len(new_files) <= 1

    def test_large_file_handling(self, audio_service, temp_base_dir):
        """Test handling of large audio files."""
        # Create large audio content (5MB)
        large_content = b"x" * (5 * 1024 * 1024)
        request = self._build_request("large_file.wav", large_content)

        result = audio_service.process_audio_pipeline(request)

        assert result.success is True
        assert result.audio_path is not None
        assert os.path.exists(result.audio_path)

    def test_concurrent_file_handling(self, audio_service, temp_base_dir):
        """Test concurrent file handling and cleanup."""
        results_queue = queue.Queue()

        def process_file(file_id):
            # Worker exceptions are queued so the main thread can report
            # them; an exception raised inside a thread would otherwise
            # not fail the test.
            try:
                request = self._build_request(
                    f"concurrent_{file_id}.wav",
                    f"concurrent_data_{file_id}".encode(),
                )
                result = audio_service.process_audio_pipeline(request)
                results_queue.put((file_id, result))
            except Exception as e:
                results_queue.put((file_id, e))

        # Start multiple threads
        threads = []
        for i in range(3):
            thread = threading.Thread(target=process_file, args=(i,))
            threads.append(thread)
            thread.start()

        # Wait for completion
        for thread in threads:
            thread.join()

        # Verify results (all producers have finished, so empty() is reliable)
        results = {}
        while not results_queue.empty():
            file_id, result = results_queue.get()
            if isinstance(result, Exception):
                pytest.fail(f"Concurrent processing failed for file {file_id}: {result}")
            results[file_id] = result

        assert len(results) == 3
        for file_id, result in results.items():
            assert result.success is True
            assert result.audio_path is not None
            assert os.path.exists(result.audio_path)

    def test_file_permission_handling(self, audio_service, temp_base_dir):
        """Test that the output file is readable and writable by its owner."""
        request = self._build_request("permission_test.wav", b"permission_test_data")

        result = audio_service.process_audio_pipeline(request)

        assert result.success is True
        assert result.audio_path is not None

        # Verify file permissions: compare against the mask so that BOTH the
        # owner-read and owner-write bits are required, not just one of them.
        file_stat = os.stat(result.audio_path)
        assert (file_stat.st_mode & 0o600) == 0o600  # At least owner read/write

    def test_disk_space_monitoring(self, audio_service, temp_base_dir):
        """Test that free disk space remains after processing."""
        request = self._build_request(
            "space_test.wav",
            b"space_test_data" * 1000,  # Larger content
        )

        result = audio_service.process_audio_pipeline(request)
        assert result.success is True

        # Verify disk space hasn't been exhausted
        final_space = shutil.disk_usage(temp_base_dir)
        assert final_space.free > 0

    def test_file_naming_conventions(self, audio_service, temp_base_dir):
        """Test file naming conventions and uniqueness."""
        # Process multiple files to test naming
        results = [
            audio_service.process_audio_pipeline(
                self._build_request(
                    f"naming_test_{i}.wav",
                    f"naming_test_data_{i}".encode(),
                )
            )
            for i in range(3)
        ]

        # Verify all results are successful
        for result in results:
            assert result.success is True
            assert result.audio_path is not None

        # Verify unique file names
        output_paths = [r.audio_path for r in results]
        assert len(set(output_paths)) == len(output_paths)  # All unique

        # Verify naming convention
        for path in output_paths:
            filename = os.path.basename(path)
            assert filename.startswith("output_")
            assert filename.endswith(".wav")

    def test_file_encoding_handling(self, audio_service, temp_base_dir):
        """Test handling of filenames containing non-ASCII characters."""
        # Test with filename containing special characters
        special_filename = "test_file_ñáéíóú_测试.wav"
        request = self._build_request(special_filename, b"encoding_test_data")

        result = audio_service.process_audio_pipeline(request)

        assert result.success is True
        assert result.audio_path is not None
        assert os.path.exists(result.audio_path)

    def test_file_cleanup_context_manager(self, mock_container, mock_config, temp_base_dir):
        """Test file cleanup using context manager."""
        initial_files = set(os.listdir(temp_base_dir))

        with AudioProcessingApplicationService(mock_container, mock_config) as service:
            request = self._build_request("context_test.wav", b"context_test_data")
            result = service.process_audio_pipeline(request)
            assert result.success is True

        # Verify cleanup occurred when exiting context
        final_files = set(os.listdir(temp_base_dir))
        new_files = final_files - initial_files

        # Should have minimal new files after context exit
        assert len(new_files) <= 1  # Possibly just log file

    def test_file_recovery_after_interruption(self, audio_service, temp_base_dir, mock_container):
        """Test that an interruption during synthesis propagates to the caller."""
        # Mock provider to simulate interruption
        mock_tts_provider = mock_container.get_tts_provider.return_value
        mock_tts_provider.synthesize.side_effect = KeyboardInterrupt("Simulated interruption")

        request = self._build_request("interruption_test.wav", b"interruption_test_data")

        # The interruption must not be swallowed by the pipeline
        with pytest.raises(KeyboardInterrupt):
            audio_service.process_audio_pipeline(request)

        # NOTE: no cleanup assertion is made here; only propagation of the
        # interrupt is checked.
        # (In real implementation, cleanup would be handled by signal handlers)

    def test_file_metadata_preservation(self, audio_service, temp_base_dir):
        """Test that result metadata is populated after processing."""
        original_filename = "metadata_test.wav"
        original_content = b"metadata_test_data"
        request = self._build_request(original_filename, original_content)

        result = audio_service.process_audio_pipeline(request)

        assert result.success is True
        assert result.metadata is not None

        # Verify the metadata carries a correlation id for this request
        correlation_id = result.metadata.get('correlation_id')
        assert correlation_id is not None

        # Verify output file exists
        assert result.audio_path is not None
        assert os.path.exists(result.audio_path)