Spaces:
Running
Running
File size: 5,284 Bytes
1f6c376 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# segmenter.py - Text segmentation and speaker assignment
import re
from typing import List, Tuple
import logging
logger = logging.getLogger(__name__)
class TextSegmenter:
    """Split text into segments and assign a speaker tag to each segment.

    Speakers are taken from ``self.speakers`` and assigned round-robin, so
    consecutive segments alternate between the available tags.
    """

    # Prefixes that mark a line as the start of a new dialogue turn:
    # straight double/single quote, hyphen, en dash and em dash.
    _DIALOGUE_MARKERS = ('"', "'", '-', '\u2013', '\u2014')

    def __init__(self):
        # Changed speakers to Nari DIA's expected tags
        self.speakers = ["S1", "S2"]
        # Index of the speaker used for the last dialogue segment produced.
        self.current_speaker_index = 0

    def segment_and_assign_speakers(
        self,
        text: str,
        mode: str = "auto"
    ) -> List[Tuple[str, str]]:
        """
        Segment text and assign speakers.

        Args:
            text: Input text to segment
            mode: Segmentation mode ("auto", "paragraph", "dialogue").
                Any other value is treated as "auto" (a warning is logged).

        Returns:
            List of (speaker, text) tuples; empty when the text contains
            nothing but whitespace.
        """
        if mode == "paragraph":
            return self._segment_by_paragraphs(text)
        if mode == "dialogue":
            return self._segment_by_dialogue(text)
        if mode != "auto":
            # Keep the historical fallback, but make the typo visible.
            logging.getLogger(__name__).warning(
                "Unknown segmentation mode %r; falling back to 'auto'", mode
            )
        return self._segment_auto(text)

    @staticmethod
    def _split_paragraphs(text: str) -> List[str]:
        """Return the non-empty, stripped paragraphs of *text*.

        A paragraph break is a blank line; the blank line may contain
        whitespace and lines may end in either LF or CRLF.
        """
        return [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]

    def _segment_by_paragraphs(self, text: str) -> List[Tuple[str, str]]:
        """Segment by paragraphs, alternating speakers."""
        speaker_count = len(self.speakers)
        return [
            (self.speakers[i % speaker_count], paragraph)
            for i, paragraph in enumerate(self._split_paragraphs(text))
        ]

    def _segment_by_dialogue(self, text: str) -> List[Tuple[str, str]]:
        """Segment by detecting dialogue patterns.

        A line starting with one of ``_DIALOGUE_MARKERS`` opens a new
        segment; every other non-blank line is appended to the current
        segment. The first segment always goes to the first speaker and
        the speaker advances each time a new segment is opened.
        """
        segments = []
        current_lines = []
        # Restart from the first speaker on every call so the result does
        # not depend on previous calls made on the same instance.
        speaker_index = 0

        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith(self._DIALOGUE_MARKERS):
                # Only switch speakers when there is a previous segment to
                # close; otherwise the very first turn would skip S1.
                if current_lines:
                    segments.append(
                        (self.speakers[speaker_index], ' '.join(current_lines))
                    )
                    speaker_index = (speaker_index + 1) % len(self.speakers)
                current_lines = [line]
            else:
                current_lines.append(line)

        # Add final segment
        if current_lines:
            segments.append(
                (self.speakers[speaker_index], ' '.join(current_lines))
            )

        # Keep the public attribute in sync with the last speaker used.
        self.current_speaker_index = speaker_index
        return segments

    def _segment_auto(self, text: str) -> List[Tuple[str, str]]:
        """Automatic segmentation using multiple heuristics.

        Order of preference: more than one paragraph -> paragraph mode;
        more than 10 sentences -> sentence groups; otherwise the simple
        word-count based split.
        """
        if len(self._split_paragraphs(text)) > 1:
            return self._segment_by_paragraphs(text)

        sentences = self._split_into_sentences(text)
        if len(sentences) > 10:
            return self._segment_by_sentence_groups(sentences)

        return self._segment_simple(text)

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences.

        Splits after '.', '!' or '?' when followed by whitespace. This is
        a simple heuristic: it also splits after abbreviations such as
        "Mr."; a full NLP library is needed for those cases.
        """
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]

    def _segment_by_sentence_groups(self, sentences: List[str]) -> List[Tuple[str, str]]:
        """Group sentences and assign to different speakers.

        Sentences are joined in consecutive groups of
        ``max(2, len(sentences) // 8)`` and the groups alternate speakers.
        """
        group_size = max(2, len(sentences) // 8)
        speaker_count = len(self.speakers)
        segments = []
        for group_no, start in enumerate(range(0, len(sentences), group_size)):
            group = sentences[start:start + group_size]
            # Sentences keep their own terminal punctuation, so a plain
            # space join is enough.
            segments.append((self.speakers[group_no % speaker_count], ' '.join(group)))
        return segments

    def _segment_simple(self, text: str) -> List[Tuple[str, str]]:
        """Simple segmentation for short texts.

        Texts under 50 words become a single segment for the first
        speaker (text returned unchanged). Longer texts are split by word
        count into at most ``len(self.speakers)`` roughly equal segments.
        Whitespace-only text yields no segments, matching the other modes.
        """
        words = text.split()
        total_words = len(words)

        if not words:
            return []
        if total_words < 50:
            return [(self.speakers[0], text)]  # Assign to S1

        # Limit segments by available speakers
        num_segments = min(len(self.speakers), max(2, total_words // 100))
        segment_size = total_words // num_segments

        segments = []
        for i in range(num_segments):
            start_idx = i * segment_size
            # The last segment absorbs the remainder of the integer division.
            end_idx = (i + 1) * segment_size if i < num_segments - 1 else total_words
            speaker = self.speakers[i % len(self.speakers)]
            segments.append((speaker, ' '.join(words[start_idx:end_idx])))
        return segments
|