import asyncio
from collections import deque
import os
import threading
import time

import av
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
import pydub
import torch
# import av
# import cv2

from sample_utils.turn import get_ice_servers
import json
from typing import List

from dotenv import load_dotenv

load_dotenv()


async def main():
    system_one = {
        "audio_bit_rate": 16000,
        # "audio_bit_rate": 32000,
        # "audio_bit_rate": 48000,

        # "vision_embeddings_fps": 5,
        "vision_embeddings_fps": 2,
    }
    system_one["video_detection_emotions"] = [
        "a happy person",
        "the person is happy",
        "the person's emotional state is happy",
        "a sad person",
        "a scared person",
        "a disgusted person",
        "an angry person",
        "a surprised person",
        "a bored person",
        "an interested person",
        "a guilty person",
        "an indifferent person",
        "a distracted person",
    ]
    # system_one["video_detection_emotions"] = [
    #     "Happiness",
    #     "Sadness",
    #     "Fear",
    #     "Disgust",
    #     "Anger",
    #     "Surprise",
    #     "Boredom",
    #     "Interest",
    #     "Excitement",
    #     "Guilt",
    #     "Shame",
    #     "Relief",
    #     "Love",
    #     "Embarrassment",
    #     "Pride",
    #     "Envy",
    #     "Jealousy",
    #     "Anxiety",
    #     "Hope",
    #     "Despair",
    #     "Frustration",
    #     "Confusion",
    #     "Curiosity",
    #     "Contentment",
    #     "Indifference",
    #     "Anticipation",
    #     "Gratitude",
    #     "Bitterness"
    # ]
    system_one["video_detection_engement"] = [
        "the person is engaged in the conversation",
        "the person is not engaged in the conversation",
        "the person is looking at me",
        "the person is not looking at me",
        "the person is talking to me",
        "the person is not talking to me",
        "the person is engaged",
        "the person is talking",
        "the person is listening",
    ]
    system_one["video_detection_present"] = [
        "the view from a webcam",
        "the view from a webcam we see a person",
        # "the view from a webcam. I see a person",
        # "the view from a webcam. The person is looking at the camera",
        # "i am a webcam",
        # "i am a webcam and i see a person",
        # "i am a webcam and i see a person. The person is looking at me",
        # "a person",
        # "a person on a Zoom call",
        # "a person on a FaceTime call",
        # "a person on a WebCam call",
        # "no one",
        # " ",
        # "multiple people",
        # "a group of people",
    ]

    system_one_audio_status = st.empty()
    playing = st.checkbox("Playing", value=True)
    def handle_audio_frame(frame):
        # if self.vosk.AcceptWaveform(data):
        pass

    # (assumed) shared deque + lock so the main loop can drain queued audio
    # frames; they are used below but were not defined in this listing.
    audio_frames_deque_lock = threading.Lock()
    audio_frames_deque: deque = deque([])

    async def queued_audio_frames_callback(frames: List[av.AudioFrame]) -> List[av.AudioFrame]:
        with audio_frames_deque_lock:
            audio_frames_deque.extend(frames)

        # create frames to be returned.
        new_frames = []
        for frame in frames:
            input_array = frame.to_ndarray()
            new_frame = av.AudioFrame.from_ndarray(
                np.zeros(input_array.shape, dtype=input_array.dtype),
                layout=frame.layout.name,
            )
            new_frame.sample_rate = frame.sample_rate
            new_frames.append(new_frame)

        # TODO: replace with the audio we want to send to the other side.
        return new_frames
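
    # (assumed) video counterpart of the audio queue above: a
    # queued_video_frames_callback is passed to webrtc_streamer below, but its
    # definition is missing from this listing. Minimal sketch that stashes the
    # incoming frames for the main loop and passes them through unchanged.
    video_frames_deque_lock = threading.Lock()
    video_frames_deque: deque = deque([])

    async def queued_video_frames_callback(frames: List[av.VideoFrame]) -> List[av.VideoFrame]:
        with video_frames_deque_lock:
            video_frames_deque.extend(frames)
        return frames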

    system_one_audio_status.write("Initializing CLIP model")
    from clip_transform import CLIPTransform
    clip_transform = CLIPTransform()

    system_one_audio_status.write("Initializing chat pipeline")
    from chat_pipeline import ChatPipeline
    chat_pipeline = ChatPipeline()
    await chat_pipeline.start()

    system_one_audio_status.write("Initializing CLIP templates")
    embeddings = clip_transform.text_to_embeddings(system_one["video_detection_emotions"])
    system_one["video_detection_emotions_embeddings"] = embeddings
    embeddings = clip_transform.text_to_embeddings(system_one["video_detection_engement"])
    system_one["video_detection_engement_embeddings"] = embeddings
    embeddings = clip_transform.text_to_embeddings(system_one["video_detection_present"])
    system_one["video_detection_present_embeddings"] = embeddings

    system_one_audio_status.write("Initializing webrtc_streamer")
    webrtc_ctx = webrtc_streamer(
        key="charles",
        desired_playing_state=playing,
        # audio_receiver_size=4096,
        queued_audio_frames_callback=queued_audio_frames_callback,
        queued_video_frames_callback=queued_video_frames_callback,
        mode=WebRtcMode.SENDRECV,
        rtc_configuration={"iceServers": get_ice_servers()},
        async_processing=True,
    )

    if not webrtc_ctx.state.playing:
        return

    system_one_audio_status.write("Initializing streaming")
    system_one_audio_output = st.empty()
    system_one_video_output = st.empty()
    system_one_audio_history = []
    system_one_audio_history_output = st.empty()

    sound_chunk = pydub.AudioSegment.empty()
    current_video_embedding = None
    current_video_embedding_timestamp = time.monotonic()

    def get_dot_similarities(video_embedding, embeddings, embeddings_labels):
        dot_product = torch.mm(embeddings, video_embedding.T)
        similarity_image_label = [
            (float("{:.4f}".format(dot_product[i][0])), embeddings_labels[i])
            for i in range(len(embeddings_labels))
        ]
        similarity_image_label.sort(reverse=True)
        return similarity_image_label

    def get_top_3_similarities_as_a_string(video_embedding, embeddings, embeddings_labels):
        similarities = get_dot_similarities(video_embedding, embeddings, embeddings_labels)
        top_3 = ""
        range_len = min(3, len(similarities))
        for i in range(range_len):
            top_3 += f"{similarities[i][1]} ({similarities[i][0]}) "
        return top_3
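
    # (assumed) speech-to-text helper: `do_work` is called in the loop below but
    # is not defined in this listing. Minimal sketch using Vosk (hinted at by the
    # commented-out AcceptWaveform call above); the model language and feeding
    # the whole accumulated chunk on each pass are assumptions, not the original
    # implementation.
    from vosk import KaldiRecognizer, Model
    vosk_recognizer = KaldiRecognizer(Model(lang="en-us"), system_one["audio_bit_rate"])

    def do_work(audio_bytes):
        # returns (text recognised so far, whether the speaker finished an utterance)
        if vosk_recognizer.AcceptWaveform(audio_bytes):
            result = json.loads(vosk_recognizer.Result())
            return result.get("text", ""), True
        partial = json.loads(vosk_recognizer.PartialResult())
        return partial.get("partial", ""), False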

    while True:
        try:
            if webrtc_ctx.state.playing:
                # handle video
                video_frames = []
                with video_frames_deque_lock:
                    while len(video_frames_deque) > 0:
                        frame = video_frames_deque.popleft()
                        video_frames.append(frame)

                get_embeddings = False
                get_embeddings |= current_video_embedding is None
                current_time = time.monotonic()
                elapsed_time = current_time - current_video_embedding_timestamp
                get_embeddings |= elapsed_time > 1. / system_one['vision_embeddings_fps']
                if get_embeddings and len(video_frames) > 0:
                    current_video_embedding_timestamp = current_time
                    current_video_embedding = clip_transform.image_to_embeddings(video_frames[-1].to_ndarray())

                    emotions_top_3 = get_top_3_similarities_as_a_string(current_video_embedding, system_one["video_detection_emotions_embeddings"], system_one["video_detection_emotions"])
                    engagement_top_3 = get_top_3_similarities_as_a_string(current_video_embedding, system_one["video_detection_engement_embeddings"], system_one["video_detection_engement"])
                    present_top_3 = get_top_3_similarities_as_a_string(current_video_embedding, system_one["video_detection_present_embeddings"], system_one["video_detection_present"])

                    # table_content = "**System 1 Video:**\n\n"
                    table_content = "| System 1 Video | |\n| --- | --- |\n"
                    table_content += f"| Present | {present_top_3} |\n"
                    table_content += f"| Emotion | {emotions_top_3} |\n"
                    table_content += f"| Engagement | {engagement_top_3} |\n"
                    system_one_video_output.markdown(table_content)
                    # system_one_video_output.markdown(f"**System 1 Video:** \n [Emotion: {emotions_top_3}], \n [Engagement: {engagement_top_3}], \n [Present: {present_top_3}] ")
                    # for similarity, image_label in similarity_image_label:
                    #     print (f"{similarity} {image_label}")
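
                # handle audio
                # (assumed) drain the queued audio frames the same way the video
                # frames are drained above; `audio_frames` is used below but was
                # never populated in this listing.
                audio_frames = []
                with audio_frames_deque_lock:
                    while len(audio_frames_deque) > 0:
                        frame = audio_frames_deque.popleft()
                        audio_frames.append(frame)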
                if len(audio_frames) == 0:
                    time.sleep(0.1)
                    system_one_audio_status.write("No frame arrived.")
                    continue

                system_one_audio_status.write("Running. Say something!")

                for audio_frame in audio_frames:
                    sound = pydub.AudioSegment(
                        data=audio_frame.to_ndarray().tobytes(),
                        sample_width=audio_frame.format.bytes,
                        frame_rate=audio_frame.sample_rate,
                        channels=len(audio_frame.layout.channels),
                    )
                    sound = sound.set_channels(1)
                    sound = sound.set_frame_rate(system_one['audio_bit_rate'])
                    sound_chunk += sound

                if len(sound_chunk) > 0:
                    buffer = np.array(sound_chunk.get_array_of_samples())
                    text, speaker_finished = do_work(buffer.tobytes())
                    system_one_audio_output.markdown(f"**System 1 Audio:** {text}")
                    if speaker_finished and len(text) > 0:
                        system_one_audio_history.append(text)
                        if len(system_one_audio_history) > 10:
                            system_one_audio_history = system_one_audio_history[-10:]
                        table_content = "| System 1 Audio History |\n| --- |\n"
                        table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
                        system_one_audio_history_output.markdown(table_content)
                        await chat_pipeline.enqueue(text)
                        sound_chunk = pydub.AudioSegment.empty()
            else:
                system_one_audio_status.write("Stopped.")
                break
        except KeyboardInterrupt:
            print("Pipeline interrupted by user")
            break
        except Exception as e:
            print(f"An error occurred: {e}")


if __name__ == "__main__":
    asyncio.run(main())