from fastapi import FastAPI, Query, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Generator
import torch
import os
import io
import wave
import numpy as np
from kokoro import KModel, KPipeline
import spaces
import time
app = FastAPI(title="Kokoro TTS API", description="API for Kokoro text-to-speech conversion")
# Constants
IS_DUPLICATE = not os.getenv('SPACE_ID', '').startswith('hexgrad/')
CHAR_LIMIT = None if IS_DUPLICATE else 5000
CUDA_AVAILABLE = torch.cuda.is_available()
# Initialize models
models = {gpu: KModel().to('cuda' if gpu else 'cpu').eval() for gpu in [False] + ([True] if CUDA_AVAILABLE else [])}
pipelines = {lang_code: KPipeline(lang_code=lang_code, model=False) for lang_code in 'ab'}
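# Pin the pronunciation of "kokoro" in each pipeline's G2P lexicon (the phoneme strings
# below differ slightly between the American ('a') and British ('b') pipelines).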
pipelines['a'].g2p.lexicon.golds['kokoro'] = 'kˈOkəɹO'
pipelines['b'].g2p.lexicon.golds['kokoro'] = 'kˈQkəɹQ'
# Voice choices
CHOICES = {
    '🇺🇸 🚺 Heart ❤️': 'af_heart',
    '🇺🇸 🚺 Bella 🔥': 'af_bella',
    '🇺🇸 🚺 Nicole 🎧': 'af_nicole',
    '🇺🇸 🚺 Aoede': 'af_aoede',
    '🇺🇸 🚺 Kore': 'af_kore',
    '🇺🇸 🚺 Sarah': 'af_sarah',
    '🇺🇸 🚺 Nova': 'af_nova',
    '🇺🇸 🚺 Sky': 'af_sky',
    '🇺🇸 🚺 Alloy': 'af_alloy',
    '🇺🇸 🚺 Jessica': 'af_jessica',
    '🇺🇸 🚺 River': 'af_river',
    '🇺🇸 🚹 Michael': 'am_michael',
    '🇺🇸 🚹 Fenrir': 'am_fenrir',
    '🇺🇸 🚹 Puck': 'am_puck',
    '🇺🇸 🚹 Echo': 'am_echo',
    '🇺🇸 🚹 Eric': 'am_eric',
    '🇺🇸 🚹 Liam': 'am_liam',
    '🇺🇸 🚹 Onyx': 'am_onyx',
    '🇺🇸 🚹 Santa': 'am_santa',
    '🇺🇸 🚹 Adam': 'am_adam',
    '🇬🇧 🚺 Emma': 'bf_emma',
    '🇬🇧 🚺 Isabella': 'bf_isabella',
    '🇬🇧 🚺 Alice': 'bf_alice',
    '🇬🇧 🚺 Lily': 'bf_lily',
    '🇬🇧 🚹 George': 'bm_george',
    '🇬🇧 🚹 Fable': 'bm_fable',
    '🇬🇧 🚹 Lewis': 'bm_lewis',
    '🇬🇧 🚹 Daniel': 'bm_daniel',
}
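# Voice IDs encode language and gender: the first character selects the pipeline
# ('a' = American English, 'b' = British English) and the second the speaker's gender
# ('f' = female, 'm' = male), e.g. 'af_heart' or 'bm_george'. The lookups below
# (pipelines[v[0]], pipelines[voice[0]]) rely on this convention.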
# Load voices
for v in CHOICES.values():
pipelines[v[0]].load_voice(v)
# Sample text files
with open('en.txt', 'r') as r:
RANDOM_QUOTES = [line.strip() for line in r]
def get_gatsby():
with open('gatsby5k.md', 'r') as r:
return r.read().strip()
def get_frankenstein():
with open('frankenstein5k.md', 'r') as r:
return r.read().strip()
# Pydantic models
class TTSRequest(BaseModel):
text: str = Field(..., description="Text to convert to speech")
voice: str = Field("af_heart", description="Voice ID to use for TTS")
speed: float = Field(1.0, description="Speech speed factor (0.5 to 2.0)", ge=0.5, le=2.0)
use_gpu: bool = Field(CUDA_AVAILABLE, description="Whether to use GPU for inference")
class TextRequest(BaseModel):
text: str = Field(..., description="Text to tokenize")
voice: str = Field("af_heart", description="Voice ID to use for tokenization")
class Voice(BaseModel):
display_name: str
id: str
language: str
gender: str
class VoiceList(BaseModel):
voices: List[Voice]
# GPU wrapper function
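# Note: on ZeroGPU Spaces the @spaces.GPU decorator requests GPU hardware for the
# duration of the call (capped here at 30 seconds); outside that environment it is
# effectively a pass-through, and the CPU model in models[False] remains the fallback.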
@spaces.GPU(duration=30)
def forward_gpu(ps, ref_s, speed):
return models[True](ps, ref_s, speed)
# Helper functions
def generate_first(text: str, voice: str = 'af_heart', speed: float = 1.0, use_gpu: bool = CUDA_AVAILABLE):
"""Generate audio for the first sentence/segment of text"""
text = text if CHAR_LIMIT is None else text.strip()[:CHAR_LIMIT]
pipeline = pipelines[voice[0]]
pack = pipeline.load_voice(voice)
use_gpu = use_gpu and CUDA_AVAILABLE
for _, ps, _ in pipeline(text, voice, speed):
ref_s = pack[len(ps)-1]
try:
if use_gpu:
audio = forward_gpu(ps, ref_s, speed)
else:
audio = models[False](ps, ref_s, speed)
except Exception as e:
if use_gpu:
# Fallback to CPU
audio = models[False](ps, ref_s, speed)
else:
raise HTTPException(status_code=500, detail=str(e))
return (24000, audio.numpy()), ps
return None, ''
def tokenize_first(text: str, voice: str = 'af_heart'):
"""Tokenize the first sentence/segment of text"""
pipeline = pipelines[voice[0]]
for _, ps, _ in pipeline(text, voice):
return ps
return ''
def generate_all(text: str, voice: str = 'af_heart', speed: float = 1.0, use_gpu: bool = CUDA_AVAILABLE) -> Generator:
"""Generate audio for all segments of text"""
text = text if CHAR_LIMIT is None else text.strip()[:CHAR_LIMIT]
pipeline = pipelines[voice[0]]
pack = pipeline.load_voice(voice)
use_gpu = use_gpu and CUDA_AVAILABLE
for _, ps, _ in pipeline(text, voice, speed):
ref_s = pack[len(ps)-1]
try:
if use_gpu:
audio = forward_gpu(ps, ref_s, speed)
else:
audio = models[False](ps, ref_s, speed)
except Exception as e:
if use_gpu:
# Fallback to CPU
audio = models[False](ps, ref_s, speed)
else:
raise HTTPException(status_code=500, detail=str(e))
yield audio.numpy()
def create_wav(audio_data, sample_rate=24000):
    """Convert a float32 numpy array to 16-bit PCM WAV bytes"""
    wav_io = io.BytesIO()
    with wave.open(wav_io, 'wb') as wav_file:
        wav_file.setnchannels(1)  # Mono
        wav_file.setsampwidth(2)  # 16-bit
        wav_file.setframerate(sample_rate)
        # Clip to [-1, 1] and convert float32 to int16 to avoid integer overflow
        audio_data = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(np.int16)
        wav_file.writeframes(audio_data.tobytes())
    wav_io.seek(0)
    return wav_io.read()
def stream_wav_chunks(audio_chunks, sample_rate=24000):
    """Stream WAV chunks as they're generated"""
    # Write the WAV header first
    header_io = io.BytesIO()
    with wave.open(header_io, 'wb') as wav_file:
        wav_file.setnchannels(1)  # Mono
        wav_file.setsampwidth(2)  # 16-bit
        wav_file.setframerate(sample_rate)
        # Total frame count is unknown up front, so write an empty body
        wav_file.writeframes(b'')
    # Get the header bytes (a standard PCM WAV header is 44 bytes)
    header_io.seek(0)
    header_bytes = header_io.read(44)
    yield header_bytes
    # Stream audio chunks
    for chunk in audio_chunks:
        # Clip to [-1, 1] and convert float32 to int16
        audio_data = (np.clip(chunk, -1.0, 1.0) * 32767).astype(np.int16)
        yield audio_data.tobytes()
        time.sleep(0.1)  # Small delay to avoid overwhelming the client
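# Note: the streamed header is emitted before any audio has been written, so its RIFF
# and data chunk sizes are recorded as zero. Many streaming clients play such a stream
# anyway, but strict WAV parsers may treat it as truncated; this is the usual trade-off
# of streaming WAV without knowing the total length in advance.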
# API Routes
@app.get("/", tags=["Info"])
async def root():
"""API root with basic information"""
return {
"message": "Kokoro TTS API",
"description": "Convert text to speech using Kokoro TTS model",
"endpoints": {
"GET /voices": "List available voices",
"POST /tts": "Convert text to speech",
"POST /tokenize": "Tokenize text",
"GET /stream": "Stream audio from text",
"GET /samples": "Get sample texts"
}
}
@app.get("/voices", response_model=VoiceList, tags=["Voices"])
async def list_voices():
"""List all available voices"""
voice_list = []
for display_name, voice_id in CHOICES.items():
        # Parse display name format: "🇺🇸 🚺 Heart ❤️"
        language = "US English" if "🇺🇸" in display_name else "UK English"
        gender = "Female" if "🚺" in display_name else "Male"
voice_list.append(Voice(
display_name=display_name,
id=voice_id,
language=language,
gender=gender
))
return VoiceList(voices=voice_list)
@app.post("/tts", tags=["Text-to-Speech"])
async def text_to_speech(request: TTSRequest):
"""Convert text to speech"""
if request.voice not in CHOICES.values():
raise HTTPException(status_code=400, detail=f"Voice '{request.voice}' not found. Use /voices to see available options.")
result, _ = generate_first(request.text, request.voice, request.speed, request.use_gpu)
if result is None:
raise HTTPException(status_code=500, detail="Failed to generate audio")
sample_rate, audio_data = result
wav_bytes = create_wav(audio_data, sample_rate)
return StreamingResponse(
io.BytesIO(wav_bytes),
media_type="audio/wav",
headers={"Content-Disposition": f"attachment; filename=tts_{request.voice}.wav"}
)
@app.post("/tokenize", tags=["Text Processing"])
async def tokenize_text(request: TextRequest):
"""Tokenize input text"""
if request.voice not in CHOICES.values():
raise HTTPException(status_code=400, detail=f"Voice '{request.voice}' not found. Use /voices to see available options.")
tokens = tokenize_first(request.text, request.voice)
return {"text": request.text, "tokens": tokens}
@app.get("/stream", tags=["Text-to-Speech"])
async def stream_tts(
text: str = Query(..., description="Text to convert to speech"),
voice: str = Query("af_heart", description="Voice ID"),
speed: float = Query(1.0, description="Speech speed", ge=0.5, le=2.0),
use_gpu: bool = Query(CUDA_AVAILABLE, description="Use GPU for inference")
):
"""Stream audio from text as it's generated"""
if voice not in CHOICES.values():
raise HTTPException(status_code=400, detail=f"Voice '{voice}' not found. Use /voices to see available options.")
# Limit text if needed
if CHAR_LIMIT is not None:
text = text.strip()[:CHAR_LIMIT]
# Create generator for audio chunks
audio_chunks = generate_all(text, voice, speed, use_gpu)
# Stream as WAV
return StreamingResponse(
stream_wav_chunks(audio_chunks),
media_type="audio/wav",
headers={"Content-Disposition": f"attachment; filename=stream_{voice}.wav"}
)
@app.get("/samples", tags=["Sample Text"])
async def get_samples():
"""Get sample texts"""
import random
return {
"random_quote": random.choice(RANDOM_QUOTES),
"gatsby_excerpt": get_gatsby()[:200] + "...", # First 200 chars
"frankenstein_excerpt": get_frankenstein()[:200] + "..." # First 200 chars
}
@app.get("/sample/{sample_type}", tags=["Sample Text"])
async def get_sample(sample_type: str):
"""Get a specific sample text"""
import random
if sample_type == "random":
return {"text": random.choice(RANDOM_QUOTES)}
elif sample_type == "gatsby":
return {"text": get_gatsby()}
elif sample_type == "frankenstein":
return {"text": get_frankenstein()}
else:
raise HTTPException(status_code=404, detail=f"Sample type '{sample_type}' not found")
if __name__ == "__main__":
import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)
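# Example client calls (hypothetical; they assume the server is reachable at
# http://localhost:8000, matching the host/port passed to uvicorn above, and use only
# the endpoints and parameters defined in this file):
#
#   curl http://localhost:8000/voices
#
#   curl -X POST http://localhost:8000/tts \
#        -H "Content-Type: application/json" \
#        -d '{"text": "Hello from Kokoro!", "voice": "af_heart", "speed": 1.0}' \
#        --output tts_af_heart.wav
#
#   curl "http://localhost:8000/stream?text=Hello%20from%20Kokoro&voice=af_heart" \
#        --output stream_af_heart.wav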