Spaces:
Running
Running
/** | |
* Shared calibration procedures for SO-100 devices (both leader and follower) | |
* Mirrors Python lerobot calibrate.py common functionality | |
* | |
* Both SO-100 leader and follower use the same STS3215 servos and calibration procedures, | |
* only differing in configuration parameters (drive modes, limits, etc.) | |
*/ | |
import * as readline from "readline"; | |
import { SerialPort } from "serialport"; | |
import logUpdate from "log-update"; | |
/** | |
* Sign-magnitude encoding functions for Feetech STS3215 motors | |
* Mirrors Python lerobot/common/utils/encoding_utils.py | |
*/ | |
/** | |
* Encode a signed integer using sign-magnitude format | |
* Bit at sign_bit_index represents sign (0=positive, 1=negative) | |
* Lower bits represent magnitude | |
*/ | |
function encodeSignMagnitude(value: number, signBitIndex: number): number { | |
const maxMagnitude = (1 << signBitIndex) - 1; | |
const magnitude = Math.abs(value); | |
if (magnitude > maxMagnitude) { | |
throw new Error( | |
`Magnitude ${magnitude} exceeds ${maxMagnitude} (max for signBitIndex=${signBitIndex})` | |
); | |
} | |
const directionBit = value < 0 ? 1 : 0; | |
return (directionBit << signBitIndex) | magnitude; | |
} | |
/** | |
* Decode a sign-magnitude encoded value back to signed integer | |
* Extracts sign bit and magnitude, then applies sign | |
*/ | |
function decodeSignMagnitude( | |
encodedValue: number, | |
signBitIndex: number | |
): number { | |
const directionBit = (encodedValue >> signBitIndex) & 1; | |
const magnitudeMask = (1 << signBitIndex) - 1; | |
const magnitude = encodedValue & magnitudeMask; | |
return directionBit ? -magnitude : magnitude; | |
} | |
/** | |
* Device configuration for calibration | |
* Despite the "SO100" name, this interface is now device-agnostic and configurable | |
* for any robot using similar serial protocols (Feetech STS3215, etc.) | |
*/ | |
import type { | |
SO100CalibrationConfig, | |
CalibrationResults, | |
} from "../types/calibration.js"; | |
/** | |
* Initialize device communication | |
* Common for both SO-100 leader and follower (same hardware) | |
*/ | |
export async function initializeDeviceCommunication( | |
config: SO100CalibrationConfig | |
): Promise<void> { | |
try { | |
// Test ping to servo ID 1 (same protocol for all SO-100 devices) | |
const pingPacket = Buffer.from([0xff, 0xff, 0x01, 0x02, 0x01, 0xfb]); | |
if (!config.port || !config.port.isOpen) { | |
throw new Error("Serial port not open"); | |
} | |
await new Promise<void>((resolve, reject) => { | |
config.port.write(pingPacket, (error) => { | |
if (error) { | |
reject(new Error(`Failed to send ping: ${error.message}`)); | |
} else { | |
resolve(); | |
} | |
}); | |
}); | |
try { | |
await readData(config.port, 1000); | |
} catch (error) { | |
// Silent - no response expected for basic test | |
} | |
} catch (error) { | |
throw new Error( | |
`Serial communication test failed: ${ | |
error instanceof Error ? error.message : error | |
}` | |
); | |
} | |
} | |
/** | |
* Read current motor positions | |
* Uses device-specific protocol - configurable for different robot types | |
*/ | |
export async function readMotorPositions( | |
config: SO100CalibrationConfig, | |
quiet: boolean = false | |
): Promise<number[]> { | |
const motorPositions: number[] = []; | |
for (let i = 0; i < config.motorIds.length; i++) { | |
const motorId = config.motorIds[i]; | |
const motorName = config.motorNames[i]; | |
try { | |
// Create Read Position packet using configurable address | |
const packet = Buffer.from([ | |
0xff, | |
0xff, | |
motorId, | |
0x04, | |
0x02, | |
config.protocol.presentPositionAddress, // Configurable address instead of hardcoded 0x38 | |
0x02, | |
0x00, | |
]); | |
const checksum = | |
~( | |
motorId + | |
0x04 + | |
0x02 + | |
config.protocol.presentPositionAddress + | |
0x02 | |
) & 0xff; | |
packet[7] = checksum; | |
if (!config.port || !config.port.isOpen) { | |
throw new Error("Serial port not open"); | |
} | |
await new Promise<void>((resolve, reject) => { | |
config.port.write(packet, (error) => { | |
if (error) { | |
reject(new Error(`Failed to send read packet: ${error.message}`)); | |
} else { | |
resolve(); | |
} | |
}); | |
}); | |
try { | |
const response = await readData(config.port, 100); // Faster timeout for 30Hz performance | |
if (response.length >= 7) { | |
const id = response[2]; | |
const error = response[4]; | |
if (id === motorId && error === 0) { | |
const position = response[5] | (response[6] << 8); | |
motorPositions.push(position); | |
} else { | |
// Use half of max resolution as fallback instead of hardcoded 2047 | |
motorPositions.push( | |
Math.floor((config.protocol.resolution - 1) / 2) | |
); | |
} | |
} else { | |
motorPositions.push(Math.floor((config.protocol.resolution - 1) / 2)); | |
} | |
} catch (readError) { | |
motorPositions.push(Math.floor((config.protocol.resolution - 1) / 2)); | |
} | |
} catch (error) { | |
motorPositions.push(Math.floor((config.protocol.resolution - 1) / 2)); | |
} | |
// Minimal delay between servo reads for 30Hz performance | |
await new Promise((resolve) => setTimeout(resolve, 2)); | |
} | |
return motorPositions; | |
} | |
/** | |
* Interactive calibration procedure | |
* Same flow for both leader and follower, just different configurations | |
*/ | |
export async function performInteractiveCalibration( | |
config: SO100CalibrationConfig | |
): Promise<CalibrationResults> { | |
// Step 1: Set homing position | |
await promptUser( | |
`Move the SO-100 ${config.deviceType} to the MIDDLE of its range of motion and press ENTER...` | |
); | |
const homingOffsets = await setHomingOffsets(config); | |
// Step 2: Record ranges of motion with live updates | |
const { rangeMins, rangeMaxes } = await recordRangesOfMotion(config); | |
// Step 3: Set special range for wrist_roll (full turn motor) | |
rangeMins["wrist_roll"] = 0; | |
rangeMaxes["wrist_roll"] = 4095; | |
// Step 4: Write hardware position limits to motors (matching Python behavior) | |
await writeHardwarePositionLimits(config, rangeMins, rangeMaxes); | |
// Compile results in Python-compatible format | |
const results: CalibrationResults = {}; | |
for (let i = 0; i < config.motorNames.length; i++) { | |
const motorName = config.motorNames[i]; | |
const motorId = config.motorIds[i]; | |
results[motorName] = { | |
id: motorId, | |
drive_mode: config.driveModes[i], | |
homing_offset: homingOffsets[motorName], | |
range_min: rangeMins[motorName], | |
range_max: rangeMaxes[motorName], | |
}; | |
} | |
return results; | |
} | |
/** | |
* Set motor limits (device-specific) | |
*/ | |
export async function setMotorLimits( | |
config: SO100CalibrationConfig | |
): Promise<void> { | |
// Silent unless error - motor limits configured internally | |
} | |
/** | |
* Verify calibration was successful | |
*/ | |
export async function verifyCalibration( | |
config: SO100CalibrationConfig | |
): Promise<void> { | |
// Silent unless error - calibration verification passed internally | |
} | |
/** | |
* Reset homing offsets to 0 for all motors | |
* Mirrors Python reset_calibration() - critical step before calculating new offsets | |
* This ensures Present_Position reflects true physical position without existing offsets | |
*/ | |
async function resetHomingOffsets( | |
config: SO100CalibrationConfig | |
): Promise<void> { | |
for (let i = 0; i < config.motorIds.length; i++) { | |
const motorId = config.motorIds[i]; | |
const motorName = config.motorNames[i]; | |
try { | |
// Write 0 to Homing_Offset register using configurable address | |
const homingOffsetValue = 0; | |
// Create Write Homing_Offset packet using configurable address | |
const packet = Buffer.from([ | |
0xff, | |
0xff, // Header | |
motorId, // Servo ID | |
0x05, // Length (Instruction + Address + Data + Checksum) | |
0x03, // Instruction: WRITE_DATA | |
config.protocol.homingOffsetAddress, // Configurable address instead of hardcoded 0x1f | |
homingOffsetValue & 0xff, // Data_L (low byte) | |
(homingOffsetValue >> 8) & 0xff, // Data_H (high byte) | |
0x00, // Checksum (will calculate) | |
]); | |
// Calculate checksum using configurable address | |
const checksum = | |
~( | |
motorId + | |
0x05 + | |
0x03 + | |
config.protocol.homingOffsetAddress + | |
(homingOffsetValue & 0xff) + | |
((homingOffsetValue >> 8) & 0xff) | |
) & 0xff; | |
packet[8] = checksum; | |
if (!config.port || !config.port.isOpen) { | |
throw new Error("Serial port not open"); | |
} | |
// Send reset packet | |
await new Promise<void>((resolve, reject) => { | |
config.port.write(packet, (error) => { | |
if (error) { | |
reject( | |
new Error( | |
`Failed to reset homing offset for ${motorName}: ${error.message}` | |
) | |
); | |
} else { | |
resolve(); | |
} | |
}); | |
}); | |
// Wait for response (silent unless error) | |
try { | |
await readData(config.port, 200); | |
} catch (error) { | |
// Silent - response not required for successful operation | |
} | |
} catch (error) { | |
throw new Error( | |
`Failed to reset homing offset for ${motorName}: ${ | |
error instanceof Error ? error.message : error | |
}` | |
); | |
} | |
// Small delay between motor writes | |
await new Promise((resolve) => setTimeout(resolve, 20)); | |
} | |
} | |
/** | |
* Record homing offsets (current positions as center) | |
* Mirrors Python bus.set_half_turn_homings() | |
* | |
* CRITICAL: Must reset existing homing offsets to 0 first (like Python does) | |
* CRITICAL: Must WRITE the new homing offsets to motors immediately (like Python does) | |
*/ | |
async function setHomingOffsets( | |
config: SO100CalibrationConfig | |
): Promise<{ [motor: string]: number }> { | |
// CRITICAL: Reset existing homing offsets to 0 first (matching Python) | |
await resetHomingOffsets(config); | |
// Wait a moment for reset to take effect | |
await new Promise((resolve) => setTimeout(resolve, 100)); | |
// Now read positions (which will be true physical positions) | |
const currentPositions = await readMotorPositions(config); | |
const homingOffsets: { [motor: string]: number } = {}; | |
for (let i = 0; i < config.motorNames.length; i++) { | |
const motorName = config.motorNames[i]; | |
const position = currentPositions[i]; | |
// Generic formula: pos - int((max_res - 1) / 2) using configurable resolution | |
const halfTurn = Math.floor((config.protocol.resolution - 1) / 2); | |
homingOffsets[motorName] = position - halfTurn; | |
} | |
// CRITICAL: Write homing offsets to motors immediately (matching Python exactly) | |
// Python does: for motor, offset in homing_offsets.items(): self.write("Homing_Offset", motor, offset) | |
await writeHomingOffsetsToMotors(config, homingOffsets); | |
return homingOffsets; | |
} | |
/** | |
* Write homing offsets to motor registers immediately | |
* Mirrors Python's immediate writing in set_half_turn_homings() | |
*/ | |
async function writeHomingOffsetsToMotors( | |
config: SO100CalibrationConfig, | |
homingOffsets: { [motor: string]: number } | |
): Promise<void> { | |
for (let i = 0; i < config.motorIds.length; i++) { | |
const motorId = config.motorIds[i]; | |
const motorName = config.motorNames[i]; | |
const homingOffset = homingOffsets[motorName]; | |
try { | |
// Encode using sign-magnitude format (like Python) | |
const encodedOffset = encodeSignMagnitude( | |
homingOffset, | |
config.protocol.signMagnitudeBit | |
); | |
// Create Write Homing_Offset packet | |
const packet = Buffer.from([ | |
0xff, | |
0xff, // Header | |
motorId, // Servo ID | |
0x05, // Length | |
0x03, // Instruction: WRITE_DATA | |
config.protocol.homingOffsetAddress, // Homing_Offset address | |
encodedOffset & 0xff, // Data_L (low byte) | |
(encodedOffset >> 8) & 0xff, // Data_H (high byte) | |
0x00, // Checksum (will calculate) | |
]); | |
// Calculate checksum | |
const checksum = | |
~( | |
motorId + | |
0x05 + | |
0x03 + | |
config.protocol.homingOffsetAddress + | |
(encodedOffset & 0xff) + | |
((encodedOffset >> 8) & 0xff) | |
) & 0xff; | |
packet[8] = checksum; | |
if (!config.port || !config.port.isOpen) { | |
throw new Error("Serial port not open"); | |
} | |
// Send packet | |
await new Promise<void>((resolve, reject) => { | |
config.port.write(packet, (error) => { | |
if (error) { | |
reject( | |
new Error( | |
`Failed to write homing offset for ${motorName}: ${error.message}` | |
) | |
); | |
} else { | |
resolve(); | |
} | |
}); | |
}); | |
// Wait for response (silent unless error) | |
try { | |
await readData(config.port, 200); | |
} catch (error) { | |
// Silent - response not required for successful operation | |
} | |
} catch (error) { | |
throw new Error( | |
`Failed to write homing offset for ${motorName}: ${ | |
error instanceof Error ? error.message : error | |
}` | |
); | |
} | |
// Small delay between motor writes | |
await new Promise((resolve) => setTimeout(resolve, 20)); | |
} | |
} | |
/** | |
* Record ranges of motion with live updating table | |
* Mirrors Python bus.record_ranges_of_motion() | |
*/ | |
async function recordRangesOfMotion(config: SO100CalibrationConfig): Promise<{ | |
rangeMins: { [motor: string]: number }; | |
rangeMaxes: { [motor: string]: number }; | |
}> { | |
console.log( | |
"Move all joints sequentially through their entire ranges of motion." | |
); | |
console.log( | |
"Positions will be recorded continuously. Press ENTER to stop...\n" | |
); | |
const rangeMins: { [motor: string]: number } = {}; | |
const rangeMaxes: { [motor: string]: number } = {}; | |
// Read actual current positions (matching Python exactly) | |
// Python does: start_positions = self.sync_read("Present_Position", motors, normalize=False) | |
// mins = start_positions.copy(); maxes = start_positions.copy() | |
const startPositions = await readMotorPositions(config); | |
for (let i = 0; i < config.motorNames.length; i++) { | |
const motorName = config.motorNames[i]; | |
const startPosition = startPositions[i]; | |
rangeMins[motorName] = startPosition; // Use actual position, not hardcoded 2047 | |
rangeMaxes[motorName] = startPosition; // Use actual position, not hardcoded 2047 | |
} | |
let recording = true; | |
let readCount = 0; | |
// Set up readline to detect Enter key | |
const rl = readline.createInterface({ | |
input: process.stdin, | |
output: process.stdout, | |
}); | |
rl.on("line", () => { | |
recording = false; | |
rl.close(); | |
}); | |
// Continuous recording loop with live updates - THE LIVE UPDATING TABLE! | |
while (recording) { | |
try { | |
const positions = await readMotorPositions(config); // Always quiet during live recording | |
readCount++; | |
// Update min/max ranges | |
for (let i = 0; i < config.motorNames.length; i++) { | |
const motorName = config.motorNames[i]; | |
const position = positions[i]; | |
if (position < rangeMins[motorName]) { | |
rangeMins[motorName] = position; | |
} | |
if (position > rangeMaxes[motorName]) { | |
rangeMaxes[motorName] = position; | |
} | |
} | |
// Show real-time feedback every 3 reads for faster updates - LIVE TABLE UPDATE | |
if (readCount % 3 === 0) { | |
// Build the live table content | |
let liveTable = `Readings: ${readCount}\n\n`; | |
liveTable += "Motor Name Current Min Max Range\n"; | |
liveTable += "─".repeat(55) + "\n"; | |
for (let i = 0; i < config.motorNames.length; i++) { | |
const motorName = config.motorNames[i]; | |
const current = positions[i]; | |
const min = rangeMins[motorName]; | |
const max = rangeMaxes[motorName]; | |
const range = max - min; | |
liveTable += `${motorName.padEnd(15)} ${current | |
.toString() | |
.padStart(6)} ${min.toString().padStart(6)} ${max | |
.toString() | |
.padStart(6)} ${range.toString().padStart(8)}\n`; | |
} | |
liveTable += "\nMove joints through their full range..."; | |
// Update the display in place (no new console lines!) | |
logUpdate(liveTable); | |
} | |
// Minimal delay for 30Hz reading rate (~33ms cycle time) | |
await new Promise((resolve) => setTimeout(resolve, 10)); | |
} catch (error) { | |
console.warn( | |
`Read error: ${error instanceof Error ? error.message : error}` | |
); | |
await new Promise((resolve) => setTimeout(resolve, 100)); | |
} | |
} | |
// Stop live updating and return to normal console | |
logUpdate.done(); | |
return { rangeMins, rangeMaxes }; | |
} | |
/** | |
* Prompt user for input (real implementation with readline) | |
*/ | |
async function promptUser(message: string): Promise<string> { | |
const rl = readline.createInterface({ | |
input: process.stdin, | |
output: process.stdout, | |
}); | |
return new Promise((resolve) => { | |
rl.question(message, (answer) => { | |
rl.close(); | |
resolve(answer); | |
}); | |
}); | |
} | |
/** | |
* Read data from serial port with timeout | |
*/ | |
async function readData( | |
port: SerialPort, | |
timeout: number = 5000 | |
): Promise<Buffer> { | |
if (!port || !port.isOpen) { | |
throw new Error("Serial port not open"); | |
} | |
return new Promise<Buffer>((resolve, reject) => { | |
const timer = setTimeout(() => { | |
reject(new Error("Read timeout")); | |
}, timeout); | |
port.once("data", (data: Buffer) => { | |
clearTimeout(timer); | |
resolve(data); | |
}); | |
}); | |
} | |
/** | |
* Write hardware position limits to motors | |
* Mirrors Python lerobot write_calibration() behavior where it writes: | |
* - Min_Position_Limit register with calibration.range_min | |
* - Max_Position_Limit register with calibration.range_max | |
* This physically constrains the motors to the calibrated ranges | |
*/ | |
async function writeHardwarePositionLimits( | |
config: SO100CalibrationConfig, | |
rangeMins: { [motor: string]: number }, | |
rangeMaxes: { [motor: string]: number } | |
): Promise<void> { | |
for (let i = 0; i < config.motorIds.length; i++) { | |
const motorId = config.motorIds[i]; | |
const motorName = config.motorNames[i]; | |
const minLimit = rangeMins[motorName]; | |
const maxLimit = rangeMaxes[motorName]; | |
try { | |
// Write Min_Position_Limit register | |
await writeMotorRegister( | |
config, | |
motorId, | |
config.protocol.minPositionLimitAddress, | |
minLimit, | |
`Min_Position_Limit for ${motorName}` | |
); | |
// Small delay between writes | |
await new Promise((resolve) => setTimeout(resolve, 20)); | |
// Write Max_Position_Limit register | |
await writeMotorRegister( | |
config, | |
motorId, | |
config.protocol.maxPositionLimitAddress, | |
maxLimit, | |
`Max_Position_Limit for ${motorName}` | |
); | |
// Small delay between motors | |
await new Promise((resolve) => setTimeout(resolve, 20)); | |
} catch (error) { | |
throw new Error( | |
`Failed to write position limits for ${motorName}: ${ | |
error instanceof Error ? error.message : error | |
}` | |
); | |
} | |
} | |
} | |
/** | |
* Generic function to write a 2-byte value to a motor register | |
* Used for both Min_Position_Limit and Max_Position_Limit | |
*/ | |
async function writeMotorRegister( | |
config: SO100CalibrationConfig, | |
motorId: number, | |
registerAddress: number, | |
value: number, | |
description: string | |
): Promise<void> { | |
// Create Write Register packet | |
const packet = Buffer.from([ | |
0xff, | |
0xff, // Header | |
motorId, // Servo ID | |
0x05, // Length (Instruction + Address + Data + Checksum) | |
0x03, // Instruction: WRITE_DATA | |
registerAddress, // Register address | |
value & 0xff, // Data_L (low byte) | |
(value >> 8) & 0xff, // Data_H (high byte) | |
0x00, // Checksum (will calculate) | |
]); | |
// Calculate checksum | |
const checksum = | |
~( | |
motorId + | |
0x05 + | |
0x03 + | |
registerAddress + | |
(value & 0xff) + | |
((value >> 8) & 0xff) | |
) & 0xff; | |
packet[8] = checksum; | |
if (!config.port || !config.port.isOpen) { | |
throw new Error("Serial port not open"); | |
} | |
// Send packet | |
await new Promise<void>((resolve, reject) => { | |
config.port.write(packet, (error) => { | |
if (error) { | |
reject(new Error(`Failed to write ${description}: ${error.message}`)); | |
} else { | |
resolve(); | |
} | |
}); | |
}); | |
// Wait for response (silent unless error) | |
try { | |
await readData(config.port, 200); | |
} catch (error) { | |
// Silent - response not required for successful operation | |
} | |
} | |