# NOTE(review): the three lines below were hosting-platform log artifacts
# ("Spaces:" / "Build error") pasted into the source; commented out so the
# file parses. They are not part of the program.
# Spaces:
# Build error
# Build error
# Standard library
import base64
import json
import os
import random
import time
import zipfile
from threading import Lock, Thread
from urllib.parse import quote

# Third-party
import cv2
import openai
import pyaudio
import requests
import speech_recognition as sr
import streamlit as st
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema.messages import SystemMessage
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
from PIL import Image

# Load API credentials (OPENAI_API_KEY, GOOGLE_API_KEY, ...) from a .env file.
load_dotenv()
class WebcamStream:
    """Continuously capture frames from the default webcam on a background
    thread so that ``read()`` always returns the latest frame without blocking.

    Can be used as a context manager::

        with WebcamStream().start() as cam:
            frame = cam.read()
    """

    def __init__(self):
        self.stream = cv2.VideoCapture(0)
        # Prime self.frame so read() has something to copy before the
        # background thread produces its first frame.
        _, self.frame = self.stream.read()
        self.running = False
        self.thread = None  # created lazily by start()
        self.lock = Lock()  # guards self.frame across threads

    def start(self):
        """Start the background capture thread. Idempotent; returns self."""
        if self.running:
            return self
        self.running = True
        # daemon=True so a forgotten stop() cannot keep the process alive.
        self.thread = Thread(target=self.update, daemon=True)
        self.thread.start()
        return self

    def update(self):
        """Capture loop executed by the background thread."""
        while self.running:
            ok, frame = self.stream.read()
            if not ok:
                # BUG FIX: the original ignored the success flag and could
                # overwrite the last good frame with None on a failed read.
                continue
            with self.lock:
                self.frame = frame

    def read(self, encode=False):
        """Return a copy of the most recent frame.

        When ``encode`` is True, return the frame JPEG-compressed and
        base64-encoded (bytes) instead of the raw BGR ndarray.
        """
        with self.lock:
            frame = self.frame.copy()
        if encode:
            _, buffer = cv2.imencode(".jpeg", frame)
            return base64.b64encode(buffer)
        return frame

    def stop(self):
        """Stop the capture thread and release the camera device."""
        self.running = False
        if self.thread is not None and self.thread.is_alive():
            self.thread.join()
        # BUG FIX: the camera was previously released only in __exit__, which
        # was never invoked (no __enter__ existed), leaking the device handle.
        self.stream.release()

    def __enter__(self):
        # BUG FIX: __exit__ existed without __enter__, so the class could not
        # actually be used as a context manager.
        return self.start()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.stop()
class Assistant:
    """Multimodal chat assistant.

    Answers a text prompt about a webcam image via a LangChain pipeline with
    in-memory chat history, then speaks the answer aloud with OpenAI TTS.
    """

    def __init__(self, model):
        # model: any LangChain chat model accepting multimodal messages
        # (e.g. ChatOpenAI or ChatGoogleGenerativeAI).
        self.chain = self._create_inference_chain(model)

    def answer(self, prompt, image):
        """Run one inference turn and speak the result.

        prompt: the user's question; a falsy prompt is a no-op.
        image: base64-encoded JPEG bytes, e.g. WebcamStream.read(encode=True).
        """
        if not prompt:
            return
        st.write("Prompt:", prompt)
        response = self.chain.invoke(
            {"prompt": prompt, "image_base64": image.decode()},
            # Single shared history; the session id is required but unused.
            config={"configurable": {"session_id": "unused"}},
        ).strip()
        st.write("Response:", response)
        if response:
            self._tts(response)

    def _tts(self, response):
        """Stream `response` through OpenAI TTS to the default audio output."""
        audio = pyaudio.PyAudio()
        player = audio.open(
            format=pyaudio.paInt16, channels=1, rate=24000, output=True
        )
        try:
            with openai.audio.speech.with_streaming_response.create(
                model="tts-1",
                voice="alloy",
                response_format="pcm",
                input=response,
            ) as stream:
                for chunk in stream.iter_bytes(chunk_size=1024):
                    player.write(chunk)
        finally:
            # BUG FIX: the PyAudio stream and instance were leaked on every
            # call; close and terminate even if streaming raises.
            player.stop_stream()
            player.close()
            audio.terminate()

    def _create_inference_chain(self, model):
        """Build the prompt | model | parser pipeline wrapped with history."""
        SYSTEM_PROMPT = """
        You are a witty assistant that will use the chat history and the image
        provided by the user to answer its questions.
        Use few words on your answers. Go straight to the point. Do not use any
        emoticons or emojis. Do not ask the user any questions.
        Be friendly and helpful. Show some personality. Do not be too formal.
        """
        prompt_template = ChatPromptTemplate.from_messages([
            SystemMessage(content=SYSTEM_PROMPT),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", [
                {"type": "text", "text": "{prompt}"},
                {"type": "image_url", "image_url": "data:image/jpeg;base64,{image_base64}"},
            ]),
        ])
        chain = prompt_template | model | StrOutputParser()
        chat_message_history = ChatMessageHistory()
        return RunnableWithMessageHistory(
            chain,
            # All sessions share one in-memory history (session id ignored).
            lambda _: chat_message_history,
            input_messages_key="prompt",
            history_messages_key="chat_history",
        )
def main():
    """Streamlit entry point: wires the webcam, model, and UI widgets."""
    # NOTE(review): the title/emoji look mis-encoded ("π") and the title does
    # not match what the app does — confirm the intended text with the author.
    st.title("ππ Scholarly Article Document Search with Memory")

    # BUG FIX: Streamlit reruns this whole script on every interaction, so
    # the original code opened a brand-new webcam and model per rerun and
    # leaked the previous camera handle and capture thread. Cache the
    # long-lived resources in session_state instead.
    if "webcam_stream" not in st.session_state:
        st.session_state.webcam_stream = WebcamStream().start()
        st.session_state.assistant = Assistant(
            ChatGoogleGenerativeAI(model="gemini-1.5-flash-latest")
        )
    webcam_stream = st.session_state.webcam_stream
    assistant = st.session_state.assistant

    # Sidebar
    st.sidebar.title("Options")
    should_save = st.sidebar.checkbox(
        "πΎ Save", value=True, help="Save your session data."
    )

    # Main content: text query answered against the current webcam frame.
    query = st.text_input("Enter your search query:")
    if st.button("Search"):
        image = webcam_stream.read(encode=True)
        assistant.answer(query, image)

    # File upload (contents are only displayed; not yet fed to the model).
    uploaded_file = st.file_uploader(
        "Upload a file for context", type=["txt", "pdf", "docx"]
    )
    if uploaded_file:
        file_contents = uploaded_file.read()
        st.write("File contents:", file_contents)

    # Display the live webcam feed (OpenCV frames are BGR).
    st.image(webcam_stream.read(), channels="BGR")

    # Audio recording: transcribe speech and answer it with the current frame.
    if st.button("Record Audio"):
        r = sr.Recognizer()
        with sr.Microphone() as source:
            st.write("Speak now...")
            audio = r.listen(source)
        st.write("Processing audio...")
        try:
            text = r.recognize_google(audio)
            st.write("You said:", text)
            assistant.answer(text, webcam_stream.read(encode=True))
        except sr.UnknownValueError:
            st.write("Could not understand audio")
        except sr.RequestError as e:
            st.write("Could not request results; {0}".format(e))
    # The webcam is intentionally kept open for the session (cached above);
    # the original stop() per rerun defeated the live feed and leaked handles.


if __name__ == "__main__":
    main()