|
""" |
|
MIT License |
|
|
|
Copyright (C) 2023 ROCKY4546 |
|
https://github.com/rocky4546 |
|
|
|
This file is part of Cabernet |
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy of this software |
|
and associated documentation files (the "Software"), to deal in the Software without restriction, |
|
including without limitation the rights to use, copy, modify, merge, publish, distribute, |
|
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software |
|
is furnished to do so, subject to the following conditions: |
|
|
|
The above copyright notice and this permission notice shall be included in all copies or |
|
substantial portions of the Software. |
|
""" |
|
|
|
import importlib |
|
import logging |
|
import time |
|
from multiprocessing import Process |
|
from threading import Thread |
|
|
|
import lib.schedule.schedule |
|
import lib.common.exceptions as exceptions |
|
from lib.common.decorators import getrequest |
|
from lib.db.db_scheduler import DBScheduler |
|
from lib.web.pages.templates import web_templates |
|
|
|
|
|
@getrequest.route('/api/scheduler') |
|
def get_scheduler(_webserver): |
|
try: |
|
if _webserver.query_data.get('action') == 'runtask': |
|
_webserver.sched_queue.put({'cmd': 'runtask', 'taskid': _webserver.query_data.get('taskid')}) |
|
time.sleep(0.1) |
|
_webserver.do_mime_response(200, 'text/html', 'action executed: ' + _webserver.query_data['action']) |
|
return |
|
else: |
|
_webserver.do_mime_response( |
|
501, 'text/html', |
|
web_templates['htmlError'].format('501 - Unknown action')) |
|
except KeyError: |
|
_webserver.do_mime_response( |
|
501, 'text/html', |
|
web_templates['htmlError'].format('501 - Badly formed request')) |
|
|
|
|
|
class Scheduler(Thread): |
|
""" |
|
Assumed to be a singleton |
|
triggers are associated with a task in the database and define when a task runs |
|
jobs are listed in the Schedule object and run as cron jobs. Triggers with |
|
their associated tasks define jobs. |
|
Calls are from the sched_queue to run, delete or add triggers/jobs. |
|
Tasks are not managed by this class. |
|
Only one trigger/job can run from within a task at any point in time. |
|
""" |
|
scheduler_obj = None |
|
|
|
def __init__(self, _plugins, _queue): |
|
Thread.__init__(self) |
|
self.logger = logging.getLogger(__name__) |
|
self.plugins = _plugins |
|
self.queue = _queue |
|
self.config_obj = _plugins.config_obj |
|
self.scheduler_db = DBScheduler(self.config_obj.data) |
|
self.scheduler_db.reset_activity() |
|
self.schedule = lib.schedule.schedule |
|
self.daemon = True |
|
self.stop_thread = False |
|
Scheduler.scheduler_obj = self |
|
|
|
def _queue_thread(): |
|
while not self.stop_thread: |
|
queue_item = self.queue.get(True) |
|
self.process_queue(queue_item) |
|
|
|
_q_thread = Thread(target=_queue_thread, args=()) |
|
_q_thread.start() |
|
self.start() |
|
|
|
def run(self): |
|
""" |
|
Thread run method for the class. |
|
- Executes all startup tasks |
|
- Sets up the Schedule/Job objects based on database |
|
- Loops getting queue events and runs any pending triggers |
|
""" |
|
self.setup_triggers() |
|
triggers = self.scheduler_db.get_triggers_by_type('startup') |
|
for trigger in triggers: |
|
self.exec_trigger(trigger) |
|
while not self.stop_thread: |
|
self.schedule.run_pending() |
|
for i in range(30): |
|
if self.stop_thread: |
|
break |
|
time.sleep(1) |
|
|
|
def terminate(self): |
|
self.stop_thread = True |
|
self.queue.put({'cmd': 'noop'}) |
|
|
|
def exec_trigger(self, _trigger): |
|
""" |
|
Main entry for the Schedule Job to run a task/event |
|
""" |
|
if self.scheduler_db.get_active_status(_trigger['taskid']): |
|
self.logger.debug('Task currently running, ignored request {}:{}'.format( |
|
_trigger['area'], _trigger['title'])) |
|
return |
|
|
|
self.scheduler_db.start_task(_trigger['area'], _trigger['title']) |
|
if _trigger['threadtype'] == 'thread': |
|
self.logger.notice('Running threaded task {}:{}'.format( |
|
_trigger['area'], _trigger['title'])) |
|
t_event = Thread(target=self.call_trigger, args=(_trigger,)) |
|
t_event.start() |
|
elif _trigger['threadtype'] == 'process': |
|
self.logger.notice('Running process task {}:{}'.format( |
|
_trigger['area'], _trigger['title'])) |
|
p_event = Process(target=self.call_trigger, args=(_trigger,)) |
|
p_event.start() |
|
else: |
|
self.logger.notice('Running inline task {}:{}'.format( |
|
_trigger['area'], _trigger['title'])) |
|
self.call_trigger(_trigger) |
|
|
|
def call_trigger(self, _trigger): |
|
""" |
|
Calls the trigger function and times the result |
|
""" |
|
start = time.time() |
|
try: |
|
if _trigger['namespace'] == 'internal': |
|
mod_name, func_name = _trigger['funccall'].rsplit('.', 1) |
|
mod = importlib.import_module(mod_name) |
|
call_f = getattr(mod, func_name) |
|
results = call_f(self.plugins) |
|
else: |
|
if _trigger['namespace'] not in self.plugins.plugins: |
|
self.logger.debug( |
|
'{} scheduled tasks ignored. plugin missing' |
|
.format(_trigger['namespace'])) |
|
results = False |
|
else: |
|
plugin_obj = self.plugins.plugins[_trigger['namespace']].plugin_obj |
|
if plugin_obj is None: |
|
self.logger.debug( |
|
'{} scheduled tasks ignored. plugin disabled' |
|
.format(_trigger['namespace'])) |
|
results = False |
|
elif _trigger['instance'] is None: |
|
call_f = getattr(plugin_obj, _trigger['funccall']) |
|
results = call_f() |
|
elif plugin_obj.instances.get(_trigger['instance']): |
|
call_f = getattr(plugin_obj.instances[_trigger['instance']], |
|
_trigger['funccall']) |
|
results = call_f() |
|
else: |
|
self.logger.debug( |
|
'{}:{} scheduled tasks ignored. instance missing' |
|
.format(_trigger['namespace'], _trigger['instance'])) |
|
results = False |
|
|
|
except exceptions.CabernetException as ex: |
|
self.logger.warning('{}'.format(str(ex))) |
|
results = False |
|
except Exception as ex: |
|
self.logger.exception('{}{}'.format( |
|
'UNEXPECTED EXCEPTION on GET=', ex)) |
|
results = False |
|
if results is None: |
|
results = True |
|
end = time.time() |
|
duration = int(end - start) |
|
if results: |
|
time.sleep(0.2) |
|
self.scheduler_db.finish_task(_trigger['area'], _trigger['title'], duration) |
|
else: |
|
self.scheduler_db.reset_activity(False, _trigger['area'], _trigger['title']) |
|
|
|
def setup_triggers(self): |
|
""" |
|
Assumes the trigger is already in the database and adds the job |
|
to the Schedule object |
|
""" |
|
triggers = self.scheduler_db.get_triggers_by_type('daily') |
|
for trigger_data in triggers: |
|
self.add_job(trigger_data) |
|
|
|
triggers = self.scheduler_db.get_triggers_by_type('weekly') |
|
for trigger_data in triggers: |
|
self.add_job(trigger_data) |
|
|
|
triggers = self.scheduler_db.get_triggers_by_type('interval') |
|
for trigger_data in triggers: |
|
self.add_job(trigger_data) |
|
|
|
def add_job(self, _trigger): |
|
""" |
|
Adds a job to the schedule object using the trigger dict from the database |
|
""" |
|
if _trigger['timetype'] == 'daily': |
|
self.schedule.every().day.at(_trigger['timeofday']).do( |
|
self.exec_trigger, _trigger) \ |
|
.tag(_trigger['uuid']) |
|
elif _trigger['timetype'] == 'weekly': |
|
getattr(self.schedule.every(), _trigger['dayofweek'].lower()) \ |
|
.at(_trigger['timeofday']).do( |
|
self.exec_trigger, _trigger) \ |
|
.tag(_trigger['uuid']) |
|
elif _trigger['timetype'] == 'interval': |
|
if _trigger['randdur'] < 0: |
|
self.schedule.every(_trigger['interval']).minutes.do( |
|
self.exec_trigger, _trigger) \ |
|
.tag(_trigger['uuid']) |
|
else: |
|
self.schedule.every(_trigger['interval']) \ |
|
.to(_trigger['interval'] + _trigger['randdur']) \ |
|
.minutes.do(self.exec_trigger, _trigger) \ |
|
.tag(_trigger['uuid']) |
|
elif _trigger['timetype'] == 'startup': |
|
pass |
|
else: |
|
self.logger.warning('Bad trigger timetype called {}'.format(_trigger)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_queue(self, _queue_item): |
|
""" |
|
cmd: run_job, arg: uuid |
|
cmd: del_job, arg: uuid |
|
cmd: add_job, arg: trigger data without uuid |
|
""" |
|
try: |
|
if _queue_item['cmd'] == 'run': |
|
self.run_trigger(_queue_item['uuid']) |
|
elif _queue_item['cmd'] == 'runtask': |
|
self.run_task(_queue_item['taskid']) |
|
elif _queue_item['cmd'] == 'deltask': |
|
self.delete_task(_queue_item['taskid']) |
|
elif _queue_item['cmd'] == 'delinstance': |
|
self.delete_instance(_queue_item['name'], _queue_item['instance']) |
|
elif _queue_item['cmd'] == 'del': |
|
self.delete_trigger(_queue_item['uuid']) |
|
elif _queue_item['cmd'] == 'add': |
|
self.add_trigger(_queue_item['trigger']) |
|
elif _queue_item['cmd'] == 'noop': |
|
pass |
|
else: |
|
self.logger.warning('UNKNOWN Scheduler cmd from queue: {}'.format(_queue_item)) |
|
except KeyError as e: |
|
self.logger.warning('Badly formed scheduled request {} {}'.format(_queue_item, repr(e))) |
|
|
|
def delete_trigger(self, _uuid): |
|
self.logger.debug('Deleting trigger {}'.format(_uuid)) |
|
jobs = self.schedule.get_jobs(_uuid) |
|
for job in jobs: |
|
self.schedule.cancel_job(job) |
|
self.scheduler_db.del_trigger(_uuid) |
|
|
|
def run_trigger(self, _uuid): |
|
jobs = self.schedule.get_jobs(_uuid) |
|
if len(jobs) == 0: |
|
self.logger.info('Invalid trigger uuid job in schedule for run request {}'.format(_uuid)) |
|
else: |
|
for job in jobs: |
|
job.run() |
|
|
|
def add_trigger(self, trigger): |
|
if trigger['timetype'] == 'startup': |
|
self.create_trigger(trigger['area'], trigger['title'], |
|
trigger['timetype']) |
|
elif trigger['timetype'] == 'daily': |
|
self.create_trigger(trigger['area'], trigger['title'], |
|
trigger['timetype'], |
|
timeofday=trigger['timeofday'] |
|
) |
|
elif trigger['timetype'] == 'daily': |
|
self.create_trigger(trigger['area'], trigger['title'], |
|
trigger['timetype'], |
|
timeofday=trigger['timeofday'] |
|
) |
|
elif trigger['timetype'] == 'weekly': |
|
self.create_trigger(trigger['area'], trigger['title'], |
|
trigger['timetype'], |
|
timeofday=trigger['timeofday'], |
|
dayofweek=trigger['dayofweek'] |
|
) |
|
elif trigger['timetype'] == 'interval': |
|
self.create_trigger(trigger['area'], trigger['title'], |
|
trigger['timetype'], |
|
interval=trigger['interval'], |
|
randdur=trigger['randdur'] |
|
) |
|
|
|
def create_trigger(self, _area, _title, _timetype, timeofday=None, |
|
dayofweek=None, interval=-1, timelimit=-1, randdur=-1): |
|
self.logger.notice('Creating trigger {}:{}:{}'.format(_area, _title, _timetype)) |
|
uuid = self.scheduler_db.save_trigger(_area, _title, _timetype, timeofday, |
|
dayofweek, interval, timelimit, randdur) |
|
trigger = self.scheduler_db.get_trigger(uuid) |
|
self.add_job(trigger) |
|
|
|
def delete_instance(self, _name, _instance): |
|
tasks = self.scheduler_db.get_tasks_by_name(_name, _instance) |
|
for task in tasks: |
|
self.logger.warning('deleting task {}'.format(task['taskid'])) |
|
self.delete_task(task['taskid']) |
|
|
|
def delete_task(self, _taskid): |
|
task = self.scheduler_db.get_task(_taskid) |
|
if task is None: |
|
self.logger.notice('Task to delete missing: {}'.format(_taskid)) |
|
return |
|
|
|
triggers = self.scheduler_db.get_triggers(_taskid) |
|
for trigger in triggers: |
|
self.delete_trigger(trigger['uuid']) |
|
self.logger.debug('Deleting schedule task: {}'.format(_taskid)) |
|
self.scheduler_db.del_task(task['area'], task['title']) |
|
|
|
def run_task(self, _taskid): |
|
triggers = self.scheduler_db.get_triggers(_taskid) |
|
if len(triggers) == 0: |
|
|
|
task = self.scheduler_db.get_task(_taskid) |
|
if task is not None: |
|
self.exec_trigger(task) |
|
else: |
|
self.logger.warning('Invalid taskid when requesting to run task') |
|
return |
|
|
|
is_run = False |
|
default_trigger = None |
|
trigger = None |
|
for trigger in triggers: |
|
if trigger['timetype'] == 'startup': |
|
continue |
|
elif trigger['timetype'] == 'interval': |
|
self.queue.put({'cmd': 'run', 'uuid': trigger['uuid']}) |
|
is_run = True |
|
break |
|
else: |
|
default_trigger = trigger |
|
if not is_run: |
|
if default_trigger is not None: |
|
self.queue.put({'cmd': 'run', 'uuid': trigger['uuid']}) |
|
else: |
|
task = self.scheduler_db.get_task(_taskid) |
|
if task is not None: |
|
self.exec_trigger(task) |
|
else: |
|
self.logger.warning('Invalid taskid when requesting to run task') |
|
|