File size: 5,284 Bytes
1f6c376
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# segmenter.py - Text segmentation and speaker assignment
import logging
import re
from typing import List, Optional, Tuple

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

class TextSegmenter:
    """Split text into segments and assign a speaker tag to each one.

    Speakers are assigned round-robin from ``self.speakers``. Every
    segmentation method returns a list of ``(speaker, text)`` tuples and
    returns an empty list when the input contains no text.
    """

    # Line prefixes that start a new dialogue turn: straight quotes, hyphen,
    # em dash, en dash, and the typographic opening quotes.
    DIALOGUE_MARKERS = ('"', "'", '-', '\u2014', '\u2013', '\u201c', '\u2018')

    def __init__(self, speakers: Optional[List[str]] = None):
        """Create a segmenter.

        Args:
            speakers: Speaker tags to cycle through. Defaults to
                ``["S1", "S2"]``, the tags Nari DIA expects.

        Raises:
            ValueError: If ``speakers`` is given but empty.
        """
        self.speakers = list(speakers) if speakers is not None else ["S1", "S2"]
        if not self.speakers:
            raise ValueError("speakers must contain at least one tag")
        # Index of the speaker currently talking in dialogue mode.
        self.current_speaker_index = 0

    def segment_and_assign_speakers(
        self,
        text: str,
        mode: str = "auto"
    ) -> List[Tuple[str, str]]:
        """
        Segment text and assign speakers.

        Args:
            text: Input text to segment
            mode: Segmentation mode ("auto", "paragraph", "dialogue").
                Any other value is treated as "auto".

        Returns:
            List of (speaker, text) tuples
        """
        if mode == "paragraph":
            return self._segment_by_paragraphs(text)
        if mode == "dialogue":
            return self._segment_by_dialogue(text)
        return self._segment_auto(text)

    @staticmethod
    def _split_paragraphs(text: str) -> List[str]:
        """Split text on blank lines and drop empty paragraphs.

        A blank line may contain whitespace or a carriage return, so CRLF
        input and "visually empty" separator lines are handled too.
        """
        blocks = re.split(r'\n\s*\n', text)
        return [block.strip() for block in blocks if block.strip()]

    def _assign_round_robin(self, chunks: List[str]) -> List[Tuple[str, str]]:
        """Pair each chunk with a speaker, cycling through ``self.speakers``."""
        speaker_count = len(self.speakers)
        return [
            (self.speakers[index % speaker_count], chunk)
            for index, chunk in enumerate(chunks)
        ]

    def _segment_by_paragraphs(self, text: str) -> List[Tuple[str, str]]:
        """Segment by paragraphs, alternating speakers."""
        return self._assign_round_robin(self._split_paragraphs(text))

    def _segment_by_dialogue(self, text: str) -> List[Tuple[str, str]]:
        """Segment by detecting dialogue patterns.

        A line starting with one of ``DIALOGUE_MARKERS`` opens a new segment
        spoken by the next speaker; other non-empty lines are appended to the
        current segment. The first segment always belongs to the first
        speaker, and the result does not depend on earlier calls.
        """
        segments = []
        current_lines = []
        # Reset per call so repeated calls on one instance are deterministic.
        self.current_speaker_index = 0

        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            if line.startswith(self.DIALOGUE_MARKERS):
                # Only switch speakers when a previous segment exists;
                # otherwise the first speaker would be skipped.
                if current_lines:
                    segments.append(
                        (self.speakers[self.current_speaker_index], ' '.join(current_lines))
                    )
                    self.current_speaker_index = (
                        (self.current_speaker_index + 1) % len(self.speakers)
                    )
                current_lines = [line]
            else:
                current_lines.append(line)

        # Add final segment
        if current_lines:
            segments.append(
                (self.speakers[self.current_speaker_index], ' '.join(current_lines))
            )

        return segments

    def _segment_auto(self, text: str) -> List[Tuple[str, str]]:
        """Automatic segmentation using multiple heuristics.

        Order of preference: paragraphs (when more than one), groups of
        sentences (when more than 10 sentences), then a word-count split.
        """
        paragraphs = self._split_paragraphs(text)
        if len(paragraphs) > 1:
            return self._assign_round_robin(paragraphs)

        sentences = self._split_into_sentences(text)
        if len(sentences) > 10:
            return self._segment_by_sentence_groups(sentences)

        return self._segment_simple(text)

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences."""
        # Naive split on whitespace that follows '.', '!' or '?'. It also
        # breaks after abbreviations such as "Mr."; a full NLP library is
        # needed to handle those cases.
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]

    def _segment_by_sentence_groups(self, sentences: List[str]) -> List[Tuple[str, str]]:
        """Group sentences and assign to different speakers."""
        # At least two sentences per group, aiming for roughly eight groups.
        group_size = max(2, len(sentences) // 8)
        # Sentences keep their own terminal punctuation, so a plain join works.
        groups = [
            ' '.join(sentences[start:start + group_size])
            for start in range(0, len(sentences), group_size)
        ]
        return self._assign_round_robin(groups)

    def _segment_simple(self, text: str) -> List[Tuple[str, str]]:
        """Simple segmentation for short texts.

        Texts under 50 words become a single segment for the first speaker.
        Longer texts are split by word count into equal parts, at most one
        per speaker; the last part absorbs any remainder.
        """
        words = text.split()
        if not words:
            # Nothing to speak; match the other modes, which return no segments.
            return []

        total_words = len(words)
        if total_words < 50:
            return [(self.speakers[0], text.strip())]

        # Limit segments by available speakers
        num_segments = min(len(self.speakers), max(2, total_words // 100))
        segment_size = total_words // num_segments

        segments = []
        for i in range(num_segments):
            start_idx = i * segment_size
            is_last = i == num_segments - 1
            end_idx = total_words if is_last else start_idx + segment_size
            segments.append(
                (self.speakers[i % len(self.speakers)], ' '.join(words[start_idx:end_idx]))
            )

        return segments