Spaces:
Running
Running
#!/usr/bin/env python3 | |
"""
Improved module for complete logging of MAI-DX diagnostic sessions,
with detailed capture of the conversations between agents.
(VERSION WITH A FIXED PARSER AND RESTORED CONSOLE OUTPUT)
"""
import html
import io
import json
import logging
import os
import re
import sys
import time
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
@dataclass
class AgentMessage:
    """A single message produced by one agent.

    The ``@dataclass`` decorator is required: without it the
    ``field(default_factory=dict)`` default is never turned into a real
    per-instance dict and the class cannot be constructed from arguments.
    """

    timestamp: str      # ISO-8601 string recorded when the message was created
    agent_name: str     # name of the agent that produced the message
    message_type: str   # category label, e.g. "output" or "function_call"
    content: str        # message text
    # A fresh dict per instance, so messages never share metadata state.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AgentConversation:
    """One round of agent discussion and the messages recorded in it.

    The ``@dataclass`` decorator is required so the class can be built with
    keyword arguments and so ``field(default_factory=list)`` yields a fresh
    list per instance.
    """

    round_number: int                 # sequential round index
    start_time: str                   # ISO-8601 string set when the round is opened
    end_time: Optional[str] = None    # ISO-8601 string set when the round is closed
    # Quoted forward reference: the annotation is not evaluated at class
    # creation, so this class does not depend on definition order.
    messages: List["AgentMessage"] = field(default_factory=list)
    decision: Optional[str] = None
    cost_incurred: float = 0.0
@dataclass
class DiagnosisSession:
    """Record of one diagnostic session: inputs, results and captured dialogue.

    The ``@dataclass`` decorator is required: the session is created with
    keyword arguments and serialized with ``dataclasses.asdict``.
    """

    # --- identification and inputs ---
    case_id: str
    timestamp: str        # ISO-8601 creation time
    case_name: str
    patient_info: str
    mode: str
    budget: int = 0

    # --- results, filled in when the session is finalized ---
    diagnosis: str = "N/A"
    confidence: float = 0.0
    cost: float = 0.0
    iterations: int = 0
    duration: float = 0.0
    status: str = "In Progress"
    reasoning: str = "N/A"

    # --- captured dialogue ---
    # Quoted forward reference: the annotation is not evaluated at class
    # creation, so this class does not depend on definition order.
    conversations: List["AgentConversation"] = field(default_factory=list)
    raw_output: str = ""
class EnhancedOutputCapture:
    """Context manager that tees ``sys.stdout`` into an in-memory buffer.

    While active, everything written to ``sys.stdout`` is stored in
    ``captured_output`` and also forwarded to the stream that was installed
    as ``sys.stdout`` when this object was created.
    """

    # Compiled once at class creation instead of on every get_value() call.
    _ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

    def __init__(self):
        self.captured_output = io.StringIO()
        self.original_stdout = sys.stdout

    def __enter__(self):
        """Install this object as ``sys.stdout`` and return it."""
        sys.stdout = self
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Restore the original stdout; exceptions are not suppressed."""
        sys.stdout = self.original_stdout

    def write(self, text):
        """Record ``text`` in the buffer and echo it to the original stdout.

        Returns:
            The number of characters written, matching the contract of
            text-stream ``write`` methods.
        """
        self.captured_output.write(text)
        self.original_stdout.write(text)
        return len(text)

    def flush(self):
        """Flush the original stdout (the in-memory buffer needs no flush)."""
        self.original_stdout.flush()

    def get_value(self):
        """Return everything captured so far with ANSI escapes removed."""
        return self._strip_ansi_codes(self.captured_output.getvalue())

    @staticmethod
    def _strip_ansi_codes(text):
        """Remove ANSI escape sequences from ``text``.

        Declared static because it takes no ``self``; it is called as
        ``self._strip_ansi_codes(...)``, which would otherwise pass the
        instance as an unexpected extra argument.
        """
        return EnhancedOutputCapture._ANSI_ESCAPE.sub('', text)
class MAIDxConversationLogger:
    """Create, parse and persist diagnostic session logs.

    Sessions are written into ``log_dir`` as a JSON file, a raw text dump of
    the captured console output, and a simple HTML view of the conversation.
    """

    # Patterns are compiled once rather than for every parsed line.
    _AGENT_NAME_RE = re.compile(r'Agent Name (.*?) (?:\[|\s)')
    _BORDER_RE = re.compile(r'^[│|]\s*|\s*[│|]\s*$')

    def __init__(self, log_dir: str = "mai_dx_logs"):
        """Remember ``log_dir`` and create it if it does not exist."""
        self.log_dir = log_dir
        os.makedirs(self.log_dir, exist_ok=True)

    def create_session(self, case_name: str, patient_info: str, mode: str, budget: int) -> DiagnosisSession:
        """Return a new session with a unique, timestamp-based ``case_id``."""
        case_id = f"case_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
        return DiagnosisSession(
            case_id=case_id,
            timestamp=datetime.now().isoformat(),
            case_name=case_name,
            patient_info=patient_info,
            mode=mode,
            budget=budget,
        )

    def finalize_and_save_session(self, session: DiagnosisSession, result: Any, raw_output: str, duration: float) -> DiagnosisSession:
        """Fill in the session's results, then write the JSON, raw and HTML files.

        Args:
            session: Session to complete; it is modified in place.
            result: Object whose ``final_diagnosis``, ``accuracy_score``,
                ``total_cost`` and ``accuracy_reasoning`` attributes are read;
                a missing attribute falls back to a default.
            raw_output: Captured console text; parsed into conversations.
            duration: Run time to store on the session.

        Returns:
            The same ``session`` object.
        """
        session.raw_output = raw_output
        self._parse_captured_output(session, raw_output)
        session.diagnosis = getattr(result, 'final_diagnosis', 'N/A')
        session.confidence = getattr(result, 'accuracy_score', 0.0)
        session.cost = getattr(result, 'total_cost', 0.0)
        session.reasoning = getattr(result, 'accuracy_reasoning', 'N/A')
        session.iterations = len(session.conversations)
        session.status = "✅ Успішно" if session.confidence >= 3.0 else "⚠️ Потребує перегляду"
        session.duration = duration
        self._save_session_to_file(session)
        self.export_conversation_html(session)
        return session

    def _parse_captured_output(self, session: DiagnosisSession, captured_text: str):
        """Extract agent messages from box-drawn panels in ``captured_text``.

        A panel starts on a line containing "Agent Name" and a top-left box
        corner, and ends on a line containing a bottom-left box corner. The
        lines in between, with side borders stripped, form the message body.
        Parsed rounds are appended to ``session.conversations``.
        """
        current_round: Optional[AgentConversation] = None
        current_agent: Optional[str] = None
        buffer: List[str] = []
        in_agent_output = False
        round_number = 0

        for line in captured_text.split('\n'):
            # Panel header: opens a message, and possibly a new round.
            if "Agent Name" in line and ("╭" in line or "┌" in line):
                # A new round begins for the first panel, or when a header
                # arrives while the previous panel was never closed.
                if not current_round or in_agent_output:
                    round_number += 1
                    if current_round:
                        current_round.end_time = datetime.now().isoformat()
                    current_round = AgentConversation(round_number=round_number, start_time=datetime.now().isoformat())
                    session.conversations.append(current_round)
                agent_match = self._AGENT_NAME_RE.search(line)
                if agent_match:
                    current_agent = agent_match.group(1).strip()
                in_agent_output = True
                buffer = []
                continue

            # Panel footer: flush the collected body as one message.
            if in_agent_output and ("╰" in line or "└" in line):
                if buffer and current_agent and current_round:
                    self._add_agent_message(current_round, current_agent, '\n'.join(buffer))
                in_agent_output, current_agent, buffer = False, None, []
                continue

            # Panel body: drop the left and right border characters.
            if in_agent_output:
                buffer.append(self._BORDER_RE.sub('', line))

        if current_round:
            current_round.end_time = datetime.now().isoformat()

    def _add_agent_message(self, conversation: AgentConversation, agent_name: str, content: str):
        """Classify ``content`` by marker phrases and append it as a message.

        The stripped text becomes the message content; the unstripped text is
        kept in the metadata under ``raw_content``.
        """
        message_type = "output"
        formatted_content = content.strip()
        if "Structured Output - Attempting Function Call Execution" in formatted_content:
            message_type = "function_call"
        elif "Score:" in formatted_content and "Justification:" in formatted_content:
            message_type = "judgement"
        elif "No tests have been proposed" in formatted_content:
            message_type = "status_update"
        message = AgentMessage(
            datetime.now().isoformat(), agent_name, message_type, formatted_content, {'raw_content': content}
        )
        conversation.messages.append(message)

    def _save_session_to_file(self, session: DiagnosisSession):
        """Write ``<case_id>.json`` and ``<case_id>_raw.txt`` into ``log_dir``."""
        file_path = os.path.join(self.log_dir, f"{session.case_id}.json")
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(asdict(session), f, ensure_ascii=False, indent=2)
        raw_output_path = os.path.join(self.log_dir, f"{session.case_id}_raw.txt")
        with open(raw_output_path, 'w', encoding='utf-8') as f:
            f.write(session.raw_output)

    def export_conversation_html(self, session: DiagnosisSession) -> str:
        """Write the conversation as ``<case_id>_conversation.html``.

        All session-derived text is HTML-escaped, because captured agent
        output may contain ``<``, ``>`` or ``&`` that would otherwise break
        the markup or be interpreted as tags. The page declares UTF-8 to
        match the encoding the file is written in.

        Returns:
            Path of the written HTML file.
        """
        html_path = os.path.join(self.log_dir, f"{session.case_id}_conversation.html")
        safe_case_id = html.escape(session.case_id)
        parts = [
            f'<html><head><meta charset="utf-8"><title>Log - {safe_case_id}</title></head><body>',
            f"<h1>Session: {safe_case_id}</h1>",
        ]
        for conv in session.conversations:
            parts.append(f"<h2>Round {conv.round_number}</h2>")
            for msg in conv.messages:
                parts.append(
                    f"<div><b>{html.escape(msg.agent_name)}</b> "
                    f"[{html.escape(msg.message_type)}]:"
                    f"<pre>{html.escape(msg.content)}</pre></div>"
                )
        parts.append("</body></html>")
        with open(html_path, 'w', encoding='utf-8') as f:
            f.write("".join(parts))
        return html_path