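# Web-page header captured together with the file (avatar caption, commit message, commit hash):
# arpitg1304's picture
# Create app.py
# 59aa4e5 verified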
import gradio as gr
from huggingface_hub import HfApi
def build_teleop_command(
    robot_type,
    robot_port,
    robot_id,
    cam_index,
    cam_width,
    cam_height,
    cam_fps,
    teleop_type,
    teleop_port,
    teleop_id,
    fps,
    teleop_duration,
    display_data,
):
    """Assemble the ``python -m lerobot.teleoperate`` command as display text.

    Each argument is interpolated verbatim into one ``--flag=value`` entry. The
    four camera values are formatted with ``%d`` into a single ``front`` OpenCV
    camera mapping that is wrapped in double quotes. ``--teleop_time_s`` is
    emitted only when ``teleop_duration`` is truthy, and ``--display_data``
    (``true``/``false``) is always the last flag.

    Returns:
        The command as one string whose arguments are separated by a shell
        line continuation (space, backslash, newline, space).
    """
    camera_template = "{ front: {type: opencv, index_or_path: %d, width: %d, height: %d, fps: %d}}"
    front_camera = camera_template % (cam_index, cam_width, cam_height, cam_fps)
    show_flag = "true" if display_data else "false"

    parts = ["python -m lerobot.teleoperate"]
    parts += [
        f"--robot.type={robot_type}",
        f"--robot.port={robot_port}",
        f"--robot.id={robot_id}",
        f'--robot.cameras="{front_camera}"',
        f"--teleop.type={teleop_type}",
        f"--teleop.port={teleop_port}",
        f"--teleop.id={teleop_id}",
        f"--fps={fps}",
    ]
    # A falsy duration (0, None) means "no time limit flag at all".
    if teleop_duration:
        parts.append(f"--teleop_time_s={teleop_duration}")
    parts.append(f"--display_data={show_flag}")

    return " \\\n ".join(parts)
def build_record_command(
    robot_type,
    robot_port,
    robot_id,
    cam_index,
    cam_width,
    cam_height,
    cam_fps,
    teleop_type,
    teleop_port,
    teleop_id,
    display_data,
    dataset_repo,
    num_episodes,
    single_task,
    resume,
    push_to_hub,
    use_existing,
    existing_ds,
):
    """Assemble the ``python -m lerobot.record`` command for teleoperated recording.

    Args:
        robot_type, robot_port, robot_id: Interpolated into the ``--robot.*`` flags.
        cam_index, cam_width, cam_height, cam_fps: Formatted with ``%d`` into
            the single ``front`` OpenCV camera mapping.
        teleop_type, teleop_port, teleop_id: Interpolated into the ``--teleop.*`` flags.
        display_data: Truthy -> ``--display_data=true``, otherwise ``false``.
        dataset_repo: Value for ``--dataset.repo_id`` unless overridden below.
        num_episodes: Value for ``--dataset.num_episodes``.
        single_task: Task description; emitted inside double quotes.
        resume: When truthy, ``--resume=True`` is appended last.
        push_to_hub: Truthy -> ``--dataset.push_to_hub=true``, otherwise ``false``.
        use_existing, existing_ds: When both are truthy, ``existing_ds``
            replaces ``dataset_repo``.

    Returns:
        The command as one string whose arguments are separated by a shell
        line continuation (space, backslash, newline, space).
    """
    # If using an existing dataset, it overrides the free-text repo field.
    if use_existing and existing_ds:
        dataset_repo = existing_ds

    camera_cfg = (
        "{ front: {type: opencv, index_or_path: %d, width: %d, height: %d, fps: %d}}"
        % (cam_index, cam_width, cam_height, cam_fps)
    )

    # The task text lands inside a double-quoted shell word, where backslash,
    # double quote, dollar and backtick keep their special meaning. Escape
    # them (backslash first, so added escapes are not re-escaped) so that the
    # pasted command stays one intact argument.
    task_text = str(single_task)
    for special in ("\\", '"', "$", "`"):
        task_text = task_text.replace(special, "\\" + special)

    cmd = [
        "python -m lerobot.record",
        f"--robot.type={robot_type}",
        f"--robot.port={robot_port}",
        f"--robot.id={robot_id}",
        f'--robot.cameras="{camera_cfg}"',
        f"--teleop.type={teleop_type}",
        f"--teleop.port={teleop_port}",
        f"--teleop.id={teleop_id}",
        f"--display_data={'true' if display_data else 'false'}",
        f"--dataset.repo_id={dataset_repo}",
        f"--dataset.num_episodes={num_episodes}",
        f'--dataset.single_task="{task_text}"',
        f"--dataset.push_to_hub={'true' if push_to_hub else 'false'}",
    ]
    if resume:
        cmd.append("--resume=True")
    return " \\\n ".join(cmd)
def build_train_command(
    policy_path,
    dataset_repo,
    batch_size,
    steps,
    output_dir,
    job_name,
    device,
    wandb_enable,
    policy_repo_id,
):
    """Assemble the ``python -m lerobot.scripts.train`` command as display text.

    Every argument becomes one ``--flag=value`` entry, interpolated verbatim.
    ``wandb_enable`` is rendered as ``true``/``false``. ``--policy.repo_id``
    is only included when ``policy_repo_id`` is truthy.

    Returns:
        The command as one string whose arguments are separated by a shell
        line continuation (space, backslash, newline, space).
    """
    wandb_flag = "true" if wandb_enable else "false"
    always_present = (
        ("policy.path", policy_path),
        ("dataset.repo_id", dataset_repo),
        ("batch_size", batch_size),
        ("steps", steps),
        ("output_dir", output_dir),
        ("job_name", job_name),
        ("policy.device", device),
        ("wandb.enable", wandb_flag),
    )

    pieces = ["python -m lerobot.scripts.train"]
    pieces.extend(f"--{flag}={value}" for flag, value in always_present)
    # The target repo is optional; leave the flag out when it is blank.
    if policy_repo_id:
        pieces.append(f"--policy.repo_id={policy_repo_id}")

    return " \\\n ".join(pieces)
def build_eval_command(
    robot_type,
    robot_port,
    robot_id,
    cam_index,
    cam_width,
    cam_height,
    cam_fps,
    display_data,
    dataset_repo,
    num_episodes,
    single_task,
    policy_path,
    resume,
):
    """Assemble the ``python -m lerobot.record`` command that runs a policy.

    Unlike the teleoperated recording command, no ``--teleop.*`` flags are
    emitted; ``--policy.path`` is passed instead.

    Args:
        robot_type, robot_port, robot_id: Interpolated into the ``--robot.*`` flags.
        cam_index, cam_width, cam_height, cam_fps: Formatted with ``%d`` into
            the single ``front`` OpenCV camera mapping.
        display_data: Truthy -> ``--display_data=true``, otherwise ``false``.
        dataset_repo: Value for ``--dataset.repo_id``.
        num_episodes: Value for ``--dataset.num_episodes``.
        single_task: Task description; emitted inside double quotes.
        policy_path: Value for ``--policy.path``.
        resume: When truthy, ``--resume=True`` is appended last.

    Returns:
        The command as one string whose arguments are separated by a shell
        line continuation (space, backslash, newline, space).
    """
    camera_cfg = (
        "{ front: {type: opencv, index_or_path: %d, width: %d, height: %d, fps: %d}}"
        % (cam_index, cam_width, cam_height, cam_fps)
    )

    # The task text lands inside a double-quoted shell word, where backslash,
    # double quote, dollar and backtick keep their special meaning. Escape
    # them (backslash first, so added escapes are not re-escaped) so that the
    # pasted command stays one intact argument.
    task_text = str(single_task)
    for special in ("\\", '"', "$", "`"):
        task_text = task_text.replace(special, "\\" + special)

    cmd = [
        "python -m lerobot.record",
        f"--robot.type={robot_type}",
        f"--robot.port={robot_port}",
        f"--robot.id={robot_id}",
        f'--robot.cameras="{camera_cfg}"',
        f"--display_data={'true' if display_data else 'false'}",
        f"--dataset.repo_id={dataset_repo}",
        f"--dataset.num_episodes={num_episodes}",
        f'--dataset.single_task="{task_text}"',
        f"--policy.path={policy_path}",
    ]
    if resume:
        cmd.append("--resume=True")
    return " \\\n ".join(cmd)
# Helper to list datasets on Hugging Face for given username
def _list_remote_datasets(username: str):
try:
api = HfApi()
datasets = api.list_datasets(author=username)
return sorted([d.id for d in datasets])
except Exception:
return []
def _robot_row():
    """Create the robot type/port/id textboxes in one row; return them in that order."""
    with gr.Row():
        return [
            gr.Textbox(label="Robot Type", value="so101_follower"),
            gr.Textbox(label="Robot Port", value="/dev/ttyACM0"),
            gr.Textbox(label="Robot ID", value="follower"),
        ]


def _camera_row():
    """Create the camera index/width/height/fps number inputs in one row; return them in that order."""
    with gr.Row():
        return [
            gr.Number(label="Cam Index", value=0, precision=0),
            gr.Number(label="Width", value=640, precision=0),
            gr.Number(label="Height", value=480, precision=0),
            gr.Number(label="FPS", value=30, precision=0),
        ]


def _teleop_row():
    """Create the teleop type/port/id textboxes in one row; return them in that order."""
    with gr.Row():
        return [
            gr.Textbox(label="Teleop Type", value="so101_leader"),
            gr.Textbox(label="Teleop Port", value="/dev/ttyACM1"),
            gr.Textbox(label="Teleop ID", value="leader"),
        ]


def build_ui():
    """Build the four-tab Gradio app that generates lerobot commands.

    The tabs (teleoperate, record, train, evaluate) each collect inputs and
    wire a "Generate Command" button to the matching ``build_*_command``
    function; the result is shown in a read-only textbox. Nothing is executed.

    The dataset dropdowns are filled from ``_list_remote_datasets`` for the
    "HF Username" textbox: once when the UI is built, and again whenever the
    username changes.

    Returns:
        The ``gr.Blocks`` instance, not yet launched.
    """
    with gr.Blocks(title="Lerobot Scripts Controller (Generate Only)") as demo:
        hf_username_tb = gr.Textbox(label="HF Username", value="arpitg1304")
        # One Hub request at build time feeds all three dataset dropdowns.
        initial_datasets = _list_remote_datasets(hf_username_tb.value)

        with gr.Tabs():
            # Teleoperate Tab
            with gr.TabItem("Teleoperate Robot"):
                gr.Markdown("### Teleoperate robot with camera")
                robot_inputs = _robot_row()
                camera_inputs = _camera_row()
                teleop_inputs = _teleop_row()
                with gr.Row():
                    fps = gr.Number(label="Loop FPS", value=60, precision=0)
                    teleop_duration = gr.Number(label="Duration (s)", value=60, precision=0)
                    display_data = gr.Checkbox(label="Display Data", value=True)
                teleop_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16)
                # Input order must match build_teleop_command's parameters.
                gr.Button("Generate Command").click(
                    build_teleop_command,
                    [*robot_inputs, *camera_inputs, *teleop_inputs, fps, teleop_duration, display_data],
                    outputs=teleop_cmd,
                )

            # Record Data Tab
            with gr.TabItem("Record Data"):
                gr.Markdown("### Record episodes with policy")
                robot_inputs_r = _robot_row()
                camera_inputs_r = _camera_row()
                teleop_inputs_r = _teleop_row()
                with gr.Row():
                    display_data2 = gr.Checkbox(label="Display Data", value=True)
                    dataset_repo = gr.Textbox(label="Dataset Repo", value="")
                    num_episodes = gr.Number(label="Num Episodes", value=2, precision=0)
                    single_task = gr.Textbox(label="Single Task", value="Grab the cylinder")
                    resume_chk = gr.Checkbox(label="Resume", value=False)
                    push_hub_chk = gr.Checkbox(label="Push to Hub", value=False)
                with gr.Row():
                    use_existing = gr.Checkbox(label="Use Existing Dataset", value=False)
                    existing_dd = gr.Dropdown(
                        label="User Datasets",
                        choices=list(initial_datasets),
                        visible=False,
                    )
                # Show the dataset dropdown only while "Use Existing Dataset" is ticked.
                use_existing.change(lambda f: gr.update(visible=f), inputs=use_existing, outputs=existing_dd)
                record_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16)
                # Input order must match build_record_command's parameters.
                gr.Button("Generate Command").click(
                    build_record_command,
                    [
                        *robot_inputs_r,
                        *camera_inputs_r,
                        *teleop_inputs_r,
                        display_data2,
                        dataset_repo,
                        num_episodes,
                        single_task,
                        resume_chk,
                        push_hub_chk,
                        use_existing,
                        existing_dd,
                    ],
                    record_cmd,
                )

            # Train Policy Tab
            with gr.TabItem("Train Policy"):
                gr.Markdown("### Train Policy")
                # Row 1: Policy path (full width)
                policy_path_t = gr.Textbox(label="Base Policy Path", value="lerobot/smolvla_base")
                # Row 2: Dataset + Device + WandB enable
                with gr.Row():
                    dataset_repo_t = gr.Dropdown(
                        label="User Dataset",
                        choices=list(initial_datasets),
                        scale=4,
                    )
                    device_t = gr.Dropdown(label="Device", choices=["cpu", "cuda"], value="cuda", scale=1)
                    wandb_chk = gr.Checkbox(label="W&B", value=True, scale=1)
                # Row 3: Batch size & Steps
                with gr.Row():
                    batch_size_t = gr.Number(label="Batch Size", value=16, precision=0, scale=1)
                    steps_t = gr.Number(label="Steps", value=20000, precision=0, scale=1)
                # Row 4: Output dir (full width)
                output_dir_t = gr.Textbox(label="Output Dir", value="outputs/train/my_smolvla_1")
                # Row 5: Job name & Policy repo id
                with gr.Row():
                    job_name_t = gr.Textbox(label="Job Name", value="smolvla_place_cylinder", scale=1)
                    policy_repo_t = gr.Textbox(label="Policy Repo ID (optional)", value="", scale=1)
                train_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16)
                # Input order must match build_train_command's parameters.
                gr.Button("Generate Command").click(
                    build_train_command,
                    [
                        policy_path_t,
                        dataset_repo_t,
                        batch_size_t,
                        steps_t,
                        output_dir_t,
                        job_name_t,
                        device_t,
                        wandb_chk,
                        policy_repo_t,
                    ],
                    train_cmd,
                )

            # Evaluate Policy Tab
            with gr.TabItem("Evaluate Policy"):
                gr.Markdown("### Evaluate Policy")
                robot_inputs_e = _robot_row()
                camera_inputs_e = _camera_row()
                with gr.Row():
                    display_data_e = gr.Checkbox(label="Display Data", value=True)
                    dataset_repo_e = gr.Dropdown(label="User Dataset", choices=list(initial_datasets))
                    num_episodes_e = gr.Number(label="Num Episodes", value=2, precision=0)
                    single_task_e = gr.Textbox(label="Single Task", value="place cylinder")
                with gr.Row():
                    policy_path_e = gr.Textbox(label="Policy Path", value="arpitg1304/smolvla_place_cylinder")
                    resume_e = gr.Checkbox(label="Resume", value=True)
                eval_cmd = gr.Textbox(label="Generated Command", interactive=False, lines=16)
                # Input order must match build_eval_command's parameters.
                gr.Button("Generate Command").click(
                    build_eval_command,
                    [
                        *robot_inputs_e,
                        *camera_inputs_e,
                        display_data_e,
                        dataset_repo_e,
                        num_episodes_e,
                        single_task_e,
                        policy_path_e,
                        resume_e,
                    ],
                    eval_cmd,
                )

        # A username edit refreshes all three dataset dropdowns from a single
        # Hub lookup instead of one lookup per dropdown.
        def _refresh_dataset_choices(username):
            names = _list_remote_datasets(username)
            return (
                gr.update(choices=names),
                gr.update(choices=names),
                gr.update(choices=names),
            )

        hf_username_tb.change(
            _refresh_dataset_choices,
            inputs=hf_username_tb,
            outputs=[existing_dd, dataset_repo_t, dataset_repo_e],
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the Blocks app, then launch it.
    app = build_ui()
    app.launch()