import json
import re
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from pipeline_config import PipelineConfig


@dataclass
class TaskmasterDialogue:
    """Structured representation of a Taskmaster-1 dialogue."""
    conversation_id: str
    instruction_id: Optional[str]
    scenario: Optional[str]
    domain: str
    turns: List[Dict[str, Any]] = field(default_factory=list)

    def validate(self) -> bool:
        """Check that this dialogue has an ID and a list of turns."""
        return bool(self.conversation_id and isinstance(self.turns, list))
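
# Illustrative sketch (hypothetical values): a minimal, valid dialogue record.
#
#   dlg = TaskmasterDialogue(
#       conversation_id="dlg-001",
#       instruction_id=None,
#       scenario="Book a table for two tonight.",
#       domain="restaurant",
#       turns=[{"speaker": "user", "text": "I need a table for two."}],
#   )
#   dlg.validate()  # -> True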


class TaskmasterProcessor:
    """
    Loads Taskmaster-1 dialogues, extracts the domain from the scenario,
    filters them, and outputs a final pipeline-friendly format.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config

    def load_taskmaster_dataset(self, base_dir: str, max_examples: Optional[int] = None) -> List[TaskmasterDialogue]:
        """
        Load and parse the Taskmaster-1 JSON files (self-dialogs and woz-dialogs).
        Combines scenario text with conversation utterances to detect the domain
        more robustly.
        """
        required_files = {
            "self-dialogs": "self-dialogs.json",
            "woz-dialogs": "woz-dialogs.json",
            "ontology": "ontology.json",  # not used directly, but expected to be present
        }
        # Check for missing files
        missing = [k for k, v in required_files.items() if not Path(base_dir, v).exists()]
        if missing:
            raise FileNotFoundError(f"Missing Taskmaster files: {missing}")

        # Load ontology (currently unused beyond the debug message below)
        ontology_path = Path(base_dir, required_files["ontology"])
        with open(ontology_path, 'r', encoding='utf-8') as f:
            ontology = json.load(f)
        if self.config.debug:
            print(f"[TaskmasterProcessor] Loaded ontology with {len(ontology)} top-level keys (unused).")

        dialogues: List[TaskmasterDialogue] = []
        # Read the two main dialogue files
        file_keys = ["self-dialogs", "woz-dialogs"]
        for file_key in file_keys:
            file_path = Path(base_dir, required_files[file_key])
            with open(file_path, 'r', encoding='utf-8') as f:
                raw_data = json.load(f)
            for d in raw_data:
                conversation_id = d.get("conversation_id", "")
                instruction_id = d.get("instruction_id", None)
                scenario_text = d.get("scenario", "")

                # Collect utterances -> turns
                utterances = d.get("utterances", [])
                turns = self._process_utterances(utterances)

                # Combine the scenario text with the turn texts (rather than
                # relying on the scenario alone) so domain keywords anywhere
                # in the conversation are picked up.
                domain = self._extract_domain(scenario_text, turns)

                # Create a structured object
                new_dlg = TaskmasterDialogue(
                    conversation_id=conversation_id,
                    instruction_id=instruction_id,
                    scenario=scenario_text,
                    domain=domain,
                    turns=turns
                )
                dialogues.append(new_dlg)

                if max_examples and len(dialogues) >= max_examples:
                    break
            # Also stop reading further files once the cap is reached; without
            # this outer break, the next file would still be processed.
            if max_examples and len(dialogues) >= max_examples:
                break

        if self.config.debug:
            print(f"[TaskmasterProcessor] Loaded {len(dialogues)} total dialogues from Taskmaster-1.")
        return dialogues
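
    # Expected input layout (sketch): base_dir must contain the three files
    # named in required_files, e.g.
    #
    #   TM-1-2019/            <- hypothetical directory name
    #       self-dialogs.json
    #       woz-dialogs.json
    #       ontology.json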
    def _extract_domain(self, scenario: str, turns: List[Dict[str, str]]) -> str:
        """
        Combine scenario text with all turn texts to detect the domain more robustly.
        """
        # 1) Combine scenario + conversation text
        combined_text = scenario.lower()
        for turn in turns:
            text = turn.get('text', '').strip().lower()
            combined_text += " " + text

        # 2) Domain keyword patterns (edit or expand as needed)
        domain_patterns = {
            'restaurant': r'\b(restaurant|dining|food|reservation|table|menu|cuisine|eat)\b',
            'movie': r'\b(movie|cinema|film|ticket|showtime|theater)\b',
            'ride_share': r'\b(ride|taxi|uber|lyft|car\s?service|pickup|dropoff)\b',
            'coffee': r'\b(coffee|café|cafe|starbucks|espresso|latte|mocha|americano)\b',
            'pizza': r'\b(pizza|delivery|order\s?food|pepperoni|topping|pizzeria)\b',
            'auto': r'\b(car|vehicle|repair|maintenance|mechanic|oil\s?change)\b'
        }

        # 3) Return the first matched domain, or 'other' if none match.
        # Patterns are checked in insertion order, so the first match wins
        # when several domains apply.
        for dom, pattern in domain_patterns.items():
            if re.search(pattern, combined_text):
                if self.config.debug:
                    print(f"Matched domain: {dom}")
                return dom
        if self.config.debug:
            print("No domain match, returning 'other'")
        return 'other'
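
    # Illustrative behavior (hypothetical input): a scenario like
    # "I'd like to order a large pepperoni pizza" hits the 'pizza' pattern,
    # so _extract_domain returns 'pizza'; text with no keyword hits falls
    # through to 'other'.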
    def _process_utterances(self, utterances: List[Dict[str, Any]]) -> List[Dict[str, str]]:
        """Map each utterance's speaker to user/assistant and keep its text."""
        turns = []
        for utt in utterances:
            # Anything not labeled 'ASSISTANT' is treated as the user.
            speaker = 'assistant' if utt.get('speaker') == 'ASSISTANT' else 'user'
            text = utt.get('text', '').strip()
            turns.append({
                'speaker': speaker,
                'text': text
            })
        return turns
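
    # Illustrative mapping (hypothetical utterance):
    #   {"index": 0, "speaker": "USER", "text": "Hi, I need a ride."}
    #   -> {"speaker": "user", "text": "Hi, I need a ride."}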
    def filter_and_convert(self, dialogues: List[TaskmasterDialogue]) -> List[Dict]:
        """
        Filter out dialogues that don't meet the minimum turn count or the
        minimum user-turn word count, then convert the rest to the final
        pipeline dict:
        {
            "dialogue_id": "...",
            "domain": "...",
            "turns": [
                {"speaker": "user", "text": "..."},
                ...
            ]
        }
        """
        results = []
        for dlg in dialogues:
            if not dlg.validate():
                continue
            if len(dlg.turns) < self.config.min_turns:
                continue

            # Drop the dialogue if any user turn falls below the minimum word count.
            keep = True
            for turn in dlg.turns:
                if turn['speaker'] == 'user':
                    word_count = len(turn['text'].split())
                    if word_count < self.config.min_user_words:
                        keep = False
                        break
            if not keep:
                continue

            pipeline_dlg = {
                'dialogue_id': dlg.conversation_id,
                'domain': dlg.domain,
                'turns': dlg.turns  # refine further here if needed
            }
            results.append(pipeline_dlg)

        if self.config.debug:
            print(f"[TaskmasterProcessor] Filtered down to {len(results)} dialogues.")
        return results
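

if __name__ == "__main__":
    # Minimal usage sketch, not part of the pipeline itself. Assumes that
    # PipelineConfig accepts debug/min_turns/min_user_words as keyword
    # arguments and that "TM-1-2019" is a local directory holding the
    # Taskmaster-1 JSON files; both are assumptions, so adjust to your setup.
    config = PipelineConfig(debug=True, min_turns=2, min_user_words=3)
    processor = TaskmasterProcessor(config)
    dialogues = processor.load_taskmaster_dataset("TM-1-2019", max_examples=100)
    pipeline_dialogues = processor.filter_and_convert(dialogues)
    print(f"Kept {len(pipeline_dialogues)} of {len(dialogues)} dialogues.")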