# cab/lib/clients/web_tuner.py
# docs4you's picture
# Upload 487 files
# 27867f1 verified
"""
MIT License
Copyright (C) 2021 ROCKY4546
https://github.com/rocky4546
This file is part of Cabernet
Permission is hereby granted, free of charge, to any person obtaining a copy of this software
and associated documentation files (the "Software"), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge, publish, distribute,
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.
"""
import os
import json
import logging
import os
import pathlib
import signal
import threading
import time
import urllib
from threading import Thread
from logging import config
from http.server import HTTPServer
from urllib.parse import urlparse
from lib.common import utils
from lib.common.decorators import gettunerrequest
from lib.web.pages.templates import web_templates
from lib.db.db_config_defn import DBConfigDefn
from lib.streams.m3u8_redirect import M3U8Redirect
from lib.streams.internal_proxy import InternalProxy
from lib.streams.ffmpeg_proxy import FFMpegProxy
from lib.streams.streamlink_proxy import StreamlinkProxy
from lib.streams.thread_queue import ThreadQueue
from .web_handler import WebHTTPHandler
@gettunerrequest.route('/tunerstatus')
def tunerstatus(_webserver):
    """Reply with the shared tuner status table serialized as JSON.

    The table is ``WebHTTPHandler.rmg_station_scans``; ``ObjectJsonEncoder``
    is used so that ThreadQueue values inside it can be serialized.
    """
    status_json = json.dumps(WebHTTPHandler.rmg_station_scans, cls=ObjectJsonEncoder)
    _webserver.do_mime_response(200, 'application/json', status_json)
@gettunerrequest.route('RE:/watch/.+')
def watch(_webserver):
    """Tune to the station id that follows ``/watch/`` in the request path.

    The ``name`` and ``instance`` query parameters are passed through to
    ``do_tuning`` as the namespace and instance of the request.
    """
    query = _webserver.query_data
    station_id = _webserver.content_path.replace('/watch/', '')
    _webserver.do_tuning(station_id, query['name'], query['instance'])
@gettunerrequest.route('/logreset')
def logreset(_webserver):
    """Reload the logging configuration from the config file and reply 200.

    Loggers that already exist are kept enabled while the configuration
    is re-read.
    """
    config_file = _webserver.config['paths']['config_file']
    logging.config.fileConfig(fname=config_file, disable_existing_loggers=False)
    _webserver.do_mime_response(200, 'text/html')
@gettunerrequest.route('RE:/auto/v.+')
def autov(_webserver):
    """Tune by channel number: ``/auto/v<channel>``.

    Each known station's display number is passed through
    ``utils.wrap_chnum`` (channel number with adjustments) and compared
    with the requested channel; the first match is tuned. When nothing
    matches, a 503 "Unknown channel" page is returned.
    """
    requested = _webserver.content_path.replace('/auto/v', '')
    query = _webserver.query_data
    stations = TunerHttpHandler.channels_db.get_channels(
        query['name'], query['instance'])

    for station_id, entries in stations.items():
        # only the first entry of each station is used for the comparison
        primary = entries[0]
        adjusted = utils.wrap_chnum(
            str(primary['display_number']), primary['namespace'],
            primary['instance'], _webserver.config)
        if adjusted == requested:
            _webserver.do_tuning(station_id, query['name'], query['instance'])
            return

    _webserver.do_mime_response(
        503, 'text/html',
        web_templates['htmlError'].format('503 - Unknown channel'))
class ObjectJsonEncoder(json.JSONEncoder):
    """JSON encoder that serializes ThreadQueue objects as their ``str()`` form."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        ThreadQueue instances become strings. Everything else is delegated
        to the base encoder, which raises TypeError for unsupported types.
        """
        if isinstance(obj, ThreadQueue):
            return str(obj)
        # The base implementation must receive the object itself; reading
        # ``self.obj`` raised AttributeError instead of the expected TypeError.
        return super().default(obj)
class TunerHttpHandler(WebHTTPHandler):
    """HTTP request handler for the tuner port.

    GET requests are dispatched through the ``gettunerrequest`` route table.
    Tuning requests resolve a station id to a plugin namespace/instance and
    answer with the stream type configured for that instance.
    """

    def __init__(self, *args):
        """Initialize per-request state and stream helpers, then run the base handler.

        The base initializer is retried once, after a one second pause, when
        it raises ConnectionResetError. A ValueError is logged and re-raised.
        """
        os.chdir(os.path.dirname(os.path.abspath(__file__)))
        self.script_dir = pathlib.Path(os.path.dirname(os.path.abspath(__file__)))
        self.ffmpeg_proc = None  # process for running ffmpeg
        self.block_moving_avg = 0
        self.last_refresh = None
        self.block_prev_pts = 0
        self.block_prev_time = None
        self.buffer_prev_time = None
        self.block_max_pts = 0
        self.small_pkt_streaming = False
        self.real_namespace = None
        self.real_instance = None
        self.content_path = None
        self.query_data = None
        self.m3u8_redirect = M3U8Redirect(TunerHttpHandler.plugins, TunerHttpHandler.hdhr_queue)
        self.internal_proxy = InternalProxy(TunerHttpHandler.plugins, TunerHttpHandler.hdhr_queue)
        self.ffmpeg_proxy = FFMpegProxy(TunerHttpHandler.plugins, TunerHttpHandler.hdhr_queue)
        self.streamlink_proxy = StreamlinkProxy(TunerHttpHandler.plugins, TunerHttpHandler.hdhr_queue)
        self.db_configdefn = DBConfigDefn(self.config)
        # NOTE(review): all state is set before the base initializer runs;
        # presumably the base class handles the request inside __init__.
        try:
            super().__init__(*args)
        except ConnectionResetError as ex:
            self.logger.warning(
                'ConnectionResetError occurred, will try again {}'
                .format(ex))
            time.sleep(1)
            super().__init__(*args)
        except ValueError as ex:
            self.logger.warning(
                'ValueError occurred, Bad stream recieved. {}'
                .format(ex))
            raise

    def do_GET(self):
        """Dispatch a GET request through the tuner route table.

        Replies 501 when no route accepts the path. Any exception is
        logged rather than propagated.
        """
        try:
            self.content_path, self.query_data = self.get_query_data()
            if not gettunerrequest.call_url(self, self.content_path):
                self.logger.warning('Unknown request to {}'.format(self.content_path))
                self.do_mime_response(501, 'text/html', web_templates['htmlError'].format('501 - Not Implemented'))
        except Exception as ex:
            self.logger.exception('{}{}'.format(
                'UNEXPECTED EXCEPTION on GET=', ex))

    def do_POST(self):
        """Parse the POST body and query string, then reply 501.

        No POST routes are handled here; the request data is collected into
        ``self.query_data`` and a 501 page is always returned. Any exception
        is logged rather than propagated.
        """
        try:
            self.content_path = self.path
            self.query_data = {}
            # get POST data; a missing Content-Length header is treated as an
            # empty body instead of failing on int(None) and leaving the
            # client without a reply
            content_length = int(self.headers.get('Content-Length') or 0)
            if content_length > 0:
                post_data = self.rfile.read(content_length).decode('utf-8')
                # if an input is empty, then it will remove it from the list when the dict is gen
                self.query_data = urllib.parse.parse_qs(post_data)
            # get QUERYSTRING
            _, has_query, get_data = self.path.partition('?')
            if has_query:
                for get_data_item in get_data.split('&'):
                    get_data_item_split = get_data_item.split('=')
                    if len(get_data_item_split) > 1:
                        self.query_data[get_data_item_split[0]] = get_data_item_split[1]
            self.do_mime_response(501, 'text/html', web_templates['htmlError'].format('501 - Badly Formatted Message'))
        except Exception as ex:
            self.logger.exception('{}{}'.format(
                'UNEXPECTED EXCEPTION on POST=', ex))

    def do_tuning(self, sid, _namespace, _instance):
        """Tune to station ``sid`` and answer with the configured stream type.

        Replies 503 when the station is unknown or when the plugin or plugin
        instance is disabled or not initialized, and 501 for an unknown
        ``player-stream_type``. For the proxy stream types the call returns
        only after the proxy's ``stream`` call returns; the tuner slot is
        then marked 'Idle' if its mux is missing or no longer alive.
        """
        # refresh the config data in case it changed in the web_admin process
        self.plugins.config_obj.refresh_config_data()
        self.config = self.db_configdefn.get_config()
        self.plugins.config_obj.data = self.config
        station_list = TunerHttpHandler.channels_db.get_channels(_namespace, _instance)
        try:
            self.real_namespace, self.real_instance, station_data = \
                self.get_ns_inst_station(station_list[sid])
            if not self.config[self.real_namespace.lower()]['enabled']:
                self.logger.warning(
                    'Plugin is not enabled, ignoring request: {} sid:{}'
                    .format(self.real_namespace, sid))
                self.do_mime_response(503, 'text/html', web_templates['htmlError'].format('503 - Plugin Disabled'))
                return
            if not self.plugins.plugins[self.real_namespace].plugin_obj:
                self.logger.warning(
                    'Plugin not initialized, ignoring request: {}:{} sid:{}'
                    .format(self.real_namespace, self.real_instance, sid))
                self.do_mime_response(503, 'text/html',
                                      web_templates['htmlError'].format('503 - Plugin Not Initialized'))
                return
            section = self.plugins.plugins[self.real_namespace].plugin_obj.instances[self.real_instance].config_section
            if not self.config[section]['enabled']:
                self.logger.warning(
                    'Plugin Instance is not enabled, ignoring request: {}:{} sid:{}'
                    .format(self.real_namespace, self.real_instance, sid))
                self.do_mime_response(503, 'text/html',
                                      web_templates['htmlError'].format('503 - Plugin Instance Disabled'))
                return
        except (KeyError, TypeError):
            self.logger.warning(
                'Unknown Channel ID, not found in database {} {} {}'
                .format(_namespace, _instance, sid))
            self.do_mime_response(503, 'text/html', web_templates['htmlError'].format('503 - Unknown channel'))
            return

        self.logger.notice('{}:{} Tuning to channel {}'.format(self.real_namespace, self.real_instance, sid))
        stream_type = self.config[section]['player-stream_type']
        if stream_type == 'm3u8redirect':
            self.do_dict_response(self.m3u8_redirect.gen_m3u8_response(station_data))
            return
        if stream_type == 'internalproxy':
            proxy = self.internal_proxy
        elif stream_type == 'ffmpegproxy':
            proxy = self.ffmpeg_proxy
        elif stream_type == 'streamlinkproxy':
            proxy = self.streamlink_proxy
        else:
            self.do_mime_response(501, 'text/html', web_templates['htmlError'].format('501 - Unknown streamtype'))
            self.logger.error('Unknown [player-stream_type] {}'
                              .format(stream_type))
            return

        resp = proxy.gen_response(
            self.real_namespace, self.real_instance,
            station_data['display_number'], station_data['json'].get('VOD'))
        self.do_dict_response(resp)
        # a negative tuner number means no stream is started for this request
        if resp['tuner'] < 0:
            return
        if proxy is self.internal_proxy:
            # only the internal proxy takes the terminate queue
            proxy.stream(station_data, self.wfile, self.terminate_queue, resp['tuner'])
        else:
            proxy.stream(station_data, self.wfile, resp['tuner'])

        station_scans = WebHTTPHandler.rmg_station_scans[self.real_namespace][resp['tuner']]
        if station_scans != 'Idle':
            if station_scans['mux'] is None or not station_scans['mux'].is_alive():
                self.logger.notice('Provider Connection Closed, ch_id={} {}'.format(sid, threading.get_ident()))
                WebHTTPHandler.rmg_station_scans[self.real_namespace][resp['tuner']] = 'Idle'
            else:
                self.logger.info('1 Client Connection Closed, provider continuing ch_id={} {}'.format(sid, threading.get_ident()))
        else:
            self.logger.info('2 Client Connection Closed, provider continuing ch_id={} {}'.format(sid, threading.get_ident()))
        time.sleep(0.01)

    def get_ns_inst_station(self, _station_data):
        """Choose which namespace/instance should serve a station.

        ``_station_data`` is the non-empty list of entries for one station
        id; each entry is a mapping with at least 'namespace', 'instance'
        and 'enabled'.

        Selection order:
          1. a single entry is returned as-is;
          2. if exactly one entry is enabled, that entry is returned;
          3. otherwise the entry whose (namespace, instance) has the fewest
             tuner slots in use in ``WebHTTPHandler.rmg_station_scans`` is
             returned, earlier entries winning ties.

        Returns a ``(namespace, instance, station)`` tuple.
        """
        # do simple checks first.
        # is there only one channel?
        if len(_station_data) == 1:
            only = _station_data[0]
            return only['namespace'], only['instance'], only

        # Is there only one channel instance enabled?
        enabled = [one_station for one_station in _station_data
                   if one_station['enabled']]
        if len(enabled) == 1:
            only = enabled[0]
            return only['namespace'], only['instance'], only

        # round robin capability when instances are tied to a single provider
        # NOTE(review): as before, entries are not filtered on 'enabled' here.
        # Only instances that actually carry this station are counted, so a
        # lightly used instance of some other channel can never be selected.
        usage = {(one_station['namespace'], one_station['instance']): 0
                 for one_station in _station_data}
        for namespace, status_list in WebHTTPHandler.rmg_station_scans.items():
            for status in status_list:
                # slots not in use are not dicts (e.g. the string 'Idle')
                if isinstance(status, dict):
                    key = (namespace, status['instance'])
                    if key in usage:
                        usage[key] += 1

        # pick the candidate with the lowest counter; dict order keeps the
        # earliest entry on ties
        lowest_namespace, lowest_instance = min(usage, key=usage.get)

        # find the station data associated with the pick
        station = next(
            one_station for one_station in _station_data
            if one_station['namespace'] == lowest_namespace
            and one_station['instance'] == lowest_instance)
        return lowest_namespace, lowest_instance, station

    @classmethod
    def init_class_var_sub(cls, _plugins, _hdhr_queue, _terminate_queue, _sched_queue):
        """Initialize class-level state shared by all handler instances.

        Sets the shared logger, sums ``player-tuner_count`` over every plugin
        whose config section defines it into
        ``WebHTTPHandler.total_instances``, then delegates to the base
        ``init_class_var``. ``_sched_queue`` is accepted but not used here.
        """
        WebHTTPHandler.logger = logging.getLogger(__name__)
        config_data = _plugins.config_obj.data
        tuner_count = 0
        for plugin_name in _plugins.plugins.keys():
            if not plugin_name:
                continue
            plugin_config = config_data.get(plugin_name.lower())
            if not plugin_config or 'player-tuner_count' not in plugin_config:
                continue
            WebHTTPHandler.logger.debug('{} Implementing {} tuners for {}'
                                        .format(cls.__name__,
                                                plugin_config['player-tuner_count'],
                                                plugin_name))
            tuner_count += plugin_config['player-tuner_count']
        WebHTTPHandler.total_instances = tuner_count
        super(TunerHttpHandler, cls).init_class_var(_plugins, _hdhr_queue, _terminate_queue)
class TunerHttpServer(Thread):
    """Thread that serves tuner HTTP requests on an already-created socket."""

    def __init__(self, server_socket, _plugins):
        """Record the bind settings and socket, then start the thread.

        ``server_socket`` is the socket the HTTP server will serve on;
        the bind address and port are read from the 'web' config section.
        """
        Thread.__init__(self)
        self.bind_ip = _plugins.config_obj.data['web']['bind_ip']
        self.bind_port = _plugins.config_obj.data['web']['plex_accessible_port']
        self.socket = server_socket
        self.server_close = None
        self.start()

    def run(self):
        """Serve requests forever on the socket given at construction."""
        HttpHandlerClass = FactoryTunerHttpHandler()
        httpd = HTTPServer((self.bind_ip, int(self.bind_port)), HttpHandlerClass, bind_and_activate=False)
        # The server creates its own unbound socket even with
        # bind_and_activate=False; close it before swapping in the given
        # one so the descriptor is released explicitly.
        httpd.socket.close()
        httpd.socket = self.socket
        # No-op stored as a plain instance attribute (never bound), so it
        # must be callable with or without arguments.
        httpd.server_bind = self.server_close = lambda *_args: None
        httpd.serve_forever()
def FactoryTunerHttpHandler():
    """Create and return a new TunerHttpHandler subclass.

    A distinct class object is produced on every call; its initializer
    simply forwards all arguments to TunerHttpHandler.
    """

    class CustomHttpHandler(TunerHttpHandler):
        """Pass-through subclass handed to the HTTP server as handler class."""

        def __init__(self, *handler_args, **handler_kwargs):
            super().__init__(*handler_args, **handler_kwargs)

    return CustomHttpHandler
def child_exited(sig, frame):
    """Signal-handler style callback that reaps one child and logs the result.

    ``sig`` and ``frame`` are accepted for the signal-handler signature and
    are not used. The value logged as the "code" is the raw status returned
    by ``os.wait()``. When there is no child to wait for, the resulting
    ChildProcessError is logged instead of raised.
    """
    log = logging.getLogger(__name__)
    try:
        child_pid, status = os.wait()
    except ChildProcessError as err:
        log.warning('Child exit error {}'.format(str(err)))
    else:
        log.warning('Child process {} exited with code {}'.format(child_pid, status))
def start(_plugins, _hdhr_queue, _terminate_queue):
    """Start the tuner HTTP server on the configured 'plex_accessible_port'.

    Delegates to ``TunerHttpHandler.start_httpserver`` with
    ``TunerHttpServer`` as the server thread class.
    """
    # uncomment this to find out about m3u8 subprocess exits
    #signal.signal(signal.SIGCHLD, child_exited)
    launch = TunerHttpHandler.start_httpserver
    tuner_port = _plugins.config_obj.data['web']['plex_accessible_port']
    launch(_plugins, _hdhr_queue, _terminate_queue, tuner_port, TunerHttpServer)