import bpy
import os
from tqdm import tqdm
import numpy as np
from numpy import ndarray
from typing import Tuple, List, Union
import trimesh
import fast_simplification
import argparse
import yaml
from box import Box

from .log import new_entry, add_error, add_warning, new_log, end_log
from .raw_data import RawData


def load(filepath: str):
    old_objs = set(bpy.context.scene.objects)
    if not os.path.exists(filepath):
        raise ValueError(f"File {filepath} does not exist!")
    try:
        if filepath.endswith(".vrm"):
            # enable the VRM addon and load the VRM model
            bpy.ops.preferences.addon_enable(module='vrm')
            bpy.ops.import_scene.vrm(
                filepath=filepath,
                use_addon_preferences=True,
                extract_textures_into_folder=False,
                make_new_texture_folder=False,
                set_shading_type_to_material_on_import=False,
                set_view_transform_to_standard_on_import=True,
                set_armature_display_to_wire=True,
                set_armature_display_to_show_in_front=True,
                set_armature_bone_shape_to_default=True,
                disable_bake=True,  # customized option for better performance
            )
        elif filepath.endswith(".obj"):
            bpy.ops.wm.obj_import(filepath=filepath)
        elif filepath.endswith((".fbx", ".FBX")):
            # end bone is removed using remove_dummy_bone
            bpy.ops.import_scene.fbx(filepath=filepath, ignore_leaf_bones=False, use_image_search=False)
        elif filepath.endswith((".glb", ".gltf")):
            bpy.ops.import_scene.gltf(filepath=filepath, import_pack_images=False)
        elif filepath.endswith(".dae"):
            bpy.ops.wm.collada_import(filepath=filepath)
        elif filepath.endswith(".blend"):
            with bpy.data.libraries.load(filepath) as (data_from, data_to):
                data_to.objects = data_from.objects
            for obj in data_to.objects:
                if obj is not None:
                    bpy.context.collection.objects.link(obj)
        else:
            raise ValueError(f"unsupported file type: {filepath}")
    except Exception as e:
        raise ValueError(f"failed to load {filepath}: {e}") from e
    armature = [x for x in set(bpy.context.scene.objects) - old_objs if x.type == "ARMATURE"]
    if len(armature) == 0:
        return None
    if len(armature) > 1:
        raise ValueError("multiple armatures found")
    armature = armature[0]
    armature.select_set(True)
    bpy.context.view_layer.objects.active = armature
    bpy.ops.object.mode_set(mode='EDIT')
    # edit the armature we just loaded, not the first armature datablock in the file
    for bone in armature.data.edit_bones:
        bone.roll = 0.  # change all rolls to 0 to prevent weird behaviour
    bpy.ops.object.mode_set(mode='OBJECT')
    armature.select_set(False)
    bpy.ops.object.select_all(action='DESELECT')
    return armature
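
# Usage sketch (hypothetical path): load() must run inside a Blender Python
# environment, since the bpy.ops importers above mutate the open scene.
#   armature = load("./assets/character.fbx")  # returns None when no armature is found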

# remove all data in bpy
def clean_bpy():
    # first try to purge orphan data
    try:
        bpy.ops.outliner.orphans_purge(do_local_ids=True, do_linked_ids=True, do_recursive=True)
    except Exception as e:
        print(f"Warning: Could not purge orphans: {e}")
    # then remove all data by type
    data_types = [
        bpy.data.actions,
        bpy.data.armatures,
        bpy.data.cameras,
        bpy.data.collections,
        bpy.data.curves,
        bpy.data.images,
        bpy.data.lights,
        bpy.data.materials,
        bpy.data.meshes,
        bpy.data.objects,
        bpy.data.textures,
        bpy.data.worlds,
        bpy.data.node_groups,
    ]
    for data_collection in data_types:
        try:
            for item in data_collection:
                try:
                    data_collection.remove(item)
                except Exception as e:
                    print(f"Warning: Could not remove {item.name} from {data_collection}: {e}")
        except Exception as e:
            print(f"Warning: Error processing {data_collection}: {e}")
    # force garbage collection to free memory
    import gc
    gc.collect()


def get_arranged_bones(armature):
    matrix_world = armature.matrix_world
    arranged_bones = []
    root = armature.pose.bones[0]
    while root.parent is not None:
        root = root.parent
    Q = [root]
    rot = np.array(matrix_world)[:3, :3]
    # depth-first traversal; siblings sorted by world-space head position
    while Q:
        b = Q.pop(0)
        arranged_bones.append(b)
        children = []
        for cb in b.children:
            # sort key must come from the child's head (was b.head, which made the sort a no-op)
            head = rot @ np.array(cb.head)
            children.append((cb, head[0], head[1], head[2]))
        children = sorted(children, key=lambda x: (x[3], x[1], x[2]))
        _c = [x[0] for x in children]
        Q = _c + Q
    return arranged_bones
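
def _sibling_order_demo():
    # Hypothetical sketch, not called by the pipeline: isolates the sort key
    # used in get_arranged_bones() above, which orders sibling bones by the
    # (z, x, y) components of their world-space head positions.
    heads = {
        "leg_l": np.array([-0.1, 0.0, 0.9]),
        "leg_r": np.array([0.1, 0.0, 0.9]),
        "spine": np.array([0.0, 0.0, 1.0]),
    }
    order = sorted(heads, key=lambda n: (heads[n][2], heads[n][0], heads[n][1]))
    assert order == ["leg_l", "leg_r", "spine"]
    return order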

def process_mesh():
    meshes = []
    for v in bpy.data.objects:
        if v.type == 'MESH':
            meshes.append(v)
    _dict_mesh = {}
    for obj in meshes:
        m = np.array(obj.matrix_world)
        matrix_world_rot = m[:3, :3]
        matrix_world_bias = m[:3, 3]
        rot = matrix_world_rot
        total_vertices = len(obj.data.vertices)
        vertex = np.zeros((4, total_vertices))
        vertex_normal = np.zeros((total_vertices, 3))
        obj_verts = obj.data.vertices
        faces = []
        normals = []
        for v in obj_verts:
            # be careful: normals are rotated only, never translated
            vertex_normal[v.index] = rot @ np.array(v.normal)
            vv = rot @ v.co
            vv = np.array(vv) + matrix_world_bias
            vertex[0:3, v.index] = vv
            vertex[3][v.index] = 1  # affine coordinate
        for polygon in obj.data.polygons:
            # build an adjacency map over the polygon's edge keys
            edges = polygon.edge_keys
            nodes = []
            adj = {}
            for edge in edges:
                if adj.get(edge[0]) is None:
                    adj[edge[0]] = []
                adj[edge[0]].append(edge[1])
                if adj.get(edge[1]) is None:
                    adj[edge[1]] = []
                adj[edge[1]].append(edge[0])
                nodes.append(edge[0])
                nodes.append(edge[1])
            normal = polygon.normal
            nodes = sorted(set(nodes))  # was list(set(sorted(nodes))), which discarded the sort
            first = nodes[0]
            # walk the edge cycle to recover the polygon's vertex loop
            loop = []
            now = first
            vis = {}
            while True:
                loop.append(now)
                vis[now] = True
                if vis.get(adj[now][0]) is None:
                    now = adj[now][0]
                elif vis.get(adj[now][1]) is None:
                    now = adj[now][1]
                else:
                    break
            # fan-triangulate the loop
            for (second, third) in zip(loop[1:], loop[2:]):
                faces.append((first + 1, second + 1, third + 1))  # the cursed +1 (faces are 1-indexed here)
                normals.append(rot @ normal)  # and the cursed normal of BLENDER
        # flip triangles whose winding disagrees with the stored face normal
        correct_faces = []
        for (i, face) in enumerate(faces):
            normal = normals[i]
            v0 = face[0] - 1
            v1 = face[1] - 1
            v2 = face[2] - 1
            v = np.cross(
                vertex[:3, v1] - vertex[:3, v0],
                vertex[:3, v2] - vertex[:3, v0],
            )
            if (v * normal).sum() > 0:
                correct_faces.append(face)
            else:
                correct_faces.append((face[0], face[2], face[1]))
        if len(correct_faces) > 0:
            _dict_mesh[obj.name] = {
                'vertex': vertex,
                'face': correct_faces,
            }
    # concatenate all meshes, offsetting each mesh's face indices by its vertex bias
    vertex = np.concatenate([_dict_mesh[name]['vertex'] for name in _dict_mesh], axis=1)[:3, :].transpose()
    total_faces = 0
    now_bias = 0
    for name in _dict_mesh:
        total_faces += len(_dict_mesh[name]['face'])
    faces = np.zeros((total_faces, 3), dtype=np.int64)
    tot = 0
    for name in _dict_mesh:
        f = np.array(_dict_mesh[name]['face'], dtype=np.int64)
        faces[tot:tot + f.shape[0]] = f + now_bias
        now_bias += _dict_mesh[name]['vertex'].shape[1]
        tot += f.shape[0]
    return vertex, faces


def process_armature(
    armature,
    arranged_bones,
) -> Tuple[ndarray, ndarray, List[Union[int, None]], List[str], ndarray]:
    matrix_world = armature.matrix_world
    m = np.array(matrix_world.to_4x4())
    scale_inv = np.linalg.inv(np.diag(matrix_world.to_scale()))
    rot = m[:3, :3]
    bias = m[:3, 3]
    bpy.ops.object.editmode_toggle()
    edit_bones = armature.data.edit_bones
    J = len(arranged_bones)
    joints = np.zeros((J, 3), dtype=np.float32)
    tails = np.zeros((J, 3), dtype=np.float32)
    parents = []
    name_to_id = {}
    names = []
    matrix_local_stack = np.zeros((J, 4, 4), dtype=np.float32)
    for (id, pbone) in enumerate(arranged_bones):
        name = pbone.name
        names.append(name)
        matrix_local = np.array(pbone.bone.matrix_local)
        if not pbone.bone.use_inherit_rotation:
            add_warning(f"use_inherit_rotation of bone {name} is False!")
        head = rot @ matrix_local[0:3, 3] + bias
        edit_bone = edit_bones.get(name)
        tail = rot @ np.array(edit_bone.tail) + bias
        name_to_id[name] = id
        joints[id] = head
        tails[id] = tail
        parents.append(None if pbone.parent not in arranged_bones else name_to_id[pbone.parent.name])
        # remove the scale part of the world transform
        matrix_local[:, 3:4] = m @ matrix_local[:, 3:4]
        matrix_local[:3, :3] = scale_inv @ matrix_local[:3, :3]
        matrix_local_stack[id] = matrix_local
    bpy.ops.object.editmode_toggle()
    return joints, tails, parents, names, matrix_local_stack


def save_raw_data(
    path: str,
    vertices: ndarray,
    faces: ndarray,
    joints: Union[ndarray, None],
    tails: Union[ndarray, None],
    parents: Union[List[Union[int, None]], None],
    names: Union[List[str], None],
    matrix_local: Union[ndarray, None],
    target_count: int,
):
    mesh = trimesh.Trimesh(vertices=vertices, faces=faces)
    vertices = np.array(mesh.vertices, dtype=np.float32)
    faces = np.array(mesh.faces, dtype=np.int64)
    # decimate the mesh when it exceeds the target face count
    if faces.shape[0] > target_count:
        vertices, faces = fast_simplification.simplify(vertices, faces, target_count=target_count)
    mesh = trimesh.Trimesh(vertices=vertices, faces=faces)
    new_vertices = np.array(mesh.vertices, dtype=np.float32)
    new_vertex_normals = np.array(mesh.vertex_normals, dtype=np.float32)
    new_faces = np.array(mesh.faces, dtype=np.int64)
    new_face_normals = np.array(mesh.face_normals, dtype=np.float32)
    if joints is not None:
        new_joints = np.array(joints, dtype=np.float32)
    else:
        new_joints = None
    raw_data = RawData(
        vertices=new_vertices,
        vertex_normals=new_vertex_normals,
        faces=new_faces,
        face_normals=new_face_normals,
        joints=new_joints,
        tails=tails,
        skin=None,
        no_skin=None,
        parents=parents,
        names=names,
        matrix_local=matrix_local,
    )
    raw_data.check()
    raw_data.save(path=path)
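
def _simplify_demo():
    # Hypothetical sketch, not called by the pipeline: the decimation step of
    # save_raw_data() above, applied to a synthetic trimesh icosphere. The
    # subdivision level and target_count are illustrative only.
    m = trimesh.creation.icosphere(subdivisions=4)  # 5120 faces
    v, f = fast_simplification.simplify(
        np.asarray(m.vertices, dtype=np.float32),
        np.asarray(m.faces, dtype=np.int64),
        target_count=1000,
    )
    return trimesh.Trimesh(vertices=v, faces=f)  # roughly 1000 faces remain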

def extract_builtin(
    output_folder: str,
    target_count: int,
    num_runs: int,
    id: int,
    time: str,
    files: List[Tuple[str, str]],
):
    log_path = "./logs"
    log_path = os.path.join(log_path, time)
    # split the file list evenly across num_runs workers; this worker takes [start, end)
    num_files = len(files)
    gap = num_files // num_runs
    start = gap * id
    end = gap * (id + 1)
    if id + 1 == num_runs:
        end = num_files
    files = sorted(files)
    if end != -1:
        files = files[:end]
    new_log(log_path, f"extract_builtin_{start}_{end}")
    tot = 0
    for file in tqdm(files[start:]):  # files was truncated to end above, so this iterates files[start:end]
        input_file = file[0]
        output_dir = file[1]
        clean_bpy()
        new_entry(input_file)
        try:
            print(f"Now processing {input_file}...")
            armature = load(input_file)
            print('save to:', output_dir)
            os.makedirs(output_dir, exist_ok=True)
            vertices, faces = process_mesh()
            if armature is not None:
                arranged_bones = get_arranged_bones(armature)
                joints, tails, parents, names, matrix_local = process_armature(armature, arranged_bones)
            else:
                joints = None
                tails = None
                parents = None
                names = None
                matrix_local = None
            save_file = os.path.join(output_dir, 'raw_data.npz')
            save_raw_data(
                path=save_file,
                vertices=vertices,
                faces=faces - 1,  # process_mesh returns 1-indexed faces; convert to 0-indexed
                joints=joints,
                tails=tails,
                parents=parents,
                names=names,
                matrix_local=matrix_local,
                target_count=target_count,
            )
            tot += 1
        except ValueError as e:
            add_error(str(e))
            print(f"ValueError: {str(e)}")
        except RuntimeError as e:
            add_error(str(e))
            print(f"RuntimeError: {str(e)}")
        except TimeoutError:
            add_error("time out")
            print("TimeoutError: Processing timed out")
        except Exception as e:
            add_error(f"Unexpected error: {str(e)}")
            print(f"Unexpected error: {str(e)}")
    end_log()
    print(f"{tot} models processed")


def str2bool(v):
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise argparse.ArgumentTypeError('Boolean value expected.')


def nullable_string(val):
    if not val:
        return None
    return val


def get_files(
    data_name: str,
    input_dataset_dir: str,
    output_dataset_dir: str,
    inputs: Union[str, None] = None,
    require_suffix: List[str] = ['obj', 'fbx', 'FBX', 'dae', 'glb', 'gltf', 'vrm'],
    force_override: bool = False,
    warning: bool = True,
) -> List[Tuple[str, str]]:
    files = []  # (input_file, output_dir)
    if inputs is not None:
        # specified input file(s)
        vis = {}
        inputs = inputs.split(',')
        for file in inputs:
            file_name = file.removeprefix("./")
            file_name = '.'.join(file_name.split('.')[:-1])  # remove suffix
            output_dir = os.path.join(output_dataset_dir, file_name)
            raw_data_npz = os.path.join(output_dir, data_name)
            if not force_override and os.path.exists(raw_data_npz):
                continue
            if warning and output_dir in vis:
                print(f"\033[33mWARNING: duplicate output directory: {output_dir}, rename the file prefixes to avoid ambiguity\033[0m")
            vis[output_dir] = True
            files.append((file, output_dir))
    else:
        vis = {}
        for root, dirs, f in os.walk(input_dataset_dir):
            for file in f:
                if file.split('.')[-1] in require_suffix:
                    file_name = file.removeprefix("./")
                    file_name = '.'.join(file_name.split('.')[:-1])  # remove suffix
                    output_dir = os.path.join(output_dataset_dir, os.path.relpath(root, input_dataset_dir), file_name)
                    raw_data_npz = os.path.join(output_dir, data_name)
                    # skip files whose extracted data already exists
                    if not force_override and os.path.exists(raw_data_npz):
                        continue
                    if warning and output_dir in vis:
                        print(f"\033[33mWARNING: duplicate output directory: {output_dir}, rename the file prefixes to avoid ambiguity\033[0m")
                    vis[output_dir] = True
                    files.append((os.path.join(root, file), output_dir))
    return files
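
# Hypothetical example of the collection step: gather every .glb under ./raw
# whose raw_data.npz has not yet been written to ./processed.
#   files = get_files(
#       data_name='raw_data.npz',
#       input_dataset_dir='./raw',
#       output_dataset_dir='./processed',
#       require_suffix=['glb'],
#       force_override=False,
#   )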
print(f"\033[33mWARNING: duplicate output directory: {output_dir}, you need to rename prefix of files to avoid ambiguity\033[0m") vis[output_dir] = True files.append((file, output_dir)) else: vis = {} for root, dirs, f in os.walk(input_dataset_dir): for file in f: if file.split('.')[-1] in require_suffix: file_name = file.removeprefix("./") # remove suffix file_name = '.'.join(file_name.split('.')[:-1]) output_dir = os.path.join(output_dataset_dir, os.path.relpath(root, input_dataset_dir), file_name) raw_data_npz = os.path.join(output_dir, data_name) # Check if all required files exist if not force_override and os.path.exists(raw_data_npz): continue if warning and output_dir in vis: print(f"\033[33mWARNING: duplicate output directory: {output_dir}, you need to rename prefix of files to avoid ambiguity\033[0m") vis[output_dir] = True files.append((os.path.join(root, file), output_dir)) return files def parse(): parser = argparse.ArgumentParser() parser.add_argument('--config', type=str, required=True) parser.add_argument('--require_suffix', type=str, required=True) parser.add_argument('--faces_target_count', type=int, required=True) parser.add_argument('--num_runs', type=int, required=True) parser.add_argument('--force_override', type=str2bool, required=True) parser.add_argument('--id', type=int, required=True) parser.add_argument('--time', type=str, required=True) parser.add_argument('--input', type=nullable_string, required=False, default=None) parser.add_argument('--input_dir', type=nullable_string, required=False, default=None) parser.add_argument('--output_dir', type=nullable_string, required=False, default=None) return parser.parse_args() if __name__ == "__main__": args = parse() config = Box(yaml.safe_load(open(args.config, "r"))) num_runs = args.num_runs id = args.id timestamp = args.time require_suffix = args.require_suffix.split(',') force_override = args.force_override target_count = args.faces_target_count if args.input_dir: config.input_dataset_dir = args.input_dir if args.output_dir: config.output_dataset_dir = args.output_dir assert config.input_dataset_dir is not None or args.input is None, 'you cannot specify both input and input_dir' files = get_files( data_name='raw_data.npz', inputs=args.input, input_dataset_dir=config.input_dataset_dir, output_dataset_dir=config.output_dataset_dir, require_suffix=require_suffix, force_override=force_override, warning=True, ) extract_builtin( output_folder=config.output_dataset_dir, target_count=target_count, num_runs=num_runs, id=id, time=timestamp, files=files, )