Spaces:
Running
Running
File size: 5,109 Bytes
ac5de5b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# utils.py
# Utility functions for the Dia TTS server
import logging
import time
import os
import io
import numpy as np
import soundfile as sf
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
# --- Audio Processing ---
def encode_audio(
    audio_array: np.ndarray, sample_rate: int, output_format: str = "opus"
) -> Optional[bytes]:
    """
    Encodes a NumPy audio array into the specified format in memory.

    Args:
        audio_array: NumPy array containing audio data (float32, range [-1, 1]).
        sample_rate: Sample rate of the audio data.
        output_format: Desired output format ('opus' or 'wav').

    Returns:
        Bytes object containing the encoded audio, or None when the input is
        None/empty, the format is unsupported, or encoding fails.

    Raises:
        ImportError: Re-raised (after logging) if it occurs while writing.
    """
    if audio_array is None or audio_array.size == 0:
        logger.warning("encode_audio received empty or None audio array.")
        return None

    if output_format not in ("opus", "wav"):
        logger.error(f"Unsupported output format requested: {output_format}")
        return None

    # perf_counter is monotonic, so the logged duration cannot go negative or
    # jump if the system clock is adjusted mid-encode.
    start_time = time.perf_counter()
    output_buffer = io.BytesIO()
    try:
        if output_format == "opus":
            # The float samples are handed to soundfile unchanged; any
            # conversion needed by the Opus encoder is left to the library.
            sf.write(
                output_buffer, audio_array, sample_rate, format="ogg", subtype="opus"
            )
        else:
            # Clip before scaling: a sample slightly outside [-1, 1] would
            # otherwise wrap around when cast to int16 instead of saturating.
            audio_int16 = (np.clip(audio_array, -1.0, 1.0) * 32767).astype(np.int16)
            sf.write(
                output_buffer, audio_int16, sample_rate, format="wav", subtype="pcm_16"
            )
    except ImportError:
        logger.critical(
            "`soundfile` or its dependency `libsndfile` not found/installed correctly. Cannot encode audio."
        )
        raise  # Re-raise critical error
    except Exception as e:
        logger.error(f"Error encoding audio to {output_format}: {e}", exc_info=True)
        return None

    encoded_bytes = output_buffer.getvalue()
    elapsed = time.perf_counter() - start_time
    logger.info(
        f"Encoded {len(encoded_bytes)} bytes to {output_format} in {elapsed:.3f} seconds."
    )
    return encoded_bytes
def save_audio_to_file(
    audio_array: np.ndarray, sample_rate: int, file_path: str
) -> bool:
    """
    Saves a NumPy audio array to a 16-bit PCM WAV file.

    Args:
        audio_array: NumPy array containing audio data (float32, range [-1, 1]).
        sample_rate: Sample rate of the audio data.
        file_path: Path to save the WAV file (str or os.PathLike). The parent
            directory is created if the path has one and it does not exist.

    Returns:
        True if saving was successful, False otherwise (empty/None input or
        any error while creating the directory or writing the file).
    """
    if audio_array is None or audio_array.size == 0:
        logger.warning("save_audio_to_file received empty or None audio array.")
        return False

    # Normalise os.PathLike objects to a plain path so the string checks
    # below work for them as well.
    file_path = os.fspath(file_path)
    if not file_path.lower().endswith(".wav"):
        logger.warning(
            f"File path '{file_path}' does not end with .wav. Saving as WAV anyway."
        )

    start_time = time.perf_counter()
    try:
        # A bare filename has an empty dirname, and os.makedirs("") raises
        # FileNotFoundError; only create the directory when there is one.
        output_dir = os.path.dirname(file_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        # Clip before scaling: a sample slightly outside [-1, 1] would
        # otherwise wrap around when cast to int16 instead of saturating.
        audio_int16 = (np.clip(audio_array, -1.0, 1.0) * 32767).astype(np.int16)
        sf.write(file_path, audio_int16, sample_rate, format="wav", subtype="pcm_16")
    except ImportError:
        logger.critical(
            "`soundfile` or its dependency `libsndfile` not found/installed correctly. Cannot save audio."
        )
        return False  # Indicate failure
    except Exception as e:
        logger.error(f"Error saving WAV file to {file_path}: {e}", exc_info=True)
        return False

    elapsed = time.perf_counter() - start_time
    logger.info(f"Saved WAV file to {file_path} in {elapsed:.3f} seconds.")
    return True
# --- Other Utilities (Optional) ---
class PerformanceMonitor:
    """Collects named timestamps and renders the time spent between them."""

    def __init__(self):
        # Creation time; the first recorded event is measured against it.
        self.start_time = time.time()
        # (event_name, timestamp) pairs in the order they were recorded.
        self.events = []

    def record(self, event_name: str):
        """Append `event_name` together with the current time.time() value."""
        stamp = time.time()
        self.events.append((event_name, stamp))

    def report(self) -> str:
        """
        Build a multi-line summary of the recorded events.

        Each event line shows the time elapsed since the previous event (or
        since construction for the first one); the final line shows the time
        elapsed from construction until this call.
        """
        elapsed_total = time.time() - self.start_time

        # Pair every event with the timestamp that precedes it.
        stamps = [stamp for _, stamp in self.events]
        preceding = [self.start_time, *stamps[:-1]]

        lines = ["Performance Report:"]
        lines.extend(
            f" - {label}: {stamp - before:.3f}s"
            for (label, stamp), before in zip(self.events, preceding)
        )
        lines.append(f"Total Duration: {elapsed_total:.3f}s")
        return "\n".join(lines)
|