# dev-mode-realtts-orpheus / engines / orpheus_engine_BU.py
# Uploaded by Tomtom84
# Rename engines/orpheus_engine.py to engines/orpheus_engine_BU.py
# commit 4708247 (verified)
import json
import time
import logging
import pyaudio
import requests
import traceback
import numpy as np
from queue import Queue
from typing import Optional, Union
from RealtimeTTS.engines import BaseEngine, TimingInfo
# Default configuration values
DEFAULT_API_URL = "http://127.0.0.1:1234"  # local LM Studio server endpoint
DEFAULT_HEADERS = {"Content-Type": "application/json"}
DEFAULT_MODEL = "SebastianBodza/Kartoffel_Orpheus-3B_german_synthetic-v0.1"  # German Orpheus finetune
DEFAULT_VOICE = "Martin"  # must match a name in OrpheusEngine._SPEAKERS
STOP_SEQUENCE = "<custom_token_2>"
SAMPLE_RATE = 24000 # Specific sample rate for Orpheus
# Special token definitions for prompt formatting and token decoding
START_TOKEN_ID = 128259
END_TOKEN_IDS = [128009, 128260, 128261, 128257]
CUSTOM_TOKEN_PREFIX = "<custom_token_"  # marker for SNAC audio tokens in the stream
class OrpheusVoice:
def __init__(self, name: str, gender: str | None = None):
self.name = name
self.gender = gender # optional, falls du es anzeigen willst
class OrpheusEngine(BaseEngine):
    """
    Real-time Text-to-Speech (TTS) engine for the Orpheus model served via
    an LM Studio (OpenAI-compatible) chat-completions API.

    The engine streams completion tokens from the API, decodes the custom
    SNAC audio tokens into PCM chunks via an external decoder, and pushes
    the chunks onto ``self.queue`` for playback by RealtimeTTS.
    """

    # Speakers supported by the configured German Orpheus model.
    _SPEAKERS = [
        # male
        OrpheusVoice("Martin", "m"),
        OrpheusVoice("Luca", "m"),
        # female
        OrpheusVoice("Anne", "f"),
        OrpheusVoice("Emma", "f"),
    ]

    def __init__(
        self,
        api_url: str = DEFAULT_API_URL,
        model: str = DEFAULT_MODEL,
        headers: dict = DEFAULT_HEADERS,
        voice: Optional[OrpheusVoice] = None,
        temperature: float = 0.6,
        top_p: float = 0.9,
        max_tokens: int = 1200,
        repetition_penalty: float = 1.1,
        debug: bool = False
    ):
        """
        Initialize the Orpheus TTS engine with the given parameters.

        Args:
            api_url (str): Endpoint URL for the LM Studio API.
            model (str): Model name to use for synthesis.
            headers (dict): HTTP headers for API requests.
            voice (Optional[OrpheusVoice]): Voice configuration.
                Defaults to DEFAULT_VOICE.
            temperature (float): Sampling temperature (0-1) for generation.
            top_p (float): Top-p sampling parameter for controlling diversity.
            max_tokens (int): Maximum tokens to generate per API request.
            repetition_penalty (float): Penalty factor for repeated phrases.
            debug (bool): Flag to enable debug output.
        """
        super().__init__()
        self.api_url = api_url
        self.model = model
        self.headers = headers
        self.voice = voice or OrpheusVoice(DEFAULT_VOICE)
        self.temperature = temperature
        self.top_p = top_p
        self.max_tokens = max_tokens
        self.repetition_penalty = repetition_penalty
        self.debug = debug
        self.queue = Queue()  # audio chunks consumed by the player side
        self.post_init()

    def post_init(self):
        """Set up additional engine attributes."""
        self.engine_name = "orpheus"

    def get_stream_info(self):
        """
        Retrieve PyAudio stream configuration.

        Returns:
            tuple: Format, channel count, and sample rate for PyAudio.
        """
        return pyaudio.paInt16, 1, SAMPLE_RATE

    def synthesize(self, text: str) -> bool:
        """
        Convert text to speech and stream audio data via Orpheus.

        NOTE(review): the original file defined ``synthesize`` twice; the
        first definition was dead code (shadowed by the second), so the
        duplicate has been removed and this single definition keeps the
        active behavior.

        Args:
            text (str): The input text to be synthesized.

        Returns:
            bool: True if synthesis completed, False if stopped or failed.
        """
        super().synthesize(text)
        try:
            for audio_chunk in self._token_decoder(self._generate_tokens(text)):
                # Bail out immediately if someone called .stop().
                if self.stop_synthesis_event.is_set():
                    logging.info("OrpheusEngine: synthesis stopped by user")
                    return False
                # Forward this chunk to the playback queue.
                self.queue.put(audio_chunk)
            return True
        except Exception as e:
            traceback.print_exc()
            logging.error(f"Synthesis error: {e}")
            return False

    def _generate_tokens(self, prompt: str):
        """
        Generate a token stream using the LM Studio API.

        Args:
            prompt (str): The input text prompt.

        Yields:
            str: Each token's text as it is received from the API.
        """
        logging.debug(f"Generating tokens for prompt: {prompt}")
        formatted_prompt = self._format_prompt(prompt)
        payload = {
            "model": self.model,
            # BUG FIX: the original referenced undefined names ``voice`` and
            # ``text`` here (NameError at request time) and never used the
            # ``formatted_prompt`` computed above; send the formatted prompt.
            "messages": [{"role": "user", "content": formatted_prompt}],
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "frequency_penalty": self.repetition_penalty,  # optional
            "stream": True,
            "skip_special_tokens": False
        }
        try:
            logging.debug(f"Requesting API URL: {self.api_url} with payload: {payload} and headers: {self.headers}")
            response = requests.post(
                f"{self.api_url}/v1/chat/completions",  # chat-completions path
                headers=self.headers,
                json=payload,
                stream=True
            )
            response.raise_for_status()

            token_counter = 0
            start_time = time.time()  # for time-to-first-token measurement
            for line in response.iter_lines():
                # Stop on demand.
                if self.stop_synthesis_event.is_set():
                    logging.debug("OrpheusEngine: token generation aborted")
                    break
                if not line:
                    continue
                line = line.decode('utf-8')
                # Server-sent events: payload lines are prefixed "data: ".
                if not line.startswith('data: '):
                    continue
                data_str = line[6:]
                if data_str.strip() == '[DONE]':
                    break
                try:
                    data = json.loads(data_str)
                    if 'choices' in data and data['choices']:
                        delta = data["choices"][0]["delta"]
                        token_text = delta.get("content", "")
                        if CUSTOM_TOKEN_PREFIX in token_text:
                            logging.debug(f"SNAC-frame: {token_text[:40]}")
                        if token_text:
                            token_counter += 1
                            # Log the latency of the very first token.
                            if token_counter == 1:
                                elapsed = time.time() - start_time
                                logging.info(f"Time to first token: {elapsed:.2f} seconds")
                            yield token_text
                except json.JSONDecodeError as e:
                    logging.error(f"Error decoding JSON: {e}")
                    continue
        except requests.RequestException as e:
            logging.error(f"API request failed: {e}")

    def _format_prompt(self, prompt: str) -> str:
        """
        Format the text prompt with special tokens required by Orpheus.

        Args:
            prompt (str): The raw text prompt.

        Returns:
            str: The formatted prompt including voice and termination token.
        """
        return f"<|audio|>{self.voice.name}: {prompt}<|eot_id|>"

    def _token_decoder(self, token_gen):
        """
        Decode tokens from the generator and convert them into audio samples.

        Tokens are accumulated in a buffer; every 7 new tokens (after a
        28-token warm-up) the last 28 token IDs are converted into one
        audio chunk.

        Args:
            token_gen: Generator yielding token strings.

        Yields:
            Audio samples ready to be streamed.
        """
        buffer = []
        count = 0
        logging.debug("Starting token decoding from token generator.")
        for token_text in token_gen:
            # Bail out if stop was requested.
            if self.stop_synthesis_event.is_set():
                logging.debug("OrpheusEngine: token decoding aborted")
                break
            token = self.turn_token_into_id(token_text, count)
            if token is not None and token > 0:
                buffer.append(token)
                count += 1
                # Process every 7 tokens after an initial threshold.
                if count % 7 == 0 and count > 27:
                    audio_samples = self._convert_buffer(buffer[-28:], count)
                    if audio_samples is not None:
                        yield audio_samples

    def turn_token_into_id(self, token_string: str, index: int) -> Optional[int]:
        """
        Convert a token string to a numeric ID for audio processing.

        The conversion strips the custom token prefix and applies a base
        offset plus an index-dependent offset (4096 per position within
        the 7-token frame).

        Args:
            token_string (str): The token text.
            index (int): The current token index.

        Returns:
            Optional[int]: The numeric token ID or None if conversion fails.
        """
        token_string = token_string.strip()
        last_token_start = token_string.rfind(CUSTOM_TOKEN_PREFIX)
        if last_token_start == -1:
            return None
        last_token = token_string[last_token_start:]
        if not (last_token.startswith(CUSTOM_TOKEN_PREFIX) and last_token.endswith(">")):
            return None
        try:
            # Slice off the prefix and the trailing ">"; the original used a
            # magic "14", which is exactly len(CUSTOM_TOKEN_PREFIX).
            number_str = last_token[len(CUSTOM_TOKEN_PREFIX):-1]
            return int(number_str) - 10 - ((index % 7) * 4096)
        except ValueError:
            return None

    def _convert_buffer(self, multiframe, count: int):
        """
        Convert a buffer of token frames into audio samples.

        This method uses an external decoder to convert the collected
        token frames.

        Args:
            multiframe: List of token IDs to be converted.
            count (int): The current token count (used for conversion logic).

        Returns:
            Converted audio samples if successful; otherwise, None.
        """
        try:
            from .orpheus_decoder import convert_to_audio as orpheus_convert_to_audio
            converted = orpheus_convert_to_audio(multiframe, count)
            if converted is None:
                logging.warning("Conversion returned None.")
            return converted
        except Exception as e:
            logging.error(f"Failed to convert buffer to audio: {e}")
            logging.info("Returning None after failed conversion.")
            return None

    def get_voices(self):
        """Return the list of available speakers (used by a /voices route)."""
        return self._SPEAKERS

    def set_voice(self, voice_name: str) -> None:
        """
        Select the active speaker by name.

        Args:
            voice_name (str): Name of a speaker listed in ``_SPEAKERS``.

        Raises:
            ValueError: If *voice_name* is not a known speaker.
        """
        for speaker in self._SPEAKERS:
            if speaker.name == voice_name:
                # Keep the full speaker record (including gender) instead of
                # constructing a gender-less OrpheusVoice as before.
                self.voice = speaker
                return
        raise ValueError(f"Unknown Orpheus speaker '{voice_name}'")

    def set_voice_parameters(self, **kwargs):
        """
        Update voice generation parameters.

        Valid parameters include 'temperature', 'top_p', 'max_tokens',
        and 'repetition_penalty'.

        Args:
            **kwargs: Arbitrary keyword arguments for valid voice parameters.
        """
        valid_params = ['temperature', 'top_p', 'max_tokens', 'repetition_penalty']
        for param, value in kwargs.items():
            if param in valid_params:
                setattr(self, param, value)
            elif self.debug:
                logging.warning(f"Ignoring invalid parameter: {param}")

    def __del__(self):
        """
        Destructor to clean up resources.

        Puts a None sentinel into the queue to signal termination of
        audio processing.
        """
        self.queue.put(None)