"""FastAPI server that streams text-to-speech audio produced by RealtimeTTS.

An Orpheus engine is created at import time and exposed through a small HTTP
API: ``/tts`` streams synthesized audio, ``/engines``, ``/voices``,
``/set_engine`` and ``/setvoice`` inspect or change the active engine/voice,
and ``/`` serves the landing page.
"""

if __name__ == "__main__":
    print("Starting server")
    import logging

    # Enable or disable debug logging
    DEBUG_LOGGING = False

    if DEBUG_LOGGING:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.WARNING)

from RealtimeTTS import (
    TextToAudioStream,
)
from engines.orpheus_engine import OrpheusEngine
from huggingface_hub import login

from fastapi.responses import (
    StreamingResponse,
    HTMLResponse,
    FileResponse,
    JSONResponse,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Query, Request
from fastapi.staticfiles import StaticFiles

from queue import Queue
import threading
import logging
import uvicorn
import wave
import io
import os

# Optional Hugging Face authentication; skipped when HF_TOKEN is unset/empty.
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN:
    print("🔑 Logging in to Hugging Face Hub...")
    login(HF_TOKEN)

# Engine instances and their voice lists, both keyed by engine name.
engines: dict[str, "BaseEngine"] = {}
voices: dict[str, list] = {}
current_engine = None
play_text_to_speech_semaphore = threading.Semaphore(1)

PORT = int(os.getenv("TTS_FASTAPI_PORT", 7860))  # keep this number consistent

SUPPORTED_ENGINES = ["orpheus"]

# The Orpheus engine is created at import time, so it is available even when
# the module is loaded by an external ASGI runner instead of run as a script.
engines["orpheus"] = OrpheusEngine(
    api_url=os.getenv("ORPHEUS_API_URL"),  # http://127.0.0.1:1234/v1/completions
    model=os.getenv("ORPHEUS_MODEL"),  # Kartoffel model ID
)

voices["orpheus"] = engines["orpheus"].get_voices()
current_engine = engines["orpheus"]

# change start engine by moving engine name
# to the first position in SUPPORTED_ENGINES
# START_ENGINE = SUPPORTED_ENGINES[0]
START_ENGINE = "orpheus"

# Lower-case substrings that mark a User-Agent header as coming from a browser.
BROWSER_IDENTIFIERS = [
    "mozilla",
    "chrome",
    "safari",
    "firefox",
    "edge",
    "opera",
    "msie",
    "trident",
]

# Origins accepted by the CORS middleware (local host, with and without PORT).
origins = [
    "http://localhost",
    f"http://localhost:{PORT}",
    "http://127.0.0.1",
    f"http://127.0.0.1:{PORT}",
    "https://localhost",
    f"https://localhost:{PORT}",
    "https://127.0.0.1",
    f"https://127.0.0.1:{PORT}",
]

speaking_lock = threading.Lock()
tts_lock = threading.Lock()
gen_lock = threading.Lock()


class TTSRequestHandler:
    """Per-request bridge between a ``TextToAudioStream`` and an HTTP response.

    Audio chunks delivered by the stream callbacks are pushed onto
    ``audio_queue``; ``audio_chunk_generator`` drains that queue. A ``None``
    item on the queue marks the end of the stream.
    """

    def __init__(self, engine):
        """Create a stream for ``engine`` together with an empty chunk queue."""
        self.engine = engine
        self.audio_queue = Queue()
        self.stream = TextToAudioStream(
            engine, on_audio_stream_stop=self.on_audio_stream_stop, muted=True
        )
        self.speaking = False

    def on_audio_chunk(self, chunk):
        """Stream callback: enqueue one synthesized audio chunk."""
        self.audio_queue.put(chunk)

    def on_audio_stream_stop(self):
        """Stream callback: enqueue the end-of-stream sentinel."""
        self.audio_queue.put(None)
        self.speaking = False

    def play_text_to_speech(self, text):
        """Feed ``text`` to the stream and start asynchronous synthesis."""
        self.speaking = True
        self.stream.feed(text)
        logging.debug(f"Playing audio for text: {text}")
        print(f'Synthesizing: "{text}"')
        self.stream.play_async(on_audio_chunk=self.on_audio_chunk, muted=True)

    def audio_chunk_generator(self, send_wave_headers):
        """Yield queued audio chunks until the end-of-stream sentinel arrives.

        Args:
            send_wave_headers: When true, a WAV header built for the engine is
                yielded once, immediately before the first audio chunk.

        Errors raised while streaming are printed and end the generator.
        """
        header_sent = False
        try:
            while True:
                chunk = self.audio_queue.get()
                if chunk is None:
                    print("Terminating stream")
                    break
                if not header_sent:
                    if send_wave_headers:
                        print("Sending wave header")
                        yield create_wave_header_for_engine(self.engine)
                    header_sent = True
                yield chunk
        except Exception as e:
            print(f"Error during streaming: {str(e)}")


app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Define a CSP that allows 'self' for script sources for firefox
csp = {
    "default-src": "'self'",
    "script-src": "'self'",
    "style-src": "'self' 'unsafe-inline'",
    "img-src": "'self' data:",
    "font-src": "'self' data:",
    "media-src": "'self' blob:",
}
csp_string = "; ".join(f"{key} {value}" for key, value in csp.items())


@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    """Attach the Content-Security-Policy header to every response."""
    response = await call_next(request)
    response.headers["Content-Security-Policy"] = csp_string
    return response


@app.get("/favicon.ico")
async def favicon():
    """Serve the favicon from the static directory."""
    return FileResponse("static/favicon.ico")


def _set_engine(engine_name):
    """Make ``engine_name`` the active engine and select its first voice.

    Raises:
        KeyError: If ``engine_name`` has no entry in ``engines`` or ``voices``.
    """
    global current_engine
    current_engine = engines[engine_name]

    if voices[engine_name]:
        engines[engine_name].set_voice(voices[engine_name][0].name)


@app.get("/set_engine")
def set_engine(request: Request, engine_name: str = Query(...)):
    """Switch the active engine; returns a message dict or an error dict."""
    if engine_name not in engines:
        return {"error": "Engine not supported"}

    try:
        _set_engine(engine_name)
        return {"message": f"Switched to {engine_name} engine"}
    except Exception as e:
        logging.error(f"Error switching engine: {str(e)}")
        return {"error": "Failed to switch engine"}


def is_browser_request(request):
    """Return True when the request's User-Agent contains a browser marker."""
    user_agent = request.headers.get("user-agent", "").lower()
    return any(browser_id in user_agent for browser_id in BROWSER_IDENTIFIERS)


def create_wave_header_for_engine(engine):
    """Build a WAV header (mono, 16-bit) at the engine's sample rate.

    The header is produced by writing an empty WAV file into memory, so it
    describes zero frames; it is meant to precede streamed audio chunks.

    Returns:
        bytes: The serialized WAV header.
    """
    _, _, sample_rate = engine.get_stream_info()

    num_channels = 1
    sample_width = 2
    frame_rate = sample_rate

    wav_header = io.BytesIO()
    with wave.open(wav_header, "wb") as wav_file:
        wav_file.setnchannels(num_channels)
        wav_file.setsampwidth(sample_width)
        wav_file.setframerate(frame_rate)

    # wave does not close a file object it did not open itself, so the buffer
    # is still readable here.
    return wav_header.getvalue()


@app.get("/tts")
async def tts(request: Request, text: str = Query(...)):
    """Synthesize ``text`` with the active engine and stream the audio.

    A WAV header is prepended when the request looks like it comes from a
    browser. If the playback semaphore cannot be acquired, no synthesis is
    started and HTTP 503 is returned; streaming in that case would block
    forever on a queue that never receives data.
    """
    with tts_lock:
        request_handler = TTSRequestHandler(current_engine)
        browser_request = is_browser_request(request)

        if not play_text_to_speech_semaphore.acquire(blocking=False):
            return JSONResponse(
                status_code=503,
                content={"error": "Text-to-speech is busy, try again"},
            )

        try:
            threading.Thread(
                target=request_handler.play_text_to_speech,
                args=(text,),
                daemon=True,
            ).start()
        finally:
            play_text_to_speech_semaphore.release()

        return StreamingResponse(
            request_handler.audio_chunk_generator(browser_request),
            media_type="audio/wav",
        )


@app.get("/engines")
def get_engines():
    """Return the names of all initialized engines."""
    return list(engines.keys())


@app.get("/voices")
def get_voices():
    """Return the active engine's voices as a list of attribute dicts."""
    # In case no engine has been selected/initialized yet.
    if current_engine is None:
        return []

    # Each voice object is exposed through its instance attributes.
    return [vars(voice) for voice in current_engine.get_voices()]


@app.get("/setvoice")
def set_voice(request: Request, voice_name: str = Query(...)):
    """Select ``voice_name`` on the active engine; returns a status dict."""
    print(f"Getting request: {voice_name}")
    if not current_engine:
        print("No engine is currently selected")
        return {"error": "No engine is currently selected"}

    try:
        print(f"Setting voice to {voice_name}")
        current_engine.set_voice(voice_name)
        return {"message": f"Voice set to {voice_name} successfully"}
    except Exception as e:
        print(f"Error setting voice: {str(e)}")
        logging.error(f"Error setting voice: {str(e)}")
        return {"error": "Failed to set voice"}


@app.get("/")
def root_page():
    """Serve the landing page."""
    # NOTE(review): the markup in the two string literals below looks like it
    # was stripped somewhere upstream: the per-engine template is empty and
    # the page body never interpolates `engines_options`. Restore the HTML
    # from version control; the literals are kept exactly as found.
    engines_options = "".join(
        [
            f''
            for engine in engines.keys()
        ]
    )
    content = f""" Text-To-Speech

Text to Speech

"""
    return HTMLResponse(content=content)


if __name__ == "__main__":
    print("Initializing TTS Engines")

    # The optional engine classes are imported lazily inside their branches:
    # they are not imported at module level, and an engine that is not listed
    # in SUPPORTED_ENGINES should not require its dependencies.
    for engine_name in SUPPORTED_ENGINES:
        if "azure" == engine_name:
            azure_api_key = os.environ.get("AZURE_SPEECH_KEY")
            azure_region = os.environ.get("AZURE_SPEECH_REGION")
            if azure_api_key and azure_region:
                from RealtimeTTS import AzureEngine

                print("Initializing azure engine")
                engines["azure"] = AzureEngine(azure_api_key, azure_region)

        if "elevenlabs" == engine_name:
            elevenlabs_api_key = os.environ.get("ELEVENLABS_API_KEY")
            if elevenlabs_api_key:
                from RealtimeTTS import ElevenlabsEngine

                print("Initializing elevenlabs engine")
                engines["elevenlabs"] = ElevenlabsEngine(elevenlabs_api_key)

        if "system" == engine_name:
            from RealtimeTTS import SystemEngine

            print("Initializing system engine")
            engines["system"] = SystemEngine()

        if "coqui" == engine_name:
            from RealtimeTTS import CoquiEngine

            print("Initializing coqui engine")
            engines["coqui"] = CoquiEngine()

        if "kokoro" == engine_name:
            from RealtimeTTS import KokoroEngine

            print("Initializing kokoro engine")
            engines["kokoro"] = KokoroEngine()

        if "openai" == engine_name:
            from RealtimeTTS import OpenAIEngine

            print("Initializing openai engine")
            engines["openai"] = OpenAIEngine()

    for _engine in engines.keys():
        print(f"Retrieving voices for TTS Engine {_engine}")
        try:
            voices[_engine] = engines[_engine].get_voices()
        except Exception as e:
            # An engine whose voices cannot be listed stays usable, voiceless.
            voices[_engine] = []
            logging.error(f"Error retrieving voices for {_engine}: {str(e)}")

    _set_engine(START_ENGINE)

    print("Server ready")
    uvicorn.run(app, host="0.0.0.0", port=PORT)