|
from typing import Dict, Any, List |
|
from datetime import timedelta |
|
|
|
def format_time(seconds: float) -> str:
    """Render a duration in seconds as a clock-style string.

    The fractional part is discarded with ``int()`` (truncation toward
    zero) and the result is the standard ``str(timedelta)`` rendering:
    ``H:MM:SS`` for durations under a day (the hour field is not
    zero-padded), with a ``"N day(s), "`` prefix for longer durations.

    Args:
        seconds: Duration in seconds.

    Returns:
        The textual form of the truncated duration, e.g. ``"0:01:05"``.
    """
    whole_seconds = int(seconds)
    duration = timedelta(seconds=whole_seconds)
    return str(duration)
|
|
|
def format_response(state: Dict[str, Any], with_diarization: bool = False) -> Dict[str, Any]:
    """
    Turn the shared processing state into the payload sent to the client.

    Consecutive tokens that resolve to the same speaker are merged into one
    line whose text is joined with ``state["sep"]``. When ``with_diarization``
    is set, tokens whose speaker is ``-1`` or ``0`` are handled specially:

    * if the token ends at or after ``state["end_attributed_speaker"]`` its
      text is held back and returned through ``buffer_diarization`` (followed
      by ``buffer_transcription`` when that is non-empty);
    * if it ends before that point it is attached to the previous speaker.

    Args:
        state: Shared state dictionary. The keys ``tokens``,
            ``buffer_transcription``, ``buffer_diarization``,
            ``end_attributed_speaker``, ``remaining_time_transcription``,
            ``remaining_time_diarization`` and ``sep`` are all read.
            Each token is accessed through its ``speaker``, ``start``,
            ``end`` and ``text`` attributes.
        with_diarization: Whether to apply the speaker-attribution rules
            described above.

    Returns:
        A dictionary with the keys ``lines``, ``buffer_transcription``,
        ``buffer_diarization``, ``remaining_time_transcription`` and
        ``remaining_time_diarization``. Each entry of ``lines`` holds
        ``speaker``, ``text``, ``beg``, ``end`` and ``diff``. With no tokens,
        a single empty placeholder line for speaker 1 is returned.
    """
    tokens = state["tokens"]
    buffer_transcription = state["buffer_transcription"]
    buffer_diarization = state["buffer_diarization"]
    end_attributed_speaker = state["end_attributed_speaker"]
    remaining_time_transcription = state["remaining_time_transcription"]
    remaining_time_diarization = state["remaining_time_diarization"]
    sep = state["sep"]

    def build_payload(line_entries, diarization_buffer):
        # Both exits of the function share the same envelope around the lines.
        return {
            "lines": line_entries,
            "buffer_transcription": buffer_transcription,
            "buffer_diarization": diarization_buffer,
            "remaining_time_transcription": remaining_time_transcription,
            "remaining_time_diarization": remaining_time_diarization
        }

    if not tokens:
        placeholder = {
            "speaker": 1,
            "text": "",
            "beg": format_time(0),
            "end": format_time(0),
            "diff": 0
        }
        return build_payload([placeholder], buffer_diarization)

    # Speaker ids the diarization branch treats as "not attributed".
    unassigned = (-1, 0)

    grouped = []
    held_back = []
    current_speaker = -1
    diarized_until = 0

    for tok in tokens:
        who = tok.speaker

        if with_diarization:
            if who in unassigned:
                if tok.end >= end_attributed_speaker:
                    # Not covered by diarization yet: keep the text for the buffer.
                    held_back.append(tok.text)
                    continue
                elif tok.end < end_attributed_speaker:
                    # Already covered: inherit the speaker of the running line.
                    who = current_speaker

            if who not in unassigned:
                diarized_until = max(tok.end, diarized_until)

        starts_new_line = who != current_speaker or not grouped
        if starts_new_line:
            grouped.append({
                "speaker": who,
                "text": tok.text,
                "beg": format_time(tok.start),
                "end": format_time(tok.end),
                "diff": round(tok.end - diarized_until, 2)
            })
            current_speaker = who
        elif tok.text:
            # Tokens with empty text leave the running line untouched.
            running = grouped[-1]
            running["text"] += sep + tok.text
            running["end"] = format_time(tok.end)
            running["diff"] = round(tok.end - diarized_until, 2)

    if held_back:
        merged = sep.join(held_back)
        if buffer_transcription:
            merged += sep + buffer_transcription
        buffer_diarization = merged

    return build_payload(grouped, buffer_diarization)