# Gradio Space entry point: web UI over MultiModalChatModule
# (text chat, voice, TTS, image caption/generation/inpaint, video, files, code).
import os
import asyncio
import shutil
import gradio as gr
from multimodal_module import MultiModalChatModule
# Keep the Hugging Face model cache on disk so restarts can reuse downloads.
os.makedirs("model_cache", exist_ok=True)

# Quiet, efficient HF defaults; setdefault lets the host environment override them.
for _key, _val in {
    "HF_HUB_DISABLE_TELEMETRY": "1",
    "HF_HUB_ENABLE_HF_TRANSFER": "1",
    "TOKENIZERS_PARALLELISM": "false",
}.items():
    os.environ.setdefault(_key, _val)

# Single shared backend instance used by every handler below.
mm = MultiModalChatModule()
# --- Small wrapper so Gradio file paths work with your module's .download_to_drive API ---
class _GradioFile:
def __init__(self, path: str | None):
self.path = path
async def download_to_drive(self, dest: str):
if not self.path:
raise ValueError("No file path provided.")
shutil.copy(self.path, dest)
# -------------------------
# TEXT CHAT
# -------------------------
async def chat_fn(user_id: str, message: str, lang: str):
    """Route a text message through the multimodal backend.

    Args:
        user_id: Numeric id as text; blank or non-numeric falls back to 1.
        message: User message; ``None`` is treated as empty.
        lang: Language code such as ``en``; blank falls back to ``en``.

    Returns:
        The backend's text response.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        # A free-form textbox can hold anything; don't 500 on "abc" or "".
        uid = 1
    return await mm.generate_response(message or "", uid, lang=(lang or "en").strip())
# -------------------------
# TTS (generate_voice_reply)
# -------------------------
async def tts_fn(user_id: str, text: str, fmt: str):
    """Synthesize *text* to speech and return the audio file path.

    Gradio's Audio output component accepts a plain file path.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        # Don't crash on blank or non-numeric user ids.
        uid = 1
    return await mm.generate_voice_reply(text or "", user_id=uid, fmt=fmt)
# -------------------------
# VOICE -> TEXT (+emotion)
# -------------------------
async def voice_fn(user_id: str, audio_path: str | None):
    """Transcribe an uploaded/recorded audio file and analyze emotion.

    Returns:
        The backend result dict, or a stub dict when no audio was given.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        # Don't crash on blank or non-numeric user ids.
        uid = 1
    if not audio_path:
        return {"text": "", "language": "en", "emotion": "no_audio", "is_speech": False}
    return await mm.process_voice_message(_GradioFile(audio_path), user_id=uid)
# -------------------------
# IMAGE: caption
# -------------------------
async def img_caption_fn(user_id: str, image_path: str | None):
    """Caption an uploaded image; returns a plain-text caption."""
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        # Don't crash on blank or non-numeric user ids.
        uid = 1
    if not image_path:
        return "No image provided."
    return await mm.process_image_message(_GradioFile(image_path), user_id=uid)
# -------------------------
# IMAGE: text2img
# -------------------------
async def img_generate_fn(user_id: str, prompt: str, width: int, height: int, steps: int):
    """Generate an image from *prompt*; returns the generated file path."""
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        # Don't crash on blank or non-numeric user ids.
        uid = 1
    # Gradio sliders may deliver floats (e.g. 512.0); image backends want ints.
    return await mm.generate_image_from_text(
        prompt or "",
        user_id=uid,
        width=int(width),
        height=int(height),
        steps=int(steps),
    )
# -------------------------
# IMAGE: inpaint
# -------------------------
async def img_inpaint_fn(user_id: str, image_path: str | None, mask_path: str | None, prompt: str):
    """Inpaint *image_path* (optionally guided by *mask_path*) per *prompt*.

    Returns:
        Path to the edited image, or ``None`` when no base image was given.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        # Don't crash on blank or non-numeric user ids.
        uid = 1
    if not image_path:
        return None
    return await mm.edit_image_inpaint(
        _GradioFile(image_path),
        _GradioFile(mask_path) if mask_path else None,
        prompt=prompt or "",
        user_id=uid,
    )
# -------------------------
# VIDEO: process
# -------------------------
async def video_fn(user_id: str, video_path: str | None, max_frames: int):
    """Process a video (duration/fps/transcript/captions) via the backend.

    Returns a stub dict when no video was provided.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        # Don't crash on blank or non-numeric user ids.
        uid = 1
    if not video_path:
        return {"duration": 0, "fps": 0, "transcription": "", "captions": []}
    # Slider values may arrive as floats; frame counts must be ints.
    return await mm.process_video(_GradioFile(video_path), user_id=uid, max_frames=int(max_frames))
# -------------------------
# FILE: process (pdf/docx/txt/csv)
# -------------------------
async def file_fn(user_id: str, file_path: str | None):
    """Summarize an uploaded document; returns the backend's result dict."""
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        # Don't crash on blank or non-numeric user ids.
        uid = 1
    if not file_path:
        return {"summary": "", "length": 0, "type": ""}
    return await mm.process_file(_GradioFile(file_path), user_id=uid)
# -------------------------
# CODE: complete
# -------------------------
async def code_complete_fn(prompt: str, max_tokens: int, temperature: float):
    """Run code completion on *prompt*; returns the completion text."""
    # Sliders may deliver floats (e.g. 256.0) — normalize types for the backend.
    return await mm.code_complete(
        prompt or "",
        max_tokens=int(max_tokens),
        temperature=float(temperature),
    )
# -------------------------
# CODE: execute (DANGEROUS)
# -------------------------
async def code_exec_fn(code: str, timeout: int):
    """Execute arbitrary Python via the backend sandbox and format the result.

    NOTE(review): executing user-supplied code is inherently unsafe; we rely
    entirely on the module's own sandboxing/time limit here.
    """
    # Slider values may arrive as floats; normalize the timeout to int.
    result = await mm.execute_python_code(code or "", timeout=int(timeout))
    if "error" in result:
        return f"ERROR: {result['error']}"
    # Present stdout/stderr as labeled sections, or a placeholder when silent.
    sections = []
    if result.get("stdout"):
        sections.append(f"[stdout]\n{result['stdout']}")
    if result.get("stderr"):
        sections.append(f"[stderr]\n{result['stderr']}")
    return "\n".join(sections).strip() or "(no output)"
# Build the tabbed UI. Each tab wires one async handler defined above.
with gr.Blocks(title="Multimodal Space") as demo:
    gr.Markdown("# ๐ฎ Multimodal Space")
    with gr.Row():
        user_id = gr.Textbox(label="User ID", value="1", scale=1)
        lang = gr.Textbox(label="Language code (e.g., en, fr, es)", value="en", scale=1)
    with gr.Tab("๐ฌ Chat"):
        msg_in = gr.Textbox(label="Message")
        msg_out = gr.Textbox(label="Response", interactive=False)
        gr.Button("Send").click(chat_fn, [user_id, msg_in, lang], msg_out)
    with gr.Tab("๐ฃ๏ธ Voice โ Text (+ Emotion)"):
        audio_in = gr.Audio(sources=["microphone", "upload"], type="filepath", label="Upload/record voice (ogg/wav/mp3)")
        voice_json = gr.JSON(label="Result")
        gr.Button("Transcribe & Analyze").click(voice_fn, [user_id, audio_in], voice_json)
    with gr.Tab("๐ TTS"):
        tts_text = gr.Textbox(label="Text to speak")
        tts_fmt = gr.Dropdown(choices=["ogg", "wav", "mp3"], value="ogg", label="Format")
        tts_audio = gr.Audio(label="Generated Audio", interactive=False)
        gr.Button("Generate Voice Reply").click(tts_fn, [user_id, tts_text, tts_fmt], tts_audio)
    with gr.Tab("๐ผ๏ธ Image Caption"):
        img_in = gr.Image(type="filepath", label="Image")
        caption_out = gr.Textbox(label="Caption", interactive=False)
        gr.Button("Caption").click(img_caption_fn, [user_id, img_in], caption_out)
    with gr.Tab("๐จ Text โ Image"):
        ti_prompt = gr.Textbox(label="Prompt")
        with gr.Row():
            ti_w = gr.Slider(256, 768, value=512, step=64, label="Width")
            ti_h = gr.Slider(256, 768, value=512, step=64, label="Height")
        ti_steps = gr.Slider(10, 50, value=30, step=1, label="Steps")
        ti_out = gr.Image(label="Generated Image", interactive=False, type="filepath")
        gr.Button("Generate").click(img_generate_fn, [user_id, ti_prompt, ti_w, ti_h, ti_steps], ti_out)
    with gr.Tab("๐ฉน Inpaint"):
        base_img = gr.Image(type="filepath", label="Base image")
        # NOTE: `optional=True` is not a gr.Image kwarg in the Gradio 4 API this
        # file uses (sources=/type="filepath") and raises TypeError at build;
        # inputs are optional by default and img_inpaint_fn handles a None mask.
        mask_img = gr.Image(type="filepath", label="Mask (white = keep, black = edit)")
        inpaint_prompt = gr.Textbox(label="Prompt")
        inpaint_out = gr.Image(label="Edited Image", interactive=False, type="filepath")
        gr.Button("Inpaint").click(img_inpaint_fn, [user_id, base_img, mask_img, inpaint_prompt], inpaint_out)
    with gr.Tab("๐๏ธ Video"):
        vid_in = gr.Video(label="Video file")
        max_frames = gr.Slider(1, 12, value=4, step=1, label="Max keyframes to sample")
        vid_json = gr.JSON(label="Result (duration/fps/transcript/captions)")
        gr.Button("Process Video").click(video_fn, [user_id, vid_in, max_frames], vid_json)
    with gr.Tab("๐ File"):
        file_in = gr.File(label="Upload file (pdf/docx/txt/csv)", type="filepath")
        file_json = gr.JSON(label="Summary")
        gr.Button("Process File").click(file_fn, [user_id, file_in], file_json)
    with gr.Tab("๐จโ๐ป Code"):
        cc_prompt = gr.Textbox(label="Completion prompt")
        cc_tokens = gr.Slider(16, 1024, value=256, step=16, label="Max tokens")
        cc_temp = gr.Slider(0.0, 1.0, value=0.2, step=0.05, label="Temperature")
        cc_out = gr.Code(label="Completion")
        gr.Button("Complete").click(code_complete_fn, [cc_prompt, cc_tokens, cc_temp], cc_out)
        ce_code = gr.Code(label="Execute Python (sandboxed, time-limited)")
        ce_timeout = gr.Slider(1, 10, value=5, step=1, label="Timeout (s)")
        ce_out = gr.Code(label="Exec output")
        gr.Button("Run Code").click(code_exec_fn, [ce_code, ce_timeout], ce_out)
# Make API-callable and Space-visible.
# NOTE: the original line ended with a stray " |" (scrape artifact — syntax error)
# and used queue(concurrency_count=...), which was removed in Gradio 4; the
# Gradio 4 equivalent is default_concurrency_limit.
demo.queue(default_concurrency_limit=2, max_size=32).launch(server_name="0.0.0.0")