#!/usr/bin/env python3
# Note:
# - Hardlinks are copied
# - The size of symlinks and directories is meaningless; it depends on
#   whatever the filesystem/tar file reports
import argparse | |
import json | |
import os | |
import stat | |
import sys | |
import itertools | |
import logging | |
import hashlib | |
import tarfile | |
# Value stored under the "version" key of the generated JSON.
VERSION = 3

# Positions of the fixed fields inside a node list.
(IDX_NAME, IDX_SIZE, IDX_MTIME, IDX_MODE, IDX_UID, IDX_GID) = range(6)

# The last slot is shared and its meaning depends on the node type:
# - symbolic links: the link target
# - directories: the list of child nodes
# - regular files: the name of the content file
IDX_TARGET = IDX_FILENAME = 6

# Number of leading hex digits of the content hash used in content file names.
HASH_LENGTH = 8

# File-type bits OR'ed into the mode of nodes read from a tar archive.
S_IFLNK = 0o120000
S_IFREG = 0o100000
S_IFDIR = 0o040000
def hash_file(filename) -> str:
    """Return the hex digest computed by hash_fileobj for the file *filename*.

    The file is opened unbuffered in binary mode and closed again before
    returning, whether or not hashing succeeds.
    """
    stream = open(filename, "rb", buffering=0)
    with stream:
        digest = hash_fileobj(stream)
    return digest
def hash_fileobj(f) -> str:
    """Return the SHA-256 hex digest of everything readable from *f*.

    *f* must be a binary file object; it is consumed from its current
    position in 128 KiB chunks until read() returns an empty bytes object.
    """
    chunk_size = 128 * 1024
    digest = hashlib.sha256()
    while True:
        chunk = f.read(chunk_size)
        if chunk == b"":
            break
        digest.update(chunk)
    return digest.hexdigest()
def main():
    """Command-line entry point: write the filesystem JSON for a directory or tar.

    The positional argument is treated as a tar archive when it names an
    existing regular file and as a base directory otherwise.  The result is
    a JSON object with the keys "fsroot", "version" and "size", written to
    --out (stdout by default).
    """
    logging.basicConfig(format="%(message)s")
    logger = logging.getLogger("fs2json")
    logger.setLevel(logging.DEBUG)

    parser = argparse.ArgumentParser(
        description="Create filesystem JSON. Example:\n"
                    " ./fs2json.py --exclude /boot/ --out fs.json /mnt/",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("--exclude",
                        action="append",
                        metavar="path",
                        help="Path to exclude (relative to base path). Can be specified multiple times.")
    parser.add_argument("--out",
                        metavar="out",
                        nargs="?",
                        type=argparse.FileType("w"),
                        help="File to write to (defaults to stdout)",
                        # With nargs="?" a bare "--out" yields `const`; without
                        # it the value would be None and json.dump would fail.
                        const=sys.stdout,
                        default=sys.stdout)
    parser.add_argument("path",
                        metavar="path-or-tar",
                        help="Base path or tar file to include in JSON")

    args = parser.parse_args()
    path = os.path.normpath(args.path)

    try:
        if os.path.isfile(path):
            # Close the archive as soon as the tree has been built.
            with tarfile.open(path, "r") as tar:
                root, total_size = handle_tar(logger, tar)
        else:
            root, total_size = handle_dir(logger, path, args.exclude)

        if False:
            # normalize the order of children, useful to debug differences
            # between the tar and filesystem reader
            def sort_children(children):
                for c in children:
                    # Nodes without a slot-6 value are trimmed to 6 entries,
                    # so check the length before indexing.
                    if len(c) > IDX_TARGET and isinstance(c[IDX_TARGET], list):
                        sort_children(c[IDX_TARGET])
                children.sort()

            sort_children(root)

        result = {
            "fsroot": root,
            "version": VERSION,
            "size": total_size,
        }

        logger.info("Creating json ...")
        json.dump(result, args.out, check_circular=False, separators=(',', ':'))
    finally:
        # Flush and release a real output file; leave stdout open.
        if args.out is not sys.stdout:
            args.out.close()
def handle_dir(logger, path, exclude):
    """Build the node tree for the directory tree rooted at *path*.

    Args:
        logger: Logger used for progress messages and for errors reported
            by os.walk.
        path: Base directory to scan.
        exclude: Directory paths relative to *path* (or None).  A matching
            directory is left out together with everything below it.

    Returns:
        A tuple (root, total_size).  root is the list of nodes directly
        below *path*; total_size is the sum of st_size over every node
        created (directories and symlinks included).

    Raises:
        AssertionError: two files with different content share the same
            shortened hash, or the walk order breaks the invariant that
            each visited directory is exactly one level below an open one.
    """
    path = path + "/"
    exclude = {os.path.join("/", os.path.normpath(p)) for p in exclude or []}

    def onerror(oserror):
        # Errors reported by os.walk are logged and the walk continues.
        logger.warning(oserror)

    # Number of leading "/"-separated components that belong to the base
    # path and are stripped from every walked directory path.
    rootdepth = path.count("/")
    files = os.walk(path, onerror=onerror)
    prevpath = []

    mainroot = []
    filename_to_hash = {}
    total_size = 0
    # Child lists of the directories that are currently "open", outermost
    # first; the last entry receives newly discovered subdirectories.
    rootstack = [mainroot]

    def make_node(st, name):
        """Create a node from a stat result and add its size to the total."""
        nonlocal total_size

        obj = [None] * 7
        obj[IDX_NAME] = name
        obj[IDX_SIZE] = st.st_size
        obj[IDX_MTIME] = int(st.st_mtime)
        obj[IDX_MODE] = int(st.st_mode)
        obj[IDX_UID] = st.st_uid
        obj[IDX_GID] = st.st_gid

        total_size += st.st_size

        # Missing:
        #     int(st.st_atime),
        #     int(st.st_ctime),
        return obj

    logger.info("Creating file tree ...")

    for dirpath, dirnames, filenames in files:
        pathparts = dirpath.split("/")[rootdepth:]
        fullpath = os.path.join("/", *pathparts)

        if fullpath in exclude:
            # Emptying dirnames in place stops os.walk from descending.
            dirnames[:] = []
            continue

        # Length of the common prefix with the previously visited directory.
        depth = 0
        for this, prev in zip(pathparts, prevpath):
            if this != prev:
                break
            depth += 1

        # Close every directory of the previous path that is not shared.
        for _name in prevpath[depth:]:
            rootstack.pop()

        oldroot = rootstack[-1]

        assert len(pathparts[depth:]) == 1
        openname = pathparts[-1]

        if openname == "":
            # The base directory itself: its children go into the result root.
            root = mainroot
        else:
            root = []
            st = os.stat(dirpath)
            rootobj = make_node(st, openname)
            rootobj[IDX_TARGET] = root
            oldroot.append(rootobj)

        rootstack.append(root)

        for filename in itertools.chain(filenames, dirnames):
            absname = os.path.join(dirpath, filename)

            st = os.lstat(absname)
            isdir = stat.S_ISDIR(st.st_mode)
            islink = stat.S_ISLNK(st.st_mode)
            isfile = stat.S_ISREG(st.st_mode)

            if isdir and not islink:
                # Real directories get their node when os.walk visits them.
                continue

            obj = make_node(st, filename)

            if islink:
                obj[IDX_TARGET] = os.readlink(absname)
            elif isfile:
                file_hash = hash_file(absname)
                bin_name = file_hash[0:HASH_LENGTH] + ".bin"
                existing = filename_to_hash.get(bin_name)
                # Raised explicitly (not via `assert`) so the check is not
                # stripped when Python runs with -O.
                if existing is not None and existing != file_hash:
                    raise AssertionError(
                        "Collision in short hash (%s and %s)" % (existing, file_hash))
                filename_to_hash[bin_name] = file_hash
                obj[IDX_FILENAME] = bin_name

            # Drop the unused trailing slot of nodes that are neither a
            # symlink nor a regular file.
            while obj[-1] is None:
                obj.pop()

            root.append(obj)

        prevpath = pathparts

    return (mainroot, total_size)
def handle_tar(logger, tar):
    """Build the node tree for the members of an open tar archive.

    Args:
        logger: Logger used to report members of an unsupported type.
        tar: An open tarfile.TarFile; the caller keeps ownership of it.

    Returns:
        A tuple (root, total_size).  root is the list of top-level nodes;
        total_size is the sum of the recorded sizes of all nodes.

    Raises:
        AssertionError: two members with different content share the same
            shortened hash.
    """
    mainroot = []
    filename_to_hash = {}
    total_size = 0

    for member in tar.getmembers():
        parts = member.name.split("/")
        name = parts.pop()

        # Walk down from the root to the directory that receives this
        # member.  A component without a matching directory node is
        # skipped, so the member attaches to the deepest directory found.
        parent = mainroot
        for part in parts:
            found = None
            for child in parent:
                # Only descend into directory nodes: slot 6 of files and
                # symlinks holds a string, and nodes of unsupported types
                # are trimmed to 6 entries.
                if (child[IDX_NAME] == part
                        and len(child) > IDX_TARGET
                        and isinstance(child[IDX_TARGET], list)):
                    # No break: the last matching entry wins.
                    found = child[IDX_TARGET]
            if found is not None:
                parent = found

        obj = [None] * 7
        obj[IDX_NAME] = name
        obj[IDX_SIZE] = member.size
        obj[IDX_MTIME] = member.mtime
        obj[IDX_MODE] = member.mode
        obj[IDX_UID] = member.uid
        obj[IDX_GID] = member.gid

        if member.isfile() or member.islnk():
            obj[IDX_MODE] |= S_IFREG
            # Close the extracted stream once it has been hashed.
            with tar.extractfile(member) as f:
                file_hash = hash_fileobj(f)
                if member.islnk():
                    # fix size for hard links
                    f.seek(0, os.SEEK_END)
                    obj[IDX_SIZE] = int(f.tell())
            filename = file_hash[0:HASH_LENGTH] + ".bin"
            existing = filename_to_hash.get(filename)
            # Raised explicitly (not via `assert`) so the check is not
            # stripped when Python runs with -O.
            if existing is not None and existing != file_hash:
                raise AssertionError(
                    "Collision in short hash (%s and %s)" % (existing, file_hash))
            filename_to_hash[filename] = file_hash
            obj[IDX_FILENAME] = filename
        elif member.isdir():
            obj[IDX_MODE] |= S_IFDIR
            obj[IDX_TARGET] = []
        elif member.issym():
            obj[IDX_MODE] |= S_IFLNK
            obj[IDX_TARGET] = member.linkname
        else:
            logger.error("Unsupported type: {} ({})".format(member.type, name))

        total_size += obj[IDX_SIZE]

        # Drop the unused trailing slot of nodes of unsupported types.
        while obj[-1] is None:
            obj.pop()

        parent.append(obj)

    return mainroot, total_size
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()