Spaces:
Running
Running
/** | |
* Robot teleoperation using keyboard control | |
* | |
* Example: | |
* ``` | |
* npx lerobot teleoperate --robot.type=so100_follower --robot.port=COM4 --teleop.type=keyboard | |
* ``` | |
*/ | |
import { createSO100Follower } from "./robots/so100_follower.js"; | |
import { KeyboardController } from "./utils/keyboard-teleop.js"; | |
import type { TeleoperateConfig } from "./types/teleoperation.js"; | |
/** | |
* Main teleoperate function | |
* Mirrors Python lerobot teleoperate.py structure | |
*/ | |
export async function teleoperate(config: TeleoperateConfig): Promise<void> { | |
// Validate configuration | |
if (!config.robot) { | |
throw new Error("Robot configuration is required"); | |
} | |
if (!config.teleop || config.teleop.type !== "keyboard") { | |
throw new Error("Only keyboard teleoperation is currently supported"); | |
} | |
const stepSize = config.step_size || 25; | |
const duration = config.duration_s; | |
let robot; | |
let keyboardController; | |
try { | |
// Create robot | |
switch (config.robot.type) { | |
case "so100_follower": | |
robot = createSO100Follower(config.robot); | |
break; | |
default: | |
throw new Error(`Unsupported robot type: ${config.robot.type}`); | |
} | |
console.log( | |
`Connecting to robot: ${config.robot.type} on ${config.robot.port}` | |
); | |
if (config.robot.id) { | |
console.log(`Robot ID: ${config.robot.id}`); | |
} | |
await robot.connect(false); // calibrate=false | |
console.log("Robot connected successfully."); | |
// Show calibration status | |
const isCalibrated = (robot as any).isCalibrated; | |
if (isCalibrated) { | |
console.log( | |
`β Loaded calibration for: ${config.robot.id || config.robot.type}` | |
); | |
} else { | |
console.log( | |
`β οΈ No calibration found for: ${ | |
config.robot.id || config.robot.type | |
} (using defaults)` | |
); | |
console.log( | |
" Run 'npx lerobot calibrate' first for optimal performance!" | |
); | |
} | |
// Create keyboard controller | |
keyboardController = new KeyboardController(robot, stepSize); | |
console.log(""); | |
console.log("Starting keyboard teleoperation..."); | |
console.log("Controls:"); | |
console.log(" ββ Arrow Keys: Shoulder Lift"); | |
console.log(" ββ Arrow Keys: Shoulder Pan"); | |
console.log(" W/S: Elbow Flex"); | |
console.log(" A/D: Wrist Flex"); | |
console.log(" Q/E: Wrist Roll"); | |
console.log(" Space: Gripper Toggle"); | |
console.log(" ESC: Emergency Stop"); | |
console.log(" Ctrl+C: Exit"); | |
console.log(""); | |
console.log("Press any control key to begin..."); | |
console.log(""); | |
// Start teleoperation control loop | |
await teleoperationLoop(keyboardController, robot, duration || null); | |
} catch (error) { | |
// Ensure we disconnect even if there's an error | |
if (keyboardController) { | |
try { | |
await keyboardController.stop(); | |
} catch (stopError) { | |
console.warn("Warning: Failed to stop keyboard controller properly"); | |
} | |
} | |
if (robot) { | |
try { | |
await robot.disconnect(); | |
} catch (disconnectError) { | |
console.warn("Warning: Failed to disconnect robot properly"); | |
} | |
} | |
throw error; | |
} | |
} | |
/** | |
* Main teleoperation control loop | |
*/ | |
async function teleoperationLoop( | |
keyboardController: KeyboardController, | |
robot: any, | |
duration: number | null | |
): Promise<void> { | |
console.log("Initializing teleoperation..."); | |
// Start keyboard controller | |
await keyboardController.start(); | |
const startTime = performance.now(); | |
// Set up graceful shutdown | |
let running = true; | |
process.on("SIGINT", () => { | |
console.log("\nShutting down gracefully..."); | |
running = false; | |
}); | |
try { | |
// Just wait for the keyboard controller to handle everything | |
while (running) { | |
// Check duration limit | |
if (duration && performance.now() - startTime >= duration * 1000) { | |
console.log(`\nDuration limit reached (${duration}s). Stopping...`); | |
break; | |
} | |
// Small delay to prevent busy waiting | |
await new Promise((resolve) => setTimeout(resolve, 100)); | |
} | |
} finally { | |
console.log("\nStopping teleoperation..."); | |
await keyboardController.stop(); | |
await robot.disconnect(); | |
console.log("Teleoperation stopped."); | |
} | |
} | |
/** | |
* Parse command line arguments in Python argparse style | |
* Handles --robot.type=so100_follower --teleop.type=keyboard format | |
*/ | |
export function parseArgs(args: string[]): TeleoperateConfig { | |
const config: Partial<TeleoperateConfig> = {}; | |
for (const arg of args) { | |
if (arg.startsWith("--robot.")) { | |
if (!config.robot) { | |
config.robot = { type: "so100_follower", port: "" }; | |
} | |
const [key, value] = arg.substring(8).split("="); | |
switch (key) { | |
case "type": | |
if (value !== "so100_follower") { | |
throw new Error(`Unsupported robot type: ${value}`); | |
} | |
config.robot.type = value as "so100_follower"; | |
break; | |
case "port": | |
config.robot.port = value; | |
break; | |
case "id": | |
config.robot.id = value; | |
break; | |
default: | |
throw new Error(`Unknown robot parameter: ${key}`); | |
} | |
} else if (arg.startsWith("--teleop.")) { | |
if (!config.teleop) { | |
config.teleop = { type: "keyboard" }; | |
} | |
const [key, value] = arg.substring(9).split("="); | |
switch (key) { | |
case "type": | |
if (value !== "keyboard") { | |
throw new Error(`Unsupported teleoperator type: ${value}`); | |
} | |
config.teleop.type = value as "keyboard"; | |
break; | |
default: | |
throw new Error(`Unknown teleoperator parameter: ${key}`); | |
} | |
} else if (arg.startsWith("--fps=")) { | |
config.fps = parseInt(arg.substring(6)); | |
if (isNaN(config.fps) || config.fps <= 0) { | |
throw new Error("FPS must be a positive number"); | |
} | |
} else if (arg.startsWith("--step_size=")) { | |
config.step_size = parseInt(arg.substring(12)); | |
if (isNaN(config.step_size) || config.step_size <= 0) { | |
throw new Error("Step size must be a positive number"); | |
} | |
} else if (arg.startsWith("--duration_s=")) { | |
config.duration_s = parseInt(arg.substring(13)); | |
if (isNaN(config.duration_s) || config.duration_s <= 0) { | |
throw new Error("Duration must be a positive number"); | |
} | |
} else if (arg === "--help" || arg === "-h") { | |
showUsage(); | |
process.exit(0); | |
} else if (!arg.startsWith("--")) { | |
// Skip non-option arguments | |
continue; | |
} else { | |
throw new Error(`Unknown argument: ${arg}`); | |
} | |
} | |
// Validate required fields | |
if (!config.robot?.port) { | |
throw new Error("Robot port is required (--robot.port=PORT)"); | |
} | |
if (!config.teleop?.type) { | |
throw new Error("Teleoperator type is required (--teleop.type=keyboard)"); | |
} | |
return config as TeleoperateConfig; | |
} | |
/** | |
* Show usage information matching Python argparse output | |
*/ | |
function showUsage(): void { | |
console.log("Usage: lerobot teleoperate [options]"); | |
console.log(""); | |
console.log("Control a robot using keyboard input"); | |
console.log(""); | |
console.log("Options:"); | |
console.log(" --robot.type=TYPE Robot type (so100_follower)"); | |
console.log( | |
" --robot.port=PORT Robot serial port (e.g., COM4, /dev/ttyUSB0)" | |
); | |
console.log(" --robot.id=ID Robot identifier"); | |
console.log(" --teleop.type=TYPE Teleoperator type (keyboard)"); | |
console.log( | |
" --fps=FPS Control loop frame rate (default: 60)" | |
); | |
console.log( | |
" --step_size=SIZE Position step size per keypress (default: 10)" | |
); | |
console.log(" --duration_s=SECONDS Teleoperation duration in seconds"); | |
console.log(" -h, --help Show this help message"); | |
console.log(""); | |
console.log("Keyboard Controls:"); | |
console.log(" ββ Arrow Keys Shoulder Lift"); | |
console.log(" ββ Arrow Keys Shoulder Pan"); | |
console.log(" W/S Elbow Flex"); | |
console.log(" A/D Wrist Flex"); | |
console.log(" Q/E Wrist Roll"); | |
console.log(" Space Gripper Toggle"); | |
console.log(" ESC Emergency Stop"); | |
console.log(" Ctrl+C Exit"); | |
console.log(""); | |
console.log("Examples:"); | |
console.log( | |
" lerobot teleoperate --robot.type=so100_follower --robot.port=COM4 --teleop.type=keyboard" | |
); | |
console.log( | |
" lerobot teleoperate --robot.type=so100_follower --robot.port=COM4 --teleop.type=keyboard --fps=30 --step_size=50" | |
); | |
console.log(""); | |
console.log("Use 'lerobot find-port' to discover available ports."); | |
} | |
/** | |
* CLI entry point when called directly | |
* Mirrors Python's if __name__ == "__main__": pattern | |
*/ | |
export async function main(args: string[]): Promise<void> { | |
try { | |
if (args.length === 0 || args.includes("--help") || args.includes("-h")) { | |
showUsage(); | |
return; | |
} | |
const config = parseArgs(args); | |
await teleoperate(config); | |
} catch (error) { | |
if (error instanceof Error) { | |
console.error("Error:", error.message); | |
} else { | |
console.error("Error:", error); | |
} | |
console.error(""); | |
console.error("Please verify:"); | |
console.error("1. The robot is connected to the specified port"); | |
console.error("2. No other application is using the port"); | |
console.error("3. You have permission to access the port"); | |
console.error(""); | |
console.error("Use 'lerobot find-port' to discover available ports."); | |
process.exit(1); | |
} | |
} | |
if (import.meta.url === `file://${process.argv[1]}`) { | |
const args = process.argv.slice(2); | |
main(args); | |
} | |