File size: 3,994 Bytes
bf97fde
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import streamlit as st
from streamlit_webrtc import webrtc_streamer, VideoTransformerBase
import av
import threading
import numpy as np
from PIL import Image
import time
import base64
from io import BytesIO
import uuid

def get_new_uuid():
    # Generate a random UUID (UUID4) and convert it to a string
    return str(uuid.uuid4())

# Usage

def numpy_image_to_base64(img_array, format="PNG"):
    # Convert NumPy array to PIL Image
    pil_img = Image.fromarray(img_array)
    # Save PIL Image to a bytes buffer
    buff = BytesIO()
    pil_img.save(buff, format=format)
    # Encode bytes to base64 string
    img_str = base64.b64encode(buff.getvalue()).decode("utf-8")
    return img_str
def base64_to_image(base64_string):
    # Decode the Base64 string to bytes
    image_data = base64.b64decode(base64_string)
    # Wrap bytes in a BytesIO buffer
    image_buffer = BytesIO(image_data)
    # Open image with PIL
    img = Image.open(image_buffer)
    return img

# Initialize session state keys if not present
if 'current_image' not in st.session_state:
    st.session_state['current_image'] = None
if 'on' not in st.session_state:
    st.session_state['on'] = False
if "new_id" not in st.session_state:
    st.session_state["new_id"] = get_new_uuid()
class VideoTransformer(VideoTransformerBase):
    def __init__(self, params):
        self.frame_lock = threading.Lock()
        self.params = params
        self.buffer_size = 2
        self.frame_buffer = [None] * self.buffer_size
        self.cv = 0  # Circular buffer index

    def transform(self, frame: av.VideoFrame) -> np.ndarray:
        # Convert frame to numpy array in BGR format
        img = frame.to_ndarray(format="bgr24")

        # Store frame in circular buffer thread-safely
        with self.frame_lock:
            self.frame_buffer[self.cv] = img
            self.cv = (self.cv + 1) % self.buffer_size
            self.params['current_image'] = img

        return img

    def get_latest_frame(self) -> np.ndarray:
        with self.frame_lock:
            idx = (self.cv - 1) % self.buffer_size
            return self.frame_buffer[idx]

# Layout with two columns

col1, col2 = st.columns(2)

with col1:
    # Start the webrtc streamer with audio disabled
    st.code(st.session_state['new_id'])
    ctx = webrtc_streamer(
        key="example",
        video_transformer_factory=lambda: VideoTransformer(st.session_state),
        media_stream_constraints={"video": True, "audio": False},
        async_transform=True,
    )

with col2:
    # st.title("Live Video Frame Display")
    text_value = st.text_area(label="Add room id",placeholder="please paste id")
    # Button to toggle frame display on/off
    toggle = st.button("Toggle Frame Display")

    if toggle:
        st.session_state['on'] = not st.session_state['on']

    image_placeholder = st.empty()

    # Display loop: show frames from buffer at ~30 fps when toggled on
    if ctx.video_transformer and st.session_state['on']:
        while True:
            frame = ctx.video_transformer.get_latest_frame()
            if frame is not None:
                # Convert BGR to RGB for correct color display
                value = numpy_image_to_base64(frame)
                with open(st.session_state['new_id']+".txt","w") as write:
                    write.write(value)
                # frame_rgb = frame[:, :, ::-1]
                with open(st.session_state['new_id']+".txt","w") as write:
                    write.write(value)
                with open(st.session_state['new_id']+".txt") as read:
                    temp = read.read()
                img = base64_to_image( temp)
                img = np.array(img)
                img=img[:, :, ::-1]
                image_placeholder.image(img, channels="RGB")
            else:
                image_placeholder.text("Waiting for frames...")

            time.sleep(1/30)  # ~30 fps
    else:
        image_placeholder.text("Click 'Toggle Frame Display' to start showing frames.")