Spaces:
Paused
Paused
"""Add commentMore actions | |
Notification sounds for Wan2GP video generation application | |
Pure Python audio notification system with multiple backend support | |
""" | |
import os | |
import sys | |
import threading | |
import time | |
import numpy as np | |
def generate_notification_beep(volume=50, sample_rate=44100): | |
"""Generate pleasant C major chord notification sound""" | |
if volume == 0: | |
return np.array([]) | |
volume = max(0, min(100, volume)) | |
# Volume curve mapping: 25%->50%, 50%->75%, 75%->100%, 100%->105% | |
if volume <= 25: | |
volume_mapped = (volume / 25.0) * 0.5 | |
elif volume <= 50: | |
volume_mapped = 0.5 + ((volume - 25) / 25.0) * 0.25 | |
elif volume <= 75: | |
volume_mapped = 0.75 + ((volume - 50) / 25.0) * 0.25 | |
else: | |
volume_mapped = 1.0 + ((volume - 75) / 25.0) * 0.05 # Only 5% boost instead of 15% | |
volume = volume_mapped | |
# C major chord frequencies | |
freq_c = 261.63 # C4 | |
freq_e = 329.63 # E4 | |
freq_g = 392.00 # G4 | |
duration = 0.8 | |
t = np.linspace(0, duration, int(sample_rate * duration), False) | |
# Generate chord components | |
wave_c = np.sin(freq_c * 2 * np.pi * t) * 0.4 | |
wave_e = np.sin(freq_e * 2 * np.pi * t) * 0.3 | |
wave_g = np.sin(freq_g * 2 * np.pi * t) * 0.2 | |
wave = wave_c + wave_e + wave_g | |
# Prevent clipping | |
max_amplitude = np.max(np.abs(wave)) | |
if max_amplitude > 0: | |
wave = wave / max_amplitude * 0.8 | |
# ADSR envelope | |
def apply_adsr_envelope(wave_data): | |
length = len(wave_data) | |
attack_time = int(0.2 * length) | |
decay_time = int(0.1 * length) | |
release_time = int(0.5 * length) | |
envelope = np.ones(length) | |
if attack_time > 0: | |
envelope[:attack_time] = np.power(np.linspace(0, 1, attack_time), 3) | |
if decay_time > 0: | |
start_idx = attack_time | |
end_idx = attack_time + decay_time | |
envelope[start_idx:end_idx] = np.linspace(1, 0.85, decay_time) | |
if release_time > 0: | |
start_idx = length - release_time | |
envelope[start_idx:] = 0.85 * np.exp(-4 * np.linspace(0, 1, release_time)) | |
return wave_data * envelope | |
wave = apply_adsr_envelope(wave) | |
# Simple low-pass filter | |
def simple_lowpass_filter(signal, cutoff_ratio=0.8): | |
window_size = max(3, int(len(signal) * 0.001)) | |
if window_size % 2 == 0: | |
window_size += 1 | |
kernel = np.ones(window_size) / window_size | |
padded = np.pad(signal, window_size//2, mode='edge') | |
filtered = np.convolve(padded, kernel, mode='same') | |
return filtered[window_size//2:-window_size//2] | |
wave = simple_lowpass_filter(wave) | |
# Add reverb effect | |
if len(wave) > sample_rate // 4: | |
delay_samples = int(0.12 * sample_rate) | |
reverb = np.zeros_like(wave) | |
reverb[delay_samples:] = wave[:-delay_samples] * 0.08 | |
wave = wave + reverb | |
# Apply volume first, then normalize to prevent clipping | |
wave = wave * volume * 0.5 | |
# Final normalization with safety margin | |
max_amplitude = np.max(np.abs(wave)) | |
if max_amplitude > 0.85: # If approaching clipping threshold | |
wave = wave / max_amplitude * 0.85 # More conservative normalization | |
return wave | |
_mixer_lock = threading.Lock() | |
def play_audio_with_pygame(audio_data, sample_rate=44100): | |
""" | |
Play audio with clean stereo output - sounds like single notification from both speakers | |
""" | |
try: | |
import pygame | |
with _mixer_lock: | |
if len(audio_data) == 0: | |
return False | |
# Clean mixer initialization - quit any existing mixer first | |
if pygame.mixer.get_init() is not None: | |
pygame.mixer.quit() | |
time.sleep(0.2) # Longer pause to ensure clean shutdown | |
# Initialize fresh mixer | |
pygame.mixer.pre_init( | |
frequency=sample_rate, | |
size=-16, | |
channels=2, | |
buffer=512 # Smaller buffer to reduce latency/doubling | |
) | |
pygame.mixer.init() | |
# Verify clean initialization | |
mixer_info = pygame.mixer.get_init() | |
if mixer_info is None or mixer_info[2] != 2: | |
return False | |
# Prepare audio - ensure clean conversion | |
audio_int16 = (audio_data * 32767).astype(np.int16) | |
if len(audio_int16.shape) > 1: | |
audio_int16 = audio_int16.flatten() | |
# Create clean stereo with identical channels | |
stereo_data = np.zeros((len(audio_int16), 2), dtype=np.int16) | |
stereo_data[:, 0] = audio_int16 # Left channel | |
stereo_data[:, 1] = audio_int16 # Right channel | |
# Create sound and play once | |
sound = pygame.sndarray.make_sound(stereo_data) | |
# Ensure only one playback | |
pygame.mixer.stop() # Stop any previous sounds | |
sound.play() | |
# Wait for completion | |
duration_ms = int(len(audio_data) / sample_rate * 1000) + 50 | |
pygame.time.wait(duration_ms) | |
return True | |
except ImportError: | |
return False | |
except Exception as e: | |
print(f"Pygame clean error: {e}") | |
return False | |
def play_audio_with_sounddevice(audio_data, sample_rate=44100): | |
"""Play audio using sounddevice backend""" | |
try: | |
import sounddevice as sd | |
sd.play(audio_data, sample_rate) | |
sd.wait() | |
return True | |
except ImportError: | |
return False | |
except Exception as e: | |
print(f"Sounddevice error: {e}") | |
return False | |
def play_audio_with_winsound(audio_data, sample_rate=44100): | |
"""Play audio using winsound backend (Windows only)""" | |
if sys.platform != "win32": | |
return False | |
try: | |
import winsound | |
import wave | |
import tempfile | |
import uuid | |
temp_dir = tempfile.gettempdir() | |
temp_filename = os.path.join(temp_dir, f"notification_{uuid.uuid4().hex}.wav") | |
try: | |
with wave.open(temp_filename, 'w') as wav_file: | |
wav_file.setnchannels(1) | |
wav_file.setsampwidth(2) | |
wav_file.setframerate(sample_rate) | |
audio_int16 = (audio_data * 32767).astype(np.int16) | |
wav_file.writeframes(audio_int16.tobytes()) | |
winsound.PlaySound(temp_filename, winsound.SND_FILENAME) | |
finally: | |
# Clean up temp file | |
for _ in range(3): | |
try: | |
if os.path.exists(temp_filename): | |
os.unlink(temp_filename) | |
break | |
except: | |
time.sleep(0.1) | |
return True | |
except ImportError: | |
return False | |
except Exception as e: | |
print(f"Winsound error: {e}") | |
return False | |
def play_notification_sound(volume=50): | |
"""Play notification sound with specified volume""" | |
if volume == 0: | |
return | |
audio_data = generate_notification_beep(volume=volume) | |
if len(audio_data) == 0: | |
return | |
# Try audio backends in order | |
audio_backends = [ | |
play_audio_with_pygame, | |
play_audio_with_sounddevice, | |
play_audio_with_winsound, | |
] | |
for backend in audio_backends: | |
try: | |
if backend(audio_data): | |
return | |
except Exception as e: | |
continue | |
# Fallback: terminal beep | |
print(f"All audio backends failed, using terminal beep") | |
print('\a') | |
def play_notification_async(volume=50): | |
"""Play notification sound asynchronously (non-blocking)""" | |
def play_sound(): | |
try: | |
play_notification_sound(volume) | |
except Exception as e: | |
print(f"Error playing notification sound: {e}") | |
sound_thread = threading.Thread(target=play_sound, daemon=True) | |
sound_thread.start() | |
def notify_video_completion(video_path=None, volume=50): | |
"""Notify about completed video generation""" | |
play_notification_async(volume) | |
if __name__ == "__main__": | |
print("Testing notification sounds with different volumes...") | |
print("Auto-detecting available audio backends...") | |
volumes = [25, 50, 75, 100] | |
for vol in volumes: | |
print(f"Testing volume {vol}%:") | |
play_notification_sound(vol) | |
time.sleep(2) | |
print("Test completed!") |