File size: 7,753 Bytes
a005c19 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
import threading
import time
from typing import Optional, Callable
import logging
class SheetMonitor:
    """Poll a sheet manager on a background thread and flag when it has data.

    A daemon thread repeatedly calls ``sheet_manager.get_all_values()`` and
    mirrors the truthiness of the result into the ``has_data`` event.  Other
    threads can suspend the polling with :meth:`pause` and restart it with
    :meth:`resume`.

    Attributes (all ``threading.Event`` unless noted):
        is_running: set while the polling thread is supposed to run.
        pause_monitoring: set while a pause has been requested.
        monitor_paused: set by the polling thread while it is parked.
        has_data: set when the last sheet read returned a truthy value.
        monitor_thread: the polling ``threading.Thread`` or ``None``.
    """

    # Granularity (seconds) of the short waits used while parked or while
    # waiting for the polling thread to acknowledge a pause request.
    _PAUSE_POLL_INTERVAL = 0.05

    def __init__(self, sheet_manager, check_interval: float = 1.0):
        """
        Initialize SheetMonitor with a sheet manager instance.

        Args:
            sheet_manager: object providing ``get_all_values()`` and a
                ``column_name`` attribute (both are used by the polling loop).
            check_interval: seconds to wait between two sheet reads.
        """
        self.sheet_manager = sheet_manager
        self.check_interval = check_interval
        # Threading control
        self.monitor_thread: Optional[threading.Thread] = None
        self.is_running = threading.Event()
        self.pause_monitoring = threading.Event()
        self.monitor_paused = threading.Event()
        # Queue status
        self.has_data = threading.Event()
        # Logging setup
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def start_monitoring(self):
        """Start the monitoring thread (no-op if it is already running)."""
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            self.logger.warning("Monitoring thread is already running")
            return
        self.is_running.set()
        self.pause_monitoring.clear()
        # A previous thread may have exited while parked; start from a clean state.
        self.monitor_paused.clear()
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        self.logger.info("Started monitoring thread")

    def stop_monitoring(self):
        """Stop the monitoring thread and wait for it to finish."""
        self.is_running.clear()
        if self.monitor_thread:
            self.monitor_thread.join()
        self.logger.info("Stopped monitoring thread")

    def pause(self):
        """Pause the monitoring.

        Blocks until the polling thread has acknowledged the request, i.e.
        until it is parked and no longer reads the sheet.  Returns right away
        when no polling thread is alive, instead of waiting forever for an
        acknowledgement that can never arrive.
        """
        self.pause_monitoring.set()
        worker = self.monitor_thread
        while worker is not None and worker.is_alive():
            if self.monitor_paused.wait(timeout=self._PAUSE_POLL_INTERVAL):
                break
        self.logger.info("Monitoring paused")

    def resume(self):
        """Resume the monitoring and immediately re-check the sheet once."""
        self.pause_monitoring.clear()
        self.monitor_paused.clear()
        # Perform a check right away instead of waiting for the next poll.
        self.logger.info("Monitoring resumed, checking for new data...")
        try:
            values = self.sheet_manager.get_all_values()
        except Exception as e:
            # Best effort only: the polling thread re-reads the sheet anyway.
            self.logger.error(f"Error checking sheet after resume: {str(e)}")
            return
        if values:
            self.has_data.set()
            self.logger.info(f"Found data after resume: {values}")

    def _hold_while_paused(self):
        """Park the polling thread until the pause is lifted or we are stopped."""
        while self.pause_monitoring.is_set() and self.is_running.is_set():
            # Re-assert on every pass so that a quick resume()/pause() pair
            # (resume() clears the flag) still gets acknowledged.
            self.monitor_paused.set()
            time.sleep(self._PAUSE_POLL_INTERVAL)
        self.monitor_paused.clear()

    def _monitor_loop(self):
        """Main monitoring loop that checks for data in sheet."""
        while self.is_running.is_set():
            if self.pause_monitoring.is_set():
                self._hold_while_paused()
                # Re-evaluate both flags before touching the sheet again.
                continue
            try:
                # Check if there's any data in the sheet
                values = self.sheet_manager.get_all_values()
                self.logger.info(f"Monitoring: Current column={self.sheet_manager.column_name}, "
                                 f"Values found={len(values)}, "
                                 f"Has data={self.has_data.is_set()}")
                if values:  # If there's any non-empty value
                    self.has_data.set()
                    self.logger.info(f"Data detected: {values}")
                else:
                    self.has_data.clear()
                    self.logger.info("No data in sheet, waiting...")
            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {str(e)}")
            # Sleep for check_interval, but wake up early when a pause is
            # requested so that pause() is not delayed by a full interval.
            self.pause_monitoring.wait(timeout=self.check_interval)
class MainLoop:
    """Consume rows from the sheet whenever the monitor reports data.

    The loop waits on ``sheet_monitor.has_data``, pauses the monitor, pops one
    value from each of three columns (the current column, ``benchmark_name``
    and ``prompt_cfg_name``), hands them to the optional callback and resumes
    the monitor again.
    """

    def __init__(self, sheet_manager, sheet_monitor,
                 callback_function: Optional[Callable] = None,
                 retry_delay: float = 1.0):
        """
        Initialize MainLoop with sheet manager and monitor instances.

        Args:
            sheet_manager: object providing ``column_name``, ``pop()``,
                ``change_column(name)`` and ``get_all_values()``.
            sheet_monitor: object providing a ``has_data`` event plus
                ``start_monitoring()``, ``stop_monitoring()``, ``pause()``
                and ``resume()``.
            callback_function: optional callable invoked as
                ``callback(model_id, benchmark_name, prompt_cfg_name)``.
            retry_delay: seconds to wait after a cycle that failed or popped
                nothing, so a failing sheet is not hit in a tight loop.
        """
        self.sheet_manager = sheet_manager
        self.monitor = sheet_monitor
        self.callback = callback_function
        self.retry_delay = retry_delay
        self.is_running = threading.Event()
        self.logger = logging.getLogger(__name__)

    def start(self):
        """Start the main processing loop (blocks until the loop ends)."""
        self.is_running.set()
        self.monitor.start_monitoring()
        self._main_loop()

    def stop(self):
        """Stop the main processing loop."""
        self.is_running.clear()
        self.monitor.stop_monitoring()

    def _restore_column(self, column):
        """Best-effort switch back to ``column``; failures are logged, not raised."""
        if column is None:
            # The original column was never read, so there is nothing to restore.
            return
        try:
            self.sheet_manager.change_column(column)
        except Exception as e:
            self.logger.error(f"Error restoring column {column}: {str(e)}")

    def process_new_value(self):
        """Process values by calling pop function for multiple columns and custom callback.

        Returns:
            ``(model_id, benchmark_name, prompt_cfg_name)`` on success, or
            ``None`` when the first pop yields nothing or any step raises.
        """
        original_column = None
        try:
            # Store original column
            original_column = self.sheet_manager.column_name
            # Pop from the current column (huggingface_id in the example usage)
            model_id = self.sheet_manager.pop()
            if not model_id:
                return None
            # Pop from benchmark_name column
            self.sheet_manager.change_column("benchmark_name")
            benchmark_name = self.sheet_manager.pop()
            # Pop from prompt_cfg_name column
            self.sheet_manager.change_column("prompt_cfg_name")
            prompt_cfg_name = self.sheet_manager.pop()
            # Return to original column before the callback runs
            self.sheet_manager.change_column(original_column)
            self.logger.info(f"Processed values - model_id: {model_id}, "
                             f"benchmark_name: {benchmark_name}, "
                             f"prompt_cfg_name: {prompt_cfg_name}")
            if self.callback:
                # Pass all three values to callback
                self.callback(model_id, benchmark_name, prompt_cfg_name)
            return model_id, benchmark_name, prompt_cfg_name
        except Exception as e:
            self.logger.error(f"Error processing values: {str(e)}")
            # Return to original column in case of error
            self._restore_column(original_column)
            return None

    def _process_cycle(self) -> bool:
        """Run one pause/process/resume cycle; return True if a row was processed."""
        succeeded = False
        # Pause monitoring
        self.monitor.pause()
        try:
            # Process the value
            succeeded = self.process_new_value() is not None
            # Check if there's still data in the sheet
            values = self.sheet_manager.get_all_values()
            self.logger.info(f"After processing: Current column={self.sheet_manager.column_name}, "
                             f"Values remaining={len(values)}")
            if not values:
                self.monitor.has_data.clear()
                self.logger.info("All data processed, clearing has_data flag")
            else:
                self.logger.info(f"Remaining data: {values}")
        except Exception as e:
            succeeded = False
            self.logger.error(f"Error in main loop: {str(e)}")
        finally:
            # Resume monitoring even when the cycle failed, otherwise the
            # monitor would stay paused forever.
            try:
                self.monitor.resume()
            except Exception as e:
                succeeded = False
                self.logger.error(f"Error resuming monitor: {str(e)}")
        return succeeded

    def _main_loop(self):
        """Main processing loop."""
        while self.is_running.is_set():
            # Wait for data to be available
            if not self.monitor.has_data.wait(timeout=1.0):
                continue
            if not self._process_cycle():
                # Back off before retrying so a failing sheet is not hammered.
                time.sleep(self.retry_delay)
## TODO
# Because of the API's per-minute call limit, design this so that a failed sheet lookup waits and then retries.
# Example usage
if __name__ == "__main__":
    import sys
    from pathlib import Path

    # Put the directory three levels above this file on the import path
    # before importing the project packages below.
    sys.path.append(str(Path(__file__).parent.parent.parent))

    from sheet_manager.sheet_crud.sheet_crud import SheetManager
    from pia_bench.pipe_line.piepline import PiaBenchMark

    def my_custom_function(huggingface_id, benchmark_name, prompt_cfg_name):
        """Build a PiaBenchMark from the three popped values and start it."""
        PiaBenchMark(huggingface_id, benchmark_name, prompt_cfg_name).bench_start()

    # Wire the components together: one sheet manager shared by the monitor
    # (polling every 10 seconds) and the processing loop.
    sheet_manager = SheetManager()
    monitor = SheetMonitor(sheet_manager, check_interval=10.0)
    main_loop = MainLoop(
        sheet_manager,
        monitor,
        callback_function=my_custom_function,
    )

    try:
        main_loop.start()
        # Keep the process alive once start() has returned.
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl+C: shut the loop and the monitor down.
        main_loop.stop()