"""Arabic speech -> English speech translation service.

The module wires three models into one pipeline and exposes it over HTTP:

1. STT: a Whisper model transcribes Arabic audio.
2. TTT: a MarianMT model translates the transcript to English.
3. TTS: a small Transformer TTS model produces a mel spectrogram that is
   turned into a waveform with InverseMelScale + Griffin-Lim.

All models are loaded at import time from the Hugging Face Hub.
"""

import base64
import io
import math
import os
import random
import re
import shutil
import tempfile
import time

import numpy as np
import pandas as pd

# Torch and Audio
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import torchaudio
import librosa
import librosa.display

# Text and Audio Processing
from unidecode import unidecode
from inflect import engine
import pydub
import soundfile as sf

# Transformers
from transformers import (
    WhisperProcessor,
    WhisperForConditionalGeneration,
    MarianTokenizer,
    MarianMTModel,
)

# Helper to download the TTS checkpoint file from the Hub
from huggingface_hub import hf_hub_download

# API Server
from fastapi import FastAPI, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles


# ---------------------------------
# Part 2: TTS Model Components
# ---------------------------------

class Hyperparams:
    """Static configuration shared by the text front-end, audio code and TTS model."""

    seed = 42

    # The dataset paths are not used by this service; they are kept so the
    # hyperparameter object has the same attributes as the training setup.
    csv_path = "path/to/metadata.csv"
    wav_path = "path/to/wavs"

    # Character vocabulary. The position of each symbol is its id, so the
    # order must not change for an existing checkpoint.
    symbols = [
        'EOS', ' ', '!', ',', '-', '.', ';', '?',
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
        'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
        'à', 'â', 'è', 'é', 'ê', 'ü', '’', '“', '”',
    ]

    # Spectrogram settings
    sr = 22050
    n_fft = 2048
    n_stft = int((n_fft // 2) + 1)
    hop_length = int(n_fft / 8.0)
    win_length = int(n_fft / 2.0)
    mel_freq = 128
    max_mel_time = 1024
    power = 2.0

    # Model sizes
    text_num_embeddings = 2 * len(symbols)
    embedding_size = 256
    encoder_embedding_size = 512
    dim_feedforward = 1024
    postnet_embedding_size = 1024
    encoder_kernel_size = 3
    postnet_kernel_size = 5

    # Amplitude <-> dB conversion
    ampl_multiplier = 10.0
    ampl_amin = 1e-10
    db_multiplier = 1.0
    ampl_ref = 1.0
    ampl_power = 1.0
    max_db = 100
    scale_db = 10


hp = Hyperparams()

# Text to Sequence
symbol_to_id = {s: i for i, s in enumerate(hp.symbols)}


def text_to_seq(text):
    """Convert text to a tensor of symbol ids terminated by the EOS id.

    The text is lower-cased first; characters that are not in
    ``hp.symbols`` are silently dropped.

    Returns:
        A 1-D ``torch.IntTensor`` of symbol ids.
    """
    ids = [symbol_to_id[ch] for ch in text.lower() if ch in symbol_to_id]
    ids.append(symbol_to_id["EOS"])
    return torch.IntTensor(ids)


# Audio Processing
spec_transform = torchaudio.transforms.Spectrogram(
    n_fft=hp.n_fft,
    win_length=hp.win_length,
    hop_length=hp.hop_length,
    power=hp.power,
)
mel_scale_transform = torchaudio.transforms.MelScale(
    n_mels=hp.mel_freq,
    sample_rate=hp.sr,
    n_stft=hp.n_stft,
)

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

mel_inverse_transform = torchaudio.transforms.InverseMelScale(
    n_mels=hp.mel_freq,
    sample_rate=hp.sr,
    n_stft=hp.n_stft,
).to(DEVICE)
griffnlim_transform = torchaudio.transforms.GriffinLim(
    n_fft=hp.n_fft,
    win_length=hp.win_length,
    hop_length=hp.hop_length,
).to(DEVICE)


def pow_to_db_mel_spec(mel_spec):
    """Convert a mel spectrogram to decibels and divide by ``hp.scale_db``."""
    mel_spec = torchaudio.functional.amplitude_to_DB(
        mel_spec,
        multiplier=hp.ampl_multiplier,
        amin=hp.ampl_amin,
        db_multiplier=hp.db_multiplier,
        top_db=hp.max_db,
    )
    return mel_spec / hp.scale_db


def db_to_power_mel_spec(mel_spec):
    """Undo ``pow_to_db_mel_spec``: rescale by ``hp.scale_db`` and leave dB."""
    mel_spec = mel_spec * hp.scale_db
    return torchaudio.functional.DB_to_amplitude(
        mel_spec, ref=hp.ampl_ref, power=hp.ampl_power
    )


def inverse_mel_spec_to_wav(mel_spec):
    """Reconstruct an approximate waveform from a scaled-dB mel spectrogram.

    The spectrogram is moved to ``DEVICE``, converted back from dB, mapped to
    a linear-frequency spectrogram and inverted with Griffin-Lim.
    """
    # NOTE(review): the only caller passes a (mel_freq, time) tensor - confirm
    # before feeding other layouts.
    power_mel_spec = db_to_power_mel_spec(mel_spec.to(DEVICE))
    spectrogram = mel_inverse_transform(power_mel_spec)
    return griffnlim_transform(spectrogram)


def mask_from_seq_lengths(sequence_lengths: torch.Tensor, max_length: int) -> torch.BoolTensor:
    """Return a ``(batch, max_length)`` mask that is True on valid positions.

    Position ``j`` (0-based) of row ``i`` is True when
    ``j < sequence_lengths[i]``.
    """
    ones = sequence_lengths.new_ones(sequence_lengths.size(0), max_length)
    range_tensor = ones.cumsum(dim=1)  # 1, 2, ..., max_length on every row
    return sequence_lengths.unsqueeze(1) >= range_tensor


def _additive_padding_mask(batch, lengths, max_length, device):
    """Float mask: 0 on valid positions, -inf on padded positions."""
    valid = mask_from_seq_lengths(lengths, max_length=max_length)
    return torch.zeros((batch, max_length), device=device).masked_fill(
        ~valid, float("-inf")
    )


def _additive_causal_mask(rows, cols, device):
    """Float mask: 0 on and below the diagonal, -inf strictly above it."""
    blocked = torch.triu(
        torch.full((rows, cols), True, dtype=torch.bool, device=device),
        diagonal=1,
    )
    return torch.zeros((rows, cols), device=device).masked_fill(
        blocked, float("-inf")
    )


# --- TransformerTTS Model Architecture ---
# Attribute names below are part of the checkpoint's state_dict keys and must
# not be renamed.

class EncoderBlock(nn.Module):
    """Self-attention + feed-forward block that normalises before each sub-layer."""

    def __init__(self):
        super().__init__()
        self.norm_1 = nn.LayerNorm(normalized_shape=hp.embedding_size)
        self.attn = torch.nn.MultiheadAttention(
            embed_dim=hp.embedding_size, num_heads=4, dropout=0.1, batch_first=True
        )
        self.dropout_1 = torch.nn.Dropout(0.1)
        self.norm_2 = nn.LayerNorm(normalized_shape=hp.embedding_size)
        self.linear_1 = nn.Linear(hp.embedding_size, hp.dim_feedforward)
        self.dropout_2 = torch.nn.Dropout(0.1)
        self.linear_2 = nn.Linear(hp.dim_feedforward, hp.embedding_size)
        self.dropout_3 = torch.nn.Dropout(0.1)

    def forward(self, x, attn_mask=None, key_padding_mask=None):
        """Apply self-attention and the feed-forward net, each with a residual."""
        x_out = self.norm_1(x)
        x_out, _ = self.attn(
            query=x_out,
            key=x_out,
            value=x_out,
            attn_mask=attn_mask,
            key_padding_mask=key_padding_mask,
        )
        x_out = self.dropout_1(x_out)
        x = x + x_out

        x_out = self.norm_2(x)
        x_out = self.linear_1(x_out)
        x_out = F.relu(x_out)
        x_out = self.dropout_2(x_out)
        x_out = self.linear_2(x_out)
        x_out = self.dropout_3(x_out)
        x = x + x_out
        return x


class DecoderBlock(nn.Module):
    """Self-attention, cross-attention and feed-forward, normalised after each residual."""

    def __init__(self):
        super().__init__()
        self.norm_1 = nn.LayerNorm(normalized_shape=hp.embedding_size)
        self.self_attn = torch.nn.MultiheadAttention(
            embed_dim=hp.embedding_size, num_heads=4, dropout=0.1, batch_first=True
        )
        self.dropout_1 = torch.nn.Dropout(0.1)
        self.norm_2 = nn.LayerNorm(normalized_shape=hp.embedding_size)
        self.attn = torch.nn.MultiheadAttention(
            embed_dim=hp.embedding_size, num_heads=4, dropout=0.1, batch_first=True
        )
        self.dropout_2 = torch.nn.Dropout(0.1)
        self.norm_3 = nn.LayerNorm(normalized_shape=hp.embedding_size)
        self.linear_1 = nn.Linear(hp.embedding_size, hp.dim_feedforward)
        self.dropout_3 = torch.nn.Dropout(0.1)
        self.linear_2 = nn.Linear(hp.dim_feedforward, hp.embedding_size)
        self.dropout_4 = torch.nn.Dropout(0.1)

    def forward(
        self,
        x,
        memory,
        x_attn_mask=None,
        x_key_padding_mask=None,
        memory_attn_mask=None,
        memory_key_padding_mask=None,
    ):
        """Attend over ``x`` itself, then over ``memory``, then apply the feed-forward net."""
        x_out, _ = self.self_attn(
            query=x,
            key=x,
            value=x,
            attn_mask=x_attn_mask,
            key_padding_mask=x_key_padding_mask,
        )
        x_out = self.dropout_1(x_out)
        x = self.norm_1(x + x_out)

        x_out, _ = self.attn(
            query=x,
            key=memory,
            value=memory,
            attn_mask=memory_attn_mask,
            key_padding_mask=memory_key_padding_mask,
        )
        x_out = self.dropout_2(x_out)
        x = self.norm_2(x + x_out)

        x_out = self.linear_1(x)
        x_out = F.relu(x_out)
        x_out = self.dropout_3(x_out)
        x_out = self.linear_2(x_out)
        x_out = self.dropout_4(x_out)
        x = self.norm_3(x + x_out)
        return x


class EncoderPreNet(nn.Module):
    """Embed symbol ids and refine them with three Conv1d/BatchNorm/ReLU layers."""

    def __init__(self):
        super().__init__()
        channels = hp.encoder_embedding_size
        kernel = hp.encoder_kernel_size
        pad = int((kernel - 1) / 2)  # keeps the sequence length unchanged

        self.embedding = nn.Embedding(
            num_embeddings=hp.text_num_embeddings, embedding_dim=channels
        )
        self.linear_1 = nn.Linear(channels, channels)
        self.linear_2 = nn.Linear(channels, hp.embedding_size)

        self.conv_1 = nn.Conv1d(channels, channels, kernel_size=kernel, stride=1, padding=pad, dilation=1)
        self.bn_1 = nn.BatchNorm1d(channels)
        self.dropout_1 = torch.nn.Dropout(0.5)

        self.conv_2 = nn.Conv1d(channels, channels, kernel_size=kernel, stride=1, padding=pad, dilation=1)
        self.bn_2 = nn.BatchNorm1d(channels)
        self.dropout_2 = torch.nn.Dropout(0.5)

        self.conv_3 = nn.Conv1d(channels, channels, kernel_size=kernel, stride=1, padding=pad, dilation=1)
        self.bn_3 = nn.BatchNorm1d(channels)
        self.dropout_3 = torch.nn.Dropout(0.5)

    def forward(self, text):
        """Map a batch of symbol ids to ``hp.embedding_size``-wide features."""
        x = self.embedding(text)
        x = self.linear_1(x)

        # Conv1d convolves over the last axis, so put channels second.
        x = x.transpose(2, 1)
        x = self.dropout_1(F.relu(self.bn_1(self.conv_1(x))))
        x = self.dropout_2(F.relu(self.bn_2(self.conv_2(x))))
        x = self.dropout_3(F.relu(self.bn_3(self.conv_3(x))))
        x = x.transpose(1, 2)

        return self.linear_2(x)


class PostNet(nn.Module):
    """Six-layer Conv1d stack whose output is added to the decoder's mel prediction."""

    def __init__(self):
        super().__init__()
        hidden = hp.postnet_embedding_size
        kernel = hp.postnet_kernel_size
        pad = int((kernel - 1) / 2)  # keeps the time length unchanged

        self.conv_1 = nn.Conv1d(hp.mel_freq, hidden, kernel_size=kernel, stride=1, padding=pad, dilation=1)
        self.bn_1 = nn.BatchNorm1d(hidden)
        self.dropout_1 = torch.nn.Dropout(0.5)

        self.conv_2 = nn.Conv1d(hidden, hidden, kernel_size=kernel, stride=1, padding=pad, dilation=1)
        self.bn_2 = nn.BatchNorm1d(hidden)
        self.dropout_2 = torch.nn.Dropout(0.5)

        self.conv_3 = nn.Conv1d(hidden, hidden, kernel_size=kernel, stride=1, padding=pad, dilation=1)
        self.bn_3 = nn.BatchNorm1d(hidden)
        self.dropout_3 = torch.nn.Dropout(0.5)

        self.conv_4 = nn.Conv1d(hidden, hidden, kernel_size=kernel, stride=1, padding=pad, dilation=1)
        self.bn_4 = nn.BatchNorm1d(hidden)
        self.dropout_4 = torch.nn.Dropout(0.5)

        self.conv_5 = nn.Conv1d(hidden, hidden, kernel_size=kernel, stride=1, padding=pad, dilation=1)
        self.bn_5 = nn.BatchNorm1d(hidden)
        self.dropout_5 = torch.nn.Dropout(0.5)

        self.conv_6 = nn.Conv1d(hidden, hp.mel_freq, kernel_size=kernel, stride=1, padding=pad, dilation=1)
        self.bn_6 = nn.BatchNorm1d(hp.mel_freq)
        self.dropout_6 = torch.nn.Dropout(0.5)

    def forward(self, x):
        """Run the convolution stack; the last layer has no tanh."""
        x = x.transpose(2, 1)  # channels second for Conv1d
        x = self.dropout_1(torch.tanh(self.bn_1(self.conv_1(x))))
        x = self.dropout_2(torch.tanh(self.bn_2(self.conv_2(x))))
        x = self.dropout_3(torch.tanh(self.bn_3(self.conv_3(x))))
        x = self.dropout_4(torch.tanh(self.bn_4(self.conv_4(x))))
        x = self.dropout_5(torch.tanh(self.bn_5(self.conv_5(x))))
        x = self.dropout_6(self.bn_6(self.conv_6(x)))
        return x.transpose(1, 2)


class DecoderPreNet(nn.Module):
    """Two Linear+ReLU layers projecting mel frames to ``hp.embedding_size``."""

    def __init__(self):
        super().__init__()
        self.linear_1 = nn.Linear(hp.mel_freq, hp.embedding_size)
        self.linear_2 = nn.Linear(hp.embedding_size, hp.embedding_size)

    def forward(self, x):
        """Project mel frames; dropout stays active even in eval mode."""
        # NOTE(review): training=True is hard-coded, so dropout is applied at
        # inference too. Presumably deliberate (the checkpoint was trained
        # this way) - confirm before changing.
        x = F.relu(self.linear_1(x))
        x = F.dropout(x, p=0.5, training=True)
        x = F.relu(self.linear_2(x))
        x = F.dropout(x, p=0.5, training=True)
        return x


class TransformerTTS(nn.Module):
    """Text-to-mel Transformer: 3 encoder blocks, 3 decoder blocks and a post-net."""

    def __init__(self, device=DEVICE):
        # `device` is accepted for interface compatibility; callers move the
        # module with .to(...).
        super().__init__()
        self.encoder_prenet = EncoderPreNet()
        self.decoder_prenet = DecoderPreNet()
        self.postnet = PostNet()
        self.pos_encoding = nn.Embedding(
            num_embeddings=hp.max_mel_time, embedding_dim=hp.embedding_size
        )
        self.encoder_block_1 = EncoderBlock()
        self.encoder_block_2 = EncoderBlock()
        self.encoder_block_3 = EncoderBlock()
        self.decoder_block_1 = DecoderBlock()
        self.decoder_block_2 = DecoderBlock()
        self.decoder_block_3 = DecoderBlock()
        self.linear_1 = nn.Linear(hp.embedding_size, hp.mel_freq)
        self.linear_2 = nn.Linear(hp.embedding_size, 1)
        self.norm_memory = nn.LayerNorm(normalized_shape=hp.embedding_size)

    def forward(self, text, text_len, mel, mel_len):
        """Predict mel frames and stop logits for a (teacher-forced) mel input.

        Args:
            text: symbol ids, indexed as (batch, text_time).
            text_len: number of valid symbols per batch item.
            mel: mel frames, indexed as (batch, mel_time, hp.mel_freq).
            mel_len: number of valid frames per batch item.

        Returns:
            ``(mel_postnet, mel_linear, stop_token)``. Padded frames are
            zeroed in both mel outputs and set to 1e3 in ``stop_token``.
        """
        N = text.shape[0]
        S = text.shape[1]
        TIME = mel.shape[1]

        # The masks are stored on the module, as in the original code. This
        # makes concurrent forward() calls on one instance unsafe.
        self.src_key_padding_mask = _additive_padding_mask(N, text_len, S, text.device)
        self.src_mask = _additive_causal_mask(S, S, text.device)
        self.tgt_key_padding_mask = _additive_padding_mask(N, mel_len, TIME, mel.device)
        self.tgt_mask = _additive_causal_mask(TIME, TIME, mel.device)
        self.memory_mask = _additive_causal_mask(TIME, S, mel.device)

        # Encoder
        text_x = self.encoder_prenet(text)
        pos_codes = self.pos_encoding(torch.arange(hp.max_mel_time).to(mel.device))
        S = text_x.shape[1]
        text_x = text_x + pos_codes[:S]
        for encoder_block in (
            self.encoder_block_1,
            self.encoder_block_2,
            self.encoder_block_3,
        ):
            text_x = encoder_block(
                text_x,
                attn_mask=self.src_mask,
                key_padding_mask=self.src_key_padding_mask,
            )
        text_x = self.norm_memory(text_x)

        # Decoder
        mel_x = self.decoder_prenet(mel)
        mel_x = mel_x + pos_codes[:TIME]
        for decoder_block in (
            self.decoder_block_1,
            self.decoder_block_2,
            self.decoder_block_3,
        ):
            mel_x = decoder_block(
                x=mel_x,
                memory=text_x,
                x_attn_mask=self.tgt_mask,
                x_key_padding_mask=self.tgt_key_padding_mask,
                memory_attn_mask=self.memory_mask,
                memory_key_padding_mask=self.src_key_padding_mask,
            )

        # Output heads
        mel_linear = self.linear_1(mel_x)
        mel_postnet = mel_linear + self.postnet(mel_linear)
        stop_token = self.linear_2(mel_x)

        # Neutralise predictions on padded frames.
        bool_mel_mask = (
            self.tgt_key_padding_mask.ne(0).unsqueeze(-1).repeat(1, 1, hp.mel_freq)
        )
        mel_linear = mel_linear.masked_fill(bool_mel_mask, 0)
        mel_postnet = mel_postnet.masked_fill(bool_mel_mask, 0)
        stop_token = stop_token.masked_fill(
            bool_mel_mask[:, :, 0].unsqueeze(-1), 1e3
        ).squeeze(2)

        return mel_postnet, mel_linear, stop_token

    @torch.no_grad()
    def inference(self, text, max_length=800, stop_token_threshold=0.5, with_tqdm=True):
        """Generate mel frames autoregressively for a single text sequence.

        Args:
            text: symbol ids with a leading batch axis of size 1.
            max_length: maximum number of decoding steps.
            stop_token_threshold: decoding stops once the sigmoid of the
                latest stop logit exceeds this value.
            with_tqdm: unused; kept for interface compatibility.

        Returns:
            ``(mel_postnet, stop_token_outputs)``. ``mel_postnet`` is the mel
            output of the last forward pass, or ``None`` when
            ``max_length <= 0`` and no step was run.
        """
        self.eval()
        # Build all helper tensors next to the input rather than on the global
        # DEVICE, so the method also works when the two differ.
        device = text.device

        text_lengths = torch.tensor(text.shape[1]).unsqueeze(0).to(device)
        # Start-of-sequence frame: a single all-zero mel frame.
        mel_padded = torch.zeros((1, 1, hp.mel_freq), device=device)
        mel_lengths = torch.tensor(1).unsqueeze(0).to(device)
        stop_token_outputs = torch.FloatTensor([]).to(device)

        mel_postnet = None
        for _ in range(max_length):
            mel_postnet, mel_linear, stop_token = self(
                text, text_lengths, mel_padded, mel_lengths
            )
            # Feed the newest predicted frame back as the next decoder input.
            mel_padded = torch.cat([mel_padded, mel_postnet[:, -1:, :]], dim=1)
            if torch.sigmoid(stop_token[:, -1]) > stop_token_threshold:
                break
            stop_token_outputs = torch.cat(
                [stop_token_outputs, stop_token[:, -1:]], dim=1
            )
            mel_lengths = torch.tensor(mel_padded.shape[1]).unsqueeze(0).to(device)

        return mel_postnet, stop_token_outputs


# ---------------------------------
# Part 3: Model Loading (from Hugging Face Hub)
# ---------------------------------
# IMPORTANT: Replace these ids with the repositories you created on the Hub.
TTS_MODEL_HUB_ID = "MoHamdyy/transformer-tts-ljspeech"
ASR_HUB_ID = "MoHamdyy/whisper-stt-model"
MARIAN_HUB_ID = "MoHamdyy/marian-ar-en-translation"

print("Loading models from Hugging Face Hub to device:", DEVICE)

# Load TTS Model from Hub
try:
    print("Loading TTS model...")
    # Download the .pt file from its repo
    tts_model_path = hf_hub_download(
        repo_id=TTS_MODEL_HUB_ID, filename="train_SimpleTransfromerTTS.pt"
    )
    # SECURITY NOTE: torch.load unpickles the downloaded file. Only point
    # TTS_MODEL_HUB_ID at a repository you trust.
    state = torch.load(tts_model_path, map_location=DEVICE)
    TTS_MODEL = TransformerTTS().to(DEVICE)
    # The checkpoint may wrap the weights under "model" or "state_dict";
    # otherwise the whole file is assumed to be the state_dict.
    if "model" in state:
        TTS_MODEL.load_state_dict(state["model"])
    elif "state_dict" in state:
        TTS_MODEL.load_state_dict(state["state_dict"])
    else:
        TTS_MODEL.load_state_dict(state)
    TTS_MODEL.eval()
    print("TTS model loaded successfully.")
except Exception as e:
    # Best effort: the service still starts; the TTS stage will report errors.
    print(f"Error loading TTS model: {e}")
    TTS_MODEL = None

# Load STT (Whisper) Model from Hub
try:
    print("Loading STT (Whisper) model...")
    stt_processor = WhisperProcessor.from_pretrained(ASR_HUB_ID)
    stt_model = (
        WhisperForConditionalGeneration.from_pretrained(ASR_HUB_ID).to(DEVICE).eval()
    )
    print("STT model loaded successfully.")
except Exception as e:
    print(f"Error loading STT model: {e}")
    stt_processor = None
    stt_model = None

# Load TTT (MarianMT) Model from Hub
try:
    print("Loading TTT (MarianMT) model...")
    mt_tokenizer = MarianTokenizer.from_pretrained(MARIAN_HUB_ID)
    mt_model = MarianMTModel.from_pretrained(MARIAN_HUB_ID).to(DEVICE).eval()
    print("TTT model loaded successfully.")
except Exception as e:
    print(f"Error loading TTT model: {e}")
    mt_tokenizer = None
    mt_model = None


# ---------------------------------
# Part 4: Full Pipeline Function
# ---------------------------------

def _transcribe_arabic(audio_input_path):
    """Load an audio file and return its Arabic transcript from Whisper."""
    print("STT: Loading and resampling audio...")
    wav, sr = torchaudio.load(audio_input_path)
    if wav.size(0) > 1:
        # Down-mix multi-channel audio to mono.
        wav = wav.mean(dim=0, keepdim=True)
    target_sr_stt = stt_processor.feature_extractor.sampling_rate
    if sr != target_sr_stt:
        wav = torchaudio.transforms.Resample(sr, target_sr_stt)(wav)
    audio_array_stt = wav.squeeze().cpu().numpy()

    print("STT: Extracting features and transcribing...")
    inputs = stt_processor(
        audio_array_stt, sampling_rate=target_sr_stt, return_tensors="pt"
    ).input_features.to(DEVICE)
    forced_ids = stt_processor.get_decoder_prompt_ids(
        language="arabic", task="transcribe"
    )
    with torch.no_grad():
        generated_ids = stt_model.generate(
            inputs, forced_decoder_ids=forced_ids, max_length=448
        )
    return stt_processor.decode(generated_ids[0], skip_special_tokens=True).strip()


def _translate_to_english(arabic_transcript):
    """Translate an Arabic transcript to English with MarianMT."""
    print("TTT: Translating to English...")
    batch = mt_tokenizer(arabic_transcript, return_tensors="pt", padding=True).to(DEVICE)
    with torch.no_grad():
        translated_ids = mt_model.generate(**batch, max_length=512)
    return mt_tokenizer.batch_decode(translated_ids, skip_special_tokens=True)[0].strip()


def _synthesize_english(english_translation):
    """Synthesize English text to audio; returns an empty array if no mel was produced."""
    audio = np.array([]).astype(np.float32)
    print("TTS: Synthesizing English speech...")
    sequence = text_to_seq(english_translation).unsqueeze(0).to(DEVICE)
    generated_mel, _ = TTS_MODEL.inference(
        sequence,
        max_length=hp.max_mel_time - 20,
        stop_token_threshold=0.5,
        with_tqdm=False,
    )
    print(f"TTS: Generated mel shape: {generated_mel.shape if generated_mel is not None else 'None'}")
    if generated_mel is not None and generated_mel.numel() > 0:
        # Drop the batch axis and put the mel axis first for the vocoder.
        mel_for_vocoder = generated_mel.detach().squeeze(0).transpose(0, 1)
        audio_tensor = inverse_mel_spec_to_wav(mel_for_vocoder)
        audio = audio_tensor.cpu().numpy()
        print(f"TTS: Synthesized audio shape: {audio.shape}")
    return audio


def full_speech_translation_pipeline(audio_input_path: str):
    """Run STT -> translation -> TTS on one audio file.

    Each stage is best-effort: an exception is printed and later stages that
    depend on it are skipped, so the function always returns.

    Args:
        audio_input_path: path to the input audio file.

    Returns:
        ``(arabic_transcript, english_translation, (sample_rate, audio))``
        where ``audio`` is a float32 NumPy array that is empty when no speech
        was synthesized. On failure the text fields hold a placeholder
        message instead of model output.
    """
    print(f"--- PIPELINE START: Processing {audio_input_path} ---")
    if audio_input_path is None or not os.path.exists(audio_input_path):
        msg = "Error: Audio file not provided or not found."
        print(msg)
        # Return empty/default values
        return "Error: No file", "", (hp.sr, np.array([]).astype(np.float32))

    # STT Stage
    arabic_transcript = "STT Error: Processing failed."
    stt_ok = False
    try:
        arabic_transcript = _transcribe_arabic(audio_input_path)
        stt_ok = True
        print(f"STT Output: {arabic_transcript}")
    except Exception as e:
        print(f"STT Error: {e}")

    # TTT Stage
    english_translation = "TTT Error: Processing failed."
    ttt_ok = False
    if stt_ok and arabic_transcript:
        try:
            english_translation = _translate_to_english(arabic_transcript)
            ttt_ok = True
            print(f"TTT Output: {english_translation}")
        except Exception as e:
            print(f"TTT Error: {e}")
    else:
        english_translation = "(Skipped TTT due to STT failure)"
        print(english_translation)

    # TTS Stage
    # Gate on the explicit success flag: checking the text for a "TTT Error"
    # prefix let the "(Skipped TTT ...)" placeholder through, and that
    # placeholder was then spoken by the TTS model.
    synthesized_audio_np = np.array([]).astype(np.float32)
    if ttt_ok and english_translation:
        try:
            synthesized_audio_np = _synthesize_english(english_translation)
        except Exception as e:
            print(f"TTS Error: {e}")

    print("--- PIPELINE END ---")
    return arabic_transcript, english_translation, (hp.sr, synthesized_audio_np)


# ---------------------------------
# Part 5: FastAPI Application
# ---------------------------------
app = FastAPI()

# Allow Cross-Origin Resource Sharing (CORS) for the frontend.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# broader than most deployments need - consider listing explicit origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)


@app.post("/process-speech/")
async def create_upload_file(file: UploadFile = File(...)):
    """Translate an uploaded Arabic audio file and return text plus WAV audio.

    Returns:
        A JSON object with ``arabic_transcript``, ``english_translation`` and
        ``audio_data`` (``sample_rate`` and base64-encoded WAV bytes; the
        base64 string is empty when no audio was synthesized).
    """
    # The client-supplied filename is untrusted: keep only a sanitised
    # extension (useful for format detection) and let tempfile pick a unique
    # name, so uploads cannot escape the temp directory or overwrite each other.
    suffix = os.path.splitext(os.path.basename(file.filename or ""))[1]
    suffix = re.sub(r"[^A-Za-z0-9.]", "", suffix)

    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as buffer:
        shutil.copyfileobj(file.file, buffer)
        temp_path = buffer.name

    try:
        # Run the full pipeline
        arabic, english, (sr, audio_np) = full_speech_translation_pipeline(temp_path)
    finally:
        try:
            os.remove(temp_path)
        except OSError:
            # Cleanup is best effort; a leftover temp file must not fail the request.
            pass

    # Prepare the audio to be sent back as base64. It is encoded in memory so
    # that concurrent requests never share an output file.
    audio_base64 = ""
    if audio_np.size > 0:
        wav_buffer = io.BytesIO()
        sf.write(wav_buffer, audio_np, sr, format="WAV")
        audio_base64 = base64.b64encode(wav_buffer.getvalue()).decode('utf-8')

    # Return all results in a single JSON response
    return {
        "arabic_transcript": arabic,
        "english_translation": english,
        "audio_data": {
            "sample_rate": sr,
            "base64": audio_base64,
        },
    }


# Mounted last so the catch-all static handler does not shadow the API route.
app.mount("/", StaticFiles(directory="static", html=True), name="static")