import streamlit as st
import mediapipe as mp
import base64
import io
import PIL.Image
import asyncio
import os

from google import genai
from streamlit_webrtc import webrtc_streamer
import av
import pyaudio

# Audio parameters: Gemini Live takes 16 kHz PCM in and returns 24 kHz PCM out.
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000
RECEIVE_SAMPLE_RATE = 24000
CHUNK_SIZE = 1024

# The google-genai SDK has no genai.configure(); the API key is passed
# directly to the client.
client = genai.Client(
    api_key=os.getenv("GOOGLE_API_KEY"),
    http_options={"api_version": "v1alpha"},
)
MODEL = "models/gemini-2.0-flash-exp"
# The text chat below renders response.text, so request TEXT responses;
# use ["AUDIO"] instead for spoken replies.
CONFIG = {"generation_config": {"response_modalities": ["TEXT"]}}


class AudioProcessor:
    """Captures microphone audio with PyAudio for streaming to the model."""

    def __init__(self):
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.audio_queue = asyncio.Queue()

    def start_stream(self):
        # Open the default input device at the rate the model expects.
        mic_info = self.audio.get_default_input_device_info()
        self.stream = self.audio.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,
            input=True,
            input_device_index=mic_info["index"],
            frames_per_buffer=CHUNK_SIZE,
        )

    def stop_stream(self):
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
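
    def terminate(self):
        # Optional cleanup hook, not wired to the UI: a PyAudio instance
        # holds PortAudio resources until terminate() is called on it.
        self.stop_stream()
        self.audio.terminate()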


class VideoProcessor:
    """Runs MediaPipe face detection on WebRTC frames and queues JPEG copies
    of each frame for the model."""

    def __init__(self):
        self.frame_queue = asyncio.Queue(maxsize=5)
        self.detection_enabled = True
        self.mp_draw = mp.solutions.drawing_utils
        self.mp_face_detection = mp.solutions.face_detection
        self.face_detection = self.mp_face_detection.FaceDetection(
            min_detection_confidence=0.5)

    def video_frame_callback(self, frame):
        # MediaPipe expects RGB input, which matches the rgb24 layout here.
        img = frame.to_ndarray(format="rgb24")

        if self.detection_enabled:
            results = self.face_detection.process(img)
            if results.detections:
                for detection in results.detections:
                    self.mp_draw.draw_detection(img, detection)

        # Downscale and JPEG-encode a copy of the frame for the model.
        pil_img = PIL.Image.fromarray(img)
        pil_img.thumbnail((1024, 1024))

        image_io = io.BytesIO()
        pil_img.save(image_io, format="jpeg")
        image_io.seek(0)

        frame_data = {
            "mime_type": "image/jpeg",
            "data": base64.b64encode(image_io.read()).decode(),
        }

        # Drop frames rather than block the WebRTC callback thread when the
        # consumer falls behind.
        try:
            self.frame_queue.put_nowait(frame_data)
        except asyncio.QueueFull:
            pass

        return av.VideoFrame.from_ndarray(img, format="rgb24")

    def __del__(self):
        if hasattr(self, 'face_detection'):
            self.face_detection.close()
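

# A minimal sketch, not part of the original flow: one way to drain the
# frame queue into a live session. send_frames and its `session` argument
# are assumptions; the session would come from client.aio.live.connect()
# as in send_message() below.
async def send_frames(session, video_processor):
    while True:
        frame_data = await video_processor.frame_queue.get()
        # Frames carry the same {"mime_type", "data"} payload shape built
        # in video_frame_callback above.
        await session.send(frame_data)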


def initialize_session_state():
    if 'audio_processor' not in st.session_state:
        st.session_state.audio_processor = AudioProcessor()
    if 'video_processor' not in st.session_state:
        st.session_state.video_processor = VideoProcessor()
    if 'session' not in st.session_state:
        st.session_state.session = None
    if 'messages' not in st.session_state:
        st.session_state.messages = []


def display_chat_messages():
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])


def main():
    st.title("Gemini Interactive Assistant")

    initialize_session_state()

    st.sidebar.title("Settings")
    input_mode = st.sidebar.radio(
        "Input Mode",
        ["Text Only", "Audio + Video", "Audio Only"]
    )

    enable_face_detection = st.sidebar.checkbox("Enable Face Detection", value=True)
    # Propagate the toggle so the frame callback can skip detection entirely.
    st.session_state.video_processor.detection_enabled = enable_face_detection

    if enable_face_detection:
        detection_confidence = st.sidebar.slider(
            "Face Detection Confidence",
            min_value=0.0,
            max_value=1.0,
            value=0.5,
            step=0.1
        )
        # Rebuild the detector so the new threshold takes effect.
        st.session_state.video_processor.face_detection = (
            st.session_state.video_processor.mp_face_detection.FaceDetection(
                min_detection_confidence=detection_confidence
            )
        )

    display_chat_messages()

    if input_mode == "Text Only":
        user_input = st.chat_input("Your message")
        if user_input:
            st.session_state.messages.append({"role": "user", "content": user_input})
            with st.chat_message("user"):
                st.markdown(user_input)

            # Each message opens a fresh live session; conversation history
            # lives in st.session_state.messages, not in the session itself.
            async def send_message():
                async with client.aio.live.connect(model=MODEL, config=CONFIG) as session:
                    await session.send(user_input, end_of_turn=True)
                    turn = session.receive()
                    async for response in turn:
                        if text := response.text:
                            st.session_state.messages.append(
                                {"role": "assistant", "content": text}
                            )
                            with st.chat_message("assistant"):
                                st.markdown(text)

            asyncio.run(send_message())

    else:
        if input_mode == "Audio + Video":
            ctx = webrtc_streamer(
                key="gemini-stream",
                video_frame_callback=st.session_state.video_processor.video_frame_callback,
                rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]},
                media_stream_constraints={"video": True, "audio": True},
            )

        col1, col2 = st.columns(2)
        with col1:
            if st.button("Start Recording", type="primary"):
                st.session_state.audio_processor.start_stream()
                st.session_state['recording'] = True

        with col2:
            if st.button("Stop Recording", type="secondary"):
                st.session_state.audio_processor.stop_stream()
                st.session_state['recording'] = False

        # Reads mic chunks into the queue while recording. Note: this
        # coroutine is only defined here; it still has to be scheduled on a
        # running event loop to do anything.
        async def process_audio_stream():
            while st.session_state.get('recording', False):
                if st.session_state.audio_processor.stream:
                    # exception_on_overflow=False avoids a crash if the
                    # input buffer overruns between reads.
                    data = st.session_state.audio_processor.stream.read(
                        CHUNK_SIZE, exception_on_overflow=False)
                    await st.session_state.audio_processor.audio_queue.put({
                        "data": data,
                        "mime_type": "audio/pcm"
                    })
                await asyncio.sleep(0.1)
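
        # Minimal sketches, assumed rather than taken from the original:
        # companions to process_audio_stream that would forward queued mic
        # chunks to a live session and play back its 24 kHz PCM replies.
        # `session` is assumed to come from client.aio.live.connect() as in
        # send_message(); response.data is assumed to carry raw audio bytes.
        async def send_audio(session):
            while st.session_state.get('recording', False):
                chunk = await st.session_state.audio_processor.audio_queue.get()
                await session.send(chunk)

        async def play_audio(session):
            player = pyaudio.PyAudio().open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RECEIVE_SAMPLE_RATE,
                output=True,
            )
            async for response in session.receive():
                if response.data:
                    player.write(response.data)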


if __name__ == "__main__":
    main()