import gradio as gr from pygame import Vector2 import time import threading import queue from simulator_env import StreamableSimulation, SwarmAgent, MyConfig, MyWindow import speech_processing import text_processing import safety_module import bt_generator from pathlib import Path BASE = Path(__file__).parent class GradioStreamer: _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(GradioStreamer, cls).__new__(cls) cls._instance.initialized = False return cls._instance def __init__(self): if not self.initialized: self.latest_frame = None self.running = True self.sim = None self.sim_thread = None self.initialized = True self.quit = False def update_frame(self, frame): self.latest_frame = frame def run_simulation(self): # Instantiate simulation and agents here: nest_pos = Vector2(450, 400) target_pos = Vector2(300, 200) agent_images = ["white.png", "green.png", "red circle.png"] image_paths = [str(BASE / "images" / fname) for fname in agent_images] # agent_images_paths = ["./images/white.png", "./images/green.png", "./images/red circle.png"] config = MyConfig(radius=25, visualise_chunks=True, movement_speed=2.0) self.sim = StreamableSimulation(config=config) loaded_agent_images = self.sim._load_image(image_paths) # loaded_agent_images = self.sim._load_image(agent_images_paths) # Create agents (each agent builds its own BT in its __init__) for _ in range(50): agents_pos = Vector2(450, 400) agent = SwarmAgent( images=loaded_agent_images, simulation=self.sim, pos=agents_pos, nest_pos=nest_pos, target_pos=target_pos ) self.sim._agents.add(agent) self.sim._all.add(agent) # (Optionally spawn obstacles and sites.) self.sim.spawn_obstacle(str(BASE / "images" / "rect_obst.png"), 350, 50) self.sim.spawn_obstacle(str(BASE / "images" / "rect_obst (1).png"), 100, 350) self.sim.spawn_site(str(BASE / "images" / "rect.png"), 300, 200) self.sim.spawn_site(str(BASE / "images" / "nest.png"), 450, 400) start_time = time.time() # Record the start time while self.running: self.sim.tick() if not self.sim.frame_queue.empty(): frame = self.sim.frame_queue.get() self.update_frame(frame) time.sleep(1/10) # Maintain a frame rate of ~30 FPS # Stop after 2 minute if time.time() - start_time >= 120: print("Simulation stopped after 1 minute.") break def stream(self): while True: if self.sim is not None and self.latest_frame is not None: yield self.latest_frame else: # Optionally, yield a blank image or None. yield None time.sleep(1/30) def start_simulation(self): """Start the simulation, creating a new thread if necessary.""" if not self.sim_thread or not self.sim_thread.is_alive(): self.running = True self.quit = False self.latest_frame = None self.sim_thread = threading.Thread(target=self.run_simulation, daemon=True) self.sim_thread.start() def clear_frame_queue(self): if self.sim: try: while True: self.sim.frame_queue.get_nowait() except queue.Empty: pass def stop_simulation(self): print("Stopping Simulation...") self.running = False self.quit = True if self.sim: for agent in self.sim._agents: agent.bt_active = False self.sim.running = False self.sim.stop() self.clear_frame_queue() self.sim = None if self.sim_thread and self.sim_thread.is_alive(): self.sim_thread.join(timeout=2) print("Simulation thread terminated.") self.latest_frame = None # Clear the displayed frame print("Simulation stopped successfully.") def stop_gradio_interface(): raise Exception("Simulation stopped!") def create_gradio_interface(): streamer = GradioStreamer() def on_translate_or_process(): streamer.start_simulation() return gr.update(visible=True) def on_stop(): print("Simulation on_stop") streamer.stop_simulation() return gr.update(visible=False) behaviors = bt_generator.call_behaviors() formatted_behaviors = "\n".join( f"- **{name}**: {doc.split('Returns:')[0].strip()}" for name, doc in sorted( behaviors.items(), key=lambda item: item[0].lower() ) ) # Gradio Interface with gr.Blocks() as demo: gr.Markdown( """ # 🐝 **SwarmChat:** Enabling Human–Swarm Interaction and Robot Control via Natural Language Easily talk to virtual robots, and see the result live. """ ) gr.Markdown( """ **How it works** 1. Speak or type a task in *any EU language* (e.g. β€œFind Food, then change color to green”). 2. Press **Start** to launch the simulator. Use **Stop** to halt & reset. 3. SwarmChat translates your command, runs a safety check, and auto-builds a behaviour tree (BT). > The BT XML is shown on the right so you can copy / save it for real robots. """ ) with gr.Tabs(): # Tab for microphone input with gr.Tab("Microphone Input"): gr.Markdown("## πŸŽ™οΈ Voice mode") gr.Markdown(""" Use your microphone to record audio instructions for the swarm. The system translates them into a robot-executable BT. """) with gr.Row(): with gr.Column(): microphone_input = gr.Audio(sources=["microphone"], type="filepath", label="πŸŽ™οΈ Record Audio") safety_checkbox = gr.Checkbox(label="Turn off Safety Model") with gr.Column(): output_text_audio = gr.Textbox(label="πŸ“„ Translated Instructions to English" ) safty_check_audio = gr.Textbox(label="βœ… Safety Check") translate_button_audio = gr.Button("Start") simulation_output = gr.Image(label="Live Stream", streaming=True, visible=False) stop_button = gr.Button("Stop") with gr.Row(): with gr.Column(): gr.Markdown(f"""**πŸ›  The available behaviours so far.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet.""") with gr.Column(): generated_BT_audio = gr.Textbox(label="Generated behavior tree") translate_button_audio.click( fn=speech_processing.translate_audio, inputs=microphone_input, outputs=output_text_audio ).then( fn=safety_module.check_safety, inputs=[output_text_audio,safety_checkbox], outputs=safty_check_audio ).then( fn=lambda x: x if x == "Safe" else stop_gradio_interface(), inputs=safty_check_audio, outputs=None ).success( fn=bt_generator.generate_behavior_tree, # fn=test_LLM_generate_BT, inputs=output_text_audio, outputs=generated_BT_audio ).success( fn=on_translate_or_process, outputs=simulation_output ) stop_button.click(fn=on_stop,outputs=simulation_output) demo.load(fn=streamer.stream, outputs=simulation_output) # Tab for text input with gr.Tab("πŸ“ Text Input"): gr.Markdown("## πŸ“ Text mode") gr.Markdown(""" Enter text-based instructions for the swarm. The system translates them into a robot-executable BT. """) with gr.Row(): with gr.Column(): text_input = gr.Textbox(lines=4, placeholder="Enter your instructions here...", label="πŸ“ Input Text") safety_checkbox_text = gr.Checkbox(label="Turn off Safety Model") with gr.Column(): output_text_text = gr.Textbox(label="πŸ“„ Translated Instructions to English", lines=2) safty_check_text = gr.Textbox(label="βœ… Safety Check") process_button_text = gr.Button("Start") simulation_output = gr.Image(label="Live Stream", streaming=True, visible=False) stop_button = gr.Button("Stop") with gr.Row(): with gr.Column(): gr.Markdown(f"""**πŸ›  The available behaviours so far.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet.""") with gr.Column(): generated_BT_text = gr.Textbox(label="Generated behavior tree") process_button_text.click( fn=text_processing.translate_text, inputs=text_input, outputs=output_text_text ).then( fn=safety_module.check_safety, inputs=[output_text_text,safety_checkbox_text], outputs=safty_check_text ).then( fn=lambda x: x if x == "Safe" else stop_gradio_interface(), inputs=safty_check_text, outputs=None ).success( fn=bt_generator.generate_behavior_tree, # fn=test_LLM_generate_BT, inputs=output_text_text, outputs=generated_BT_text ).success( fn=on_translate_or_process, outputs=simulation_output ) stop_button.click(fn=on_stop,outputs=simulation_output) demo.load(fn=streamer.stream, outputs=simulation_output) return demo if __name__ == "__main__": demo = create_gradio_interface() try: demo.launch(server_port=7860, server_name="0.0.0.0", share=True) finally: streamer = GradioStreamer() streamer.stop_simulation()