|
if __name__ == "__main__":
    print("Starting server")
    import logging

    # Flip to True to have the root logger emit DEBUG-level records.
    DEBUG_LOGGING = False

    # Configure the root logger once, at the verbosity selected above.
    logging.basicConfig(
        level=logging.DEBUG if DEBUG_LOGGING else logging.WARNING
    )
|
|
|
|
|
from RealtimeTTS import ( |
|
TextToAudioStream, |
|
) |
|
from engines.orpheus_engine import OrpheusEngine |
|
|
|
from huggingface_hub import login |
|
|
|
from fastapi.responses import StreamingResponse, HTMLResponse, FileResponse |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi import FastAPI, Query, Request |
|
from fastapi.staticfiles import StaticFiles |
|
|
|
from queue import Queue |
|
import threading |
|
import logging |
|
import uvicorn |
|
import wave |
|
import io |
|
import os |
|
|
|
# Log in to the Hugging Face Hub only when a token is provided through the
# HF_TOKEN environment variable; otherwise the login step is skipped.
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN:
    print("🔑 Logging in to Hugging Face Hub...")
    login(HF_TOKEN)
|
|
|
# Instantiated TTS engines, keyed by engine name.
engines: dict[str, "BaseEngine"] = {}
# Voice lists returned by each engine's get_voices(), keyed by engine name.
voices: dict[str, list] = {}
# Engine used to serve requests; assigned below once the engine exists.
current_engine = None

play_text_to_speech_semaphore = threading.Semaphore(value=1)

# Listening port, overridable through the TTS_FASTAPI_PORT env variable.
PORT = int(os.environ.get("TTS_FASTAPI_PORT", "7860"))
SUPPORTED_ENGINES = ["orpheus"]

# The Orpheus engine is built at import time from environment settings,
# then registered together with the voices it reports.
engines["orpheus"] = OrpheusEngine(
    api_url=os.environ.get("ORPHEUS_API_URL"),
    model=os.environ.get("ORPHEUS_MODEL"),
)
voices["orpheus"] = engines["orpheus"].get_voices()
current_engine = engines["orpheus"]

# Engine selected by the startup code when the file is run as a script.
START_ENGINE = "orpheus"
|
|
|
# Lower-case substrings looked for in a request's User-Agent header to
# decide whether the caller is a web browser.
BROWSER_IDENTIFIERS = [
    "mozilla", "chrome", "safari", "firefox",
    "edge", "opera", "msie", "trident",
]
|
|
|
# Origins accepted by the CORS middleware: localhost and 127.0.0.1 over
# http and https, each listed without a port and with the server's PORT.
origins = [
    url
    for scheme in ("http", "https")
    for host in ("localhost", "127.0.0.1")
    for url in (f"{scheme}://{host}", f"{scheme}://{host}:{PORT}")
]
|
|
|
|
|
# Three independent module-level locks (one fresh Lock object per name).
speaking_lock, tts_lock, gen_lock = (threading.Lock() for _ in range(3))
|
|
|
|
|
class TTSRequestHandler:
    """Couple one synthesis request to a queue of audio chunks.

    Audio chunks delivered to ``on_audio_chunk`` are pushed onto
    ``audio_queue`` and handed out again by ``audio_chunk_generator``.
    A ``None`` entry on the queue marks the end of the stream.
    """

    def __init__(self, engine):
        """Create the queue and a muted TextToAudioStream for ``engine``."""
        self.engine = engine
        self.audio_queue = Queue()
        self.stream = TextToAudioStream(
            engine,
            on_audio_stream_stop=self.on_audio_stream_stop,
            muted=True,
        )
        self.speaking = False

    def on_audio_chunk(self, chunk):
        """Queue one audio chunk for the streaming generator."""
        self.audio_queue.put(chunk)

    def on_audio_stream_stop(self):
        """Queue the end-of-stream marker and clear the speaking flag."""
        self.audio_queue.put(None)
        self.speaking = False

    def play_text_to_speech(self, text):
        """Feed ``text`` to the stream and start asynchronous playback.

        Chunks produced during playback are routed to ``on_audio_chunk``.
        """
        self.speaking = True
        self.stream.feed(text)
        logging.debug(f"Playing audio for text: {text}")
        print(f'Synthesizing: "{text}"')
        self.stream.play_async(on_audio_chunk=self.on_audio_chunk, muted=True)

    def audio_chunk_generator(self, send_wave_headers):
        """Yield queued audio chunks until the end-of-stream marker arrives.

        Args:
            send_wave_headers: When true, a WAV header built for the engine
                is yielded once, immediately before the first audio chunk.

        Yields:
            The optional header followed by each queued chunk, in order.
        """
        header_pending = True
        try:
            # Blocks on the queue; a None entry ends the loop.
            while (chunk := self.audio_queue.get()) is not None:
                if header_pending:
                    if send_wave_headers:
                        print("Sending wave header")
                        yield create_wave_header_for_engine(self.engine)
                    header_pending = False
                yield chunk
            print("Terminating stream")
        except Exception as e:
            print(f"Error during streaming: {e!s}")
|
|
|
|
|
app = FastAPI()

# Files under ./static are served beneath the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")

# CORS settings: only the origins listed in ``origins`` are allowed, with
# credentials, and with any method and any request header.
_cors_options = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
|
|
|
|
|
# Content-Security-Policy directives, mapped to their allowed sources.
csp = {
    "default-src": "'self'",
    "script-src": "'self'",
    "style-src": "'self' 'unsafe-inline'",
    "img-src": "'self' data:",
    "font-src": "'self' data:",
    "media-src": "'self' blob:",
}

# Header value: "<directive> <sources>" pairs separated by "; ".
csp_string = "; ".join(" ".join(directive) for directive in csp.items())
|
|
|
|
|
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    """Set the Content-Security-Policy header on every HTTP response.

    Args:
        request: The incoming request, forwarded unchanged.
        call_next: Callable that produces the response for ``request``.

    Returns:
        The downstream response with the CSP header set to ``csp_string``.
    """
    outgoing = await call_next(request)
    outgoing.headers["Content-Security-Policy"] = csp_string
    return outgoing
|
|
|
|
|
@app.get("/favicon.ico")
async def favicon():
    """Serve the site icon stored at static/favicon.ico."""
    icon_path = "static/favicon.ico"
    return FileResponse(icon_path)
|
|
|
|
|
def _set_engine(engine_name):
    """Make ``engine_name`` the active engine and select its first voice.

    Args:
        engine_name: Key of an engine already registered in ``engines``
            (and in ``voices``).

    Raises:
        KeyError: If ``engine_name`` is missing from ``engines`` or
            ``voices``.
    """
    global current_engine

    # The engine is activated the same way whether or not another engine
    # was active before, so no branching on the previous value is needed.
    engine = engines[engine_name]
    current_engine = engine

    # Default to the first voice recorded for this engine, if there is one.
    available_voices = voices[engine_name]
    if available_voices:
        engine.set_voice(available_voices[0].name)
|
|
|
|
|
@app.get("/set_engine")
def set_engine(request: Request, engine_name: str = Query(...)):
    """Switch the active engine to ``engine_name``.

    Args:
        request: The incoming request (not inspected here).
        engine_name: Required query parameter naming the engine.

    Returns:
        A dict with a ``message`` key on success, or an ``error`` key when
        the engine is not registered or the switch raises.
    """
    if engine_name not in engines:
        return {"error": "Engine not supported"}

    try:
        _set_engine(engine_name)
    except Exception as e:
        logging.error(f"Error switching engine: {str(e)}")
        return {"error": "Failed to switch engine"}

    return {"message": f"Switched to {engine_name} engine"}
|
|
|
|
|
def is_browser_request(request):
    """Tell whether the request's User-Agent names a known browser.

    Args:
        request: Object whose ``headers`` mapping may hold "user-agent".

    Returns:
        True if the lower-cased User-Agent contains any entry of
        ``BROWSER_IDENTIFIERS``; False otherwise, including when the
        header is absent.
    """
    agent = request.headers.get("user-agent", "").lower()
    for token in BROWSER_IDENTIFIERS:
        if token in agent:
            return True
    return False
|
|
|
|
|
def create_wave_header_for_engine(engine):
    """Build a WAV header using the sample rate reported by ``engine``.

    The header is obtained by writing a WAV file with no audio frames into
    an in-memory buffer. It always declares one channel and 2-byte samples;
    only the frame rate is taken from the engine.

    Args:
        engine: Object whose ``get_stream_info()`` returns a 3-item
            sequence with the sample rate in the last position.

    Returns:
        The header as ``bytes``.
    """
    _first, _second, sample_rate = engine.get_stream_info()

    buffer = io.BytesIO()
    # Closing the writer (end of the ``with``) is what emits the header;
    # the underlying BytesIO stays open because it was passed in as an object.
    with wave.open(buffer, "wb") as writer:
        writer.setnchannels(1)
        writer.setsampwidth(2)
        writer.setframerate(sample_rate)

    return buffer.getvalue()
|
|
|
|
|
@app.get("/tts")
async def tts(request: Request, text: str = Query(...)):
    """Synthesize ``text`` and stream the audio back as it is produced.

    A fresh TTSRequestHandler is created for the current engine, synthesis
    is started on a daemon thread, and the handler's chunk generator is
    returned as the response body. A WAV header is requested from the
    generator only when the caller looks like a browser.

    Args:
        request: The incoming request; its User-Agent is inspected.
        text: Required query parameter holding the text to speak.

    Returns:
        A StreamingResponse with media type "audio/wav".
    """
    with tts_lock:
        handler = TTSRequestHandler(current_engine)
        wants_wave_header = is_browser_request(request)

        # The semaphore is taken without blocking and given back as soon as
        # the worker thread has been started.
        if play_text_to_speech_semaphore.acquire(blocking=False):
            try:
                worker = threading.Thread(
                    target=handler.play_text_to_speech,
                    args=(text,),
                    daemon=True,
                )
                worker.start()
            finally:
                play_text_to_speech_semaphore.release()

        return StreamingResponse(
            handler.audio_chunk_generator(wants_wave_header),
            media_type="audio/wav",
        )
|
|
|
|
|
@app.get("/engines")
def get_engines():
    """Return the names of all registered engines as a list."""
    return list(engines)
|
|
|
@app.get("/voices")
def get_voices():
    """List the voices offered by the currently selected engine.

    Returns:
        One dict per voice holding that voice object's instance
        attributes, or an empty list when no engine is selected.
    """
    # The active engine is the module-level ``current_engine``; no name
    # ``engine`` exists at module scope, so reading it raised NameError.
    if current_engine is None:
        return []

    return [v.__dict__ for v in current_engine.get_voices()]
|
|
|
|
|
@app.get("/setvoice")
def set_voice(request: Request, voice_name: str = Query(...)):
    """Select ``voice_name`` on the currently active engine.

    Args:
        request: The incoming request (not inspected here).
        voice_name: Required query parameter naming the voice.

    Returns:
        A dict with a ``message`` key on success, or an ``error`` key when
        no engine is active or the engine rejects the voice.
    """
    print(f"Getting request: {voice_name}")

    if not current_engine:
        no_engine = "No engine is currently selected"
        print(no_engine)
        return {"error": no_engine}

    try:
        print(f"Setting voice to {voice_name}")
        current_engine.set_voice(voice_name)
    except Exception as e:
        failure = f"Error setting voice: {str(e)}"
        print(failure)
        logging.error(failure)
        return {"error": "Failed to set voice"}

    return {"message": f"Voice set to {voice_name} successfully"}
|
|
|
|
|
@app.get("/")
def root_page():
    """Render the single-page web UI.

    The engine drop-down is filled server-side with one option per
    registered engine; the page loads /static/tts.js for everything else.

    Returns:
        An HTMLResponse containing the page markup.
    """
    engines_options = "".join(
        f'<option value="{name}">{name.title()}</option>' for name in engines
    )
    # Literal CSS braces are doubled because this is an f-string.
    content = f"""
    <!DOCTYPE html>
    <html>
        <head>
            <title>Text-To-Speech</title>
            <style>
                body {{
                    font-family: Arial, sans-serif;
                    background-color: #f0f0f0;
                    margin: 0;
                    padding: 0;
                }}
                h2 {{
                    color: #333;
                    text-align: center;
                }}
                #container {{
                    width: 80%;
                    margin: 50px auto;
                    background-color: #fff;
                    border-radius: 10px;
                    padding: 20px;
                    box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
                }}
                label {{
                    font-weight: bold;
                }}
                select, textarea {{
                    width: 100%;
                    padding: 10px;
                    margin: 10px 0;
                    border: 1px solid #ccc;
                    border-radius: 5px;
                    box-sizing: border-box;
                    font-size: 16px;
                }}
                button {{
                    display: block;
                    width: 100%;
                    padding: 15px;
                    background-color: #007bff;
                    border: none;
                    border-radius: 5px;
                    color: #fff;
                    font-size: 16px;
                    cursor: pointer;
                    transition: background-color 0.3s;
                }}
                button:hover {{
                    background-color: #0056b3;
                }}
                audio {{
                    width: 80%;
                    margin: 10px auto;
                    display: block;
                }}
            </style>
        </head>
        <body>
            <div id="container">
                <h2>Text to Speech</h2>
                <label for="engine">Select Engine:</label>
                <select id="engine">
                    {engines_options}
                </select>
                <label for="voice">Select Voice:</label>
                <select id="voice">
                    <!-- Options will be dynamically populated by JavaScript -->
                </select>
                <textarea id="text" rows="4" cols="50" placeholder="Enter text here..."></textarea>
                <button id="speakButton">Speak</button>
                <audio id="audio" controls></audio> <!-- Hidden audio player -->
            </div>
            <script src="/static/tts.js"></script>
        </body>
    </html>
    """
    return HTMLResponse(content=content)
|
|
|
|
|
if __name__ == "__main__":
    print("Initializing TTS Engines")

    # Optional engines are created only when listed in SUPPORTED_ENGINES.
    # Their classes are not imported at the top of this file, so each
    # branch imports the class it needs right before using it; without
    # that, enabling any of these engines fails with NameError.
    for engine_name in SUPPORTED_ENGINES:
        if engine_name == "azure":
            azure_api_key = os.environ.get("AZURE_SPEECH_KEY")
            azure_region = os.environ.get("AZURE_SPEECH_REGION")
            # Azure is set up only when both key and region are present.
            if azure_api_key and azure_region:
                from RealtimeTTS import AzureEngine

                print("Initializing azure engine")
                engines["azure"] = AzureEngine(azure_api_key, azure_region)

        elif engine_name == "elevenlabs":
            elevenlabs_api_key = os.environ.get("ELEVENLABS_API_KEY")
            # ElevenLabs is set up only when an API key is present.
            if elevenlabs_api_key:
                from RealtimeTTS import ElevenlabsEngine

                print("Initializing elevenlabs engine")
                engines["elevenlabs"] = ElevenlabsEngine(elevenlabs_api_key)

        elif engine_name == "system":
            from RealtimeTTS import SystemEngine

            print("Initializing system engine")
            engines["system"] = SystemEngine()

        elif engine_name == "coqui":
            from RealtimeTTS import CoquiEngine

            print("Initializing coqui engine")
            engines["coqui"] = CoquiEngine()

        elif engine_name == "kokoro":
            from RealtimeTTS import KokoroEngine

            print("Initializing kokoro engine")
            engines["kokoro"] = KokoroEngine()

        elif engine_name == "openai":
            from RealtimeTTS import OpenAIEngine

            print("Initializing openai engine")
            engines["openai"] = OpenAIEngine()

    # Refresh the voice list of every registered engine; an engine whose
    # lookup fails is kept, with an empty voice list.
    for _engine in engines:
        print(f"Retrieving voices for TTS Engine {_engine}")
        try:
            voices[_engine] = engines[_engine].get_voices()
        except Exception as e:
            voices[_engine] = []
            logging.error(f"Error retrieving voices for {_engine}: {str(e)}")

    _set_engine(START_ENGINE)

    print("Server ready")
    uvicorn.run(app, host="0.0.0.0", port=PORT)