GUI_MAI-DxO / enhanced_mai_dx_logger.py
DocUA's picture
Оновлено модуль логування MAI-DX: повернуто вивід в консоль, вдосконалено обробку захопленого тексту та покращено документацію. Виправлено помилки в парсері для надійнішого захоплення розмов агентів.
ac50cc2
#!/usr/bin/env python3
"""
Покращений модуль для повного логування діагностичних сесій MAI-DX
з детальним захопленням розмов між агентами. (ВЕРСІЯ З ВИПРАВЛЕНИМ ПАРСЕРОМ І ПОВЕРНЕНИМ ВИВОДОМ)
"""
import html
import io
import json
import logging
import os
import re
import sys
import time
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
@dataclass
class AgentMessage:
    """One message attributed to a single agent inside a conversation round."""

    # Creation time of the record, stored as a string.
    timestamp: str
    # Name of the agent the message is attributed to.
    agent_name: str
    # Category label for the message (e.g. "output").
    message_type: str
    # Message text.
    content: str
    # Free-form extra data; every instance gets its own fresh dict.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AgentConversation:
    """One round of agent discussion together with the messages it contains."""

    # Sequential number of the round within a session.
    round_number: int
    # Time the round record was opened, stored as a string.
    start_time: str
    # Time the round record was closed; None until it has been set.
    end_time: Optional[str] = None
    # Messages collected for this round; every instance gets its own list.
    messages: List[AgentMessage] = field(default_factory=list)
    # Decision associated with the round, if any.
    decision: Optional[str] = None
    # Cost attributed to the round.
    cost_incurred: float = 0.0
@dataclass
class DiagnosisSession:
    """Record of one diagnostic session: inputs, outcome and parsed conversations."""

    # --- Identification and inputs (required) ---
    case_id: str
    timestamp: str
    case_name: str
    patient_info: str
    mode: str

    # --- Outcome fields; the defaults describe a session that has not finished ---
    budget: int = 0
    diagnosis: str = "N/A"
    confidence: float = 0.0
    cost: float = 0.0
    iterations: int = 0
    duration: float = 0.0
    status: str = "In Progress"
    reasoning: str = "N/A"

    # --- Captured material ---
    # Conversation rounds; every instance gets its own list.
    conversations: List[AgentConversation] = field(default_factory=list)
    # Unparsed console text of the session.
    raw_output: str = ""
class EnhancedOutputCapture:
    """Context manager that tees ``sys.stdout`` into an in-memory buffer.

    While the context is active ``sys.stdout`` is this object: everything
    written is stored for later parsing and also echoed to the stream that
    was ``sys.stdout`` when the instance was constructed. On exit that saved
    stream is put back.
    """

    # ESC followed by a single byte in '@'..'_' (except '['), or a CSI
    # sequence: ESC '[' + parameter bytes + intermediate bytes + final byte.
    # Compiled once instead of on every get_value() call.
    _ANSI_ESCAPE_RE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

    def __init__(self):
        self.captured_output = io.StringIO()
        self.original_stdout = sys.stdout

    def __enter__(self):
        sys.stdout = self
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout = self.original_stdout

    def write(self, text):
        """Store *text* in the capture buffer and echo it to the saved stream.

        Returns:
            The number of characters written, as text streams do.
        """
        written = self.captured_output.write(text)
        # sys.stdout can legitimately be None (no console attached); in that
        # case there is nothing to echo to, but capturing must still work.
        if self.original_stdout is not None:
            self.original_stdout.write(text)
        return written

    def flush(self):
        """Flush the saved stream, if there is one."""
        if self.original_stdout is not None:
            self.original_stdout.flush()

    def get_value(self):
        """Return everything captured so far with ANSI escape codes removed."""
        return self._strip_ansi_codes(self.captured_output.getvalue())

    @staticmethod
    def _strip_ansi_codes(text):
        """Return *text* without ANSI escape sequences."""
        return EnhancedOutputCapture._ANSI_ESCAPE_RE.sub('', text)
class MAIDxConversationLogger:
    """Build, parse and persist diagnostic session logs.

    A session is created with :meth:`create_session`, then completed with
    :meth:`finalize_and_save_session`, which parses the captured console text
    into conversation rounds and writes three files into ``log_dir``:
    ``<case_id>.json``, ``<case_id>_raw.txt`` and
    ``<case_id>_conversation.html``.
    """

    # Primary header pattern: the name sits between "Agent Name " and either
    # " [" or a space followed by more whitespace.
    _AGENT_NAME_RE = re.compile(r'Agent Name (.*?) (?:\[|\s)')
    # Fallback for headers without a bracketed suffix: take everything up to
    # the next bracket or box-drawing border character.
    _AGENT_NAME_FALLBACK_RE = re.compile(r'Agent Name\s+([^\[\]│|╭╮╰╯┌┐└┘─━]+)')
    # Leading / trailing vertical border of a panel content line.
    _PANEL_BORDER_RE = re.compile(r'^[│|]\s*|\s*[│|]\s*$')

    def __init__(self, log_dir: str = "mai_dx_logs"):
        """Remember *log_dir* and create it if it does not exist yet."""
        self.log_dir = log_dir
        os.makedirs(self.log_dir, exist_ok=True)

    def create_session(self, case_name: str, patient_info: str, mode: str, budget: int) -> "DiagnosisSession":
        """Return a new session with a generated, timestamp-based case id."""
        # One clock reading so the id and the timestamp cannot disagree.
        now = datetime.now()
        case_id = f"case_{now.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
        return DiagnosisSession(
            case_id=case_id,
            timestamp=now.isoformat(),
            case_name=case_name,
            patient_info=patient_info,
            mode=mode,
            budget=budget,
        )

    def finalize_and_save_session(self, session: "DiagnosisSession", result: Any, raw_output: str, duration: float) -> "DiagnosisSession":
        """Fill *session* from *result* and the captured text, then save it.

        Args:
            session: Session to complete; it is modified in place.
            result: Object whose ``final_diagnosis``, ``accuracy_score``,
                ``total_cost`` and ``accuracy_reasoning`` attributes are read;
                missing attributes fall back to defaults.
            raw_output: Captured console text to parse into conversations.
            duration: Value stored as the session duration.

        Returns:
            The same *session* object.
        """
        session.raw_output = raw_output
        self._parse_captured_output(session, raw_output)
        session.diagnosis = getattr(result, 'final_diagnosis', 'N/A')
        # The attribute may exist but hold None; comparing None with a float
        # below would raise TypeError, so treat it like a missing score.
        confidence = getattr(result, 'accuracy_score', 0.0)
        session.confidence = 0.0 if confidence is None else confidence
        session.cost = getattr(result, 'total_cost', 0.0)
        session.reasoning = getattr(result, 'accuracy_reasoning', 'N/A')
        session.iterations = len(session.conversations)
        session.status = "✅ Успішно" if session.confidence >= 3.0 else "⚠️ Потребує перегляду"
        session.duration = duration
        self._save_session_to_file(session)
        self.export_conversation_html(session)
        return session

    @classmethod
    def _extract_agent_name(cls, header_line: str) -> str:
        """Return the agent name found in a panel header line, or "Unknown"."""
        for pattern in (cls._AGENT_NAME_RE, cls._AGENT_NAME_FALLBACK_RE):
            match = pattern.search(header_line)
            if match and match.group(1).strip():
                return match.group(1).strip()
        return "Unknown"

    def _flush_panel(self, conversation, agent_name, buffered_lines):
        """Store the buffered panel lines as one message, if there is anything to store."""
        if conversation is not None and agent_name and buffered_lines:
            self._add_agent_message(conversation, agent_name, '\n'.join(buffered_lines))

    def _parse_captured_output(self, session: "DiagnosisSession", captured_text: str):
        """Split *captured_text* into agent panels and append them to *session*.

        A panel starts at a line containing "Agent Name" plus a top-left box
        corner and ends at a line containing a bottom-left box corner. A new
        round is opened for the first panel and whenever a header appears
        while the previous panel is still open.
        """
        current_round: Optional[AgentConversation] = None
        current_agent: Optional[str] = None
        buffer: List[str] = []
        in_agent_output = False
        round_number = 0

        for line in captured_text.split('\n'):
            if "Agent Name" in line and ("╭" in line or "┌" in line):
                if in_agent_output:
                    # Previous panel never got its closing border: keep what
                    # was collected instead of discarding it.
                    self._flush_panel(current_round, current_agent, buffer)
                if current_round is None or in_agent_output:
                    round_number += 1
                    if current_round is not None:
                        current_round.end_time = datetime.now().isoformat()
                    current_round = AgentConversation(round_number=round_number, start_time=datetime.now().isoformat())
                    session.conversations.append(current_round)
                current_agent = self._extract_agent_name(line)
                in_agent_output = True
                buffer = []
                continue

            if in_agent_output and ("╰" in line or "└" in line):
                self._flush_panel(current_round, current_agent, buffer)
                in_agent_output, current_agent, buffer = False, None, []
                continue

            if in_agent_output:
                buffer.append(self._PANEL_BORDER_RE.sub('', line))

        if in_agent_output:
            # Input ended inside a panel: do not lose its content.
            self._flush_panel(current_round, current_agent, buffer)
        if current_round is not None:
            current_round.end_time = datetime.now().isoformat()

    def _add_agent_message(self, conversation: "AgentConversation", agent_name: str, content: str):
        """Classify *content* by marker phrases and append it to *conversation*."""
        formatted_content = content.strip()
        if "Structured Output - Attempting Function Call Execution" in formatted_content:
            message_type = "function_call"
        elif "Score:" in formatted_content and "Justification:" in formatted_content:
            message_type = "judgement"
        elif "No tests have been proposed" in formatted_content:
            message_type = "status_update"
        else:
            message_type = "output"
        message = AgentMessage(
            datetime.now().isoformat(), agent_name, message_type, formatted_content, {'raw_content': content}
        )
        conversation.messages.append(message)

    def _save_session_to_file(self, session: "DiagnosisSession"):
        """Write the session as JSON and its raw output as a text file."""
        file_path = os.path.join(self.log_dir, f"{session.case_id}.json")
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(asdict(session), f, ensure_ascii=False, indent=2)
        raw_output_path = os.path.join(self.log_dir, f"{session.case_id}_raw.txt")
        with open(raw_output_path, 'w', encoding='utf-8') as f:
            f.write(session.raw_output)

    def export_conversation_html(self, session: "DiagnosisSession") -> str:
        """Write the session's conversations as an HTML page and return its path.

        All text taken from the session is HTML-escaped, so markup characters
        in agent output are shown literally instead of being interpreted.
        """
        html_path = os.path.join(self.log_dir, f"{session.case_id}_conversation.html")
        case_id = html.escape(str(session.case_id))
        parts = [
            # The file is written as UTF-8; say so, or non-ASCII text may be
            # decoded with the wrong charset by the browser.
            f'<html><head><meta charset="utf-8"><title>Log - {case_id}</title></head><body>',
            f"<h1>Session: {case_id}</h1>",
        ]
        for conv in session.conversations:
            parts.append(f"<h2>Round {conv.round_number}</h2>")
            for msg in conv.messages:
                parts.append(
                    f"<div><b>{html.escape(str(msg.agent_name))}</b> "
                    f"[{html.escape(str(msg.message_type))}]:"
                    f"<pre>{html.escape(str(msg.content))}</pre></div>"
                )
        parts.append("</body></html>")
        with open(html_path, 'w', encoding='utf-8') as f:
            f.write("".join(parts))
        return html_path