Spaces:
Sleeping
Sleeping
import streamlit as st | |
from streamlit_webrtc import webrtc_streamer, VideoTransformerBase, WebRtcMode | |
import av | |
import threading | |
import numpy as np | |
from PIL import Image | |
import time | |
import base64 | |
from io import BytesIO | |
import uuid | |
def get_new_uuid(): | |
# Generate a random UUID (UUID4) and convert it to a string | |
return str(uuid.uuid4()) | |
# Usage | |
def numpy_image_to_base64(img_array, format="PNG"): | |
# Convert NumPy array to PIL Image | |
pil_img = Image.fromarray(img_array) | |
# Save PIL Image to a bytes buffer | |
buff = BytesIO() | |
pil_img.save(buff, format=format) | |
# Encode bytes to base64 string | |
img_str = base64.b64encode(buff.getvalue()).decode("utf-8") | |
return img_str | |
def base64_to_image(base64_string): | |
# Decode the Base64 string to bytes | |
image_data = base64.b64decode(base64_string) | |
# Wrap bytes in a BytesIO buffer | |
image_buffer = BytesIO(image_data) | |
# Open image with PIL | |
img = Image.open(image_buffer) | |
return img | |
def get_ice_servers(): | |
# Using Google's public STUN server | |
return [{"urls": ["stun:stun.l.google.com:19302"]}] | |
# Initialize session state keys if not present | |
if 'current_image' not in st.session_state: | |
st.session_state['current_image'] = None | |
if 'on' not in st.session_state: | |
st.session_state['on'] = False | |
if "new_id" not in st.session_state: | |
st.session_state["new_id"] = get_new_uuid() | |
class VideoTransformer(VideoTransformerBase): | |
def __init__(self, params): | |
self.frame_lock = threading.Lock() | |
self.params = params | |
self.buffer_size = 2 | |
self.frame_buffer = [None] * self.buffer_size | |
self.cv = 0 # Circular buffer index | |
def transform(self, frame: av.VideoFrame) -> np.ndarray: | |
# Convert frame to numpy array in BGR format | |
img = frame.to_ndarray(format="bgr24") | |
# Store frame in circular buffer thread-safely | |
with self.frame_lock: | |
self.frame_buffer[self.cv] = img | |
self.cv = (self.cv + 1) % self.buffer_size | |
self.params['current_image'] = img | |
return img | |
def get_latest_frame(self) -> np.ndarray: | |
with self.frame_lock: | |
idx = (self.cv - 1) % self.buffer_size | |
return self.frame_buffer[idx] | |
# Layout with two columns | |
col1, col2 = st.columns(2) | |
with col1: | |
# Start the webrtc streamer with audio disabled | |
st.code(st.session_state['new_id']) | |
rtc_configuration = { | |
"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}] | |
} | |
ctx = webrtc_streamer( | |
key="example", | |
video_transformer_factory=lambda: VideoTransformer(st.session_state), | |
media_stream_constraints={"video": True, "audio": False}, | |
async_transform=True, | |
rtc_configuration={ | |
"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}] | |
} | |
) | |
with col2: | |
# st.title("Live Video Frame Display") | |
text_value = st.text_area(label="Add room id",placeholder="please paste id") | |
# Button to toggle frame display on/off | |
toggle = st.button("Toggle Frame Display") | |
if toggle: | |
st.session_state['on'] = not st.session_state['on'] | |
image_placeholder = st.empty() | |
# Display loop: show frames from buffer at ~30 fps when toggled on | |
if ctx.video_transformer and st.session_state['on']: | |
while True: | |
frame = ctx.video_transformer.get_latest_frame() | |
if frame is not None: | |
# Convert BGR to RGB for correct color display | |
value = numpy_image_to_base64(frame) | |
with open(st.session_state['new_id']+".txt","w") as write: | |
write.write(value) | |
# frame_rgb = frame[:, :, ::-1] | |
with open(st.session_state['new_id']+".txt","w") as write: | |
write.write(value) | |
with open(st.session_state['new_id']+".txt") as read: | |
temp = read.read() | |
img = base64_to_image( temp) | |
img = np.array(img) | |
img=img[:, :, ::-1] | |
image_placeholder.image(img, channels="RGB") | |
else: | |
image_placeholder.text("Waiting for frames...") | |
time.sleep(1/30) # ~30 fps | |
else: | |
image_placeholder.text("Click 'Toggle Frame Display' to start showing frames.") | |