# Spaces: Sleeping (page-status residue captured with this file; not part of the program)
import base64
import time
from threading import Lock, Thread

import cv2
import openai
import sounddevice as sd
import streamlit as st
from cv2 import VideoCapture, imencode
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema.messages import SystemMessage
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI
from scipy.io.wavfile import write
from speech_recognition import Recognizer, UnknownValueError, AudioData
# Load settings from a local .env file into the environment before any client
# is built (presumably the API credentials the chat model needs -- confirm).
load_dotenv()
class WebcamStream:
    """Capture frames from camera index 0 on a background thread.

    The most recent successfully captured frame is stored in ``self.frame``
    and protected by ``self.lock`` so that ``read`` can be called while
    ``update`` is running on the capture thread.

    Can be used as a context manager; leaving the ``with`` block stops the
    capture thread and releases the underlying ``VideoCapture``.
    """

    # Pause between retries when the camera fails to deliver a frame, so a
    # disconnected device does not turn the capture loop into a busy spin.
    _RETRY_DELAY_SECONDS = 0.01

    def __init__(self):
        self.stream = VideoCapture(index=0)
        # The initial grab may fail, in which case ``frame`` stays ``None``
        # until the capture thread obtains a valid frame.
        _, self.frame = self.stream.read()
        self.running = False
        self.lock = Lock()
        # Defined up front so ``stop`` is safe even if ``start`` never ran.
        self.thread = None

    def __enter__(self):
        """Return the stream itself; call ``start`` to begin capturing."""
        return self

    def start(self):
        """Start the capture thread (no-op if already running).

        Returns:
            This instance, so the call can be chained after construction.
        """
        if self.running:
            return self
        self.running = True
        self.thread = Thread(target=self.update, args=())
        self.thread.start()
        return self

    def update(self):
        """Capture loop: keep replacing ``self.frame`` until stopped.

        Failed reads are skipped so the last good frame is never overwritten
        with ``None``.
        """
        while self.running:
            ok, frame = self.stream.read()
            if not ok or frame is None:
                time.sleep(self._RETRY_DELAY_SECONDS)
                continue
            with self.lock:
                self.frame = frame

    def read(self, encode=False):
        """Return a copy of the latest frame.

        Args:
            encode: When true, return the frame JPEG-encoded and then
                base64-encoded (as ``bytes``) instead of the raw frame.

        Returns:
            A copy of the latest frame, or base64 ``bytes`` of its JPEG
            encoding when ``encode`` is true.

        Raises:
            RuntimeError: If no frame has been captured yet, or the JPEG
                encoding fails.
        """
        with self.lock:
            if self.frame is None:
                raise RuntimeError("No frame has been captured from the webcam yet.")
            # Copy under the lock so the caller never shares the live buffer.
            frame = self.frame.copy()
        if not encode:
            return frame
        ok, buffer = imencode(".jpeg", frame)
        if not ok:
            raise RuntimeError("Failed to encode the webcam frame as JPEG.")
        return base64.b64encode(buffer)

    def stop(self):
        """Signal the capture loop to finish and wait for the thread to exit."""
        self.running = False
        thread = self.thread
        if thread is not None and thread.is_alive():
            thread.join()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Stop the reader first: releasing the capture while the thread is
        # still calling ``stream.read()`` would race with it.
        self.stop()
        self.stream.release()
class Assistant:
    """Answer questions about an image using a chat model with history.

    The inference chain is built once from the supplied model and keeps a
    single in-memory chat history for the lifetime of the instance.
    """

    def __init__(self, model):
        """Build the inference chain around ``model`` (a chat model runnable)."""
        self.chain = self._create_inference_chain(model)

    def answer(self, prompt, image):
        """Send ``prompt`` plus ``image`` to the model and speak the reply.

        Args:
            prompt: The user's question. Falsy values are ignored.
            image: Base64-encoded JPEG, either as ``bytes`` or as ``str``.

        Returns:
            The stripped model response, or ``None`` when ``prompt`` is
            empty. Returning the text lets a UI caller display it; callers
            that ignore the return value are unaffected.
        """
        if not prompt:
            return None

        print("Prompt:", prompt)

        # Accept both the raw output of ``base64.b64encode`` and a str.
        if isinstance(image, (bytes, bytearray)):
            image_base64 = image.decode()
        else:
            image_base64 = image

        response = self.chain.invoke(
            {"prompt": prompt, "image_base64": image_base64},
            # A session id is required by the history wrapper, but the
            # history factory below ignores it.
            config={"configurable": {"session_id": "unused"}},
        ).strip()

        print("Response:", response)

        if response:
            self._tts(response)
        return response

    def _tts(self, response):
        """Placeholder text-to-speech: only prints the response."""
        # Simulate TTS: normally you'd use a library or API here
        print(f"TTS: {response}")

    def _create_inference_chain(self, model):
        """Create the prompt -> model -> string chain wrapped with chat history.

        Args:
            model: The chat model piped after the prompt template.

        Returns:
            A ``RunnableWithMessageHistory`` whose input key is ``prompt``
            and whose history placeholder is ``chat_history``.
        """
        system_prompt = (
            "\n"
            "You are a witty assistant that will use the chat history and the image\n"
            "provided by the user to answer its questions. Your job is to answer\n"
            "questions.\n"
            "Use few words on your answers. Go straight to the point. Do not use any\n"
            "emoticons or emojis.\n"
            "Be friendly and helpful. Show some personality.\n"
        )

        prompt_template = ChatPromptTemplate.from_messages(
            [
                SystemMessage(content=system_prompt),
                MessagesPlaceholder(variable_name="chat_history"),
                (
                    "human",
                    [
                        {"type": "text", "text": "{prompt}"},
                        {
                            "type": "image_url",
                            "image_url": "data:image/jpeg;base64,{image_base64}",
                        },
                    ],
                ),
            ]
        )

        chain = prompt_template | model | StrOutputParser()

        # One shared history object: every session id maps to the same
        # conversation.
        chat_message_history = ChatMessageHistory()
        return RunnableWithMessageHistory(
            chain,
            lambda _: chat_message_history,
            input_messages_key="prompt",
            history_messages_key="chat_history",
        )
# Delay between webcam preview refreshes (roughly 30 updates per second).
_PREVIEW_INTERVAL_SECONDS = 0.03


@st.cache_resource
def _get_webcam_stream():
    """Open and start the webcam once per process instead of on every rerun."""
    return WebcamStream().start()


@st.cache_resource
def _get_assistant():
    """Build the assistant once per process so chat history survives reruns."""
    # To try another chat model (e.g. Gemini via ChatGoogleGenerativeAI with
    # model="gemini-1.5-flash-latest"), construct it here; that class is not
    # imported by this file.
    model = ChatOpenAI(model="gpt-4o")
    return Assistant(model)


def main():
    """Render the Streamlit page: live webcam preview plus a question box.

    All Streamlit calls are made from the script itself rather than from a
    helper thread. The widgets are laid out first and the preview loop runs
    last, so the page stays interactive while frames keep updating; any
    widget interaction makes Streamlit rerun the script, which ends the loop.
    """
    st.title("AI Assistant with Webcam Stream")

    webcam_stream = _get_webcam_stream()
    assistant = _get_assistant()

    # UI for webcam feed: reserve the slot now, fill it at the end.
    st.subheader("Webcam Feed")
    frame_slot = st.empty()

    st.subheader("Ask the Assistant")
    prompt = st.text_input("Enter your question:")

    if st.button("Submit"):
        if prompt:
            response = assistant.answer(prompt, webcam_stream.read(encode=True))
            # Show the reply in the page whenever answer() hands one back.
            if response:
                st.write(response)
        else:
            st.warning("Please enter a prompt to submit.")

    if st.button("Stop Webcam"):
        # No OpenCV windows are ever created, so there is nothing else to
        # tear down here.
        webcam_stream.stop()

    # Frames come from OpenCV, hence the BGR channel order.
    frame_slot.image(webcam_stream.read(), channels="BGR", use_column_width=True)
    while webcam_stream.running:
        time.sleep(_PREVIEW_INTERVAL_SECONDS)
        frame_slot.image(
            webcam_stream.read(), channels="BGR", use_column_width=True
        )
# Run the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()