|
import json |
|
import time |
|
import logging |
|
import pyaudio |
|
import requests |
|
import traceback |
|
import numpy as np |
|
from queue import Queue |
|
from typing import Optional, Union |
|
from RealtimeTTS.engines import BaseEngine, TimingInfo |
|
|
|
|
|
# LM Studio server defaults (local OpenAI-compatible endpoint).
DEFAULT_API_URL = "http://127.0.0.1:1234"

DEFAULT_HEADERS = {"Content-Type": "application/json"}

# German fine-tune of Orpheus-3B used by default.
DEFAULT_MODEL = "SebastianBodza/Kartoffel_Orpheus-3B_german_synthetic-v0.1"

DEFAULT_VOICE = "Martin"

# Token that marks the end of an Orpheus generation.
STOP_SEQUENCE = "<custom_token_2>"

# Output audio is 16-bit mono PCM at this rate (see get_stream_info()).
SAMPLE_RATE = 24000


# Special token IDs of the Orpheus vocabulary.
START_TOKEN_ID = 128259

END_TOKEN_IDS = [128009, 128260, 128261, 128257]

# Audio codes arrive as "<custom_token_N>" strings; this prefix is parsed
# by turn_token_into_id().
CUSTOM_TOKEN_PREFIX = "<custom_token_"
|
|
|
class OrpheusVoice: |
|
def __init__(self, name: str, gender: str | None = None): |
|
self.name = name |
|
self.gender = gender |
|
|
|
|
|
|
|
class OrpheusEngine(BaseEngine):

    """
    Real-time Text-to-Speech (TTS) engine for the Orpheus model served
    through the LM Studio chat-completions API.

    Text is wrapped in Orpheus' special tokens, streamed to the API, and
    the returned "<custom_token_N>" stream is decoded into PCM audio
    chunks that are pushed onto ``self.queue`` for playback.

    NOTE(review): the German synthetic model appears to support further
    speakers (Jakob, Anton, Julian, Sophie, Marie, ...) beyond the four
    exposed below — confirm before adding them to _SPEAKERS.
    """

    # Speakers selectable via set_voice(); gender tag "m" = male, "f" = female.
    _SPEAKERS = [

        OrpheusVoice("Martin", "m"),

        OrpheusVoice("Luca", "m"),

        OrpheusVoice("Anne", "f"),

        OrpheusVoice("Emma", "f"),

    ]
|
def __init__( |
|
self, |
|
api_url: str = DEFAULT_API_URL, |
|
model: str = DEFAULT_MODEL, |
|
headers: dict = DEFAULT_HEADERS, |
|
voice: Optional[OrpheusVoice] = None, |
|
temperature: float = 0.6, |
|
top_p: float = 0.9, |
|
max_tokens: int = 1200, |
|
repetition_penalty: float = 1.1, |
|
debug: bool = False |
|
): |
|
""" |
|
Initialize the Orpheus TTS engine with the given parameters. |
|
|
|
Args: |
|
api_url (str): Endpoint URL for the LM Studio API. |
|
model (str): Model name to use for synthesis. |
|
headers (dict): HTTP headers for API requests. |
|
voice (Optional[OrpheusVoice]): OrpheusVoice configuration. Defaults to DEFAULT_VOICE. |
|
temperature (float): Sampling temperature (0-1) for text generation. |
|
top_p (float): Top-p sampling parameter for controlling diversity. |
|
max_tokens (int): Maximum tokens to generate per API request. |
|
repetition_penalty (float): Penalty factor for repeated phrases. |
|
debug (bool): Flag to enable debug output. |
|
""" |
|
super().__init__() |
|
self.api_url = api_url |
|
self.model = model |
|
self.headers = headers |
|
self.voice = voice or OrpheusVoice(DEFAULT_VOICE) |
|
self.temperature = temperature |
|
self.top_p = top_p |
|
self.max_tokens = max_tokens |
|
self.repetition_penalty = repetition_penalty |
|
self.debug = debug |
|
self.queue = Queue() |
|
self.post_init() |
|
|
|
def post_init(self): |
|
"""Set up additional engine attributes.""" |
|
self.engine_name = "orpheus" |
|
|
|
def get_stream_info(self): |
|
""" |
|
Retrieve PyAudio stream configuration. |
|
|
|
Returns: |
|
tuple: Format, channel count, and sample rate for PyAudio. |
|
""" |
|
return pyaudio.paInt16, 1, SAMPLE_RATE |
|
|
|
def synthesize(self, text: str) -> bool: |
|
""" |
|
Convert text to speech and stream audio data. |
|
|
|
Args: |
|
text (str): The input text to be synthesized. |
|
|
|
Returns: |
|
bool: True if synthesis was successful, False otherwise. |
|
""" |
|
super().synthesize(text) |
|
|
|
try: |
|
|
|
for audio_chunk in self._token_decoder(self._generate_tokens(text)): |
|
|
|
if self.stop_synthesis_event.is_set(): |
|
logging.info("OrpheusEngine: synthesis stopped by user") |
|
return False |
|
print(f"Audio chunk size: {len(audio_chunk)}") |
|
self.queue.put(audio_chunk) |
|
return True |
|
except Exception as e: |
|
traceback.print_exc() |
|
logging.error(f"Synthesis error: {e}") |
|
return False |
|
|
|
def synthesize(self, text: str) -> bool: |
|
""" |
|
Convert text to speech and stream audio data via Orpheus. |
|
Drops initial and trailing near-silent chunks. |
|
""" |
|
super().synthesize(text) |
|
|
|
try: |
|
for audio_chunk in self._token_decoder(self._generate_tokens(text)): |
|
|
|
if self.stop_synthesis_event.is_set(): |
|
logging.info("OrpheusEngine: synthesis stopped by user") |
|
return False |
|
|
|
|
|
self.queue.put(audio_chunk) |
|
|
|
return True |
|
|
|
except Exception as e: |
|
traceback.print_exc() |
|
logging.error(f"Synthesis error: {e}") |
|
return False |
|
|
|
|
|
def _generate_tokens(self, prompt: str): |
|
""" |
|
Generate a token stream using the LM Studio API. |
|
|
|
Args: |
|
prompt (str): The input text prompt. |
|
|
|
Yields: |
|
str: Each token's text as it is received from the API. |
|
""" |
|
logging.debug(f"Generating tokens for prompt: {prompt}") |
|
formatted_prompt = self._format_prompt(prompt) |
|
|
|
payload = { |
|
"model": self.model, |
|
"messages": [{"role": "user", "content": f"<|audio|>{voice}: {text}<|eot_id|>"}], |
|
"max_tokens": self.max_tokens, |
|
"temperature": self.temperature, |
|
"top_p": self.top_p, |
|
"frequency_penalty": self.repetition_penalty, |
|
"stream": True, |
|
"skip_special_tokens": False |
|
} |
|
|
|
try: |
|
logging.debug(f"Requesting API URL: {self.api_url} with payload: {payload} and headers: {self.headers}") |
|
response = requests.post( |
|
f"{self.api_url}/v1/chat/completions", |
|
headers=self.headers, |
|
json=payload, |
|
stream=True |
|
) |
|
response.raise_for_status() |
|
|
|
token_counter = 0 |
|
start_time = time.time() |
|
for line in response.iter_lines(): |
|
|
|
if self.stop_synthesis_event.is_set(): |
|
logging.debug("OrpheusEngine: token generation aborted") |
|
break |
|
if line: |
|
line = line.decode('utf-8') |
|
if line.startswith('data: '): |
|
data_str = line[6:] |
|
if data_str.strip() == '[DONE]': |
|
break |
|
|
|
try: |
|
data = json.loads(data_str) |
|
if 'choices' in data and data['choices']: |
|
delta = data["choices"][0]["delta"] |
|
token_text = delta.get("content", "") |
|
if "<custom_token_" in token_text: |
|
logging.debug(f"SNAC-frame: {token_text[:40]}") |
|
if token_text: |
|
token_counter += 1 |
|
|
|
if token_counter == 1: |
|
elapsed = time.time() - start_time |
|
logging.info(f"Time to first token: {elapsed:.2f} seconds") |
|
yield token_text |
|
except json.JSONDecodeError as e: |
|
logging.error(f"Error decoding JSON: {e}") |
|
continue |
|
|
|
except requests.RequestException as e: |
|
logging.error(f"API request failed: {e}") |
|
|
|
def _format_prompt(self, prompt: str) -> str: |
|
""" |
|
Format the text prompt with special tokens required by Orpheus. |
|
|
|
Args: |
|
prompt (str): The raw text prompt. |
|
|
|
Returns: |
|
str: The formatted prompt including voice and termination token. |
|
""" |
|
return f"<|audio|>{self.voice.name}: {prompt}<|eot_id|>" |
|
|
|
def _token_decoder(self, token_gen): |
|
""" |
|
Decode tokens from the generator and convert them into audio samples. |
|
|
|
This method aggregates tokens in a buffer and converts them into audio chunks |
|
once enough tokens have been collected. |
|
|
|
Args: |
|
token_gen: Generator yielding token strings. |
|
|
|
Yields: |
|
Audio samples ready to be streamed. |
|
""" |
|
buffer = [] |
|
count = 0 |
|
|
|
logging.debug("Starting token decoding from token generator.") |
|
for token_text in token_gen: |
|
|
|
if self.stop_synthesis_event.is_set(): |
|
logging.debug("OrpheusEngine: token decoding aborted") |
|
break |
|
token = self.turn_token_into_id(token_text, count) |
|
if token is not None and token > 0: |
|
buffer.append(token) |
|
count += 1 |
|
|
|
|
|
if count % 7 == 0 and count > 27: |
|
buffer_to_proc = buffer[-28:] |
|
audio_samples = self._convert_buffer(buffer_to_proc, count) |
|
if audio_samples is not None: |
|
yield audio_samples |
|
|
|
def turn_token_into_id(self, token_string: str, index: int) -> Optional[int]: |
|
""" |
|
Convert a token string to a numeric ID for audio processing. |
|
|
|
The conversion takes into account the custom token prefix and an index-based offset. |
|
|
|
Args: |
|
token_string (str): The token text. |
|
index (int): The current token index. |
|
|
|
Returns: |
|
Optional[int]: The numeric token ID or None if conversion fails. |
|
""" |
|
token_string = token_string.strip() |
|
last_token_start = token_string.rfind(CUSTOM_TOKEN_PREFIX) |
|
|
|
if last_token_start == -1: |
|
return None |
|
|
|
last_token = token_string[last_token_start:] |
|
|
|
if last_token.startswith(CUSTOM_TOKEN_PREFIX) and last_token.endswith(">"): |
|
try: |
|
number_str = last_token[14:-1] |
|
token_id = int(number_str) - 10 - ((index % 7) * 4096) |
|
return token_id |
|
except ValueError: |
|
return None |
|
else: |
|
return None |
|
|
|
def _convert_buffer(self, multiframe, count: int): |
|
""" |
|
Convert a buffer of token frames into audio samples. |
|
|
|
This method uses an external decoder to convert the collected token frames. |
|
|
|
Args: |
|
multiframe: List of token IDs to be converted. |
|
count (int): The current token count (used for conversion logic). |
|
|
|
Returns: |
|
Converted audio samples if successful; otherwise, None. |
|
""" |
|
try: |
|
from .orpheus_decoder import convert_to_audio as orpheus_convert_to_audio |
|
converted = orpheus_convert_to_audio(multiframe, count) |
|
if converted is None: |
|
logging.warning("Conversion returned None.") |
|
return converted |
|
except Exception as e: |
|
logging.error(f"Failed to convert buffer to audio: {e}") |
|
logging.info("Returning None after failed conversion.") |
|
return None |
|
|
|
def get_voices(self): |
|
return self._SPEAKERS |
|
|
|
def set_voice(self, voice_name: str) -> None: |
|
if voice_name not in [v.name for v in self._SPEAKERS]: |
|
raise ValueError(f"Unknown Orpheus speaker '{voice_name}'") |
|
self.voice = OrpheusVoice(voice_name) |
|
|
|
def set_voice_parameters(self, **kwargs): |
|
""" |
|
Update voice generation parameters. |
|
|
|
Valid parameters include 'temperature', 'top_p', 'max_tokens', and 'repetition_penalty'. |
|
|
|
Args: |
|
**kwargs: Arbitrary keyword arguments for valid voice parameters. |
|
""" |
|
valid_params = ['temperature', 'top_p', 'max_tokens', 'repetition_penalty'] |
|
for param, value in kwargs.items(): |
|
if param in valid_params: |
|
setattr(self, param, value) |
|
elif self.debug: |
|
logging.warning(f"Ignoring invalid parameter: {param}") |
|
|
|
def __del__(self): |
|
""" |
|
Destructor to clean up resources. |
|
|
|
Puts a None into the queue to signal termination of audio processing. |
|
""" |
|
self.queue.put(None) |