Spaces:
Running
Running
File size: 6,247 Bytes
fef0a8d 99eb93c fef0a8d 4cab0f7 294c109 7820541 45099c6 542f90d fef0a8d 294c109 fef0a8d f8a64f8 45099c6 5268082 e52a62d 0629ecb 4bd5128 294c109 4bd5128 96f2f76 7820541 409f566 7820541 4cab0f7 7820541 4cab0f7 45099c6 409f566 4cab0f7 294c109 4cab0f7 5268082 4cab0f7 5a25e75 029aec2 294c109 7820541 c60c480 294c109 7820541 4cab0f7 7820541 4cab0f7 7147f65 3ca6e64 4cab0f7 e6f4055 409f566 e6f4055 7147f65 def2ac9 842b1c3 4cab0f7 842b1c3 4cab0f7 e6f4055 f8a64f8 7820541 5a25e75 294c109 5a25e75 af21e2a 5268082 1b6a68d 5268082 294c109 5a25e75 8a86647 294c109 5268082 294c109 5268082 46010b5 4cab0f7 5268082 eb3d2f3 5268082 eb3d2f3 5268082 5a25e75 5268082 5a25e75 5268082 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# Imports
import math
import os

import gradio as gr
import spaces
import torch
import librosa
from PIL import Image, ImageSequence
from decord import VideoReader, cpu
from transformers import AutoModel, AutoTokenizer, AutoProcessor
# Variables
# "auto" is a placeholder that is resolved to a concrete device just below.
DEVICE = "auto"
if DEVICE == "auto":
    if torch.cuda.is_available():
        DEVICE = "cuda"
    else:
        DEVICE = "cpu"
print(f"[SYSTEM] | Using {DEVICE} type compute device.")

# Instruction used when the caller does not supply one.
DEFAULT_INPUT = "Describe in one short sentence."
# Upper bound on the number of frames sampled from a GIF or video.
MAX_FRAMES = 64

# Model, tokenizer and processor are loaded once at import time.
model_name = "openbmb/MiniCPM-o-2_6"
repo = AutoModel.from_pretrained(
    model_name,
    trust_remote_code=True,
    attn_implementation="sdpa",
    torch_dtype=torch.bfloat16,
).to(DEVICE)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
# Stylesheet handed to gr.Blocks: narrows the container, centres h1 headings
# and hides the page footer.
css = """
.gradio-container{max-width: 560px !important}
h1{text-align:center}
footer {
visibility: hidden
}
"""
# Per-category text prepended to the user's instruction. The "β" character is
# a placeholder that generate() replaces with the attached file's name.
input_prefixes = {
    "Image": "(A image file called β has been attached, describe the image content) ",
    "GIF": "(A GIF file called β has been attached, describe the GIF content) ",
    "Video": "(A video with audio file called β has been attached, describe the video content and the audio content embedded into the video) ",
    "Audio": "(A audio file called β has been attached, describe the audio content) ",
}
# Maps each input category to the lower-case file extensions (leading dot
# included) that select it.
filetypes = {
    "Image": [".jpg", ".jpeg", ".png", ".bmp"],
    "GIF": [".gif"],
    "Video": [".mp4", ".mov", ".avi", ".mkv"],
    "Audio": [".wav", ".mp3", ".flac", ".aac"],
}
def uniform_sample(idxs, n):
    """Pick ``n`` evenly spaced items from ``idxs``.

    The sequence is divided into ``n`` equal-width bins and the item at the
    centre of each bin is returned, in the original order.

    Args:
        idxs: An indexable sequence with a length (frame indices, frames, ...).
        n: Number of items to pick.

    Returns:
        A list of ``n`` items. An empty list is returned when ``idxs`` is
        empty or ``n`` is not positive. When ``n`` is larger than
        ``len(idxs)`` some items appear more than once.
    """
    # Guard the two inputs that would otherwise raise: n == 0 divides by zero
    # and an empty sequence cannot be indexed.
    if n <= 0 or len(idxs) == 0:
        return []
    gap = len(idxs) / n
    return [idxs[int(i * gap + gap / 2)] for i in range(n)]
def build_omni_chunks(path, sr=16000, seconds_per_unit=1):
    """Split a video file into interleaved frame/audio units.

    For every ``seconds_per_unit`` seconds of video, one frame (the first of
    that interval) and the matching slice of the audio track are emitted,
    each preceded by the literal marker ``"<unit>"``.

    Args:
        path: Path of the video file; it is read by both ``VideoReader`` and
            ``librosa.load``.
        sr: Sample rate the audio track is loaded at.
        seconds_per_unit: Length of one unit in seconds.

    Returns:
        A flat list ``["<unit>", frame, audio_chunk, "<unit>", ...]`` where
        ``frame`` is a PIL image and ``audio_chunk`` is a slice of the loaded
        audio array. A slice that starts past the end of the audio is empty.
    """
    vr = VideoReader(path, ctx=cpu(0))
    # Clamp to 1: a frame rate that rounds to 0 would divide by zero below.
    fps = max(1, round(vr.get_avg_fps()))
    audio_np, _ = librosa.load(path, sr=sr, mono=True)
    total_units = math.ceil(len(vr) / fps / seconds_per_unit)
    content = []
    for i in range(total_units):
        frame_idx = int(i * fps * seconds_per_unit)
        frame = Image.fromarray(vr[frame_idx].asnumpy().astype("uint8"))
        # Slice bounds must be integers even when seconds_per_unit is a float.
        start = int(sr * i * seconds_per_unit)
        stop = int(sr * (i + 1) * seconds_per_unit)
        content.extend(["<unit>", frame, audio_np[start:stop]])
    return content
def encode_video(path):
    """Sample frames from a video, roughly one per second.

    Every ``fps``-th frame is selected; if that yields more than
    ``MAX_FRAMES`` indices they are thinned with ``uniform_sample``.

    Args:
        path: Path of the video file.

    Returns:
        A list of PIL images, at most ``MAX_FRAMES`` long.
    """
    vr = VideoReader(path, ctx=cpu(0))
    # Clamp to 1: range() raises ValueError for a step of 0.
    fps = max(1, round(vr.get_avg_fps()))
    idxs = list(range(0, len(vr), fps))
    if len(idxs) > MAX_FRAMES:
        idxs = uniform_sample(idxs, MAX_FRAMES)
    frames = vr.get_batch(idxs).asnumpy()
    return [Image.fromarray(f.astype("uint8")) for f in frames]
def encode_gif(path):
    """Load the frames of an animated image as RGB PIL images.

    Args:
        path: Path of the GIF file.

    Returns:
        A list of RGB frames; when the file holds more than ``MAX_FRAMES``
        frames they are thinned with ``uniform_sample``.
    """
    # Each frame is copied before conversion, so the file can be closed as
    # soon as the list has been built.
    with Image.open(path) as img:
        frames = [frame.copy().convert("RGB") for frame in ImageSequence.Iterator(img)]
    if len(frames) > MAX_FRAMES:
        frames = uniform_sample(frames, MAX_FRAMES)
    return frames
@spaces.GPU(duration=60)
def generate(input, instruction=DEFAULT_INPUT, sampling=False, temperature=0.7, top_p=0.8, top_k=100, repetition_penalty=1.05, max_tokens=512):
    """Run the model on an uploaded file and return its answer.

    The file category (image, GIF, video or audio) is derived from the file
    extension via ``filetypes``. Videos are sent as interleaved frame/audio
    units with the model's omni system prompt; every other category is sent
    as a single user message whose text is the category prefix followed by
    the instruction.

    Args:
        input: Path of the uploaded file, or a falsy value when nothing was
            uploaded.
        instruction: Text instruction given to the model.
        sampling: Passed to ``repo.chat`` as ``sampling``.
        temperature: Passed to ``repo.chat`` as ``temperature``.
        top_p: Passed to ``repo.chat`` as ``top_p``.
        top_k: Passed to ``repo.chat`` as ``top_k``.
        repetition_penalty: Passed to ``repo.chat`` as ``repetition_penalty``.
        max_tokens: Passed to ``repo.chat`` as ``max_new_tokens``.

    Returns:
        Whatever ``repo.chat`` returns, or a short message string when no
        file was provided or its extension is not listed in ``filetypes``.
    """
    print(input)
    print(instruction)

    if not input:
        return "No input provided."

    extension = os.path.splitext(input)[1].lower()
    filetype = next(
        (category for category, extensions in filetypes.items() if extension in extensions),
        None,
    )
    # Reject unknown extensions up front; otherwise the prefix lookup below
    # would fail with a KeyError on ``None``.
    if filetype is None:
        return "Unsupported file type."

    # Generation settings shared by every input category.
    generation_kwargs = {
        "sampling": sampling,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repetition_penalty": repetition_penalty,
        "max_new_tokens": max_tokens,
    }

    if filetype == "Video":
        # Omni mode: system prompt plus interleaved units, with the raw
        # instruction appended as the final content item.
        omni_content = build_omni_chunks(input) + [instruction]
        sys_msg = repo.get_sys_prompt(mode="omni", language="en")
        msgs = [sys_msg, {"role": "user", "content": omni_content}]
        generation_kwargs["omni_input"] = True
    else:
        content = []
        if filetype == "Image":
            # convert() returns a new image, so the file handle can be closed.
            with Image.open(input) as img:
                content.append(img.convert("RGB"))
        elif filetype == "GIF":
            content.extend(encode_gif(input))
        elif filetype == "Audio":
            audio_np, sample_rate = librosa.load(input, sr=16000, mono=True)
            chunk_tensor = torch.from_numpy(audio_np).float().to(DEVICE)
            # NOTE(review): audio is passed here as a dict wrapping a torch
            # tensor, while the video path passes plain NumPy slices. Confirm
            # that repo.chat accepts this form.
            content.append({"array": chunk_tensor, "sampling_rate": sample_rate})

        filename = os.path.basename(input)
        prefix = input_prefixes[filetype].replace("β", filename)
        content.append(prefix + instruction)
        msgs = [{"role": "user", "content": content}]

    output = repo.chat(msgs=msgs, tokenizer=tokenizer, **generation_kwargs)
    print(output)
    return output
def cloud():
    """Print a fixed status line; takes no arguments and returns nothing."""
    message = "[CLOUD] | Space maintained."
    print(message)
# Initialize
with gr.Blocks(css=css) as main:
    # Input controls: the file to analyse, the instruction and the
    # generation settings forwarded to generate().
    with gr.Column():
        input = gr.File(label="Input", file_types=["image", "video", "audio"], type="filepath")
        instruction = gr.Textbox(lines=1, value=DEFAULT_INPUT, label="Instruction")
        sampling = gr.Checkbox(value=False, label="Sampling")
        temperature = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=0.7, label="Temperature")
        top_p = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.8, label="Top P")
        top_k = gr.Slider(minimum=0, maximum=1000, step=1, value=100, label="Top K")
        repetition_penalty = gr.Slider(minimum=0.01, maximum=1.99, step=0.01, value=1.05, label="Repetition Penalty")
        max_tokens = gr.Slider(minimum=1, maximum=4096, step=1, value=512, label="Max Tokens")
        # The labels were UTF-8 symbols that had been decoded with the wrong
        # code page; they are restored to the intended characters here.
        submit = gr.Button("▶")
        maintain = gr.Button("☁️")

    with gr.Column():
        output = gr.Textbox(lines=1, value="", label="Output")

    # The input order must match the positional parameters of generate().
    submit.click(fn=generate, inputs=[input, instruction, sampling, temperature, top_p, top_k, repetition_penalty, max_tokens], outputs=[output], queue=False)
    maintain.click(cloud, inputs=[], outputs=[], queue=False)

main.launch(show_api=True)