import os
import requests
import numpy as np
import torch
import torch.nn as nn
import gradio as gr
import noisereduce as nr
from typing import Iterator, Optional
from genai_chat_ai import AI, create_chat_session
def remove_noise_nr(audio_data, sr=16000):
    """Removes noise from an audio signal using the noisereduce library."""
    return nr.reduce_noise(y=audio_data, sr=sr)
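# Usage sketch (illustrative only; the buffer below is synthetic, not app data):
#   noisy = np.random.randn(16000).astype(np.float32) * 0.01  # 1 s of noise at 16 kHz
#   clean = remove_noise_nr(noisy, sr=16000)                   # same shape, denoised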
def _inference_forward_stream(
    self,
    input_ids: Optional[torch.Tensor] = None,
    attention_mask: Optional[torch.Tensor] = None,
    speaker_embeddings: Optional[torch.Tensor] = None,
    output_attentions: Optional[bool] = None,
    output_hidden_states: Optional[bool] = None,
    return_dict: Optional[bool] = None,
    padding_mask: Optional[torch.Tensor] = None,
    chunk_size: int = 32,  # chunk size (in spectrogram frames) for streaming output
) -> Iterator[torch.Tensor]:
    """Generates speech waveforms in a streaming fashion."""
    if attention_mask is not None:
        padding_mask = attention_mask.unsqueeze(-1).float()
    else:
        padding_mask = torch.ones_like(input_ids).unsqueeze(-1).float()

    text_encoder_output = self.text_encoder(
        input_ids=input_ids,
        padding_mask=padding_mask,
        attention_mask=attention_mask,
        output_attentions=output_attentions,
        output_hidden_states=output_hidden_states,
        return_dict=return_dict,
    )
    hidden_states = text_encoder_output[0] if not return_dict else text_encoder_output.last_hidden_state
    hidden_states = hidden_states.transpose(1, 2)
    input_padding_mask = padding_mask.transpose(1, 2)
    prior_means = text_encoder_output[1] if not return_dict else text_encoder_output.prior_means
    prior_log_variances = text_encoder_output[2] if not return_dict else text_encoder_output.prior_log_variances

    if self.config.use_stochastic_duration_prediction:
        log_duration = self.duration_predictor(
            hidden_states,
            input_padding_mask,
            speaker_embeddings,
            reverse=True,
            noise_scale=self.noise_scale_duration,
        )
    else:
        log_duration = self.duration_predictor(hidden_states, input_padding_mask, speaker_embeddings)

    length_scale = 1.0 / self.speaking_rate
    duration = torch.ceil(torch.exp(log_duration) * input_padding_mask * length_scale)
    predicted_lengths = torch.clamp_min(torch.sum(duration, [1, 2]), 1).long()

    # Create a padding mask for the output lengths of shape (batch, 1, max_output_length)
    indices = torch.arange(predicted_lengths.max(), dtype=predicted_lengths.dtype, device=predicted_lengths.device)
    output_padding_mask = indices.unsqueeze(0) < predicted_lengths.unsqueeze(1)
    output_padding_mask = output_padding_mask.unsqueeze(1).to(input_padding_mask.dtype)

    # Reconstruct an attention tensor of shape (batch, 1, out_length, in_length)
    attn_mask = torch.unsqueeze(input_padding_mask, 2) * torch.unsqueeze(output_padding_mask, -1)
    batch_size, _, output_length, input_length = attn_mask.shape
    cum_duration = torch.cumsum(duration, -1).view(batch_size * input_length, 1)
    indices = torch.arange(output_length, dtype=duration.dtype, device=duration.device)
    valid_indices = indices.unsqueeze(0) < cum_duration
    valid_indices = valid_indices.to(attn_mask.dtype).view(batch_size, input_length, output_length)
    padded_indices = valid_indices - nn.functional.pad(valid_indices, [0, 0, 1, 0, 0, 0])[:, :-1]
    attn = padded_indices.unsqueeze(1).transpose(2, 3) * attn_mask

    # Expand the prior distribution to the predicted output length
    prior_means = torch.matmul(attn.squeeze(1), prior_means).transpose(1, 2)
    prior_log_variances = torch.matmul(attn.squeeze(1), prior_log_variances).transpose(1, 2)

    prior_latents = prior_means + torch.randn_like(prior_means) * torch.exp(prior_log_variances) * self.noise_scale
    latents = self.flow(prior_latents, output_padding_mask, speaker_embeddings, reverse=True)
    spectrogram = latents * output_padding_mask

    # Decode the spectrogram chunk by chunk so audio can be streamed to the client.
    # The yield is kept outside torch.no_grad() so the grad-mode context is not
    # left open while the generator is suspended.
    for i in range(0, spectrogram.size(-1), chunk_size):
        with torch.no_grad():
            wav = self.decoder(spectrogram[:, :, i : i + chunk_size], speaker_embeddings)
        yield wav.squeeze().cpu().numpy()
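# Minimal consumption sketch (an added illustration, not called by the app):
# the generator yields 1-D numpy chunks, so a full waveform can be rebuilt by
# concatenating them in order.
def _example_stream_to_waveform(model, inputs, chunk_size=256):
    """Collect all chunks from _inference_forward_stream into one numpy array."""
    chunks = list(_inference_forward_stream(
        model,
        input_ids=inputs.input_ids,
        attention_mask=inputs.attention_mask,
        chunk_size=chunk_size,
    ))
    return np.concatenate(chunks)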
api_key = os.environ.get("Id_mode_vits")
headers = {"Authorization": f"Bearer {api_key}"}

from transformers import AutoTokenizer, VitsModel

models = {}
tokenizer = AutoTokenizer.from_pretrained("asg2024/vits-ar-sa-huba", token=api_key)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def get_model(name_model):
    """Lazily load a VITS model and cache it so repeated requests reuse it."""
    global models
    if name_model in models:
        return models[name_model]
    models[name_model] = VitsModel.from_pretrained(name_model, token=api_key).to(device)
    return models[name_model]
def generate_speech(text, name_model):
    """Run a full (non-streaming) forward pass and return (sampling_rate, waveform)."""
    inputs = tokenizer(text, return_tensors="pt")
    model = get_model(name_model)
    with torch.no_grad():
        wav = model(
            input_ids=inputs.input_ids.to(device),
            attention_mask=inputs.attention_mask.to(device),
            speaker_id=0,
        ).waveform.cpu().numpy().reshape(-1)
    return model.config.sampling_rate, wav
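# Usage sketch (illustrative; `scipy` is an assumed extra dependency, and this
# helper is not invoked anywhere in the app):
def _example_save_speech(text, name_model, path="out.wav"):
    """Synthesize `text` with the given model and write it to a WAV file."""
    from scipy.io import wavfile
    sr, wav = generate_speech(text, name_model)
    wavfile.write(path, sr, wav)
    return path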
def generate_audio(text, name_model, speaker_id=None):
    """Stream (sampling_rate, chunk) pairs for the given text."""
    inputs = tokenizer(text, return_tensors="pt")
    speaker_embeddings = None
    model = get_model(name_model)
    with torch.no_grad():
        for chunk in _inference_forward_stream(
            model,
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            speaker_embeddings=speaker_embeddings,
            chunk_size=256,
        ):
            yield 16000, chunk
def generate_audio_ai(text, name_model):
    """Get an AI text reply, then stream its denoised speech chunks."""
    text_answer = get_answer_ai(text)
    text_answer = remove_extra_spaces(text_answer)
    inputs = tokenizer(text_answer, return_tensors="pt")
    speaker_embeddings = None
    model = get_model(name_model)
    with torch.no_grad():
        for chunk in _inference_forward_stream(
            model,
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            speaker_embeddings=speaker_embeddings,
            chunk_size=256,
        ):
            yield 16000, remove_noise_nr(chunk)
def remove_extra_spaces(text):
    """
    Removes extra spaces between words in a string.

    Args:
        text: The string to process.

    Returns:
        The string with extra spaces removed.
    """
    return ' '.join(text.split())
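# Example: remove_extra_spaces("  hello   world ") -> "hello world"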
def query(text, API_URL):
    payload = {"inputs": text}
    response = requests.post(API_URL, headers=headers, json=payload)
    return response.content
def get_answer_ai(text):
    global AI
    try:
        response = AI.send_message(text)
        return response.text
    except Exception:
        # The chat session may have expired; recreate it and retry once.
        AI = create_chat_session()
        response = AI.send_message(text)
        return response.text
chat_history = []  # stores the conversation history


def chatbot_fn(input_text, input_audio):
    global chat_history
    if input_text:
        chat_history.append((input_text, None))  # add the user's message
        response_text = get_answer_ai(input_text)
        response_audio = generate_speech(response_text, 'asg2024/vits-ar-sa-huba')
        chat_history.append((None, response_audio))  # add the bot's reply
    elif input_audio:
        # Voice input is not implemented yet:
        # chat_history.append((None, input_audio))  # add the user's voice message
        # input_text = convert_speech_to_text(input_audio)
        # response_text = model.generate_response(input_text, chat_history)
        # response_audio = convert_text_to_speech(response_text)
        pass
    return chat_history
with gr.Blocks() as demo:  # wrap the entire interface in gr.Blocks
    with gr.Tab("ChatBot"):
        chatbot = gr.Chatbot(label="محادثة")
        with gr.Row():
            txt = gr.Textbox(label="أدخل رسالتك")
            audio = gr.Audio(sources="microphone", type="filepath")
        # `submit` fires once per message; `change` would fire on every keystroke.
        txt.submit(chatbot_fn, [txt, audio], chatbot)
        audio.change(chatbot_fn, [txt, audio], chatbot)
    with gr.Tab("Chat AI"):
        gr.Markdown("## AI: محادثة صوتية بالذكاء الاصطناعي باللهجة السعودية")
        with gr.Row():  # arrange input/output components side by side
            with gr.Column():
                text_input = gr.Textbox(label="أدخل أي نص")
            with gr.Column():
                model_choices = gr.Dropdown(
                    choices=[
                        "asg2024/vits-ar-sa",
                        "asg2024/vits-ar-sa-huba",
                        "asg2024/vits-ar-sa-ms",
                        "asg2024/vits-ar-sa-magd",
                        "asg2024/vits-ar-sa-fahd",
                    ],
                    label="اختر النموذج",
                    value="asg2024/vits-ar-sa-huba",
                )
        with gr.Row():
            btn = gr.Button("إرسال")
            btn_ai_only = gr.Button("توليد رد الذكاء الاصطناعي فقط")
        with gr.Row():
            user_audio = gr.Audio(label="صوت المدخل")
            ai_audio = gr.Audio(label="رد AI الصوتي")
            ai_text = gr.Textbox(label="رد AI النصي")
            ai_audio2 = gr.Audio(label="رد AI الصوتي 2", streaming=True)
        # A single function drives both buttons; the Inference API path is kept
        # as a commented alternative to local synthesis.
        def process_audio(text, model_choice, generate_user_audio=True):
            API_URL = f"https://api-inference.huggingface.co/models/{model_choice}"
            text_answer = get_answer_ai(text)
            text_answer = remove_extra_spaces(text_answer)
            data_ai = generate_speech(text_answer, model_choice)  # or: query(text_answer, API_URL)
            if generate_user_audio:  # also synthesize the user's own text
                data_user = generate_speech(text, model_choice)  # or: query(text, API_URL)
                return data_user, data_ai, text_answer
            return None, data_ai, text_answer  # keep the output arity consistent
        btn.click(
            process_audio,
            inputs=[text_input, model_choices],
            outputs=[user_audio, ai_audio, ai_text],
        )
        # Additional button that streams only the AI audio reply
        btn_ai_only.click(
            generate_audio_ai,
            inputs=[text_input, model_choices],
            outputs=[ai_audio2],
        )
    with gr.Tab("Live"):
        gr.Markdown("## VITS: تحويل النص إلى كلام")
        with gr.Row():
            speaker_id_input = gr.Number(label="معرّف المتحدث (اختياري)", interactive=True)
            with gr.Column():
                model_choices2 = gr.Dropdown(
                    choices=[
                        "asg2024/vits-ar-sa",
                        "asg2024/vits-ar-sa-huba",
                        "asg2024/vits-ar-sa-ms",
                        "asg2024/vits-ar-sa-magd",
                        "asg2024/vits-ar-sa-fahd",
                    ],
                    label="اختر النموذج",
                    value="asg2024/vits-ar-sa-huba",
                )
        text_input = gr.Textbox(label="أدخل النص هنا")
        generate_button = gr.Button("توليد وتشغيل الصوت")
        audio_player = gr.Audio(label="الصوت", streaming=True)
        # Stream chunks into the audio player as they are generated.
        # Note: speaker_id is accepted by generate_audio but currently unused there.
        generate_button.click(
            generate_audio,
            inputs=[text_input, model_choices2, speaker_id_input],
            outputs=audio_player,
        )
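# Note: streaming Audio outputs generally rely on Gradio's request queue;
# depending on the installed Gradio version it may be necessary to call
# `demo.queue()` before `demo.launch()` below.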
if __name__ == "__main__":
    demo.launch()