File size: 6,995 Bytes
7a0020b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import json
import re
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field

from pipeline_config import PipelineConfig

@dataclass
class TaskmasterDialogue:
    """One Taskmaster-1 conversation in structured form.

    Mirrors the fields pulled out of the raw JSON record: conversation id,
    optional instruction id, free-text scenario, the detected domain label,
    and the ordered list of speaker/text turn dicts.
    """
    conversation_id: str
    instruction_id: Optional[str]
    scenario: Optional[str]
    domain: str
    turns: List[Dict[str, Any]] = field(default_factory=list)

    def validate(self) -> bool:
        """Return True when the dialogue has a non-empty id and its turns are a list."""
        has_id = bool(self.conversation_id)
        has_turn_list = isinstance(self.turns, list)
        return has_id and has_turn_list

class TaskmasterProcessor:
    """
    Load Taskmaster-1 dialogues, detect each dialogue's domain from the
    scenario text plus the conversation turns, filter them by the config's
    length constraints, and convert survivors to a pipeline-friendly format.
    """

    # Domain keyword patterns, compiled once at class-creation time so that
    # _extract_domain does not rebuild and recompile them for every dialogue.
    _DOMAIN_PATTERNS = {
        'restaurant': re.compile(r'\b(restaurant|dining|food|reservation|table|menu|cuisine|eat)\b'),
        'movie': re.compile(r'\b(movie|cinema|film|ticket|showtime|theater)\b'),
        'ride_share': re.compile(r'\b(ride|taxi|uber|lyft|car\s?service|pickup|dropoff)\b'),
        'coffee': re.compile(r'\b(coffee|café|cafe|starbucks|espresso|latte|mocha|americano)\b'),
        'pizza': re.compile(r'\b(pizza|delivery|order\s?food|pepperoni|topping|pizzeria)\b'),
        'auto': re.compile(r'\b(car|vehicle|repair|maintenance|mechanic|oil\s?change)\b'),
    }

    def __init__(self, config: PipelineConfig):
        # config supplies: debug, min_turns, min_user_words.
        self.config = config

    def load_taskmaster_dataset(self, base_dir: str, max_examples: Optional[int] = None) -> List[TaskmasterDialogue]:
        """
        Load and parse Taskmaster-1 JSON for self-dialogs & woz-dialogs.

        Args:
            base_dir: Directory containing self-dialogs.json, woz-dialogs.json
                and ontology.json.
            max_examples: Optional hard cap on the number of dialogues loaded,
                enforced across BOTH input files.

        Returns:
            List of TaskmasterDialogue objects with domains already detected.

        Raises:
            FileNotFoundError: If any of the three expected files is missing.
        """
        required_files = {
            "self-dialogs": "self-dialogs.json",
            "woz-dialogs": "woz-dialogs.json",
            "ontology": "ontology.json",  # we might not actively use this, but let's expect it
        }
        # Check for missing files up front so we fail before any parsing work.
        missing = [k for k, v in required_files.items() if not Path(base_dir, v).exists()]
        if missing:
            raise FileNotFoundError(f"Missing Taskmaster files: {missing}")

        # Load ontology (currently unused; reading it makes a corrupt file fail fast).
        ontology_path = Path(base_dir, required_files["ontology"])
        with open(ontology_path, 'r', encoding='utf-8') as f:
            ontology = json.load(f)
        if self.config.debug:
            print(f"[TaskmasterProcessor] Loaded ontology with {len(ontology.keys())} top-level keys (unused).")

        dialogues: List[TaskmasterDialogue] = []

        # Read the 2 main dialogue files.
        for file_key in ("self-dialogs", "woz-dialogs"):
            file_path = Path(base_dir, required_files[file_key])
            with open(file_path, 'r', encoding='utf-8') as f:
                raw_data = json.load(f)

            for d in raw_data:
                scenario_text = d.get("scenario", "")
                turns = self._process_utterances(d.get("utterances", []))

                dialogues.append(TaskmasterDialogue(
                    conversation_id=d.get("conversation_id", ""),
                    instruction_id=d.get("instruction_id", None),
                    scenario=scenario_text,
                    # Combine scenario + turn texts for a more robust match.
                    domain=self._extract_domain(scenario_text, turns),
                    turns=turns,
                ))

                if max_examples and len(dialogues) >= max_examples:
                    # BUGFIX: the previous `break` only exited the inner loop,
                    # so the outer loop moved on to the next file and the total
                    # could exceed max_examples. Return stops both loops.
                    if self.config.debug:
                        print(f"[TaskmasterProcessor] Loaded {len(dialogues)} total dialogues from Taskmaster-1.")
                    return dialogues

        if self.config.debug:
            print(f"[TaskmasterProcessor] Loaded {len(dialogues)} total dialogues from Taskmaster-1.")
        return dialogues

    def _extract_domain(self, scenario: str, turns: List[Dict[str, str]]) -> str:
        """
        Return the first domain whose keyword pattern matches the combined
        scenario + turn text (in _DOMAIN_PATTERNS order), or 'other'.
        """
        # join() instead of repeated `+=` keeps this linear in the turn count.
        combined_text = " ".join(
            [scenario.lower()]
            + [turn.get('text', '').strip().lower() for turn in turns]
        )

        for dom, pattern in self._DOMAIN_PATTERNS.items():
            if pattern.search(combined_text):
                # BUGFIX: these diagnostics printed unconditionally for every
                # dialogue; they are now gated on the debug flag like the rest
                # of this class's logging.
                if self.config.debug:
                    print(f"Matched domain: {dom}")
                return dom

        if self.config.debug:
            print("No domain match, returning 'other'")
        return 'other'

    def _process_utterances(self, utterances: List[Dict[str, Any]]) -> List[Dict[str, str]]:
        """Normalize raw utterances to {'speaker': 'user'|'assistant', 'text': str}."""
        return [
            {
                # Anything other than the literal 'ASSISTANT' tag counts as the
                # user side (matches the original mapping).
                'speaker': 'assistant' if utt.get('speaker') == 'ASSISTANT' else 'user',
                'text': utt.get('text', '').strip(),
            }
            for utt in utterances
        ]

    def filter_and_convert(self, dialogues: List[TaskmasterDialogue]) -> List[Dict]:
        """
        Drop dialogues that fail validation, have fewer than config.min_turns
        turns, or contain any user turn shorter than config.min_user_words
        words; convert the survivors to the final pipeline dict:

        {
          "dialogue_id": "...",
          "domain": "...",
          "turns": [
            {"speaker": "user", "text": "..."},
            ...
          ]
        }
        """
        results = []
        for dlg in dialogues:
            if not dlg.validate() or len(dlg.turns) < self.config.min_turns:
                continue

            # Every user turn must meet the minimum word count.
            if any(
                turn['speaker'] == 'user'
                and len(turn['text'].split()) < self.config.min_user_words
                for turn in dlg.turns
            ):
                continue

            results.append({
                'dialogue_id': dlg.conversation_id,
                'domain': dlg.domain,
                'turns': dlg.turns,  # or you can refine further if needed
            })

        if self.config.debug:
            print(f"[TaskmasterProcessor] Filtered down to {len(results)} dialogues.")
        return results