# Gradio Space entry point for MultiModalChatModule (chat, voice, image, video, file, code).
import os
import asyncio
import shutil

import gradio as gr

from multimodal_module import MultiModalChatModule

# Keep the model cache persistent across Space restarts and quiet down HF hub.
os.makedirs("model_cache", exist_ok=True)
os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1")
os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")   # faster model downloads
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")  # avoid fork warnings

# Single shared backend instance; every handler below routes through it.
mm = MultiModalChatModule()
class _GradioFile:
    """Adapter giving a plain filesystem path the ``download_to_drive`` API.

    The backend module expects Telegram-style file objects exposing an async
    ``download_to_drive(dest)``; Gradio hands us bare paths, so wrap them.
    """

    def __init__(self, path: str | None):
        # May be None when the user submitted no file; checked on use.
        self.path = path

    async def download_to_drive(self, dest: str) -> None:
        """Copy the wrapped file to *dest*.

        Raises:
            ValueError: if no path was provided at construction time.
        """
        if not self.path:
            raise ValueError("No file path provided.")
        shutil.copy(self.path, dest)
# -------------------------
# TEXT CHAT
# -------------------------
async def chat_fn(user_id: str, message: str, lang: str):
    """Text chat handler: forward *message* to the backend for *user_id*.

    Falls back to user id 1 when the textbox is empty or non-numeric
    instead of crashing the UI with a ValueError.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        uid = 1
    message = message or ""
    lang = (lang or "en").strip()
    return await mm.generate_response(message, uid, lang=lang)
# -------------------------
# TTS (generate_voice_reply)
# -------------------------
async def tts_fn(user_id: str, text: str, fmt: str):
    """Synthesize *text* to speech; returns the generated audio file path.

    Gradio Audio outputs accept a filesystem path directly. Non-numeric
    user ids fall back to 1 rather than raising.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        uid = 1
    out_path = await mm.generate_voice_reply(text or "", user_id=uid, fmt=fmt)
    return out_path
# -------------------------
# VOICE -> TEXT (+emotion)
# -------------------------
async def voice_fn(user_id: str, audio_path: str | None):
    """Transcribe an uploaded/recorded audio file and analyze emotion.

    Returns the backend's result dict, or a no-audio sentinel dict when
    nothing was uploaded. Non-numeric user ids fall back to 1.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        uid = 1
    if not audio_path:
        # Keep the same keys the backend returns so the JSON panel is stable.
        return {"text": "", "language": "en", "emotion": "no_audio", "is_speech": False}
    result = await mm.process_voice_message(_GradioFile(audio_path), user_id=uid)
    return result
# -------------------------
# IMAGE: caption
# -------------------------
async def img_caption_fn(user_id: str, image_path: str | None):
    """Caption an uploaded image; returns a plain string for the textbox.

    Non-numeric user ids fall back to 1 rather than raising.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        uid = 1
    if not image_path:
        return "No image provided."
    caption = await mm.process_image_message(_GradioFile(image_path), user_id=uid)
    return caption
# -------------------------
# IMAGE: text2img
# -------------------------
async def img_generate_fn(user_id: str, prompt: str, width: int, height: int, steps: int):
    """Generate an image from *prompt*; returns the image file path.

    Slider values can arrive as floats from the UI, so they are coerced
    to int before being handed to the backend. Non-numeric user ids fall
    back to 1 rather than raising.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        uid = 1
    img_path = await mm.generate_image_from_text(
        prompt or "",
        user_id=uid,
        width=int(width),
        height=int(height),
        steps=int(steps),
    )
    return img_path
# -------------------------
# IMAGE: inpaint
# -------------------------
async def img_inpaint_fn(user_id: str, image_path: str | None, mask_path: str | None, prompt: str):
    """Inpaint *image_path* (optionally guided by *mask_path*) per *prompt*.

    Returns the edited image path, or None when no base image was given.
    Non-numeric user ids fall back to 1 rather than raising.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        uid = 1
    if not image_path:
        return None
    out_path = await mm.edit_image_inpaint(
        _GradioFile(image_path),
        _GradioFile(mask_path) if mask_path else None,
        prompt=prompt or "",
        user_id=uid,
    )
    return out_path
# -------------------------
# VIDEO: process
# -------------------------
async def video_fn(user_id: str, video_path: str | None, max_frames: int):
    """Process a video: duration/fps, transcription, and keyframe captions.

    *max_frames* is coerced to int (sliders can deliver floats). Returns an
    empty-result dict when no video was uploaded. Non-numeric user ids fall
    back to 1 rather than raising.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        uid = 1
    if not video_path:
        return {"duration": 0, "fps": 0, "transcription": "", "captions": []}
    result = await mm.process_video(_GradioFile(video_path), user_id=uid, max_frames=int(max_frames))
    return result
# -------------------------
# FILE: process (pdf/docx/txt/csv)
# -------------------------
async def file_fn(user_id: str, file_path: str | None):
    """Summarize an uploaded document; returns the backend's result dict.

    Returns an empty-summary dict when nothing was uploaded. Non-numeric
    user ids fall back to 1 rather than raising.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        uid = 1
    if not file_path:
        return {"summary": "", "length": 0, "type": ""}
    result = await mm.process_file(_GradioFile(file_path), user_id=uid)
    return result
# -------------------------
# CODE: complete
# -------------------------
async def code_complete_fn(prompt: str, max_tokens: int, temperature: float):
    """Complete *prompt* with the backend code model.

    Slider values are coerced (int tokens, float temperature) since the UI
    may deliver floats for integer sliders and vice versa.
    """
    return await mm.code_complete(prompt or "", max_tokens=int(max_tokens), temperature=float(temperature))
# -------------------------
# CODE: execute (DANGEROUS)
# -------------------------
async def code_exec_fn(code: str, timeout: int):
    """Execute arbitrary Python via the backend sandbox and format the output.

    The backend enforces the time limit; treat the input as unsafe regardless.
    Returns a single string: an ERROR line, labelled stdout/stderr sections,
    or "(no output)" when the run produced nothing.
    """
    result = await mm.execute_python_code(code or "", timeout=int(timeout))
    # An "error" key short-circuits any captured output.
    if "error" in result:
        return f"ERROR: {result['error']}"
    out = []
    if result.get("stdout"):
        out.append(f"[stdout]\n{result['stdout']}")
    if result.get("stderr"):
        out.append(f"[stderr]\n{result['stderr']}")
    return "\n".join(out).strip() or "(no output)"
# NOTE(review): emoji in the original labels were mojibake; restored to
# plausible intended glyphs — confirm against the original source.
with gr.Blocks(title="Multimodal Space") as demo:
    gr.Markdown("# 🔮 Multimodal Space")

    # Shared inputs used by every tab.
    with gr.Row():
        user_id = gr.Textbox(label="User ID", value="1", scale=1)
        lang = gr.Textbox(label="Language code (e.g., en, fr, es)", value="en", scale=1)

    with gr.Tab("💬 Chat"):
        msg_in = gr.Textbox(label="Message")
        msg_out = gr.Textbox(label="Response", interactive=False)
        gr.Button("Send").click(chat_fn, [user_id, msg_in, lang], msg_out)

    with gr.Tab("🗣️ Voice → Text (+ Emotion)"):
        audio_in = gr.Audio(sources=["microphone", "upload"], type="filepath", label="Upload/record voice (ogg/wav/mp3)")
        voice_json = gr.JSON(label="Result")
        gr.Button("Transcribe & Analyze").click(voice_fn, [user_id, audio_in], voice_json)

    with gr.Tab("🔊 TTS"):
        tts_text = gr.Textbox(label="Text to speak")
        tts_fmt = gr.Dropdown(choices=["ogg", "wav", "mp3"], value="ogg", label="Format")
        tts_audio = gr.Audio(label="Generated Audio", interactive=False)
        gr.Button("Generate Voice Reply").click(tts_fn, [user_id, tts_text, tts_fmt], tts_audio)

    with gr.Tab("🖼️ Image Caption"):
        img_in = gr.Image(type="filepath", label="Image")
        caption_out = gr.Textbox(label="Caption", interactive=False)
        gr.Button("Caption").click(img_caption_fn, [user_id, img_in], caption_out)

    with gr.Tab("🎨 Text → Image"):
        ti_prompt = gr.Textbox(label="Prompt")
        with gr.Row():
            ti_w = gr.Slider(256, 768, value=512, step=64, label="Width")
            ti_h = gr.Slider(256, 768, value=512, step=64, label="Height")
            ti_steps = gr.Slider(10, 50, value=30, step=1, label="Steps")
        ti_out = gr.Image(label="Generated Image", interactive=False, type="filepath")
        gr.Button("Generate").click(img_generate_fn, [user_id, ti_prompt, ti_w, ti_h, ti_steps], ti_out)

    with gr.Tab("🩹 Inpaint"):
        base_img = gr.Image(type="filepath", label="Base image")
        # `optional=True` is not a valid gr.Image kwarg in Gradio 4.x; image
        # inputs are optional by default (the handler receives None).
        mask_img = gr.Image(type="filepath", label="Mask (white = keep, black = edit)")
        inpaint_prompt = gr.Textbox(label="Prompt")
        inpaint_out = gr.Image(label="Edited Image", interactive=False, type="filepath")
        gr.Button("Inpaint").click(img_inpaint_fn, [user_id, base_img, mask_img, inpaint_prompt], inpaint_out)

    with gr.Tab("🎞️ Video"):
        vid_in = gr.Video(label="Video file")
        max_frames = gr.Slider(1, 12, value=4, step=1, label="Max keyframes to sample")
        vid_json = gr.JSON(label="Result (duration/fps/transcript/captions)")
        gr.Button("Process Video").click(video_fn, [user_id, vid_in, max_frames], vid_json)

    with gr.Tab("📄 File"):
        file_in = gr.File(label="Upload file (pdf/docx/txt/csv)", type="filepath")
        file_json = gr.JSON(label="Summary")
        gr.Button("Process File").click(file_fn, [user_id, file_in], file_json)

    with gr.Tab("👨‍💻 Code"):
        cc_prompt = gr.Textbox(label="Completion prompt")
        cc_tokens = gr.Slider(16, 1024, value=256, step=16, label="Max tokens")
        cc_temp = gr.Slider(0.0, 1.0, value=0.2, step=0.05, label="Temperature")
        cc_out = gr.Code(label="Completion")
        gr.Button("Complete").click(code_complete_fn, [cc_prompt, cc_tokens, cc_temp], cc_out)

        ce_code = gr.Code(label="Execute Python (sandboxed, time-limited)")
        ce_timeout = gr.Slider(1, 10, value=5, step=1, label="Timeout (s)")
        ce_out = gr.Code(label="Exec output")
        gr.Button("Run Code").click(code_exec_fn, [ce_code, ce_timeout], ce_out)

# Make API-callable and Space-visible. Gradio 4 removed `concurrency_count`;
# `default_concurrency_limit` is the replacement for per-event concurrency.
demo.queue(default_concurrency_limit=2, max_size=32).launch(server_name="0.0.0.0")