File size: 8,453 Bytes
918bdb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f3473c1
 
918bdb4
 
f3473c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
918bdb4
 
 
 
 
 
 
 
f3473c1
 
918bdb4
 
 
 
 
 
 
f3473c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
918bdb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f3473c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
"""
Centralized Logging Configuration for Yuga Planner

This module provides a unified logging configuration that:
1. Respects the YUGA_DEBUG environment variable for debug logging
2. Uses consistent formatting across the entire codebase
3. Eliminates the need for individual logging.basicConfig() calls

Usage:
    from utils.logging_config import setup_logging, get_logger

    # Initialize logging (typically done once per module)
    setup_logging()
    logger = get_logger(__name__)

    # Use logging methods
    logger.debug("Debug message - only shown when YUGA_DEBUG=true")
    logger.info("Info message - always shown")
    logger.warning("Warning message")
    logger.error("Error message")

Environment Variables:
    YUGA_DEBUG: Set to "true" to enable debug logging

Migration from old logging:
    Replace:
        import logging
        logging.basicConfig(level=logging.INFO)
        logger = logging.getLogger(__name__)

    With:
        from utils.logging_config import setup_logging, get_logger
        setup_logging()
        logger = get_logger(__name__)
"""

import os, sys, logging, threading, time

from typing import Optional

from collections import deque


class LogCapture:
    """Capture logs for real-time streaming to UI"""

    def __init__(self, max_lines: int = 1000):
        self.max_lines = max_lines
        self.log_buffer = deque(maxlen=max_lines)
        self.session_buffer = deque(maxlen=max_lines)  # Current session logs
        self.lock = threading.Lock()
        self.session_start_time = None

    def add_log(self, record: logging.LogRecord):
        """Add a log record to the UI streaming buffer (filtered for essential logs only)"""
        # This only affects UI streaming - console logs are handled separately
        logger_name = record.name
        message = record.getMessage()

        # Skip all UI, gradio, httpx, and other system logs for UI streaming
        skip_loggers = [
            "gradio",
            "httpx",
            "uvicorn",
            "fastapi",
            "urllib3",
            "ui.pages.chat",
            "ui.",
            "asyncio",
            "websockets",
            "handlers.tool_call_handler",
            "services.mcp_client",
        ]

        # Skip if it's a system logger
        if any(skip in logger_name for skip in skip_loggers):
            return

        # Only include essential task splitting and constraint solver logs for UI
        essential_patterns = [
            "=== Step 1: Task Breakdown ===",
            "=== Step 2: Time Estimation ===",
            "=== Step 3: Skill Matching ===",
            "Processing",
            "tasks for time estimation",
            "Completed time estimation",
            "Completed skill matching",
            "Generated",
            "tasks with skills",
            "Starting solve process",
            "Preparing schedule for solving",
            "Starting schedule solver",
            "solving",
            "constraint",
            "optimization",
        ]

        # Check if this log message contains essential information
        is_essential = any(
            pattern.lower() in message.lower() for pattern in essential_patterns
        )

        # Only include essential logs from factory and handler modules for UI
        allowed_modules = ["factory.", "handlers.mcp_backend", "services.schedule"]
        module_allowed = any(
            logger_name.startswith(module) for module in allowed_modules
        )

        if not (module_allowed and is_essential):
            return

        # Format for clean streaming display in UI
        timestamp = time.strftime("%H:%M:%S", time.localtime(record.created))

        # Clean up the message for better display
        match message:
            case msg if "===" in msg:
                # Task breakdown steps
                formatted_log = f"⏳ {msg.replace('===', '').strip()}"

            case msg if "Processing" in msg and "time estimation" in msg:
                formatted_log = f"⏱️ {msg}"

            case msg if "Completed" in msg:
                formatted_log = f"βœ… {msg}"

            case msg if "Generated" in msg and "tasks" in msg:
                formatted_log = f"🎯 {msg}"

            case msg if "Starting solve process" in msg or "Starting schedule solver" in msg:
                formatted_log = f"⚑ {msg}"

            case msg if "Preparing schedule" in msg:
                formatted_log = f"πŸ“‹ {msg}"

            case _:
                formatted_log = f"πŸ”§ {message}"

        with self.lock:
            self.log_buffer.append(formatted_log)

            # Add to session buffer if session is active
            if self.session_start_time and record.created >= self.session_start_time:
                self.session_buffer.append(formatted_log)

    def start_session(self):
        """Start capturing logs for current session"""
        with self.lock:
            self.session_start_time = time.time()
            self.session_buffer.clear()

    def get_session_logs(self) -> list:
        """Get all logs from current session"""
        with self.lock:
            return list(self.session_buffer)

    def get_recent_logs(self, count: int = 50) -> list:
        """Get recent logs"""
        with self.lock:
            return list(self.log_buffer)[-count:]


class StreamingLogHandler(logging.Handler):
    """Custom log handler that captures logs for streaming"""

    def __init__(self, log_capture: LogCapture):
        super().__init__()
        self.log_capture = log_capture

    def emit(self, record):
        try:
            self.log_capture.add_log(record)
        except Exception:
            self.handleError(record)


# Global log capture instance
_log_capture = LogCapture()
_streaming_handler = None


def setup_logging(level: Optional[str] = None) -> None:
    """
    Set up centralized logging configuration for the application.

    Args:
        level: Override the logging level. If None, uses YUGA_DEBUG environment variable.
    """
    global _streaming_handler

    # Determine logging level
    if level is not None:
        log_level = getattr(logging, level.upper(), logging.INFO)
    else:
        debug_enabled = os.getenv("YUGA_DEBUG", "false").lower() == "true"
        log_level = logging.DEBUG if debug_enabled else logging.INFO

    # Get root logger
    root_logger = logging.getLogger()

    # Only configure if not already configured
    if not root_logger.handlers or _streaming_handler is None:
        # Clear existing handlers to avoid duplicates
        root_logger.handlers.clear()

        # Create formatter
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

        # Console handler for terminal output (shows ALL logs)
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(log_level)
        console_handler.setFormatter(formatter)

        # Streaming handler for UI capture (filtered to essential logs only)
        _streaming_handler = StreamingLogHandler(_log_capture)
        _streaming_handler.setLevel(
            logging.DEBUG
        )  # Capture all levels, but filter in handler

        # Configure root logger
        root_logger.setLevel(logging.DEBUG)

        # Add both handlers
        root_logger.addHandler(console_handler)
        root_logger.addHandler(_streaming_handler)

    # Log the configuration
    logger = logging.getLogger(__name__)
    logger.debug("Debug logging enabled via YUGA_DEBUG environment variable")


def get_logger(name: str) -> logging.Logger:
    """
    Get a logger instance with the specified name.

    Args:
        name: Name for the logger, typically __name__

    Returns:
        Configured logger instance
    """
    return logging.getLogger(name)


def is_debug_enabled() -> bool:
    """Check if debug logging is enabled via environment variable."""
    return os.getenv("YUGA_DEBUG", "false").lower() == "true"


def get_log_capture() -> LogCapture:
    """Get the global log capture instance for UI streaming"""
    return _log_capture


def start_session_logging():
    """Start capturing logs for the current chat session"""
    _log_capture.start_session()


def get_session_logs() -> list:
    """Get all logs from the current session for streaming to UI"""
    return _log_capture.get_session_logs()