File size: 11,494 Bytes
712d204
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
import gradio as gr
from pygame import Vector2
import time
import threading
import queue
from simulator_env import StreamableSimulation, SwarmAgent, MyConfig, MyWindow

import speech_processing
import text_processing
import safety_module
import bt_generator

from pathlib import Path

BASE = Path(__file__).parent

class GradioStreamer:
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(GradioStreamer, cls).__new__(cls)
            cls._instance.initialized = False
        return cls._instance
    
    def __init__(self):
        if not self.initialized:
            self.latest_frame = None
            self.running = True
            self.sim = None
            self.sim_thread = None
            self.initialized = True
            self.quit = False
    
    def update_frame(self, frame):
        self.latest_frame = frame

    def run_simulation(self):
            # Instantiate simulation and agents here:
            
            nest_pos = Vector2(450, 400)
            target_pos = Vector2(300, 200)
            
            agent_images = ["white.png", "green.png", "red circle.png"]
            image_paths = [str(BASE / "images" / fname) for fname in agent_images]
            

            # agent_images_paths = ["./images/white.png", "./images/green.png", "./images/red circle.png"]
            config = MyConfig(radius=25, visualise_chunks=True, movement_speed=2.0)
            self.sim = StreamableSimulation(config=config)
            loaded_agent_images = self.sim._load_image(image_paths)
            # loaded_agent_images = self.sim._load_image(agent_images_paths)

            # Create agents (each agent builds its own BT in its __init__)
            for _ in range(50):
                agents_pos = Vector2(450, 400)
                agent = SwarmAgent(
                    images=loaded_agent_images,
                    simulation=self.sim,
                    pos=agents_pos,
                    nest_pos=nest_pos,
                    target_pos=target_pos
                )
                self.sim._agents.add(agent)
                self.sim._all.add(agent)
            # (Optionally spawn obstacles and sites.)
            self.sim.spawn_obstacle(str(BASE / "images" / "rect_obst.png"), 350, 50)
            self.sim.spawn_obstacle(str(BASE / "images" / "rect_obst (1).png"), 100, 350)

            self.sim.spawn_site(str(BASE / "images" / "rect.png"), 300, 200)
            self.sim.spawn_site(str(BASE / "images" / "nest.png"), 450, 400)


            start_time = time.time()  # Record the start time

            
            while self.running:
                self.sim.tick()
                
                if not self.sim.frame_queue.empty():
                    frame = self.sim.frame_queue.get()
                    self.update_frame(frame)    

                time.sleep(1/10)  # Maintain a frame rate of ~30 FPS
                # Stop after 2 minute
                if time.time() - start_time >= 120:
                    print("Simulation stopped after 1 minute.")
                    break

            

    def stream(self):
        while True:
            if self.sim is not None and self.latest_frame is not None:
                yield self.latest_frame
            else:
                # Optionally, yield a blank image or None.
                yield None
            time.sleep(1/30)


    def start_simulation(self):
        """Start the simulation, creating a new thread if necessary."""
        if not self.sim_thread or not self.sim_thread.is_alive():
            self.running = True      
            self.quit = False        
            self.latest_frame = None 
            self.sim_thread = threading.Thread(target=self.run_simulation, daemon=True)
            self.sim_thread.start()


    def clear_frame_queue(self):
        if self.sim:
            try:
                while True:
                    self.sim.frame_queue.get_nowait()
            except queue.Empty:
                pass



    def stop_simulation(self):
        print("Stopping Simulation...")
        self.running = False
        self.quit = True
        if self.sim:
            for agent in self.sim._agents:
                agent.bt_active = False
            self.sim.running = False
            self.sim.stop()
            self.clear_frame_queue()
            self.sim = None
        if self.sim_thread and self.sim_thread.is_alive():
            self.sim_thread.join(timeout=2)
            print("Simulation thread terminated.")
        self.latest_frame = None  # Clear the displayed frame
        print("Simulation stopped successfully.")




def stop_gradio_interface():
    raise Exception("Simulation stopped!")


def create_gradio_interface():
    streamer = GradioStreamer()
    
    def on_translate_or_process():
        streamer.start_simulation()
        return gr.update(visible=True)
    
    def on_stop():
        print("Simulation on_stop")
        streamer.stop_simulation()
        return gr.update(visible=False)
    
    behaviors = bt_generator.call_behaviors()       
    formatted_behaviors = "\n".join(
        f"- **{name}**: {doc.split('Returns:')[0].strip()}"
        for name, doc in sorted(
            behaviors.items(),
            key=lambda item: item[0].lower()
                )
            )

    # Gradio Interface
    with gr.Blocks() as demo:
        gr.Markdown(
            """
        # ๐Ÿ **SwarmChat:** Enabling Humanโ€“Swarm Interaction and Robot Control via Natural Language
        Easily talk to virtual robots, and see the result live.  
        """
        )
        gr.Markdown(
            """
        **How it works**

        1.  Speak or type a task in *any EU language* (e.g. โ€œFind Food, then change color to greenโ€).
        2.  Press **Start** to launch the simulator. Use **Stop** to halt & reset.
        3.  SwarmChat translates your command, runs a safety check, and auto-builds a behaviour tree (BT).          

        > The BT XML is shown on the right so you can copy / save it for real robots.
        """
        )  
        with gr.Tabs():
            # Tab for microphone input
            with gr.Tab("Microphone Input"):
                gr.Markdown("## ๐ŸŽ™๏ธ Voice mode")
                gr.Markdown("""
                Use your microphone to record audio instructions for the swarm. The system translates them into a robot-executable BT.
                """)
                with gr.Row():
                    with gr.Column():
                        microphone_input = gr.Audio(sources=["microphone"], type="filepath", label="๐ŸŽ™๏ธ Record Audio")
                        safety_checkbox = gr.Checkbox(label="Turn off Safety Model")                        
                    with gr.Column():
                        output_text_audio = gr.Textbox(label="๐Ÿ“„ Translated Instructions to English" )
                        safty_check_audio = gr.Textbox(label="โœ… Safety Check")
                        
                
                translate_button_audio = gr.Button("Start")

                simulation_output = gr.Image(label="Live Stream", streaming=True, visible=False)
                stop_button = gr.Button("Stop")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown(f"""**๐Ÿ›  The available behaviours so far.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet.""")

                    with gr.Column():
                        generated_BT_audio = gr.Textbox(label="Generated behavior tree")
                
                translate_button_audio.click(
                    fn=speech_processing.translate_audio,
                    inputs=microphone_input,
                    outputs=output_text_audio
                ).then(
                    fn=safety_module.check_safety, 
                    inputs=[output_text_audio,safety_checkbox], 
                    outputs=safty_check_audio
                ).then(
                    fn=lambda x: x if x == "Safe" else stop_gradio_interface(),
                    inputs=safty_check_audio, 
                    outputs=None
                ).success(
                    fn=bt_generator.generate_behavior_tree,
                    # fn=test_LLM_generate_BT,
                    inputs=output_text_audio, 
                    outputs=generated_BT_audio
                ).success(                    
                    fn=on_translate_or_process,
                    outputs=simulation_output
                )


            stop_button.click(fn=on_stop,outputs=simulation_output)
            demo.load(fn=streamer.stream, outputs=simulation_output)

            # Tab for text input
            with gr.Tab("๐Ÿ“ Text Input"):
                gr.Markdown("## ๐Ÿ“ Text mode")
                gr.Markdown("""
                Enter text-based instructions for the swarm. The system translates them into a robot-executable BT.
                """)
                with gr.Row():
                    with gr.Column():
                        text_input = gr.Textbox(lines=4, placeholder="Enter your instructions here...", label="๐Ÿ“ Input Text")
                        safety_checkbox_text = gr.Checkbox(label="Turn off Safety Model")                        
                    with gr.Column():
                        output_text_text = gr.Textbox(label="๐Ÿ“„ Translated Instructions to English", lines=2)
                        safty_check_text = gr.Textbox(label="โœ… Safety Check")

                process_button_text = gr.Button("Start")

                simulation_output = gr.Image(label="Live Stream", streaming=True, visible=False)
                stop_button = gr.Button("Stop")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown(f"""**๐Ÿ›  The available behaviours so far.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet.""")

                    with gr.Column():
                        generated_BT_text = gr.Textbox(label="Generated behavior tree")

                process_button_text.click(
                    fn=text_processing.translate_text,
                    inputs=text_input,
                    outputs=output_text_text
                ).then(
                    fn=safety_module.check_safety, 
                    inputs=[output_text_text,safety_checkbox_text], 
                    outputs=safty_check_text
                ).then(
                    fn=lambda x: x if x == "Safe" else stop_gradio_interface(), 
                    inputs=safty_check_text, 
                    outputs=None
                ).success(
                    fn=bt_generator.generate_behavior_tree,
                    # fn=test_LLM_generate_BT,
                    inputs=output_text_text, 
                    outputs=generated_BT_text
                ).success(                    
                    fn=on_translate_or_process,
                    outputs=simulation_output
                )
                stop_button.click(fn=on_stop,outputs=simulation_output)
                demo.load(fn=streamer.stream, outputs=simulation_output)
    
    return demo

if __name__ == "__main__":
    demo = create_gradio_interface()
    try:
        demo.launch(server_port=7860, server_name="0.0.0.0", share=True)
    finally:
        streamer = GradioStreamer()
        streamer.stop_simulation()