# Imports
import gradio as gr
import spaces
import torch
import os
import librosa
from PIL import Image, ImageSequence
from decord import VideoReader, cpu
from transformers import AutoModel, AutoTokenizer, AutoProcessor

# Variables
DEVICE = "auto"
if DEVICE == "auto":
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[SYSTEM] | Using {DEVICE} type compute device.")

DEFAULT_INPUT = "Describe in one short sentence."
MAX_FRAMES = 64  # upper bound on frames handed to the model per video/GIF

model_name = "openbmb/MiniCPM-o-2_6"

repo = AutoModel.from_pretrained(
    model_name,
    trust_remote_code=True,
    attn_implementation="sdpa",
    torch_dtype=torch.bfloat16,
).to(DEVICE)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)

css = '''
.gradio-container{max-width: 560px !important}
h1{text-align:center}
footer {
    visibility: hidden
}
'''

# Text prepended to the instruction; "█" is replaced by the uploaded file's name.
input_prefixes = {
    "Image": "(A image file called █ has been attached) ",
    "GIF": "(A GIF file called █ has been attached) ",
    "Video": "(A video with audio file called █ has been attached) ",
    "Audio": "(A audio file called █ has been attached) ",
}

# Lower-case file extensions accepted for each input category.
filetypes = {
    "Image": [".jpg", ".jpeg", ".png", ".bmp"],
    "GIF": [".gif"],
    "Video": [".mp4", ".mov", ".avi", ".mkv"],
    "Audio": [".wav", ".mp3", ".flac", ".aac"],
}


def uniform_sample(idxs, n):
    """Return ``n`` items of ``idxs`` taken at evenly spaced positions.

    The sequence is split into ``n`` equal-width buckets and the item at the
    centre of each bucket is picked. ``n`` must be positive.
    """
    gap = len(idxs) / n
    return [idxs[int(i * gap + gap / 2)] for i in range(n)]


def encode_video(path):
    """Decode roughly one frame per second of the video at ``path``.

    At most ``MAX_FRAMES`` frames are kept (evenly sub-sampled when there are
    more). Returns a list of PIL images.
    """
    vr = VideoReader(path, ctx=cpu(0))
    # round() yields 0 for an average FPS below 0.5, and range() rejects a
    # zero step, so clamp the stride to at least one frame.
    step = max(1, round(vr.get_avg_fps()))
    idxs = list(range(0, len(vr), step))
    if len(idxs) > MAX_FRAMES:
        idxs = uniform_sample(idxs, MAX_FRAMES)
    frames = vr.get_batch(idxs).asnumpy()
    return [Image.fromarray(f.astype("uint8")) for f in frames]


def encode_gif(path):
    """Return the frames of the GIF at ``path`` as RGB PIL images.

    At most ``MAX_FRAMES`` frames are kept (evenly sub-sampled when there are
    more).
    """
    # Each frame is copied before conversion, so the result stays valid once
    # the context manager has closed the underlying file.
    with Image.open(path) as img:
        frames = [
            frame.copy().convert("RGB")
            for frame in ImageSequence.Iterator(img)
        ]
    if len(frames) > MAX_FRAMES:
        frames = uniform_sample(frames, MAX_FRAMES)
    return frames


@spaces.GPU(duration=60)
def generate(input, instruction=DEFAULT_INPUT, sampling=False,
             temperature=0.7, top_p=0.8, top_k=100,
             repetition_penalty=1.05, max_tokens=512):
    """Run the model on an uploaded file plus a text instruction.

    Args:
        input: Path of the uploaded file; its extension selects the handling
            (see ``filetypes``).
        instruction: Text appended after the attachment prefix.
        sampling, temperature, top_p, top_k, repetition_penalty: Forwarded
            unchanged to ``repo.chat``.
        max_tokens: Forwarded to ``repo.chat`` as ``max_new_tokens``.

    Returns:
        Whatever ``repo.chat`` returns, or a short message string when no
        file was given or its extension is not supported.
    """
    print(input)
    print(instruction)

    if not input:
        return "No input provided."

    extension = os.path.splitext(input)[1].lower()
    filetype = next(
        (
            category
            for category, extensions in filetypes.items()
            if extension in extensions
        ),
        None,
    )
    if filetype is None:
        return "Unsupported file type."

    content = []
    if filetype == "Image":
        # convert() returns a new, fully loaded image, so the source file can
        # be closed right away.
        with Image.open(input) as source:
            content.append(source.convert("RGB"))
    elif filetype == "GIF":
        content.extend(encode_gif(input))
    elif filetype == "Video":
        content.extend(encode_video(input))
        # NOTE(review): presumably this raises for a video without a
        # decodable audio track - confirm and decide whether to fall back.
        audio, _ = librosa.load(input, sr=16000, mono=True)
        content.append(audio)
    else:  # "Audio"
        audio, _ = librosa.load(input, sr=16000, mono=True)
        content.append(audio)

    filename = os.path.basename(input)
    prefix = input_prefixes[filetype].replace("█", filename)
    content.append(prefix + instruction)

    inputs_payload = [{"role": "user", "content": content}]

    params = {
        "msgs": inputs_payload,
        "tokenizer": tokenizer,
        "sampling": sampling,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repetition_penalty": repetition_penalty,
        "max_new_tokens": max_tokens,
    }

    output = repo.chat(**params)

    print(output)

    return output


def cloud():
    """Log a line to the console; bound to the "☁️" button."""
    print("[CLOUD] | Space maintained.")


# Initialize
with gr.Blocks(css=css) as main:
    with gr.Column():
        input = gr.File(
            label="Input",
            file_types=["image", "video", "audio"],
            type="filepath",
        )
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=0.7, label="Temperature")
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.8, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=100, label="Top K")
        repetition_penalty = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=1.05, label="Repetition Penalty")
        max_tokens = gr.Slider(minimum=1, maximum=4096, step=1, value=512, label="Max Tokens")
        submit = gr.Button("▶")
        maintain = gr.Button("☁️")

    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")

    submit.click(
        fn=generate,
        inputs=[input, instruction, sampling, temperature, top_p, top_k, repetition_penalty, max_tokens],
        outputs=[output],
        queue=False,
    )
    maintain.click(cloud, inputs=[], outputs=[], queue=False)

main.launch(show_api=True)