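# NOTE(review): the original first lines read "Spaces: / Build error / Build error".
# They look like page-scrape residue (a hosting status banner), not program text,
# so they are kept here only as a comment.
"""
Main entry point for the Audio Translation Web Application.

Handles file upload, the processing pipeline, and UI rendering using a DDD architecture.
"""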
import logging
import os
from typing import Optional

# Logging is configured before the third-party and project imports run.
# NOTE(review): presumably so that import-time log records from those modules
# already reach these handlers — confirm before reordering.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

import streamlit as st

# Application-layer services and DTOs
from src.application.services.audio_processing_service import AudioProcessingApplicationService
from src.application.services.configuration_service import ConfigurationApplicationService
from src.application.dtos.audio_upload_dto import AudioUploadDto
from src.application.dtos.processing_request_dto import ProcessingRequestDto
from src.application.dtos.processing_result_dto import ProcessingResultDto

# Infrastructure wiring (global container setup)
from src.infrastructure.config.container_setup import initialize_global_container, get_global_container

# Make sure the temp directories exist as soon as the module is loaded.
for _temp_dir in ("temp/uploads", "temp/outputs"):
    os.makedirs(_temp_dir, exist_ok=True)
def configure_page():
    """Set up the Streamlit page configuration and inject custom CSS.

    Calls ``st.set_page_config`` (title, icon, wide layout, expanded sidebar)
    and then injects a small ``<style>`` block through ``st.markdown`` with
    ``unsafe_allow_html=True``.
    """
    logger.info("Configuring Streamlit page")
    st.set_page_config(
        page_title="Audio Translator",
        # Headphones emoji (U+1F3A7); the literal had been mis-encoded.
        page_icon="🎧",
        layout="wide",
        initial_sidebar_state="expanded",
    )

    # One CSS rule per entry so the stylesheet does not depend on how the
    # enclosing Python source happens to be indented.
    css_rules = (
        ".reportview-container {margin-top: -2em;}",
        "#MainMenu {visibility: hidden;}",
        ".stDeployButton {display:none;}",
        ".stAlert {padding: 20px !important;}",
    )
    stylesheet = "<style>\n" + "\n".join(css_rules) + "\n</style>"
    st.markdown(stylesheet, unsafe_allow_html=True)
def create_audio_upload_dto(uploaded_file) -> AudioUploadDto:
    """
    Create an AudioUploadDto from a Streamlit uploaded file.

    The content type is looked up from the lower-cased file extension;
    extensions missing from the table fall back to ``audio/wav``.

    Args:
        uploaded_file: Streamlit UploadedFile object

    Returns:
        AudioUploadDto: DTO carrying the filename, raw bytes, content type
        and size (length of the raw bytes)

    Raises:
        ValueError: If reading the upload or building the DTO fails. The
            original exception is attached as ``__cause__``.
    """
    content_type_map = {
        '.wav': 'audio/wav',
        '.mp3': 'audio/mpeg',
        '.m4a': 'audio/mp4',
        '.flac': 'audio/flac',
        '.ogg': 'audio/ogg',
    }
    try:
        content = uploaded_file.getbuffer().tobytes()
        # Determine content type based on file extension
        file_ext = os.path.splitext(uploaded_file.name.lower())[1]
        content_type = content_type_map.get(file_ext, 'audio/wav')
        return AudioUploadDto(
            filename=uploaded_file.name,
            content=content,
            content_type=content_type,
            size=len(content),
        )
    except Exception as e:
        # Keep the traceback in the log and chain the cause so callers can
        # still see what actually went wrong underneath the ValueError.
        logger.error(f"Failed to create AudioUploadDto: {e}", exc_info=True)
        raise ValueError(f"Invalid audio file: {str(e)}") from e
def handle_file_processing(
    audio_upload: AudioUploadDto,
    asr_model: str,
    target_language: str,
    voice: str,
    speed: float,
    source_language: Optional[str] = None
) -> ProcessingResultDto:
    """
    Execute the complete processing pipeline using application services.

    Resolves ``AudioProcessingApplicationService`` from the global container,
    builds a ``ProcessingRequestDto`` and runs ``process_audio_pipeline`` while
    showing a progress bar, a status line and a spinner. Exceptions are not
    propagated: they are logged, shown in the UI, and converted into an error
    result.

    Args:
        audio_upload: Audio upload DTO
        asr_model: ASR model to use
        target_language: Target language for translation
        voice: Voice for TTS
        speed: Speech speed
        source_language: Source language (optional)

    Returns:
        ProcessingResultDto: The service result, or
        ``ProcessingResultDto.error_result(..., error_code='SYSTEM_ERROR')``
        when an exception was raised.
    """
    logger.info(f"Starting processing for: {audio_upload.filename} using {asr_model} model")
    progress_bar = st.progress(0)
    status_text = st.empty()
    try:
        # Get application service from container
        container = get_global_container()
        audio_service = container.resolve(AudioProcessingApplicationService)

        # Create processing request
        request = ProcessingRequestDto(
            audio=audio_upload,
            asr_model=asr_model,
            target_language=target_language,
            voice=voice,
            speed=speed,
            source_language=source_language,
        )

        # Update progress and status.
        # NOTE(review): this icon was restored from a mis-encoded literal; the
        # exact original glyph is not recoverable, a magnifier is a stand-in.
        status_text.markdown("🔍 **Performing Speech Recognition...**")
        progress_bar.progress(10)

        # Process through application service
        with st.spinner("Processing audio pipeline..."):
            result = audio_service.process_audio_pipeline(request)

        if result.success:
            progress_bar.progress(100)
            status_text.success("✅ Processing Complete!")
            logger.info(f"Processing completed successfully in {result.processing_time:.2f}s")
        else:
            # Remove the bar instead of leaving it stuck at 10% next to an error.
            progress_bar.empty()
            status_text.error(f"❌ Processing Failed: {result.error_message}")
            logger.error(f"Processing failed: {result.error_message}")
        return result
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", exc_info=True)
        progress_bar.empty()
        status_text.error(f"❌ Processing Failed: {str(e)}")
        st.exception(e)
        # Return error result
        return ProcessingResultDto.error_result(
            error_message=str(e),
            error_code='SYSTEM_ERROR'
        )
def render_results(result: ProcessingResultDto):
    """
    Render a ProcessingResultDto in the Streamlit page.

    A failed result shows the error message (and error code, if set) and
    nothing else. A successful result is laid out in two columns: recognized
    text, translated text and metadata on the left; the audio player, a
    download button and the processing time on the right.

    Args:
        result: Processing result DTO
    """
    logger.info("Rendering results")
    st.divider()

    # Failure path: report and stop.
    if not result.success:
        st.error(f"Processing failed: {result.error_message}")
        if result.error_code:
            st.code(f"Error Code: {result.error_code}")
        return

    text_column, audio_column = st.columns([2, 1])

    with text_column:
        # Each text section is shown only when it has content.
        text_sections = (
            ("Recognition Results", result.original_text),
            ("Translation Results", result.translated_text),
        )
        for heading, text in text_sections:
            if text:
                st.subheader(heading)
                st.code(text, language="text")

        if result.metadata:
            with st.expander("Processing Details"):
                st.json(result.metadata)

    with audio_column:
        if result.has_audio_output and result.audio_path:
            st.subheader("Audio Output")
            if not os.path.exists(result.audio_path):
                st.warning("Audio file not found or not accessible")
            else:
                # Inline player first, then the download button.
                st.audio(result.audio_path)
                try:
                    with open(result.audio_path, "rb") as audio_file:
                        st.download_button(
                            label="Download Audio",
                            data=audio_file,
                            file_name="translated_audio.wav",
                            mime="audio/wav",
                        )
                except Exception as e:
                    st.warning(f"Download not available: {str(e)}")

        # Processing time is shown whether or not audio was produced.
        st.metric("Processing Time", f"{result.processing_time:.2f}s")
def get_supported_configurations() -> dict:
    """
    Fetch the supported configurations from the audio processing service.

    If resolving or calling the service raises, a warning is logged and a
    hard-coded fallback dictionary is returned instead.

    Returns:
        dict: Supported configurations
    """
    try:
        return (
            get_global_container()
            .resolve(AudioProcessingApplicationService)
            .get_supported_configurations()
        )
    except Exception as e:
        logger.warning(f"Failed to get configurations: {e}")

    # Fallback used only when the service could not be reached.
    fallback_configurations = {
        'asr_models': ['whisper-small', 'parakeet'],
        'voices': ['kokoro', 'dia', 'cosyvoice2', 'dummy'],
        'languages': ['en', 'zh', 'es', 'fr', 'de'],
        'audio_formats': ['wav', 'mp3'],
        'max_file_size_mb': 100,
        'speed_range': {'min': 0.5, 'max': 2.0},
    }
    return fallback_configurations
def initialize_session_state():
    """Seed the session state keys used by this app, leaving existing values alone."""
    session_defaults = {
        'processing_result': None,
        'container_initialized': False,
    }
    for key, default_value in session_defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default_value
def initialize_application():
    """Initialize the application with dependency injection container.

    Runs ``initialize_global_container()`` unless this session has already
    recorded ``container_initialized`` as true. On failure the error is
    logged, shown with ``st.error`` and the script run is halted with
    ``st.stop()``.
    """
    # Use .get() rather than attribute access: the flag may not have been
    # seeded yet, and attribute access on a missing key raises AttributeError.
    if st.session_state.get('container_initialized', False):
        return

    try:
        logger.info("Initializing application container")
        initialize_global_container()
        st.session_state.container_initialized = True
        logger.info("Application container initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize application: {e}", exc_info=True)
        st.error(f"Application initialization failed: {str(e)}")
        st.stop()
def main():
    """Main application workflow.

    Order of operations:
      1. ``configure_page()`` — issued first so no other Streamlit UI call
         precedes ``st.set_page_config``.
      2. ``initialize_session_state()`` — seeds the keys that
         ``initialize_application()`` reads.
      3. ``initialize_application()`` — sets up the global container.
      4. Render the controls (voice, speed, ASR model, target language,
         file uploader) and, once a file is uploaded, the process button
         and any stored result.

    Errors raised while handling an uploaded file are shown with ``st.error``
    and logged; they are not propagated.
    """
    logger.info("Starting application")

    # Page config first, then session defaults, then the container: the
    # container step reads a session flag and may emit st.error on failure.
    configure_page()
    initialize_session_state()
    initialize_application()

    st.title("🎧 High-Quality Audio Translation System")
    st.markdown("Upload English Audio → Get Chinese Speech Output")

    # Get supported configurations
    config = get_supported_configurations()

    # Voice selection in sidebar
    st.sidebar.header("TTS Settings")
    # Map voice display names to internal IDs
    voice_options = {
        "Kokoro": "kokoro",
        "Dia": "dia",
        "CosyVoice2": "cosyvoice2",
        "Dummy (Test)": "dummy",
    }
    selected_voice_display = st.sidebar.selectbox(
        "Select Voice",
        list(voice_options.keys()),
        index=0
    )
    selected_voice = voice_options[selected_voice_display]
    speed = st.sidebar.slider(
        "Speech Speed",
        config['speed_range']['min'],
        config['speed_range']['max'],
        1.0,
        0.1
    )

    # Model selection
    asr_model = st.selectbox(
        "Select Speech Recognition Model",
        options=config['asr_models'],
        index=0,
        help="Choose the ASR model for speech recognition"
    )

    # Language selection
    language_options = {
        "Chinese (Mandarin)": "zh",
        "Spanish": "es",
        "French": "fr",
        "German": "de",
        "English": "en",
    }
    selected_language_display = st.selectbox(
        "Target Language",
        list(language_options.keys()),
        index=0,
        help="Select the target language for translation"
    )
    target_language = language_options[selected_language_display]

    # File upload
    uploaded_file = st.file_uploader(
        f"Select Audio File ({', '.join(config['audio_formats']).upper()})",
        type=config['audio_formats'],
        accept_multiple_files=False,
        help=f"Maximum file size: {config['max_file_size_mb']}MB"
    )

    if uploaded_file:
        logger.info(f"File uploaded: {uploaded_file.name}")
        try:
            # Create audio upload DTO
            audio_upload = create_audio_upload_dto(uploaded_file)

            # Display file information.
            # NOTE(review): the icons on the next two widgets were restored from
            # mis-encoded literals; the exact original glyphs are a best guess.
            st.info(f"📁 **File:** {audio_upload.filename} ({audio_upload.size / 1024:.1f} KB)")

            # Process button
            if st.button("🚀 Process Audio", type="primary"):
                # Process the audio
                result = handle_file_processing(
                    audio_upload=audio_upload,
                    asr_model=asr_model,
                    target_language=target_language,
                    voice=selected_voice,
                    speed=speed,
                    source_language="en"  # Assume English source for now
                )
                # Store result in session state so it survives reruns
                st.session_state.processing_result = result

            # Display results if available
            if st.session_state.processing_result:
                render_results(st.session_state.processing_result)
        except Exception as e:
            st.error(f"Error processing file: {str(e)}")
            logger.error(f"File processing error: {e}", exc_info=True)
# Run the app when this file is executed as a script.
if __name__ == "__main__":
    main()