|
import gradio as gr |
|
from huggingface_hub import HfApi |
|
|
|
def build_teleop_command( |
|
robot_type, |
|
robot_port, |
|
robot_id, |
|
cam_index, |
|
cam_width, |
|
cam_height, |
|
cam_fps, |
|
teleop_type, |
|
teleop_port, |
|
teleop_id, |
|
fps, |
|
teleop_duration, |
|
display_data, |
|
): |
|
cam_cfg = ( |
|
"{ front: {type: opencv, index_or_path: %d, width: %d, height: %d, fps: %d}}" |
|
% (cam_index, cam_width, cam_height, cam_fps) |
|
) |
|
|
|
cmd = [ |
|
"python -m lerobot.teleoperate", |
|
f"--robot.type={robot_type}", |
|
f"--robot.port={robot_port}", |
|
f"--robot.id={robot_id}", |
|
f"--robot.cameras=\"{cam_cfg}\"", |
|
f"--teleop.type={teleop_type}", |
|
f"--teleop.port={teleop_port}", |
|
f"--teleop.id={teleop_id}", |
|
f"--fps={fps}", |
|
] |
|
if teleop_duration: |
|
cmd.append(f"--teleop_time_s={teleop_duration}") |
|
cmd.append(f"--display_data={'true' if display_data else 'false'}") |
|
return " \\\n ".join(cmd) |
|
|
|
|
|
def build_record_command( |
|
robot_type, |
|
robot_port, |
|
robot_id, |
|
cam_index, |
|
cam_width, |
|
cam_height, |
|
cam_fps, |
|
teleop_type, |
|
teleop_port, |
|
teleop_id, |
|
display_data, |
|
dataset_repo, |
|
num_episodes, |
|
single_task, |
|
resume, |
|
push_to_hub, |
|
use_existing, |
|
existing_ds, |
|
): |
|
|
|
if use_existing and existing_ds: |
|
dataset_repo = existing_ds |
|
|
|
camera_cfg = ( |
|
"{ front: {type: opencv, index_or_path: %d, width: %d, height: %d, fps: %d}}" |
|
% (cam_index, cam_width, cam_height, cam_fps) |
|
) |
|
cmd = [ |
|
"python -m lerobot.record", |
|
f"--robot.type={robot_type}", |
|
f"--robot.port={robot_port}", |
|
f"--robot.id={robot_id}", |
|
f"--robot.cameras=\"{camera_cfg}\"", |
|
f"--teleop.type={teleop_type}", |
|
f"--teleop.port={teleop_port}", |
|
f"--teleop.id={teleop_id}", |
|
f"--display_data={'true' if display_data else 'false'}", |
|
f"--dataset.repo_id={dataset_repo}", |
|
f"--dataset.num_episodes={num_episodes}", |
|
f"--dataset.single_task=\"{single_task}\"", |
|
] |
|
cmd.append(f"--dataset.push_to_hub={'true' if push_to_hub else 'false'}") |
|
if resume: |
|
cmd.append("--resume=True") |
|
return " \\\n ".join(cmd) |
|
|
|
|
|
def build_train_command( |
|
policy_path, |
|
dataset_repo, |
|
batch_size, |
|
steps, |
|
output_dir, |
|
job_name, |
|
device, |
|
wandb_enable, |
|
policy_repo_id, |
|
): |
|
cmd = [ |
|
"python -m lerobot.scripts.train", |
|
f"--policy.path={policy_path}", |
|
f"--dataset.repo_id={dataset_repo}", |
|
f"--batch_size={batch_size}", |
|
f"--steps={steps}", |
|
f"--output_dir={output_dir}", |
|
f"--job_name={job_name}", |
|
f"--policy.device={device}", |
|
f"--wandb.enable={'true' if wandb_enable else 'false'}", |
|
f"--policy.repo_id={policy_repo_id}" if policy_repo_id else "", |
|
] |
|
|
|
cmd = [c for c in cmd if c] |
|
return " \\\n ".join(cmd) |
|
|
|
|
|
def build_eval_command( |
|
robot_type, |
|
robot_port, |
|
robot_id, |
|
cam_index, |
|
cam_width, |
|
cam_height, |
|
cam_fps, |
|
display_data, |
|
dataset_repo, |
|
num_episodes, |
|
single_task, |
|
policy_path, |
|
resume, |
|
): |
|
camera_cfg = ( |
|
"{ front: {type: opencv, index_or_path: %d, width: %d, height: %d, fps: %d}}" |
|
% (cam_index, cam_width, cam_height, cam_fps) |
|
) |
|
|
|
cmd = [ |
|
"python -m lerobot.record", |
|
f"--robot.type={robot_type}", |
|
f"--robot.port={robot_port}", |
|
f"--robot.id={robot_id}", |
|
f"--robot.cameras=\"{camera_cfg}\"", |
|
f"--display_data={'true' if display_data else 'false'}", |
|
f"--dataset.repo_id={dataset_repo}", |
|
f"--dataset.num_episodes={num_episodes}", |
|
f"--dataset.single_task=\"{single_task}\"", |
|
f"--policy.path={policy_path}", |
|
] |
|
if resume: |
|
cmd.append("--resume=True") |
|
return " \\\n ".join(cmd) |
|
|
|
|
|
|
|
def _list_remote_datasets(username: str): |
|
try: |
|
api = HfApi() |
|
datasets = api.list_datasets(author=username) |
|
return sorted([d.id for d in datasets]) |
|
except Exception: |
|
return [] |
|
|
|
|
|
def build_ui(): |
|
with gr.Blocks(title="Lerobot Scripts Controller (Generate Only)") as demo: |
|
hf_username_tb = gr.Textbox(label="HF Username", value="arpitg1304") |
|
with gr.Tabs(): |
|
|
|
with gr.TabItem("Teleoperate Robot"): |
|
gr.Markdown("### Teleoperate robot with camera") |
|
with gr.Row(): |
|
robot_type = gr.Textbox(label="Robot Type", value="so101_follower") |
|
robot_port = gr.Textbox(label="Robot Port", value="/dev/ttyACM0") |
|
robot_id = gr.Textbox(label="Robot ID", value="follower") |
|
with gr.Row(): |
|
cam_index = gr.Number(label="Cam Index", value=0, precision=0) |
|
cam_width = gr.Number(label="Width", value=640, precision=0) |
|
cam_height = gr.Number(label="Height", value=480, precision=0) |
|
cam_fps = gr.Number(label="FPS", value=30, precision=0) |
|
with gr.Row(): |
|
teleop_type = gr.Textbox(label="Teleop Type", value="so101_leader") |
|
teleop_port = gr.Textbox(label="Teleop Port", value="/dev/ttyACM1") |
|
teleop_id = gr.Textbox(label="Teleop ID", value="leader") |
|
with gr.Row(): |
|
fps = gr.Number(label="Loop FPS", value=60, precision=0) |
|
teleop_duration = gr.Number(label="Duration (s)", value=60, precision=0) |
|
display_data = gr.Checkbox(label="Display Data", value=True) |
|
teleop_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16) |
|
|
|
inputs_teleop = [ |
|
robot_type, |
|
robot_port, |
|
robot_id, |
|
cam_index, |
|
cam_width, |
|
cam_height, |
|
cam_fps, |
|
teleop_type, |
|
teleop_port, |
|
teleop_id, |
|
fps, |
|
teleop_duration, |
|
display_data, |
|
] |
|
gr.Button("Generate Command").click(build_teleop_command, inputs_teleop, outputs=teleop_cmd) |
|
|
|
|
|
with gr.TabItem("Record Data"): |
|
gr.Markdown("### Record episodes with policy") |
|
with gr.Row(): |
|
robot_type2 = gr.Textbox(label="Robot Type", value="so101_follower") |
|
robot_port2 = gr.Textbox(label="Robot Port", value="/dev/ttyACM0") |
|
robot_id2 = gr.Textbox(label="Robot ID", value="follower") |
|
with gr.Row(): |
|
cam_index2 = gr.Number(label="Cam Index", value=0, precision=0) |
|
cam_width2 = gr.Number(label="Width", value=640, precision=0) |
|
cam_height2 = gr.Number(label="Height", value=480, precision=0) |
|
cam_fps2 = gr.Number(label="FPS", value=30, precision=0) |
|
with gr.Row(): |
|
teleop_type_r = gr.Textbox(label="Teleop Type", value="so101_leader") |
|
teleop_port_r = gr.Textbox(label="Teleop Port", value="/dev/ttyACM1") |
|
teleop_id_r = gr.Textbox(label="Teleop ID", value="leader") |
|
with gr.Row(): |
|
display_data2 = gr.Checkbox(label="Display Data", value=True) |
|
dataset_repo = gr.Textbox(label="Dataset Repo", value="") |
|
num_episodes = gr.Number(label="Num Episodes", value=2, precision=0) |
|
single_task = gr.Textbox(label="Single Task", value="Grab the cylinder") |
|
resume_chk = gr.Checkbox(label="Resume", value=False) |
|
push_hub_chk = gr.Checkbox(label="Push to Hub", value=False) |
|
with gr.Row(): |
|
use_existing = gr.Checkbox(label="Use Existing Dataset", value=False) |
|
existing_dd = gr.Dropdown(label="User Datasets", choices=_list_remote_datasets(hf_username_tb.value), visible=False) |
|
|
|
use_existing.change(lambda f: gr.update(visible=f), inputs=use_existing, outputs=existing_dd) |
|
|
|
|
|
def _update_ds_choices(username): |
|
return gr.update(choices=_list_remote_datasets(username)) |
|
|
|
hf_username_tb.change(_update_ds_choices, inputs=hf_username_tb, outputs=existing_dd) |
|
|
|
record_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16) |
|
inputs_rec = [ |
|
robot_type2, |
|
robot_port2, |
|
robot_id2, |
|
cam_index2, |
|
cam_width2, |
|
cam_height2, |
|
cam_fps2, |
|
teleop_type_r, |
|
teleop_port_r, |
|
teleop_id_r, |
|
display_data2, |
|
dataset_repo, |
|
num_episodes, |
|
single_task, |
|
resume_chk, |
|
push_hub_chk, |
|
use_existing, |
|
existing_dd, |
|
] |
|
gr.Button("Generate Command").click(build_record_command, inputs_rec, record_cmd) |
|
|
|
|
|
with gr.TabItem("Train Policy"): |
|
gr.Markdown("### Train Policy") |
|
|
|
policy_path_t = gr.Textbox(label="Base Policy Path", value="lerobot/smolvla_base") |
|
|
|
|
|
with gr.Row(): |
|
dataset_repo_t = gr.Dropdown( |
|
label="User Dataset", |
|
choices=_list_remote_datasets(hf_username_tb.value), |
|
scale=4, |
|
) |
|
device_t = gr.Dropdown(label="Device", choices=["cpu", "cuda"], value="cuda", scale=1) |
|
wandb_chk = gr.Checkbox(label="W&B", value=True, scale=1) |
|
|
|
|
|
with gr.Row(): |
|
batch_size_t = gr.Number(label="Batch Size", value=16, precision=0, scale=1) |
|
steps_t = gr.Number(label="Steps", value=20000, precision=0, scale=1) |
|
|
|
|
|
output_dir_t = gr.Textbox(label="Output Dir", value="outputs/train/my_smolvla_1") |
|
|
|
|
|
with gr.Row(): |
|
job_name_t = gr.Textbox(label="Job Name", value="smolvla_place_cylinder", scale=1) |
|
policy_repo_t = gr.Textbox(label="Policy Repo ID (optional)", value="", scale=1) |
|
|
|
|
|
hf_username_tb.change(_update_ds_choices, inputs=hf_username_tb, outputs=dataset_repo_t) |
|
|
|
train_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16) |
|
gr.Button("Generate Command").click( |
|
build_train_command, |
|
[ |
|
policy_path_t, |
|
dataset_repo_t, |
|
batch_size_t, |
|
steps_t, |
|
output_dir_t, |
|
job_name_t, |
|
device_t, |
|
wandb_chk, |
|
policy_repo_t, |
|
], |
|
train_cmd, |
|
) |
|
|
|
|
|
with gr.TabItem("Evaluate Policy"): |
|
gr.Markdown("### Evaluate Policy") |
|
with gr.Row(): |
|
robot_type_e = gr.Textbox(label="Robot Type", value="so101_follower") |
|
robot_port_e = gr.Textbox(label="Robot Port", value="/dev/ttyACM0") |
|
robot_id_e = gr.Textbox(label="Robot ID", value="follower") |
|
with gr.Row(): |
|
cam_index_e = gr.Number(label="Cam Index", value=0, precision=0) |
|
cam_width_e = gr.Number(label="Width", value=640, precision=0) |
|
cam_height_e = gr.Number(label="Height", value=480, precision=0) |
|
cam_fps_e = gr.Number(label="FPS", value=30, precision=0) |
|
with gr.Row(): |
|
display_data_e = gr.Checkbox(label="Display Data", value=True) |
|
dataset_repo_e = gr.Dropdown(label="User Dataset", choices=_list_remote_datasets(hf_username_tb.value)) |
|
num_episodes_e = gr.Number(label="Num Episodes", value=2, precision=0) |
|
single_task_e = gr.Textbox(label="Single Task", value="place cylinder") |
|
with gr.Row(): |
|
policy_path_e = gr.Textbox(label="Policy Path", value="arpitg1304/smolvla_place_cylinder") |
|
resume_e = gr.Checkbox(label="Resume", value=True) |
|
eval_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16) |
|
|
|
hf_username_tb.change(_update_ds_choices, inputs=hf_username_tb, outputs=dataset_repo_e) |
|
|
|
inputs_eval = [ |
|
robot_type_e, |
|
robot_port_e, |
|
robot_id_e, |
|
cam_index_e, |
|
cam_width_e, |
|
cam_height_e, |
|
cam_fps_e, |
|
display_data_e, |
|
dataset_repo_e, |
|
num_episodes_e, |
|
single_task_e, |
|
policy_path_e, |
|
resume_e, |
|
] |
|
gr.Button("Generate Command").click( |
|
build_eval_command, |
|
inputs_eval, |
|
eval_cmd, |
|
) |
|
|
|
return demo |
|
|
|
|
|
if __name__ == "__main__": |
|
build_ui().launch() |