|
import threading |
|
import time |
|
from typing import Optional, Callable |
|
import logging |
|
|
|
class SheetMonitor: |
|
def __init__(self, sheet_manager, check_interval: float = 1.0): |
|
""" |
|
Initialize SheetMonitor with a sheet manager instance. |
|
""" |
|
self.sheet_manager = sheet_manager |
|
self.check_interval = check_interval |
|
|
|
|
|
self.monitor_thread = None |
|
self.is_running = threading.Event() |
|
self.pause_monitoring = threading.Event() |
|
self.monitor_paused = threading.Event() |
|
|
|
|
|
self.has_data = threading.Event() |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
self.logger = logging.getLogger(__name__) |
|
|
|
def start_monitoring(self): |
|
"""Start the monitoring thread.""" |
|
if self.monitor_thread is not None and self.monitor_thread.is_alive(): |
|
self.logger.warning("Monitoring thread is already running") |
|
return |
|
|
|
self.is_running.set() |
|
self.pause_monitoring.clear() |
|
self.monitor_thread = threading.Thread(target=self._monitor_loop) |
|
self.monitor_thread.daemon = True |
|
self.monitor_thread.start() |
|
self.logger.info("Started monitoring thread") |
|
|
|
def stop_monitoring(self): |
|
"""Stop the monitoring thread.""" |
|
self.is_running.clear() |
|
if self.monitor_thread: |
|
self.monitor_thread.join() |
|
self.logger.info("Stopped monitoring thread") |
|
|
|
def pause(self): |
|
"""Pause the monitoring.""" |
|
self.pause_monitoring.set() |
|
self.monitor_paused.wait() |
|
self.logger.info("Monitoring paused") |
|
|
|
def resume(self): |
|
"""Resume the monitoring.""" |
|
self.pause_monitoring.clear() |
|
self.monitor_paused.clear() |
|
|
|
self.logger.info("Monitoring resumed, checking for new data...") |
|
values = self.sheet_manager.get_all_values() |
|
if values: |
|
self.has_data.set() |
|
self.logger.info(f"Found data after resume: {values}") |
|
|
|
|
|
def _monitor_loop(self): |
|
"""Main monitoring loop that checks for data in sheet.""" |
|
while self.is_running.is_set(): |
|
if self.pause_monitoring.is_set(): |
|
self.monitor_paused.set() |
|
self.pause_monitoring.wait() |
|
self.monitor_paused.clear() |
|
|
|
|
|
try: |
|
|
|
values = self.sheet_manager.get_all_values() |
|
self.logger.info(f"Monitoring: Current column={self.sheet_manager.column_name}, " |
|
f"Values found={len(values)}, " |
|
f"Has data={self.has_data.is_set()}") |
|
|
|
if values: |
|
self.has_data.set() |
|
self.logger.info(f"Data detected: {values}") |
|
else: |
|
self.has_data.clear() |
|
self.logger.info("No data in sheet, waiting...") |
|
|
|
time.sleep(self.check_interval) |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error in monitoring loop: {str(e)}") |
|
time.sleep(self.check_interval) |
|
|
|
class MainLoop: |
|
def __init__(self, sheet_manager, sheet_monitor, callback_function: Callable = None): |
|
""" |
|
Initialize MainLoop with sheet manager and monitor instances. |
|
""" |
|
self.sheet_manager = sheet_manager |
|
self.monitor = sheet_monitor |
|
self.callback = callback_function |
|
self.is_running = threading.Event() |
|
self.logger = logging.getLogger(__name__) |
|
|
|
def start(self): |
|
"""Start the main processing loop.""" |
|
self.is_running.set() |
|
self.monitor.start_monitoring() |
|
self._main_loop() |
|
|
|
def stop(self): |
|
"""Stop the main processing loop.""" |
|
self.is_running.clear() |
|
self.monitor.stop_monitoring() |
|
|
|
def process_new_value(self): |
|
"""Process values by calling pop function for multiple columns and custom callback.""" |
|
try: |
|
|
|
original_column = self.sheet_manager.column_name |
|
|
|
|
|
model_id = self.sheet_manager.pop() |
|
|
|
if model_id: |
|
|
|
self.sheet_manager.change_column("benchmark_name") |
|
benchmark_name = self.sheet_manager.pop() |
|
|
|
|
|
self.sheet_manager.change_column("prompt_cfg_name") |
|
prompt_cfg_name = self.sheet_manager.pop() |
|
|
|
|
|
self.sheet_manager.change_column(original_column) |
|
|
|
self.logger.info(f"Processed values - model_id: {model_id}, " |
|
f"benchmark_name: {benchmark_name}, " |
|
f"prompt_cfg_name: {prompt_cfg_name}") |
|
|
|
if self.callback: |
|
|
|
self.callback(model_id, benchmark_name, prompt_cfg_name) |
|
|
|
return model_id, benchmark_name, prompt_cfg_name |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error processing values: {str(e)}") |
|
|
|
try: |
|
self.sheet_manager.change_column(original_column) |
|
except: |
|
pass |
|
return None |
|
|
|
def _main_loop(self): |
|
"""Main processing loop.""" |
|
while self.is_running.is_set(): |
|
|
|
if self.monitor.has_data.wait(timeout=1.0): |
|
|
|
self.monitor.pause() |
|
|
|
|
|
self.process_new_value() |
|
|
|
|
|
values = self.sheet_manager.get_all_values() |
|
self.logger.info(f"After processing: Current column={self.sheet_manager.column_name}, " |
|
f"Values remaining={len(values)}") |
|
|
|
if not values: |
|
self.monitor.has_data.clear() |
|
self.logger.info("All data processed, clearing has_data flag") |
|
else: |
|
self.logger.info(f"Remaining data: {values}") |
|
|
|
|
|
self.monitor.resume() |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
import sys |
|
from pathlib import Path |
|
sys.path.append(str(Path(__file__).parent.parent.parent)) |
|
from sheet_manager.sheet_crud.sheet_crud import SheetManager |
|
from pia_bench.pipe_line.piepline import PiaBenchMark |
|
def my_custom_function(huggingface_id, benchmark_name, prompt_cfg_name): |
|
piabenchmark = PiaBenchMark(huggingface_id, benchmark_name, prompt_cfg_name) |
|
piabenchmark.bench_start() |
|
|
|
|
|
sheet_manager = SheetManager() |
|
monitor = SheetMonitor(sheet_manager, check_interval=10.0) |
|
main_loop = MainLoop(sheet_manager, monitor, callback_function=my_custom_function) |
|
|
|
try: |
|
main_loop.start() |
|
while True: |
|
time.sleep(5) |
|
except KeyboardInterrupt: |
|
main_loop.stop() |