# cab/lib/streams/pts_validation.py
"""
MIT License
Copyright (C) 2023 ROCKY4546
https://github.com/rocky4546
This file is part of Cabernet
Permission is hereby granted, free of charge, to any person obtaining a copy of this software
and associated documentation files (the “Software”), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge, publish, distribute,
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.
"""

import json
import logging
import subprocess
import lib.common.utils as utils


class PTSValidation:
    logger = None

    def __init__(self, _config, _channel_dict):
self.ffmpeg_proc = None
self.last_refresh = None
self.buffer_prev_time = None
self.small_pkt_streaming = False
self.block_max_pts = 0
self.block_prev_pts = 0
self.prev_last_pts = 0
self.default_duration = 0
self.block_moving_avg = 0
self.channel_dict = _channel_dict
self.write_buffer = None
self.stream_queue = None
self.config = _config
self.pts_json = None
if PTSValidation.logger is None:
PTSValidation.logger = logging.getLogger(__name__)
self.config_section = utils.instance_config_section(
self.channel_dict['namespace'], self.channel_dict['instance'])
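
    # Illustrative only: a rough sketch of how this class is wired up. The names
    # 'config', 'channel_dict' and 'video' below are hypothetical stand-ins; the
    # only requirements visible in this module are a config exposing
    # ['paths']['ffprobe_path'] plus the per-instance keys 'player-pts_minimum'
    # and 'player-pts_max_delta', a channel dict carrying 'namespace' and
    # 'instance', and a video object exposing the raw buffer as video.data.
    #
    #   channel_dict = {'namespace': 'example_plugin', 'instance': 'Default'}
    #   validator = PTSValidation(config, channel_dict)
    #   results = validator.check_pts(video)
    #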
def check_pts(self, _video):
"""
Checks the PTS in the video stream. If a bad PTS packet is found,
it will update the video stream until the stream is valid.
returns a dict containing 3 values
byteoffset (if >0, then write the offset before continuing)
refresh_stream (if True, then refresh the stream)
reread_buffer (if True, then drop current video.data and re-read buffer)
The items should be processed in the order listed above
"""
self.pts_json = self.get_probe_results(_video)
if self.pts_json is None:
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False}
pkt_len = self.check_for_video_pkts()
if pkt_len < 1:
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False}
pts_data = self.get_pts_values(self.pts_json)
if pts_data is None:
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False}
pts_minimum = int(self.config[self.config_section]['player-pts_minimum'])
if pts_data['first_pts'] < pts_minimum:
if pts_data['last_pts'] < pts_minimum:
self.logger.debug('Small PTS for entire stream, drop and refresh buffer')
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': True}
elif pts_data['last_pts'] <= self.prev_last_pts:
self.logger.debug('Small PTS to Large PTS with entire PTS in the past. last_pts={} vs prev={}'
.format(pts_data['last_pts'], self.prev_last_pts))
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': True}
else:
byte_offset = self.find_bad_pkt_offset(from_front=False)
if byte_offset > 0:
self.logger.debug('{} {}{}'.format(
'Small bad PTS on front with good large PTS on end.',
'Writing good bytes=', byte_offset))
return {'refresh_stream': False, 'byteoffset': -byte_offset, 'reread_buffer': True}
else:
self.logger.debug('RARE CASE: Large delta but no bad PTS ... unknown case, ignore')
self.prev_last_pts = pts_data['last_pts']
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False}
elif pts_data['last_pts'] < pts_minimum:
self.logger.debug('RARE CASE: Large PTS on front with small PTS on end.')
return {'refresh_stream': True, 'byteoffset': 0, 'reread_buffer': False}
elif pts_data['delta_from_prev'] > \
int(self.config[self.config_section]['player-pts_max_delta']):
self.logger.debug('{} {}{}'.format(
'Large delta PTS between reads. Refreshing Stream',
'DELTA=', pts_data['delta_from_prev']))
return {'refresh_stream': True, 'byteoffset': 0, 'reread_buffer': False}
elif pts_data['pts_size'] > \
int(self.config[self.config_section]['player-pts_max_delta']):
byte_offset = self.find_bad_pkt_offset(from_front=True)
if byte_offset > 0:
self.logger.debug('{} {}{}'.format(
'Large delta PTS with good front.',
'Writing good bytes=', byte_offset))
return {'refresh_stream': True, 'byteoffset': byte_offset, 'reread_buffer': False}
else:
self.logger.debug('RARE CASE: Large delta but no bad PTS ... unknown case, ignore')
self.prev_last_pts = pts_data['last_pts']
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False}
elif pts_data['first_pts'] < self.prev_last_pts:
if pts_data['last_pts'] <= self.prev_last_pts:
self.logger.debug('Entire PTS buffer in the past last_pts={} vs prev={}'.format(pts_data['last_pts'],
self.prev_last_pts))
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': True}
else:
                byte_offset = self.find_past_pkt_offset(self.prev_last_pts)
                if byte_offset < 0:
                    self.logger.debug('PTS buffer in the past with no usable offset, drop and refresh buffer')
                    return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': True}
                else:
                    self.logger.debug('{} {}{} {}'.format(
                        'PTS buffer in the past.',
                        'Writing end bytes from offset=', byte_offset,
                        'out to client'))
                    self.prev_last_pts = pts_data['last_pts']
                    return {'refresh_stream': False, 'byteoffset': -byte_offset, 'reread_buffer': True}
else:
self.prev_last_pts = pts_data['last_pts']
return {'refresh_stream': False, 'byteoffset': 0, 'reread_buffer': False}
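
    # Illustrative only: a hedged sketch of how a caller might act on the dict
    # returned by check_pts(), in the order the docstring prescribes. The names
    # 'validator', 'out_stream', 'refresh_stream()' and 'video' are hypothetical
    # and not part of this module.
    #
    #   results = validator.check_pts(video)
    #   offset = results['byteoffset']
    #   if offset > 0:
    #       out_stream.write(video.data[:offset])     # good bytes at the front
    #   elif offset < 0:
    #       out_stream.write(video.data[-offset:])    # good bytes from -offset to the end
    #   if results['refresh_stream']:
    #       refresh_stream()                          # restart the upstream source
    #   if results['reread_buffer']:
    #       video.data = None                         # drop the buffer and read it again
    #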
def check_for_video_pkts(self):
try:
pkt_len = len(self.pts_json['packets'])
except KeyError:
pkt_len = 0
self.logger.debug('Packet received with no video packet included')
return pkt_len

    def get_pts_values(self, _pts_json):
try:
first_pts = _pts_json['packets'][0]['pts']
if self.prev_last_pts == 0:
delta_from_prev = 0
else:
delta_from_prev = first_pts - self.prev_last_pts
end_of_json = len(self.pts_json['packets']) - 1
if 'duration' in self.pts_json['packets'][end_of_json]:
dur = self.pts_json['packets'][end_of_json]['duration']
self.default_duration = dur
else:
dur = self.default_duration
last_pts = self.pts_json['packets'][end_of_json]['pts'] + dur
except KeyError:
self.logger.info('KeyError exception: no pts in first or last packet, ignore')
return None
pts_size = abs(last_pts - first_pts)
self.logger.debug('{}{} {}{} {}{} {}{} {}{}'.format(
'First PTS=', first_pts,
'Last PTS=', last_pts,
'PTS SIZE=', pts_size,
'DELTA PTS=', delta_from_prev,
'Pkts Rcvd=', len(_pts_json['packets'])))
return {'first_pts': first_pts, 'last_pts': last_pts,
'pts_size': pts_size, 'delta_from_prev': delta_from_prev}
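
    # For reference, get_pts_values() above parses ffprobe JSON shaped roughly like
    # the trimmed, hypothetical example below. ffprobe reports 'pos' and 'size' as
    # strings (hence the int() casts elsewhere in this class), while 'pts' and
    # 'duration' are numbers, and 'duration' may be missing from some packets.
    #
    #   {"packets": [
    #       {"pts": 132030, "duration": 3003, "size": "55896", "pos": "564"},
    #       {"pts": 135033, "size": "1128", "pos": "56964"},
    #       ...
    #   ]}
    #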
def find_bad_pkt_offset(self, from_front):
"""
Determine where in the stream the pts diverges
"""
        num_of_pkts = len(self.pts_json['packets']) - 1  # last packet index; the loop below stops before the final packet
i = 1
prev_pkt_pts = self.pts_json['packets'][0]['pts']
byte_offset = -1
size = 0
while i < num_of_pkts:
next_pkt_pts = self.pts_json['packets'][i]['pts']
if size == 0 and 'size' in self.pts_json['packets'][i]:
size = int(self.pts_json['packets'][i]['size'])
if abs(next_pkt_pts - prev_pkt_pts) \
> int(self.config[self.config_section]['player-pts_max_delta']):
# found place where bad packets start
# only video codecs have byte position info
if from_front:
pts = prev_pkt_pts
byte_offset = int((int(self.pts_json['packets'][i - 1]['pos']) + size) / 188) * 188
self.prev_last_pts = pts
else:
pts = next_pkt_pts
byte_offset = int((int(self.pts_json['packets'][i]['pos']) - 1) / 188) * 188
self.prev_last_pts = self.pts_json['packets'][num_of_pkts]['pts']
self.logger.debug('Middle PTS {} byte_offset={}'.format(pts, byte_offset))
break
i += 1
prev_pkt_pts = next_pkt_pts
return byte_offset
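
    # The byte offsets computed above (and in find_past_pkt_offset below) are
    # rounded down to a multiple of 188 because an MPEG-TS packet is 188 bytes,
    # so only whole transport-stream packets are ever written out. Worked example
    # with hypothetical numbers: pos=40000, size=1316 gives
    # int((40000 + 1316) / 188) * 188 = 219 * 188 = 41172.
    #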
def find_past_pkt_offset(self, prev_last_pts):
        num_of_pkts = len(self.pts_json['packets']) - 1  # last packet index; the loop below stops before the final packet
next_pkt_pts = 0
i = 0
byte_offset = -1
while i < num_of_pkts:
            prev_pkt_pts = next_pkt_pts
next_pkt_pts = self.pts_json['packets'][i]['pts']
if next_pkt_pts >= prev_last_pts - 2:
# found place where future packets start
# only video codecs have byte position info
byte_offset = int(int(self.pts_json['packets'][i]['pos']) / 188) * 188
self.logger.debug(
                    '{}{} {}{} {}{}'.format('Future PTS at byte_offset=', byte_offset, 'pkt_pts=', next_pkt_pts,
                                            'prev_pkt=', prev_pkt_pts))
break
i += 1
return byte_offset

    def get_probe_results(self, _video):
ffprobe_command = [self.config['paths']['ffprobe_path'],
'-print_format', 'json',
'-v', 'quiet', '-show_packets',
'-select_streams', 'v:0',
'-show_entries', 'side_data=:packet=pts,pos,duration,size',
'-']
cmdpts = subprocess.Popen(ffprobe_command,
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
ptsout = cmdpts.communicate(_video.data)[0]
exit_code = cmdpts.wait()
if exit_code != 0:
self.logger.warning('FFPROBE failed to execute with error code: {}'
.format(exit_code))
return None
return json.loads(ptsout)
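
# For reference, the subprocess call above is roughly equivalent to piping the
# buffered video into ffprobe on the command line ('segment.ts' standing in for
# the in-memory buffer passed via stdin):
#
#   ffprobe -print_format json -v quiet -show_packets -select_streams v:0 \
#       -show_entries side_data=:packet=pts,pos,duration,size - < segment.ts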