""" MIT License Copyright (C) 2023 ROCKY4546 https://github.com/rocky4546 This file is part of Cabernet Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. """ import errno import subprocess import time import lib.common.exceptions as exceptions from lib.clients.web_handler import WebHTTPHandler from lib.streams.video import Video from lib.db.db_config_defn import DBConfigDefn from .stream import Stream from .stream_queue import StreamQueue from .pts_validation import PTSValidation IDLE_TIMER = 20 # Duration for no video causing a refresh MAX_IDLE_TIMER = 120 # duration for no video causing stream termination class StreamlinkProxy(Stream): def __init__(self, _plugins, _hdhr_queue): self.streamlink_proc = None self.last_refresh = None self.block_prev_time = None self.buffer_prev_time = None self.small_pkt_streaming = False self.block_max_pts = 0 self.block_prev_pts = 0 self.prev_last_pts = 0 self.default_duration = 0 self.block_moving_avg = 0 self.channel_dict = None self.write_buffer = None self.stream_queue = None self.pts_validation = None self.tuner_no = -1 super().__init__(_plugins, _hdhr_queue) self.db_configdefn = DBConfigDefn(self.config) self.video = Video(self.config) def update_tuner_status(self, _status): ch_num = self.channel_dict['display_number'] namespace = self.channel_dict['namespace'] scan_list = WebHTTPHandler.rmg_station_scans[namespace] tuner = scan_list[self.tuner_no] if type(tuner) == dict and tuner['ch'] == ch_num: WebHTTPHandler.rmg_station_scans[namespace][self.tuner_no]['status'] = _status def stream(self, _channel_dict, _write_buffer, _tuner_no): global MAX_IDLE_TIMER self.logger.info('Using streamlink_proxy for channel {}'.format(_channel_dict['uid'])) self.tuner_no = _tuner_no self.channel_dict = _channel_dict self.write_buffer = _write_buffer self.config = self.db_configdefn.get_config() MAX_IDLE_TIMER = self.config[self.namespace.lower()]['stream-g_stream_timeout'] self.pts_validation = PTSValidation(self.config, self.channel_dict) channel_uri = self.get_stream_uri(self.channel_dict) if not channel_uri: self.logger.warning('Unknown Channel {}'.format(_channel_dict['uid'])) return self.streamlink_proc = self.open_streamlink_proc(channel_uri) if not self.streamlink_proc: return time.sleep(0.01) self.last_refresh = time.time() self.block_prev_time = self.last_refresh self.buffer_prev_time = self.last_refresh try: self.read_buffer() except exceptions.CabernetException as ex: self.logger.info(str(ex)) return while True: if not self.video.data: self.logger.info( '1 No Video Data, refreshing stream {} {}' .format(_channel_dict['uid'], self.streamlink_proc.pid)) self.streamlink_proc = self.refresh_stream() else: try: self.validate_stream() self.update_tuner_status('Streaming') start_ttw = time.time() self.write_buffer.write(self.video.data) delta_ttw = time.time() - start_ttw self.logger.info( 'Serving {} {} ({}B) ttw:{:.2f}s' .format(self.streamlink_proc.pid, _channel_dict['uid'], len(self.video.data), delta_ttw)) except IOError as e: if e.errno in [errno.EPIPE, errno.ECONNABORTED, errno.ECONNRESET, errno.ECONNREFUSED]: self.logger.info('1. Connection dropped by end device {}'.format(self.streamlink_proc.pid)) break else: self.logger.error('{}{}'.format( '1 UNEXPECTED EXCEPTION=', e)) raise try: self.read_buffer() except exceptions.CabernetException as ex: self.logger.info('{} {}'.format(ex, self.streamlink_proc.pid)) break except Exception as e: self.logger.error('{}{}'.format( '2 UNEXPECTED EXCEPTION=', e)) break self.terminate_stream() def validate_stream(self): if not self.config[self.config_section]['player-enable_pts_filter']: return has_changed = True while has_changed: has_changed = False results = self.pts_validation.check_pts(self.video) if results['byteoffset'] != 0: if results['byteoffset'] < 0: self.write_buffer.write(self.video.data[-results['byteoffset']:len(self.video.data) - 1]) else: self.write_buffer.write(self.video.data[0:results['byteoffset']]) has_changed = True if results['refresh_stream']: self.streamlink_proc = self.refresh_stream() self.read_buffer() has_changed = True if results['reread_buffer']: self.read_buffer() has_changed = True return def read_buffer(self): global MAX_IDLE_TIMER global IDLE_TIMER data_found = False self.video.data = None idle_timer = MAX_IDLE_TIMER # time slice segments are less than 10 seconds while not data_found: self.video.data = self.stream_queue.read() if self.video.data: data_found = True else: if self.stream_queue.is_terminated: raise exceptions.CabernetException('Streamlink Terminated, exiting stream {}'.format(self.streamlink_proc.pid)) time.sleep(1) idle_timer -= 1 if idle_timer % IDLE_TIMER == 0: self.logger.info( '2 No Video Data, refreshing stream {}' .format(self.streamlink_proc.pid)) self.streamlink_proc = self.refresh_stream() if idle_timer < 1: idle_timer = MAX_IDLE_TIMER # time slice segments are less than 10 seconds self.logger.info( 'No Video Data, terminating stream {}' .format(self.streamlink_proc.pid)) time.sleep(15) self.streamlink_proc = self.terminate_stream() raise exceptions.CabernetException('Unable to get video stream, terminating') elif int(MAX_IDLE_TIMER / 2) == idle_timer: self.update_tuner_status('No Reply') return def terminate_stream(self): self.logger.debug('Terminating streamlink stream {}'.format(self.streamlink_proc.pid)) while True: try: self.streamlink_proc.terminate() self.streamlink_proc.wait(timeout=1.5) break except ValueError: pass except subprocess.TimeoutExpired: time.sleep(0.01) def refresh_stream(self): self.last_refresh = time.time() channel_uri = self.get_stream_uri(self.channel_dict) self.terminate_stream() self.logger.debug('{}{}'.format( 'Refresh Stream channelUri=', channel_uri)) streamlink_process = self.open_streamlink_proc(channel_uri) # make sure the previous streamlink is terminated before exiting self.buffer_prev_time = time.time() return streamlink_process def open_streamlink_proc(self, _channel_uri): """ streamlink drops the first 9 frame/video packets when the program starts. this means everytime a refresh occurs, 9 frames will be dropped. This is visible by looking at the video packets for a 6 second window being 171 instead of 180. Following the first read, the packets increase to 180. """ header = self.channel_dict['json'].get('Header') str_array = [] llevel = self.config['handler_loghandler']['level'] if llevel == 'DEBUG': sl_llevel = 'trace' elif llevel == 'INFO': sl_llevel = 'info' elif llevel == 'NOTICE': sl_llevel = 'warning' elif llevel == 'WARNING': sl_llevel = 'error' else: sl_llevel = 'none' if header: for key, value in header.items(): str_array.append('--http-header') str_array.append(key + '=' + value) if key == 'Referer': self.logger.debug('Using HTTP Referer: {} Channel: {}'.format(value, self.channel_dict['uid'])) uri = '{}'.format(_channel_uri) streamlink_command = [ self.config['paths']['streamlink_path'], '--stdout', '--loglevel', sl_llevel, '--ffmpeg-fout', 'mpegts', '--stream-segment-attempts', '2', '--stream-segment-timeout', '5', uri, '720,best' ] streamlink_command.extend(str_array) try: streamlink_process = subprocess.Popen( streamlink_command, stdout=subprocess.PIPE, bufsize=-1) except: self.logger.error('Streamlink Binary Not Found: {}'.format(self.config['paths']['streamlink_path'])) return self.stream_queue = StreamQueue(188, streamlink_process, self.channel_dict['uid']) time.sleep(0.1) return streamlink_process