Spaces:
Running
Running
# Thank you to the authors of seewav for dedicating it into the public domain. | |
# This program is also dedicated into the public domain. | |
# You may use it, at your choice, under the Unlicense, CC0, or WTFPL license. | |
# Enjoy! | |
# Mostly from: https://github.com/adefossez/seewav | |
# Original author: adefossez | |
import math | |
import subprocess | |
import cairo | |
import gradio as gr | |
import numpy as np | |
import tqdm | |
from pydub import AudioSegment | |
def read_audio(audio, seek=None, duration=None):
    """Read the `audio` file, starting at `seek` (or 0) seconds for `duration` (or all) seconds.

    Decoding is delegated to pydub's `AudioSegment.from_file`; `audio` is
    passed to it unchanged.

    Args:
        audio: the audio source handed to `AudioSegment.from_file`.
        seek: offset in seconds at which to start, or `None` for the beginning.
        duration: number of seconds to keep after `seek`, or `None` for all.

    Returns:
        A tuple `(wav, samplerate)` where `wav` is `float32[channels, samples]`
        holding the raw (un-normalized) sample values and `samplerate` is the
        segment's `frame_rate`.
    """
    audio_segment = AudioSegment.from_file(audio)
    channels = audio_segment.channels
    samplerate = audio_segment.frame_rate
    # pydub segments are sliced in milliseconds.
    if seek is not None:
        audio_segment = audio_segment[int(seek * 1000):]
    if duration is not None:
        audio_segment = audio_segment[:int(duration * 1000)]
    samples = audio_segment.get_array_of_samples()
    wav = np.array(samples, dtype=np.float32)
    # Multi-channel samples come back interleaved frame by frame
    # (L, R, L, R, ...), so the flat array is first viewed as
    # [samples, channels] and then transposed. Reshaping straight to
    # (channels, -1) would put a mix of both channels in every row.
    return wav.reshape(-1, channels).T, samplerate
def sigmoid(x):
    """Return the logistic function `1 / (1 + exp(-x))` of `x` (scalar or array)."""
    decay = np.exp(-x)
    return 1 / (1 + decay)
def envelope(wav, window, stride):
    """Compute the compressed envelope of the waveform `wav` (float[samples]).

    The signal is zero padded by `window // 2` on each side, then the positive
    part of every `window`-sample frame (one frame every `stride` samples) is
    averaged. The pooled values are finally squashed with a sigmoid.
    """
    padded = np.pad(wav, window // 2)
    starts = range(0, len(padded) - window, stride)
    pooled = np.array(
        [np.maximum(padded[start : start + window], 0).mean() for start in starts]
    )
    # Some form of audio compressor based on the sigmoid: since the pooled
    # values are non-negative, the result lies in [0, 0.95).
    return 1.9 * (sigmoid(2.5 * pooled) - 0.5)
def draw_env(envs, out, fg_colors, bg_color, size):
    """Internal function, render one video frame with cairo and save it to `out` as png.

    `envs` is a list of envelopes, one per channel; each envelope is a
    float[bars] giving the height of every bar. The waves are stacked
    vertically, wave `i` being drawn with `fg_colors[i]` over a `bg_color`
    background. `size` is the `(width, height)` of the image in pixels.
    """
    canvas = cairo.ImageSurface(cairo.FORMAT_ARGB32, *size)
    painter = cairo.Context(canvas)
    # Scale so that all coordinates below live in the unit square.
    painter.scale(*size)
    painter.set_source_rgb(*bg_color)
    painter.rectangle(0, 0, 1, 1)
    painter.fill()

    n_waves = len(envs)  # number of waves to draw (stacked vertically)
    n_steps = len(envs[0])  # number of time steps, i.e. bars per wave
    pad_ratio = 0.1  # spacing ratio between 2 bars
    bar_width = 1.0 / (n_steps * (1 + 2 * pad_ratio))
    gap = pad_ratio * bar_width
    pitch = 2 * gap + bar_width  # horizontal distance between two bars
    painter.set_line_width(bar_width)

    for step in range(n_steps):
        x = gap + step * pitch
        for wave, env in enumerate(envs):
            color = fg_colors[wave]
            # (semi-)height of the bar, shrunk as K waves share the height
            half = 0.5 * env[step] / n_waves
            center = (1 + 2 * wave) / (2 * n_waves)  # midrule of this wave

            # Upper half of the bar, fully opaque.
            painter.set_source_rgb(*color)
            painter.move_to(x, center - half)
            painter.line_to(x, center)
            painter.stroke()

            # Lower half: slightly shorter and translucent.
            painter.set_source_rgba(*color, 0.8)
            painter.move_to(x, center)
            painter.line_to(x, center + 0.9 * half)
            painter.stroke()

    canvas.write_to_png(out)
def interpole(x1, y1, x2, y2, x):
    """Linearly interpolate (or extrapolate) at `x` along the line through (x1, y1) and (x2, y2)."""
    rise = y2 - y1
    run = x2 - x1
    return y1 + rise * (x - x1) / run
def visualize(
    audio,
    tmp,
    out,
    seek=None,
    duration=None,
    rate=60,
    bars=50,
    speed=4,
    time=0.4,
    oversample=3,
    fg_color=(0.2, 0.2, 0.2),
    fg_color2=(0.5, 0.3, 0.6),
    bg_color=(1, 1, 1),
    size=(400, 400),
    stereo=False,
):
    """Generate the visualisation for the `audio` file, using a `tmp` folder and saving the final
    video in `out`.

    One png per video frame is written to `tmp`, then ffmpeg (run with `tmp`
    as working directory) assembles the frames and the audio into `out`.

    `tmp` and `out` are path objects (they are used with `/` and `.resolve()`).
    `seek` and `duration` give the extract location if any; the same extract
    is applied to the audio track of the output video.
    `rate` is the framerate of the output video.
    `bars` is the number of bars in the animation.
    `speed` is the base speed of transition. Depending on volume, actual speed will vary
        between 0.5 and 2 times it.
    `time` amount of audio shown at once on a frame.
    `oversample` higher values will lead to more frequent changes.
    `fg_color` is the rgb color to use for the foreground.
    `fg_color2` is the rgb color to use for the second wav if stereo is set.
    `bg_color` is the rgb color to use for the background.
    `size` is the `(width, height)` in pixels to generate.
    `stereo` is whether to create 2 waves.

    Returns `out`.

    Raises `gr.Error` if the audio cannot be read, or if `stereo` is set but
    the file does not have exactly two channels, and
    `subprocess.CalledProcessError` if ffmpeg fails.
    """
    import os  # local import: only needed to make the audio path absolute

    try:
        wav, sr = read_audio(audio, seek=seek, duration=duration)
    except (OSError, ValueError) as err:
        raise gr.Error(str(err)) from err

    # wavs is a list of wav over channels
    wavs = []
    if stereo:
        # Explicit check rather than `assert`, which is stripped under -O.
        if wav.shape[0] != 2:
            raise gr.Error("stereo requires stereo audio file")
        wavs.append(wav[0])
        wavs.append(wav[1])
    else:
        wavs.append(wav.mean(0))

    for i, channel in enumerate(wavs):
        std = channel.std()
        # A silent channel has a zero std; dividing would fill it with NaN.
        if std > 0:
            wavs[i] = channel / std

    # Clamp to 1 so that very small `time`/`sr` or large `oversample` values
    # cannot produce a zero window or a zero stride (range() rejects step 0).
    window = max(1, int(sr * time / bars))
    stride = max(1, int(window / oversample))

    # envs is a list of env over channels
    envs = []
    for channel in wavs:
        env = envelope(channel, window, stride)
        # Padding so that the two `bars`-wide slices read below always exist.
        env = np.pad(env, (bars // 2, 2 * bars))
        envs.append(env)

    clip_seconds = len(wavs[0]) / sr
    frames = int(rate * clip_seconds)
    smooth = np.hanning(bars)

    for idx in tqdm.tqdm(range(frames)):
        # Position of this frame, in units of `bars` envelope entries.
        pos = ((idx / rate) * sr) / stride / bars
        off = int(pos)
        loc = pos - off
        denvs = []
        for env in envs:
            env1 = env[off * bars : (off + 1) * bars]
            env2 = env[(off + 1) * bars : (off + 2) * bars]
            # we want loud parts to be updated faster
            maxvol = math.log10(1e-4 + env2.max()) * 10
            speedup = np.clip(interpole(-6, 0.5, 0, 2, maxvol), 0.5, 2)
            # Blend weight going from env1 to env2 as `loc` goes from 0 to 1.
            w = sigmoid(speed * speedup * (loc - 0.5))
            denv = (1 - w) * env1 + w * env2
            denv *= smooth
            denvs.append(denv)
        draw_env(denvs, tmp / f"{idx:06d}.png", (fg_color, fg_color2), bg_color, size)

    # The audio track must match the visualised extract: seek into the input
    # with -ss and bound the output with -t. The path is made absolute because
    # ffmpeg runs with `tmp` as its working directory.
    audio_input = []
    if seek is not None:
        audio_input += ["-ss", str(seek)]
    audio_input += ["-i", os.path.abspath(audio)]
    if duration is not None:
        audio_input += ["-t", str(duration)]

    subprocess.run(
        [
            "ffmpeg",
            "-y",
            "-loglevel",
            "panic",
            "-r",
            str(rate),
            "-f",
            "image2",
            "-s",
            f"{size[0]}x{size[1]}",
            "-i",
            "%06d.png",
            *audio_input,
            "-c:a",
            "aac",
            "-vcodec",
            "libx264",
            "-crf",
            "10",
            "-pix_fmt",
            "yuv420p",
            out.resolve(),
        ],
        check=True,
        cwd=tmp,
    )
    return out
def parse_color(colorstr):
    """Parse a comma separated `r,g,b` string into a 3-tuple of floats.

    Raises:
        gr.Error: if `colorstr` is not exactly three comma separated floats.
    """
    try:
        # Both a non-numeric component and a wrong number of components
        # surface as ValueError (from float() and from the unpacking).
        r, g, b = (float(part) for part in colorstr.split(","))
    except ValueError as err:
        raise gr.Error(
            "Format for color is 3 floats separated by commas 0.xx,0.xx,0.xx, rgb order"
        ) from err
    return r, g, b
def hex_to_rgb(hex_color):
    """Convert a hex color string to an `(r, g, b)` tuple of floats in [0, 1].

    Accepts `rrggbb` with or without leading `#`; any digits after the first
    six (e.g. an alpha component) are ignored. The shorthand forms `rgb` and
    `rgba` are expanded by doubling each digit.

    Raises:
        ValueError: if there are too few digits or they are not hexadecimal.
    """
    digits = hex_color.lstrip("#")
    if len(digits) in (3, 4):
        # Shorthand notation: "f0a" means "ff00aa".
        digits = "".join(ch * 2 for ch in digits)
    if len(digits) < 6:
        # Without this check a truncated string would either fail with an
        # obscure int() error or silently produce a wrong blue component.
        raise ValueError(f"Expected a hex color like '#rrggbb', got {hex_color!r}")
    r = int(digits[0:2], 16) / 255.0
    g = int(digits[2:4], 16) / 255.0
    b = int(digits[4:6], 16) / 255.0
    return (r, g, b)