import json
import re
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from pipeline_config import PipelineConfig


@dataclass
class TaskmasterDialogue:
    """Structured representation of a Taskmaster-1 dialogue."""

    conversation_id: str
    instruction_id: Optional[str]
    scenario: Optional[str]
    domain: str
    turns: List[Dict[str, Any]] = field(default_factory=list)

    def validate(self) -> bool:
        """Check if this dialogue has an ID and a list of turns."""
        return bool(self.conversation_id and isinstance(self.turns, list))
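
# Illustrative only (not part of the original file): a minimal dialogue
# that passes validate(). Field values are made up for the example.
#
#   TaskmasterDialogue(
#       conversation_id="dlg-001",
#       instruction_id=None,
#       scenario="Book a table for two tonight.",
#       domain="restaurant",
#       turns=[{"speaker": "user", "text": "I need a table for two."}],
#   ).validate()  # -> True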


class TaskmasterProcessor:
    """
    Loads Taskmaster-1 dialogues, extracts each dialogue's domain from its
    scenario and turn text, filters them, and outputs a final
    pipeline-friendly format.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config

    def load_taskmaster_dataset(self, base_dir: str, max_examples: Optional[int] = None) -> List[TaskmasterDialogue]:
        """
        Load and parse Taskmaster-1 JSON for self-dialogs & woz-dialogs.
        Combines scenario text + conversation utterances to detect the domain more robustly.
        """
        required_files = {
            "self-dialogs": "self-dialogs.json",
            "woz-dialogs": "woz-dialogs.json",
            "ontology": "ontology.json",  # not actively used yet, but expected to be present
        }
        # Fail fast if any expected file is missing.
        missing = [k for k, v in required_files.items() if not Path(base_dir, v).exists()]
        if missing:
            raise FileNotFoundError(f"Missing Taskmaster files: {missing}")

        # Load the ontology (currently unused beyond the debug message).
        ontology_path = Path(base_dir, required_files["ontology"])
        with open(ontology_path, 'r', encoding='utf-8') as f:
            ontology = json.load(f)
        if self.config.debug:
            print(f"[TaskmasterProcessor] Loaded ontology with {len(ontology)} top-level keys (unused).")

        dialogues: List[TaskmasterDialogue] = []
        # Read the two main dialogue files.
        file_keys = ["self-dialogs", "woz-dialogs"]
        for file_key in file_keys:
            file_path = Path(base_dir, required_files[file_key])
            with open(file_path, 'r', encoding='utf-8') as f:
                raw_data = json.load(f)

            for d in raw_data:
                conversation_id = d.get("conversation_id", "")
                instruction_id = d.get("instruction_id", None)
                scenario_text = d.get("scenario", "")  # old scenario approach

                # Collect utterances -> turns
                utterances = d.get("utterances", [])
                turns = self._process_utterances(utterances)

                # Instead of only using scenario_text, combine scenario + turn
                # texts so domain keywords anywhere in the conversation are
                # picked up by _extract_domain.
                domain = self._extract_domain(scenario_text, turns)

                # Create a structured object
                new_dlg = TaskmasterDialogue(
                    conversation_id=conversation_id,
                    instruction_id=instruction_id,
                    scenario=scenario_text,
                    domain=domain,
                    turns=turns
                )
                dialogues.append(new_dlg)
                if max_examples and len(dialogues) >= max_examples:
                    break
            # Also stop the outer loop; otherwise hitting max_examples in the
            # first file would still load dialogues from the second one.
            if max_examples and len(dialogues) >= max_examples:
                break

        if self.config.debug:
            print(f"[TaskmasterProcessor] Loaded {len(dialogues)} total dialogues from Taskmaster-1.")
        return dialogues
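
    # Expected on-disk layout for base_dir (illustrative, derived from
    # required_files above):
    #   <base_dir>/self-dialogs.json
    #   <base_dir>/woz-dialogs.json
    #   <base_dir>/ontology.json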

    def _extract_domain(self, scenario: str, turns: List[Dict[str, str]]) -> str:
        """
        Combine scenario text + all turn texts to detect the domain more robustly.
        """
        # 1) Combine scenario + conversation text into one lowercase string.
        combined_text = scenario.lower()
        for turn in turns:
            text = turn.get('text', '').strip().lower()
            combined_text += " " + text

        # 2) Expanded domain patterns (edit or extend as you wish).
        domain_patterns = {
            'restaurant': r'\b(restaurant|dining|food|reservation|table|menu|cuisine|eat)\b',
            'movie': r'\b(movie|cinema|film|ticket|showtime|theater)\b',
            'ride_share': r'\b(ride|taxi|uber|lyft|car\s?service|pickup|dropoff)\b',
            'coffee': r'\b(coffee|café|cafe|starbucks|espresso|latte|mocha|americano)\b',
            'pizza': r'\b(pizza|delivery|order\s?food|pepperoni|topping|pizzeria)\b',
            'auto': r'\b(car|vehicle|repair|maintenance|mechanic|oil\s?change)\b'
        }

        # 3) Return the first matching domain, or 'other' if none match.
        # (Prints are gated behind config.debug, consistent with the rest
        # of the class.)
        for dom, pattern in domain_patterns.items():
            if re.search(pattern, combined_text):
                if self.config.debug:
                    print(f"Matched domain: {dom}")
                return dom
        if self.config.debug:
            print("No domain match, returning 'other'")
        return 'other'
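
    # Illustrative behavior (not from the original file): dicts preserve
    # insertion order in Python 3.7+, so the first listed pattern that
    # matches wins.
    #   _extract_domain("order a large pepperoni", [])  -> 'pizza'
    #   _extract_domain("book a table for dinner", [])  -> 'restaurant'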

    def _process_utterances(self, utterances: List[Dict[str, Any]]) -> List[Dict[str, str]]:
        """Map speaker to user/assistant, store text."""
        turns = []
        for utt in utterances:
            speaker = 'assistant' if utt.get('speaker') == 'ASSISTANT' else 'user'
            text = utt.get('text', '').strip()
            turns.append({
                'speaker': speaker,
                'text': text
            })
        return turns
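
    # Illustrative mapping (raw field values assumed from Taskmaster-1,
    # where speakers are labeled 'USER' / 'ASSISTANT'; anything other than
    # 'ASSISTANT' is treated as the user):
    #   {"index": 0, "speaker": "USER", "text": "Hi, I'd like a ride."}
    # becomes
    #   {"speaker": "user", "text": "Hi, I'd like a ride."}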

    def filter_and_convert(self, dialogues: List[TaskmasterDialogue]) -> List[Dict]:
        """
        Filter out dialogues that don't meet min turns / min user words,
        then convert them to the final pipeline dict:

            {
                "dialogue_id": "...",
                "domain": "...",
                "turns": [
                    {"speaker": "user", "text": "..."},
                    ...
                ]
            }
        """
        results = []
        for dlg in dialogues:
            if not dlg.validate():
                continue
            if len(dlg.turns) < self.config.min_turns:
                continue

            # Check user-turn min words: drop the dialogue if any user turn
            # is shorter than the configured minimum.
            keep = True
            for turn in dlg.turns:
                if turn['speaker'] == 'user':
                    word_count = len(turn['text'].split())
                    if word_count < self.config.min_user_words:
                        keep = False
                        break
            if not keep:
                continue

            pipeline_dlg = {
                'dialogue_id': dlg.conversation_id,
                'domain': dlg.domain,
                'turns': dlg.turns  # or refine further if needed
            }
            results.append(pipeline_dlg)

        if self.config.debug:
            print(f"[TaskmasterProcessor] Filtered down to {len(results)} dialogues.")
        return results
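

# Minimal usage sketch (not part of the original file). It assumes
# PipelineConfig exposes the attributes the processor reads above
# (debug, min_turns, min_user_words) and accepts them as keyword
# arguments; adjust the constructor and data path to your setup.
if __name__ == "__main__":
    config = PipelineConfig(debug=True, min_turns=4, min_user_words=3)
    processor = TaskmasterProcessor(config)

    # base_dir must contain self-dialogs.json, woz-dialogs.json, ontology.json
    dialogues = processor.load_taskmaster_dataset("data/taskmaster-1", max_examples=100)
    pipeline_data = processor.filter_and_convert(dialogues)
    print(f"Prepared {len(pipeline_data)} dialogues for the pipeline.")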