da03 commited on
Commit
a9d3da5
·
1 Parent(s): 021f723
Files changed (1) hide show
  1. sync_train_dataset.py +112 -17
sync_train_dataset.py CHANGED
@@ -376,35 +376,130 @@ def transfer_padding_file(sftp):
376
 
377
 
378
  def run_transfer_cycle():
379
- """Run a complete transfer cycle."""
380
  client = None
381
  try:
382
  # Connect to the remote server
383
  client = create_ssh_client()
384
  sftp = client.open_sftp()
385
 
386
- # Step 0: Transfer padding.npy file (critical for model operation)
387
- padding_success = transfer_padding_file(sftp)
388
- if not padding_success:
389
- logger.warning("Failed to transfer padding.npy file, but continuing with other transfers")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
390
 
391
- # Step 1: Transfer TAR files
392
- tar_count = transfer_tar_files(sftp)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
393
 
394
- # Step 2: Transfer PKL file (only if we have new TAR files or it's changed)
395
- if tar_count > 0 or not get_transfer_state("last_pkl_transfer"):
396
- pkl_success = transfer_pkl_file(sftp)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
397
  else:
398
- pkl_success = True # Assume success if we didn't need to transfer
 
 
 
 
 
 
399
 
400
- # Step 3: Transfer CSV file (only if PKL transfer succeeded)
401
- if pkl_success:
402
- csv_success = transfer_csv_file(sftp)
 
 
 
 
 
 
 
 
 
 
 
 
 
403
  else:
404
- logger.warning("Skipping CSV transfer because PKL transfer failed")
 
 
 
405
  csv_success = False
406
-
407
- return padding_success or tar_count > 0 or pkl_success or csv_success
408
  except Exception as e:
409
  logger.error(f"Error in transfer cycle: {str(e)}")
410
  return False
 
376
 
377
 
378
  def run_transfer_cycle():
379
+ """Run a complete transfer cycle with time-based consistency."""
380
  client = None
381
  try:
382
  # Connect to the remote server
383
  client = create_ssh_client()
384
  sftp = client.open_sftp()
385
 
386
+ # Step 0: Get a snapshot of all files with their timestamps
387
+ # This creates a consistent view of the remote directory at this point in time
388
+ logger.info("Taking snapshot of remote directory state")
389
+ remote_files = {}
390
+ for filename in sftp.listdir(REMOTE_DATA_DIR):
391
+ try:
392
+ file_path = os.path.join(REMOTE_DATA_DIR, filename)
393
+ stat = sftp.stat(file_path)
394
+ remote_files[filename] = {
395
+ 'size': stat.st_size,
396
+ 'mtime': stat.st_mtime,
397
+ 'path': file_path
398
+ }
399
+ except Exception as e:
400
+ logger.warning(f"Could not stat file {filename}: {str(e)}")
401
+
402
+ logger.info(f"Found {len(remote_files)} files in remote directory snapshot")
403
+
404
+ # Step 1: Transfer padding.npy file if needed
405
+ if "padding.npy" in remote_files:
406
+ file_info = remote_files["padding.npy"]
407
+ if not is_file_transferred("padding.npy", file_info['size'], file_info['mtime']):
408
+ # Check stability
409
+ is_stable, updated_stat = is_file_stable(sftp, file_info['path'])
410
+ if is_stable:
411
+ local_path = os.path.join(LOCAL_DATA_DIR, "padding.npy")
412
+ checksum = safe_transfer_file(sftp, file_info['path'], local_path)
413
+ mark_file_transferred("padding.npy", updated_stat.st_size, updated_stat.st_mtime, checksum)
414
+ logger.info("Successfully transferred padding.npy file")
415
+ else:
416
+ logger.warning("Padding file is still being written, skipping")
417
+ else:
418
+ logger.warning("padding.npy not found in remote directory")
419
+
420
+ # Step 2: Transfer TAR files from the snapshot
421
+ tar_pattern = re.compile(r'record_.*\.tar$')
422
+ tar_files = {name: info for name, info in remote_files.items() if tar_pattern.match(name)}
423
+ logger.info(f"Found {len(tar_files)} TAR files in snapshot")
424
 
425
+ tar_count = 0
426
+ for tar_file, file_info in tar_files.items():
427
+ # Skip if already transferred with same size and mtime
428
+ if is_file_transferred(tar_file, file_info['size'], file_info['mtime']):
429
+ logger.debug(f"Skipping already transferred file: {tar_file}")
430
+ continue
431
+
432
+ # Check if file is stable
433
+ is_stable, updated_stat = is_file_stable(sftp, file_info['path'])
434
+ if not is_stable:
435
+ logger.info(f"Skipping unstable file: {tar_file}")
436
+ continue
437
+
438
+ # Transfer the file
439
+ try:
440
+ local_path = os.path.join(LOCAL_DATA_DIR, tar_file)
441
+ checksum = safe_transfer_file(sftp, file_info['path'], local_path)
442
+ mark_file_transferred(tar_file, updated_stat.st_size, updated_stat.st_mtime, checksum)
443
+ tar_count += 1
444
+ except Exception as e:
445
+ logger.error(f"Failed to transfer {tar_file}: {str(e)}")
446
+
447
+ logger.info(f"Transferred {tar_count} new TAR files from snapshot")
448
 
449
+ # Step 3: Transfer PKL file from the snapshot
450
+ pkl_file = "image_action_mapping_with_key_states.pkl"
451
+ if pkl_file in remote_files:
452
+ file_info = remote_files[pkl_file]
453
+
454
+ # Only transfer if needed
455
+ if not is_file_transferred(pkl_file, file_info['size'], file_info['mtime']):
456
+ is_stable, updated_stat = is_file_stable(sftp, file_info['path'])
457
+ if is_stable:
458
+ local_path = os.path.join(LOCAL_DATA_DIR, pkl_file)
459
+ checksum = safe_transfer_file(sftp, file_info['path'], local_path)
460
+ mark_file_transferred(pkl_file, updated_stat.st_size, updated_stat.st_mtime, checksum)
461
+ update_transfer_state("last_pkl_transfer", datetime.now().isoformat())
462
+ logger.info("Successfully transferred PKL file from snapshot")
463
+ pkl_success = True
464
+ else:
465
+ logger.warning("PKL file is still being written, skipping")
466
+ pkl_success = False
467
+ else:
468
+ logger.debug("PKL file unchanged, skipping")
469
+ pkl_success = True
470
  else:
471
+ logger.warning("PKL file not found in snapshot")
472
+ pkl_success = False
473
+
474
+ # Step 4: Transfer CSV file from the snapshot (only if PKL succeeded)
475
+ csv_file = "train_dataset.target_frames.csv"
476
+ if pkl_success and csv_file in remote_files:
477
+ file_info = remote_files[csv_file]
478
 
479
+ # Only transfer if needed
480
+ if not is_file_transferred(csv_file, file_info['size'], file_info['mtime']):
481
+ is_stable, updated_stat = is_file_stable(sftp, file_info['path'])
482
+ if is_stable:
483
+ local_path = os.path.join(LOCAL_DATA_DIR, csv_file)
484
+ checksum = safe_transfer_file(sftp, file_info['path'], local_path)
485
+ mark_file_transferred(csv_file, updated_stat.st_size, updated_stat.st_mtime, checksum)
486
+ update_transfer_state("last_csv_transfer", datetime.now().isoformat())
487
+ logger.info("Successfully transferred CSV file from snapshot")
488
+ csv_success = True
489
+ else:
490
+ logger.warning("CSV file is still being written, skipping")
491
+ csv_success = False
492
+ else:
493
+ logger.debug("CSV file unchanged, skipping")
494
+ csv_success = True
495
  else:
496
+ if not pkl_success:
497
+ logger.warning("Skipping CSV transfer because PKL transfer failed")
498
+ else:
499
+ logger.warning("CSV file not found in snapshot")
500
  csv_success = False
501
+
502
+ return tar_count > 0 or pkl_success or csv_success
503
  except Exception as e:
504
  logger.error(f"Error in transfer cycle: {str(e)}")
505
  return False