# segmenter.py - Text segmentation and speaker assignment import re from typing import List, Tuple import logging logger = logging.getLogger(__name__) class TextSegmenter: def __init__(self): # Changed speakers to Nari DIA's expected tags self.speakers = ["S1", "S2"] self.current_speaker_index = 0 def segment_and_assign_speakers( self, text: str, mode: str = "auto" ) -> List[Tuple[str, str]]: """ Segment text and assign speakers. Args: text: Input text to segment mode: Segmentation mode ("auto", "paragraph", "dialogue") Returns: List of (speaker, text) tuples """ if mode == "paragraph": return self._segment_by_paragraphs(text) elif mode == "dialogue": return self._segment_by_dialogue(text) else: # auto mode return self._segment_auto(text) def _segment_by_paragraphs(self, text: str) -> List[Tuple[str, str]]: """Segment by paragraphs, alternating speakers.""" paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()] segments = [] for i, paragraph in enumerate(paragraphs): speaker = self.speakers[i % len(self.speakers)] segments.append((speaker, paragraph)) return segments def _segment_by_dialogue(self, text: str) -> List[Tuple[str, str]]: """Segment by detecting dialogue patterns.""" lines = text.split('\n') segments = [] current_segment = [] # Start with the first speaker in the list current_speaker = self.speakers[0] for line in lines: line = line.strip() if not line: continue # Check for dialogue markers if (line.startswith('"') or line.startswith("'") or line.startswith('-') or line.startswith('—')): # Save previous segment if current_segment: segments.append((current_speaker, ' '.join(current_segment))) # Switch speaker and start new segment self.current_speaker_index = (self.current_speaker_index + 1) % len(self.speakers) current_speaker = self.speakers[self.current_speaker_index] current_segment = [line] else: current_segment.append(line) # Add final segment if current_segment: segments.append((current_speaker, ' '.join(current_segment))) return segments def _segment_auto(self, text: str) -> List[Tuple[str, str]]: """Automatic segmentation using multiple heuristics.""" segments = [] paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()] if len(paragraphs) > 1: return self._segment_by_paragraphs(text) sentences = self._split_into_sentences(text) if len(sentences) > 10: return self._segment_by_sentence_groups(sentences) return self._segment_simple(text) def _split_into_sentences(self, text: str) -> List[str]: """Split text into sentences.""" # Simple sentence splitting # Use a more robust regex to avoid splitting on abbreviations (e.g., "Mr.") # This is a common simple improvement, though full NLP libraries are best for complex cases. sentences = re.split(r'(?<=[.!?])\s+', text) # Split after . ! ? followed by space return [s.strip() for s in sentences if s.strip()] def _segment_by_sentence_groups(self, sentences: List[str]) -> List[Tuple[str, str]]: """Group sentences and assign to different speakers.""" segments = [] group_size = max(2, len(sentences) // 8) for i in range(0, len(sentences), group_size): group = sentences[i:i + group_size] speaker = self.speakers[i // group_size % len(self.speakers)] text_segment = ' '.join(group) # No need to add '.' if already present from sentence splitting segments.append((speaker, text_segment)) return segments def _segment_simple(self, text: str) -> List[Tuple[str, str]]: """Simple segmentation for short texts.""" words = text.split() total_words = len(words) if total_words < 50: return [(self.speakers[0], text)] # Assign to S1 num_segments = min(len(self.speakers), max(2, total_words // 100)) # Limit segments by available speakers segment_size = total_words // num_segments segments = [] for i in range(num_segments): start_idx = i * segment_size end_idx = (i + 1) * segment_size if i < num_segments - 1 else total_words segment_words = words[start_idx:end_idx] segment_text = ' '.join(segment_words) speaker = self.speakers[i % len(self.speakers)] segments.append((speaker, segment_text)) return segments