|
""" |
|
MIT License |
|
|
|
Copyright (C) 2023 ROCKY4546 |
|
https://github.com/rocky4546 |
|
|
|
This file is part of Cabernet |
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy of this software |
|
and associated documentation files (the "Software"), to deal in the Software without restriction, |
|
including without limitation the rights to use, copy, modify, merge, publish, distribute, |
|
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software |
|
is furnished to do so, subject to the following conditions: |
|
|
|
The above copyright notice and this permission notice shall be included in all copies or |
|
substantial portions of the Software. |
|
""" |
|
|
|
import logging |
|
import threading |
|
import time |
|
from queue import Empty |
|
|
|
from multiprocessing import Queue, Process |
|
from threading import Thread |
|
|
|
|
|
class ThreadQueue(Thread): |
|
""" |
|
Takes a queue containing thread ids and pushes them |
|
into other queues associated with those threads |
|
Assumes queue item is a dict containing a name/value of "thread_id" |
|
'terminate' can be sent via name 'uri' to terminate a specific thread id |
|
""" |
|
|
|
|
|
def __init__(self, _queue, _config): |
|
Thread.__init__(self) |
|
self.logger = logging.getLogger(__name__ + str(threading.get_ident())) |
|
|
|
self.queue = _queue |
|
|
|
self.queue_list = {} |
|
self.config = _config |
|
self.terminate_requested = False |
|
|
|
self._remote_proc = None |
|
|
|
self._status_queue = None |
|
self.start() |
|
|
|
def __str__(self): |
|
""" |
|
Used to display the number of queues in the outgoing queue list |
|
""" |
|
return str(len(self.queue_list)) |
|
|
|
def run(self): |
|
thread_id = None |
|
try: |
|
while not self.terminate_requested: |
|
queue_item = self.queue.get() |
|
thread_id = queue_item.get('thread_id') |
|
if not thread_id: |
|
self.logger.warning('Badly formatted queue. thread_id required and missing thread_id:{} uri:{}' |
|
.format(queue_item.get('thread_id'), queue_item.get('uri'))) |
|
continue |
|
if not queue_item.get('uri'): |
|
self.logger.warning('Badly formatted queue. uri required and missing thread_id:{} uri:{}' |
|
.format(queue_item.get('thread_id'), queue_item.get('uri'))) |
|
continue |
|
if queue_item.get('uri') == 'terminate': |
|
time.sleep(self.config['stream']['switch_channel_timeout']) |
|
self.del_thread(thread_id, True) |
|
out_queue = self.queue_list.get(thread_id) |
|
if out_queue: |
|
|
|
|
|
|
|
|
|
if out_queue.qsize() > 10: |
|
s = out_queue.qsize()/2 |
|
else: |
|
s = 0.0 |
|
out_queue.put(queue_item) |
|
self.sleep(s) |
|
|
|
except (KeyboardInterrupt, EOFError) as ex: |
|
self.terminate_requested = True |
|
self.clear_queues() |
|
self.logger.exception('{}{}'.format( |
|
'UNEXPECTED EXCEPTION ThreadQueue=', ex)) |
|
except Exception as ex: |
|
|
|
self.logger.exception('{}'.format( |
|
'UNEXPECTED EXCEPTION ThreadQueue')) |
|
for qdict in self.queue_list.items(): |
|
qdict[1].put({'thread_id': qdict[0], 'uri': 'terminate'}) |
|
self.terminate_requested = True |
|
self.clear_queues() |
|
time.sleep(0.01) |
|
|
|
self.clear_queues() |
|
self.terminate_requested = True |
|
self.logger.debug('ThreadQueue terminated') |
|
|
|
def clear_queues(self): |
|
self.clear_q(self.queue) |
|
|
|
def clear_q(self, _q): |
|
try: |
|
while True: |
|
item = _q.get_nowait() |
|
except (Empty, ValueError, EOFError, OSError) as ex: |
|
pass |
|
|
|
def add_thread(self, _thread_id, _queue): |
|
""" |
|
Adds the thread id to the list of queues this class is sending data |
|
""" |
|
out_queue = self.queue_list.get(_thread_id) |
|
self.queue_list[_thread_id] = _queue |
|
if not out_queue: |
|
self.logger.debug('Adding thread id queue to thread queue: {}'.format(_thread_id)) |
|
|
|
def del_thread(self, _thread_id, _is_inrun=False): |
|
""" |
|
Removes the thread id from the list of queues this class is sending data to |
|
if queue list is empty, then will also set the terminate to True |
|
and return True |
|
_is_inrun is set to true when the call comes from the thread run method, |
|
so wait for terminate is not required since it already is not waiting for get queue processing |
|
""" |
|
out_queue = self.queue_list.get(_thread_id) |
|
if out_queue: |
|
del self.queue_list[_thread_id] |
|
self.logger.debug('Removing thread id queue from thread queue: {}'.format(_thread_id)) |
|
if not len(self.queue_list): |
|
|
|
|
|
time.sleep(1.0) |
|
if not len(self.queue_list): |
|
self.logger.debug('Terminating thread queue') |
|
self.terminate_requested = True |
|
time.sleep(0.01) |
|
self.clear_queues() |
|
if _is_inrun: |
|
return True |
|
else: |
|
self.queue.put({'thread_id': _thread_id, 'uri': 'terminate'}) |
|
time.sleep(0.01) |
|
self.wait_for_termination() |
|
return True |
|
else: |
|
return False |
|
else: |
|
return True |
|
|
|
def wait_for_termination(self): |
|
count = 50 |
|
while self.is_alive() and count > 0: |
|
time.sleep(0.1) |
|
count -= 1 |
|
self.clear_queues() |
|
|
|
def sleep(self, _time): |
|
""" |
|
Creates a sleep function that will exit quickly if the termination flag is set |
|
""" |
|
start_ttw = time.time() |
|
for i in range(round(_time * 5)): |
|
if not self.terminate_requested: |
|
time.sleep(_time * 0.2) |
|
else: |
|
break |
|
delta_ttw = time.time() - start_ttw |
|
if delta_ttw > _time: |
|
break |
|
|
|
@property |
|
def remote_proc(self): |
|
""" |
|
process using the status_queue and sending to the incoming queue |
|
""" |
|
return self._remote_proc |
|
|
|
@remote_proc.setter |
|
def remote_proc(self, _proc): |
|
self._remote_proc = _proc |
|
|
|
@property |
|
def status_queue(self): |
|
""" |
|
queue used by the remote process as its incoming queue |
|
""" |
|
return self._status_queue |
|
|
|
@status_queue.setter |
|
def status_queue(self, _queue): |
|
self._status_queue = _queue |
|
|