#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2016 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for examining/injecting spatial media metadata in MP4/MOV files."""
import collections
import os
import re
import struct
import traceback
import xml.etree
import xml.etree.ElementTree
from spatialmedia import mpeg
# File-name extensions (lower-case, including the leading dot) listed as
# MPEG files by this module.
MPEG_FILE_EXTENSIONS = [
    ".mp4",
    ".mov",
]

# 16-byte identifier that prefixes the payload of the spherical uuid box.
SPHERICAL_UUID_ID = (
    b"\xff\xcc\x82\x63"
    b"\xf8\x55\x4a\x93"
    b"\x88\x14\x58\x7a"
    b"\x02\x52\x1f\xdd"
)
# XML contents.
# NOTE(review): as written here, the literals in this section hold only text
# values and str.format placeholders, with no XML element markup, and
# SPHERICAL_XML_HEADER / SPHERICAL_XML_FOOTER evaluate to empty strings.
# Confirm against the intended XML before relying on the assembled output.
#
# Namespace declaration fragment for the RDF syntax namespace (note the
# leading and trailing space inside the literal).
RDF_PREFIX = " xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\" "
# Leading part of the assembled spherical XML string.
SPHERICAL_XML_HEADER = \
""\
""
# Fixed body of the spherical XML string; the adjacent literals are joined
# into a single string at compile time.
SPHERICAL_XML_CONTENTS = \
"true"\
"true"\
""\
"Spherical Metadata Tool"\
""\
"equirectangular"
# Layout values; presumably appended for stereo content -- the code that
# selects between them is not visible in this part of the file.
SPHERICAL_XML_CONTENTS_TOP_BOTTOM = \
"top-bottom"
SPHERICAL_XML_CONTENTS_LEFT_RIGHT = \
"left-right"
# Parameter order matches that of the crop option.
# The six positional fields {0}..{5} are filled, in order, with: cropped
# width, cropped height, full width, full height, left offset, top offset.
SPHERICAL_XML_CONTENTS_CROP_FORMAT = \
"{0}"\
""\
"{1}"\
""\
"{2}"\
"{3}"\
"{4}"\
"{5}"
# Trailing part of the assembled spherical XML string.
SPHERICAL_XML_FOOTER = ""
# Bare (un-prefixed) names of the spherical metadata tags known to this
# module, in a fixed order.
SPHERICAL_TAGS_LIST = [
    # General description of the content.
    "Spherical",
    "Stitched",
    "StitchingSoftware",
    "ProjectionType",
    "SourceCount",
    "StereoMode",
    # Initial view orientation.
    "InitialViewHeadingDegrees",
    "InitialViewPitchDegrees",
    "InitialViewRollDegrees",
    "Timestamp",
    # Crop geometry.
    "CroppedAreaImageWidthPixels",
    "CroppedAreaImageHeightPixels",
    "FullPanoWidthPixels",
    "FullPanoHeightPixels",
    "CroppedAreaLeftPixels",
    "CroppedAreaTopPixels",
]
class Metadata(object):
    """Plain holder with a `video` and an `audio` slot, both unset (None)
    on construction.
    """

    def __init__(self):
        # Chained assignment binds `video` first, then `audio`.
        self.video = self.audio = None
class ParsedMetadata(object):
    """Holder for metadata read back from a file.

    Attributes set on construction:
        video: empty dict (a fresh one per instance).
        audio: None.
        num_audio_channels: 0.
    """

    def __init__(self):
        self.video = {}
        self.audio = None
        self.num_audio_channels = 0
# Namespace prefix, in "{uri}" form, that is prepended to each bare tag name.
SPHERICAL_PREFIX = "{http://ns.google.com/videos/1.0/spherical/}"

# Lookup from namespace-qualified tag name to its bare name, built from
# SPHERICAL_TAGS_LIST.
SPHERICAL_TAGS = {}
for tag in SPHERICAL_TAGS_LIST:
    SPHERICAL_TAGS[SPHERICAL_PREFIX + tag] = tag
# One capture group matching a run of decimal digits. Written as a raw
# string so that "\d" is not treated as a (invalid) string escape sequence.
integer_regex_group = r"(\d+)"
# Anchored pattern for six colon-separated digit groups, e.g. "1:2:3:4:5:6".
crop_regex = "^{0}$".format(":".join([integer_regex_group] * 6))
# Highest ambisonic order that get_spatial_audio_description will report
# as supported.
MAX_SUPPORTED_AMBIX_ORDER = 1

# Result record of get_spatial_audio_description.
SpatialAudioDescription = collections.namedtuple(
    'SpatialAudioDescription',
    'order is_supported has_head_locked_stereo')


def get_spatial_audio_description(num_channels):
    """Classifies a channel count as a supported ambisonic layout.

    For each order from 1 up to MAX_SUPPORTED_AMBIX_ORDER the count is
    compared against (order + 1)^2 channels, and against that value plus two
    additional channels (reported as head-locked stereo).

    Args:
      num_channels: number of audio channels to classify.

    Returns:
      SpatialAudioDescription. When nothing matches, the result is
      order=-1, is_supported=False, has_head_locked_stereo=True.
    """
    # (additional channel count, has_head_locked_stereo) candidates, tried
    # in this order for every ambisonic order.
    layouts = ((0, False), (2, True))
    for order in range(1, MAX_SUPPORTED_AMBIX_ORDER + 1):
        ambisonic_channels = (order + 1) * (order + 1)
        for stereo_channels, head_locked in layouts:
            if ambisonic_channels + stereo_channels == num_channels:
                return SpatialAudioDescription(
                    order=order,
                    is_supported=True,
                    has_head_locked_stereo=head_locked)

    return SpatialAudioDescription(
        order=-1, is_supported=False, has_head_locked_stereo=True)
def spherical_uuid(metadata):
    """Builds a uuid box whose payload is the spherical id plus the xml.

    Args:
      metadata: String, xml to inject in spherical tag.

    Returns:
      uuid_leaf: a box containing spherical metadata.
    """
    box = mpeg.Box()
    assert len(SPHERICAL_UUID_ID) == 16

    # Payload layout: 16-byte identifier followed by the UTF-8 encoded xml.
    payload = SPHERICAL_UUID_ID + metadata.encode("utf-8")

    box.name = mpeg.constants.TAG_UUID
    box.header_size = 8
    box.content_size = 0
    box.contents = payload
    box.content_size = len(box.contents)
    return box
def mpeg4_add_spherical(mpeg4_file, in_fh, metadata):
    """Adds a spherical uuid box to an mpeg4 file for all video tracks.

    Every trak box first has its existing uuid children removed. A trak
    whose mdia/hdlr handler type reads as video then receives a new
    spherical uuid box. Finally the file structure is resized.

    Args:
      mpeg4_file: mpeg4, Mpeg4 file structure to add metadata.
      in_fh: file handle, Source for uncached file contents.
      metadata: string, xml metadata to inject into spherical tag.

    Returns:
      False as soon as a trak's add() reports failure (resize is then not
      called), True otherwise.
    """
    for trak in mpeg4_file.moov_box.contents:
        if trak.name != mpeg.constants.TAG_TRAK:
            continue

        trak.remove(mpeg.constants.TAG_UUID)
        for mdia in trak.contents:
            if mdia.name != mpeg.constants.TAG_MDIA:
                continue

            is_video = False
            for hdlr in mdia.contents:
                if hdlr.name != mpeg.constants.TAG_HDLR:
                    continue
                # The four bytes at offset 8 of the hdlr payload are
                # compared against the video handler type.
                in_fh.seek(hdlr.content_start() + 8)
                if in_fh.read(4) == mpeg.constants.TRAK_TYPE_VIDE:
                    is_video = True
                    break

            if is_video:
                if not trak.add(spherical_uuid(metadata)):
                    return False
                # One uuid box per trak: stop scanning this trak.
                break

    mpeg4_file.resize()
    return True
def mpeg4_add_spatial_audio(mpeg4_file, in_fh, audio_metadata, console):
    """Adds spatial audio metadata to the first audio track of the input
    mpeg4_file. Returns False on failure.

    Args:
      mpeg4_file: mpeg4, Mpeg4 file structure to add metadata.
      in_fh: file handle, Source for uncached file contents.
      audio_metadata: dictionary ('ambisonic_type': string,
        'ambisonic_order': int, 'head_locked_stereo': Bool),
        Supports 'periphonic' ambisonic type only.
      console: passed through unchanged to inject_spatial_audio_atom.

    Returns:
      The result of inject_spatial_audio_atom for the first mdia box whose
      handler type reads as sound; True when no such box is found.
    """
    for trak in mpeg4_file.moov_box.contents:
        if trak.name != mpeg.constants.TAG_TRAK:
            continue
        for mdia in trak.contents:
            if mdia.name != mpeg.constants.TAG_MDIA:
                continue
            for hdlr in mdia.contents:
                if hdlr.name != mpeg.constants.TAG_HDLR:
                    continue
                # The four bytes at offset 8 of the hdlr payload are
                # compared against the sound handler tag.
                in_fh.seek(hdlr.content_start() + 8)
                handler_type = in_fh.read(4)
                if handler_type == mpeg.constants.TAG_SOUN:
                    return inject_spatial_audio_atom(
                        in_fh, mdia, audio_metadata, console)
    return True
def mpeg4_add_audio_metadata(mpeg4_file, in_fh, audio_metadata, console):
    """Injects spatial audio metadata unless the file has several audio
    tracks.

    Args:
      mpeg4_file: mpeg4, Mpeg4 file structure to add metadata.
      in_fh: file handle, Source for uncached file contents.
      audio_metadata: dictionary forwarded to mpeg4_add_spatial_audio.
      console: callable invoked with an error message string.

    Returns:
      False (after reporting through console) when more than one audio
      track is counted; otherwise the result of mpeg4_add_spatial_audio.
    """
    track_count = get_num_audio_tracks(mpeg4_file, in_fh)
    if track_count <= 1:
        return mpeg4_add_spatial_audio(
            mpeg4_file, in_fh, audio_metadata, console)

    console("Error: Expected 1 audio track. Found %d" % track_count)
    return False
def inject_spatial_audio_atom(
        in_fh, audio_media_atom, audio_metadata, console):
    """Appends an SA3D box to each sound sample description under the
    given media atom (walking minf -> stbl -> stsd).

    Args:
      in_fh: file handle, Source for uncached file contents.
      audio_media_atom: box whose contents are searched for minf boxes.
      audio_metadata: dictionary read for 'ambisonic_type',
        'ambisonic_order' and 'head_locked_stereo', and passed on to
        mpeg.SA3DBox.create.
      console: callable invoked with an error message string.

    Returns:
      False (after reporting through console) when the channel count read
      from the file differs from the count expected for the requested
      ambisonic configuration; True otherwise.
    """
    for minf in audio_media_atom.contents:
        if minf.name != mpeg.constants.TAG_MINF:
            continue
        for stbl in minf.contents:
            if stbl.name != mpeg.constants.TAG_STBL:
                continue
            for stsd in stbl.contents:
                if stsd.name != mpeg.constants.TAG_STSD:
                    continue
                for entry in stsd.contents:
                    if entry.name not in \
                            mpeg.constants.SOUND_SAMPLE_DESCRIPTIONS:
                        continue

                    in_fh.seek(entry.position + entry.header_size + 16)
                    num_channels = get_num_audio_channels(stsd, in_fh)

                    ambisonic_type = audio_metadata["ambisonic_type"]
                    ambisonic_order = audio_metadata["ambisonic_order"]
                    head_locked = audio_metadata["head_locked_stereo"]
                    expected_num_channels = get_expected_num_audio_channels(
                        ambisonic_type, ambisonic_order, head_locked)

                    if num_channels != expected_num_channels:
                        if head_locked:
                            head_locked_stereo_msg = " with head-locked stereo"
                        else:
                            head_locked_stereo_msg = ""
                        console("Error: Found %d audio channel(s). "
                                "Expected %d channel(s) for %s ambisonics "
                                "of order %d%s."
                                % (num_channels,
                                   expected_num_channels,
                                   ambisonic_type,
                                   ambisonic_order,
                                   head_locked_stereo_msg))
                        return False

                    entry.contents.append(
                        mpeg.SA3DBox.create(num_channels, audio_metadata))
    return True
def parse_spherical_xml(contents, console):
"""Returns spherical metadata for a set of xml data.
Args:
contents: string, spherical metadata xml contents.
Returns:
dictionary containing the parsed spherical metadata values.
"""
try:
parsed_xml = xml.etree.ElementTree.XML(contents)
except xml.etree.ElementTree.ParseError:
try:
console(traceback.format_exc())
console(contents)
index = contents.find(" full_width_pixels or
cropped_height_pixels > full_height_pixels):
print("Error with crop params: cropped area dimensions are "\
"invalid: width = {width} height = {height}".format(
width=cropped_width_pixels,
height=cropped_height_pixels))
return False
# We are pretty restrictive and don't allow anything strange. There
# could be use-cases for a horizontal offset that essentially
# translates the domain, but we don't support this (so that no
# extra work has to be done on the client).
total_width = cropped_offset_left_pixels + cropped_width_pixels
total_height = cropped_offset_top_pixels + cropped_height_pixels
if (cropped_offset_left_pixels < 0 or
cropped_offset_top_pixels < 0 or
total_width > full_width_pixels or
total_height > full_height_pixels):
print("Error with crop params: cropped area offsets are "\
"invalid: left = {left} top = {top} "\
"left+cropped width: {total_width} "\
"top+cropped height: {total_height}".format(
left=cropped_offset_left_pixels,
top=cropped_offset_top_pixels,
total_width=total_width,
total_height=total_height))
return False
additional_xml += SPHERICAL_XML_CONTENTS_CROP_FORMAT.format(
cropped_width_pixels, cropped_height_pixels,
full_width_pixels, full_height_pixels,
cropped_offset_left_pixels, cropped_offset_top_pixels)
spherical_xml = (SPHERICAL_XML_HEADER +
SPHERICAL_XML_CONTENTS +
additional_xml +
SPHERICAL_XML_FOOTER)
return spherical_xml
def get_descriptor_length(in_fh):
    """Derives the length of the MP4 elementary stream descriptor at the
    current position in the input file.

    The length is stored in up to four bytes. Each byte contributes its low
    seven bits (most significant group first), and a set high bit (0x80)
    means another length byte follows.

    Args:
      in_fh: binary file handle positioned at the first length byte.

    Returns:
      The decoded length. The file position is left just past the last
      length byte consumed.

    Raises:
      struct.error: if the file ends before a length byte can be read.
    """
    descriptor_length = 0
    for _ in range(4):
        size_byte = struct.unpack(">B", in_fh.read(1))[0]
        descriptor_length = (descriptor_length << 7) | (size_byte & 0x7f)
        # Continue only while the continuation (high) bit is set. Testing
        # for equality with 0x80 would stop early on bytes such as 0x81.
        if not size_byte & 0x80:
            break
    return descriptor_length
def get_expected_num_audio_channels(
        ambisonics_type, ambisonics_order, head_locked_stereo):
    """Returns the expected number of audio channels for a given ambisonic
    type and ambisonic order.

    Args:
      ambisonics_type: string; only 'periphonic' is recognised.
      ambisonics_order: ambisonic order; contributes (order + 1)^2 channels.
      head_locked_stereo: adds two channels when it compares equal to True.

    Returns:
      The channel count, or -1 for any ambisonic type other than
      'periphonic'.
    """
    stereo_channels = 0
    # Equality with True (not plain truthiness) is intentional here: values
    # such as non-empty strings do not count as enabled.
    if head_locked_stereo == True:  # noqa: E712
        stereo_channels = 2

    if ambisonics_type != 'periphonic':
        return -1

    side = ambisonics_order + 1
    return side * side + stereo_channels
def get_num_audio_channels(stsd, in_fh):
    """Returns the channel count of the first recognised sound sample
    description inside an stsd box.

    Args:
      stsd: box expected to be named mpeg.constants.TAG_STSD.
      in_fh: file handle, Source for uncached file contents.

    Returns:
      The channel count reported by the matching reader (mp4a entries go
      through get_aac_num_channels, other sound sample descriptions through
      get_sample_description_num_channels), or -1 when the box is not an
      stsd box or holds no recognised entry.
    """
    if stsd.name != mpeg.constants.TAG_STSD:
        print("get_num_audio_channels should be given a STSD box")
        return -1

    for entry in stsd.contents:
        entry_name = entry.name
        if entry_name == mpeg.constants.TAG_MP4A:
            return get_aac_num_channels(entry, in_fh)
        if entry_name in mpeg.constants.SOUND_SAMPLE_DESCRIPTIONS:
            return get_sample_description_num_channels(entry, in_fh)

    return -1
def get_sample_description_num_channels(sample_description, in_fh):
    """Reads the number of audio channels from a sound sample description.

    Starting 8 bytes into the box content, the layout read is: version
    (int16), revision level (int16), vendor (int32). For versions 0 and 1
    the channel count is the int16 that follows. For version 2 it is an
    int32 located after 24 further bytes of fixed fields.

    Args:
      sample_description: box providing content_start() and name.
      in_fh: binary file handle, Source for uncached file contents.

    Returns:
      The channel count, or -1 (after printing a message) for an
      unsupported version. The file position on entry is restored on every
      exit path.

    Raises:
      struct.error: if the file ends before the fields can be read.
    """
    saved_position = in_fh.tell()
    try:
        in_fh.seek(sample_description.content_start() + 8)
        # version, revision level, vendor -- only the version is needed.
        version = struct.unpack(">hhi", in_fh.read(8))[0]

        if version in (0, 1):
            return struct.unpack(">h", in_fh.read(2))[0]

        if version == 2:
            # Skip 24 bytes of fixed fields (four int16, two int32 and the
            # float64 sample rate) that precede the int32 channel count.
            return struct.unpack(">24xi", in_fh.read(28))[0]

        box_name = sample_description.name
        if isinstance(box_name, bytes) and not isinstance(box_name, str):
            # Box names may be bytes; decode so the concatenation below
            # does not raise TypeError on Python 3.
            box_name = box_name.decode("ascii", "replace")
        print("Unsupported version for " + box_name + " box")
        return -1
    finally:
        in_fh.seek(saved_position)
def get_aac_num_channels(box, in_fh):
    """Reads the number of audio channels from AAC's AudioSpecificConfig
    descriptor within the esds child box of the input mp4a or wave box.

    Args:
      box: box named TAG_MP4A or TAG_WAVE whose contents are searched.
      in_fh: binary file handle, Source for uncached file contents.

    Returns:
      The channel configuration value, or -1 when the box has the wrong
      type, contains no wave/esds child, or a descriptor cannot be read
      (a message is printed in the latter case). The file position on
      entry is restored on every exit path.

    Raises:
      struct.error: if the file ends before a field can be read.
      AssertionError: if the decoder specific descriptor is shorter than
        two bytes.
    """
    p = in_fh.tell()
    if box.name not in [mpeg.constants.TAG_MP4A, mpeg.constants.TAG_WAVE]:
        return -1

    # Default result when no wave/esds child is present, so the final
    # return never refers to an unbound name.
    channel_configuration = -1
    try:
        for element in box.contents:
            if element.name == mpeg.constants.TAG_WAVE:
                # Handle .mov with AAC audio, where the structure is:
                #     stsd -> mp4a -> wave -> esds
                channel_configuration = get_aac_num_channels(element, in_fh)
                break

            if element.name != mpeg.constants.TAG_ESDS:
                continue

            in_fh.seek(element.content_start() + 4)
            descriptor_tag = struct.unpack(">B", in_fh.read(1))[0]

            # Verify the read descriptor is an elementary stream descriptor
            if descriptor_tag != 3:  # Not an MP4 elementary stream.
                print("Error: failed to read elementary stream descriptor.")
                return -1
            get_descriptor_length(in_fh)
            in_fh.seek(3, 1)  # Seek to the decoder configuration descriptor
            config_descriptor_tag = struct.unpack(">B", in_fh.read(1))[0]

            # Verify the read descriptor is a decoder config. descriptor.
            if config_descriptor_tag != 4:
                print("Error: failed to read decoder config. descriptor.")
                return -1
            get_descriptor_length(in_fh)
            # Offset to the decoder specific config descriptor.
            in_fh.seek(13, 1)
            decoder_specific_descriptor_tag = struct.unpack(
                ">B", in_fh.read(1))[0]

            # Verify the read descriptor is a decoder specific info
            # descriptor
            if decoder_specific_descriptor_tag != 5:
                print("Error: failed to read MP4 audio decoder specific "
                      "config.")
                return -1
            audio_specific_descriptor_size = get_descriptor_length(in_fh)
            assert audio_specific_descriptor_size >= 2

            # First 16 bits of the config: bits 10..7 hold the sampling
            # frequency index, bits 6..3 the channel configuration.
            decoder_descriptor = struct.unpack(">h", in_fh.read(2))[0]
            sampling_frequency_index = (0x0780 & decoder_descriptor) >> 7
            if sampling_frequency_index == 0:
                # TODO: If the sample rate is 96kHz an additional 24 bit
                # offset value here specifies the actual sample rate.
                print("Error: Greater than 48khz audio is currently not "
                      "supported.")
                return -1
            channel_configuration = (0x0078 & decoder_descriptor) >> 3
    finally:
        # Restore the caller's position on success, error and exception.
        in_fh.seek(p)
    return channel_configuration
def get_num_audio_tracks(mpeg4_file, in_fh):
    """Returns the number of audio tracks in the input mpeg4 file.

    Args:
      mpeg4_file: mpeg4, Mpeg4 file structure to inspect.
      in_fh: file handle, Source for uncached file contents.

    Returns:
      The number of trak -> mdia -> hdlr boxes whose handler type reads as
      sound.
    """
    audio_track_count = 0
    for trak in mpeg4_file.moov_box.contents:
        if trak.name != mpeg.constants.TAG_TRAK:
            continue
        for mdia in trak.contents:
            if mdia.name != mpeg.constants.TAG_MDIA:
                continue
            for hdlr in mdia.contents:
                if hdlr.name != mpeg.constants.TAG_HDLR:
                    continue
                # The four bytes at offset 8 of the hdlr payload are
                # compared against the sound handler tag.
                in_fh.seek(hdlr.content_start() + 8)
                if in_fh.read(4) == mpeg.constants.TAG_SOUN:
                    audio_track_count += 1
    return audio_track_count
def get_spatial_audio_metadata(ambisonic_order, head_locked_stereo):
    """Builds the spatial audio metadata dictionary for periphonic
    ambisonics.

    Args:
      ambisonic_order: ambisonic order stored under 'ambisonic_order'.
      head_locked_stereo: value stored under 'head_locked_stereo'; also
        forwarded to get_expected_num_audio_channels.

    Returns:
      dictionary with the keys 'ambisonic_order', 'head_locked_stereo',
      'ambisonic_type' ("periphonic"), 'ambisonic_channel_ordering'
      ("ACN"), 'ambisonic_normalization' ("SN3D") and 'channel_map', the
      list [0, 1, ..., num_channels - 1].
    """
    num_channels = get_expected_num_audio_channels(
        "periphonic", ambisonic_order, head_locked_stereo)
    return {
        "ambisonic_order": ambisonic_order,
        "head_locked_stereo": head_locked_stereo,
        "ambisonic_type": "periphonic",
        "ambisonic_channel_ordering": "ACN",
        "ambisonic_normalization": "SN3D",
        # Materialised as a list so the value has the same type on
        # Python 2 and Python 3 (where range() is a lazy object).
        "channel_map": list(range(num_channels)),
    }