/** * Robot teleoperation using keyboard control * * Example: * ``` * npx lerobot teleoperate --robot.type=so100_follower --robot.port=COM4 --teleop.type=keyboard * ``` */ import { createSO100Follower } from "./robots/so100_follower.js"; import { KeyboardController } from "./utils/keyboard-teleop.js"; import type { TeleoperateConfig } from "./types/teleoperation.js"; /** * Main teleoperate function * Mirrors Python lerobot teleoperate.py structure */ export async function teleoperate(config: TeleoperateConfig): Promise { // Validate configuration if (!config.robot) { throw new Error("Robot configuration is required"); } if (!config.teleop || config.teleop.type !== "keyboard") { throw new Error("Only keyboard teleoperation is currently supported"); } const stepSize = config.step_size || 25; const duration = config.duration_s; let robot; let keyboardController; try { // Create robot switch (config.robot.type) { case "so100_follower": robot = createSO100Follower(config.robot); break; default: throw new Error(`Unsupported robot type: ${config.robot.type}`); } console.log( `Connecting to robot: ${config.robot.type} on ${config.robot.port}` ); if (config.robot.id) { console.log(`Robot ID: ${config.robot.id}`); } await robot.connect(false); // calibrate=false console.log("Robot connected successfully."); // Show calibration status const isCalibrated = (robot as any).isCalibrated; if (isCalibrated) { console.log( `✅ Loaded calibration for: ${config.robot.id || config.robot.type}` ); } else { console.log( `⚠️ No calibration found for: ${ config.robot.id || config.robot.type } (using defaults)` ); console.log( " Run 'npx lerobot calibrate' first for optimal performance!" ); } // Create keyboard controller keyboardController = new KeyboardController(robot, stepSize); console.log(""); console.log("Starting keyboard teleoperation..."); console.log("Controls:"); console.log(" ↑↓ Arrow Keys: Shoulder Lift"); console.log(" ←→ Arrow Keys: Shoulder Pan"); console.log(" W/S: Elbow Flex"); console.log(" A/D: Wrist Flex"); console.log(" Q/E: Wrist Roll"); console.log(" Space: Gripper Toggle"); console.log(" ESC: Emergency Stop"); console.log(" Ctrl+C: Exit"); console.log(""); console.log("Press any control key to begin..."); console.log(""); // Start teleoperation control loop await teleoperationLoop(keyboardController, robot, duration || null); } catch (error) { // Ensure we disconnect even if there's an error if (keyboardController) { try { await keyboardController.stop(); } catch (stopError) { console.warn("Warning: Failed to stop keyboard controller properly"); } } if (robot) { try { await robot.disconnect(); } catch (disconnectError) { console.warn("Warning: Failed to disconnect robot properly"); } } throw error; } } /** * Main teleoperation control loop */ async function teleoperationLoop( keyboardController: KeyboardController, robot: any, duration: number | null ): Promise { console.log("Initializing teleoperation..."); // Start keyboard controller await keyboardController.start(); const startTime = performance.now(); // Set up graceful shutdown let running = true; process.on("SIGINT", () => { console.log("\nShutting down gracefully..."); running = false; }); try { // Just wait for the keyboard controller to handle everything while (running) { // Check duration limit if (duration && performance.now() - startTime >= duration * 1000) { console.log(`\nDuration limit reached (${duration}s). Stopping...`); break; } // Small delay to prevent busy waiting await new Promise((resolve) => setTimeout(resolve, 100)); } } finally { console.log("\nStopping teleoperation..."); await keyboardController.stop(); await robot.disconnect(); console.log("Teleoperation stopped."); } } /** * Parse command line arguments in Python argparse style * Handles --robot.type=so100_follower --teleop.type=keyboard format */ export function parseArgs(args: string[]): TeleoperateConfig { const config: Partial = {}; for (const arg of args) { if (arg.startsWith("--robot.")) { if (!config.robot) { config.robot = { type: "so100_follower", port: "" }; } const [key, value] = arg.substring(8).split("="); switch (key) { case "type": if (value !== "so100_follower") { throw new Error(`Unsupported robot type: ${value}`); } config.robot.type = value as "so100_follower"; break; case "port": config.robot.port = value; break; case "id": config.robot.id = value; break; default: throw new Error(`Unknown robot parameter: ${key}`); } } else if (arg.startsWith("--teleop.")) { if (!config.teleop) { config.teleop = { type: "keyboard" }; } const [key, value] = arg.substring(9).split("="); switch (key) { case "type": if (value !== "keyboard") { throw new Error(`Unsupported teleoperator type: ${value}`); } config.teleop.type = value as "keyboard"; break; default: throw new Error(`Unknown teleoperator parameter: ${key}`); } } else if (arg.startsWith("--fps=")) { config.fps = parseInt(arg.substring(6)); if (isNaN(config.fps) || config.fps <= 0) { throw new Error("FPS must be a positive number"); } } else if (arg.startsWith("--step_size=")) { config.step_size = parseInt(arg.substring(12)); if (isNaN(config.step_size) || config.step_size <= 0) { throw new Error("Step size must be a positive number"); } } else if (arg.startsWith("--duration_s=")) { config.duration_s = parseInt(arg.substring(13)); if (isNaN(config.duration_s) || config.duration_s <= 0) { throw new Error("Duration must be a positive number"); } } else if (arg === "--help" || arg === "-h") { showUsage(); process.exit(0); } else if (!arg.startsWith("--")) { // Skip non-option arguments continue; } else { throw new Error(`Unknown argument: ${arg}`); } } // Validate required fields if (!config.robot?.port) { throw new Error("Robot port is required (--robot.port=PORT)"); } if (!config.teleop?.type) { throw new Error("Teleoperator type is required (--teleop.type=keyboard)"); } return config as TeleoperateConfig; } /** * Show usage information matching Python argparse output */ function showUsage(): void { console.log("Usage: lerobot teleoperate [options]"); console.log(""); console.log("Control a robot using keyboard input"); console.log(""); console.log("Options:"); console.log(" --robot.type=TYPE Robot type (so100_follower)"); console.log( " --robot.port=PORT Robot serial port (e.g., COM4, /dev/ttyUSB0)" ); console.log(" --robot.id=ID Robot identifier"); console.log(" --teleop.type=TYPE Teleoperator type (keyboard)"); console.log( " --fps=FPS Control loop frame rate (default: 60)" ); console.log( " --step_size=SIZE Position step size per keypress (default: 10)" ); console.log(" --duration_s=SECONDS Teleoperation duration in seconds"); console.log(" -h, --help Show this help message"); console.log(""); console.log("Keyboard Controls:"); console.log(" ↑↓ Arrow Keys Shoulder Lift"); console.log(" ←→ Arrow Keys Shoulder Pan"); console.log(" W/S Elbow Flex"); console.log(" A/D Wrist Flex"); console.log(" Q/E Wrist Roll"); console.log(" Space Gripper Toggle"); console.log(" ESC Emergency Stop"); console.log(" Ctrl+C Exit"); console.log(""); console.log("Examples:"); console.log( " lerobot teleoperate --robot.type=so100_follower --robot.port=COM4 --teleop.type=keyboard" ); console.log( " lerobot teleoperate --robot.type=so100_follower --robot.port=COM4 --teleop.type=keyboard --fps=30 --step_size=50" ); console.log(""); console.log("Use 'lerobot find-port' to discover available ports."); } /** * CLI entry point when called directly * Mirrors Python's if __name__ == "__main__": pattern */ export async function main(args: string[]): Promise { try { if (args.length === 0 || args.includes("--help") || args.includes("-h")) { showUsage(); return; } const config = parseArgs(args); await teleoperate(config); } catch (error) { if (error instanceof Error) { console.error("Error:", error.message); } else { console.error("Error:", error); } console.error(""); console.error("Please verify:"); console.error("1. The robot is connected to the specified port"); console.error("2. No other application is using the port"); console.error("3. You have permission to access the port"); console.error(""); console.error("Use 'lerobot find-port' to discover available ports."); process.exit(1); } } if (import.meta.url === `file://${process.argv[1]}`) { const args = process.argv.slice(2); main(args); }