Tomtom84's picture
Update app.py
8bd6538 verified
if __name__ == "__main__":
print("Starting server")
import logging
# Enable or disable debug logging
DEBUG_LOGGING = False
if DEBUG_LOGGING:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.WARNING)
from RealtimeTTS import (
TextToAudioStream,
)
from engines.orpheus_engine import OrpheusEngine
from huggingface_hub import login
from fastapi.responses import StreamingResponse, HTMLResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Query, Request
from fastapi.staticfiles import StaticFiles
from queue import Queue
import threading
import logging
import uvicorn
import wave
import io
import os
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN:
print("🔑 Logging in to Hugging Face Hub...")
login(HF_TOKEN)
engines: dict[str, "BaseEngine"] = {}
voices: dict[str, list] = {}
current_engine = None
play_text_to_speech_semaphore = threading.Semaphore(1)
PORT = int(os.getenv("TTS_FASTAPI_PORT", 7860)) # Zahl kongruent halten
SUPPORTED_ENGINES = ["orpheus"]
engines["orpheus"] = OrpheusEngine(
api_url=os.getenv("ORPHEUS_API_URL"), # http://127.0.0.1:1234/v1/completions
model=os.getenv("ORPHEUS_MODEL") # Kartoffel-ID
)
voices["orpheus"] = engines["orpheus"].get_voices()
current_engine = engines["orpheus"]
# change start engine by moving engine name
# to the first position in SUPPORTED_ENGINES
# START_ENGINE = SUPPORTED_ENGINES[0]
START_ENGINE = "orpheus"
BROWSER_IDENTIFIERS = [
"mozilla",
"chrome",
"safari",
"firefox",
"edge",
"opera",
"msie",
"trident",
]
origins = [
"http://localhost",
f"http://localhost:{PORT}",
"http://127.0.0.1",
f"http://127.0.0.1:{PORT}",
"https://localhost",
f"https://localhost:{PORT}",
"https://127.0.0.1",
f"https://127.0.0.1:{PORT}",
]
speaking_lock = threading.Lock()
tts_lock = threading.Lock()
gen_lock = threading.Lock()
class TTSRequestHandler:
def __init__(self, engine):
self.engine = engine
self.audio_queue = Queue()
self.stream = TextToAudioStream(
engine, on_audio_stream_stop=self.on_audio_stream_stop, muted=True
)
self.speaking = False
def on_audio_chunk(self, chunk):
self.audio_queue.put(chunk)
def on_audio_stream_stop(self):
self.audio_queue.put(None)
self.speaking = False
def play_text_to_speech(self, text):
self.speaking = True
self.stream.feed(text)
logging.debug(f"Playing audio for text: {text}")
print(f'Synthesizing: "{text}"')
self.stream.play_async(on_audio_chunk=self.on_audio_chunk, muted=True)
def audio_chunk_generator(self, send_wave_headers):
first_chunk = False
try:
while True:
chunk = self.audio_queue.get()
if chunk is None:
print("Terminating stream")
break
if not first_chunk:
if send_wave_headers:
print("Sending wave header")
yield create_wave_header_for_engine(self.engine)
first_chunk = True
yield chunk
except Exception as e:
print(f"Error during streaming: {str(e)}")
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Define a CSP that allows 'self' for script sources for firefox
csp = {
"default-src": "'self'",
"script-src": "'self'",
"style-src": "'self' 'unsafe-inline'",
"img-src": "'self' data:",
"font-src": "'self' data:",
"media-src": "'self' blob:",
}
csp_string = "; ".join(f"{key} {value}" for key, value in csp.items())
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)
response.headers["Content-Security-Policy"] = csp_string
return response
@app.get("/favicon.ico")
async def favicon():
return FileResponse("static/favicon.ico")
def _set_engine(engine_name):
global current_engine, stream
if current_engine is None:
current_engine = engines[engine_name]
else:
current_engine = engines[engine_name]
if voices[engine_name]:
engines[engine_name].set_voice(voices[engine_name][0].name)
@app.get("/set_engine")
def set_engine(request: Request, engine_name: str = Query(...)):
if engine_name not in engines:
return {"error": "Engine not supported"}
try:
_set_engine(engine_name)
return {"message": f"Switched to {engine_name} engine"}
except Exception as e:
logging.error(f"Error switching engine: {str(e)}")
return {"error": "Failed to switch engine"}
def is_browser_request(request):
user_agent = request.headers.get("user-agent", "").lower()
is_browser = any(browser_id in user_agent for browser_id in BROWSER_IDENTIFIERS)
return is_browser
def create_wave_header_for_engine(engine):
_, _, sample_rate = engine.get_stream_info()
num_channels = 1
sample_width = 2
frame_rate = sample_rate
wav_header = io.BytesIO()
with wave.open(wav_header, "wb") as wav_file:
wav_file.setnchannels(num_channels)
wav_file.setsampwidth(sample_width)
wav_file.setframerate(frame_rate)
wav_header.seek(0)
wave_header_bytes = wav_header.read()
wav_header.close()
# Create a new BytesIO with the correct MIME type for Firefox
final_wave_header = io.BytesIO()
final_wave_header.write(wave_header_bytes)
final_wave_header.seek(0)
return final_wave_header.getvalue()
@app.get("/tts")
async def tts(request: Request, text: str = Query(...)):
with tts_lock:
request_handler = TTSRequestHandler(current_engine)
browser_request = is_browser_request(request)
if play_text_to_speech_semaphore.acquire(blocking=False):
try:
threading.Thread(
target=request_handler.play_text_to_speech,
args=(text,),
daemon=True,
).start()
finally:
play_text_to_speech_semaphore.release()
return StreamingResponse(
request_handler.audio_chunk_generator(browser_request),
media_type="audio/wav",
)
@app.get("/engines")
def get_engines():
return list(engines.keys())
@app.get("/voices")
def get_voices():
# falls noch keine Engine gewählt/initialisiert ist
if engine is None: # <-- dein zentrales Engine-Objekt
return []
# OrpheusEngine.get_voices() liefert eine Liste von OrpheusVoice-Objekten
return [v.__dict__ for v in engine.get_voices()]
@app.get("/setvoice")
def set_voice(request: Request, voice_name: str = Query(...)):
print(f"Getting request: {voice_name}")
if not current_engine:
print("No engine is currently selected")
return {"error": "No engine is currently selected"}
try:
print(f"Setting voice to {voice_name}")
current_engine.set_voice(voice_name)
return {"message": f"Voice set to {voice_name} successfully"}
except Exception as e:
print(f"Error setting voice: {str(e)}")
logging.error(f"Error setting voice: {str(e)}")
return {"error": "Failed to set voice"}
@app.get("/")
def root_page():
engines_options = "".join(
[
f'<option value="{engine}">{engine.title()}</option>'
for engine in engines.keys()
]
)
content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Text-To-Speech</title>
<style>
body {{
font-family: Arial, sans-serif;
background-color: #f0f0f0;
margin: 0;
padding: 0;
}}
h2 {{
color: #333;
text-align: center;
}}
#container {{
width: 80%;
margin: 50px auto;
background-color: #fff;
border-radius: 10px;
padding: 20px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
}}
label {{
font-weight: bold;
}}
select, textarea {{
width: 100%;
padding: 10px;
margin: 10px 0;
border: 1px solid #ccc;
border-radius: 5px;
box-sizing: border-box;
font-size: 16px;
}}
button {{
display: block;
width: 100%;
padding: 15px;
background-color: #007bff;
border: none;
border-radius: 5px;
color: #fff;
font-size: 16px;
cursor: pointer;
transition: background-color 0.3s;
}}
button:hover {{
background-color: #0056b3;
}}
audio {{
width: 80%;
margin: 10px auto;
display: block;
}}
</style>
</head>
<body>
<div id="container">
<h2>Text to Speech</h2>
<label for="engine">Select Engine:</label>
<select id="engine">
{engines_options}
</select>
<label for="voice">Select Voice:</label>
<select id="voice">
<!-- Options will be dynamically populated by JavaScript -->
</select>
<textarea id="text" rows="4" cols="50" placeholder="Enter text here..."></textarea>
<button id="speakButton">Speak</button>
<audio id="audio" controls></audio> <!-- Hidden audio player -->
</div>
<script src="/static/tts.js"></script>
</body>
</html>
"""
return HTMLResponse(content=content)
if __name__ == "__main__":
print("Initializing TTS Engines")
for engine_name in SUPPORTED_ENGINES:
if "azure" == engine_name:
azure_api_key = os.environ.get("AZURE_SPEECH_KEY")
azure_region = os.environ.get("AZURE_SPEECH_REGION")
if azure_api_key and azure_region:
print("Initializing azure engine")
engines["azure"] = AzureEngine(azure_api_key, azure_region)
if "elevenlabs" == engine_name:
elevenlabs_api_key = os.environ.get("ELEVENLABS_API_KEY")
if elevenlabs_api_key:
print("Initializing elevenlabs engine")
engines["elevenlabs"] = ElevenlabsEngine(elevenlabs_api_key)
if "system" == engine_name:
print("Initializing system engine")
engines["system"] = SystemEngine()
if "coqui" == engine_name:
print("Initializing coqui engine")
engines["coqui"] = CoquiEngine()
if "kokoro" == engine_name:
print("Initializing kokoro engine")
engines["kokoro"] = KokoroEngine()
if "openai" == engine_name:
print("Initializing openai engine")
engines["openai"] = OpenAIEngine()
for _engine in engines.keys():
print(f"Retrieving voices for TTS Engine {_engine}")
try:
voices[_engine] = engines[_engine].get_voices()
except Exception as e:
voices[_engine] = []
logging.error(f"Error retrieving voices for {_engine}: {str(e)}")
_set_engine(START_ENGINE)
print("Server ready")
uvicorn.run(app, host="0.0.0.0", port=PORT)