|
import numpy as np |
|
|
|
from sortedcontainers import SortedList |
|
|
|
PYANNOTE_SEGMENT = 'segment' |
|
|
|
|
|
class Timeline: |
|
@classmethod |
|
def from_df(cls, df, uri = None): |
|
return cls(segments=list(df[PYANNOTE_SEGMENT]), uri=uri) |
|
|
|
def __init__(self, segments = None, uri = None): |
|
if segments is None: segments = () |
|
segments_set = set([segment for segment in segments if segment]) |
|
|
|
self.segments_set_ = segments_set |
|
self.segments_list_ = SortedList(segments_set) |
|
self.segments_boundaries_ = SortedList((boundary for segment in segments_set for boundary in segment)) |
|
self.uri = uri |
|
|
|
def __len__(self): |
|
return len(self.segments_set_) |
|
|
|
def __nonzero__(self): |
|
return self.__bool__() |
|
|
|
def __bool__(self): |
|
return len(self.segments_set_) > 0 |
|
|
|
def __iter__(self): |
|
return iter(self.segments_list_) |
|
|
|
def __getitem__(self, k): |
|
return self.segments_list_[k] |
|
|
|
def __eq__(self, other): |
|
return self.segments_set_ == other.segments_set_ |
|
|
|
def __ne__(self, other): |
|
return self.segments_set_ != other.segments_set_ |
|
|
|
def index(self, segment): |
|
return self.segments_list_.index(segment) |
|
|
|
def add(self, segment): |
|
segments_set_ = self.segments_set_ |
|
if segment in segments_set_ or not segment: return self |
|
|
|
segments_set_.add(segment) |
|
self.segments_list_.add(segment) |
|
|
|
segments_boundaries_ = self.segments_boundaries_ |
|
segments_boundaries_.add(segment.start) |
|
segments_boundaries_.add(segment.end) |
|
|
|
return self |
|
|
|
def remove(self, segment): |
|
segments_set_ = self.segments_set_ |
|
if segment not in segments_set_: return self |
|
|
|
segments_set_.remove(segment) |
|
self.segments_list_.remove(segment) |
|
|
|
segments_boundaries_ = self.segments_boundaries_ |
|
segments_boundaries_.remove(segment.start) |
|
segments_boundaries_.remove(segment.end) |
|
|
|
return self |
|
|
|
def discard(self, segment): |
|
return self.remove(segment) |
|
|
|
def __ior__(self, timeline): |
|
return self.update(timeline) |
|
|
|
def update(self, timeline): |
|
segments_set = self.segments_set_ |
|
segments_set |= timeline.segments_set_ |
|
|
|
self.segments_list_ = SortedList(segments_set) |
|
self.segments_boundaries_ = SortedList((boundary for segment in segments_set for boundary in segment)) |
|
|
|
return self |
|
|
|
def __or__(self, timeline): |
|
return self.union(timeline) |
|
|
|
def union(self, timeline): |
|
return Timeline(segments=self.segments_set_ | timeline.segments_set_, uri=self.uri) |
|
|
|
def co_iter(self, other): |
|
for segment in self.segments_list_: |
|
temp = Segment(start=segment.end, end=segment.end) |
|
|
|
for other_segment in other.segments_list_.irange(maximum=temp): |
|
if segment.intersects(other_segment): yield segment, other_segment |
|
|
|
def crop_iter(self, support, mode = 'intersection', returns_mapping = False): |
|
if mode not in {'loose', 'strict', 'intersection'}: raise ValueError |
|
if not isinstance(support, (Segment, Timeline)): raise TypeError |
|
|
|
if isinstance(support, Segment): |
|
support = Timeline(segments=([support] if support else []), uri=self.uri) |
|
|
|
for yielded in self.crop_iter(support, mode=mode, returns_mapping=returns_mapping): |
|
yield yielded |
|
|
|
return |
|
|
|
support = support.support() |
|
|
|
if mode == 'loose': |
|
for segment, _ in self.co_iter(support): |
|
yield segment |
|
|
|
return |
|
|
|
if mode == 'strict': |
|
for segment, other_segment in self.co_iter(support): |
|
if segment in other_segment: yield segment |
|
|
|
return |
|
|
|
for segment, other_segment in self.co_iter(support): |
|
mapped_to = segment & other_segment |
|
if not mapped_to: continue |
|
|
|
if returns_mapping: yield segment, mapped_to |
|
else: yield mapped_to |
|
|
|
def crop(self, support, mode = 'intersection', returns_mapping = False): |
|
if mode == 'intersection' and returns_mapping: |
|
segments, mapping = [], {} |
|
|
|
for segment, mapped_to in self.crop_iter(support, mode='intersection', returns_mapping=True): |
|
segments.append(mapped_to) |
|
mapping[mapped_to] = mapping.get(mapped_to, list()) + [segment] |
|
|
|
return Timeline(segments=segments, uri=self.uri), mapping |
|
|
|
return Timeline(segments=self.crop_iter(support, mode=mode), uri=self.uri) |
|
|
|
def overlapping(self, t): |
|
return list(self.overlapping_iter(t)) |
|
|
|
def overlapping_iter(self, t): |
|
for segment in self.segments_list_.irange(maximum=Segment(start=t, end=t)): |
|
if segment.overlaps(t): yield segment |
|
|
|
def get_overlap(self): |
|
overlaps_tl = Timeline(uri=self.uri) |
|
|
|
for s1, s2 in self.co_iter(self): |
|
if s1 == s2: continue |
|
|
|
overlaps_tl.add(s1 & s2) |
|
|
|
return overlaps_tl.support() |
|
|
|
def extrude(self, removed, mode = 'intersection'): |
|
if isinstance(removed, Segment): removed = Timeline([removed]) |
|
|
|
if mode == "loose": mode = "strict" |
|
elif mode == "strict": mode = "loose" |
|
|
|
return self.crop(removed.gaps(support=Timeline([self.extent()], uri=self.uri)), mode=mode) |
|
|
|
def __str__(self): |
|
n = len(self.segments_list_) |
|
string = "[" |
|
|
|
for i, segment in enumerate(self.segments_list_): |
|
string += str(segment) |
|
string += "\n " if i + 1 < n else "" |
|
|
|
string += "]" |
|
return string |
|
|
|
def __repr__(self): |
|
return "<Timeline(uri=%s, segments=%s)>" % (self.uri, list(self.segments_list_)) |
|
|
|
def __contains__(self, included): |
|
if isinstance(included, Segment): return included in self.segments_set_ |
|
elif isinstance(included, Timeline): return self.segments_set_.issuperset(included.segments_set_) |
|
else: raise TypeError |
|
|
|
def empty(self): |
|
return Timeline(uri=self.uri) |
|
|
|
def covers(self, other): |
|
gaps = self.gaps(support=other.extent()) |
|
|
|
for _ in gaps.co_iter(other): |
|
return False |
|
|
|
return True |
|
|
|
def copy(self, segment_func = None): |
|
if segment_func is None: return Timeline(segments=self.segments_list_, uri=self.uri) |
|
return Timeline(segments=[segment_func(s) for s in self.segments_list_], uri=self.uri) |
|
|
|
def extent(self): |
|
if self.segments_set_: |
|
segments_boundaries_ = self.segments_boundaries_ |
|
return Segment(start=segments_boundaries_[0], end=segments_boundaries_[-1]) |
|
|
|
return Segment(start=0.0, end=0.0) |
|
|
|
def support_iter(self, collar = 0.0): |
|
if not self: return |
|
|
|
new_segment = self.segments_list_[0] |
|
|
|
for segment in self: |
|
possible_gap = segment ^ new_segment |
|
|
|
if not possible_gap or possible_gap.duration < collar: new_segment |= segment |
|
else: |
|
yield new_segment |
|
new_segment = segment |
|
|
|
yield new_segment |
|
|
|
def support(self, collar = 0.): |
|
return Timeline(segments=self.support_iter(collar), uri=self.uri) |
|
|
|
def duration(self): |
|
return sum(s.duration for s in self.support_iter()) |
|
|
|
def gaps_iter(self, support = None): |
|
if support is None: support = self.extent() |
|
if not isinstance(support, (Segment, Timeline)): raise TypeError |
|
|
|
if isinstance(support, Segment): |
|
end = support.start |
|
|
|
for segment in self.crop(support, mode='intersection').support(): |
|
gap = Segment(start=end, end=segment.start) |
|
if gap: yield gap |
|
|
|
end = segment.end |
|
|
|
gap = Segment(start=end, end=support.end) |
|
if gap: yield gap |
|
elif isinstance(support, Timeline): |
|
for segment in support.support(): |
|
for gap in self.gaps_iter(support=segment): |
|
yield gap |
|
|
|
def gaps(self, support = None): |
|
return Timeline(segments=self.gaps_iter(support=support), uri=self.uri) |
|
|
|
def segmentation(self): |
|
support = self.support() |
|
timestamps = set([]) |
|
|
|
for (start, end) in self: |
|
timestamps.add(start) |
|
timestamps.add(end) |
|
|
|
timestamps = sorted(timestamps) |
|
if len(timestamps) == 0: return Timeline(uri=self.uri) |
|
|
|
segments = [] |
|
start = timestamps[0] |
|
|
|
for end in timestamps[1:]: |
|
segment = Segment(start=start, end=end) |
|
|
|
if segment and support.overlapping(segment.middle): segments.append(segment) |
|
start = end |
|
|
|
return Timeline(segments=segments, uri=self.uri) |
|
|
|
def _iter_uem(self): |
|
uri = self.uri if self.uri else "<NA>" |
|
|
|
for segment in self: |
|
yield f"{uri} 1 {segment.start:.3f} {segment.end:.3f}\n" |
|
|
|
def to_uem(self): |
|
return "".join([line for line in self._iter_uem()]) |
|
|
|
def write_uem(self, file): |
|
for line in self._iter_uem(): |
|
file.write(line) |
|
|
|
def _repr_png_(self): |
|
return None |
|
|
|
class Segment: |
|
def __init__(self, start, end): |
|
self.start = start |
|
self.end = end |
|
|
|
@staticmethod |
|
def set_precision(ndigits = None): |
|
global AUTO_ROUND_TIME, SEGMENT_PRECISION |
|
|
|
if ndigits is None: |
|
AUTO_ROUND_TIME = False |
|
SEGMENT_PRECISION = 1e-6 |
|
else: |
|
AUTO_ROUND_TIME = True |
|
SEGMENT_PRECISION = 10 ** (-ndigits) |
|
|
|
def __bool__(self): |
|
return bool((self.end - self.start) > SEGMENT_PRECISION) |
|
|
|
def __post_init__(self): |
|
if AUTO_ROUND_TIME: |
|
object.__setattr__(self, 'start', int(self.start / SEGMENT_PRECISION + 0.5) * SEGMENT_PRECISION) |
|
object.__setattr__(self, 'end', int(self.end / SEGMENT_PRECISION + 0.5) * SEGMENT_PRECISION) |
|
|
|
@property |
|
def duration(self): |
|
return self.end - self.start if self else 0. |
|
|
|
@property |
|
def middle(self): |
|
return .5 * (self.start + self.end) |
|
|
|
def __iter__(self): |
|
yield self.start |
|
yield self.end |
|
|
|
def copy(self): |
|
return Segment(start=self.start, end=self.end) |
|
|
|
def __contains__(self, other): |
|
return (self.start <= other.start) and (self.end >= other.end) |
|
|
|
def __and__(self, other): |
|
return Segment(start=max(self.start, other.start), end=min(self.end, other.end)) |
|
|
|
def intersects(self, other): |
|
return (self.start < other.start and other.start < self.end - SEGMENT_PRECISION) or (self.start > other.start and self.start < other.end - SEGMENT_PRECISION) or (self.start == other.start) |
|
|
|
def overlaps(self, t): |
|
return self.start <= t and self.end >= t |
|
|
|
def __or__(self, other): |
|
if not self: return other |
|
if not other: return self |
|
|
|
return Segment(start=min(self.start, other.start), end=max(self.end, other.end)) |
|
|
|
def __xor__(self, other): |
|
if (not self) or (not other): raise ValueError |
|
|
|
return Segment(start=min(self.end, other.end), end=max(self.start, other.start)) |
|
|
|
def _str_helper(self, seconds): |
|
from datetime import timedelta |
|
|
|
negative = seconds < 0 |
|
td = timedelta(seconds=abs(seconds)) |
|
|
|
hours, remainder = divmod(td.seconds + 86400 * td.days, 3600) |
|
minutes, seconds = divmod(remainder, 60) |
|
|
|
return '%s%02d:%02d:%02d.%03d' % ('-' if negative else ' ', hours, minutes, seconds, td.microseconds / 1000) |
|
|
|
def __str__(self): |
|
if self: return '[%s --> %s]' % (self._str_helper(self.start), self._str_helper(self.end)) |
|
return '[]' |
|
|
|
def __repr__(self): |
|
return '<Segment(%g, %g)>' % (self.start, self.end) |
|
|
|
def _repr_png_(self): |
|
return None |
|
|
|
class SlidingWindow: |
|
def __init__(self, duration=0.030, step=0.010, start=0.000, end=None): |
|
if duration <= 0: raise ValueError |
|
self.__duration = duration |
|
if step <= 0: raise ValueError |
|
|
|
self.__step = step |
|
self.__start = start |
|
|
|
if end is None: self.__end = np.inf |
|
else: |
|
if end <= start: raise ValueError |
|
self.__end = end |
|
|
|
self.__i = -1 |
|
|
|
@property |
|
def start(self): |
|
return self.__start |
|
|
|
@property |
|
def end(self): |
|
return self.__end |
|
|
|
@property |
|
def step(self): |
|
return self.__step |
|
|
|
@property |
|
def duration(self): |
|
return self.__duration |
|
|
|
def closest_frame(self, t): |
|
return int(np.rint((t - self.__start - .5 * self.__duration) / self.__step)) |
|
|
|
def samples(self, from_duration, mode = 'strict'): |
|
if mode == 'strict': return int(np.floor((from_duration - self.duration) / self.step)) + 1 |
|
elif mode == 'loose': return int(np.floor((from_duration + self.duration) / self.step)) |
|
elif mode == 'center': return int(np.rint((from_duration / self.step))) |
|
|
|
def crop(self, focus, mode = 'loose', fixed = None, return_ranges = False): |
|
if not isinstance(focus, (Segment, Timeline)): raise TypeError |
|
|
|
if isinstance(focus, Timeline): |
|
if fixed is not None: raise ValueError |
|
|
|
if return_ranges: |
|
ranges = [] |
|
|
|
for i, s in enumerate(focus.support()): |
|
rng = self.crop(s, mode=mode, fixed=fixed, return_ranges=True) |
|
|
|
if i == 0 or rng[0][0] > ranges[-1][1]: ranges += rng |
|
else: ranges[-1][1] = rng[0][1] |
|
|
|
return ranges |
|
|
|
return np.unique(np.hstack([self.crop(s, mode=mode, fixed=fixed, return_ranges=False) for s in focus.support()])) |
|
|
|
if mode == 'loose': |
|
i = int(np.ceil((focus.start - self.duration - self.start) / self.step)) |
|
|
|
if fixed is None: |
|
j = int(np.floor((focus.end - self.start) / self.step)) |
|
rng = (i, j + 1) |
|
else: |
|
n = self.samples(fixed, mode='loose') |
|
rng = (i, i + n) |
|
elif mode == 'strict': |
|
i = int(np.ceil((focus.start - self.start) / self.step)) |
|
|
|
if fixed is None: |
|
j = int(np.floor((focus.end - self.duration - self.start) / self.step)) |
|
rng = (i, j + 1) |
|
else: |
|
n = self.samples(fixed, mode='strict') |
|
rng = (i, i + n) |
|
elif mode == 'center': |
|
i = self.closest_frame(focus.start) |
|
|
|
if fixed is None: |
|
j = self.closest_frame(focus.end) |
|
rng = (i, j + 1) |
|
else: |
|
n = self.samples(fixed, mode='center') |
|
rng = (i, i + n) |
|
else: raise ValueError |
|
|
|
if return_ranges: return [list(rng)] |
|
return np.array(range(*rng), dtype=np.int64) |
|
|
|
def segmentToRange(self, segment): |
|
return self.segment_to_range(segment) |
|
|
|
def segment_to_range(self, segment): |
|
return self.closest_frame(segment.start), int(segment.duration / self.step) + 1 |
|
|
|
def rangeToSegment(self, i0, n): |
|
return self.range_to_segment(i0, n) |
|
|
|
def range_to_segment(self, i0, n): |
|
start = self.__start + (i0 - .5) * self.__step + .5 * self.__duration |
|
|
|
if i0 == 0: start = self.start |
|
return Segment(start, start + (n * self.__step)) |
|
|
|
def samplesToDuration(self, nSamples): |
|
return self.samples_to_duration(nSamples) |
|
|
|
def samples_to_duration(self, n_samples): |
|
return self.range_to_segment(0, n_samples).duration |
|
|
|
def durationToSamples(self, duration): |
|
return self.duration_to_samples(duration) |
|
|
|
def duration_to_samples(self, duration): |
|
return self.segment_to_range(Segment(0, duration))[1] |
|
|
|
def __getitem__(self, i): |
|
start = self.__start + i * self.__step |
|
if start >= self.__end: return None |
|
|
|
return Segment(start=start, end=start + self.__duration) |
|
|
|
def next(self): |
|
return self.__next__() |
|
|
|
def __next__(self): |
|
self.__i += 1 |
|
window = self[self.__i] |
|
|
|
if window: return window |
|
else: raise StopIteration() |
|
|
|
def __iter__(self): |
|
self.__i = -1 |
|
return self |
|
|
|
def __len__(self): |
|
if np.isinf(self.__end): raise ValueError |
|
i = self.closest_frame(self.__end) |
|
|
|
while (self[i]): |
|
i += 1 |
|
|
|
length = i |
|
return length |
|
|
|
def copy(self): |
|
return self.__class__(duration=self.duration, step=self.step, start=self.start, end=self.end) |
|
|
|
def __call__(self, support, align_last = False): |
|
if isinstance(support, Timeline): segments = support |
|
elif isinstance(support, Segment): segments = Timeline(segments=[support]) |
|
else: raise TypeError |
|
|
|
for segment in segments: |
|
if segment.duration < self.duration: continue |
|
|
|
for s in SlidingWindow(duration=self.duration, step=self.step, start=segment.start, end=segment.end): |
|
if s in segment: |
|
yield s |
|
last = s |
|
|
|
if align_last and last.end < segment.end: yield Segment(start=segment.end - self.duration, end=segment.end) |