#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2016 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for examining/injecting spatial media metadata in MP4/MOV files."""

import collections
import os
import re
import struct
import traceback
import xml.etree
import xml.etree.ElementTree

from spatialmedia import mpeg

MPEG_FILE_EXTENSIONS = [".mp4", ".mov"]

# 16-byte identifier placed at the start of the spherical "uuid" box payload.
SPHERICAL_UUID_ID = (
    b"\xff\xcc\x82\x63\xf8\x55\x4a\x93\x88\x14\x58\x7a\x02\x52\x1f\xdd")

# XML contents.
#
# NOTE(review): the markup inside the string literals below had been stripped
# from the file (only the text between tags survived). The tags were restored
# from the tag names in SPHERICAL_TAGS_LIST and the Spherical Video V1
# "GSpherical" namespace -- confirm against version control.
RDF_PREFIX = " xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\" "

SPHERICAL_XML_HEADER = \
    "<?xml version=\"1.0\"?>"\
    "<rdf:SphericalVideo\n"\
    "xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\"\n"\
    "xmlns:GSpherical=\"http://ns.google.com/videos/1.0/spherical/\">"

SPHERICAL_XML_CONTENTS = \
    "<GSpherical:Spherical>true</GSpherical:Spherical>"\
    "<GSpherical:Stitched>true</GSpherical:Stitched>"\
    "<GSpherical:StitchingSoftware>"\
    "Spherical Metadata Tool"\
    "</GSpherical:StitchingSoftware>"\
    "<GSpherical:ProjectionType>equirectangular</GSpherical:ProjectionType>"

SPHERICAL_XML_CONTENTS_TOP_BOTTOM = \
    "<GSpherical:StereoMode>top-bottom</GSpherical:StereoMode>"
SPHERICAL_XML_CONTENTS_LEFT_RIGHT = \
    "<GSpherical:StereoMode>left-right</GSpherical:StereoMode>"

# Parameter order matches that of the crop option.
# Placeholders {0}..{5} are, in order: cropped width, cropped height,
# full pano width, full pano height, cropped left offset, cropped top offset.
# NOTE(review): the XML tags in the two literals below had been stripped from
# the file; they were restored from the tag names in SPHERICAL_TAGS_LIST --
# confirm against version control.
SPHERICAL_XML_CONTENTS_CROP_FORMAT = \
    "<GSpherical:CroppedAreaImageWidthPixels>{0}"\
    "</GSpherical:CroppedAreaImageWidthPixels>"\
    "<GSpherical:CroppedAreaImageHeightPixels>{1}"\
    "</GSpherical:CroppedAreaImageHeightPixels>"\
    "<GSpherical:FullPanoWidthPixels>{2}</GSpherical:FullPanoWidthPixels>"\
    "<GSpherical:FullPanoHeightPixels>{3}</GSpherical:FullPanoHeightPixels>"\
    "<GSpherical:CroppedAreaLeftPixels>{4}</GSpherical:CroppedAreaLeftPixels>"\
    "<GSpherical:CroppedAreaTopPixels>{5}</GSpherical:CroppedAreaTopPixels>"

SPHERICAL_XML_FOOTER = "</rdf:SphericalVideo>"

# Un-prefixed names of the spherical XML tags this module recognises.
SPHERICAL_TAGS_LIST = [
    "Spherical",
    "Stitched",
    "StitchingSoftware",
    "ProjectionType",
    "SourceCount",
    "StereoMode",
    "InitialViewHeadingDegrees",
    "InitialViewPitchDegrees",
    "InitialViewRollDegrees",
    "Timestamp",
    "CroppedAreaImageWidthPixels",
    "CroppedAreaImageHeightPixels",
    "FullPanoWidthPixels",
    "FullPanoHeightPixels",
    "CroppedAreaLeftPixels",
    "CroppedAreaTopPixels",
]


class Metadata(object):
    """Container for the video and audio metadata to inject."""

    def __init__(self):
        self.video = None
        self.audio = None


class ParsedMetadata(object):
    """Container for metadata read back from a file."""

    def __init__(self):
        self.video = dict()
        self.audio = None
        self.num_audio_channels = 0


SPHERICAL_PREFIX = "{http://ns.google.com/videos/1.0/spherical/}"

# Maps a namespace-qualified ElementTree tag to its un-prefixed name.
SPHERICAL_TAGS = dict()
for tag in SPHERICAL_TAGS_LIST:
    SPHERICAL_TAGS[SPHERICAL_PREFIX + tag] = tag

# Raw string: "\d" in a normal string literal is an invalid escape sequence.
integer_regex_group = r"(\d+)"
# Six colon-separated non-negative integers.
crop_regex = "^{0}$".format(":".join([integer_regex_group] * 6))

MAX_SUPPORTED_AMBIX_ORDER = 1

SpatialAudioDescription = collections.namedtuple(
    'SpatialAudioDescription',
    'order is_supported has_head_locked_stereo')


def get_spatial_audio_description(num_channels):
    """Describes the ambisonic layout implied by a channel count.

    Args:
      num_channels: int, number of audio channels.

    Returns:
      SpatialAudioDescription. For an order i in 1..MAX_SUPPORTED_AMBIX_ORDER,
      (i + 1)^2 channels is reported as supported without head-locked stereo
      and (i + 1)^2 + 2 channels as supported with head-locked stereo. Any
      other count yields order=-1, is_supported=False (and, as in the
      original code, has_head_locked_stereo=True).
    """
    for i in range(1, MAX_SUPPORTED_AMBIX_ORDER + 1):
        ambisonic_channels = (i + 1) * (i + 1)
        if ambisonic_channels == num_channels:
            return SpatialAudioDescription(
                order=i, is_supported=True, has_head_locked_stereo=False)
        if ambisonic_channels + 2 == num_channels:
            return SpatialAudioDescription(
                order=i, is_supported=True, has_head_locked_stereo=True)

    return SpatialAudioDescription(
        order=-1, is_supported=False, has_head_locked_stereo=True)


def spherical_uuid(metadata):
    """Constructs a uuid containing spherical metadata.

    Args:
      metadata: String, xml to inject in spherical tag.

    Returns:
      uuid_leaf: a box containing spherical metadata.
    """
    uuid_leaf = mpeg.Box()
    assert(len(SPHERICAL_UUID_ID) == 16)
    uuid_leaf.name = mpeg.constants.TAG_UUID
    uuid_leaf.header_size = 8
    uuid_leaf.content_size = 0
    # Payload is the 16-byte identifier followed by the UTF-8 encoded XML.
    uuid_leaf.contents = SPHERICAL_UUID_ID + metadata.encode("utf-8")
    uuid_leaf.content_size = len(uuid_leaf.contents)

    return uuid_leaf


def mpeg4_add_spherical(mpeg4_file, in_fh, metadata):
    """Adds a spherical uuid box to an mpeg4 file for all video tracks.

    Args:
      mpeg4_file: mpeg4, Mpeg4 file structure to add metadata.
      in_fh: file handle, Source for uncached file contents.
      metadata: string, xml metadata to inject into spherical tag.

    Returns:
      False if adding the uuid box to a track fails, True otherwise.
    """
    for element in mpeg4_file.moov_box.contents:
        if element.name == mpeg.constants.TAG_TRAK:
            added = False
            # Drop any uuid box already on the track before adding a new one.
            element.remove(mpeg.constants.TAG_UUID)
            for sub_element in element.contents:
                if sub_element.name != mpeg.constants.TAG_MDIA:
                    continue
                for mdia_sub_element in sub_element.contents:
                    if mdia_sub_element.name != mpeg.constants.TAG_HDLR:
                        continue
                    # The 4 bytes at offset 8 of the hdlr contents are
                    # compared with the video track type.
                    position = mdia_sub_element.content_start() + 8
                    in_fh.seek(position)
                    if in_fh.read(4) == mpeg.constants.TRAK_TYPE_VIDE:
                        added = True
                        break

                if added:
                    if not element.add(spherical_uuid(metadata)):
                        return False
                    # element.contents was just modified; stop iterating it.
                    break

    mpeg4_file.resize()
    return True


def mpeg4_add_spatial_audio(mpeg4_file, in_fh, audio_metadata, console):
    """Adds spatial audio metadata to the first audio track of the input
       mpeg4_file. Returns False on failure.

    Args:
      mpeg4_file: mpeg4, Mpeg4 file structure to add metadata.
      in_fh: file handle, Source for uncached file contents.
      audio_metadata: dictionary ('ambisonic_type': string,
        'ambisonic_order': int, 'head_locked_stereo': Bool),
        Supports 'periphonic' ambisonic type only.
      console: callable taking a message string, used to report errors.

    Returns:
      The result of inject_spatial_audio_atom for the first audio track
      found, or True when the file has no audio track.
    """
    for element in mpeg4_file.moov_box.contents:
        if element.name == mpeg.constants.TAG_TRAK:
            for sub_element in element.contents:
                if sub_element.name != mpeg.constants.TAG_MDIA:
                    continue
                for mdia_sub_element in sub_element.contents:
                    if mdia_sub_element.name != mpeg.constants.TAG_HDLR:
                        continue
                    position = mdia_sub_element.content_start() + 8
                    in_fh.seek(position)
                    if in_fh.read(4) == mpeg.constants.TAG_SOUN:
                        return inject_spatial_audio_atom(
                            in_fh, sub_element, audio_metadata, console)
    return True


def mpeg4_add_audio_metadata(mpeg4_file, in_fh, audio_metadata, console):
    """Adds spatial audio metadata to a file with at most one audio track.

    Args:
      mpeg4_file: mpeg4, Mpeg4 file structure to add metadata.
      in_fh: file handle, Source for uncached file contents.
      audio_metadata: dictionary, see mpeg4_add_spatial_audio.
      console: callable taking a message string, used to report errors.

    Returns:
      False if more than one audio track is present or injection fails,
      True otherwise.
    """
    num_audio_tracks = get_num_audio_tracks(mpeg4_file, in_fh)
    if num_audio_tracks > 1:
        console("Error: Expected 1 audio track. Found %d" % num_audio_tracks)
        return False

    return mpeg4_add_spatial_audio(mpeg4_file, in_fh, audio_metadata, console)


def inject_spatial_audio_atom(
        in_fh, audio_media_atom, audio_metadata, console):
    """Appends an SA3D box to the sound sample descriptions of a media atom.

    Walks minf -> stbl -> stsd below audio_media_atom. For every sound sample
    description the channel count found in the file is compared with the
    count implied by audio_metadata.

    Args:
      in_fh: file handle, Source for uncached file contents.
      audio_media_atom: mdia box of the audio track.
      audio_metadata: dictionary with at least 'ambisonic_type',
        'ambisonic_order' and 'head_locked_stereo'.
      console: callable taking a message string, used to report errors.

    Returns:
      False if a channel count does not match, True otherwise (including
      when no sound sample description is found).
    """
    for atom in audio_media_atom.contents:
        if atom.name != mpeg.constants.TAG_MINF:
            continue
        for element in atom.contents:
            if element.name != mpeg.constants.TAG_STBL:
                continue
            for sub_element in element.contents:
                if sub_element.name != mpeg.constants.TAG_STSD:
                    continue
                for sample_description in sub_element.contents:
                    if sample_description.name in\
                            mpeg.constants.SOUND_SAMPLE_DESCRIPTIONS:
                        in_fh.seek(sample_description.position +
                                   sample_description.header_size + 16)
                        num_channels = get_num_audio_channels(
                            sub_element, in_fh)
                        expected_num_channels = \
                            get_expected_num_audio_channels(
                                audio_metadata["ambisonic_type"],
                                audio_metadata["ambisonic_order"],
                                audio_metadata["head_locked_stereo"])
                        if num_channels != expected_num_channels:
                            head_locked_stereo_msg = (
                                " with head-locked stereo"
                                if audio_metadata["head_locked_stereo"]
                                else "")
                            err_msg = "Error: Found %d audio channel(s). "\
                                "Expected %d channel(s) for %s ambisonics "\
                                "of order %d%s."\
                                % (num_channels,
                                   expected_num_channels,
                                   audio_metadata["ambisonic_type"],
                                   audio_metadata["ambisonic_order"],
                                   head_locked_stereo_msg)
                            console(err_msg)
                            return False
                        sa3d_atom = mpeg.SA3DBox.create(
                            num_channels, audio_metadata)
                        sample_description.contents.append(sa3d_atom)
    return True


def parse_spherical_xml(contents, console):
    """Returns spherical metadata for a set of xml data.

    NOTE(review): the source text of this function was lost after the
    `contents.find("` call; everything from that point on is a
    reconstruction and must be confirmed against version control. Other
    definitions may also have been lost from the same region of the file.

    Args:
      contents: string, spherical metadata xml contents.
      console: callable taking a message string, used for logging.

    Returns:
      dictionary containing the parsed spherical metadata values, or None
      if the xml cannot be parsed.
    """
    try:
        parsed_xml = xml.etree.ElementTree.XML(contents)
    except xml.etree.ElementTree.ParseError:
        try:
            console(traceback.format_exc())
            console(contents)
            # Retry with the rdf namespace declaration inserted into the
            # root tag, in case the parse failed because it was missing.
            marker = "<rdf:SphericalVideo"
            prefix = RDF_PREFIX
            if not isinstance(contents, str):
                # Keep the marker/prefix the same type as the contents.
                marker = marker.encode("utf-8")
                prefix = prefix.encode("utf-8")
            index = contents.find(marker)
            if index != -1:
                index += len(marker)
                contents = contents[:index] + prefix + contents[index:]
            parsed_xml = xml.etree.ElementTree.XML(contents)
            console("\t\tWarning missing rdf prefix: " + RDF_PREFIX)
        except xml.etree.ElementTree.ParseError:
            console("\t\tParser Error on XML")
            console(traceback.format_exc())
            console(contents)
            return None

    spherical_dictionary = dict()
    # Iterate the element directly; Element.getchildren() no longer exists
    # on current Python versions.
    for child in parsed_xml:
        if child.tag in SPHERICAL_TAGS:
            name = SPHERICAL_TAGS[child.tag]
            console("\t\t%s = %s" % (name, child.text))
            spherical_dictionary[name] = child.text
        else:
            name = child.tag
            if name.startswith(SPHERICAL_PREFIX):
                name = name[len(SPHERICAL_PREFIX):]
            console("\t\tUnknown: %s = %s" % (name, child.text))

    return spherical_dictionary


def generate_spherical_xml(stereo=None, crop=None):
    """Builds the spherical metadata xml document.

    NOTE(review): the source text of this function was lost up to the
    cropped-dimension check. Its name, signature and everything before that
    check are a reconstruction -- confirm against version control.

    Args:
      stereo: optional string, "top-bottom" or "left-right".
      crop: optional string of six colon-separated integers in the order
        cropped width, cropped height, full width, full height,
        left offset, top offset.

    Returns:
      The xml string, or False if the crop parameters are invalid.
    """
    additional_xml = ""
    if stereo == "top-bottom":
        additional_xml += SPHERICAL_XML_CONTENTS_TOP_BOTTOM

    if stereo == "left-right":
        additional_xml += SPHERICAL_XML_CONTENTS_LEFT_RIGHT

    if crop:
        crop_match = re.match(crop_regex, crop)
        if not crop_match:
            print("Error: Invalid crop params: {crop}".format(crop=crop))
            return False

        cropped_width_pixels = int(crop_match.group(1))
        cropped_height_pixels = int(crop_match.group(2))
        full_width_pixels = int(crop_match.group(3))
        full_height_pixels = int(crop_match.group(4))
        cropped_offset_left_pixels = int(crop_match.group(5))
        cropped_offset_top_pixels = int(crop_match.group(6))

        if full_width_pixels <= 0 or full_height_pixels <= 0:
            print("Error with crop params: full pano dimensions are "
                  "invalid: width = {width} height = {height}".format(
                      width=full_width_pixels,
                      height=full_height_pixels))
            return False

        if (cropped_width_pixels <= 0 or
                cropped_height_pixels <= 0 or
                cropped_width_pixels > full_width_pixels or
                cropped_height_pixels > full_height_pixels):
            print("Error with crop params: cropped area dimensions are "
                  "invalid: width = {width} height = {height}".format(
                      width=cropped_width_pixels,
                      height=cropped_height_pixels))
            return False

        # We are pretty restrictive and don't allow anything strange. There
        # could be use-cases for a horizontal offset that essentially
        # translates the domain, but we don't support this (so that no
        # extra work has to be done on the client).
        total_width = cropped_offset_left_pixels + cropped_width_pixels
        total_height = cropped_offset_top_pixels + cropped_height_pixels
        if (cropped_offset_left_pixels < 0 or
                cropped_offset_top_pixels < 0 or
                total_width > full_width_pixels or
                total_height > full_height_pixels):
            print("Error with crop params: cropped area offsets are "
                  "invalid: left = {left} top = {top} "
                  "left+cropped width: {total_width} "
                  "top+cropped height: {total_height}".format(
                      left=cropped_offset_left_pixels,
                      top=cropped_offset_top_pixels,
                      total_width=total_width,
                      total_height=total_height))
            return False

        additional_xml += SPHERICAL_XML_CONTENTS_CROP_FORMAT.format(
            cropped_width_pixels, cropped_height_pixels,
            full_width_pixels, full_height_pixels,
            cropped_offset_left_pixels, cropped_offset_top_pixels)

    spherical_xml = (SPHERICAL_XML_HEADER +
                     SPHERICAL_XML_CONTENTS +
                     additional_xml +
                     SPHERICAL_XML_FOOTER)
    return spherical_xml


def get_descriptor_length(in_fh):
    """Derives the length of the MP4 elementary stream descriptor at the
       current position in the input file.

    The length is stored in up to 4 bytes. Each byte contributes its low
    7 bits; the high bit signals that another length byte follows.

    Args:
      in_fh: file handle positioned at the first length byte.

    Returns:
      int, the decoded descriptor length.

    Raises:
      struct.error: if the file ends before the length is complete.
    """
    descriptor_length = 0
    for _ in range(4):
        size_byte = struct.unpack(">B", in_fh.read(1))[0]
        descriptor_length = (descriptor_length << 7) | (size_byte & 0x7f)
        # Test the continuation bit only. Comparing the whole byte with 0x80
        # stopped too early for bytes such as 0x81 that also carry size bits.
        if not size_byte & 0x80:
            break
    return descriptor_length


def get_expected_num_audio_channels(
        ambisonics_type, ambisonics_order, head_locked_stereo):
    """ Returns the expected number of ambisonic components for a given
        ambisonic type and ambisonic order.

    Args:
      ambisonics_type: string, only 'periphonic' is supported.
      ambisonics_order: int, ambisonic order.
      head_locked_stereo: Bool, adds 2 channels when true.

    Returns:
      (order + 1)^2 plus the head-locked stereo channels, or -1 for an
      unsupported ambisonics type.
    """
    head_locked_stereo_channels = 2 if head_locked_stereo else 0
    if ambisonics_type == 'periphonic':
        return (((ambisonics_order + 1) * (ambisonics_order + 1)) +
                head_locked_stereo_channels)
    return -1


def get_num_audio_channels(stsd, in_fh):
    """Returns the channel count of the first sound sample description.

    Args:
      stsd: the stsd box to inspect.
      in_fh: file handle, Source for uncached file contents.

    Returns:
      int, number of audio channels, or -1 if stsd is not an STSD box or
      contains no sound sample description.
    """
    if stsd.name != mpeg.constants.TAG_STSD:
        print("get_num_audio_channels should be given a STSD box")
        return -1
    for sample_description in stsd.contents:
        if sample_description.name == mpeg.constants.TAG_MP4A:
            return get_aac_num_channels(sample_description, in_fh)
        elif (sample_description.name in
              mpeg.constants.SOUND_SAMPLE_DESCRIPTIONS):
            return get_sample_description_num_channels(
                sample_description, in_fh)
    return -1


def get_sample_description_num_channels(sample_description, in_fh):
    """Reads the number of audio channels from a sound sample description.

    The file position is restored before returning, including on the
    unsupported-version path.

    Args:
      sample_description: the sound sample description box.
      in_fh: file handle, Source for uncached file contents.

    Returns:
      int, number of audio channels, or -1 for an unsupported version.
    """
    saved_position = in_fh.tell()
    try:
        in_fh.seek(sample_description.content_start() + 8)
        version = struct.unpack(">h", in_fh.read(2))[0]
        # Skip the revision level (2 bytes) and vendor (4 bytes).
        in_fh.seek(6, 1)

        if version in (0, 1):
            # Versions 0 and 1 store the channel count as the next 16 bits.
            return struct.unpack(">h", in_fh.read(2))[0]
        if version == 2:
            # Skip four 16-bit fields, two 32-bit fields and the 64-bit
            # sample rate (24 bytes) that precede the 32-bit channel count.
            in_fh.seek(24, 1)
            return struct.unpack(">i", in_fh.read(4))[0]

        box_name = sample_description.name
        if not isinstance(box_name, str):
            # Box names may be bytes, which cannot be joined to a str.
            box_name = box_name.decode("latin-1")
        print("Unsupported version for " + box_name + " box")
        return -1
    finally:
        in_fh.seek(saved_position)


def get_aac_num_channels(box, in_fh):
    """Reads the number of audio channels from AAC's AudioSpecificConfig
       descriptor within the esds child box of the input mp4a or wave box.

    The file position is restored before returning, including on error
    paths.

    Args:
      box: an mp4a or wave box.
      in_fh: file handle, Source for uncached file contents.

    Returns:
      int, the channel configuration value, or -1 if box is of another
      type, has no esds/wave child, or a descriptor cannot be read.
    """
    saved_position = in_fh.tell()
    if box.name not in [mpeg.constants.TAG_MP4A, mpeg.constants.TAG_WAVE]:
        return -1

    # Stays -1 when the box has neither a wave nor an esds child.
    channel_configuration = -1
    try:
        for element in box.contents:
            if element.name == mpeg.constants.TAG_WAVE:
                # Handle .mov with AAC audio, where the structure is:
                #     stsd -> mp4a -> wave -> esds
                channel_configuration = get_aac_num_channels(element, in_fh)
                break

            if element.name != mpeg.constants.TAG_ESDS:
                continue
            in_fh.seek(element.content_start() + 4)
            descriptor_tag = struct.unpack(">B", in_fh.read(1))[0]

            # Verify the read descriptor is an elementary stream descriptor
            if descriptor_tag != 3:  # Not an MP4 elementary stream.
                print("Error: failed to read elementary stream descriptor.")
                return -1
            get_descriptor_length(in_fh)
            in_fh.seek(3, 1)  # Seek to the decoder configuration descriptor
            config_descriptor_tag = struct.unpack(">B", in_fh.read(1))[0]

            # Verify the read descriptor is a decoder config. descriptor.
            if config_descriptor_tag != 4:
                print("Error: failed to read decoder config. descriptor.")
                return -1
            get_descriptor_length(in_fh)
            # offset to the decoder specific config descriptor.
            in_fh.seek(13, 1)
            decoder_specific_descriptor_tag = struct.unpack(
                ">B", in_fh.read(1))[0]

            # Verify the read descriptor is a decoder specific info
            # descriptor
            if decoder_specific_descriptor_tag != 5:
                print("Error: failed to read MP4 audio decoder specific "
                      "config.")
                return -1
            audio_specific_descriptor_size = get_descriptor_length(in_fh)
            # Explicit check instead of assert, which is stripped under -O.
            if audio_specific_descriptor_size < 2:
                print("Error: failed to read MP4 audio decoder specific "
                      "config.")
                return -1
            # First 16 bits: 5 bits object type, 4 bits sampling frequency
            # index, 4 bits channel configuration.
            decoder_descriptor = struct.unpack(">h", in_fh.read(2))[0]
            sampling_frequency_index = (0x0780 & decoder_descriptor) >> 7
            if sampling_frequency_index == 0:
                # TODO: If the sample rate is 96kHz an additional 24 bit
                # offset value here specifies the actual sample rate.
                print("Error: Greater than 48khz audio is currently not "
                      "supported.")
                return -1
            channel_configuration = (0x0078 & decoder_descriptor) >> 3
    finally:
        in_fh.seek(saved_position)
    return channel_configuration


def get_num_audio_tracks(mpeg4_file, in_fh):
    """ Returns the number of audio track in the input mpeg4 file.

    Args:
      mpeg4_file: mpeg4, Mpeg4 file structure to inspect.
      in_fh: file handle, Source for uncached file contents.
    """
    num_audio_tracks = 0
    for element in mpeg4_file.moov_box.contents:
        if element.name != mpeg.constants.TAG_TRAK:
            continue
        for sub_element in element.contents:
            if sub_element.name != mpeg.constants.TAG_MDIA:
                continue
            for mdia_sub_element in sub_element.contents:
                if mdia_sub_element.name != mpeg.constants.TAG_HDLR:
                    continue
                position = mdia_sub_element.content_start() + 8
                in_fh.seek(position)
                if in_fh.read(4) == mpeg.constants.TAG_SOUN:
                    num_audio_tracks += 1
    return num_audio_tracks


def get_spatial_audio_metadata(ambisonic_order, head_locked_stereo):
    """Builds the spatial audio metadata dictionary for periphonic audio.

    Args:
      ambisonic_order: int, ambisonic order.
      head_locked_stereo: Bool, whether head-locked stereo is present.

    Returns:
      dictionary with the ambisonic settings; 'channel_map' is the identity
      list [0, 1, ..., num_channels - 1].
    """
    num_channels = get_expected_num_audio_channels(
        "periphonic", ambisonic_order, head_locked_stereo)
    return {
        "ambisonic_order": ambisonic_order,
        "head_locked_stereo": head_locked_stereo,
        "ambisonic_type": "periphonic",
        "ambisonic_channel_ordering": "ACN",
        "ambisonic_normalization": "SN3D",
        # A real list, so the value can be indexed, measured and serialised
        # on Python 3, where range() is lazy.
        "channel_map": list(range(num_channels)),
    }