import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

from pipeline_config import PipelineConfig


@dataclass
class TaskmasterDialogue:
    """Structured representation of a Taskmaster-1 dialogue."""
    conversation_id: str
    instruction_id: Optional[str]
    scenario: Optional[str]
    domain: str
    turns: List[Dict[str, Any]] = field(default_factory=list)

    def validate(self) -> bool:
        """Check that this dialogue has an ID and a list of turns."""
        return bool(self.conversation_id and isinstance(self.turns, list))


class TaskmasterProcessor:
    """
    Loads Taskmaster-1 dialogues, detects each dialogue's domain,
    filters them, and emits a pipeline-friendly format.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config

    def load_taskmaster_dataset(self, base_dir: str,
                                max_examples: Optional[int] = None) -> List[TaskmasterDialogue]:
        """
        Load and parse the Taskmaster-1 JSON files (self-dialogs & woz-dialogs).
        Combines scenario text with conversation utterances to detect the
        domain more robustly than scenario text alone.
        """
        required_files = {
            "self-dialogs": "self-dialogs.json",
            "woz-dialogs": "woz-dialogs.json",
            "ontology": "ontology.json",  # not actively used, but expected to be present
        }

        # Fail fast if any required file is missing.
        missing = [k for k, v in required_files.items() if not Path(base_dir, v).exists()]
        if missing:
            raise FileNotFoundError(f"Missing Taskmaster files: {missing}")

        # Load the ontology (currently unused beyond a debug sanity check).
        ontology_path = Path(base_dir, required_files["ontology"])
        with open(ontology_path, 'r', encoding='utf-8') as f:
            ontology = json.load(f)
        if self.config.debug:
            print(f"[TaskmasterProcessor] Loaded ontology with {len(ontology)} top-level keys (unused).")

        dialogues: List[TaskmasterDialogue] = []

        # Read the two main dialogue files.
        for file_key in ("self-dialogs", "woz-dialogs"):
            file_path = Path(base_dir, required_files[file_key])
            with open(file_path, 'r', encoding='utf-8') as f:
                raw_data = json.load(f)

            for d in raw_data:
                conversation_id = d.get("conversation_id", "")
                instruction_id = d.get("instruction_id")
                scenario_text = d.get("scenario", "")

                # Convert raw utterances into normalized turns.
                utterances = d.get("utterances", [])
                turns = self._process_utterances(utterances)

                # Detect the domain from the scenario text plus every turn,
                # so keyword matches inside the conversation itself count too.
                domain = self._extract_domain(scenario_text, turns)

                dialogues.append(TaskmasterDialogue(
                    conversation_id=conversation_id,
                    instruction_id=instruction_id,
                    scenario=scenario_text,
                    domain=domain,
                    turns=turns
                ))

                if max_examples is not None and len(dialogues) >= max_examples:
                    break

            # Without this second check, hitting the cap would only end the
            # inner loop and the next file would still be read in full.
            if max_examples is not None and len(dialogues) >= max_examples:
                break

        if self.config.debug:
            print(f"[TaskmasterProcessor] Loaded {len(dialogues)} total dialogues from Taskmaster-1.")

        return dialogues

    def _extract_domain(self, scenario: str, turns: List[Dict[str, str]]) -> str:
        """
        Combine the scenario text with all turn texts and match the result
        against keyword patterns; return the first matching domain, or 'other'.
        """
        # 1) Combine scenario + conversation text (scenario may be None).
        combined_text = (scenario or "").lower()
        for turn in turns:
            combined_text += " " + turn.get('text', '').strip().lower()

        # 2) Domain keyword patterns (edit or expand as needed).
        domain_patterns = {
            'restaurant': r'\b(restaurant|dining|food|reservation|table|menu|cuisine|eat)\b',
            'movie': r'\b(movie|cinema|film|ticket|showtime|theater)\b',
            'ride_share': r'\b(ride|taxi|uber|lyft|car\s?service|pickup|dropoff)\b',
            'coffee': r'\b(coffee|café|cafe|starbucks|espresso|latte|mocha|americano)\b',
            'pizza': r'\b(pizza|delivery|order\s?food|pepperoni|topping|pizzeria)\b',
            'auto': r'\b(car|vehicle|repair|maintenance|mechanic|oil\s?change)\b'
        }

        # 3) Return the first matched domain, falling back to 'other'.
        #    Logging is gated on debug so we don't print once per dialogue.
        for dom, pattern in domain_patterns.items():
            if re.search(pattern, combined_text):
                if self.config.debug:
                    print(f"Matched domain: {dom}")
                return dom

        if self.config.debug:
            print("No domain match, returning 'other'")
        return 'other'

    def _process_utterances(self, utterances: List[Dict[str, Any]]) -> List[Dict[str, str]]:
        """Normalize each utterance: map the speaker to user/assistant, strip the text."""
        turns = []
        for utt in utterances:
            turns.append({
                'speaker': 'assistant' if utt.get('speaker') == 'ASSISTANT' else 'user',
                'text': utt.get('text', '').strip()
            })
        return turns

    def filter_and_convert(self, dialogues: List[TaskmasterDialogue]) -> List[Dict]:
        """
        Drop dialogues that fail the min-turns / min-user-words thresholds,
        then convert the survivors to the final pipeline dict:
        {
            "dialogue_id": "...",
            "domain": "...",
            "turns": [ {"speaker": "user", "text": "..."}, ... ]
        }
        """
        results = []
        for dlg in dialogues:
            if not dlg.validate():
                continue
            if len(dlg.turns) < self.config.min_turns:
                continue

            # Every user turn must meet the minimum word count.
            keep = True
            for turn in dlg.turns:
                if turn['speaker'] == 'user':
                    if len(turn['text'].split()) < self.config.min_user_words:
                        keep = False
                        break
            if not keep:
                continue

            results.append({
                'dialogue_id': dlg.conversation_id,
                'domain': dlg.domain,
                'turns': dlg.turns  # refine further here if needed
            })

        if self.config.debug:
            print(f"[TaskmasterProcessor] Filtered down to {len(results)} dialogues.")

        return results