Sense / app.py
Staticaliza's picture
Update app.py
409f566 verified
raw
history blame
6.25 kB
# Imports
import math
import os

# Third-party imports keep their original relative order (`spaces` ahead of torch).
import gradio as gr
import spaces
import torch
import librosa
from PIL import Image, ImageSequence
from decord import VideoReader, cpu
from transformers import AutoModel, AutoTokenizer, AutoProcessor
# Variables

# Compute device: "auto" resolves to CUDA when torch reports it, otherwise CPU.
DEVICE = "auto"
if DEVICE == "auto":
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[SYSTEM] | Using {DEVICE} type compute device.")

# Default instruction text for generate() and the "Instruction" textbox.
DEFAULT_INPUT = "Describe in one short sentence."

# Upper bound on the number of frames kept by encode_video() / encode_gif().
MAX_FRAMES = 64

# Model, tokenizer and processor are all loaded from the same checkpoint at
# import time; the model is moved onto DEVICE right away.
model_name = "openbmb/MiniCPM-o-2_6"
repo = AutoModel.from_pretrained(
    model_name,
    trust_remote_code=True,
    attn_implementation="sdpa",
    torch_dtype=torch.bfloat16,
).to(DEVICE)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
# NOTE(review): `processor` is not referenced by any code visible in this file.
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
# Stylesheet handed to gr.Blocks: narrows the container, centres h1 headings
# and hides the footer.
css = '''
.gradio-container{max-width: 560px !important}
h1{text-align:center}
footer {
visibility: hidden
}
'''

# Per-category text prepended to the user's instruction. The token that follows
# "called" is a placeholder that generate() swaps for the uploaded file's name,
# so it must stay identical to the token used there.
input_prefixes = {
    "Image": "(A image file called β–ˆ has been attached, describe the image content) ",
    "GIF": "(A GIF file called β–ˆ has been attached, describe the GIF content) ",
    "Video": "(A video with audio file called β–ˆ has been attached, describe the video content and the audio content embedded into the video) ",
    "Audio": "(A audio file called β–ˆ has been attached, describe the audio content) ",
}

# Lower-case file extensions (with leading dot) recognised for each category.
filetypes = {
    "Image": [".jpg", ".jpeg", ".png", ".bmp"],
    "GIF": [".gif"],
    "Video": [".mp4", ".mov", ".avi", ".mkv"],
    "Audio": [".wav", ".mp3", ".flac", ".aac"],
}
def uniform_sample(idxs, n):
    """Pick ``n`` evenly spaced items from the sequence ``idxs``.

    The sequence is split into ``n`` equal-width windows and the item at the
    centre of each window is taken, so items repeat when ``n`` exceeds
    ``len(idxs)``.

    Args:
        idxs: Indexable sequence to sample from.
        n: Number of samples wanted.

    Returns:
        A list of ``n`` items, or an empty list when ``n`` is not positive or
        ``idxs`` is empty.
    """
    total = len(idxs)
    # Nothing sensible to sample: avoids dividing by zero / indexing into nothing.
    if n <= 0 or total == 0:
        return []
    gap = total / n
    last = total - 1
    # min() guards against float rounding pushing the final index past the end.
    return [idxs[min(int(i * gap + gap / 2), last)] for i in range(n)]
def build_omni_chunks(path, sr=16000, seconds_per_unit=1):
    """Split a video file into interleaved frame/audio units.

    For every ``seconds_per_unit`` seconds of video, one frame (the first frame
    of that unit) and the matching slice of the mono audio track are emitted.

    Args:
        path: Path of the video file; read with decord for frames and with
            librosa for audio.
        sr: Sample rate the audio track is resampled to.
        seconds_per_unit: Length of one unit in seconds; must be positive.

    Returns:
        A flat list ``["<unit>", frame, audio_chunk, "<unit>", ...]`` where
        ``frame`` is a PIL image and ``audio_chunk`` is a slice of the array
        returned by ``librosa.load``.

    Raises:
        ValueError: If ``seconds_per_unit`` is not positive.
    """
    if seconds_per_unit <= 0:
        raise ValueError("seconds_per_unit must be positive")

    vr = VideoReader(path, ctx=cpu(0))
    # round() gives 0 for clips slower than 0.5 fps, which would divide by zero.
    fps = max(1, round(vr.get_avg_fps()))
    audio_np, _ = librosa.load(path, sr=sr, mono=True)

    frames_per_unit = fps * seconds_per_unit
    # Slice bounds must be integers even when seconds_per_unit is fractional.
    samples_per_unit = int(sr * seconds_per_unit)
    total_units = math.ceil(len(vr) / frames_per_unit)

    content = []
    for i in range(total_units):
        frame = Image.fromarray(vr[int(i * frames_per_unit)].asnumpy().astype("uint8"))
        audio_chunk = audio_np[samples_per_unit * i : samples_per_unit * (i + 1)]
        content.extend(["<unit>", frame, audio_chunk])
    return content
def encode_video(path):
    """Sample roughly one frame per second from a video, capped at MAX_FRAMES.

    Args:
        path: Path of the video file, opened with decord on the CPU.

    Returns:
        A list of PIL images, one per sampled frame.
    """
    vr = VideoReader(path, ctx=cpu(0))
    # round() gives 0 for clips slower than 0.5 fps; range() rejects a zero step.
    fps = max(1, round(vr.get_avg_fps()))
    idxs = list(range(0, len(vr), fps))
    if len(idxs) > MAX_FRAMES:
        # Too many one-per-second frames: thin them out evenly.
        idxs = uniform_sample(idxs, MAX_FRAMES)
    frames = vr.get_batch(idxs).asnumpy()
    return [Image.fromarray(f.astype("uint8")) for f in frames]
def encode_gif(path):
    """Load the frames of a GIF as RGB images, capped at MAX_FRAMES.

    Args:
        path: Path of the GIF file.

    Returns:
        A list of RGB PIL images; evenly thinned with ``uniform_sample`` when
        the GIF has more than MAX_FRAMES frames.
    """
    # Every frame is copied and converted inside the `with`, so the file handle
    # can be released as soon as the list is built.
    with Image.open(path) as img:
        frames = [frame.copy().convert("RGB") for frame in ImageSequence.Iterator(img)]
    if len(frames) > MAX_FRAMES:
        frames = uniform_sample(frames, MAX_FRAMES)
    return frames
@spaces.GPU(duration=60)
def generate(input, instruction=DEFAULT_INPUT, sampling=False, temperature=0.7, top_p=0.8, top_k=100, repetition_penalty=1.05, max_tokens=512):
    """Run the model on an uploaded media file and return its text reply.

    The file's category is taken from its extension (see ``filetypes``).
    Videos are sent as interleaved frame/audio units with ``omni_input=True``;
    images, GIFs and audio are sent as a single user message whose last item
    is the category prefix plus the instruction.

    Args:
        input: Path of the uploaded file, or a falsy value when nothing was
            uploaded.
        instruction: Text instruction for the model.
        sampling: Forwarded to ``repo.chat`` as ``sampling``.
        temperature: Forwarded to ``repo.chat``.
        top_p: Forwarded to ``repo.chat``.
        top_k: Forwarded to ``repo.chat`` (cast to int).
        repetition_penalty: Forwarded to ``repo.chat``.
        max_tokens: Forwarded to ``repo.chat`` as ``max_new_tokens`` (cast to int).

    Returns:
        Whatever ``repo.chat`` returns, or a short message string when no file
        was given or its extension is not supported.
    """
    print(input)
    print(instruction)
    if not input:
        return "No input provided."

    extension = os.path.splitext(input)[1].lower()
    filetype = next(
        (category for category, extensions in filetypes.items() if extension in extensions),
        None,
    )
    if filetype is None:
        # Without this guard the prefix lookup below would raise KeyError.
        return "Unsupported file type."

    # Generation settings shared by the omni (video) path and the regular path.
    generation_kwargs = {
        "sampling": sampling,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": int(top_k),
        "repetition_penalty": repetition_penalty,
        "max_new_tokens": int(max_tokens),
    }

    if filetype == "Video":
        omni_content = build_omni_chunks(input) + [instruction]
        sys_msg = repo.get_sys_prompt(mode="omni", language="en")
        msgs = [sys_msg, {"role": "user", "content": omni_content}]
        output = repo.chat(msgs=msgs, tokenizer=tokenizer, omni_input=True, **generation_kwargs)
        print(output)
        return output

    content = []
    if filetype == "Image":
        # convert() returns an independent image, so the file can be closed.
        with Image.open(input) as image:
            content.append(image.convert("RGB"))
    elif filetype == "GIF":
        content.extend(encode_gif(input))
    else:  # "Audio" is the only category left
        audio_np, sample_rate = librosa.load(input, sr=16000, mono=True)
        chunk_tensor = torch.from_numpy(audio_np).float().to(DEVICE)
        # NOTE(review): the MiniCPM-o model card passes the raw NumPy waveform
        # as the content item; confirm repo.chat really consumes this dict.
        content.append({"array": chunk_tensor, "sampling_rate": sample_rate})

    filename = os.path.basename(input)
    # The placeholder token must match the one embedded in input_prefixes.
    prefix = input_prefixes[filetype].replace("β–ˆ", filename)
    content.append(prefix + instruction)

    inputs_payload = [{"role": "user", "content": content}]
    output = repo.chat(msgs=inputs_payload, tokenizer=tokenizer, **generation_kwargs)
    print(output)
    return output
def cloud():
    """Log a keep-alive line; wired to the cloud button in the UI."""
    message = "[CLOUD] | Space maintained."
    print(message)
# Initialize
with gr.Blocks(css=css) as main:
    # Input column: the media file, the instruction and the generation settings.
    with gr.Column():
        input = gr.File(label="Input", file_types=["image", "video", "audio"], type="filepath")
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=0.7, label="Temperature")
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.8, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=100, label="Top K")
        repetition_penalty = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=1.05, label="Repetition Penalty")
        max_tokens = gr.Slider(minimum=1, maximum=4096, step=1, value=512, label="Max Tokens")
        # The label was mojibake of U+25B6 (UTF-8 bytes decoded with the wrong codec).
        submit = gr.Button("▶")
        maintain = gr.Button("☁️")

    # Output column: the model's reply.
    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")

    # Event wiring; the inputs list follows generate()'s parameter order.
    submit.click(fn=generate, inputs=[input, instruction, sampling, temperature, top_p, top_k, repetition_penalty, max_tokens], outputs=[output], queue=False)
    maintain.click(cloud, inputs=[], outputs=[], queue=False)

main.launch(show_api=True)