Spaces:
Sleeping
Sleeping
import io | |
import os | |
from contextlib import closing | |
import boto3 | |
import gradio as gr | |
import requests | |
from config.config import TALKING_HEAD_WIDTH, LOOPING_TALKING_HEAD_VIDEO_PATH | |
from utilities.audio import AZURE_VOICE_DATA, POLLY_VOICE_DATA | |
from utilities.polly_utils import NEURAL_ENGINE | |
def create_html_video(file_name, width):
    """Build an autoplaying, muted, looping HTML ``<video>`` tag for a file.

    The file is wrapped in a hidden ``gr.File`` component and the tag's
    source points at the ``/file=`` URL derived from that component's value.

    Args:
        file_name: Path of the video file to serve. When falsy, the
            configured ``LOOPING_TALKING_HEAD_VIDEO_PATH`` is used instead.
        width: Value used for both the ``width`` and ``height`` attributes.

    Returns:
        The HTML markup as a string.
    """
    # Honor the caller's argument; previously it was ignored and the looping
    # talking-head video was always served regardless of what was passed in.
    source_path = file_name or LOOPING_TALKING_HEAD_VIDEO_PATH
    tmp_file = gr.File(source_path, visible=False)
    temp_file_url = "/file=" + tmp_file.value['name']
    # NOTE(review): `poster` is an attribute of <video>, not <source>, so it
    # has no effect here. Left as-is to keep the emitted markup unchanged;
    # confirm whether "Masahiro.png" is actually resolvable before moving it.
    html_video = (
        f'<video width={width} height={width} autoplay muted loop>'
        f'<source src={temp_file_url} type="video/mp4" poster="Masahiro.png">'
        f'</video>'
    )
    return html_video
def update_talking_head(widget, state):
    """Return the new state and the talking-head markup for a toggle value.

    Args:
        widget: Current value of the controlling widget. A truthy value
            becomes the new state and enables the looping video.
        state: Previous state; it is never read, only replaced.

    Returns:
        ``(widget, video_html)`` when ``widget`` is truthy, otherwise
        ``(None, "<pre></pre>")`` as an empty placeholder.
    """
    # Guard clause: a falsy widget clears the state and hides the video.
    if not widget:
        return None, "<pre></pre>"

    looping_video_html = create_html_video(
        LOOPING_TALKING_HEAD_VIDEO_PATH,
        TALKING_HEAD_WIDTH,
    )
    return widget, looping_video_html
def do_html_audio_speak(words_to_speak, polly_language):
    """Synthesize speech with Amazon Polly and wrap it in an ``<audio>`` tag.

    The MP3 returned by Polly is written to a fixed path, so every call
    overwrites the file produced by the previous one.

    Args:
        words_to_speak: Text passed to Polly as ``Text``.
        polly_language: Language key used to look up a male voice in
            ``POLLY_VOICE_DATA``. If no voice is found, the "Matthew"
            en-US neural voice is used.

    Returns:
        ``(html_audio, audio_path)`` on success, or ``(None, None)`` when the
        response carries no audio stream or the file cannot be written.

    Raises:
        KeyError: If ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY`` or
            ``AWS_DEFAULT_REGION`` is missing from the environment.
    """
    audio_path = "assets/audios/tempfile.mp3"

    polly_client = boto3.Session(
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        region_name=os.environ["AWS_DEFAULT_REGION"]
    ).client('polly')

    voice_id, language_code, engine = POLLY_VOICE_DATA.get_voice(polly_language, "Male")
    if not voice_id:
        # No voice registered for this language: fall back to a known one.
        voice_id = "Matthew"
        language_code = "en-US"
        engine = NEURAL_ENGINE

    response = polly_client.synthesize_speech(
        Text=words_to_speak,
        OutputFormat='mp3',
        VoiceId=voice_id,
        LanguageCode=language_code,
        Engine=engine
    )

    if "AudioStream" not in response:
        # The response didn't contain audio data, exit gracefully
        print("Could not stream audio")
        return None, None

    with closing(response["AudioStream"]) as stream:
        try:
            # Create the target directory on demand; without it a fresh
            # checkout would always fall into the IOError branch below.
            os.makedirs(os.path.dirname(audio_path), exist_ok=True)
            with open(audio_path, 'wb') as f:
                f.write(stream.read())
            temp_aud_file = gr.File(audio_path)
            temp_aud_file_url = "/file=" + temp_aud_file.value['name']
            html_audio = f'<audio autoplay><source src={temp_aud_file_url} type="audio/mp3"></audio>'
        except IOError as error:
            # Could not write to file, exit gracefully
            print(error)
            return None, None

    return html_audio, audio_path
def do_html_video_speak(words_to_speak, azure_language):
    """Request a lip-synced video from the Ex-Human API and build a ``<video>`` tag.

    The response body is written to a fixed path, so every successful call
    overwrites the file produced by the previous one.

    Args:
        words_to_speak: Text sent to the API as ``bot_response``.
        azure_language: Language key used to look up a male voice in
            ``AZURE_VOICE_DATA``. If no voice is found,
            "en-US-ChristopherNeural" is used.

    Returns:
        ``(html_video, video_path)``. When the request fails or returns an
        empty body, ``html_video`` is ``'<pre>no video</pre>'`` and the file
        at ``video_path`` is left untouched (it may be stale or missing).

    Raises:
        KeyError: If ``EXHUMAN_API_KEY`` is missing from the environment.
    """
    video_path = "videos/tempfile.mp4"

    azure_voice = AZURE_VOICE_DATA.get_voice(azure_language, "Male")
    if not azure_voice:
        azure_voice = "en-US-ChristopherNeural"

    headers = {"Authorization": f"Bearer {os.environ['EXHUMAN_API_KEY']}"}
    body = {
        'bot_name': 'Masahiro',
        'bot_response': words_to_speak,
        'azure_voice': azure_voice,
        'azure_style': 'friendly',
        'animation_pipeline': 'high_speed',
    }
    api_endpoint = "https://api.exh.ai/animations/v1/generate_lipsync"
    # NOTE(review): no timeout is set, so a stalled connection blocks this
    # call indefinitely. Pick a value suited to the API's generation time.
    res = requests.post(api_endpoint, json=body, headers=headers)
    print("res.status_code: ", res.status_code)

    html_video = '<pre>no video</pre>'
    # `res.content` is always bytes, so a type check can never detect a
    # failure. Test the HTTP status instead, otherwise an error payload
    # would be saved and served as if it were an mp4.
    if res.ok and res.content:
        print("len(res.content)): ", len(res.content))
        # Create the target directory on demand so a fresh checkout works.
        os.makedirs(os.path.dirname(video_path), exist_ok=True)
        with open(video_path, 'wb') as f:
            f.write(res.content)
        temp_file = gr.File(video_path)
        temp_file_url = "/file=" + temp_file.value['name']
        html_video = f'<video width={TALKING_HEAD_WIDTH} height={TALKING_HEAD_WIDTH} autoplay><source src={temp_file_url} type="video/mp4" poster="Masahiro.png"></video>'
    else:
        print('video url unknown')
    return html_video, video_path