"""Streamlit WebRTC live-video demo.

Captures webcam frames via streamlit-webrtc, buffers them in a tiny
circular buffer, and (in the UI code below) round-trips each frame
through a base64-encoded file keyed by a per-session room id.
"""

import base64
import threading
import time
import uuid
from io import BytesIO

import av
import numpy as np
import streamlit as st
from PIL import Image
from streamlit_webrtc import VideoTransformerBase, WebRtcMode, webrtc_streamer


def get_new_uuid() -> str:
    """Return a random UUID4 as a string (used as the session's room id)."""
    return str(uuid.uuid4())


def numpy_image_to_base64(img_array: np.ndarray, format: str = "PNG") -> str:
    """Encode a NumPy image array as a base64 string in the given format.

    The channel order is passed through unchanged: a BGR array decodes
    back to the same BGR array via base64_to_image().
    """
    pil_img = Image.fromarray(img_array)
    buff = BytesIO()
    pil_img.save(buff, format=format)
    # Encode the serialized image bytes as a UTF-8 base64 string.
    return base64.b64encode(buff.getvalue()).decode("utf-8")


def base64_to_image(base64_string: str) -> Image.Image:
    """Decode a base64 string (from numpy_image_to_base64) into a PIL Image."""
    image_data = base64.b64decode(base64_string)
    return Image.open(BytesIO(image_data))


def get_ice_servers():
    """Return the RTC ICE-server configuration (Google's public STUN server)."""
    return [{"urls": ["stun:stun.l.google.com:19302"]}]


# Initialize session-state keys on first run.
if "current_image" not in st.session_state:
    st.session_state["current_image"] = None
if "on" not in st.session_state:
    st.session_state["on"] = False
if "new_id" not in st.session_state:
    st.session_state["new_id"] = get_new_uuid()


class VideoTransformer(VideoTransformerBase):
    """Receives incoming video frames and keeps the latest ones in a
    small thread-safe circular buffer.

    NOTE(review): VideoTransformerBase / video_transformer_factory are
    deprecated in streamlit-webrtc in favour of VideoProcessorBase;
    kept here because the UI code accesses ``ctx.video_transformer``.
    """

    def __init__(self, params):
        self.frame_lock = threading.Lock()
        # `params` is the Streamlit session_state mapping; it is written
        # from the webrtc worker thread in transform() — presumably
        # acceptable for a demo, but verify against Streamlit's
        # threading guidance.
        self.params = params
        self.buffer_size = 2
        self.frame_buffer = [None] * self.buffer_size
        self.cv = 0  # circular-buffer write index

    def transform(self, frame: av.VideoFrame) -> np.ndarray:
        """Stash the frame in the buffer and return it unmodified."""
        # Convert the frame to a NumPy array in BGR channel order.
        img = frame.to_ndarray(format="bgr24")
        with self.frame_lock:
            self.frame_buffer[self.cv] = img
            self.cv = (self.cv + 1) % self.buffer_size
            self.params["current_image"] = img
        return img

    def get_latest_frame(self) -> np.ndarray:
        """Return the most recently stored frame (None before any arrive)."""
        with self.frame_lock:
            # The slot just behind the write index holds the newest frame.
            return self.frame_buffer[(self.cv - 1) % self.buffer_size]
# Layout with two columns: capture on the left, display on the right.
col1, col2 = st.columns(2)

with col1:
    # Show this session's room id so it can be shared/pasted elsewhere.
    st.code(st.session_state["new_id"])

    rtc_configuration = {
        "iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]
    }
    # Start the webrtc streamer with audio disabled. The previously
    # unused `rtc_configuration` dict is now passed instead of a
    # duplicated inline literal.
    ctx = webrtc_streamer(
        key="example",
        video_transformer_factory=lambda: VideoTransformer(st.session_state),
        media_stream_constraints={"video": True, "audio": False},
        async_transform=True,
        rtc_configuration=rtc_configuration,
    )

with col2:
    text_value = st.text_area(label="Add room id", placeholder="please paste id")

    # Button to toggle frame display on/off.
    toggle = st.button("Toggle Frame Display")
    if toggle:
        st.session_state["on"] = not st.session_state["on"]

    image_placeholder = st.empty()

    # Display loop: show frames from the buffer at ~30 fps when toggled on.
    if ctx.video_transformer and st.session_state["on"]:
        while True:
            frame = ctx.video_transformer.get_latest_frame()
            if frame is not None:
                value = numpy_image_to_base64(frame)
                # Persist the encoded frame once (the original wrote the
                # identical content to the same file twice in a row).
                with open(st.session_state["new_id"] + ".txt", "w") as write:
                    write.write(value)
                # Round-trip through the room file to mimic the exchange.
                with open(st.session_state["new_id"] + ".txt") as read:
                    temp = read.read()
                img = np.array(base64_to_image(temp))
                # Frames are stored BGR; reverse the channel axis to get
                # RGB for correct color display.
                img = img[:, :, ::-1]
                image_placeholder.image(img, channels="RGB")
            else:
                image_placeholder.text("Waiting for frames...")
            time.sleep(1 / 30)  # ~30 fps
    else:
        image_placeholder.text("Click 'Toggle Frame Display' to start showing frames.")