"""
MIT License

Copyright (C) 2023 ROCKY4546
https://github.com/rocky4546

This file is part of Cabernet

Permission is hereby granted, free of charge, to any person obtaining a copy of this software
and associated documentation files (the "Software"), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge, publish, distribute,
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.
"""

import datetime
import logging
import time

from lib.common.decorators import getrequest
from lib.common.decorators import postrequest
from lib.db.db_scheduler import DBScheduler


@getrequest.route('/api/schedulehtml')
def get_schedule_html(_webserver):
    """
    Serve GET /api/schedulehtml.

    The first matching key in the request's query data selects the action:
    'run' queues the task named by 'task', 'deltask' queues removal of that
    task, 'delete' queues removal of the trigger named by 'trigger', 'trigger'
    returns the add-trigger form and 'task' returns the task detail view.
    Any other request returns the full scheduled-tasks page.  The result is
    sent back as a 200 text/html response.
    """
    query_data = _webserver.query_data
    schedule_html = ScheduleHTML(_webserver.config, _webserver.sched_queue)
    if 'run' in query_data:
        schedule_html.run_task(query_data['task'])
        # presumably lets the queue consumer act before the page is rendered
        time.sleep(0.05)
        html = schedule_html.get(query_data)
    elif 'deltask' in query_data:
        schedule_html.del_task(query_data['task'])
        time.sleep(0.05)
        html = schedule_html.get(query_data)
    elif 'delete' in query_data:
        schedule_html.del_trigger(query_data['trigger'])
        time.sleep(0.05)
        html = schedule_html.get_task(query_data['task'])
    elif 'trigger' in query_data:
        html = schedule_html.get_trigger(query_data['task'])
    elif 'task' in query_data:
        html = schedule_html.get_task(query_data['task'])
    else:
        html = schedule_html.get(query_data)
    _webserver.do_mime_response(200, 'text/html', html)


@postrequest.route('/api/schedulehtml')
def post_schedule_html(_webserver):
    """
    Serve POST /api/schedulehtml: queue the trigger described by the posted
    form and reply with the resulting status text as 200 text/html.
    """
    schedule_html = ScheduleHTML(_webserver.config, _webserver.sched_queue)
    html = schedule_html.post_add_trigger(_webserver.query_data)
    _webserver.do_mime_response(200, 'text/html', html)


class ScheduleHTML:
    """
    Builds the scheduler pages from the scheduler database and forwards
    user actions (add/delete trigger, run/delete task) to the scheduler
    through the command queue.
    """

    def __init__(self, _config, _queue):
        self.logger = logging.getLogger(__name__)
        self.config = _config
        self.queue = _queue
        # set by get(); read by schedule_tasks
        self.query_data = None
        self.scheduler_db = DBScheduler(self.config)

    def get(self, _query_data):
        """Record the request's query data and return the full page."""
        self.query_data = _query_data
        return ''.join([self.header, self.body])

    @property
    def header(self):
        """Page header markup."""
        return ''.join([
            '', '', '',
            'Scheduled Tasks',
            '', '', '', ''
        ])

    @property
    def body(self):
        """Page body: title, task list and task detail container."""
        return ''.join(['', self.title, self.schedule_tasks, self.task, ''])

    @property
    def title(self):
        """Page title markup."""
        return ''.join([
            '\n',
            '\n\nScheduled Tasks\n\n'
        ])

    @property
    def schedule_tasks(self):
        """
        Task list markup.  Walks the tasks returned by the scheduler database
        and emits a section whenever the task's 'area' changes.
        """
        # query_data stays None until get() has run; treat that as "nothing selected"
        # instead of failing on the membership test below.
        selected = self.query_data if self.query_data is not None else {}
        parts = ['\n', '']
        current_area = None
        for index, task_dict in enumerate(self.scheduler_db.get_tasks()):
            if task_dict['area'] != current_area:
                if index > 0:
                    # closes the previous area's section
                    parts.append('')
                current_area = task_dict['area']
                # NOTE(review): `checked` is not referenced by the strings appended
                # below -- confirm against the page template before removing it.
                checked = "checked" if current_area in selected else ""
                parts.extend(['\n', '\n', '', '', '\n', '\n'])
        # NOTE(review): placed after the loop on the assumption that it closes the
        # list -- confirm.
        parts.append('\n')
        return ''.join(parts)

    @property
    def task(self):
        """Task detail container markup."""
        return ''.join([
            '\n'
        ])

    @staticmethod
    def _describe_trigger(trigger_dict):
        """Return the human-readable description of one trigger record."""
        timetype = trigger_dict['timetype']
        if timetype == 'startup':
            return 'At startup'
        if timetype == 'daily':
            return 'Daily at ' + trigger_dict['timeofday']
        if timetype == 'weekly':
            return ''.join([
                'Every ', trigger_dict['dayofweek'],
                ' at ', trigger_dict['timeofday']
            ])
        if timetype == 'interval':
            interval_mins = trigger_dict['interval']
            # whole hours are shown as hours, everything else as minutes
            if interval_mins % 60 != 0:
                interval_str = str(interval_mins) + ' minutes'
            else:
                interval_str = str(interval_mins // 60) + ' hours'
            description = 'Every ' + interval_str
            # -1 is the value this code treats as "no random duration"
            if trigger_dict['randdur'] != -1:
                description += ' with random maximum added time of ' \
                    + str(trigger_dict['randdur']) + ' minutes'
            return description
        return 'UNKNOWN'

    def get_task(self, _id):
        """
        Return the detail view for task ``_id``.

        Logs a warning and returns an empty string when the scheduler
        database has no such task.
        """
        task_dict = self.scheduler_db.get_task(_id)
        if task_dict is None:
            self.logger.warning('get_task: Invalid task id: {}'.format(_id))
            return ''
        html = ''.join([
            '', '', '', '', '', '', '',
            '' '',
            '', '', '', '', '', '',
        ])
        # Bound up front so a task without triggers no longer raises
        # UnboundLocalError when the final join reads it.
        trigger_str = ''
        for trigger_dict in self.scheduler_db.get_triggers(_id):
            trigger_str = self._describe_trigger(trigger_dict)
            html = ''.join([
                html,
                '', '', '', '', ''
            ])
        # NOTE(review): as written only the last trigger's description reaches
        # the returned markup -- confirm against the page template.
        return ''.join([
            html,
            '\n',
            '',
            '\narrow_back\n',
            str(task_dict['title']),
            '\n',
            str(task_dict['description']),
            '\nNamespace: ', str(task_dict['namespace']),
            '   Instance: ', str(task_dict['instance']),
            '   Priority: ', str(task_dict['priority']),
            '   Thread Type: ', str(task_dict['threadtype']),
            '\n \nTask Triggers',
            '',
            '\n',
            'schedule', trigger_str, '', '',
            'delete_forever\n'
        ])

    def get_trigger(self, _id):
        """
        Return the add-trigger form for task ``_id``.

        Logs a warning and returns an empty string when the scheduler
        database has no such task.
        """
        task_dict = self.scheduler_db.get_task(_id)
        if task_dict is None:
            self.logger.warning('get_trigger: Invalid task id: {}'.format(_id))
            return ''
        # NOTE(review): namespace/instance are normalised here but not referenced
        # by the strings joined below -- confirm against the form template.
        namespace = "" if task_dict['namespace'] is None else task_dict['namespace']
        instance = "" if task_dict['instance'] is None else task_dict['instance']
        return "".join([
            '',
            '\n',
            '', '', '', '', '',
            '' '',
            '',
            '' '',
            '', '', '', '', '', '',
            '\n',
            '',
            '\narrow_back\n',
            '\n',
            'Add Trigger\nTask: ', task_dict['title'], '',
            '\n\n',
            '\n\n',
            '',
            '\nDay:   ', '',
            '\n\n',
            '\n', ' : ', '',
            '\n\n',
            '\nEvery:   ',
            '\n\n',
            '\nMax Random Added Time:   ',
            '\n\n',
            '\n', '   ',
            '\n \n',
            '\n'
        ])

    @staticmethod
    def _first_value(_query_data, _key):
        """
        Return the first value posted for ``_key``, or None when the field
        is absent or carries no values.
        """
        try:
            return _query_data[_key][0]
        except (KeyError, IndexError, TypeError):
            return None

    def post_add_trigger(self, query_data):
        """
        Validate the posted add-trigger form and queue an 'add' command.

        ``query_data`` maps field names to lists of values; the first value
        of each field is used.  Returns the status text for the response:
        a confirmation, a "... is not set and is required" message when a
        field needed by the selected time type is missing, or 'UNKNOWN' for
        an unrecognised time type.
        """
        timetype = self._first_value(query_data, 'timetype')
        extras = {}
        if timetype == 'startup':
            message = 'Startup Trigger added'
        elif timetype in ('daily', 'weekly'):
            dayofweek = None
            if timetype == 'weekly':
                dayofweek = self._first_value(query_data, 'dayofweek')
                if dayofweek is None:
                    return 'Day of Week is not set and is required'
            hour = self._first_value(query_data, 'timeofdayhr')
            minute = self._first_value(query_data, 'timeofdaymin')
            if hour is None or minute is None:
                return 'Time of Day is not set and is required'
            extras['timeofday'] = hour + ':' + minute
            if timetype == 'weekly':
                extras['dayofweek'] = dayofweek
                message = 'Weekly Trigger added'
            else:
                message = 'Daily Trigger added'
        elif timetype == 'interval':
            interval = self._first_value(query_data, 'interval')
            if interval is None:
                return 'Interval is not set and is required'
            extras['interval'] = interval
            extras['randdur'] = query_data['randdur'][0]
            message = 'Interval Trigger added'
        else:
            return 'UNKNOWN'

        trigger = {
            'area': query_data['area'][0],
            'title': query_data['title'][0],
            'timetype': timetype
        }
        trigger.update(extras)
        self.queue.put({'cmd': 'add', 'trigger': trigger})
        # presumably lets the queue consumer act before the response is sent
        time.sleep(0.05)
        return message

    def del_trigger(self, _uuid):
        """
        Queue deletion of trigger ``_uuid``.

        Returns None without queuing anything when the scheduler database
        does not know the trigger, otherwise a status string.
        """
        if self.scheduler_db.get_trigger(_uuid) is None:
            return None
        self.queue.put({'cmd': 'del', 'uuid': _uuid})
        time.sleep(0.05)
        return 'Interval Trigger deleted'

    def run_task(self, _taskid):
        """Queue a 'runtask' command for ``_taskid``."""
        self.queue.put({'cmd': 'runtask', 'taskid': _taskid})
        return None

    def del_task(self, _taskid):
        """Queue a 'deltask' command for ``_taskid``."""
        self.queue.put({'cmd': 'deltask', 'taskid': _taskid})
        return None