# Author: Michael Hu
# Commit 19fd91c — Update application entry point and cleanup
"""
Main entry point for the Audio Translation Web Application
Handles file upload, processing pipeline, and UI rendering using DDD architecture
"""
import logging

# Logging is configured BEFORE the application imports below so that any
# records emitted at import time are captured by these handlers.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Mirror every record to a persistent file and to the console.
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

import streamlit as st
import os
from typing import Optional

# Import application services and DTOs
from src.application.services.audio_processing_service import AudioProcessingApplicationService
from src.application.services.configuration_service import ConfigurationApplicationService
from src.application.dtos.audio_upload_dto import AudioUploadDto
from src.application.dtos.processing_request_dto import ProcessingRequestDto
from src.application.dtos.processing_result_dto import ProcessingResultDto

# Import infrastructure setup
from src.infrastructure.config.container_setup import initialize_global_container, get_global_container

# Ensure the working directories for uploads and generated audio exist
# before any request handling begins (no-ops if already present).
os.makedirs("temp/uploads", exist_ok=True)
os.makedirs("temp/outputs", exist_ok=True)
def configure_page():
    """Apply Streamlit page settings and inject custom CSS tweaks."""
    logger.info("Configuring Streamlit page")
    st.set_page_config(
        page_title="Audio Translator",
        page_icon="🎧",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    # Hide Streamlit chrome (main menu, deploy button) and tighten spacing.
    custom_css = """
    <style>
    .reportview-container {margin-top: -2em;}
    #MainMenu {visibility: hidden;}
    .stDeployButton {display:none;}
    .stAlert {padding: 20px !important;}
    </style>
    """
    st.markdown(custom_css, unsafe_allow_html=True)
def create_audio_upload_dto(uploaded_file) -> AudioUploadDto:
    """
    Create AudioUploadDto from a Streamlit uploaded file.

    Args:
        uploaded_file: Streamlit UploadedFile object

    Returns:
        AudioUploadDto: DTO containing the file name, raw bytes,
        MIME content type and size in bytes.

    Raises:
        ValueError: If the upload cannot be read or converted.
    """
    try:
        content = uploaded_file.getbuffer().tobytes()
        # Map the file extension to a MIME type; unknown extensions fall
        # back to 'audio/wav'.
        file_ext = os.path.splitext(uploaded_file.name.lower())[1]
        content_type_map = {
            '.wav': 'audio/wav',
            '.mp3': 'audio/mpeg',
            '.m4a': 'audio/mp4',
            '.flac': 'audio/flac',
            '.ogg': 'audio/ogg'
        }
        content_type = content_type_map.get(file_ext, 'audio/wav')
        return AudioUploadDto(
            filename=uploaded_file.name,
            content=content,
            content_type=content_type,
            size=len(content)
        )
    except Exception as e:
        logger.error(f"Failed to create AudioUploadDto: {e}")
        # Chain the original exception so the root cause stays visible in
        # tracebacks instead of being flattened into a message string.
        raise ValueError(f"Invalid audio file: {str(e)}") from e
def handle_file_processing(
    audio_upload: AudioUploadDto,
    asr_model: str,
    target_language: str,
    voice: str,
    speed: float,
    source_language: Optional[str] = None
) -> ProcessingResultDto:
    """
    Execute the complete processing pipeline using application services.

    Args:
        audio_upload: Audio upload DTO
        asr_model: ASR model to use
        target_language: Target language for translation
        voice: Voice for TTS
        speed: Speech speed
        source_language: Source language (optional)

    Returns:
        ProcessingResultDto: Processing result. On any exception an
        error result (code 'SYSTEM_ERROR') is returned instead of
        raising, so the calling UI flow continues.
    """
    logger.info(f"Starting processing for: {audio_upload.filename} using {asr_model} model")
    # Created before the try block so the except handler can still write
    # a failure status into the same widgets.
    progress_bar = st.progress(0)
    status_text = st.empty()
    try:
        # Get application service from container
        container = get_global_container()
        audio_service = container.resolve(AudioProcessingApplicationService)
        # Create processing request
        request = ProcessingRequestDto(
            audio=audio_upload,
            asr_model=asr_model,
            target_language=target_language,
            voice=voice,
            speed=speed,
            source_language=source_language
        )
        # Update progress and status
        status_text.markdown("πŸ” **Performing Speech Recognition...**")
        progress_bar.progress(10)
        # Process through application service; the spinner covers the
        # whole pipeline since intermediate progress is not reported back.
        with st.spinner("Processing audio pipeline..."):
            result = audio_service.process_audio_pipeline(request)
        if result.success:
            progress_bar.progress(100)
            status_text.success("βœ… Processing Complete!")
            logger.info(f"Processing completed successfully in {result.processing_time:.2f}s")
        else:
            status_text.error(f"❌ Processing Failed: {result.error_message}")
            logger.error(f"Processing failed: {result.error_message}")
        return result
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", exc_info=True)
        status_text.error(f"❌ Processing Failed: {str(e)}")
        st.exception(e)
        # Return error result instead of propagating, so the result can
        # still be stored in session state and rendered.
        return ProcessingResultDto.error_result(
            error_message=str(e),
            error_code='SYSTEM_ERROR'
        )
def render_results(result: ProcessingResultDto):
    """
    Render a ProcessingResultDto: texts and metadata on the left,
    audio output and timing on the right.

    Args:
        result: Processing result DTO
    """
    logger.info("Rendering results")
    st.divider()

    # Failure path: surface the error (and code, when present) and stop.
    if not result.success:
        st.error(f"Processing failed: {result.error_message}")
        if result.error_code:
            st.code(f"Error Code: {result.error_code}")
        return

    text_col, audio_col = st.columns([2, 1])

    with text_col:
        # Recognition and translation texts are each optional.
        if result.original_text:
            st.subheader("Recognition Results")
            st.code(result.original_text, language="text")
        if result.translated_text:
            st.subheader("Translation Results")
            st.code(result.translated_text, language="text")
        if result.metadata:
            with st.expander("Processing Details"):
                st.json(result.metadata)

    with audio_col:
        if result.has_audio_output and result.audio_path:
            st.subheader("Audio Output")
            if os.path.exists(result.audio_path):
                st.audio(result.audio_path)
                # Best-effort download button; reading the file may fail
                # even though it exists (permissions, races).
                try:
                    with open(result.audio_path, "rb") as audio_file:
                        st.download_button(
                            label="Download Audio",
                            data=audio_file,
                            file_name="translated_audio.wav",
                            mime="audio/wav"
                        )
                except Exception as e:
                    st.warning(f"Download not available: {str(e)}")
            else:
                st.warning("Audio file not found or not accessible")
        st.metric("Processing Time", f"{result.processing_time:.2f}s")
def get_supported_configurations() -> dict:
    """
    Fetch supported configuration options from the application service.

    Returns:
        dict: Supported configurations; hard-coded defaults are returned
        when the service cannot be resolved or queried.
    """
    fallback = {
        'asr_models': ['whisper-small', 'parakeet'],
        'voices': ['kokoro', 'dia', 'cosyvoice2', 'dummy'],
        'languages': ['en', 'zh', 'es', 'fr', 'de'],
        'audio_formats': ['wav', 'mp3'],
        'max_file_size_mb': 100,
        'speed_range': {'min': 0.5, 'max': 2.0}
    }
    try:
        service = get_global_container().resolve(AudioProcessingApplicationService)
        return service.get_supported_configurations()
    except Exception as e:
        # Best-effort: keep the UI usable even if the container is down.
        logger.warning(f"Failed to get configurations: {e}")
        return fallback
def initialize_session_state():
    """Seed the session-state keys used by the app, if not already set."""
    defaults = {
        'processing_result': None,
        'container_initialized': False,
    }
    # Only fill in missing keys so existing values survive reruns.
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
def initialize_application():
    """
    Initialize the dependency-injection container once per session.

    On failure, shows an error in the UI and halts the script run via
    st.stop().
    """
    # Use .get() so this is safe even if initialize_session_state() has
    # not run yet (attribute access on a missing key would raise).
    if not st.session_state.get('container_initialized', False):
        try:
            logger.info("Initializing application container")
            initialize_global_container()
            st.session_state.container_initialized = True
            logger.info("Application container initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize application: {e}")
            st.error(f"Application initialization failed: {str(e)}")
            st.stop()
def main():
    """Main application workflow: setup, sidebar controls, upload, pipeline."""
    logger.info("Starting application")
    # Seed session state FIRST: initialize_application() reads
    # st.session_state.container_initialized, which is missing on the
    # very first run unless initialize_session_state() has executed.
    initialize_session_state()
    # Initialize application (DI container)
    initialize_application()
    # Configure page
    configure_page()
    st.title("🎧 High-Quality Audio Translation System")
    st.markdown("Upload English Audio β†’ Get Chinese Speech Output")
    # Get supported configurations (falls back to defaults on failure)
    config = get_supported_configurations()
    # Voice selection in sidebar
    st.sidebar.header("TTS Settings")
    # Map voice display names to internal IDs
    voice_options = {
        "Kokoro": "kokoro",
        "Dia": "dia",
        "CosyVoice2": "cosyvoice2",
        "Dummy (Test)": "dummy"
    }
    selected_voice_display = st.sidebar.selectbox(
        "Select Voice",
        list(voice_options.keys()),
        index=0
    )
    selected_voice = voice_options[selected_voice_display]
    speed = st.sidebar.slider(
        "Speech Speed",
        config['speed_range']['min'],
        config['speed_range']['max'],
        1.0,  # default value
        0.1   # step size
    )
    # Model selection
    asr_model = st.selectbox(
        "Select Speech Recognition Model",
        options=config['asr_models'],
        index=0,
        help="Choose the ASR model for speech recognition"
    )
    # Language selection: display names mapped to ISO-style codes
    language_options = {
        "Chinese (Mandarin)": "zh",
        "Spanish": "es",
        "French": "fr",
        "German": "de",
        "English": "en"
    }
    selected_language_display = st.selectbox(
        "Target Language",
        list(language_options.keys()),
        index=0,
        help="Select the target language for translation"
    )
    target_language = language_options[selected_language_display]
    # File upload
    uploaded_file = st.file_uploader(
        f"Select Audio File ({', '.join(config['audio_formats']).upper()})",
        type=config['audio_formats'],
        accept_multiple_files=False,
        help=f"Maximum file size: {config['max_file_size_mb']}MB"
    )
    if uploaded_file:
        logger.info(f"File uploaded: {uploaded_file.name}")
        try:
            # Create audio upload DTO
            audio_upload = create_audio_upload_dto(uploaded_file)
            # Display file information
            st.info(f"πŸ“ **File:** {audio_upload.filename} ({audio_upload.size / 1024:.1f} KB)")
            # Process button
            if st.button("πŸš€ Process Audio", type="primary"):
                result = handle_file_processing(
                    audio_upload=audio_upload,
                    asr_model=asr_model,
                    target_language=target_language,
                    voice=selected_voice,
                    speed=speed,
                    source_language="en"  # Assume English source for now
                )
                # Persist the result so it survives Streamlit reruns
                st.session_state.processing_result = result
            # Display results if available
            if st.session_state.processing_result:
                render_results(st.session_state.processing_result)
        except Exception as e:
            st.error(f"Error processing file: {str(e)}")
            logger.error(f"File processing error: {e}")
# Script entry point: run the Streamlit app workflow.
if __name__ == "__main__":
    main()