# Sense / app.py
# Imports
import gradio as gr
import spaces
import torch
import os
import math
import gc
import numpy as np
import librosa
import tempfile
from PIL import Image, ImageSequence
from decord import VideoReader, cpu
from moviepy.editor import VideoFileClip
from transformers import AutoModel, AutoTokenizer, AutoProcessor
# Variables
DEVICE = "auto"
if DEVICE == "auto":
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[SYSTEM] | Using {DEVICE} as the compute device.")
DEFAULT_INPUT = "Describe in one short sentence."
MAX_FRAMES = 64  # Maximum number of frames sampled from a video or GIF
AUDIO_SR = 16000  # Audio sample rate (16 kHz mono)
model_name = "openbmb/MiniCPM-o-2_6"
repo = AutoModel.from_pretrained(model_name, trust_remote_code=True, attn_implementation="sdpa", torch_dtype=torch.bfloat16).eval().to(DEVICE)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
css = '''
.gradio-container{max-width: 560px !important}
h1{text-align:center}
footer {
    visibility: hidden
}
'''
global_instruction = "You will analyze video, audio, and text input, then describe the given content with as many keywords as possible. Always take a guess."
input_prefixes = {
    "Image": "An image file called █ has been attached; describe the image content.",
    "GIF": "A GIF file called █ has been attached; describe the GIF content.",
    "Video": "A video file with audio called █ has been attached; describe both the video content and the audio content.",
    "Audio": "An audio file called █ has been attached; describe the audio content.",
}
filetypes = {
    "Image": [".jpg", ".jpeg", ".png", ".bmp"],
    "GIF": [".gif"],
    "Video": [".mp4", ".mov", ".avi", ".mkv"],
    "Audio": [".wav", ".mp3", ".flac", ".aac"],
}
# Functions
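# Map a file extension to its filetype key ("Image", "GIF", "Video", "Audio"), or None when unsupported.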
def infer_filetype(ext):
    return next((k for k, v in filetypes.items() if ext in v), None)
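# Evenly subsample a sequence down to at most n items.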
def uniform_sample(seq, n):
    step = max(len(seq) // n, 1)
    return seq[::step][:n]
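# Decode up to MAX_FRAMES evenly spaced frames from a video as PIL images.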
def frames_from_video(path):
    vr = VideoReader(path, ctx=cpu(0))
    idx = list(uniform_sample(range(len(vr)), MAX_FRAMES))
    batch = vr.get_batch(idx).asnumpy()
    return [Image.fromarray(frame.astype("uint8")) for frame in batch]
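# Extract a video's audio track to 16 kHz mono samples via a temporary WAV file; returns an empty array when the video has no audio.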
def audio_from_video(path):
    clip = VideoFileClip(path)
    if clip.audio is None:
        clip.close()
        return np.zeros(0, dtype="float32")
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as tmp:
        clip.audio.write_audiofile(tmp.name,
                                   codec="pcm_s16le",
                                   fps=AUDIO_SR,
                                   verbose=False,
                                   logger=None)
        audio_np, _ = librosa.load(tmp.name, sr=AUDIO_SR, mono=True)
    clip.close()
    return audio_np
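# Load an audio file from disk as 16 kHz mono float samples.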
def load_audio(path):
    audio_np, _ = librosa.load(path, sr=AUDIO_SR, mono=True)
    return audio_np
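# Interleave one frame with one second of audio per "<unit>" marker, following MiniCPM-o's omni-input content format.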
def build_video_omni(path, instruction):
    frames = frames_from_video(path)
    audio = audio_from_video(path)
    if audio.size == 0:
        return [instruction, *frames]
    contents = [instruction]
    audio_secs = math.ceil(len(audio) / AUDIO_SR)
    total_units = max(1, min(len(frames), audio_secs))
    for i in range(total_units):
        frame = frames[i] if i < len(frames) else frames[-1]
        start = i * AUDIO_SR
        end = min((i + 1) * AUDIO_SR, len(audio))
        chunk = audio[start:end]
        if chunk.size == 0:
            break
        contents.extend(["<unit>", frame, chunk])
    return contents
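# Build [instruction, image] contents for a single RGB image.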
def build_image_omni(path, instruction):
    image = Image.open(path).convert("RGB")
    return [instruction, image]
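# Build contents from up to MAX_FRAMES evenly sampled frames of a GIF.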
def build_gif_omni(path, instruction):
    img = Image.open(path)
    frames = [f.copy().convert("RGB") for f in ImageSequence.Iterator(img)]
    frames = uniform_sample(frames, MAX_FRAMES)
    return [instruction, *frames]
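# Build [instruction, audio] contents for a standalone audio file.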
def build_audio_omni(path, instruction):
    audio = load_audio(path)
    return [instruction, audio]
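# Assemble the multimodal message list for the attached file and run a single chat turn on MiniCPM-o.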
@spaces.GPU(duration=30)
def generate(input,
             instruction=DEFAULT_INPUT,
             sampling=False,
             temperature=0.7,
             top_p=0.8,
             top_k=100,
             repetition_penalty=1.05,
             max_tokens=512):
    if not input:
        return "No input provided."
    extension = os.path.splitext(input)[1].lower()
    filetype = infer_filetype(extension)
    if not filetype:
        return "Unsupported file type."
    filename = os.path.basename(input)
    prefix = input_prefixes[filetype].replace("█", filename)
    builder_map = {
        "Image": build_image_omni,
        "GIF": build_gif_omni,
        "Video": build_video_omni,
        "Audio": build_audio_omni,
    }
    instruction = f"{prefix}\n{instruction}"
    omni_content = builder_map[filetype](input, instruction)
    msgs = [
        {"role": "user", "content": global_instruction},
        {"role": "user", "content": omni_content},
    ]
    print(msgs)  # Debug: log the assembled multimodal message
    output = repo.chat(
        msgs=msgs,
        tokenizer=tokenizer,
        sampling=sampling,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
        repetition_penalty=repetition_penalty,
        max_new_tokens=max_tokens,
        omni_input=True,
        use_image_id=False,
        max_slice_nums=2,
    )
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()
    return output
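# No-op endpoint used to keep the Space warm.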
def cloud():
    print("[CLOUD] | Space maintained.")
# Initialize
with gr.Blocks(css=css) as main:
    with gr.Column():
        input = gr.File(label="Input", file_types=["image", "video", "audio"], type="filepath")
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=0.7, label="Temperature")
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.8, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=100, label="Top K")
        repetition_penalty = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=1.05, label="Repetition Penalty")
        max_tokens = gr.Slider(minimum=1, maximum=4096, step=1, value=512, label="Max Tokens")
        submit = gr.Button("▶")
        maintain = gr.Button("☁️")
    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")
    submit.click(fn=generate, inputs=[input, instruction, sampling, temperature, top_p, top_k, repetition_penalty, max_tokens], outputs=[output], queue=False)
    maintain.click(cloud, inputs=[], outputs=[], queue=False)
main.launch(show_api=True)
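# A minimal programmatic-usage sketch (hypothetical file path; the Space normally drives generate() through the UI above):
# result = generate("example.mp4", instruction="Describe in one short sentence.")
# print(result)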