# Hugging Face Space (status at capture time: Running)
# Imports | |
import gradio as gr | |
import spaces | |
import torch | |
import os | |
import math | |
import gc | |
import librosa | |
import tempfile | |
from PIL import Image, ImageSequence | |
from decord import VideoReader, cpu | |
from transformers import AutoModel, AutoTokenizer, AutoProcessor | |
# Variables | |
# Compute device: "auto" picks CUDA when torch reports it as available,
# otherwise falls back to the CPU.
DEVICE = "auto"
if DEVICE == "auto":
    if torch.cuda.is_available():
        DEVICE = "cuda"
    else:
        DEVICE = "cpu"
print(f"[SYSTEM] | Using {DEVICE} type compute device.")

# Default text for the instruction box and for generate()'s `instruction`.
DEFAULT_INPUT = "Describe in one short sentence."
# Upper bound on the number of frames sampled from a GIF or video.
MAX_FRAMES = 64
# Sample rate (Hz) that all audio is resampled to when loaded.
AUDIO_SR = 16000

# Model, tokenizer and processor are loaded once at import time.
model_name = "openbmb/MiniCPM-o-2_6"
repo = AutoModel.from_pretrained(
    model_name,
    trust_remote_code=True,
    attn_implementation="sdpa",
    torch_dtype=torch.bfloat16,
).to(DEVICE)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
# Custom CSS handed to gr.Blocks: caps the container width, centers <h1>
# headings and hides the page footer.
css = '''
.gradio-container{max-width: 560px !important}
h1{text-align:center}
footer {
visibility: hidden
}
'''
# Preamble placed at the start of every prompt built by generate().
global_instruction = "You will analyze image, GIF, video, and audio input, then use as much keywords to describe the given content and take as much guesses of what it could be."
# Per-filetype prompt line. The 'β' placeholder is replaced with the uploaded
# file's basename in generate(); the placeholder character here and the one
# used in that replace() call must stay identical.
input_prefixes = {
    "Image": "Analyze the 'β' image.",
    "GIF": "Analyze the 'β' GIF.",
    "Video": "Analyze the 'β' video including the audio associated with the video.",
    "Audio": "Analyze the 'β' audio.",
}
# Maps each supported input kind to its file extensions. generate() lowercases
# the upload's extension before looking it up here, so entries are lowercase
# and include the leading dot.
filetypes = {
    "Image": [".jpg", ".jpeg", ".png", ".bmp"],
    "GIF": [".gif"],
    "Video": [".mp4", ".mov", ".avi", ".mkv"],
    "Audio": [".wav", ".mp3", ".flac", ".aac"],
}
# Functions | |
def uniform_sample(seq, n):
    """Return at most *n* items of *seq*, evenly spread over its whole length.

    The previous stride-based version used ``len(seq) // n`` as the step, so
    any sequence with between ``n`` and ``2 * n`` items got a step of 1 and
    only its first *n* items were returned (the tail was never sampled).
    Here item ``i`` of the result is taken at position ``i * len(seq) // n``,
    which always spans the full sequence.

    Args:
        seq: Any sized, indexable sequence (list, range, ...).
        n: Maximum number of items to return.

    Returns:
        A new list. Empty when *seq* is empty or *n* is not positive; a copy
        of *seq* when it already has *n* items or fewer.
    """
    total = len(seq)
    if n <= 0 or total == 0:
        return []
    if total <= n:
        return list(seq)
    # Integer arithmetic keeps the positions exact and strictly increasing
    # (total > n here), so no item is picked twice.
    return [seq[(i * total) // n] for i in range(n)]
def build_video(path):
    """Turn a video file into an interleaved ``[frame, audio, frame, audio, ...]`` list.

    The audio track is cut into one-second chunks (``AUDIO_SR`` samples each,
    the last one possibly shorter) and each chunk is paired with the video
    frame found at the start of that same second. At most ``MAX_FRAMES``
    pairs are produced, so clips longer than that many seconds are truncated.

    Previously the frames were sampled across the whole clip and then paired
    with consecutive one-second audio chunks, so frame ``i`` generally did not
    belong to second ``i``; a clip without frames raised ``IndexError`` and a
    clip with an empty audio track lost all of its frames.

    Args:
        path: Path of the video file.

    Returns:
        A flat list alternating PIL RGB-array images and 1-D audio arrays.
        Only images when the audio track is empty; ``[]`` when the video has
        no frames.
    """
    vr = VideoReader(path, ctx=cpu(0))
    frame_count = len(vr)
    if frame_count == 0:
        return []

    audio = build_audio(path)
    audio_seconds = math.ceil(len(audio) / AUDIO_SR)

    if audio_seconds == 0:
        # No audio samples: fall back to evenly sampled frames only.
        indices = list(uniform_sample(range(frame_count), MAX_FRAMES))
        batch = vr.get_batch(indices).asnumpy()
        return [Image.fromarray(frame.astype("uint8")) for frame in batch]

    units = min(MAX_FRAMES, audio_seconds)
    fps = vr.get_avg_fps()
    if math.isfinite(fps) and fps > 0:
        # Frame shown at the start of each second; clamp in case the audio
        # track runs longer than the video stream.
        indices = [min(int(second * fps), frame_count - 1) for second in range(units)]
    else:
        # No usable frame rate: spread the frames proportionally over the
        # audio duration instead.
        indices = [(second * frame_count) // audio_seconds for second in range(units)]

    batch = vr.get_batch(indices).asnumpy()
    contents = []
    for second, frame in enumerate(batch):
        # second < audio_seconds, so this slice is never empty; numpy clamps
        # the end of the final (possibly partial) chunk.
        chunk = audio[second * AUDIO_SR:(second + 1) * AUDIO_SR]
        contents.extend([Image.fromarray(frame.astype("uint8")), chunk])
    return contents
def build_image(path):
    """Open the image file at *path* and return it converted to RGB mode."""
    return Image.open(path).convert("RGB")
def build_gif(path):
    """Load a GIF and return up to ``MAX_FRAMES`` of its frames as RGB images.

    The file is opened in a ``with`` block so its handle is closed as soon as
    the frames have been copied out (multi-frame images otherwise keep the
    file open). Frames are converted to RGB only after sampling, so frames
    that are discarded are never converted.

    Args:
        path: Path of the GIF file.

    Returns:
        A list of RGB ``PIL.Image`` frames chosen by ``uniform_sample``.
    """
    with Image.open(path) as image:
        # copy() detaches each frame from the file-backed image so it stays
        # valid after the file is closed.
        frames = [frame.copy() for frame in ImageSequence.Iterator(image)]
    sampled = uniform_sample(frames, MAX_FRAMES)
    return [frame.convert("RGB") for frame in sampled]
def build_audio(path):
    """Load *path* with librosa as mono audio resampled to ``AUDIO_SR`` Hz.

    Only the sample array is returned; the sample rate reported by librosa is
    dropped because it is always the requested ``AUDIO_SR``.
    """
    return librosa.load(path, sr=AUDIO_SR, mono=True)[0]
def generate(input, instruction=DEFAULT_INPUT, sampling=False, temperature=0.7, top_p=0.8, top_k=100, repetition_penalty=1.05, max_tokens=512):
    """Describe an uploaded image, GIF, video or audio file with the model.

    The file kind is chosen from the extension of *input*, the matching
    builder turns the file into model-ready media, and a prompt made of the
    global instruction, a per-kind prefix naming the file, and the user's
    *instruction* is sent to ``repo.chat``.

    Args:
        input: Path of the uploaded file (falsy when nothing was uploaded).
        instruction: Extra user instruction appended to the prompt.
        sampling: Whether the model samples instead of decoding greedily.
        temperature: Sampling temperature passed to the model.
        top_p: Nucleus-sampling threshold passed to the model.
        top_k: Top-k cutoff passed to the model.
        repetition_penalty: Repetition penalty passed to the model.
        max_tokens: Maximum number of new tokens to generate.

    Returns:
        The value returned by ``repo.chat``, or a short message string when
        no file was given or its extension is not supported.
    """
    if not input:
        return "No input provided."

    extension = os.path.splitext(input)[1].lower()
    filetype = next((kind for kind, exts in filetypes.items() if extension in exts), None)
    if not filetype:
        return "Unsupported file type."

    builder_map = {
        "Image": build_image,
        "GIF": build_gif,
        "Video": build_video,
        "Audio": build_audio,
    }
    filename = os.path.basename(input)
    prefix = input_prefixes[filetype].replace("β", filename)
    instruction = f"{global_instruction}\n{prefix}\n{instruction}"

    omni_content = builder_map[filetype](input)
    # The GIF and video builders return a list of frames/audio chunks. The
    # model card's multi-image and video examples pass such media as separate
    # items of one flat content list, so splice the list in rather than
    # nesting it as a single element.
    if isinstance(omni_content, (list, tuple)):
        content = [*omni_content, instruction]
    else:
        content = [omni_content, instruction]
    msgs = [{"role": "user", "content": content}]
    print(msgs)

    try:
        return repo.chat(
            msgs=msgs,
            tokenizer=tokenizer,
            sampling=sampling,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            repetition_penalty=repetition_penalty,
            max_new_tokens=max_tokens,
            omni_input=True,
            use_image_id=False,
            max_slice_nums=9,
        )
    finally:
        # Run the cleanup even when generation raises; collect Python garbage
        # first so the tensors it frees can be returned by empty_cache().
        gc.collect()
        torch.cuda.empty_cache()
def cloud():
    """Print a fixed maintenance message to stdout and return nothing."""
    message = "[CLOUD] | Space maintained."
    print(message)
# Initialize | |
# Build the single-page UI: an input column (file, instruction, generation
# settings, action buttons) followed by an output column.
with gr.Blocks(css=css) as main:
    with gr.Column():
        input = gr.File(label="Input", file_types=["image", "video", "audio"], type="filepath")
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(minimum=0, maximum=2, step=0.01, value=1, label="Temperature")
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.95, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=50, label="Top K")
        repetition_penalty = gr.Slider(minimum=0, maximum=2, step=0.01, value=1.05, label="Repetition Penalty")
        max_tokens = gr.Slider(minimum=1, maximum=4096, step=1, value=512, label="Max Tokens")
        # Button labels were mis-decoded UTF-8 ("βΆ" / "βοΈ"); restored to the
        # intended play and cloud symbols.
        submit = gr.Button("▶")
        maintain = gr.Button("☁️")
    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")
    # The component order in `inputs` must match generate()'s positional
    # parameters.
    submit.click(fn=generate, inputs=[input, instruction, sampling, temperature, top_p, top_k, repetition_penalty, max_tokens], outputs=[output], queue=False)
    maintain.click(cloud, inputs=[], outputs=[], queue=False)

main.launch(show_api=True)