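"""Streamlit webcam demo built on streamlit-webrtc.

Captures video frames from the browser, keeps the most recent frames in a small
thread-safe circular buffer, base64-encodes the latest frame and writes it to a
file named after a per-session room id, then reads it back and displays it at
roughly 30 fps.
"""
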
import streamlit as st
from streamlit_webrtc import webrtc_streamer, VideoTransformerBase
import av
import threading
import numpy as np
from PIL import Image
import time
import base64
from io import BytesIO
import uuid

def get_new_uuid():
    # Generate a random UUID (UUID4) and convert it to a string
    return str(uuid.uuid4())

# Example usage: st.session_state["new_id"] = get_new_uuid()

def numpy_image_to_base64(img_array, format="PNG"):
    # Convert NumPy array to PIL Image
    pil_img = Image.fromarray(img_array)
    # Save PIL Image to a bytes buffer
    buff = BytesIO()
    pil_img.save(buff, format=format)
    # Encode bytes to base64 string
    img_str = base64.b64encode(buff.getvalue()).decode("utf-8")
    return img_str

def base64_to_image(base64_string):
    # Decode the Base64 string to bytes
    image_data = base64.b64decode(base64_string)
    # Wrap bytes in a BytesIO buffer
    image_buffer = BytesIO(image_data)
    # Open image with PIL
    img = Image.open(image_buffer)
    return img
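
# Illustrative round trip with a hypothetical frame (PNG encoding is lossless,
# so the same pixel values come back):
#   frame = np.zeros((480, 640, 3), dtype=np.uint8)
#   restored = np.array(base64_to_image(numpy_image_to_base64(frame)))
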
def get_ice_servers():
    # Using Google's public STUN server
    return [{"urls": ["stun:stun.l.google.com:19302"]}]
    
# Initialize session state keys if not present
if 'current_image' not in st.session_state:
    st.session_state['current_image'] = None
if 'on' not in st.session_state:
    st.session_state['on'] = False
if "new_id" not in st.session_state:
    st.session_state["new_id"] = get_new_uuid()

class VideoTransformer(VideoTransformerBase):
    def __init__(self, params):
        self.frame_lock = threading.Lock()
        self.params = params
        self.buffer_size = 2
        self.frame_buffer = [None] * self.buffer_size
        self.cv = 0  # Circular buffer index

    def transform(self, frame: av.VideoFrame) -> np.ndarray:
        # Convert frame to numpy array in BGR format
        img = frame.to_ndarray(format="bgr24")

        # Store frame in circular buffer thread-safely
        with self.frame_lock:
            self.frame_buffer[self.cv] = img
            self.cv = (self.cv + 1) % self.buffer_size
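            # Mirror the latest frame into the shared params dict (st.session_state)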
            self.params['current_image'] = img

        return img

    def get_latest_frame(self) -> np.ndarray:
        with self.frame_lock:
            idx = (self.cv - 1) % self.buffer_size
            return self.frame_buffer[idx]
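
# Note: streamlit-webrtc invokes transform() on its own worker thread, while
# get_latest_frame() is polled from the Streamlit script below, so both sides
# synchronize on frame_lock.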

# Layout with two columns

col1, col2 = st.columns(2)

with col1:
    # Show this session's room id so it can be copied and shared
    st.code(st.session_state['new_id'])

    # Start the webrtc streamer with audio disabled
    ctx = webrtc_streamer(
        key="example",
        video_transformer_factory=lambda: VideoTransformer(st.session_state),
        media_stream_constraints={"video": True, "audio": False},
        async_transform=True,
        rtc_configuration={"iceServers": get_ice_servers()},
    )
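    # Note: VideoTransformerBase, video_transformer_factory and async_transform are
    # the older streamlit-webrtc names; newer releases favor VideoProcessorBase
    # (with recv()), video_processor_factory and async_processing.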

with col2:
    # st.title("Live Video Frame Display")
    # Room id pasted by the viewer (collected here but not used further below)
    text_value = st.text_area(label="Add room id", placeholder="please paste id")
    # Button to toggle frame display on/off
    toggle = st.button("Toggle Frame Display")

    if toggle:
        st.session_state['on'] = not st.session_state['on']

    image_placeholder = st.empty()

    # Display loop: show frames from the buffer at ~30 fps while toggled on.
    # Note: the while-loop blocks this Streamlit script run and keeps updating
    # the placeholder until the page is rerun.
    if ctx.video_transformer and st.session_state['on']:
        while True:
            frame = ctx.video_transformer.get_latest_frame()
            if frame is not None:
                # Encode the BGR frame as base64 and write it to the file
                # named after this session's room id
                value = numpy_image_to_base64(frame)
                with open(st.session_state['new_id'] + ".txt", "w") as write:
                    write.write(value)
                # Read it back, decode, and flip BGR -> RGB for display
                with open(st.session_state['new_id'] + ".txt") as read:
                    temp = read.read()
                img = np.array(base64_to_image(temp))
                img = img[:, :, ::-1]
                image_placeholder.image(img, channels="RGB")
            else:
                image_placeholder.text("Waiting for frames...")

            time.sleep(1 / 30)  # ~30 fps
    else:
        image_placeholder.text("Click 'Toggle Frame Display' to start showing frames.")