"""Annotate saved replay episodes (.npz files) with video embeddings.

For each episode found under cfg.dataset_dir, the environment's
process_episode() is used to encode the observations and the result is stored
back into the episode file under the 'clip_video' key.
"""
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)

import io
import os
from tqdm import tqdm

os.environ['MKL_SERVICE_FORCE_INTEL'] = '1'

from pathlib import Path
from collections import OrderedDict

import hydra
import numpy as np
import torch

import tools.utils as utils
from tools.replay import load_episode

torch.backends.cudnn.benchmark = True


if os.name == "nt":
    import msvcrt

    def portable_lock(fd):
        # Lock the first byte of the file; msvcrt works on raw file descriptors,
        # so rewind with os.lseek rather than a file-object seek().
        os.lseek(fd, 0, os.SEEK_SET)
        msvcrt.locking(fd, msvcrt.LK_LOCK, 1)

    def portable_unlock(fd):
        os.lseek(fd, 0, os.SEEK_SET)
        msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
else:
    import fcntl

    def portable_lock(fd):
        # Exclusive, non-blocking lock: raises BlockingIOError if another
        # process already holds the lock on this file.
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)

    def portable_unlock(fd):
        fcntl.flock(fd, fcntl.LOCK_UN)


class Locker:
    """Cross-process lock backed by a sidecar lock file."""

    def __init__(self, lock_name):
        # e.g. lock_name = "./lockfile.lck"
        self.lock_name = lock_name

    def __enter__(self):
        open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
        self.fd = os.open(self.lock_name, open_mode)
        portable_lock(self.fd)
        return self

    def __exit__(self, _type, value, tb):
        portable_unlock(self.fd)
        os.close(self.fd)
        try:
            os.remove(self.lock_name)
        except OSError:
            # Another process may have removed the lock file already.
            pass


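# Usage sketch (this mirrors how process() uses the lock below): each episode
# file gets a sibling ".lck" file, so several workers can run this script on
# the same dataset directory concurrently, e.g.
#
#     with Locker(str(fname) + ".lck"):
#         episode = load_episode(fname)
#
# On POSIX the lock is non-blocking, so a worker that cannot acquire it gets a
# BlockingIOError and simply skips that file.

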
class Workspace:
    def __init__(self, cfg, savedir=None, workdir=None):
        self.workdir = Path.cwd() if workdir is None else workdir
        print(f'workspace: {self.workdir}')

        assert int(cfg.viclip_encode) == 1, "exactly one encoding (video or img) must be enabled"
        if cfg.viclip_encode:
            self.key_to_add = 'clip_video'
        self.key_to_process = getattr(cfg, 'key_to_process', 'observation')

        self.cfg = cfg
        self.device = torch.device(cfg.device)

        # Create the environment, which exposes process_episode() for encoding
        # observations into video embeddings.
        task = cfg.task
        self.task = task
        img_size = cfg.img_size

        import envs.main as envs
        self.train_env = envs.make(task, cfg.obs_type, cfg.action_repeat, cfg.seed,
                                   img_size=img_size, viclip_encode=cfg.viclip_encode,
                                   device='cuda')

        self.dataset_path = Path(cfg.dataset_dir)
        self.timer = utils.Timer()
        self._global_step = 0
        self._global_episode = 0

    def process(self):
        filenames = sorted(self.dataset_path.glob('**/*.npz'))
        print(f"Found {len(filenames)} files")
        for idx, fname in tqdm(enumerate(filenames), total=len(filenames)):
            lockname = str(fname.absolute()) + ".lck"
            try:
                with Locker(lockname):
                    episode = load_episode(fname)

                    # Skip episodes that already contain a valid embedding
                    # aligned with the observations; otherwise drop any stale entry.
                    existing = episode.get(self.key_to_add)
                    if isinstance(existing, np.ndarray) and existing.size > 1 \
                            and existing.shape[0] == episode[self.key_to_process].shape[0]:
                        continue
                    episode.pop(self.key_to_add, None)

                    # Encode the observations and store the result in the episode.
                    add_data = self.train_env.process_episode(episode[self.key_to_process])  # .cpu().numpy()
                    if idx == 0:
                        print(add_data.shape)
                    episode[self.key_to_add] = add_data

                    # Save the episode: compress into an in-memory buffer first,
                    # then overwrite the original file in one write.
                    with io.BytesIO() as f1:
                        np.savez_compressed(f1, **episode)
                        f1.seek(0)
                        with fname.open('wb') as f2:
                            f2.write(f1.read())
            except BlockingIOError:
                print(f"File busy: {str(fname)}")
                continue


def start_processing(cfg, savedir, workdir):
    from process_dataset import Workspace as W
    root_dir = Path.cwd()
    cfg.workdir = str(root_dir)
    workspace = W(cfg, savedir, workdir)
    workspace.root_dir = root_dir
    snapshot = workspace.root_dir / 'last_snapshot.pt'
    if snapshot.exists():
        # Resuming requires a load_snapshot() method on Workspace (not defined
        # in this file); this branch only triggers if a snapshot is present.
        print(f'resuming: {snapshot}')
        workspace.load_snapshot(workspace.root_dir)
    workspace.process()


# NOTE: the Hydra config path/name below are assumptions; point them at the
# project's config for this script.
@hydra.main(config_path='.', config_name='process_dataset')
def main(cfg):
    start_processing(cfg, None, None)


if __name__ == '__main__':
    main()
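

# Example invocation (illustrative only): values are placeholders, and the
# override names simply mirror the cfg fields read above (dataset_dir, task,
# obs_type, viclip_encode, device, ...), passed as Hydra key=value overrides:
#
#   python process_dataset.py dataset_dir=/path/to/episodes task=<env_task> \
#       obs_type=pixels viclip_encode=1 device=cuda
#
# Several such processes can point at the same dataset directory; the per-file
# locks keep workers from encoding or overwriting the same episode twice.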