|
""" |
|
MIT License |
|
|
|
Copyright (C) 2023 ROCKY4546 |
|
https://github.com/rocky4546 |
|
|
|
This file is part of Cabernet |
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy of this software |
|
and associated documentation files (the “Software”), to deal in the Software without restriction, |
|
including without limitation the rights to use, copy, modify, merge, publish, distribute, |
|
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software |
|
is furnished to do so, subject to the following conditions: |
|
|
|
The above copyright notice and this permission notice shall be included in all copies or |
|
substantial portions of the Software. |
|
""" |
|
|
|
import json |
|
import logging |
|
import subprocess |
|
|
|
import lib.common.utils as utils |
|
|
|
|
|
class PTSValidation: |
|
logger = None |
|
|
|
def __init__(self, _config, _channel_dict): |
|
self.ffmpeg_proc = None |
|
self.last_refresh = None |
|
self.buffer_prev_time = None |
|
self.small_pkt_streaming = False |
|
self.block_max_pts = 0 |
|
self.block_prev_pts = 0 |
|
self.prev_last_pts = 0 |
|
self.default_duration = 0 |
|
self.block_moving_avg = 0 |
|
self.channel_dict = _channel_dict |
|
self.write_buffer = None |
|
self.stream_queue = None |
|
self.config = _config |
|
self.pts_json = None |
|
if PTSValidation.logger is None: |
|
PTSValidation.logger = logging.getLogger(__name__) |
|
self.config_section = utils.instance_config_section( |
|
self.channel_dict['namespace'], self.channel_dict['instance']) |
|
|
|
def check_pts(self, _video): |
|
""" |
|
Checks the PTS in the video stream. If a bad PTS packet is found, |
|
it will update the video stream until the stream is valid. |
|
returns a dict containing 3 values |
|
byteoffset (if >0, then write the offset before continuing) |
|
refresh_stream (if True, then refresh the stream) |
|
reread_buffer (if True, then drop current video.data and re-read buffer) |
|
The items should be processed in the order listed above |
|
""" |
|
self.pts_json = self.get_probe_results(_video) |
|
if self.pts_json is None: |
|
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False} |
|
pkt_len = self.check_for_video_pkts() |
|
if pkt_len < 1: |
|
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False} |
|
pts_data = self.get_pts_values(self.pts_json) |
|
if pts_data is None: |
|
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False} |
|
|
|
pts_minimum = int(self.config[self.config_section]['player-pts_minimum']) |
|
if pts_data['first_pts'] < pts_minimum: |
|
if pts_data['last_pts'] < pts_minimum: |
|
self.logger.debug('Small PTS for entire stream, drop and refresh buffer') |
|
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': True} |
|
elif pts_data['last_pts'] <= self.prev_last_pts: |
|
self.logger.debug('Small PTS to Large PTS with entire PTS in the past. last_pts={} vs prev={}' |
|
.format(pts_data['last_pts'], self.prev_last_pts)) |
|
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': True} |
|
else: |
|
byte_offset = self.find_bad_pkt_offset(from_front=False) |
|
if byte_offset > 0: |
|
self.logger.debug('{} {}{}'.format( |
|
'Small bad PTS on front with good large PTS on end.', |
|
'Writing good bytes=', byte_offset)) |
|
return {'refresh_stream': False, 'byteoffset': -byte_offset, 'reread_buffer': True} |
|
else: |
|
self.logger.debug('RARE CASE: Large delta but no bad PTS ... unknown case, ignore') |
|
self.prev_last_pts = pts_data['last_pts'] |
|
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False} |
|
elif pts_data['last_pts'] < pts_minimum: |
|
self.logger.debug('RARE CASE: Large PTS on front with small PTS on end.') |
|
return {'refresh_stream': True, 'byteoffset': 0, 'reread_buffer': False} |
|
elif pts_data['delta_from_prev'] > \ |
|
int(self.config[self.config_section]['player-pts_max_delta']): |
|
self.logger.debug('{} {}{}'.format( |
|
'Large delta PTS between reads. Refreshing Stream', |
|
'DELTA=', pts_data['delta_from_prev'])) |
|
return {'refresh_stream': True, 'byteoffset': 0, 'reread_buffer': False} |
|
elif pts_data['pts_size'] > \ |
|
int(self.config[self.config_section]['player-pts_max_delta']): |
|
byte_offset = self.find_bad_pkt_offset(from_front=True) |
|
if byte_offset > 0: |
|
self.logger.debug('{} {}{}'.format( |
|
'Large delta PTS with good front.', |
|
'Writing good bytes=', byte_offset)) |
|
return {'refresh_stream': True, 'byteoffset': byte_offset, 'reread_buffer': False} |
|
else: |
|
self.logger.debug('RARE CASE: Large delta but no bad PTS ... unknown case, ignore') |
|
self.prev_last_pts = pts_data['last_pts'] |
|
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False} |
|
|
|
elif pts_data['first_pts'] < self.prev_last_pts: |
|
if pts_data['last_pts'] <= self.prev_last_pts: |
|
self.logger.debug('Entire PTS buffer in the past last_pts={} vs prev={}'.format(pts_data['last_pts'], |
|
self.prev_last_pts)) |
|
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': True} |
|
else: |
|
byte_offset = self.find_past_pkt_offset(self.prev_last_pts) |
|
self.logger.debug('{} {}{} {}'.format( |
|
'PTS buffer in the past.', |
|
' Writing end bytes from offset=', byte_offset, |
|
'out to client')) |
|
if byte_offset < 0: |
|
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': True} |
|
else: |
|
self.prev_last_pts = pts_data['last_pts'] |
|
return {'refresh_stream': False, 'byteoffset': -byte_offset, 'reread_buffer': True} |
|
else: |
|
self.prev_last_pts = pts_data['last_pts'] |
|
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False} |
|
|
|
def check_for_video_pkts(self): |
|
try: |
|
pkt_len = len(self.pts_json['packets']) |
|
except KeyError: |
|
pkt_len = 0 |
|
self.logger.debug('Packet received with no video packet included') |
|
return pkt_len |
|
|
|
def get_pts_values(self, _pts_json): |
|
try: |
|
first_pts = _pts_json['packets'][0]['pts'] |
|
if self.prev_last_pts == 0: |
|
delta_from_prev = 0 |
|
else: |
|
delta_from_prev = first_pts - self.prev_last_pts |
|
end_of_json = len(self.pts_json['packets']) - 1 |
|
if 'duration' in self.pts_json['packets'][end_of_json]: |
|
dur = self.pts_json['packets'][end_of_json]['duration'] |
|
self.default_duration = dur |
|
else: |
|
dur = self.default_duration |
|
last_pts = self.pts_json['packets'][end_of_json]['pts'] + dur |
|
except KeyError: |
|
self.logger.info('KeyError exception: no pts in first or last packet, ignore') |
|
return None |
|
pts_size = abs(last_pts - first_pts) |
|
self.logger.debug('{}{} {}{} {}{} {}{} {}{}'.format( |
|
'First PTS=', first_pts, |
|
'Last PTS=', last_pts, |
|
'PTS SIZE=', pts_size, |
|
'DELTA PTS=', delta_from_prev, |
|
'Pkts Rcvd=', len(_pts_json['packets']))) |
|
return {'first_pts': first_pts, 'last_pts': last_pts, |
|
'pts_size': pts_size, 'delta_from_prev': delta_from_prev} |
|
|
|
def find_bad_pkt_offset(self, from_front): |
|
""" |
|
Determine where in the stream the pts diverges |
|
""" |
|
num_of_pkts = len(self.pts_json['packets']) - 1 |
|
i = 1 |
|
prev_pkt_pts = self.pts_json['packets'][0]['pts'] |
|
byte_offset = -1 |
|
size = 0 |
|
while i < num_of_pkts: |
|
next_pkt_pts = self.pts_json['packets'][i]['pts'] |
|
|
|
if size == 0 and 'size' in self.pts_json['packets'][i]: |
|
size = int(self.pts_json['packets'][i]['size']) |
|
if abs(next_pkt_pts - prev_pkt_pts) \ |
|
> int(self.config[self.config_section]['player-pts_max_delta']): |
|
|
|
|
|
if from_front: |
|
pts = prev_pkt_pts |
|
byte_offset = int((int(self.pts_json['packets'][i - 1]['pos']) + size) / 188) * 188 |
|
self.prev_last_pts = pts |
|
else: |
|
pts = next_pkt_pts |
|
byte_offset = int((int(self.pts_json['packets'][i]['pos']) - 1) / 188) * 188 |
|
self.prev_last_pts = self.pts_json['packets'][num_of_pkts]['pts'] |
|
self.logger.debug('Middle PTS {} byte_offset={}'.format(pts, byte_offset)) |
|
break |
|
|
|
i += 1 |
|
prev_pkt_pts = next_pkt_pts |
|
return byte_offset |
|
|
|
def find_past_pkt_offset(self, prev_last_pts): |
|
num_of_pkts = len(self.pts_json['packets']) - 1 |
|
next_pkt_pts = 0 |
|
i = 0 |
|
byte_offset = -1 |
|
while i < num_of_pkts: |
|
prev_pkt_dts = next_pkt_pts |
|
next_pkt_pts = self.pts_json['packets'][i]['pts'] |
|
if next_pkt_pts >= prev_last_pts - 2: |
|
|
|
|
|
byte_offset = int(int(self.pts_json['packets'][i]['pos']) / 188) * 188 |
|
self.logger.debug( |
|
'{}{} {}{} {}{}'.format('Future PTS at byte_offset=', byte_offset, 'pkt_pts=', next_pkt_pts, |
|
'prev_pkt=', prev_pkt_dts)) |
|
break |
|
i += 1 |
|
return byte_offset |
|
|
|
def get_probe_results(self, _video): |
|
ffprobe_command = [self.config['paths']['ffprobe_path'], |
|
'-print_format', 'json', |
|
'-v', 'quiet', '-show_packets', |
|
'-select_streams', 'v:0', |
|
'-show_entries', 'side_data=:packet=pts,pos,duration,size', |
|
'-'] |
|
cmdpts = subprocess.Popen(ffprobe_command, |
|
stdin=subprocess.PIPE, stdout=subprocess.PIPE) |
|
ptsout = cmdpts.communicate(_video.data)[0] |
|
exit_code = cmdpts.wait() |
|
if exit_code != 0: |
|
self.logger.warning('FFPROBE failed to execute with error code: {}' |
|
.format(exit_code)) |
|
return None |
|
return json.loads(ptsout) |
|
|