""" MIT License Copyright (C) 2023 ROCKY4546 https://github.com/rocky4546 This file is part of Cabernet Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. """ import ipaddress import logging import random import socket import string import struct import sys import zlib from ipaddress import IPv4Network from ipaddress import IPv4Address from multiprocessing import Process from threading import Thread import lib.common.utils as utils HDHR_PORT = 65001 HDHR_ADDR = '224.0.0.255' # multicast to local addresses only # HDHR_ADDR = '239.255.255.250' SERVER_ID = 'HDHR3' HDHOMERUN_TYPE_DISCOVER_REQ = 2 HDHOMERUN_TYPE_DISCOVER_RSP = 3 HDHOMERUN_TYPE_GETSET_REQ = 4 HDHOMERUN_TYPE_GETSET_RSP = 5 HDHOMERUN_GETSET_NAME = 3 HDHOMERUN_GETSET_VALUE = 4 HDHOMERUN_ERROR_MESSAGE = 5 HDHOMERUN_GETSET_LOCKKEY = 21 START_SEND_UDP_ATSC_PKTS = 1 STOP_SEND_UDP_ATSC_PKTS = 0 HDHOMERUN_BASE_URL = 0x2A HDHOMERUN_LINEUP_URL = 0x27 # HDHOMERUN_DEVICE_AUTH_STR = 0x2B # HDHOMERUN_DEVICE_TYPE_WILDCARD = 0xFFFFFFFF HDHOMERUN_DEVICE_TYPE_TUNER = 0x00000001 # HDHOMERUN_DEVICE_ID_WILDCARD = 0xFFFFFFFF msgs = { 'lockedErrMsg': """ERROR: resource locked by {}""", 'scanErrMsg': """ERROR: tuner busy""", } tuner_status_msg = { 'Idle': b'ch=none lock=none ss=0 snq=0 seq=0 bps=0 pps=0', 'Stream': b'ch=8vsb:183000000 lock=8vsb ss=98 snq=80 seq=90 bps=12345678 pps=1234', } logger = None def hdhr_process(config, _tuner_queue): global logger utils.logging_setup(config['paths']) logger = logging.getLogger(__name__) if config['hdhomerun']['udp_netmask'] is None: logger.error('Config setting [hdhomerun][udp_netmask] required. Exiting hdhr service') return try: IPv4Network(config['hdhomerun']['udp_netmask']) except (ipaddress.AddressValueError, ValueError) as err: logger.error( 'Illegal value in [hdhomerun][udp_netmask]. ' 'Format must be #.#.#.#/#. Exiting hdhr service. ERROR: {}'.format(err)) return hdhr = HDHRServer(config, _tuner_queue) # startup the multicast thread first which will exit when this function exits p_multi = Process(target=hdhr.run_multicast, args=(config["web"]["bind_ip"],)) p_multi.daemon = True p_multi.start() # startup the standard tcp listener, but have this hang the process # the socket listener will terminate from main.py when the process is stopped hdhr.run_listener(config["web"]["bind_ip"]) logger.info('hdhr_processing terminated') def hdhr_validate_device_id(_device_id): global logger hex_digits = set(string.hexdigits) if len(_device_id) != 8: logger.error('ERROR: HDHR Device ID must be 8 hexidecimal values') return False if not all(c in hex_digits for c in _device_id): logger.error('ERROR: HDHR Device ID characters must all be hex (0-A)') return False device_id_bin = bytes.fromhex(_device_id) cksum_lookup = [0xA, 0x5, 0xF, 0x6, 0x7, 0xC, 0x1, 0xB, 0x9, 0x2, 0x8, 0xD, 0x4, 0x3, 0xE, 0x0] device_id_int = int.from_bytes(device_id_bin, byteorder='big') cksum = 0 cksum ^= cksum_lookup[(device_id_int >> 28) & 0x0F] cksum ^= (device_id_int >> 24) & 0x0F cksum ^= cksum_lookup[(device_id_int >> 20) & 0x0F] cksum ^= (device_id_int >> 16) & 0x0F cksum ^= cksum_lookup[(device_id_int >> 12) & 0x0F] cksum ^= (device_id_int >> 8) & 0x0F cksum ^= cksum_lookup[(device_id_int >> 4) & 0x0F] cksum ^= (device_id_int >> 0) & 0x0F return cksum == 0 # given a device id, will adjust the last 4 bits to make it valid and return the integer value def hdhr_get_valid_device_id(_device_id): global logger hex_digits = set(string.hexdigits) if len(_device_id) != 8: logger.error('ERROR: HDHR Device ID must be 8 hexadecimal values') return 0 if not all(c in hex_digits for c in _device_id): logger.error('ERROR: HDHR Device ID characters must all be hex (0-A)') return 0 device_id_bin = bytes.fromhex(_device_id) cksum_lookup = [0xA, 0x5, 0xF, 0x6, 0x7, 0xC, 0x1, 0xB, 0x9, 0x2, 0x8, 0xD, 0x4, 0x3, 0xE, 0x0] device_id_int = int.from_bytes(device_id_bin, byteorder='big') cksum = 0 cksum ^= cksum_lookup[(device_id_int >> 28) & 0x0F] cksum ^= (device_id_int >> 24) & 0x0F cksum ^= cksum_lookup[(device_id_int >> 20) & 0x0F] cksum ^= (device_id_int >> 16) & 0x0F cksum ^= cksum_lookup[(device_id_int >> 12) & 0x0F] cksum ^= (device_id_int >> 8) & 0x0F cksum ^= cksum_lookup[(device_id_int >> 4) & 0x0F] new_dev_id = (device_id_int & 0xFFFFFFF0) + cksum return struct.pack('>I', new_dev_id).hex().upper() def hdhr_gen_device_id(): baseid = '105' + ''.join(random.choice('0123456789ABCDEF') for _ in range(4)) + '0' return hdhr_get_valid_device_id(baseid) class HDHRServer: """A class implementing a HDHR server. The notify_received and searchReceived methods are called when the appropriate type of datagram is received by the server.""" known = {} def __init__(self, _config, _tuner_queue): self.config = _config self.logger = logging.getLogger(__name__ + '_tcp') self.tuner_queue = _tuner_queue self.sock_multicast = None self.sock_listener = None self._t = None self.tuners = {} for area, area_data in self.config.items(): if 'player-tuner_count' in area_data.keys(): self.tuners[area] = dict.fromkeys(range(self.config[area]['player-tuner_count'])) for i in range(self.config[area]["player-tuner_count"]): self.tuners[area][i] = { 'channel': None, 'status': 'Idle' } def run_listener(self, _bind_ip=''): self.logger.info('TCP: Starting HDHR TCP listener server') self.sock_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_address = (_bind_ip, HDHR_PORT) self.sock_listener.bind(server_address) self.sock_listener.listen(3) self._t = Thread(target=self.process_queue, args=(self.tuner_queue,)) self._t.daemon = True self._t.start() while True: # wait for a connection connection, client_address = self.sock_listener.accept() t_conn = Thread(target=self.process_client_connection, args=(connection, client_address,)) t_conn.daemon = True t_conn.start() def process_client_connection(self, _connection, _address): # multi-threading multiple clients talking to the device at one time # buffer must be large enough to hold a full rcvd packets self.logger.debug('TCP: New connection established {}'.format(_address)) try: while True: msg = _connection.recv(1316) if not msg: # client disconnect self.logger.debug('TCP: Client terminated connection {}'.format(_address)) break self.logger.debug('TCP: data rcvd={}'.format(msg)) frame_type = HDHRServer.get_frame_type(msg) if frame_type == HDHOMERUN_TYPE_GETSET_REQ: req_dict = self.parse_getset_request(msg) response = self.create_getset_response(req_dict, _address) if response is not None: self.logger.debug('TCP: Sending response={}'.format(response)) _connection.sendall(response) else: self.logger.error('TCP: Unknown frame/message type from {} type={}'.format(_address, frame_type)) finally: _connection.close() def process_queue(self, _queue): while True: queue_item = _queue.get() # queue item has a command and arguments which are based on the command. self.tuners[queue_item['namespace'].lower()][queue_item['tuner']] = { 'channel': queue_item['channel'], 'status': queue_item['status'] } @staticmethod def get_frame_type(_msg): """ Get the type of message requested :param _msg: :return: """ # msg is in the first 2 bytes of the string (frame_type,) = struct.unpack('>H', _msg[:2]) return frame_type @staticmethod def gen_err_response(_frame_type, _tag, _text): # This is a tag type of HDHOMERUN_ERROR_MESSAGE # does not include the crc msg = msgs[_tag].format(*_text).encode() tag = utils.set_u8(HDHOMERUN_ERROR_MESSAGE) err_resp = utils.set_str(msg, True) msg_len = utils.set_u16(len(tag) + len(err_resp)) response = _frame_type + msg_len + tag + err_resp return response def create_getset_response(self, _req_dict, _address): (host, port) = _address frame_type = utils.set_u16(HDHOMERUN_TYPE_GETSET_RSP) name = _req_dict[HDHOMERUN_GETSET_NAME] name_str = name.decode('utf-8') # if HDHOMERUN_GETSET_VALUE in _req_dict.keys(): # value = _req_dict[HDHOMERUN_GETSET_VALUE] # else: # value = None if name == b'/sys/model': # required to id the device name_resp = utils.set_u8(HDHOMERUN_GETSET_NAME) + utils.set_str(name, True) value_resp = utils.set_u8(HDHOMERUN_GETSET_VALUE) + utils.set_str(b'hdhomerun4_atsc', True) msg_len = utils.set_u16(len(name_resp) + len(value_resp)) response = frame_type + msg_len + name_resp + value_resp x = zlib.crc32(response) crc = struct.pack('= len(_msg) - 4: return None, None, None (msg_type, length) = struct.unpack('BB', _msg[_offset:_offset + 2]) _offset += 2 (value,) = struct.unpack('%ds' % (length - 1), _msg[_offset:_offset + length - 1]) _offset += length return msg_type, value, _offset def run_multicast(self, _bind_ip=''): utils.logging_setup(self.config['paths']) self.logger = logging.getLogger(__name__ + '_udp') self.logger.info('UDP: Starting HDHR multicast server') self.sock_multicast = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self.sock_multicast.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.sock_multicast.bind(('0.0.0.0', HDHR_PORT)) mreq = struct.pack('4sl', socket.inet_aton(HDHR_ADDR), socket.INADDR_ANY) self.sock_multicast.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) self.sock_multicast.settimeout(2) while True: try: data, addr = self.sock_multicast.recvfrom(1024) self.datagram_received(data, addr) except socket.timeout: continue def datagram_received(self, _data, _host_port): """Handle a received multicast datagram.""" (host, port) = _host_port if self.config['hdhomerun']['udp_netmask'] is None: is_allowed = True else: try: net = IPv4Network(self.config['hdhomerun']['udp_netmask']) except (ipaddress.AddressValueError, ValueError) as err: self.logger.error( 'Illegal value in [hdhomerun][udp_netmask]. ' 'Format must be #.#.#.#/#. Exiting hdhr service. ERROR: {}'.format(err)) sys.exit(1) is_allowed = IPv4Address(host) in net if not is_allowed: return self.logger.debug('UDP: from {}:{}'.format(host, port)) try: (frame_type, msg_len, device_type, sub_dt_len, sub_dt, device_id, sub_did_len, sub_did) = \ struct.unpack('>HHBBIBBI', _data[0:-4]) # (crc,) = struct.unpack('