File size: 16,251 Bytes
27867f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
"""
MIT License

Copyright (C) 2021 ROCKY4546
https://github.com/rocky4546

This file is part of Cabernet

Permission is hereby granted, free of charge, to any person obtaining a copy of this software
and associated documentation files (the "Software"), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge, publish, distribute,
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.
"""

import os
import json
import logging
import os
import pathlib
import signal
import threading
import time
import urllib
from threading import Thread
from logging import config
from http.server import HTTPServer
from urllib.parse import urlparse

from lib.common import utils
from lib.common.decorators import gettunerrequest
from lib.web.pages.templates import web_templates
from lib.db.db_config_defn import DBConfigDefn
from lib.streams.m3u8_redirect import M3U8Redirect
from lib.streams.internal_proxy import InternalProxy
from lib.streams.ffmpeg_proxy import FFMpegProxy
from lib.streams.streamlink_proxy import StreamlinkProxy
from lib.streams.thread_queue import ThreadQueue
from .web_handler import WebHTTPHandler


@gettunerrequest.route('/tunerstatus')
def tunerstatus(_webserver):
    _webserver.do_mime_response(200, 'application/json', json.dumps(WebHTTPHandler.rmg_station_scans, cls=ObjectJsonEncoder))


@gettunerrequest.route('RE:/watch/.+')
def watch(_webserver):
    sid = _webserver.content_path.replace('/watch/', '')
    _webserver.do_tuning(sid, _webserver.query_data['name'], _webserver.query_data['instance'])


@gettunerrequest.route('/logreset')
def logreset(_webserver):
    logging.config.fileConfig(fname=_webserver.config['paths']['config_file'],
                              disable_existing_loggers=False)
    _webserver.do_mime_response(200, 'text/html')


@gettunerrequest.route('RE:/auto/v.+')
def autov(_webserver):
    channel = _webserver.content_path.replace('/auto/v', '')
    station_list = TunerHttpHandler.channels_db.get_channels(
        _webserver.query_data['name'], _webserver.query_data['instance'])

    # check channel number with adjustments
    for station in station_list.keys():
        updated_chnum = utils.wrap_chnum(
            str(station_list[station][0]['display_number']), station_list[station][0]['namespace'],
            station_list[station][0]['instance'], _webserver.config)
        if updated_chnum == channel:
            _webserver.do_tuning(station, _webserver.query_data['name'],
                                 _webserver.query_data['instance'])
            return

    _webserver.do_mime_response(503, 'text/html', web_templates['htmlError'].format('503 - Unknown channel'))


class ObjectJsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, ThreadQueue):
            return str(obj)
        else:
            return json.JSONEncoder.default(self.obj)


class TunerHttpHandler(WebHTTPHandler):

    def __init__(self, *args):
        os.chdir(os.path.dirname(os.path.abspath(__file__)))
        self.script_dir = pathlib.Path(os.path.dirname(os.path.abspath(__file__)))
        self.ffmpeg_proc = None  # process for running ffmpeg
        self.block_moving_avg = 0
        self.last_refresh = None
        self.block_prev_pts = 0
        self.block_prev_time = None
        self.buffer_prev_time = None
        self.block_max_pts = 0
        self.small_pkt_streaming = False
        self.real_namespace = None
        self.real_instance = None
        self.content_path = None
        self.query_data = None
        self.m3u8_redirect = M3U8Redirect(TunerHttpHandler.plugins, TunerHttpHandler.hdhr_queue)
        self.internal_proxy = InternalProxy(TunerHttpHandler.plugins, TunerHttpHandler.hdhr_queue)
        self.ffmpeg_proxy = FFMpegProxy(TunerHttpHandler.plugins, TunerHttpHandler.hdhr_queue)
        self.streamlink_proxy = StreamlinkProxy(TunerHttpHandler.plugins, TunerHttpHandler.hdhr_queue)
        self.db_configdefn = DBConfigDefn(self.config)
        try:
            super().__init__(*args)
        except ConnectionResetError as ex:
            self.logger.warning(
                'ConnectionResetError occurred, will try again {}'
                .format(ex))
            time.sleep(1)
            super().__init__(*args)
        except ValueError as ex:
            self.logger.warning(
                'ValueError occurred, Bad stream recieved.  {}'
                .format(ex))
            raise

    def do_GET(self):
        try:
            self.content_path, self.query_data = self.get_query_data()
            if gettunerrequest.call_url(self, self.content_path):
                pass
            else:
                self.logger.warning('Unknown request to {}'.format(self.content_path))
                self.do_mime_response(501, 'text/html', web_templates['htmlError'].format('501 - Not Implemented'))
        except Exception as ex:
            self.logger.exception('{}{}'.format(
                'UNEXPECTED EXCEPTION on GET=', ex))

    def do_POST(self):
        try:
            self.content_path = self.path
            self.query_data = {}
            # get POST data
            if self.headers.get('Content-Length') != '0':
                post_data = self.rfile.read(int(self.headers.get('Content-Length'))).decode('utf-8')
                # if an input is empty, then it will remove it from the list when the dict is gen
                self.query_data = urllib.parse.parse_qs(post_data)

            # get QUERYSTRING
            if self.path.find('?') != -1:
                get_data = self.path[(self.path.find('?') + 1):]
                get_data_elements = get_data.split('&')
                for get_data_item in get_data_elements:
                    get_data_item_split = get_data_item.split('=')
                    if len(get_data_item_split) > 1:
                        self.query_data[get_data_item_split[0]] = get_data_item_split[1]

            self.do_mime_response(501, 'text/html', web_templates['htmlError'].format('501 - Badly Formatted Message'))
        except Exception as ex:
            self.logger.exception('{}{}'.format(
                'UNEXPECTED EXCEPTION on POST=', ex))

    def do_tuning(self, sid, _namespace, _instance):
        # refresh the config data in case it changed in the web_admin process
        self.plugins.config_obj.refresh_config_data()
        self.config = self.db_configdefn.get_config()
        self.plugins.config_obj.data = self.config
        # try:
        station_list = TunerHttpHandler.channels_db.get_channels(_namespace, _instance)
        try:
            self.real_namespace, self.real_instance, station_data = self.get_ns_inst_station(station_list[sid])
            if not self.config[self.real_namespace.lower()]['enabled']:
                self.logger.warning(
                    'Plugin is not enabled, ignoring request: {} sid:{}'
                    .format(self.real_namespace, sid))
                self.do_mime_response(503, 'text/html', web_templates['htmlError'].format('503 - Plugin Disabled'))
                return
            if not self.plugins.plugins[self.real_namespace].plugin_obj:
                self.logger.warning(
                    'Plugin not initialized, ignoring request: {}:{} sid:{}'
                    .format(self.real_namespace, self.real_instance, sid))
                self.do_mime_response(503, 'text/html',
                                      web_templates['htmlError'].format('503 - Plugin Not Initialized'))
                return
            section = self.plugins.plugins[self.real_namespace].plugin_obj.instances[self.real_instance].config_section
            if not self.config[section]['enabled']:
                self.logger.warning(
                    'Plugin Instance is not enabled, ignoring request: {}:{} sid:{}'
                    .format(self.real_namespace, self.real_instance, sid))
                self.do_mime_response(503, 'text/html',
                                      web_templates['htmlError'].format('503 - Plugin Instance Disabled'))
                return
        except (KeyError, TypeError):
            self.logger.warning(
                'Unknown Channel ID, not found in database {} {} {}'
                .format(_namespace, _instance, sid))
            self.do_mime_response(503, 'text/html', web_templates['htmlError'].format('503 - Unknown channel'))
            return
        self.logger.notice('{}:{} Tuning to channel {}'.format(self.real_namespace, self.real_instance, sid))
        if self.config[section]['player-stream_type'] == 'm3u8redirect':
            self.do_dict_response(self.m3u8_redirect.gen_m3u8_response(station_data))
            return
        elif self.config[section]['player-stream_type'] == 'internalproxy':
            resp = self.internal_proxy.gen_response(
                self.real_namespace, self.real_instance, 
                station_data['display_number'], station_data['json'].get('VOD'))
            self.do_dict_response(resp)
            if resp['tuner'] < 0:
                return
            else:
                self.internal_proxy.stream(station_data, self.wfile, self.terminate_queue, resp['tuner'])
        elif self.config[section]['player-stream_type'] == 'ffmpegproxy':
            resp = self.ffmpeg_proxy.gen_response(
                self.real_namespace, self.real_instance, 
                station_data['display_number'], station_data['json'].get('VOD'))
            self.do_dict_response(resp)
            if resp['tuner'] < 0:
                return
            else:
                self.ffmpeg_proxy.stream(station_data, self.wfile, resp['tuner'])
        elif self.config[section]['player-stream_type'] == 'streamlinkproxy':
            resp = self.streamlink_proxy.gen_response(
                self.real_namespace, self.real_instance, 
                station_data['display_number'], station_data['json'].get('VOD'))
            self.do_dict_response(resp)
            if resp['tuner'] < 0:
                return
            else:
                self.streamlink_proxy.stream(station_data, self.wfile, resp['tuner'])
        else:
            self.do_mime_response(501, 'text/html', web_templates['htmlError'].format('501 - Unknown streamtype'))
            self.logger.error('Unknown [player-stream_type] {}'
                              .format(self.config[section]['player-stream_type']))
            return
        station_scans = WebHTTPHandler.rmg_station_scans[self.real_namespace][resp['tuner']]
        if station_scans != 'Idle':
            if station_scans['mux'] is None or not station_scans['mux'].is_alive():
                self.logger.notice('Provider Connection Closed, ch_id={} {}'.format(sid, threading.get_ident()))
                WebHTTPHandler.rmg_station_scans[self.real_namespace][resp['tuner']] = 'Idle'
            else:
                self.logger.info('1 Client Connection Closed, provider continuing ch_id={} {}'.format(sid, threading.get_ident()))
        else:
            self.logger.info('2 Client Connection Closed, provider continuing ch_id={} {}'.format(sid, threading.get_ident()))
        time.sleep(0.01)

    def get_ns_inst_station(self, _station_data):
        lowest_namespace = _station_data[0]['namespace']
        lowest_instance = _station_data[0]['instance']
        station = _station_data[0]

        # do simple checks first.
        # is there only one channel?
        if len(_station_data) == 1:
            return lowest_namespace, \
                lowest_instance, \
                station

        # Is there only one channel instance enabled?
        i = 0
        for one_station in _station_data:
            if one_station['enabled']:
                station = one_station
                i += 1
        if i == 1:
            return station['namespace'], \
                station['instance'], \
                station

        # round robin capability when instances are tied to a single provider
        # must make sure the channel is enabled for both instances
        ns = []
        inst = []
        counter = {}
        for one_station in _station_data:
            ns.append(one_station['namespace'])
            inst.append(one_station['instance'])
            counter[one_station['instance']] = 0
        for namespace, status_list in WebHTTPHandler.rmg_station_scans.items():
            for status in status_list:
                if type(status) is dict:
                    if status['instance'] not in counter:
                        counter[status['instance']] = 1
                    else:
                        counter[status['instance']] += 1

        # pick the instance with the lowest counter
        lowest_value = 100
        for instance, value in counter.items():
            if value < lowest_value:
                lowest_value = value
                lowest_instance = instance
        for i in range(len(inst)):
            if inst[i] == lowest_instance:
                lowest_namespace = ns[i]
                break

        # find the station data associated with the pick
        for one_station in _station_data:
            if one_station['namespace'] == lowest_namespace and \
                    one_station['instance'] == lowest_instance:
                station = one_station
                break
        return lowest_namespace, lowest_instance, station

    @classmethod
    def init_class_var_sub(cls, _plugins, _hdhr_queue, _terminate_queue, _sched_queue):
        WebHTTPHandler.logger = logging.getLogger(__name__)
        tuner_count = 0
        for plugin_name in _plugins.plugins.keys():
            if plugin_name:
                if _plugins.config_obj.data.get(plugin_name.lower()):
                    if 'player-tuner_count' in _plugins.config_obj.data[plugin_name.lower()]:
                        WebHTTPHandler.logger.debug('{} Implementing {} tuners for {}'
                                                    .format(cls.__name__,
                                                            _plugins.config_obj.data[plugin_name.lower()][
                                                                'player-tuner_count'],
                                                            plugin_name))
                        tuner_count += _plugins.config_obj.data[plugin_name.lower()]['player-tuner_count']
        WebHTTPHandler.total_instances = tuner_count
        super(TunerHttpHandler, cls).init_class_var(_plugins, _hdhr_queue, _terminate_queue)


class TunerHttpServer(Thread):

    def __init__(self, server_socket, _plugins):
        Thread.__init__(self)
        self.bind_ip = _plugins.config_obj.data['web']['bind_ip']
        self.bind_port = _plugins.config_obj.data['web']['plex_accessible_port']
        self.socket = server_socket
        self.server_close = None
        self.start()

    def run(self):
        HttpHandlerClass = FactoryTunerHttpHandler()
        httpd = HTTPServer((self.bind_ip, int(self.bind_port)), HttpHandlerClass, bind_and_activate=False)
        httpd.socket = self.socket
        httpd.server_bind = self.server_close = lambda self: None
        httpd.serve_forever()


def FactoryTunerHttpHandler():
    class CustomHttpHandler(TunerHttpHandler):
        def __init__(self, *args, **kwargs):
            super(CustomHttpHandler, self).__init__(*args, **kwargs)

    return CustomHttpHandler

def child_exited(sig, frame):
    logger = logging.getLogger(__name__)
    try:
        pid, exitcode = os.wait()
        logger.warning('Child process {} exited with code {}'.format(pid, exitcode))
    except ChildProcessError as ex:
        logger.warning('Child exit error {}'.format(str(ex)))

def start(_plugins, _hdhr_queue, _terminate_queue):
    # uncomment this to find out about m3u8 subprocess exits
    #signal.signal(signal.SIGCHLD, child_exited)
    TunerHttpHandler.start_httpserver(
        _plugins, _hdhr_queue, _terminate_queue,
        _plugins.config_obj.data['web']['plex_accessible_port'],
        TunerHttpServer)