/**
 * Shared calibration procedures for SO-100 devices (both leader and follower)
 * Mirrors Python lerobot calibrate.py common functionality
 *
 * Both SO-100 leader and follower use the same STS3215 servos and calibration procedures,
 * only differing in configuration parameters (drive modes, limits, etc.)
 */

import * as readline from "readline";
import { SerialPort } from "serialport";
import logUpdate from "log-update";

/**
 * Device configuration for calibration
 * Despite the "SO100" name, this interface is now device-agnostic and configurable
 * for any robot using similar serial protocols (Feetech STS3215, etc.)
 */
import type {
  SO100CalibrationConfig,
  CalibrationResults,
} from "../types/calibration.js";

/**
 * Sign-magnitude encoding functions for Feetech STS3215 motors
 * Mirrors Python lerobot/common/utils/encoding_utils.py
 */

/**
 * Encode a signed integer using sign-magnitude format.
 * The bit at `signBitIndex` carries the sign (0 = positive, 1 = negative);
 * the bits below it carry the magnitude.
 *
 * @param value - Signed value to encode.
 * @param signBitIndex - Zero-based index of the sign bit.
 * @returns The encoded, non-negative integer.
 * @throws Error when `|value|` does not fit in the bits below `signBitIndex`.
 */
function encodeSignMagnitude(value: number, signBitIndex: number): number {
  const maxMagnitude = (1 << signBitIndex) - 1;
  const magnitude = Math.abs(value);
  if (magnitude > maxMagnitude) {
    throw new Error(
      `Magnitude ${magnitude} exceeds ${maxMagnitude} (max for signBitIndex=${signBitIndex})`
    );
  }

  const directionBit = value < 0 ? 1 : 0;
  return (directionBit << signBitIndex) | magnitude;
}

/**
 * Decode a sign-magnitude encoded value back to a signed integer.
 * Extracts the sign bit and the magnitude, then applies the sign.
 *
 * @param encodedValue - Value produced by sign-magnitude encoding.
 * @param signBitIndex - Zero-based index of the sign bit.
 * @returns The decoded signed integer.
 */
function decodeSignMagnitude(
  encodedValue: number,
  signBitIndex: number
): number {
  const directionBit = (encodedValue >> signBitIndex) & 1;
  const magnitudeMask = (1 << signBitIndex) - 1;
  const magnitude = encodedValue & magnitudeMask;
  return directionBit ? -magnitude : magnitude;
}

/** Render a caught value as text: the message for `Error`s, `String(x)` otherwise. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Resolve after `ms` milliseconds. */
function delay(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

/** Position reported when a motor cannot be read: half of the configured resolution. */
function fallbackPosition(config: SO100CalibrationConfig): number {
  return Math.floor((config.protocol.resolution - 1) / 2);
}

/**
 * Build the 9-byte packet that writes a 2-byte little-endian value to a register.
 * Layout: header (0xff 0xff), servo ID, length 0x05, instruction 0x03 (WRITE_DATA),
 * register address, low byte, high byte, checksum.
 */
function buildWritePacket(
  motorId: number,
  registerAddress: number,
  value: number
): Buffer {
  const lowByte = value & 0xff;
  const highByte = (value >> 8) & 0xff;
  // Checksum is the inverted low byte of the sum of everything after the header.
  const checksum =
    ~(motorId + 0x05 + 0x03 + registerAddress + lowByte + highByte) & 0xff;

  return Buffer.from([
    0xff,
    0xff,
    motorId,
    0x05,
    0x03,
    registerAddress,
    lowByte,
    highByte,
    checksum,
  ]);
}

/**
 * Write a packet to the serial port.
 *
 * @throws Error "Serial port not open" when the port is missing or closed.
 * @throws Error `${failurePrefix}: <cause>` when the port reports a write error.
 */
async function sendPacket(
  port: SerialPort,
  packet: Buffer,
  failurePrefix: string
): Promise<void> {
  if (!port || !port.isOpen) {
    throw new Error("Serial port not open");
  }

  await new Promise<void>((resolve, reject) => {
    port.write(packet, (error) => {
      if (error) {
        reject(new Error(`${failurePrefix}: ${error.message}`));
      } else {
        resolve();
      }
    });
  });
}

/**
 * Wait up to `timeout` ms for a response and discard it.
 * A missing response is deliberately ignored: it is not required for the
 * preceding write to have succeeded.
 */
async function discardOptionalResponse(
  port: SerialPort,
  timeout: number
): Promise<void> {
  try {
    await readData(port, timeout);
  } catch (error) {
    // Silent - response not required for successful operation
  }
}

/**
 * Initialize device communication
 * Common for both SO-100 leader and follower (same hardware)
 *
 * Sends a ping packet addressed to servo ID 1 and waits up to one second for
 * any reply. A missing reply is tolerated; only a closed port or a failed
 * write is treated as an error.
 *
 * @throws Error "Serial communication test failed: ..." when the port is not
 *   open or the ping cannot be written.
 */
export async function initializeDeviceCommunication(
  config: SO100CalibrationConfig
): Promise<void> {
  try {
    // Test ping to servo ID 1 (same protocol for all SO-100 devices)
    const pingPacket = Buffer.from([0xff, 0xff, 0x01, 0x02, 0x01, 0xfb]);

    await sendPacket(config.port, pingPacket, "Failed to send ping");

    try {
      await readData(config.port, 1000);
    } catch (error) {
      // Silent - no response expected for basic test
    }
  } catch (error) {
    throw new Error(
      `Serial communication test failed: ${describeError(error)}`
    );
  }
}

/**
 * Read the present position of one motor.
 *
 * @returns The 16-bit little-endian position from the response, or `undefined`
 *   when the port is closed, the write fails, the read times out, or the
 *   response is too short, comes from another ID, or carries a non-zero
 *   error byte.
 */
async function readRawMotorPosition(
  config: SO100CalibrationConfig,
  motorId: number
): Promise<number | undefined> {
  const address = config.protocol.presentPositionAddress;

  // Read packet using the configurable address; last byte is the checksum.
  const packet = Buffer.from([
    0xff,
    0xff,
    motorId,
    0x04,
    0x02,
    address,
    0x02,
    0x00,
  ]);
  packet[7] = ~(motorId + 0x04 + 0x02 + address + 0x02) & 0xff;

  try {
    await sendPacket(config.port, packet, "Failed to send read packet");

    // Short timeout keeps the polling loop close to 30Hz.
    const response = await readData(config.port, 100);
    if (
      response.length >= 7 &&
      response[2] === motorId &&
      response[4] === 0
    ) {
      return response[5] | (response[6] << 8);
    }
  } catch (error) {
    // Any failure is reported to the caller as "no reading".
  }

  return undefined;
}

/**
 * Read every configured motor once, in `config.motorIds` order.
 * Entries are `undefined` for motors that could not be read.
 */
async function readRawMotorPositions(
  config: SO100CalibrationConfig
): Promise<(number | undefined)[]> {
  const positions: (number | undefined)[] = [];

  for (let i = 0; i < config.motorIds.length; i++) {
    positions.push(await readRawMotorPosition(config, config.motorIds[i]));

    // Minimal delay between servo reads for 30Hz performance
    await delay(2);
  }

  return positions;
}

/**
 * Read current motor positions
 * Uses device-specific protocol - configurable for different robot types
 *
 * This function never rejects: a motor that cannot be read is reported at
 * half of the configured resolution instead.
 *
 * @param config - Calibration configuration (port, motor IDs, protocol).
 * @param quiet - Currently unused; retained for API compatibility.
 * @returns One position per entry of `config.motorIds`, in the same order.
 */
export async function readMotorPositions(
  config: SO100CalibrationConfig,
  quiet: boolean = false
): Promise<number[]> {
  const fallback = fallbackPosition(config);
  const rawPositions = await readRawMotorPositions(config);

  return rawPositions.map((position) =>
    position === undefined ? fallback : position
  );
}

/**
 * Interactive calibration procedure
 * Same flow for both leader and follower, just different configurations
 *
 * Steps: prompt for the middle pose, set homing offsets, record ranges of
 * motion, force the full range for `wrist_roll`, write the limits to the
 * motors, then assemble the per-motor results.
 *
 * @returns Calibration results keyed by motor name, in Python-compatible format.
 * @throws Error when a homing-offset or position-limit write fails.
 */
export async function performInteractiveCalibration(
  config: SO100CalibrationConfig
): Promise<CalibrationResults> {
  // Step 1: Set homing position
  await promptUser(
    `Move the SO-100 ${config.deviceType} to the MIDDLE of its range of motion and press ENTER...`
  );

  const homingOffsets = await setHomingOffsets(config);

  // Step 2: Record ranges of motion with live updates
  const { rangeMins, rangeMaxes } = await recordRangesOfMotion(config);

  // Step 3: Set special range for wrist_roll (full turn motor)
  rangeMins["wrist_roll"] = 0;
  rangeMaxes["wrist_roll"] = 4095;

  // Step 4: Write hardware position limits to motors (matching Python behavior)
  await writeHardwarePositionLimits(config, rangeMins, rangeMaxes);

  // Compile results in Python-compatible format
  const results: CalibrationResults = {};

  for (let i = 0; i < config.motorNames.length; i++) {
    const motorName = config.motorNames[i];
    const motorId = config.motorIds[i];

    results[motorName] = {
      id: motorId,
      drive_mode: config.driveModes[i],
      homing_offset: homingOffsets[motorName],
      range_min: rangeMins[motorName],
      range_max: rangeMaxes[motorName],
    };
  }

  return results;
}

/**
 * Set motor limits (device-specific)
 *
 * Currently a no-op placeholder: it performs no I/O and always resolves.
 */
export async function setMotorLimits(
  config: SO100CalibrationConfig
): Promise<void> {
  // Silent unless error - motor limits configured internally
}

/**
 * Verify calibration was successful
 *
 * Currently a no-op placeholder: it performs no checks and always resolves.
 */
export async function verifyCalibration(
  config: SO100CalibrationConfig
): Promise<void> {
  // Silent unless error - calibration verification passed internally
}

/**
 * Reset homing offsets to 0 for all motors
 * Mirrors Python reset_calibration() - critical step before calculating new offsets
 * This ensures Present_Position reflects true physical position without existing offsets
 *
 * @throws Error "Failed to reset homing offset for <motor>: ..." on the first
 *   motor whose write fails; later motors are not attempted.
 */
async function resetHomingOffsets(
  config: SO100CalibrationConfig
): Promise<void> {
  for (let i = 0; i < config.motorIds.length; i++) {
    const motorId = config.motorIds[i];
    const motorName = config.motorNames[i];

    try {
      // Write 0 to the Homing_Offset register using the configurable address
      const packet = buildWritePacket(
        motorId,
        config.protocol.homingOffsetAddress,
        0
      );

      await sendPacket(
        config.port,
        packet,
        `Failed to reset homing offset for ${motorName}`
      );

      // Wait for response (silent unless error)
      await discardOptionalResponse(config.port, 200);
    } catch (error) {
      throw new Error(
        `Failed to reset homing offset for ${motorName}: ${describeError(
          error
        )}`
      );
    }

    // Small delay between motor writes
    await delay(20);
  }
}

/**
 * Record homing offsets (current positions as center)
 * Mirrors Python bus.set_half_turn_homings()
 *
 * CRITICAL: Must reset existing homing offsets to 0 first (like Python does)
 * CRITICAL: Must WRITE the new homing offsets to motors immediately (like Python does)
 *
 * @returns Homing offset per motor name, computed as
 *   `position - floor((resolution - 1) / 2)`.
 * @throws Error when resetting or writing a homing offset fails.
 */
async function setHomingOffsets(
  config: SO100CalibrationConfig
): Promise<{ [motor: string]: number }> {
  // CRITICAL: Reset existing homing offsets to 0 first (matching Python)
  await resetHomingOffsets(config);

  // Wait a moment for reset to take effect
  await delay(100);

  // Now read positions (which will be true physical positions)
  const currentPositions = await readMotorPositions(config);
  const halfTurn = fallbackPosition(config);
  const homingOffsets: { [motor: string]: number } = {};

  for (let i = 0; i < config.motorNames.length; i++) {
    // Generic formula: pos - int((max_res - 1) / 2) using configurable resolution
    homingOffsets[config.motorNames[i]] = currentPositions[i] - halfTurn;
  }

  // CRITICAL: Write homing offsets to motors immediately (matching Python exactly)
  // Python does: for motor, offset in homing_offsets.items(): self.write("Homing_Offset", motor, offset)
  await writeHomingOffsetsToMotors(config, homingOffsets);

  return homingOffsets;
}

/**
 * Write homing offsets to motor registers immediately
 * Mirrors Python's immediate writing in set_half_turn_homings()
 *
 * Offsets are sign-magnitude encoded with `config.protocol.signMagnitudeBit`
 * before being written.
 *
 * @throws Error "Failed to write homing offset for <motor>: ..." on the first
 *   motor whose offset cannot be encoded or written.
 */
async function writeHomingOffsetsToMotors(
  config: SO100CalibrationConfig,
  homingOffsets: { [motor: string]: number }
): Promise<void> {
  for (let i = 0; i < config.motorIds.length; i++) {
    const motorId = config.motorIds[i];
    const motorName = config.motorNames[i];
    const homingOffset = homingOffsets[motorName];

    try {
      // Encode using sign-magnitude format (like Python)
      const encodedOffset = encodeSignMagnitude(
        homingOffset,
        config.protocol.signMagnitudeBit
      );

      const packet = buildWritePacket(
        motorId,
        config.protocol.homingOffsetAddress,
        encodedOffset
      );

      await sendPacket(
        config.port,
        packet,
        `Failed to write homing offset for ${motorName}`
      );

      // Wait for response (silent unless error)
      await discardOptionalResponse(config.port, 200);
    } catch (error) {
      throw new Error(
        `Failed to write homing offset for ${motorName}: ${describeError(
          error
        )}`
      );
    }

    // Small delay between motor writes
    await delay(20);
  }
}

/** Right-align a reading in `width` columns; unread values render as "n/a". */
function formatReading(value: number | undefined, width: number): string {
  return (value === undefined ? "n/a" : value.toString()).padStart(width);
}

/**
 * Record ranges of motion with live updating table
 * Mirrors Python bus.record_ranges_of_motion()
 *
 * Polls all motors until the user presses ENTER, tracking the minimum and
 * maximum position seen per motor. Reads that fail are ignored rather than
 * being folded in as a placeholder value, so a single timeout cannot widen
 * the recorded range. A motor that never produced a valid reading gets the
 * midpoint fallback for both bounds.
 *
 * @returns Per-motor minimum and maximum positions keyed by motor name.
 */
async function recordRangesOfMotion(config: SO100CalibrationConfig): Promise<{
  rangeMins: { [motor: string]: number };
  rangeMaxes: { [motor: string]: number };
}> {
  console.log(
    "Move all joints sequentially through their entire ranges of motion."
  );
  console.log(
    "Positions will be recorded continuously. Press ENTER to stop...\n"
  );

  const observed = new Map<string, { min: number; max: number }>();

  // Fold one round of readings into the per-motor min/max, skipping failed reads.
  const observe = (positions: (number | undefined)[]): void => {
    for (let i = 0; i < config.motorNames.length; i++) {
      const position = positions[i];
      if (position === undefined) {
        continue;
      }

      const motorName = config.motorNames[i];
      const range = observed.get(motorName);
      if (range === undefined) {
        observed.set(motorName, { min: position, max: position });
      } else {
        range.min = Math.min(range.min, position);
        range.max = Math.max(range.max, position);
      }
    }
  };

  // Read actual current positions (matching Python exactly)
  // Python does: start_positions = self.sync_read("Present_Position", motors, normalize=False)
  // mins = start_positions.copy(); maxes = start_positions.copy()
  observe(await readRawMotorPositions(config));

  let recording = true;
  let readCount = 0;

  // Set up readline to detect Enter key
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout,
  });

  // Idempotent: safe to call from both the "line" handler and `finally`.
  const stopRecording = (): void => {
    if (recording) {
      recording = false;
      rl.close();
    }
  };
  rl.on("line", stopRecording);

  try {
    // Continuous recording loop with live updates
    while (recording) {
      try {
        const positions = await readRawMotorPositions(config);
        readCount++;
        observe(positions);

        // Show real-time feedback every 3 reads for faster updates
        if (readCount % 3 === 0) {
          let liveTable = `Readings: ${readCount}\n\n`;
          liveTable += "Motor Name Current Min Max Range\n";
          liveTable += "─".repeat(55) + "\n";

          for (let i = 0; i < config.motorNames.length; i++) {
            const motorName = config.motorNames[i];
            const range = observed.get(motorName);
            const span =
              range === undefined ? undefined : range.max - range.min;

            liveTable += `${motorName.padEnd(15)} ${formatReading(
              positions[i],
              6
            )} ${formatReading(range?.min, 6)} ${formatReading(
              range?.max,
              6
            )} ${formatReading(span, 8)}\n`;
          }
          liveTable += "\nMove joints through their full range...";

          // Update the display in place (no new console lines!)
          logUpdate(liveTable);
        }

        // Minimal delay for 30Hz reading rate (~33ms cycle time)
        await delay(10);
      } catch (error) {
        console.warn(`Read error: ${describeError(error)}`);
        await delay(100);
      }
    }
  } finally {
    // Release stdin and restore the normal console even if the loop throws.
    stopRecording();
    logUpdate.done();
  }

  const fallback = fallbackPosition(config);
  const rangeMins: { [motor: string]: number } = {};
  const rangeMaxes: { [motor: string]: number } = {};

  for (let i = 0; i < config.motorNames.length; i++) {
    const motorName = config.motorNames[i];
    const range = observed.get(motorName);

    if (range === undefined) {
      console.warn(
        `No valid position was read for ${motorName}; using fallback position ${fallback}`
      );
      rangeMins[motorName] = fallback;
      rangeMaxes[motorName] = fallback;
    } else {
      rangeMins[motorName] = range.min;
      rangeMaxes[motorName] = range.max;
    }
  }

  return { rangeMins, rangeMaxes };
}

/**
 * Prompt user for input (real implementation with readline)
 *
 * @param message - Text shown to the user.
 * @returns The line the user entered.
 */
async function promptUser(message: string): Promise<string> {
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout,
  });

  return new Promise<string>((resolve) => {
    rl.question(message, (answer) => {
      rl.close();
      resolve(answer);
    });
  });
}

/**
 * Read data from serial port with timeout
 *
 * Resolves with the first "data" chunk the port emits. On timeout the pending
 * listener is removed so repeated timeouts do not accumulate listeners on the
 * port.
 *
 * @param port - Open serial port to read from.
 * @param timeout - Maximum wait in milliseconds (default 5000).
 * @throws Error "Serial port not open" when the port is missing or closed.
 * @throws Error "Read timeout" when no data arrives in time.
 */
async function readData(
  port: SerialPort,
  timeout: number = 5000
): Promise<Buffer> {
  if (!port || !port.isOpen) {
    throw new Error("Serial port not open");
  }

  return new Promise<Buffer>((resolve, reject) => {
    const onData = (data: Buffer): void => {
      clearTimeout(timer);
      resolve(data);
    };

    const timer = setTimeout(() => {
      port.removeListener("data", onData);
      reject(new Error("Read timeout"));
    }, timeout);

    port.once("data", onData);
  });
}

/**
 * Write hardware position limits to motors
 * Mirrors Python lerobot write_calibration() behavior where it writes:
 * - Min_Position_Limit register with calibration.range_min
 * - Max_Position_Limit register with calibration.range_max
 * This physically constrains the motors to the calibrated ranges
 *
 * @throws Error "Failed to write position limits for <motor>: ..." on the
 *   first motor whose limits cannot be written.
 */
async function writeHardwarePositionLimits(
  config: SO100CalibrationConfig,
  rangeMins: { [motor: string]: number },
  rangeMaxes: { [motor: string]: number }
): Promise<void> {
  for (let i = 0; i < config.motorIds.length; i++) {
    const motorId = config.motorIds[i];
    const motorName = config.motorNames[i];
    const minLimit = rangeMins[motorName];
    const maxLimit = rangeMaxes[motorName];

    try {
      // Write Min_Position_Limit register
      await writeMotorRegister(
        config,
        motorId,
        config.protocol.minPositionLimitAddress,
        minLimit,
        `Min_Position_Limit for ${motorName}`
      );

      // Small delay between writes
      await delay(20);

      // Write Max_Position_Limit register
      await writeMotorRegister(
        config,
        motorId,
        config.protocol.maxPositionLimitAddress,
        maxLimit,
        `Max_Position_Limit for ${motorName}`
      );

      // Small delay between motors
      await delay(20);
    } catch (error) {
      throw new Error(
        `Failed to write position limits for ${motorName}: ${describeError(
          error
        )}`
      );
    }
  }
}

/**
 * Generic function to write a 2-byte value to a motor register
 * Used for both Min_Position_Limit and Max_Position_Limit
 *
 * @param config - Calibration configuration providing the serial port.
 * @param motorId - Servo ID to address.
 * @param registerAddress - Address of the register to write.
 * @param value - Value to write; only the low 16 bits are sent.
 * @param description - Human-readable register name used in error messages.
 * @throws Error "Serial port not open" or "Failed to write <description>: ...".
 */
async function writeMotorRegister(
  config: SO100CalibrationConfig,
  motorId: number,
  registerAddress: number,
  value: number,
  description: string
): Promise<void> {
  const packet = buildWritePacket(motorId, registerAddress, value);

  await sendPacket(config.port, packet, `Failed to write ${description}`);

  // Wait for response (silent unless error)
  await discardOptionalResponse(config.port, 200);
}