"""Gradio front-end that describes an image, GIF, video or audio file with MiniCPM-o."""

# Imports
import gc
import math
import os
import tempfile

import gradio as gr
import librosa
import numpy as np
import spaces
import torch
from decord import VideoReader, cpu
from moviepy.editor import VideoFileClip
from PIL import Image, ImageSequence
from transformers import AutoModel, AutoTokenizer, AutoProcessor

# Variables
DEVICE = "auto"
if DEVICE == "auto":
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[SYSTEM] | Using {DEVICE} type compute device.")

DEFAULT_INPUT = "Describe in one short sentence."
MAX_FRAMES = 64    # upper bound on frames taken from a video or GIF
AUDIO_SR = 16000   # every audio source is resampled to this rate (Hz)

model_name = "openbmb/MiniCPM-o-2_6"

repo = AutoModel.from_pretrained(
    model_name,
    trust_remote_code=True,
    attn_implementation="sdpa",
    torch_dtype=torch.bfloat16,
).to(DEVICE)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)

css = '''
.gradio-container{max-width: 560px !important}
h1{text-align:center}
footer {
    visibility: hidden
}
'''

# "█" is a placeholder that generate() replaces with the uploaded file's name.
input_prefixes = {
    "Image": "(A image file called █ has been attached, describe the image content) ",
    "GIF": "(A GIF file called █ has been attached, describe the GIF content) ",
    "Video": "(A audio video file called █ has been attached, describe the video content and the audio content) ",
    "Audio": "(A audio file called █ has been attached, describe the audio content) ",
}

# Lower-case extensions (with leading dot) accepted for each media kind.
filetypes = {
    "Image": [".jpg", ".jpeg", ".png", ".bmp"],
    "GIF": [".gif"],
    "Video": [".mp4", ".mov", ".avi", ".mkv"],
    "Audio": [".wav", ".mp3", ".flac", ".aac"],
}


# Functions
def infer_filetype(ext):
    """Return the key of ``filetypes`` whose list contains *ext*, or None."""
    return next((k for k, v in filetypes.items() if ext in v), None)


def uniform_sample(seq, n):
    """Return at most *n* items of *seq*, evenly spread over its whole length.

    Args:
        seq: Any sized, indexable sequence (list, range, ...).
        n: Maximum number of items to return.

    Returns:
        A new list. All of *seq* when it has *n* items or fewer; an empty
        list when *seq* is empty or *n* is not positive.
    """
    length = len(seq)
    if n <= 0 or length == 0:
        return []
    if length <= n:
        return list(seq)
    # Integer-step slicing (seq[::step][:n]) only covers the head of the
    # sequence when length is not a multiple of n; proportional indices
    # always span the full range.
    return [seq[(i * length) // n] for i in range(n)]


def frames_from_video(path):
    """Decode up to MAX_FRAMES evenly spaced frames of *path* as PIL images."""
    vr = VideoReader(path, ctx=cpu(0))
    idx = uniform_sample(range(len(vr)), MAX_FRAMES)
    if not idx:
        return []
    batch = vr.get_batch(idx).asnumpy()
    return [Image.fromarray(frame.astype("uint8")) for frame in batch]


def audio_from_video(path):
    """Return the audio track of the video at *path* as a mono AUDIO_SR array.

    Returns an empty float32 array when the video has no audio track.
    """
    clip = VideoFileClip(path)
    try:
        if clip.audio is None:
            return np.zeros(0, dtype=np.float32)
        # Write into a private directory rather than re-opening an open
        # NamedTemporaryFile by name, which is not portable.
        with tempfile.TemporaryDirectory() as tmp_dir:
            wav_path = os.path.join(tmp_dir, "audio.wav")
            clip.audio.write_audiofile(
                wav_path,
                codec="pcm_s16le",
                fps=AUDIO_SR,
                verbose=False,
                logger=None,
            )
            audio_np, _ = librosa.load(wav_path, sr=AUDIO_SR, mono=True)
        return audio_np
    finally:
        # Release the reader even when extraction fails.
        clip.close()


def load_audio(path):
    """Load the audio file at *path* as a mono array resampled to AUDIO_SR."""
    audio_np, _ = librosa.load(path, sr=AUDIO_SR, mono=True)
    return audio_np


def build_video_omni(path, prefix, instruction):
    """Build the user content list for a video: prompt, then frame/audio units.

    Frames and audio are paired by time: the video is cut into
    ``min(sampled frames, whole seconds of audio)`` units and each unit
    carries one frame plus the contiguous audio slice covering the same
    fraction of the clip.

    Args:
        path: Path of the video file.
        prefix: Text placed before the instruction.
        instruction: User instruction text.

    Returns:
        ``[prefix + instruction, "", frame, chunk, "", frame, chunk, ...]``.
        The audio chunk is left out of every unit when the video has no
        audio; when no frame could be decoded the whole audio is appended
        instead.
    """
    frames = frames_from_video(path)
    audio = audio_from_video(path)
    contents = [prefix + instruction]
    total_samples = len(audio)
    seconds = math.ceil(total_samples / AUDIO_SR)

    if not frames:
        if total_samples:
            contents.append(audio)
        return contents

    # NOTE(review): the "" entry is kept exactly as it was; the upstream
    # MiniCPM-o omni example appears to use a "<unit>" marker in this slot.
    # Confirm against the model card.
    if seconds == 0:
        for frame in frames:
            contents.extend(["", frame])
        return contents

    # seconds <= total_samples, so every unit gets at least one sample.
    units = min(len(frames), seconds)
    frames = uniform_sample(frames, units)
    for i, frame in enumerate(frames):
        start = (i * total_samples) // units
        stop = ((i + 1) * total_samples) // units
        contents.extend(["", frame, audio[start:stop]])
    return contents


def build_image_omni(path, prefix, instruction):
    """Build the user content list for a still image: prompt plus RGB image."""
    with Image.open(path) as img:
        image = img.convert("RGB")
    return [prefix + instruction, image]


def build_gif_omni(path, prefix, instruction):
    """Build the user content list for a GIF: prompt plus up to MAX_FRAMES frames."""
    with Image.open(path) as img:
        frame_count = getattr(img, "n_frames", 1)
        # Pick the frame indices first so only the kept frames are converted.
        wanted = set(uniform_sample(range(frame_count), MAX_FRAMES))
        frames = [
            frame.copy().convert("RGB")
            for index, frame in enumerate(ImageSequence.Iterator(img))
            if index in wanted
        ]
    return [prefix + instruction, *frames]


def build_audio_omni(path, prefix, instruction):
    """Build the user content list for an audio file: prompt plus waveform."""
    audio = load_audio(path)
    return [prefix + instruction, audio]


@spaces.GPU(duration=60)
def generate(input, instruction=DEFAULT_INPUT, sampling=False, temperature=0.7, top_p=0.8, top_k=100, repetition_penalty=1.05, max_tokens=512):
    """Describe the uploaded file with the model and return the generated text.

    Args:
        input: Path of the uploaded file (falsy when nothing was uploaded).
        instruction: User instruction appended to the file-type prefix.
        sampling, temperature, top_p, top_k, repetition_penalty: Forwarded
            unchanged to ``repo.chat``.
        max_tokens: Forwarded to ``repo.chat`` as ``max_new_tokens``.

    Returns:
        The value returned by ``repo.chat``, or a short message when no
        input was given or the extension is not listed in ``filetypes``.
    """
    if not input:
        return "no input provided."

    extension = os.path.splitext(input)[1].lower()
    filetype = infer_filetype(extension)
    if not filetype:
        return "unsupported file type."

    filename = os.path.basename(input)
    prefix = input_prefixes[filetype].replace("█", filename)

    builder_map = {
        "Image": build_image_omni,
        "GIF": build_gif_omni,
        "Video": build_video_omni,
        "Audio": build_audio_omni,
    }

    try:
        omni_content = builder_map[filetype](input, prefix, instruction)
        sys_msg = repo.get_sys_prompt(mode="omni", language="en")
        msgs = [sys_msg, {"role": "user", "content": omni_content}]

        output = repo.chat(
            msgs=msgs,
            tokenizer=tokenizer,
            sampling=sampling,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            repetition_penalty=repetition_penalty,
            max_new_tokens=max_tokens,
            omni_input=True,
            use_image_id=False,
            max_slice_nums=2,
        )
    finally:
        # Free cached memory whether or not inference succeeded.
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()

    return output


def cloud():
    """Log a message; bound to the "☁️" button."""
    print("[CLOUD] | Space maintained.")


# Initialize
with gr.Blocks(css=css) as main:
    with gr.Column():
        input = gr.File(label="Input", file_types=["image", "video", "audio"], type="filepath")
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=0.7, label="Temperature")
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.8, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=100, label="Top K")
        repetition_penalty = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=1.05, label="Repetition Penalty")
        max_tokens = gr.Slider(minimum=1, maximum=4096, step=1, value=512, label="Max Tokens")
        submit = gr.Button("▶")
        maintain = gr.Button("☁️")

    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")

    submit.click(fn=generate, inputs=[input, instruction, sampling, temperature, top_p, top_k, repetition_penalty, max_tokens], outputs=[output], queue=False)
    maintain.click(cloud, inputs=[], outputs=[], queue=False)

main.launch(show_api=True)