"""OrpheusEngine: a RealtimeTTS engine that streams PCM audio from an Orpheus
TTS model served behind an LM Studio / OpenAI-compatible chat-completions API.

Pipeline: text -> formatted prompt -> streamed custom tokens -> numeric token
IDs -> audio chunks (16-bit mono PCM at 24 kHz) pushed into ``self.queue``.
"""

import json
import time
import logging
import pyaudio
import requests
import traceback
import numpy as np
from queue import Queue
from typing import Optional, Union

from RealtimeTTS.engines import BaseEngine, TimingInfo

# Default configuration values
DEFAULT_API_URL = "http://127.0.0.1:1234"
DEFAULT_HEADERS = {"Content-Type": "application/json"}
DEFAULT_MODEL = "SebastianBodza/Kartoffel_Orpheus-3B_german_synthetic-v0.1"
DEFAULT_VOICE = "Martin"
STOP_SEQUENCE = ""
SAMPLE_RATE = 24000  # Orpheus-specific output sample rate

# Special token definitions for prompt formatting and token decoding
START_TOKEN_ID = 128259
END_TOKEN_IDS = [128009, 128260, 128261, 128257]
# Prefix of the audio tokens emitted by the model, e.g. "<custom_token_12345>".
# (Length 14 — turn_token_into_id slices the numeric part after it.)
CUSTOM_TOKEN_PREFIX = "<custom_token_"


class OrpheusVoice:
    """Lightweight descriptor for an Orpheus speaker.

    NOTE(review): reconstructed — the original class body was lost during
    extraction. ``set_voice`` constructs ``OrpheusVoice(voice_name)`` and the
    rest of the engine only reads ``.name``, so this minimal container
    preserves that contract. Verify against the upstream source.
    """

    def __init__(self, name: str):
        self.name = name

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.name!r})"


class OrpheusEngine(BaseEngine):
    """Stream TTS audio from an Orpheus model behind an OpenAI-style API."""

    # NOTE(review): the original speaker list was lost during extraction —
    # restore the full list from upstream. get_voices()/set_voice() rely on it.
    _SPEAKERS = [OrpheusVoice(DEFAULT_VOICE)]

    def __init__(
        self,
        api_url: str = DEFAULT_API_URL,
        model: str = DEFAULT_MODEL,
        headers: Optional[dict] = None,
        voice: Optional[OrpheusVoice] = None,
        temperature: float = 0.6,
        top_p: float = 0.9,
        max_tokens: int = 1200,
        repetition_penalty: float = 1.1,
        debug: bool = False,
    ) -> None:
        """Initialize the engine.

        NOTE(review): reconstructed — the original __init__ was lost during
        extraction. Attribute names are taken from their uses elsewhere in
        this file (payload construction, set_voice, set_voice_parameters,
        __del__). Default sampling values are typical Orpheus settings —
        TODO confirm against upstream. ``stop_synthesis_event`` is assumed
        to be provided by BaseEngine — TODO confirm.
        """
        super().__init__()
        self.api_url = api_url.rstrip("/")
        self.model = model
        self.headers = dict(headers) if headers is not None else dict(DEFAULT_HEADERS)
        self.voice = voice or OrpheusVoice(DEFAULT_VOICE)
        self.temperature = temperature
        self.top_p = top_p
        self.max_tokens = max_tokens
        self.repetition_penalty = repetition_penalty
        self.debug = debug
        self.queue: Queue = Queue()  # audio chunks consumed by the player
        self.engine_name = "orpheus"

    def get_stream_info(self):
        """Return the PyAudio stream parameters (format, channels, rate).

        NOTE(review): reconstructed — the original method was lost during
        extraction; 16-bit mono at SAMPLE_RATE matches the decoded output.
        """
        return pyaudio.paInt16, 1, SAMPLE_RATE

    def synthesize(self, text: str) -> bool:
        """Convert ``text`` to speech, streaming audio chunks into the queue.

        Fix: the original file defined ``synthesize`` twice; the first
        definition (which also leaked a debug ``print``) was dead code
        shadowed by the second. A single definition is kept.

        Args:
            text: The input text to be synthesized.

        Returns:
            True if synthesis completed, False if stopped by the user or an
            error occurred.
        """
        super().synthesize(text)
        try:
            for audio_chunk in self._token_decoder(self._generate_tokens(text)):
                # Bail out immediately if someone called .stop().
                if self.stop_synthesis_event.is_set():
                    logging.info("OrpheusEngine: synthesis stopped by user")
                    return False
                self.queue.put(audio_chunk)
            return True
        except Exception as e:
            traceback.print_exc()
            logging.error(f"Synthesis error: {e}")
            return False

    def _generate_tokens(self, prompt: str):
        """Stream token texts for ``prompt`` from the chat-completions API.

        Fix: the original built ``formatted_prompt`` and then never used it —
        the payload referenced the undefined names ``voice`` and ``text``,
        which raised NameError on every call. The formatted prompt is now
        sent as the user message.

        Args:
            prompt: The raw input text.

        Yields:
            str: Each non-empty delta content string as it arrives.
        """
        logging.debug(f"Generating tokens for prompt: {prompt}")
        formatted_prompt = self._format_prompt(prompt)
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": formatted_prompt}],
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "frequency_penalty": self.repetition_penalty,  # optional
            "stream": True,
            "skip_special_tokens": False,
        }
        try:
            logging.debug(
                f"Requesting API URL: {self.api_url} with payload: {payload} "
                f"and headers: {self.headers}"
            )
            response = requests.post(
                f"{self.api_url}/v1/chat/completions",  # chat-completions path
                headers=self.headers,
                json=payload,
                stream=True,
            )
            response.raise_for_status()

            token_counter = 0
            start_time = time.time()  # for time-to-first-token logging
            # NOTE(review): the tail of this loop was lost during extraction;
            # reconstructed to yield every non-empty delta (the decoder
            # filters non-audio tokens anyway). Verify against upstream.
            for line in response.iter_lines():
                # Stop on demand.
                if self.stop_synthesis_event.is_set():
                    logging.debug("OrpheusEngine: token generation aborted")
                    break
                if not line:
                    continue
                decoded = line.decode("utf-8")
                if not decoded.startswith("data: "):
                    continue
                data_str = decoded[6:]
                if data_str.strip() == "[DONE]":
                    break
                try:
                    data = json.loads(data_str)
                except json.JSONDecodeError:
                    logging.warning(f"Skipping malformed stream line: {data_str!r}")
                    continue
                if data.get("choices"):
                    delta = data["choices"][0].get("delta", {})
                    token_text = delta.get("content", "")
                    if token_text:
                        if token_counter == 0:
                            logging.debug(
                                f"Time to first token: {time.time() - start_time:.3f}s"
                            )
                        token_counter += 1
                        yield token_text
        except requests.exceptions.RequestException as e:
            logging.error(f"Token generation request failed: {e}")

    def _format_prompt(self, prompt: str) -> str:
        """Format ``prompt`` with the speaker name and Orpheus control tokens.

        Args:
            prompt: The raw text prompt.

        Returns:
            The prompt wrapped as ``<|audio|>{speaker}: {prompt}<|eot_id|>``.
        """
        return f"<|audio|>{self.voice.name}: {prompt}<|eot_id|>"

    def _token_decoder(self, token_gen):
        """Decode streamed tokens into audio chunks.

        Tokens are accumulated in a buffer; after an initial warm-up of 28
        tokens, every 7 new tokens the trailing 28-token window is converted
        to audio (overlapping windows smooth the output).

        Args:
            token_gen: Generator yielding token strings.

        Yields:
            Audio sample chunks ready to be streamed.
        """
        buffer = []
        count = 0
        logging.debug("Starting token decoding from token generator.")
        for token_text in token_gen:
            # Bail out if stop was requested.
            if self.stop_synthesis_event.is_set():
                logging.debug("OrpheusEngine: token decoding aborted")
                break
            token = self.turn_token_into_id(token_text, count)
            if token is not None and token > 0:
                buffer.append(token)
                count += 1
                # Process every 7 tokens after an initial threshold.
                if count % 7 == 0 and count > 27:
                    buffer_to_proc = buffer[-28:]
                    audio_samples = self._convert_buffer(buffer_to_proc, count)
                    if audio_samples is not None:
                        yield audio_samples

    def turn_token_into_id(self, token_string: str, index: int) -> Optional[int]:
        """Convert a custom-token string to a numeric ID for audio decoding.

        The numeric part after CUSTOM_TOKEN_PREFIX is offset by 10 plus an
        index-dependent multiple of 4096 (7 codebooks of 4096 entries each).

        Args:
            token_string: The token text (may contain surrounding noise).
            index: The current token index (position within the 7-frame cycle).

        Returns:
            The numeric token ID, or None if the text is not a valid token.
        """
        token_string = token_string.strip()
        last_token_start = token_string.rfind(CUSTOM_TOKEN_PREFIX)
        if last_token_start == -1:
            return None
        last_token = token_string[last_token_start:]
        if not (last_token.startswith(CUSTOM_TOKEN_PREFIX) and last_token.endswith(">")):
            return None
        try:
            # Slice out the digits between the prefix and the trailing ">".
            number_str = last_token[len(CUSTOM_TOKEN_PREFIX):-1]
            return int(number_str) - 10 - ((index % 7) * 4096)
        except ValueError:
            return None

    def _convert_buffer(self, multiframe, count: int):
        """Convert a window of token IDs into audio samples.

        Delegates to the external Orpheus decoder.

        Args:
            multiframe: List of token IDs to be converted.
            count: The current token count (used by the decoder).

        Returns:
            Converted audio samples, or None on failure.
        """
        try:
            # Imported lazily so the module loads even without the decoder.
            from .orpheus_decoder import convert_to_audio as orpheus_convert_to_audio
            converted = orpheus_convert_to_audio(multiframe, count)
            if converted is None:
                logging.warning("Conversion returned None.")
            return converted
        except Exception as e:
            logging.error(f"Failed to convert buffer to audio: {e}")
            logging.info("Returning None after failed conversion.")
            return None

    def get_voices(self):
        """Return the list of available Orpheus speakers."""
        return self._SPEAKERS

    def set_voice(self, voice_name: str) -> None:
        """Select the active speaker by name.

        Raises:
            ValueError: If ``voice_name`` is not a known speaker.
        """
        if voice_name not in [v.name for v in self._SPEAKERS]:
            raise ValueError(f"Unknown Orpheus speaker '{voice_name}'")
        self.voice = OrpheusVoice(voice_name)

    def set_voice_parameters(self, **kwargs) -> None:
        """Update voice generation parameters.

        Valid parameters: 'temperature', 'top_p', 'max_tokens',
        'repetition_penalty'. Unknown parameters are ignored (logged only
        in debug mode).

        Args:
            **kwargs: Keyword arguments for valid voice parameters.
        """
        valid_params = ['temperature', 'top_p', 'max_tokens', 'repetition_penalty']
        for param, value in kwargs.items():
            if param in valid_params:
                setattr(self, param, value)
            elif self.debug:
                logging.warning(f"Ignoring invalid parameter: {param}")

    def __del__(self):
        """Signal termination of audio processing by enqueueing None."""
        self.queue.put(None)