Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -10,6 +10,7 @@ import logging
|
|
10 |
import requests
|
11 |
import io
|
12 |
import json
|
|
|
13 |
from typing import Optional, Dict, Any, List
|
14 |
from pathlib import Path
|
15 |
|
@@ -32,9 +33,18 @@ logger.info(f"π Running on device: {DEVICE}")
|
|
32 |
MODEL = None
|
33 |
CHATTERBOX_AVAILABLE = False
|
34 |
|
35 |
-
# Storage directories
|
36 |
-
|
37 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
38 |
os.makedirs(AUDIO_DIR, exist_ok=True)
|
39 |
os.makedirs(VOICES_DIR, exist_ok=True)
|
40 |
|
@@ -62,67 +72,133 @@ BUILTIN_VOICES = {
|
|
62 |
}
|
63 |
}
|
64 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
65 |
def load_voice_library():
|
66 |
-
"""Load saved custom voices from
|
67 |
global voice_library
|
68 |
voice_library = BUILTIN_VOICES.copy()
|
69 |
|
70 |
voices_json_path = os.path.join(VOICES_DIR, "voices.json")
|
71 |
-
|
72 |
-
|
|
|
73 |
with open(voices_json_path, 'r', encoding='utf-8') as f:
|
74 |
custom_voices = json.load(f)
|
75 |
voice_library.update(custom_voices)
|
76 |
-
logger.info(f"β
Loaded {len(custom_voices)} custom voices from
|
77 |
-
|
78 |
-
logger.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
79 |
|
80 |
def save_voice_library():
|
81 |
-
"""Save custom voices to
|
82 |
try:
|
83 |
# Only save custom voices (not builtin)
|
84 |
custom_voices = {k: v for k, v in voice_library.items() if v.get("type") != "builtin"}
|
85 |
|
86 |
voices_json_path = os.path.join(VOICES_DIR, "voices.json")
|
|
|
|
|
|
|
|
|
87 |
with open(voices_json_path, 'w', encoding='utf-8') as f:
|
88 |
json.dump(custom_voices, f, ensure_ascii=False, indent=2)
|
89 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
90 |
except Exception as e:
|
91 |
logger.error(f"β Error saving voice library: {e}")
|
|
|
92 |
|
93 |
def create_voice_from_audio(audio_file, voice_name, voice_description="Custom voice"):
|
94 |
-
"""Create a new voice from uploaded audio"""
|
95 |
try:
|
96 |
voice_id = f"voice_{int(time.time())}_{uuid.uuid4().hex[:8]}"
|
97 |
|
98 |
-
#
|
99 |
-
audio_filename = f"{voice_id}.wav"
|
100 |
-
audio_path = os.path.join(VOICES_DIR, audio_filename)
|
101 |
-
|
102 |
-
# Convert and save audio
|
103 |
if isinstance(audio_file, tuple):
|
104 |
# Gradio audio format (sample_rate, audio_data)
|
105 |
sample_rate, audio_data = audio_file
|
106 |
-
sf.write(audio_path, audio_data, sample_rate)
|
107 |
else:
|
108 |
-
# File
|
109 |
-
sf.
|
|
|
|
|
|
|
|
|
|
|
110 |
|
111 |
-
# Create voice entry
|
112 |
voice_entry = {
|
113 |
"voice_id": voice_id,
|
114 |
"name": voice_name,
|
115 |
"description": voice_description,
|
116 |
-
"
|
|
|
117 |
"type": "custom",
|
118 |
-
"created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
119 |
}
|
120 |
|
121 |
# Add to voice library
|
122 |
voice_library[voice_id] = voice_entry
|
|
|
|
|
123 |
save_voice_library()
|
124 |
|
125 |
-
logger.info(f"β
Created voice: {voice_name} ({voice_id})")
|
|
|
|
|
126 |
return voice_id, voice_entry
|
127 |
|
128 |
except Exception as e:
|
@@ -154,14 +230,25 @@ def download_audio_from_url(url):
|
|
154 |
return None
|
155 |
|
156 |
def get_voice_audio_path(voice_id):
|
157 |
-
"""Get the audio path for a voice (
|
158 |
if voice_id not in voice_library:
|
159 |
return None
|
160 |
|
161 |
voice_info = voice_library[voice_id]
|
162 |
|
163 |
-
# If it's a custom voice with
|
164 |
-
if voice_info.get("type") == "custom" and "
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
165 |
audio_path = voice_info["audio_path"]
|
166 |
if os.path.exists(audio_path):
|
167 |
return audio_path
|
@@ -370,8 +457,12 @@ def generate_tts_audio(
|
|
370 |
temp_audio_file = None
|
371 |
|
372 |
try:
|
373 |
-
|
374 |
-
|
|
|
|
|
|
|
|
|
375 |
temp_audio_file = audio_prompt_path
|
376 |
|
377 |
if audio_prompt_path:
|
@@ -400,8 +491,8 @@ def generate_tts_audio(
|
|
400 |
logger.error(f"β Audio generation failed: {e}")
|
401 |
raise
|
402 |
finally:
|
403 |
-
# Clean up temporary file (only if it's a downloaded URL)
|
404 |
-
if temp_audio_file and
|
405 |
try:
|
406 |
os.unlink(temp_audio_file)
|
407 |
logger.info(f"ποΈ Cleaned up temporary file: {temp_audio_file}")
|
@@ -529,7 +620,7 @@ async def delete_voice(voice_id: str):
|
|
529 |
raise HTTPException(status_code=400, detail="Cannot delete builtin voices")
|
530 |
|
531 |
try:
|
532 |
-
# Delete audio file
|
533 |
if "audio_path" in voice_info and os.path.exists(voice_info["audio_path"]):
|
534 |
os.unlink(voice_info["audio_path"])
|
535 |
|
@@ -538,6 +629,8 @@ async def delete_voice(voice_id: str):
|
|
538 |
del voice_library[voice_id]
|
539 |
save_voice_library()
|
540 |
|
|
|
|
|
541 |
return {
|
542 |
"success": True,
|
543 |
"message": f"Voice '{voice_name}' deleted successfully"
|
@@ -780,7 +873,7 @@ def create_gradio_interface():
|
|
780 |
|
781 |
voice_name = voice_info["name"]
|
782 |
|
783 |
-
# Delete audio file
|
784 |
if "audio_path" in voice_info and os.path.exists(voice_info["audio_path"]):
|
785 |
os.unlink(voice_info["audio_path"])
|
786 |
|
@@ -789,6 +882,8 @@ def create_gradio_interface():
|
|
789 |
save_voice_library()
|
790 |
|
791 |
updated_choices = get_voice_choices()
|
|
|
|
|
792 |
return (
|
793 |
f"β
Voice '{voice_name}' deleted successfully",
|
794 |
gr.update(choices=updated_choices, value=updated_choices[0][1] if updated_choices else None)
|
@@ -1035,14 +1130,14 @@ def create_gradio_interface():
|
|
1035 |
- **Device**: {DEVICE}
|
1036 |
- **ChatterboxTTS**: {chatterbox_status}
|
1037 |
- **Voice Library**: {len(voice_library)} voices loaded
|
|
|
1038 |
- **Generated Files**: {len(audio_cache)}
|
1039 |
-
- **Storage**: `{VOICES_DIR}/` for voices, `{AUDIO_DIR}/` for output
|
1040 |
|
1041 |
{'''### π Production Ready!
|
1042 |
-
Your ChatterboxTTS model is loaded with voice management
|
1043 |
**You're hearing beep sounds because ChatterboxTTS isn't loaded.**
|
1044 |
|
1045 |
-
Voice management is working, but you need ChatterboxTTS for real synthesis.'''}
|
1046 |
""")
|
1047 |
|
1048 |
return demo
|
|
|
10 |
import requests
|
11 |
import io
|
12 |
import json
|
13 |
+
import base64
|
14 |
from typing import Optional, Dict, Any, List
|
15 |
from pathlib import Path
|
16 |
|
|
|
33 |
MODEL = None
|
34 |
CHATTERBOX_AVAILABLE = False
|
35 |
|
36 |
+
# Storage directories - use persistent storage if available.
# /data must be a directory we can actually create entries in; a bare
# existence check would let the os.makedirs calls below fail at import time
# when /data is present but read-only or not a directory.
if os.path.isdir("/data") and os.access("/data", os.W_OK | os.X_OK):
    # Hugging Face Spaces persistent storage
    VOICES_DIR = "/data/custom_voices"
    AUDIO_DIR = "/data/generated_audio"
    logger.info("✅ Using Hugging Face Spaces persistent storage (/data)")
else:
    # Fallback to local storage
    VOICES_DIR = "custom_voices"
    AUDIO_DIR = "generated_audio"
    logger.warning("⚠️ Using local storage (voices will not persist)")

os.makedirs(AUDIO_DIR, exist_ok=True)
os.makedirs(VOICES_DIR, exist_ok=True)
|
50 |
|
|
|
72 |
}
|
73 |
}
|
74 |
|
75 |
+
def encode_audio_to_base64(audio_data, sample_rate):
    """Encode audio samples as a base64 string of WAV-file bytes.

    The samples are written to a temporary ``.wav`` file with ``sf.write``,
    the file is read back as raw bytes, and those bytes are base64-encoded
    so they can be stored as text.

    Args:
        audio_data: Samples in whatever form ``sf.write`` accepts.
        sample_rate: Sample rate passed through to ``sf.write``.

    Returns:
        The base64 text, or ``None`` if any step fails (the error is logged).
    """
    temp_path = None
    try:
        # Only the path is needed: close our own handle right away so the
        # descriptor is not leaked and the file can be reopened by name.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
            temp_path = temp_file.name

        sf.write(temp_path, audio_data, sample_rate)

        # Read as bytes and encode
        with open(temp_path, 'rb') as f:
            audio_bytes = f.read()

        return base64.b64encode(audio_bytes).decode('utf-8')
    except Exception as e:
        logger.error(f"Error encoding audio: {e}")
        return None
    finally:
        # Remove the scratch file on failure as well as on success.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass
|
94 |
+
|
95 |
+
def decode_audio_from_base64(base64_string):
    """Decode a base64 string into a temporary ``.wav`` file.

    Args:
        base64_string: Base64 text to decode.

    Returns:
        Path of a newly created temporary file holding the decoded bytes,
        or ``None`` if decoding or writing fails (the error is logged).
        The file is created with ``delete=False``, so the caller is
        responsible for removing it.
    """
    temp_path = None
    try:
        audio_bytes = base64.b64decode(base64_string.encode('utf-8'))

        # The context manager closes the handle even if the write fails.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
            temp_path = temp_file.name
            temp_file.write(audio_bytes)

        return temp_path
    except Exception as e:
        logger.error(f"Error decoding audio: {e}")
        # Do not leave a partially written file behind.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass
        return None
|
110 |
+
|
111 |
def load_voice_library():
    """Rebuild the global ``voice_library`` from builtins plus saved voices.

    Starts from a copy of ``BUILTIN_VOICES`` and merges in the entries
    stored in ``voices.json`` under ``VOICES_DIR`` when that file exists.
    Any failure is logged and leaves ``voice_library`` holding only the
    builtin voices; nothing is raised.
    """
    # NOTE(review): the "π" glyphs in the log messages below are reproduced
    # as they appear in the reviewed copy, where the original emoji were
    # mis-decoded and cannot be recovered.
    global voice_library
    voice_library = BUILTIN_VOICES.copy()

    voices_json_path = os.path.join(VOICES_DIR, "voices.json")

    try:
        if os.path.exists(voices_json_path):
            with open(voices_json_path, 'r', encoding='utf-8') as f:
                custom_voices = json.load(f)

            # Validate before merging so a malformed file cannot leave
            # unusable entries in the library.
            if not isinstance(custom_voices, dict) or not all(
                isinstance(entry, dict) for entry in custom_voices.values()
            ):
                raise ValueError("voices.json must map voice ids to voice entries")

            voice_library.update(custom_voices)
            logger.info(f"✅ Loaded {len(custom_voices)} custom voices from persistent storage")
        else:
            logger.info("π No existing voice library found, starting fresh")

        # Log voice library status
        total_voices = len(voice_library)
        custom_count = sum(1 for v in voice_library.values() if v.get("type") == "custom")
        builtin_count = sum(1 for v in voice_library.values() if v.get("type") == "builtin")
        logger.info(f"π Voice Library: {total_voices} total ({builtin_count} builtin, {custom_count} custom)")

    except Exception as e:
        # Reset so the library really matches the "builtin voices only"
        # message logged below.
        voice_library = BUILTIN_VOICES.copy()
        logger.error(f"❌ Error loading voice library: {e}")
        logger.info("π Starting with builtin voices only")
|
136 |
|
137 |
def save_voice_library():
    """Write the non-builtin entries of ``voice_library`` to ``voices.json``.

    The JSON is written to a sibling ``.tmp`` file and then moved over
    ``voices.json`` with ``os.replace``, so a failed serialization cannot
    truncate a previously saved library. Errors are logged, not raised.
    """
    # NOTE(review): the "π" glyphs in the log messages below are reproduced
    # as they appear in the reviewed copy, where the original emoji were
    # mis-decoded and cannot be recovered.

    # Computed before the try block so the error handler can always report it.
    voices_json_path = os.path.join(VOICES_DIR, "voices.json")
    temp_json_path = f"{voices_json_path}.tmp"

    try:
        # Only save custom voices (not builtin)
        custom_voices = {k: v for k, v in voice_library.items() if v.get("type") != "builtin"}

        # Ensure directory exists
        os.makedirs(os.path.dirname(voices_json_path), exist_ok=True)

        # Serialize to the scratch file first; the real file is only
        # touched once the dump has fully succeeded.
        with open(temp_json_path, 'w', encoding='utf-8') as f:
            json.dump(custom_voices, f, ensure_ascii=False, indent=2)
        os.replace(temp_json_path, voices_json_path)

        logger.info(f"✅ Saved {len(custom_voices)} custom voices to persistent storage")
        logger.info(f"π Storage location: {voices_json_path}")

        # Verify the save worked
        if os.path.exists(voices_json_path):
            file_size = os.path.getsize(voices_json_path)
            logger.info(f"π Voice library file size: {file_size} bytes")

    except Exception as e:
        logger.error(f"❌ Error saving voice library: {e}")
        logger.error(f"π Attempted path: {voices_json_path}")
        # Best-effort removal of a partially written scratch file.
        try:
            os.unlink(temp_json_path)
        except OSError:
            pass
|
162 |
|
163 |
def create_voice_from_audio(audio_file, voice_name, voice_description="Custom voice"):
|
164 |
+
"""Create a new voice from uploaded audio with persistent storage"""
|
165 |
try:
|
166 |
voice_id = f"voice_{int(time.time())}_{uuid.uuid4().hex[:8]}"
|
167 |
|
168 |
+
# Handle different audio input formats
|
|
|
|
|
|
|
|
|
169 |
if isinstance(audio_file, tuple):
|
170 |
# Gradio audio format (sample_rate, audio_data)
|
171 |
sample_rate, audio_data = audio_file
|
|
|
172 |
else:
|
173 |
+
# File path - load the audio
|
174 |
+
audio_data, sample_rate = sf.read(audio_file)
|
175 |
+
|
176 |
+
# Encode audio to base64 for persistent storage
|
177 |
+
audio_base64 = encode_audio_to_base64(audio_data, sample_rate)
|
178 |
+
if audio_base64 is None:
|
179 |
+
raise ValueError("Failed to encode audio")
|
180 |
|
181 |
+
# Create voice entry with embedded audio
|
182 |
voice_entry = {
|
183 |
"voice_id": voice_id,
|
184 |
"name": voice_name,
|
185 |
"description": voice_description,
|
186 |
+
"audio_base64": audio_base64, # Store audio as base64
|
187 |
+
"sample_rate": int(sample_rate),
|
188 |
"type": "custom",
|
189 |
+
"created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
190 |
+
"audio_duration": len(audio_data) / sample_rate
|
191 |
}
|
192 |
|
193 |
# Add to voice library
|
194 |
voice_library[voice_id] = voice_entry
|
195 |
+
|
196 |
+
# Save to persistent storage
|
197 |
save_voice_library()
|
198 |
|
199 |
+
logger.info(f"β
Created persistent voice: {voice_name} ({voice_id})")
|
200 |
+
logger.info(f"π΅ Audio: {len(audio_data)} samples, {sample_rate}Hz, {voice_entry['audio_duration']:.2f}s")
|
201 |
+
|
202 |
return voice_id, voice_entry
|
203 |
|
204 |
except Exception as e:
|
|
|
230 |
return None
|
231 |
|
232 |
def get_voice_audio_path(voice_id):
|
233 |
+
"""Get the audio path for a voice (decode from base64 if custom, download if builtin)"""
|
234 |
if voice_id not in voice_library:
|
235 |
return None
|
236 |
|
237 |
voice_info = voice_library[voice_id]
|
238 |
|
239 |
+
# If it's a custom voice with base64 audio
|
240 |
+
if voice_info.get("type") == "custom" and "audio_base64" in voice_info:
|
241 |
+
# Decode base64 to temporary file
|
242 |
+
temp_path = decode_audio_from_base64(voice_info["audio_base64"])
|
243 |
+
if temp_path:
|
244 |
+
logger.info(f"β
Decoded custom voice audio: {voice_info['name']}")
|
245 |
+
return temp_path
|
246 |
+
else:
|
247 |
+
logger.warning(f"β οΈ Failed to decode audio for voice {voice_id}")
|
248 |
+
return None
|
249 |
+
|
250 |
+
# If it's a legacy custom voice with file path (for backward compatibility)
|
251 |
+
elif voice_info.get("type") == "custom" and "audio_path" in voice_info:
|
252 |
audio_path = voice_info["audio_path"]
|
253 |
if os.path.exists(audio_path):
|
254 |
return audio_path
|
|
|
457 |
temp_audio_file = None
|
458 |
|
459 |
try:
|
460 |
+
# Get audio path for the voice
|
461 |
+
audio_prompt_path = get_voice_audio_path(voice_id)
|
462 |
+
temp_audio_file = None
|
463 |
+
|
464 |
+
# Check if we got a temporary file (from base64 decode or URL download)
|
465 |
+
if audio_prompt_path and (audio_prompt_path.startswith('/tmp/') or 'temp' in audio_prompt_path):
|
466 |
temp_audio_file = audio_prompt_path
|
467 |
|
468 |
if audio_prompt_path:
|
|
|
491 |
logger.error(f"β Audio generation failed: {e}")
|
492 |
raise
|
493 |
finally:
|
494 |
+
# Clean up temporary file (only if it's a downloaded URL or decoded audio)
|
495 |
+
if temp_audio_file and os.path.exists(temp_audio_file):
|
496 |
try:
|
497 |
os.unlink(temp_audio_file)
|
498 |
logger.info(f"ποΈ Cleaned up temporary file: {temp_audio_file}")
|
|
|
620 |
raise HTTPException(status_code=400, detail="Cannot delete builtin voices")
|
621 |
|
622 |
try:
|
623 |
+
# Delete legacy audio file if it exists
|
624 |
if "audio_path" in voice_info and os.path.exists(voice_info["audio_path"]):
|
625 |
os.unlink(voice_info["audio_path"])
|
626 |
|
|
|
629 |
del voice_library[voice_id]
|
630 |
save_voice_library()
|
631 |
|
632 |
+
logger.info(f"β
Deleted voice: {voice_name} ({voice_id})")
|
633 |
+
|
634 |
return {
|
635 |
"success": True,
|
636 |
"message": f"Voice '{voice_name}' deleted successfully"
|
|
|
873 |
|
874 |
voice_name = voice_info["name"]
|
875 |
|
876 |
+
# Delete legacy audio file if it exists
|
877 |
if "audio_path" in voice_info and os.path.exists(voice_info["audio_path"]):
|
878 |
os.unlink(voice_info["audio_path"])
|
879 |
|
|
|
882 |
save_voice_library()
|
883 |
|
884 |
updated_choices = get_voice_choices()
|
885 |
+
logger.info(f"β
UI: Deleted voice {voice_name} ({voice_id})")
|
886 |
+
|
887 |
return (
|
888 |
f"β
Voice '{voice_name}' deleted successfully",
|
889 |
gr.update(choices=updated_choices, value=updated_choices[0][1] if updated_choices else None)
|
|
|
1130 |
- **Device**: {DEVICE}
|
1131 |
- **ChatterboxTTS**: {chatterbox_status}
|
1132 |
- **Voice Library**: {len(voice_library)} voices loaded
|
1133 |
+
- **Storage**: {"β
Persistent (/data)" if VOICES_DIR.startswith("/data") else "β οΈ Temporary"}
|
1134 |
- **Generated Files**: {len(audio_cache)}
|
|
|
1135 |
|
1136 |
{'''### π Production Ready!
|
1137 |
+
Your ChatterboxTTS model is loaded with persistent voice management.''' if CHATTERBOX_AVAILABLE else '''### β οΈ Action Required
|
1138 |
**You're hearing beep sounds because ChatterboxTTS isn't loaded.**
|
1139 |
|
1140 |
+
Voice management is working with persistent storage, but you need ChatterboxTTS for real synthesis.'''}
|
1141 |
""")
|
1142 |
|
1143 |
return demo
|