Spaces:
Sleeping
Sleeping
import gradio as gr | |
from pygame import Vector2 | |
import time | |
import threading | |
import queue | |
from simulator_env import StreamableSimulation, SwarmAgent, MyConfig, MyWindow | |
import speech_processing | |
import text_processing | |
import safety_module | |
import bt_generator | |
from pathlib import Path | |
BASE = Path(__file__).parent | |
class GradioStreamer: | |
_instance = None | |
def __new__(cls): | |
if cls._instance is None: | |
cls._instance = super(GradioStreamer, cls).__new__(cls) | |
cls._instance.initialized = False | |
return cls._instance | |
def __init__(self): | |
if not self.initialized: | |
self.latest_frame = None | |
self.running = True | |
self.sim = None | |
self.sim_thread = None | |
self.initialized = True | |
self.quit = False | |
def update_frame(self, frame): | |
self.latest_frame = frame | |
def run_simulation(self): | |
# Instantiate simulation and agents here: | |
nest_pos = Vector2(450, 400) | |
target_pos = Vector2(300, 200) | |
agent_images = ["white.png", "green.png", "red circle.png"] | |
image_paths = [str(BASE / "images" / fname) for fname in agent_images] | |
# agent_images_paths = ["./images/white.png", "./images/green.png", "./images/red circle.png"] | |
config = MyConfig(radius=25, visualise_chunks=True, movement_speed=2.0) | |
self.sim = StreamableSimulation(config=config) | |
loaded_agent_images = self.sim._load_image(image_paths) | |
# loaded_agent_images = self.sim._load_image(agent_images_paths) | |
# Create agents (each agent builds its own BT in its __init__) | |
for _ in range(50): | |
agents_pos = Vector2(450, 400) | |
agent = SwarmAgent( | |
images=loaded_agent_images, | |
simulation=self.sim, | |
pos=agents_pos, | |
nest_pos=nest_pos, | |
target_pos=target_pos | |
) | |
self.sim._agents.add(agent) | |
self.sim._all.add(agent) | |
# (Optionally spawn obstacles and sites.) | |
self.sim.spawn_obstacle(str(BASE / "images" / "rect_obst.png"), 350, 50) | |
self.sim.spawn_obstacle(str(BASE / "images" / "rect_obst (1).png"), 100, 350) | |
self.sim.spawn_site(str(BASE / "images" / "rect.png"), 300, 200) | |
self.sim.spawn_site(str(BASE / "images" / "nest.png"), 450, 400) | |
start_time = time.time() # Record the start time | |
while self.running: | |
self.sim.tick() | |
if not self.sim.frame_queue.empty(): | |
frame = self.sim.frame_queue.get() | |
self.update_frame(frame) | |
time.sleep(1/10) # Maintain a frame rate of ~30 FPS | |
# Stop after 2 minute | |
if time.time() - start_time >= 120: | |
print("Simulation stopped after 1 minute.") | |
break | |
def stream(self): | |
while True: | |
if self.sim is not None and self.latest_frame is not None: | |
yield self.latest_frame | |
else: | |
# Optionally, yield a blank image or None. | |
yield None | |
time.sleep(1/30) | |
def start_simulation(self): | |
"""Start the simulation, creating a new thread if necessary.""" | |
if not self.sim_thread or not self.sim_thread.is_alive(): | |
self.running = True | |
self.quit = False | |
self.latest_frame = None | |
self.sim_thread = threading.Thread(target=self.run_simulation, daemon=True) | |
self.sim_thread.start() | |
def clear_frame_queue(self): | |
if self.sim: | |
try: | |
while True: | |
self.sim.frame_queue.get_nowait() | |
except queue.Empty: | |
pass | |
def stop_simulation(self): | |
print("Stopping Simulation...") | |
self.running = False | |
self.quit = True | |
if self.sim: | |
for agent in self.sim._agents: | |
agent.bt_active = False | |
self.sim.running = False | |
self.sim.stop() | |
self.clear_frame_queue() | |
self.sim = None | |
if self.sim_thread and self.sim_thread.is_alive(): | |
self.sim_thread.join(timeout=2) | |
print("Simulation thread terminated.") | |
self.latest_frame = None # Clear the displayed frame | |
print("Simulation stopped successfully.") | |
def stop_gradio_interface(): | |
raise Exception("Simulation stopped!") | |
def create_gradio_interface(): | |
streamer = GradioStreamer() | |
def on_translate_or_process(): | |
streamer.start_simulation() | |
return gr.update(visible=True) | |
def on_stop(): | |
print("Simulation on_stop") | |
streamer.stop_simulation() | |
return gr.update(visible=False) | |
behaviors = bt_generator.call_behaviors() | |
formatted_behaviors = "\n".join( | |
f"- **{name}**: {doc.split('Returns:')[0].strip()}" | |
for name, doc in sorted( | |
behaviors.items(), | |
key=lambda item: item[0].lower() | |
) | |
) | |
# Gradio Interface | |
with gr.Blocks() as demo: | |
gr.Markdown( | |
""" | |
# π **SwarmChat:** Enabling HumanβSwarm Interaction and Robot Control via Natural Language | |
Easily talk to virtual robots, and see the result live. | |
""" | |
) | |
gr.Markdown( | |
""" | |
**How it works** | |
1. Speak or type a task in *any EU language* (e.g. βFind Food, then change color to greenβ). | |
2. Press **Start** to launch the simulator. Use **Stop** to halt & reset. | |
3. SwarmChat translates your command, runs a safety check, and auto-builds a behaviour tree (BT). | |
> The BT XML is shown on the right so you can copy / save it for real robots. | |
""" | |
) | |
with gr.Tabs(): | |
# Tab for microphone input | |
with gr.Tab("Microphone Input"): | |
gr.Markdown("## ποΈ Voice mode") | |
gr.Markdown(""" | |
Use your microphone to record audio instructions for the swarm. The system translates them into a robot-executable BT. | |
""") | |
with gr.Row(): | |
with gr.Column(): | |
microphone_input = gr.Audio(sources=["microphone"], type="filepath", label="ποΈ Record Audio") | |
safety_checkbox = gr.Checkbox(label="Turn off Safety Model") | |
with gr.Column(): | |
output_text_audio = gr.Textbox(label="π Translated Instructions to English" ) | |
safty_check_audio = gr.Textbox(label="β Safety Check") | |
translate_button_audio = gr.Button("Start") | |
simulation_output = gr.Image(label="Live Stream", streaming=True, visible=False) | |
stop_button = gr.Button("Stop") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown(f"""**π The available behaviours so far.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet.""") | |
with gr.Column(): | |
generated_BT_audio = gr.Textbox(label="Generated behavior tree") | |
translate_button_audio.click( | |
fn=speech_processing.translate_audio, | |
inputs=microphone_input, | |
outputs=output_text_audio | |
).then( | |
fn=safety_module.check_safety, | |
inputs=[output_text_audio,safety_checkbox], | |
outputs=safty_check_audio | |
).then( | |
fn=lambda x: x if x == "Safe" else stop_gradio_interface(), | |
inputs=safty_check_audio, | |
outputs=None | |
).success( | |
fn=bt_generator.generate_behavior_tree, | |
# fn=test_LLM_generate_BT, | |
inputs=output_text_audio, | |
outputs=generated_BT_audio | |
).success( | |
fn=on_translate_or_process, | |
outputs=simulation_output | |
) | |
stop_button.click(fn=on_stop,outputs=simulation_output) | |
demo.load(fn=streamer.stream, outputs=simulation_output) | |
# Tab for text input | |
with gr.Tab("π Text Input"): | |
gr.Markdown("## π Text mode") | |
gr.Markdown(""" | |
Enter text-based instructions for the swarm. The system translates them into a robot-executable BT. | |
""") | |
with gr.Row(): | |
with gr.Column(): | |
text_input = gr.Textbox(lines=4, placeholder="Enter your instructions here...", label="π Input Text") | |
safety_checkbox_text = gr.Checkbox(label="Turn off Safety Model") | |
with gr.Column(): | |
output_text_text = gr.Textbox(label="π Translated Instructions to English", lines=2) | |
safty_check_text = gr.Textbox(label="β Safety Check") | |
process_button_text = gr.Button("Start") | |
simulation_output = gr.Image(label="Live Stream", streaming=True, visible=False) | |
stop_button = gr.Button("Stop") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown(f"""**π The available behaviours so far.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet.""") | |
with gr.Column(): | |
generated_BT_text = gr.Textbox(label="Generated behavior tree") | |
process_button_text.click( | |
fn=text_processing.translate_text, | |
inputs=text_input, | |
outputs=output_text_text | |
).then( | |
fn=safety_module.check_safety, | |
inputs=[output_text_text,safety_checkbox_text], | |
outputs=safty_check_text | |
).then( | |
fn=lambda x: x if x == "Safe" else stop_gradio_interface(), | |
inputs=safty_check_text, | |
outputs=None | |
).success( | |
fn=bt_generator.generate_behavior_tree, | |
# fn=test_LLM_generate_BT, | |
inputs=output_text_text, | |
outputs=generated_BT_text | |
).success( | |
fn=on_translate_or_process, | |
outputs=simulation_output | |
) | |
stop_button.click(fn=on_stop,outputs=simulation_output) | |
demo.load(fn=streamer.stream, outputs=simulation_output) | |
return demo | |
if __name__ == "__main__": | |
demo = create_gradio_interface() | |
try: | |
demo.launch(server_port=7860, server_name="0.0.0.0", share=True) | |
finally: | |
streamer = GradioStreamer() | |
streamer.stop_simulation() | |