"""Gradio front-end that generates LeRobot command lines.

Each tab collects parameters and renders the matching ``python -m lerobot...``
invocation into a read-only textbox. Nothing in this module executes the
generated command.
"""

import logging

import gradio as gr
from huggingface_hub import HfApi

logger = logging.getLogger(__name__)

# Separator that puts every argument on its own shell continuation line.
_ARG_SEPARATOR = " \\\n "

# Characters a POSIX shell still interprets inside a double-quoted string.
_DOUBLE_QUOTE_ESCAPES = str.maketrans(
    {"\\": "\\\\", '"': '\\"', "$": "\\$", "`": "\\`"}
)


def _bool_flag(value):
    """Render a truthy/falsy value as the CLI literal ``true`` or ``false``."""
    return "true" if value else "false"


def _escape_double_quoted(text):
    """Escape *text* so it can sit safely between double quotes in a shell command."""
    return str(text).translate(_DOUBLE_QUOTE_ESCAPES)


def _camera_config(cam_index, cam_width, cam_height, cam_fps):
    """Return the inline config string describing the single ``front`` OpenCV camera."""
    return (
        "{ front: {type: opencv, index_or_path: %d, width: %d, height: %d, fps: %d}}"
        % (cam_index, cam_width, cam_height, cam_fps)
    )


def build_teleop_command(
    robot_type,
    robot_port,
    robot_id,
    cam_index,
    cam_width,
    cam_height,
    cam_fps,
    teleop_type,
    teleop_port,
    teleop_id,
    fps,
    teleop_duration,
    display_data,
):
    """Build the ``lerobot.teleoperate`` command as a multi-line shell string.

    ``--teleop_time_s`` is only emitted when ``teleop_duration`` is truthy
    (so ``0`` or an empty field leaves it out).
    """
    cam_cfg = _camera_config(cam_index, cam_width, cam_height, cam_fps)
    cmd = [
        "python -m lerobot.teleoperate",
        f"--robot.type={robot_type}",
        f"--robot.port={robot_port}",
        f"--robot.id={robot_id}",
        f"--robot.cameras=\"{cam_cfg}\"",
        f"--teleop.type={teleop_type}",
        f"--teleop.port={teleop_port}",
        f"--teleop.id={teleop_id}",
        f"--fps={fps}",
    ]
    if teleop_duration:
        cmd.append(f"--teleop_time_s={teleop_duration}")
    cmd.append(f"--display_data={_bool_flag(display_data)}")
    return _ARG_SEPARATOR.join(cmd)


def build_record_command(
    robot_type,
    robot_port,
    robot_id,
    cam_index,
    cam_width,
    cam_height,
    cam_fps,
    teleop_type,
    teleop_port,
    teleop_id,
    display_data,
    dataset_repo,
    num_episodes,
    single_task,
    resume,
    push_to_hub,
    use_existing,
    existing_ds,
):
    """Build the ``lerobot.record`` command for teleoperated data collection.

    When ``use_existing`` is set and ``existing_ds`` is non-empty, that dataset
    id replaces ``dataset_repo``. ``--resume=True`` is appended only when
    ``resume`` is truthy.
    """
    # An explicitly selected existing dataset wins over the free-text repo field.
    if use_existing and existing_ds:
        dataset_repo = existing_ds

    camera_cfg = _camera_config(cam_index, cam_width, cam_height, cam_fps)
    cmd = [
        "python -m lerobot.record",
        f"--robot.type={robot_type}",
        f"--robot.port={robot_port}",
        f"--robot.id={robot_id}",
        f"--robot.cameras=\"{camera_cfg}\"",
        f"--teleop.type={teleop_type}",
        f"--teleop.port={teleop_port}",
        f"--teleop.id={teleop_id}",
        f"--display_data={_bool_flag(display_data)}",
        f"--dataset.repo_id={dataset_repo}",
        f"--dataset.num_episodes={num_episodes}",
        # The task is free text; escape it so quotes/$/backticks cannot break the command.
        f"--dataset.single_task=\"{_escape_double_quoted(single_task)}\"",
        f"--dataset.push_to_hub={_bool_flag(push_to_hub)}",
    ]
    if resume:
        cmd.append("--resume=True")
    return _ARG_SEPARATOR.join(cmd)


def build_train_command(
    policy_path,
    dataset_repo,
    batch_size,
    steps,
    output_dir,
    job_name,
    device,
    wandb_enable,
    policy_repo_id,
):
    """Build the ``lerobot.scripts.train`` command.

    ``--policy.repo_id`` is emitted only when ``policy_repo_id`` is non-empty.
    """
    cmd = [
        "python -m lerobot.scripts.train",
        f"--policy.path={policy_path}",
        f"--dataset.repo_id={dataset_repo}",
        f"--batch_size={batch_size}",
        f"--steps={steps}",
        f"--output_dir={output_dir}",
        f"--job_name={job_name}",
        f"--policy.device={device}",
        f"--wandb.enable={_bool_flag(wandb_enable)}",
    ]
    if policy_repo_id:
        cmd.append(f"--policy.repo_id={policy_repo_id}")
    return _ARG_SEPARATOR.join(cmd)


def build_eval_command(
    robot_type,
    robot_port,
    robot_id,
    cam_index,
    cam_width,
    cam_height,
    cam_fps,
    display_data,
    dataset_repo,
    num_episodes,
    single_task,
    policy_path,
    resume,
):
    """Build the ``lerobot.record`` command that records episodes driven by a policy.

    Unlike the plain record command there are no teleop arguments; a
    ``--policy.path`` argument is passed instead.
    """
    camera_cfg = _camera_config(cam_index, cam_width, cam_height, cam_fps)
    cmd = [
        "python -m lerobot.record",
        f"--robot.type={robot_type}",
        f"--robot.port={robot_port}",
        f"--robot.id={robot_id}",
        f"--robot.cameras=\"{camera_cfg}\"",
        f"--display_data={_bool_flag(display_data)}",
        f"--dataset.repo_id={dataset_repo}",
        f"--dataset.num_episodes={num_episodes}",
        # The task is free text; escape it so quotes/$/backticks cannot break the command.
        f"--dataset.single_task=\"{_escape_double_quoted(single_task)}\"",
        f"--policy.path={policy_path}",
    ]
    if resume:
        cmd.append("--resume=True")
    return _ARG_SEPARATOR.join(cmd)


# Helper to list datasets on Hugging Face for given username
def _list_remote_datasets(username: str):
    """Return the sorted dataset ids owned by *username* on the Hugging Face Hub.

    Returns an empty list for a blank username or when the lookup fails, so
    the UI keeps working without network access.
    """
    author = (username or "").strip()
    if not author:
        # A blank author would be sent as "no filter" and enumerate the whole Hub.
        return []
    try:
        # Consume the result inside the try: the listing may be fetched lazily,
        # in which case errors only surface while iterating.
        return sorted(d.id for d in HfApi().list_datasets(author=author))
    except Exception:
        # Best-effort lookup at the UI boundary: record the failure, show no choices.
        logger.warning("Could not list datasets for %r", author, exc_info=True)
        return []


def _robot_inputs():
    """Create the row of robot type/port/id textboxes and return them in that order."""
    with gr.Row():
        robot_type = gr.Textbox(label="Robot Type", value="so101_follower")
        robot_port = gr.Textbox(label="Robot Port", value="/dev/ttyACM0")
        robot_id = gr.Textbox(label="Robot ID", value="follower")
    return robot_type, robot_port, robot_id


def _camera_inputs():
    """Create the row of camera index/width/height/fps fields and return them in that order."""
    with gr.Row():
        cam_index = gr.Number(label="Cam Index", value=0, precision=0)
        cam_width = gr.Number(label="Width", value=640, precision=0)
        cam_height = gr.Number(label="Height", value=480, precision=0)
        cam_fps = gr.Number(label="FPS", value=30, precision=0)
    return cam_index, cam_width, cam_height, cam_fps


def _teleop_inputs():
    """Create the row of teleop type/port/id textboxes and return them in that order."""
    with gr.Row():
        teleop_type = gr.Textbox(label="Teleop Type", value="so101_leader")
        teleop_port = gr.Textbox(label="Teleop Port", value="/dev/ttyACM1")
        teleop_id = gr.Textbox(label="Teleop ID", value="leader")
    return teleop_type, teleop_port, teleop_id


def build_ui():
    """Assemble and return the Gradio ``Blocks`` app with one tab per LeRobot script."""
    with gr.Blocks(title="Lerobot Scripts Controller (Generate Only)") as demo:
        hf_username_tb = gr.Textbox(label="HF Username", value="arpitg1304")

        # Fetch once and share between the three dataset dropdowns instead of
        # querying the Hub separately for each of them.
        initial_datasets = _list_remote_datasets(hf_username_tb.value)

        with gr.Tabs():
            # Teleoperate Tab
            with gr.TabItem("Teleoperate Robot"):
                gr.Markdown("### Teleoperate robot with camera")
                robot_type, robot_port, robot_id = _robot_inputs()
                cam_index, cam_width, cam_height, cam_fps = _camera_inputs()
                teleop_type, teleop_port, teleop_id = _teleop_inputs()
                with gr.Row():
                    fps = gr.Number(label="Loop FPS", value=60, precision=0)
                    teleop_duration = gr.Number(label="Duration (s)", value=60, precision=0)
                    display_data = gr.Checkbox(label="Display Data", value=True)
                teleop_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16)
                inputs_teleop = [
                    robot_type,
                    robot_port,
                    robot_id,
                    cam_index,
                    cam_width,
                    cam_height,
                    cam_fps,
                    teleop_type,
                    teleop_port,
                    teleop_id,
                    fps,
                    teleop_duration,
                    display_data,
                ]
                gr.Button("Generate Command").click(
                    build_teleop_command, inputs_teleop, outputs=teleop_cmd
                )

            # Record Data Tab
            with gr.TabItem("Record Data"):
                gr.Markdown("### Record episodes with policy")
                robot_type2, robot_port2, robot_id2 = _robot_inputs()
                cam_index2, cam_width2, cam_height2, cam_fps2 = _camera_inputs()
                teleop_type_r, teleop_port_r, teleop_id_r = _teleop_inputs()
                with gr.Row():
                    display_data2 = gr.Checkbox(label="Display Data", value=True)
                    dataset_repo = gr.Textbox(label="Dataset Repo", value="")
                    num_episodes = gr.Number(label="Num Episodes", value=2, precision=0)
                    single_task = gr.Textbox(label="Single Task", value="Grab the cylinder")
                    resume_chk = gr.Checkbox(label="Resume", value=False)
                    push_hub_chk = gr.Checkbox(label="Push to Hub", value=False)
                with gr.Row():
                    use_existing = gr.Checkbox(label="Use Existing Dataset", value=False)
                    existing_dd = gr.Dropdown(
                        label="User Datasets",
                        choices=initial_datasets,
                        visible=False,
                    )
                # Toggle dropdown visibility
                use_existing.change(
                    lambda f: gr.update(visible=f), inputs=use_existing, outputs=existing_dd
                )
                record_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16)
                inputs_rec = [
                    robot_type2,
                    robot_port2,
                    robot_id2,
                    cam_index2,
                    cam_width2,
                    cam_height2,
                    cam_fps2,
                    teleop_type_r,
                    teleop_port_r,
                    teleop_id_r,
                    display_data2,
                    dataset_repo,
                    num_episodes,
                    single_task,
                    resume_chk,
                    push_hub_chk,
                    use_existing,
                    existing_dd,
                ]
                gr.Button("Generate Command").click(build_record_command, inputs_rec, record_cmd)

            # Train Policy Tab
            with gr.TabItem("Train Policy"):
                gr.Markdown("### Train Policy")
                # Row 1: Policy path (full width)
                policy_path_t = gr.Textbox(label="Base Policy Path", value="lerobot/smolvla_base")
                # Row 2: Dataset + Device + WandB enable
                with gr.Row():
                    dataset_repo_t = gr.Dropdown(
                        label="User Dataset",
                        choices=initial_datasets,
                        scale=4,
                    )
                    device_t = gr.Dropdown(
                        label="Device", choices=["cpu", "cuda"], value="cuda", scale=1
                    )
                    wandb_chk = gr.Checkbox(label="W&B", value=True, scale=1)
                # Row 3: Batch size & Steps
                with gr.Row():
                    batch_size_t = gr.Number(label="Batch Size", value=16, precision=0, scale=1)
                    steps_t = gr.Number(label="Steps", value=20000, precision=0, scale=1)
                # Row 4: Output dir (full width)
                output_dir_t = gr.Textbox(label="Output Dir", value="outputs/train/my_smolvla_1")
                # Row 5: Job name & Policy repo id
                with gr.Row():
                    job_name_t = gr.Textbox(
                        label="Job Name", value="smolvla_place_cylinder", scale=1
                    )
                    policy_repo_t = gr.Textbox(
                        label="Policy Repo ID (optional)", value="", scale=1
                    )
                train_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16)
                gr.Button("Generate Command").click(
                    build_train_command,
                    [
                        policy_path_t,
                        dataset_repo_t,
                        batch_size_t,
                        steps_t,
                        output_dir_t,
                        job_name_t,
                        device_t,
                        wandb_chk,
                        policy_repo_t,
                    ],
                    train_cmd,
                )

            # Evaluate Policy Tab
            with gr.TabItem("Evaluate Policy"):
                gr.Markdown("### Evaluate Policy")
                robot_type_e, robot_port_e, robot_id_e = _robot_inputs()
                cam_index_e, cam_width_e, cam_height_e, cam_fps_e = _camera_inputs()
                with gr.Row():
                    display_data_e = gr.Checkbox(label="Display Data", value=True)
                    dataset_repo_e = gr.Dropdown(label="User Dataset", choices=initial_datasets)
                    num_episodes_e = gr.Number(label="Num Episodes", value=2, precision=0)
                    single_task_e = gr.Textbox(label="Single Task", value="place cylinder")
                with gr.Row():
                    policy_path_e = gr.Textbox(
                        label="Policy Path", value="arpitg1304/smolvla_place_cylinder"
                    )
                    resume_e = gr.Checkbox(label="Resume", value=True)
                eval_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16)
                inputs_eval = [
                    robot_type_e,
                    robot_port_e,
                    robot_id_e,
                    cam_index_e,
                    cam_width_e,
                    cam_height_e,
                    cam_fps_e,
                    display_data_e,
                    dataset_repo_e,
                    num_episodes_e,
                    single_task_e,
                    policy_path_e,
                    resume_e,
                ]
                gr.Button("Generate Command").click(
                    build_eval_command,
                    inputs_eval,
                    eval_cmd,
                )

        # Update every dataset dropdown when the username changes. A single
        # handler means one Hub lookup per change instead of one per dropdown.
        dataset_dropdowns = [existing_dd, dataset_repo_t, dataset_repo_e]

        def _update_ds_choices(username):
            choices = _list_remote_datasets(username)
            return tuple(gr.update(choices=choices) for _ in dataset_dropdowns)

        hf_username_tb.change(
            _update_ds_choices, inputs=hf_username_tb, outputs=dataset_dropdowns
        )

    return demo


if __name__ == "__main__":
    build_ui().launch()