|
import gradio as gr |
|
import asyncio |
|
import base64 |
|
import io |
|
import cv2 |
|
import sounddevice as sd |
|
import numpy as np |
|
import PIL.Image |
|
import mss |
|
from google import genai |
|
from google.genai import types |
|
import soundfile as sf |
|
|
|
|
|
SAMPLE_RATE = 24000  # Hz; used for mic capture, playback, and decoding the model's audio stream

CHUNK_SIZE = 1024  # NOTE(review): not referenced anywhere in this file — candidate for removal

MODEL = "models/gemini-2.0-flash-exp"  # Gemini model id passed to the Live API connect call
|
|
|
class GeminiTTS:
    """Wraps the Gemini Live API: sends one text/camera/screen turn, returns speech.

    Audio is exchanged as 16-bit PCM at SAMPLE_RATE; images are sent as
    base64-encoded JPEG payloads.
    """

    def __init__(self, api_key):
        """Create the Gemini client and the live-connect configuration.

        Args:
            api_key: Google Gemini API key used to authenticate the client.
        """
        self.client = genai.Client(http_options={"api_version": "v1alpha"}, api_key=api_key)
        # Queues/stream slots kept for parity with the streaming examples;
        # NOTE(review): not used by the visible code paths — confirm before removing.
        self.audio_in_queue = asyncio.Queue()
        self.out_queue = asyncio.Queue(maxsize=5)
        self.session = None
        self.audio_stream = None

        self.config = types.LiveConnectConfig(
            response_modalities=["audio"],
            speech_config=types.SpeechConfig(
                voice_config=types.VoiceConfig(
                    prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Puck")
                )
            ),
            system_instruction=types.Content(
                parts=[types.Part.from_text(text="Answer user ask, replay same thing user say no other word explain")],
                role="user"
            ),
        )

    @staticmethod
    def _encode_jpeg(img):
        """Encode a PIL image into the {mime_type, data} dict the Live API expects."""
        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")
        return {
            "mime_type": "image/jpeg",
            "data": base64.b64encode(image_io.getvalue()).decode(),
        }

    async def _get_frame(self, cap):
        """Grab one frame from an open cv2 capture as a base64-JPEG payload.

        Args:
            cap: an opened cv2.VideoCapture.

        Returns:
            The payload dict, or None when the capture yields no frame.
        """
        ret, frame = cap.read()
        if not ret:
            return None
        # OpenCV delivers BGR; PIL expects RGB.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = PIL.Image.fromarray(frame_rgb)
        img.thumbnail([1024, 1024])  # cap resolution to keep the payload small
        return self._encode_jpeg(img)

    async def _get_screen(self):
        """Capture the full virtual screen as a base64-JPEG payload.

        monitors[0] is mss's bounding box covering every attached monitor.
        """
        # Context-manage mss so the platform capture handle is released
        # (the original leaked it on every call).
        with mss.mss() as sct:
            shot = sct.grab(sct.monitors[0])
            png_bytes = mss.tools.to_png(shot.rgb, shot.size)
        img = PIL.Image.open(io.BytesIO(png_bytes))
        return self._encode_jpeg(img)

    async def record_audio(self, duration=5):
        """Record mono float32 audio from the default input device.

        Args:
            duration: seconds to record.

        Returns:
            numpy array of shape (duration * SAMPLE_RATE, 1), dtype float32.
        """
        print(f"Recording for {duration} seconds...")
        recording = sd.rec(int(duration * SAMPLE_RATE),
                           samplerate=SAMPLE_RATE,
                           channels=1,
                           dtype='float32')
        sd.wait()  # block until capture completes
        return recording

    async def play_audio(self, audio_data):
        """Play a numpy audio buffer on the default output device (blocking)."""
        sd.play(audio_data, samplerate=SAMPLE_RATE)
        sd.wait()

    async def process_input(self, text=None, mode="text"):
        """Send one turn to Gemini and collect the full reply.

        Args:
            text: the user's text (used when mode == "text").
            mode: "text", "camera", or "screen".

        Returns:
            (SAMPLE_RATE, np.ndarray[int16]) when the model replies with audio,
            the reply text when it replies with text, or an error string.
        """
        try:
            async with self.client.aio.live.connect(model=MODEL, config=self.config) as session:
                self.session = session

                if mode == "text" and text:
                    await session.send(input=text or ".", end_of_turn=True)
                elif mode == "camera":
                    cap = cv2.VideoCapture(0)
                    try:
                        frame = await self._get_frame(cap)
                    finally:
                        # BUG FIX: always release the camera, even if grabbing
                        # or encoding the frame raises.
                        cap.release()
                    if frame:
                        await session.send(input=frame)
                elif mode == "screen":
                    frame = await self._get_screen()
                    if frame:
                        await session.send(input=frame)

                # BUG FIX: the original returned after the FIRST streamed chunk
                # (truncating the reply) and passed raw PCM bytes to
                # soundfile.write, which expects an array. Accumulate every
                # chunk, then decode the 16-bit PCM stream for gr.Audio.
                pcm = bytearray()
                reply_text = ""
                turn = session.receive()
                async for response in turn:
                    if data := response.data:
                        pcm.extend(data)
                    if t := response.text:
                        reply_text += t

                if pcm:
                    audio = np.frombuffer(bytes(pcm), dtype=np.int16)
                    return (SAMPLE_RATE, audio)
                if reply_text:
                    return reply_text
                return "No response received"
        except Exception as e:
            # Surface the failure to the UI as a string instead of crashing
            # the Gradio handler.
            return f"Error: {str(e)}"
|
|
|
def create_gradio_interface():
    """Build and return the Gradio Blocks UI wired to a lazily-created GeminiTTS."""
    tts_handler = None

    def init_tts(api_key):
        # Created on demand so the user can supply the API key from the UI.
        nonlocal tts_handler
        tts_handler = GeminiTTS(api_key)
        return "Gemini TTS Initialized!"

    async def generate_response(text, mode):
        """Forward one request to the handler; raise a UI error if uninitialized.

        The handler returns either (sample_rate, ndarray) for audio or a plain
        string; gr.Audio handles the tuple, strings surface as-is.
        """
        if not tts_handler:
            raise gr.Error("Please initialize the TTS system first with your API key")
        # BUG FIX: the original if/else returned `result` in both branches;
        # just return it directly.
        return await tts_handler.process_input(text, mode)

    async def record_and_process():
        """Record 5 s from the mic, play it back, and display it in the UI."""
        if not tts_handler:
            raise gr.Error("Please initialize the TTS system first with your API key")

        recording = await tts_handler.record_audio(duration=5)
        await tts_handler.play_audio(recording)
        # BUG FIX: gr.Audio expects (rate, ndarray); the original passed
        # recording.tobytes(), raw bytes the component cannot interpret.
        return (SAMPLE_RATE, recording.flatten())

    with gr.Blocks(title="Gemini TTS Interface") as demo:
        # NOTE(review): "π€" below looks like a mojibake'd emoji (likely a
        # microphone) — left byte-identical; confirm intended glyph upstream.
        gr.Markdown("# π€ Gemini Text-to-Speech Interface with SoundDevice")

        with gr.Row():
            api_key = gr.Textbox(label="Gemini API Key", type="password")
            init_btn = gr.Button("Initialize TTS")

        init_output = gr.Textbox(label="Initialization Status", interactive=False)
        init_btn.click(init_tts, inputs=api_key, outputs=init_output)

        with gr.Tab("Text Input"):
            with gr.Row():
                text_input = gr.Textbox(label="Enter Text", lines=3)
                text_btn = gr.Button("Generate Speech")

            text_output = gr.Audio(label="Generated Speech")
            # Hidden Text components carry the fixed `mode` argument per tab.
            text_btn.click(generate_response, inputs=[text_input, gr.Text("text", visible=False)], outputs=text_output)

        with gr.Tab("Voice Input"):
            record_btn = gr.Button("Record and Process (5 sec)")
            voice_output = gr.Audio(label="Processed Audio")
            record_btn.click(record_and_process, outputs=voice_output)

        with gr.Tab("Camera Input"):
            camera_btn = gr.Button("Capture and Process")
            camera_output = gr.Audio(label="Generated Speech from Camera")
            camera_btn.click(generate_response, inputs=[gr.Text("", visible=False), gr.Text("camera", visible=False)], outputs=camera_output)

        with gr.Tab("Screen Capture"):
            screen_btn = gr.Button("Capture Screen and Process")
            screen_output = gr.Audio(label="Generated Speech from Screen")
            screen_btn.click(generate_response, inputs=[gr.Text("", visible=False), gr.Text("screen", visible=False)], outputs=screen_output)

    return demo
|
|
|
if __name__ == "__main__": |
|
demo = create_gradio_interface() |
|
demo.launch() |