# Imports
import gradio as gr
import spaces
import torch
import os
import math
import librosa
from PIL import Image, ImageSequence
from decord import VideoReader, cpu
from transformers import AutoModel, AutoTokenizer, AutoProcessor
# Variables
DEVICE = "auto"
if DEVICE == "auto":
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[SYSTEM] | Using {DEVICE} type compute device.")

DEFAULT_INPUT = "Describe in one short sentence."
MAX_FRAMES = 64

model_name = "openbmb/MiniCPM-o-2_6"
# Load the multimodal model (custom code via trust_remote_code) in bfloat16 with SDPA attention.
repo = AutoModel.from_pretrained(model_name, trust_remote_code=True, attn_implementation="sdpa", torch_dtype=torch.bfloat16).to(DEVICE)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
css = '''
.gradio-container{max-width: 560px !important}
h1{text-align:center}
footer {
    visibility: hidden
}
'''
input_prefixes = {
    "Image": "(An image file called █ has been attached, describe the image content) ",
    "GIF": "(A GIF file called █ has been attached, describe the GIF content) ",
    "Video": "(A video file with audio called █ has been attached, describe the video content and the audio content embedded into the video) ",
    "Audio": "(An audio file called █ has been attached, describe the audio content) ",
}
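# Extension -> category map; generate() uses it to route each upload to the right branch.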
filetypes = {
    "Image": [".jpg", ".jpeg", ".png", ".bmp"],
    "GIF": [".gif"],
    "Video": [".mp4", ".mov", ".avi", ".mkv"],
    "Audio": [".wav", ".mp3", ".flac", ".aac"],
}
# Evenly sample n items from a sequence by taking the center of each of n equal buckets.
def uniform_sample(idxs, n):
    gap = len(idxs) / n
    return [idxs[int(i * gap + gap / 2)] for i in range(n)]
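# e.g. uniform_sample(list(range(10)), 3) -> [1, 5, 8],
# the midpoint index of each of three equal buckets.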
# Split a video into one-second units, pairing one frame with the matching second of audio.
def build_omni_chunks(path, sr=16000, seconds_per_unit=1):
    vr = VideoReader(path, ctx=cpu(0))
    fps = round(vr.get_avg_fps())
    audio_np, _ = librosa.load(path, sr=sr, mono=True)
    total_units = math.ceil(len(vr) / fps / seconds_per_unit)
    content = []
    for i in range(total_units):
        frame = Image.fromarray(vr[int(i * fps * seconds_per_unit)].asnumpy().astype("uint8"))
        audio_chunk = audio_np[sr * i * seconds_per_unit : sr * (i + 1) * seconds_per_unit]
        content.extend(["<unit>", frame, audio_chunk])
    return content
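# The returned list interleaves markers, frames, and audio chunks:
#   ["<unit>", frame_0, audio_0, "<unit>", frame_1, audio_1, ...]
# which is the unit-wise layout fed to MiniCPM-o when chat() is called with omni_input=True.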
# Sample roughly one frame per second from a video, capped at MAX_FRAMES.
def encode_video(path):
    vr = VideoReader(path, ctx=cpu(0))
    fps = round(vr.get_avg_fps())
    idxs = list(range(0, len(vr), fps))
    if len(idxs) > MAX_FRAMES:
        idxs = uniform_sample(idxs, MAX_FRAMES)
    frames = vr.get_batch(idxs).asnumpy()
    return [Image.fromarray(f.astype("uint8")) for f in frames]
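# Note: encode_video only feeds the commented-out frames-only path in generate();
# the live Video branch goes through build_omni_chunks instead.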
# Decode a GIF into RGB frames, capped at MAX_FRAMES.
def encode_gif(path):
    img = Image.open(path)
    frames = [frame.copy().convert("RGB") for frame in ImageSequence.Iterator(img)]
    if len(frames) > MAX_FRAMES:
        frames = uniform_sample(frames, MAX_FRAMES)
    return frames
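# Both encode_video and encode_gif cap their output at MAX_FRAMES (64) via uniform_sample,
# so arbitrarily long inputs stay bounded.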
@spaces.GPU(duration=60)
def generate(input, instruction=DEFAULT_INPUT, sampling=False, temperature=0.7, top_p=0.8, top_k=100, repetition_penalty=1.05, max_tokens=512):
    print(input)
    print(instruction)
    if not input:
        return "No input provided."
    # Route the upload to a category by its file extension.
    extension = os.path.splitext(input)[1].lower()
    filetype = None
    for category, extensions in filetypes.items():
        if extension in extensions:
            filetype = category
            break
    content = []
    msgs = None  # Only set by the Video branch, which builds its own message list.
    if filetype == "Image":
        image = Image.open(input).convert("RGB")
        content.append(image)
    elif filetype == "GIF":
        frames = encode_gif(input)
        content.extend(frames)
    elif filetype == "Video":
        # Videos bypass the plain content list and use the omni chat format.
        omni_content = build_omni_chunks(input) + [instruction]
        sys_msg = repo.get_sys_prompt(mode="omni", language="en")
        msgs = [sys_msg, {"role": "user", "content": omni_content}]
        print(msgs)
    elif filetype == "Audio":
        audio_np, sample_rate = librosa.load(input, sr=16000, mono=True)
        chunk_tensor = torch.from_numpy(audio_np).float().to(DEVICE)
        content.append({"array": chunk_tensor, "sampling_rate": sample_rate})
    else:
        return "Unsupported file type."
    # Dead alternate path kept for reference: raw frames plus raw audio instead of omni chunks.
    """
    elif filetype == "Video":
        frames = encode_video(input)
        content.extend(frames)
        audio, _ = librosa.load(input, sr=16000, mono=True)
        content.append(audio)
    elif filetype == "Audio":
        audio, _ = librosa.load(input, sr=16000, mono=True)
        content.append(audio)
    else:
        return "Unsupported file type."
    """
    filename = os.path.basename(input)
    prefix = input_prefixes[filetype].replace("█", filename)
    content.append(prefix + instruction)  # Unused for Video, where msgs already carries the prompt.
    inputs_payload = [{"role": "user", "content": content}]
    params = {
        "msgs": msgs or inputs_payload,
        "tokenizer": tokenizer,
        "sampling": sampling,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repetition_penalty": repetition_penalty,
        "max_new_tokens": max_tokens,
        "omni_input": filetype == "Video",
    }
    output = repo.chat(**params)
    print(output)
    return output
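# Example invocation (hypothetical local test; assumes a file named "clip.mp4" exists):
#   print(generate("clip.mp4", "Summarize the clip in one sentence."))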
# Keep-alive endpoint triggered by the ☁️ button.
def cloud():
    print("[CLOUD] | Space maintained.")
# Initialize
with gr.Blocks(css=css) as main:
    with gr.Column():
        input = gr.File(label="Input", file_types=["image", "video", "audio"], type="filepath")
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=0.7, label="Temperature")
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.8, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=100, label="Top K")
        repetition_penalty = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=1.05, label="Repetition Penalty")
        max_tokens = gr.Slider(minimum=1, maximum=4096, step=1, value=512, label="Max Tokens")
        submit = gr.Button("▶")
        maintain = gr.Button("☁️")
    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")
    submit.click(fn=generate, inputs=[input, instruction, sampling, temperature, top_p, top_k, repetition_penalty, max_tokens], outputs=[output], queue=False)
    maintain.click(cloud, inputs=[], outputs=[], queue=False)

main.launch(show_api=True)