# pylama:ignore=E722 """ MIT License Copyright (C) 2023 ROCKY4546 https://github.com/rocky4546 This file is part of Cabernet Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. """ import traceback import datetime import errno import logging import re import xml.dom.minidom as minidom from xml.etree import ElementTree import lib.common.utils as utils import lib.tvheadend.epg_category as epg_category from lib.common.decorators import getrequest from lib.db.db_channels import DBChannels from lib.db.db_epg import DBepg from lib.web.pages.templates import web_templates @getrequest.route('/xmltv.xml') def xmltv_xml(_webserver): try: epg = EPG(_webserver) epg.get_epg_xml(_webserver) except MemoryError as e: _webserver.do_mime_response( 501, 'text/html', web_templates['htmlError'].format('501 - MemoryError: {}'.format(e))) class EPG: # https://github.com/XMLTV/xmltv/blob/master/xmltv.dtd def __init__(self, _webserver): self.logger = logging.getLogger(__name__) self.webserver = _webserver self.config = _webserver.plugins.config_obj.data self.epg_db = DBepg(self.config) self.channels_db = DBChannels(self.config) self.plugins = _webserver.plugins self.namespace = _webserver.query_data['name'] self.instance = _webserver.query_data['instance'] self.tv_tag = False self.today = datetime.datetime.utcnow().date() self.prog_processed = [] def get_next_epg_day(self): is_enabled = False day_data = None ns = None inst = None day = None while not is_enabled: day_data, ns, inst, day = self.epg_db.get_next_row() if day_data is None: break if day < self.today: continue config_section = utils.instance_config_section(ns, inst) if not self.config.get(ns.lower()) \ or not self.config[ns.lower()]['enabled'] \ or not self.config.get(config_section) \ or not self.config[config_section]['enabled'] \ or not self.config[config_section].get('epg-enabled'): continue is_enabled = True return day_data, ns, inst, day def get_epg_xml(self, _webserver): xml_out = None if self.namespace is not None \ and not self.plugins.plugins.get(self.namespace): _webserver.do_mime_response( 501, 'text/html', web_templates['htmlError'].format('501 - Invalid Namespace: {}'.format(self.namespace))) return try: _webserver.do_dict_response({ 'code': 200, 'headers': {'Content-type': 'application/xml; Transfer-Encoding: chunked'}, 'text': None}) xml_out = self.gen_header_xml() channel_list = self.channels_db.get_channels(self.namespace, self.instance) self.gen_channel_xml(xml_out, channel_list) self.write_xml(xml_out, keep_xml_prolog=True) xml_out = None self.epg_db.init_get_query(self.namespace, self.instance) day_data, ns, inst, day = self.get_next_epg_day() self.logger.debug('Processing EPG data {}:{} {}' .format(ns, inst, day)) self.prog_processed = [] while day_data: xml_out = EPG.gen_minimal_header_xml() self.gen_program_xml(xml_out, day_data, channel_list, ns, inst) self.write_xml(xml_out) xml_out.clear() day_data, ns, inst, day = self.get_next_epg_day() self.logger.debug('Processing EPG data {}:{} {}' .format(ns, inst, day)) day_data = None self.epg_db.close_query() self.webserver.wfile.write(b'\r\n') self.webserver.wfile.flush() except MemoryError as e: self.logger.error('MemoryError parsing large xml') raise e except IOError as ex: # Check we hit a broken pipe when trying to write back to the client if ex.errno in [errno.EPIPE, errno.ECONNABORTED, errno.ECONNRESET, errno.ECONNREFUSED]: # Normal process. Client request end of stream self.logger.info('Connection dropped by client {}' .format(ex)) xml_out.clear() return else: self.logger.error('{}{}'.format( 'UNEXPECTED EXCEPTION=', ex)) raise xml_out = None # clear to help garbage collection def write_xml(self, _xml, keep_xml_prolog=False): if self.config['epg']['epg_prettyprint']: if not keep_xml_prolog: epg_dom = minidom.parseString(ElementTree.tostring(_xml, encoding='UTF-8', method='xml')).toprettyxml() if epg_dom.endswith('\n'): epg_dom = epg_dom.replace('\n', '', 1) epg_dom = epg_dom.replace('', '', 1) else: epg_dom = '' else: epg_dom = minidom.parseString(ElementTree.tostring(_xml, encoding='UTF-8', method='xml')).toprettyxml() epg_dom = epg_dom.replace('\n','\n\n',1) if epg_dom.endswith('\n'): epg_dom = re.sub('\n$', '', epg_dom) else: epg_dom = re.sub('"/>\n$', '">', epg_dom) self.webserver.wfile.write(epg_dom.encode()) else: if not keep_xml_prolog: epg_dom = ElementTree.tostring(_xml) if epg_dom.endswith(b''): epg_dom = b'' else: epg_dom = epg_dom.replace(b'', b'', 1) epg_dom = epg_dom.replace(b'', b'', 1) else: epg_dom = b'' epg_dom = epg_dom + ElementTree.tostring(_xml) if epg_dom.endswith(b''): epg_dom = re.sub(b'$', b'', epg_dom) else: epg_dom = re.sub(b'" />$', b'">', epg_dom) self.webserver.wfile.write(epg_dom + b'\r\n') epg_dom = None # clear to help garbage collection return True def gen_channel_xml(self, _et_root, _channel_list): sids_processed = [] for sid, sid_data_list in _channel_list.items(): if sid in sids_processed: continue sids_processed.append(sid) for ch_data in sid_data_list: if not ch_data['enabled']: continue config_section = utils.instance_config_section(ch_data['namespace'], ch_data['instance']) if not self.config.get(ch_data['namespace'].lower()) \ or not self.config[ch_data['namespace'].lower()]['enabled'] \ or not self.config.get(config_section) \ or not self.config[config_section]['enabled'] \ or not self.config[config_section].get('epg-enabled'): continue updated_chnum = utils.wrap_chnum( ch_data['display_number'], ch_data['namespace'], ch_data['instance'], self.config) if self.config['epg'].get('epg_add_plugin_to_channel_id'): ch_ref = ch_data['namespace'] + '-' else: ch_ref = '' if self.config['epg'].get('epg_use_channel_number'): ch_ref += updated_chnum else: ch_ref += sid c_out = EPG.sub_el(_et_root, 'channel', id=ch_ref) EPG.sub_el(c_out, 'display-name', _text='%s %s' % (updated_chnum, ch_data['display_name'])) EPG.sub_el(c_out, 'display-name', _text=ch_data['display_name']) EPG.sub_el(c_out, 'display-name', _text=ch_data['json']['callsign']) EPG.sub_el(c_out, 'display-name', _text='%s %s' % (updated_chnum, ch_data['json']['callsign'])) EPG.sub_el(c_out, 'lcn', _text='%s' % (updated_chnum)) if self.config['epg']['epg_channel_icon'] and ch_data['thumbnail'] is not None: EPG.sub_el(c_out, 'icon', src=ch_data['thumbnail']) break return _et_root def gen_program_xml(self, _et_root, _prog_list, _channel_list, _ns, _inst): for prog_data in _prog_list: proginfo = prog_data['start'] + prog_data['channel'] if proginfo in self.prog_processed: continue skip = False try: for ch_data in _channel_list[prog_data['channel']]: if ch_data['namespace'] == _ns \ and ch_data['instance'] == _inst: if not ch_data['enabled']: skip = True break config_section = utils.instance_config_section(ch_data['namespace'], ch_data['instance']) if not self.config[ch_data['namespace'].lower()]['enabled']: skip = True break if not self.config[config_section]['enabled']: skip = True break if not self.config[config_section]['epg-enabled']: skip = True break except KeyError as ex: skip = True if skip: continue self.prog_processed.append(proginfo) if self.config['epg'].get('epg_add_plugin_to_channel_id'): ch_ref = ch_data['namespace'] + '-' else: ch_ref = '' if self.config['epg'].get('epg_use_channel_number'): ch_data = _channel_list[prog_data['channel']][0] updated_chnum = utils.wrap_chnum( ch_data['display_number'], ch_data['namespace'], ch_data['instance'], self.config) ch_ref += updated_chnum else: ch_ref += prog_data['channel'] prog_out = EPG.sub_el(_et_root, 'programme', start=prog_data['start'], stop=prog_data['stop'], channel=ch_ref) if prog_data['title']: EPG.sub_el(prog_out, 'title', lang='en', _text=prog_data['title']) if prog_data['subtitle']: EPG.sub_el(prog_out, 'sub-title', lang='en', _text=prog_data['subtitle']) descr_add = '' if self.config['epg']['description'] == 'extend': if prog_data['formatted_date']: descr_add += '(' + prog_data['formatted_date'] + ') ' if prog_data['genres']: descr_add += ' / '.join(prog_data['genres']) + ' / ' if prog_data['se_common']: descr_add += prog_data['se_common'] elif prog_data['episode']: descr_add += 'E' + str(prog_data['episode']) descr_add += '\n' + prog_data['desc'] elif self.config['epg']['description'] == 'brief': descr_add = prog_data['short_desc'] elif self.config['epg']['description'] == 'normal': descr_add = prog_data['desc'] else: self.logger.warning('Config value [epg][description] is invalid: ' + self.config['epg']['description']) EPG.sub_el(prog_out, 'desc', lang='en', _text=descr_add) if prog_data['video_quality']: video_out = EPG.sub_el(prog_out, 'video') EPG.sub_el(video_out, 'quality', prog_data['video_quality']) if prog_data['air_date']: EPG.sub_el(prog_out, 'date', _text=prog_data['air_date']) EPG.sub_el(prog_out, 'length', units='minutes', _text=str(prog_data['length'])) if prog_data['genres']: for f in prog_data['genres']: if self.config['epg']['genre'] == 'normal': pass elif self.config['epg']['genre'] == 'tvheadend': if f in epg_category.TVHEADEND.keys(): f = epg_category.TVHEADEND[f] else: self.logger.warning('Config value [epg][genre] is invalid: ' + self.config['epg']['genre']) EPG.sub_el(prog_out, 'category', lang='en', _text=f.strip()) if prog_data['icon'] and self.config['epg']['epg_program_icon']: EPG.sub_el(prog_out, 'icon', src=prog_data['icon']) if prog_data['actors'] or prog_data['directors']: r = ElementTree.SubElement(prog_out, 'credits') if prog_data['directors']: for actor in prog_data['directors']: EPG.sub_el(r, 'director', _text=actor) if prog_data['actors']: for actor in prog_data['actors']: EPG.sub_el(r, 'actor', _text=actor) if prog_data['rating']: r = ElementTree.SubElement(prog_out, 'rating') EPG.sub_el(r, 'value', _text=prog_data['rating']) if prog_data['se_common']: EPG.sub_el(prog_out, 'episode-num', system='common', _text=prog_data['se_common']) EPG.sub_el(prog_out, 'episode-num', system='SxxExx', _text=prog_data['se_common']) if prog_data['se_progid']: EPG.sub_el(prog_out, 'episode-num', system='dd_progid', _text=prog_data['se_progid']) if prog_data['se_xmltv_ns']: EPG.sub_el(prog_out, 'episode-num', system='xmltv_ns', _text=prog_data['se_xmltv_ns']) if prog_data['is_new']: EPG.sub_el(prog_out, 'new') else: EPG.sub_el(prog_out, 'previously-shown') if prog_data['cc']: EPG.sub_el(prog_out, 'subtitles', type='teletext') if prog_data['premiere']: EPG.sub_el(prog_out, 'premiere') def gen_header_xml(self): if self.namespace is None: website = utils.CABERNET_URL name = utils.CABERNET_ID else: website = self.plugins.plugins[self.namespace].plugin_settings['website'] name = self.plugins.plugins[self.namespace].plugin_settings['name'] xml_out = ElementTree.Element('!DOCTYPE') xml_out = ElementTree.Element('tv') xml_out.set('source-info-url', website) xml_out.set('source-info-name', name) xml_out.set('generator-info-name', utils.CABERNET_ID) xml_out.set('generator-info-url', utils.CABERNET_URL) return xml_out @staticmethod def gen_minimal_header_xml(): return ElementTree.Element('tv') @staticmethod def sub_el(_parent, _name, _text=None, **kwargs): el = ElementTree.SubElement(_parent, _name, **kwargs) if _text: el.text = _text return el