jisujang's picture
first
a005c19
import threading
import time
from typing import Optional, Callable
import logging
class SheetMonitor:
def __init__(self, sheet_manager, check_interval: float = 1.0):
"""
Initialize SheetMonitor with a sheet manager instance.
"""
self.sheet_manager = sheet_manager
self.check_interval = check_interval
# Threading control
self.monitor_thread = None
self.is_running = threading.Event()
self.pause_monitoring = threading.Event()
self.monitor_paused = threading.Event()
# Queue status
self.has_data = threading.Event()
# Logging setup
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def start_monitoring(self):
"""Start the monitoring thread."""
if self.monitor_thread is not None and self.monitor_thread.is_alive():
self.logger.warning("Monitoring thread is already running")
return
self.is_running.set()
self.pause_monitoring.clear()
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
self.logger.info("Started monitoring thread")
def stop_monitoring(self):
"""Stop the monitoring thread."""
self.is_running.clear()
if self.monitor_thread:
self.monitor_thread.join()
self.logger.info("Stopped monitoring thread")
def pause(self):
"""Pause the monitoring."""
self.pause_monitoring.set()
self.monitor_paused.wait()
self.logger.info("Monitoring paused")
def resume(self):
"""Resume the monitoring."""
self.pause_monitoring.clear()
self.monitor_paused.clear()
# ์ฆ‰์‹œ ์ฒดํฌ ์ˆ˜ํ–‰
self.logger.info("Monitoring resumed, checking for new data...")
values = self.sheet_manager.get_all_values()
if values:
self.has_data.set()
self.logger.info(f"Found data after resume: {values}")
def _monitor_loop(self):
"""Main monitoring loop that checks for data in sheet."""
while self.is_running.is_set():
if self.pause_monitoring.is_set():
self.monitor_paused.set()
self.pause_monitoring.wait()
self.monitor_paused.clear()
# continue
try:
# Check if there's any data in the sheet
values = self.sheet_manager.get_all_values()
self.logger.info(f"Monitoring: Current column={self.sheet_manager.column_name}, "
f"Values found={len(values)}, "
f"Has data={self.has_data.is_set()}")
if values: # If there's any non-empty value
self.has_data.set()
self.logger.info(f"Data detected: {values}")
else:
self.has_data.clear()
self.logger.info("No data in sheet, waiting...")
time.sleep(self.check_interval)
except Exception as e:
self.logger.error(f"Error in monitoring loop: {str(e)}")
time.sleep(self.check_interval)
class MainLoop:
def __init__(self, sheet_manager, sheet_monitor, callback_function: Callable = None):
"""
Initialize MainLoop with sheet manager and monitor instances.
"""
self.sheet_manager = sheet_manager
self.monitor = sheet_monitor
self.callback = callback_function
self.is_running = threading.Event()
self.logger = logging.getLogger(__name__)
def start(self):
"""Start the main processing loop."""
self.is_running.set()
self.monitor.start_monitoring()
self._main_loop()
def stop(self):
"""Stop the main processing loop."""
self.is_running.clear()
self.monitor.stop_monitoring()
def process_new_value(self):
"""Process values by calling pop function for multiple columns and custom callback."""
try:
# Store original column
original_column = self.sheet_manager.column_name
# Pop from huggingface_id column
model_id = self.sheet_manager.pop()
if model_id:
# Pop from benchmark_name column
self.sheet_manager.change_column("benchmark_name")
benchmark_name = self.sheet_manager.pop()
# Pop from prompt_cfg_name column
self.sheet_manager.change_column("prompt_cfg_name")
prompt_cfg_name = self.sheet_manager.pop()
# Return to original column
self.sheet_manager.change_column(original_column)
self.logger.info(f"Processed values - model_id: {model_id}, "
f"benchmark_name: {benchmark_name}, "
f"prompt_cfg_name: {prompt_cfg_name}")
if self.callback:
# Pass all three values to callback
self.callback(model_id, benchmark_name, prompt_cfg_name)
return model_id, benchmark_name, prompt_cfg_name
except Exception as e:
self.logger.error(f"Error processing values: {str(e)}")
# Return to original column in case of error
try:
self.sheet_manager.change_column(original_column)
except:
pass
return None
def _main_loop(self):
"""Main processing loop."""
while self.is_running.is_set():
# Wait for data to be available
if self.monitor.has_data.wait(timeout=1.0):
# Pause monitoring
self.monitor.pause()
# Process the value
self.process_new_value()
# Check if there's still data in the sheet
values = self.sheet_manager.get_all_values()
self.logger.info(f"After processing: Current column={self.sheet_manager.column_name}, "
f"Values remaining={len(values)}")
if not values:
self.monitor.has_data.clear()
self.logger.info("All data processed, clearing has_data flag")
else:
self.logger.info(f"Remaining data: {values}")
# Resume monitoring
self.monitor.resume()
## TODO
# API ๋ถ„๋‹น ํ˜ธ์ถœ ๋ฌธ์ œ๋กœ ๋งŒ์•ฝ์— ์ฐธ์กฐํ•˜๋‹ค๊ฐ€ ์‹คํŒจํ•  ๊ฒฝ์šฐ ๋Œ€๊ธฐํ–ˆ๋‹ค๊ฐ€ ๋‹ค์‹œ ์‹œ๋„ํ•˜๊ฒŒ๋” ์„ค๊ณ„
# Example usage
if __name__ == "__main__":
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent.parent))
from sheet_manager.sheet_crud.sheet_crud import SheetManager
from pia_bench.pipe_line.piepline import PiaBenchMark
def my_custom_function(huggingface_id, benchmark_name, prompt_cfg_name):
piabenchmark = PiaBenchMark(huggingface_id, benchmark_name, prompt_cfg_name)
piabenchmark.bench_start()
# Initialize components
sheet_manager = SheetManager()
monitor = SheetMonitor(sheet_manager, check_interval=10.0)
main_loop = MainLoop(sheet_manager, monitor, callback_function=my_custom_function)
try:
main_loop.start()
while True:
time.sleep(5)
except KeyboardInterrupt:
main_loop.stop()