cab / lib /schedule /scheduler.py
docs4you's picture
Upload 487 files
27867f1 verified
"""
MIT License
Copyright (C) 2023 ROCKY4546
https://github.com/rocky4546
This file is part of Cabernet
Permission is hereby granted, free of charge, to any person obtaining a copy of this software
and associated documentation files (the "Software"), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge, publish, distribute,
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.
"""
import importlib
import logging
import time
from multiprocessing import Process
from threading import Thread
import lib.schedule.schedule
import lib.common.exceptions as exceptions
from lib.common.decorators import getrequest
from lib.db.db_scheduler import DBScheduler
from lib.web.pages.templates import web_templates
@getrequest.route('/api/scheduler')
def get_scheduler(_webserver):
try:
if _webserver.query_data.get('action') == 'runtask':
_webserver.sched_queue.put({'cmd': 'runtask', 'taskid': _webserver.query_data.get('taskid')})
time.sleep(0.1)
_webserver.do_mime_response(200, 'text/html', 'action executed: ' + _webserver.query_data['action'])
return
else:
_webserver.do_mime_response(
501, 'text/html',
web_templates['htmlError'].format('501 - Unknown action'))
except KeyError:
_webserver.do_mime_response(
501, 'text/html',
web_templates['htmlError'].format('501 - Badly formed request'))
class Scheduler(Thread):
"""
Assumed to be a singleton
triggers are associated with a task in the database and define when a task runs
jobs are listed in the Schedule object and run as cron jobs. Triggers with
their associated tasks define jobs.
Calls are from the sched_queue to run, delete or add triggers/jobs.
Tasks are not managed by this class.
Only one trigger/job can run from within a task at any point in time.
"""
scheduler_obj = None
def __init__(self, _plugins, _queue):
Thread.__init__(self)
self.logger = logging.getLogger(__name__)
self.plugins = _plugins
self.queue = _queue
self.config_obj = _plugins.config_obj
self.scheduler_db = DBScheduler(self.config_obj.data)
self.scheduler_db.reset_activity()
self.schedule = lib.schedule.schedule
self.daemon = True
self.stop_thread = False
Scheduler.scheduler_obj = self
def _queue_thread():
while not self.stop_thread:
queue_item = self.queue.get(True)
self.process_queue(queue_item)
_q_thread = Thread(target=_queue_thread, args=())
_q_thread.start()
self.start()
def run(self):
"""
Thread run method for the class.
- Executes all startup tasks
- Sets up the Schedule/Job objects based on database
- Loops getting queue events and runs any pending triggers
"""
self.setup_triggers()
triggers = self.scheduler_db.get_triggers_by_type('startup')
for trigger in triggers:
self.exec_trigger(trigger)
while not self.stop_thread:
self.schedule.run_pending()
for i in range(30):
if self.stop_thread:
break
time.sleep(1)
def terminate(self):
self.stop_thread = True
self.queue.put({'cmd': 'noop'})
def exec_trigger(self, _trigger):
"""
Main entry for the Schedule Job to run a task/event
"""
if self.scheduler_db.get_active_status(_trigger['taskid']):
self.logger.debug('Task currently running, ignored request {}:{}'.format(
_trigger['area'], _trigger['title']))
return
self.scheduler_db.start_task(_trigger['area'], _trigger['title'])
if _trigger['threadtype'] == 'thread':
self.logger.notice('Running threaded task {}:{}'.format(
_trigger['area'], _trigger['title']))
t_event = Thread(target=self.call_trigger, args=(_trigger,))
t_event.start()
elif _trigger['threadtype'] == 'process':
self.logger.notice('Running process task {}:{}'.format(
_trigger['area'], _trigger['title']))
p_event = Process(target=self.call_trigger, args=(_trigger,))
p_event.start()
else:
self.logger.notice('Running inline task {}:{}'.format(
_trigger['area'], _trigger['title']))
self.call_trigger(_trigger)
def call_trigger(self, _trigger):
"""
Calls the trigger function and times the result
"""
start = time.time()
try:
if _trigger['namespace'] == 'internal':
mod_name, func_name = _trigger['funccall'].rsplit('.', 1)
mod = importlib.import_module(mod_name)
call_f = getattr(mod, func_name)
results = call_f(self.plugins)
else:
if _trigger['namespace'] not in self.plugins.plugins:
self.logger.debug(
'{} scheduled tasks ignored. plugin missing'
.format(_trigger['namespace']))
results = False
else:
plugin_obj = self.plugins.plugins[_trigger['namespace']].plugin_obj
if plugin_obj is None:
self.logger.debug(
'{} scheduled tasks ignored. plugin disabled'
.format(_trigger['namespace']))
results = False
elif _trigger['instance'] is None:
call_f = getattr(plugin_obj, _trigger['funccall'])
results = call_f()
elif plugin_obj.instances.get(_trigger['instance']):
call_f = getattr(plugin_obj.instances[_trigger['instance']],
_trigger['funccall'])
results = call_f()
else:
self.logger.debug(
'{}:{} scheduled tasks ignored. instance missing'
.format(_trigger['namespace'], _trigger['instance']))
results = False
except exceptions.CabernetException as ex:
self.logger.warning('{}'.format(str(ex)))
results = False
except Exception as ex:
self.logger.exception('{}{}'.format(
'UNEXPECTED EXCEPTION on GET=', ex))
results = False
if results is None:
results = True
end = time.time()
duration = int(end - start)
if results:
time.sleep(0.2)
self.scheduler_db.finish_task(_trigger['area'], _trigger['title'], duration)
else:
self.scheduler_db.reset_activity(False, _trigger['area'], _trigger['title'])
def setup_triggers(self):
"""
Assumes the trigger is already in the database and adds the job
to the Schedule object
"""
triggers = self.scheduler_db.get_triggers_by_type('daily')
for trigger_data in triggers:
self.add_job(trigger_data)
triggers = self.scheduler_db.get_triggers_by_type('weekly')
for trigger_data in triggers:
self.add_job(trigger_data)
triggers = self.scheduler_db.get_triggers_by_type('interval')
for trigger_data in triggers:
self.add_job(trigger_data)
def add_job(self, _trigger):
"""
Adds a job to the schedule object using the trigger dict from the database
"""
if _trigger['timetype'] == 'daily':
self.schedule.every().day.at(_trigger['timeofday']).do(
self.exec_trigger, _trigger) \
.tag(_trigger['uuid'])
elif _trigger['timetype'] == 'weekly':
getattr(self.schedule.every(), _trigger['dayofweek'].lower()) \
.at(_trigger['timeofday']).do(
self.exec_trigger, _trigger) \
.tag(_trigger['uuid'])
elif _trigger['timetype'] == 'interval':
if _trigger['randdur'] < 0:
self.schedule.every(_trigger['interval']).minutes.do(
self.exec_trigger, _trigger) \
.tag(_trigger['uuid'])
else:
self.schedule.every(_trigger['interval']) \
.to(_trigger['interval'] + _trigger['randdur']) \
.minutes.do(self.exec_trigger, _trigger) \
.tag(_trigger['uuid'])
elif _trigger['timetype'] == 'startup':
pass
else:
self.logger.warning('Bad trigger timetype called {}'.format(_trigger))
# Need to add UNTIL method to trigger when provided
# database has timelimit in minutes and by default is set to -1.
# until does not work that way. Use a second trigger to clear the first if it is still running.
# but only works when the randum generator is not used.
# also it won't work for inline triggers since a second trigger cannot run.
def process_queue(self, _queue_item):
"""
cmd: run_job, arg: uuid
cmd: del_job, arg: uuid
cmd: add_job, arg: trigger data without uuid
"""
try:
if _queue_item['cmd'] == 'run':
self.run_trigger(_queue_item['uuid'])
elif _queue_item['cmd'] == 'runtask':
self.run_task(_queue_item['taskid'])
elif _queue_item['cmd'] == 'deltask':
self.delete_task(_queue_item['taskid'])
elif _queue_item['cmd'] == 'delinstance':
self.delete_instance(_queue_item['name'], _queue_item['instance'])
elif _queue_item['cmd'] == 'del':
self.delete_trigger(_queue_item['uuid'])
elif _queue_item['cmd'] == 'add':
self.add_trigger(_queue_item['trigger'])
elif _queue_item['cmd'] == 'noop':
pass
else:
self.logger.warning('UNKNOWN Scheduler cmd from queue: {}'.format(_queue_item))
except KeyError as e:
self.logger.warning('Badly formed scheduled request {} {}'.format(_queue_item, repr(e)))
def delete_trigger(self, _uuid):
self.logger.debug('Deleting trigger {}'.format(_uuid))
jobs = self.schedule.get_jobs(_uuid)
for job in jobs:
self.schedule.cancel_job(job)
self.scheduler_db.del_trigger(_uuid)
def run_trigger(self, _uuid):
jobs = self.schedule.get_jobs(_uuid)
if len(jobs) == 0:
self.logger.info('Invalid trigger uuid job in schedule for run request {}'.format(_uuid))
else:
for job in jobs:
job.run()
def add_trigger(self, trigger):
if trigger['timetype'] == 'startup':
self.create_trigger(trigger['area'], trigger['title'],
trigger['timetype'])
elif trigger['timetype'] == 'daily':
self.create_trigger(trigger['area'], trigger['title'],
trigger['timetype'],
timeofday=trigger['timeofday']
)
elif trigger['timetype'] == 'daily':
self.create_trigger(trigger['area'], trigger['title'],
trigger['timetype'],
timeofday=trigger['timeofday']
)
elif trigger['timetype'] == 'weekly':
self.create_trigger(trigger['area'], trigger['title'],
trigger['timetype'],
timeofday=trigger['timeofday'],
dayofweek=trigger['dayofweek']
)
elif trigger['timetype'] == 'interval':
self.create_trigger(trigger['area'], trigger['title'],
trigger['timetype'],
interval=trigger['interval'],
randdur=trigger['randdur']
)
def create_trigger(self, _area, _title, _timetype, timeofday=None,
dayofweek=None, interval=-1, timelimit=-1, randdur=-1):
self.logger.notice('Creating trigger {}:{}:{}'.format(_area, _title, _timetype))
uuid = self.scheduler_db.save_trigger(_area, _title, _timetype, timeofday,
dayofweek, interval, timelimit, randdur)
trigger = self.scheduler_db.get_trigger(uuid)
self.add_job(trigger)
def delete_instance(self, _name, _instance):
tasks = self.scheduler_db.get_tasks_by_name(_name, _instance)
for task in tasks:
self.logger.warning('deleting task {}'.format(task['taskid']))
self.delete_task(task['taskid'])
def delete_task(self, _taskid):
task = self.scheduler_db.get_task(_taskid)
if task is None:
self.logger.notice('Task to delete missing: {}'.format(_taskid))
return
triggers = self.scheduler_db.get_triggers(_taskid)
for trigger in triggers:
self.delete_trigger(trigger['uuid'])
self.logger.debug('Deleting schedule task: {}'.format(_taskid))
self.scheduler_db.del_task(task['area'], task['title'])
def run_task(self, _taskid):
triggers = self.scheduler_db.get_triggers(_taskid)
if len(triggers) == 0:
# check if the task has no triggers
task = self.scheduler_db.get_task(_taskid)
if task is not None:
self.exec_trigger(task)
else:
self.logger.warning('Invalid taskid when requesting to run task')
return
is_run = False
default_trigger = None
trigger = None
for trigger in triggers:
if trigger['timetype'] == 'startup':
continue
elif trigger['timetype'] == 'interval':
self.queue.put({'cmd': 'run', 'uuid': trigger['uuid']})
is_run = True
break
else:
default_trigger = trigger
if not is_run:
if default_trigger is not None:
self.queue.put({'cmd': 'run', 'uuid': trigger['uuid']})
else:
task = self.scheduler_db.get_task(_taskid)
if task is not None:
self.exec_trigger(task)
else:
self.logger.warning('Invalid taskid when requesting to run task')