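"""
OrpheusEngine: RealtimeTTS engine for the Orpheus TTS model served through the
LM Studio chat-completions API.

The engine streams "<custom_token_N>" audio tokens from the model, decodes them
into audio chunks, and plays them back as 16-bit mono PCM at 24 kHz.
"""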
import json
import time
import logging
import pyaudio
import requests
import traceback
import numpy as np
from queue import Queue
from typing import Optional, Union
from RealtimeTTS.engines import BaseEngine, TimingInfo
# Default configuration values
DEFAULT_API_URL = "http://127.0.0.1:1234"
DEFAULT_HEADERS = {"Content-Type": "application/json"}
DEFAULT_MODEL = "SebastianBodza/Kartoffel_Orpheus-3B_german_synthetic-v0.1"
DEFAULT_VOICE = "Martin"
STOP_SEQUENCE = "<custom_token_2>"
SAMPLE_RATE = 24000 # Specific sample rate for Orpheus
# Special token definitions for prompt formatting and token decoding
START_TOKEN_ID = 128259
END_TOKEN_IDS = [128009, 128260, 128261, 128257]
CUSTOM_TOKEN_PREFIX = "<custom_token_"
class OrpheusVoice:
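    """Lightweight container for an Orpheus speaker: a display name plus an optional gender tag."""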
def __init__(self, name: str, gender: str | None = None):
self.name = name
        self.gender = gender  # optional, in case you want to display it
class OrpheusEngine(BaseEngine):
"""
Real-time Text-to-Speech (TTS) engine for the Orpheus model via LM Studio API.
This engine supports real-time token generation, audio synthesis, and voice configuration.
_SPEAKERS = [
# männlich
OrpheusVoice("Jakob", "m"),
OrpheusVoice("Anton", "m"),
OrpheusVoice("Julian", "m"),
OrpheusVoice("Jan", "m"),
OrpheusVoice("Alexander", "m"),
OrpheusVoice("Emil", "m"),
OrpheusVoice("Ben", "m"),
OrpheusVoice("Elias", "m"),
OrpheusVoice("Felix", "m"),
OrpheusVoice("Jonas", "m"),
OrpheusVoice("Noah", "m"),
OrpheusVoice("Maximilian", "m"),
# weiblich
OrpheusVoice("Sophie", "f"),
OrpheusVoice("Marie", "f"),
OrpheusVoice("Mia", "f"),
OrpheusVoice("Maria", "f"),
OrpheusVoice("Sophia", "f"),
OrpheusVoice("Lina", "f"),
OrpheusVoice("Lea", "f"),
]
"""
_SPEAKERS = [
        # male
OrpheusVoice("Martin", "m"),
OrpheusVoice("Luca", "m"),
        # female
OrpheusVoice("Anne", "f"),
OrpheusVoice("Emma", "f"),
]
def __init__(
self,
api_url: str = DEFAULT_API_URL,
model: str = DEFAULT_MODEL,
headers: dict = DEFAULT_HEADERS,
voice: Optional[OrpheusVoice] = None,
temperature: float = 0.6,
top_p: float = 0.9,
max_tokens: int = 1200,
repetition_penalty: float = 1.1,
debug: bool = False
):
"""
Initialize the Orpheus TTS engine with the given parameters.
Args:
api_url (str): Endpoint URL for the LM Studio API.
model (str): Model name to use for synthesis.
headers (dict): HTTP headers for API requests.
voice (Optional[OrpheusVoice]): OrpheusVoice configuration. Defaults to DEFAULT_VOICE.
temperature (float): Sampling temperature (0-1) for text generation.
top_p (float): Top-p sampling parameter for controlling diversity.
max_tokens (int): Maximum tokens to generate per API request.
repetition_penalty (float): Penalty factor for repeated phrases.
debug (bool): Flag to enable debug output.
"""
super().__init__()
self.api_url = api_url
self.model = model
self.headers = headers
self.voice = voice or OrpheusVoice(DEFAULT_VOICE)
self.temperature = temperature
self.top_p = top_p
self.max_tokens = max_tokens
self.repetition_penalty = repetition_penalty
self.debug = debug
self.queue = Queue()
self.post_init()
def post_init(self):
"""Set up additional engine attributes."""
self.engine_name = "orpheus"
def get_stream_info(self):
"""
Retrieve PyAudio stream configuration.
Returns:
tuple: Format, channel count, and sample rate for PyAudio.
"""
return pyaudio.paInt16, 1, SAMPLE_RATE
    def synthesize(self, text: str) -> bool:
        """
        Convert text to speech and stream audio data via Orpheus.

        Args:
            text (str): The input text to be synthesized.

        Returns:
            bool: True if synthesis was successful, False otherwise.
        """
        super().synthesize(text)
        try:
            # Process tokens and put generated audio chunks into the queue
            for audio_chunk in self._token_decoder(self._generate_tokens(text)):
                # bail out immediately if someone called .stop()
                if self.stop_synthesis_event.is_set():
                    logging.info("OrpheusEngine: synthesis stopped by user")
                    return False
                # forward this chunk to the playback queue
                self.queue.put(audio_chunk)
            return True
        except Exception as e:
            traceback.print_exc()
            logging.error(f"Synthesis error: {e}")
            return False
def _generate_tokens(self, prompt: str):
"""
Generate a token stream using the LM Studio API.
Args:
prompt (str): The input text prompt.
Yields:
str: Each token's text as it is received from the API.
"""
logging.debug(f"Generating tokens for prompt: {prompt}")
formatted_prompt = self._format_prompt(prompt)
payload = {
"model": self.model,
"messages": [{"role": "user", "content": f"<|audio|>{voice}: {text}<|eot_id|>"}],
"max_tokens": self.max_tokens,
"temperature": self.temperature,
"top_p": self.top_p,
"frequency_penalty": self.repetition_penalty, # optional,
"stream": True,
"skip_special_tokens": False
}
try:
logging.debug(f"Requesting API URL: {self.api_url} with payload: {payload} and headers: {self.headers}")
response = requests.post(
f"{self.api_url}/v1/chat/completions", # <—— neuer Pfad
headers=self.headers,
json=payload,
stream=True
)
response.raise_for_status()
token_counter = 0
start_time = time.time() # Start timing token generation
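            # The endpoint streams Server-Sent Events; each data line carries a small JSON
            # delta, roughly:  data: {"choices": [{"delta": {"content": "<custom_token_1234>"}}]}
            # and the stream ends with "data: [DONE]".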
for line in response.iter_lines():
# stop on demand
if self.stop_synthesis_event.is_set():
logging.debug("OrpheusEngine: token generation aborted")
break
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data_str = line[6:]
if data_str.strip() == '[DONE]':
break
try:
data = json.loads(data_str)
if 'choices' in data and data['choices']:
delta = data["choices"][0]["delta"]
token_text = delta.get("content", "")
if "<custom_token_" in token_text:
logging.debug(f"SNAC-frame: {token_text[:40]}")
if token_text:
token_counter += 1
# Print the time it took to get the first token
if token_counter == 1:
elapsed = time.time() - start_time
logging.info(f"Time to first token: {elapsed:.2f} seconds")
yield token_text
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON: {e}")
continue
except requests.RequestException as e:
logging.error(f"API request failed: {e}")
def _format_prompt(self, prompt: str) -> str:
"""
Format the text prompt with special tokens required by Orpheus.
Args:
prompt (str): The raw text prompt.
Returns:
str: The formatted prompt including voice and termination token.
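
        Example:
            "<|audio|>Martin: Hallo Welt<|eot_id|>" for voice "Martin" and prompt "Hallo Welt".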
"""
return f"<|audio|>{self.voice.name}: {prompt}<|eot_id|>"
def _token_decoder(self, token_gen):
"""
Decode tokens from the generator and convert them into audio samples.
This method aggregates tokens in a buffer and converts them into audio chunks
once enough tokens have been collected.
Args:
token_gen: Generator yielding token strings.
Yields:
Audio samples ready to be streamed.
"""
buffer = []
count = 0
logging.debug("Starting token decoding from token generator.")
for token_text in token_gen:
# bail out if stop was requested
if self.stop_synthesis_event.is_set():
logging.debug("OrpheusEngine: token decoding aborted")
break
token = self.turn_token_into_id(token_text, count)
if token is not None and token > 0:
buffer.append(token)
count += 1
# Process every 7 tokens after an initial threshold
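                # i.e. once at least 28 tokens have arrived, decode a sliding window of the
                # last 28 token IDs (4 frames of 7 codes) after every 7th new token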
if count % 7 == 0 and count > 27:
buffer_to_proc = buffer[-28:]
audio_samples = self._convert_buffer(buffer_to_proc, count)
if audio_samples is not None:
yield audio_samples
def turn_token_into_id(self, token_string: str, index: int) -> Optional[int]:
"""
Convert a token string to a numeric ID for audio processing.
The conversion takes into account the custom token prefix and an index-based offset.
Args:
token_string (str): The token text.
index (int): The current token index.
Returns:
Optional[int]: The numeric token ID or None if conversion fails.
"""
token_string = token_string.strip()
last_token_start = token_string.rfind(CUSTOM_TOKEN_PREFIX)
if last_token_start == -1:
return None
last_token = token_string[last_token_start:]
if last_token.startswith(CUSTOM_TOKEN_PREFIX) and last_token.endswith(">"):
try:
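                # "<custom_token_N>" encodes one codebook entry: strip the prefix and the
                # trailing ">", then remove the base offset of 10 and the per-position offset
                # (index % 7) * 4096 to recover the raw codebook index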
                number_str = last_token[len(CUSTOM_TOKEN_PREFIX):-1]
token_id = int(number_str) - 10 - ((index % 7) * 4096)
return token_id
except ValueError:
return None
else:
return None
def _convert_buffer(self, multiframe, count: int):
"""
Convert a buffer of token frames into audio samples.
This method uses an external decoder to convert the collected token frames.
Args:
multiframe: List of token IDs to be converted.
count (int): The current token count (used for conversion logic).
Returns:
Converted audio samples if successful; otherwise, None.
"""
try:
from .orpheus_decoder import convert_to_audio as orpheus_convert_to_audio
converted = orpheus_convert_to_audio(multiframe, count)
if converted is None:
logging.warning("Conversion returned None.")
return converted
except Exception as e:
logging.error(f"Failed to convert buffer to audio: {e}")
logging.info("Returning None after failed conversion.")
return None
    def get_voices(self):
        """Return the list of available Orpheus speakers (e.g. for a FastAPI /voices route)."""
return self._SPEAKERS
def set_voice(self, voice_name: str) -> None:
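        """
        Select the active speaker by name.

        Raises:
            ValueError: If voice_name is not a known Orpheus speaker.
        """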
if voice_name not in [v.name for v in self._SPEAKERS]:
raise ValueError(f"Unknown Orpheus speaker '{voice_name}'")
self.voice = OrpheusVoice(voice_name)
def set_voice_parameters(self, **kwargs):
"""
Update voice generation parameters.
Valid parameters include 'temperature', 'top_p', 'max_tokens', and 'repetition_penalty'.
Args:
**kwargs: Arbitrary keyword arguments for valid voice parameters.
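
        Example:
            engine.set_voice_parameters(temperature=0.7, top_p=0.95)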
"""
valid_params = ['temperature', 'top_p', 'max_tokens', 'repetition_penalty']
for param, value in kwargs.items():
if param in valid_params:
setattr(self, param, value)
elif self.debug:
logging.warning(f"Ignoring invalid parameter: {param}")
def __del__(self):
"""
Destructor to clean up resources.
Puts a None into the queue to signal termination of audio processing.
"""
        self.queue.put(None)
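
# Minimal usage sketch (illustrative only): it assumes a running LM Studio server at
# DEFAULT_API_URL with the Orpheus model loaded, and it uses RealtimeTTS's
# TextToAudioStream for playback. Adjust model, voice, and URL to your setup.
#
#     from RealtimeTTS import TextToAudioStream
#
#     engine = OrpheusEngine(voice=OrpheusVoice("Martin", "m"))
#     stream = TextToAudioStream(engine)
#     stream.feed("Hallo! Dies ist ein kurzer Test der Orpheus-Stimme.")
#     stream.play()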