File size: 7,753 Bytes
a005c19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import threading
import time
from typing import Optional, Callable
import logging

class SheetMonitor:
    """Background poller that flags when the watched sheet column has values.

    A daemon thread calls ``sheet_manager.get_all_values()`` every
    ``check_interval`` seconds and sets/clears :attr:`has_data` accordingly.
    :meth:`pause` parks the thread (and only returns once it is parked) so that
    another thread can use the shared ``sheet_manager`` without interference;
    :meth:`resume` releases it.
    """

    # Granularity (seconds) at which sleeping/parked waits re-check stop and
    # pause requests, so pause()/stop_monitoring() never wait a full interval.
    _POLL_INTERVAL = 0.1

    def __init__(self, sheet_manager, check_interval: float = 1.0):
        """
        Initialize SheetMonitor with a sheet manager instance.

        Args:
            sheet_manager: Object exposing ``get_all_values()`` and a
                ``column_name`` attribute (the only members used here).
            check_interval: Seconds between polls of the sheet.
        """
        self.sheet_manager = sheet_manager
        self.check_interval = check_interval

        # Threading control
        self.monitor_thread = None
        self.is_running = threading.Event()        # set while the thread should keep running
        self.pause_monitoring = threading.Event()  # set while a pause is requested
        self.monitor_paused = threading.Event()    # set by the thread once it is parked
        # Set when a parked thread may continue. threading.Event cannot be waited
        # on "until cleared", so un-pausing needs its own event.
        self._resume_allowed = threading.Event()
        self._resume_allowed.set()

        # Queue status: set while the most recent check found values
        self.has_data = threading.Event()

        # Logging setup
        # NOTE(review): basicConfig configures the root logger as a side effect of
        # constructing this object; consider moving it to the program entry point.
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def start_monitoring(self):
        """Start the monitoring thread (warns and does nothing if already running)."""
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            self.logger.warning("Monitoring thread is already running")
            return

        self.is_running.set()
        self.pause_monitoring.clear()
        self.monitor_paused.clear()
        self._resume_allowed.set()
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        self.logger.info("Started monitoring thread")

    def stop_monitoring(self):
        """Stop the monitoring thread and wait for it to exit."""
        self.is_running.clear()
        # Release a parked thread right away so join() does not wait on it.
        self._resume_allowed.set()
        if self.monitor_thread:
            self.monitor_thread.join()
        self.logger.info("Stopped monitoring thread")

    def pause(self):
        """Pause the monitoring and block until the thread is actually parked.

        Returns without waiting if the monitoring thread is not alive, instead of
        blocking forever on an acknowledgement that can never come.
        """
        self._resume_allowed.clear()
        self.monitor_paused.clear()
        self.pause_monitoring.set()
        while not self.monitor_paused.wait(timeout=self._POLL_INTERVAL):
            if self.monitor_thread is None or not self.monitor_thread.is_alive():
                break
        self.logger.info("Monitoring paused")

    def resume(self):
        """Run one immediate check for data, then let the monitoring thread continue.

        The check runs while the thread is still parked, so the two never query
        ``sheet_manager`` at the same time. A failing check is logged rather than
        raised; the background thread will try again on its next poll.
        """
        self.logger.info("Monitoring resumed, checking for new data...")
        try:
            # Perform an immediate check instead of waiting for the next poll.
            values = self.sheet_manager.get_all_values()
            if values:
                self.has_data.set()
                self.logger.info(f"Found data after resume: {values}")
        except Exception as e:
            self.logger.error(f"Error checking for data after resume: {e}")
        finally:
            self.pause_monitoring.clear()
            self.monitor_paused.clear()
            self._resume_allowed.set()

    def _monitor_loop(self):
        """Main monitoring loop that checks for data in sheet."""
        while self.is_running.is_set():
            if self.pause_monitoring.is_set():
                self._park()
                continue

            try:
                # Check if there's any data in the sheet
                values = self.sheet_manager.get_all_values()
                self.logger.info(f"Monitoring: Current column={self.sheet_manager.column_name}, "
                                 f"Values found={len(values)}, "
                                 f"Has data={self.has_data.is_set()}")

                if values:  # If there's any non-empty value
                    self.has_data.set()
                    self.logger.info(f"Data detected: {values}")
                else:
                    self.has_data.clear()
                    self.logger.info("No data in sheet, waiting...")
            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {str(e)}")

            self._sleep(self.check_interval)

    def _park(self):
        """Block the monitor thread until resume() or stop_monitoring() releases it."""
        self.monitor_paused.set()
        while not self._resume_allowed.wait(timeout=self._POLL_INTERVAL):
            if not self.is_running.is_set():
                break
            # Re-assert the acknowledgement: a resume() immediately followed by
            # pause() clears it while this thread never left the parked state.
            self.monitor_paused.set()
        self.monitor_paused.clear()

    def _sleep(self, seconds: float):
        """Sleep for up to ``seconds``, returning early on a stop or pause request."""
        deadline = time.monotonic() + seconds
        while self.is_running.is_set() and not self.pause_monitoring.is_set():
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, self._POLL_INTERVAL))

class MainLoop:
    """Consume rows from the sheet whenever the monitor reports data.

    Each iteration pauses the monitor (so it does not query the shared
    ``sheet_manager`` while columns are being switched), pops one row via
    :meth:`process_new_value`, re-checks whether data remains, and resumes the
    monitor, even if something in the iteration failed.
    """

    # Seconds to wait before the next iteration after an unexpected error, so a
    # persistently failing sheet API (e.g. a per-minute call quota) is not
    # hammered in a tight loop.
    ERROR_BACKOFF_SECONDS = 5.0

    def __init__(self, sheet_manager, sheet_monitor, callback_function: Optional[Callable] = None):
        """
        Initialize MainLoop with sheet manager and monitor instances.

        Args:
            sheet_manager: Object exposing ``column_name``, ``pop()``,
                ``change_column(name)`` and ``get_all_values()``.
            sheet_monitor: Monitor providing a ``has_data`` Event plus
                ``start_monitoring``/``stop_monitoring`` and ``pause``/``resume``.
            callback_function: Optional callable invoked as
                ``callback(model_id, benchmark_name, prompt_cfg_name)`` per row.
        """
        self.sheet_manager = sheet_manager
        self.monitor = sheet_monitor
        self.callback = callback_function
        self.is_running = threading.Event()
        self.logger = logging.getLogger(__name__)

    def start(self):
        """Start monitoring and run the processing loop on the calling thread.

        Blocks until :meth:`stop` clears ``is_running`` (from another thread).
        """
        self.is_running.set()
        self.monitor.start_monitoring()
        self._main_loop()

    def stop(self):
        """Stop the main processing loop and the monitor thread."""
        self.is_running.clear()
        self.monitor.stop_monitoring()

    def process_new_value(self):
        """Pop one (model_id, benchmark_name, prompt_cfg_name) triple and pass it to the callback.

        ``model_id`` is popped from whichever column the sheet manager is currently
        on (presumably ``huggingface_id`` — TODO confirm); the other two values come
        from the ``benchmark_name`` and ``prompt_cfg_name`` columns. The manager is
        switched back to its original column before the callback runs, and again
        (best effort) if anything raises.

        Returns:
            The ``(model_id, benchmark_name, prompt_cfg_name)`` tuple, or ``None``
            when the first pop yields nothing or any step (including the callback)
            raises.
        """
        original_column = None
        try:
            original_column = self.sheet_manager.column_name

            model_id = self.sheet_manager.pop()
            if not model_id:
                return None

            self.sheet_manager.change_column("benchmark_name")
            benchmark_name = self.sheet_manager.pop()

            self.sheet_manager.change_column("prompt_cfg_name")
            prompt_cfg_name = self.sheet_manager.pop()

            # Return to the original column before handing off to the callback
            self.sheet_manager.change_column(original_column)

            self.logger.info(f"Processed values - model_id: {model_id}, "
                             f"benchmark_name: {benchmark_name}, "
                             f"prompt_cfg_name: {prompt_cfg_name}")

            if self.callback:
                self.callback(model_id, benchmark_name, prompt_cfg_name)

            return model_id, benchmark_name, prompt_cfg_name

        except Exception as e:
            self.logger.exception(f"Error processing values: {str(e)}")
            self._restore_column(original_column)
            return None

    def _restore_column(self, column):
        """Best-effort switch back to ``column``; failures are logged, not raised."""
        if column is None:
            # column_name could not even be read, so there is nothing to restore
            return
        try:
            self.sheet_manager.change_column(column)
        except Exception:
            self.logger.warning("Could not restore sheet column %r", column, exc_info=True)

    def _sync_has_data(self):
        """Re-read the sheet and clear the monitor's ``has_data`` flag if it is empty."""
        values = self.sheet_manager.get_all_values()
        self.logger.info(f"After processing: Current column={self.sheet_manager.column_name}, "
                         f"Values remaining={len(values)}")

        if not values:
            self.monitor.has_data.clear()
            self.logger.info("All data processed, clearing has_data flag")
        else:
            self.logger.info(f"Remaining data: {values}")

    def _main_loop(self):
        """Main processing loop; runs until ``is_running`` is cleared."""
        while self.is_running.is_set():
            # Wait for data; the timeout lets a stop() be noticed promptly
            if not self.monitor.has_data.wait(timeout=1.0):
                continue

            # Pause monitoring so it does not use sheet_manager concurrently
            self.monitor.pause()

            failed = False
            try:
                self.process_new_value()
                self._sync_has_data()
            except Exception as e:
                failed = True
                self.logger.exception(f"Error in main loop iteration: {e}")

            # Always resume; otherwise one failed iteration leaves monitoring paused
            try:
                self.monitor.resume()
            except Exception as e:
                failed = True
                self.logger.exception(f"Error resuming monitor: {e}")

            if failed and self.is_running.is_set():
                time.sleep(self.ERROR_BACKOFF_SECONDS)
## TODO
# Because the sheet API enforces a per-minute call quota, a lookup can fail;
# design this so that on such a failure it waits and then retries.
# (Original note: API 분당 호출 문제로 만약에 참조하다가 실패할 경우 대기했다가 다시 시도하게끔 설계)


# Example usage: watch the sheet and run a PiaBench benchmark for every queued row.
if __name__ == "__main__":
    import sys
    from pathlib import Path
    sys.path.append(str(Path(__file__).parent.parent.parent))
    from sheet_manager.sheet_crud.sheet_crud import SheetManager
    from pia_bench.pipe_line.piepline import PiaBenchMark

    def my_custom_function(huggingface_id, benchmark_name, prompt_cfg_name):
        """Run one benchmark for the row popped from the sheet."""
        piabenchmark = PiaBenchMark(huggingface_id, benchmark_name, prompt_cfg_name)
        piabenchmark.bench_start()

    # Initialize components
    sheet_manager = SheetManager()
    monitor = SheetMonitor(sheet_manager, check_interval=10.0)
    main_loop = MainLoop(sheet_manager, monitor, callback_function=my_custom_function)

    try:
        # start() runs the processing loop on this thread and only returns after
        # stop(), so no extra keep-alive loop is needed here.
        main_loop.start()
    except KeyboardInterrupt:
        pass
    finally:
        main_loop.stop()