import os
import asyncio
import shutil
import gradio as gr
from multimodal_module import MultiModalChatModule

# Optional: keep model cache persistent across restarts
os.makedirs("model_cache", exist_ok=True)
os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1")
os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

mm = MultiModalChatModule()

# --- Small wrapper so Gradio file paths work with your module's .download_to_drive API ---
class _GradioFile:
    def __init__(self, path: str | None):
        self.path = path
    async def download_to_drive(self, dest: str):
        if not self.path:
            raise ValueError("No file path provided.")
        shutil.copy(self.path, dest)

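# Usage sketch (illustrative only; the path and user_id below are hypothetical):
# each handler wraps the filepath Gradio provides and hands the wrapper to the
# module, which is assumed to await .download_to_drive(dest) much like a
# Telegram File object would:
#   await mm.process_voice_message(_GradioFile("/tmp/clip.wav"), user_id=1)
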
# -------------------------
# TEXT CHAT
# -------------------------
async def chat_fn(user_id: str, message: str, lang: str):
    uid = int(user_id or "1")
    message = message or ""
    lang = (lang or "en").strip()
    return await mm.generate_response(message, uid, lang=lang)

# -------------------------
# TTS (generate_voice_reply)
# -------------------------
async def tts_fn(user_id: str, text: str, fmt: str):
    uid = int(user_id or "1")
    out_path = await mm.generate_voice_reply(text or "", user_id=uid, fmt=fmt)
    # Gradio expects the file path for Audio/Image outputs
    return out_path

# -------------------------
# VOICE -> TEXT (+emotion)
# -------------------------
async def voice_fn(user_id: str, audio_path: str | None):
    uid = int(user_id or "1")
    if not audio_path:
        return {"text": "", "language": "en", "emotion": "no_audio", "is_speech": False}
    result = await mm.process_voice_message(_GradioFile(audio_path), user_id=uid)
    return result

# -------------------------
# IMAGE: caption
# -------------------------
async def img_caption_fn(user_id: str, image_path: str | None):
    uid = int(user_id or "1")
    if not image_path:
        return "No image provided."
    caption = await mm.process_image_message(_GradioFile(image_path), user_id=uid)
    return caption

# -------------------------
# IMAGE: text2img
# -------------------------
async def img_generate_fn(user_id: str, prompt: str, width: int, height: int, steps: int):
    uid = int(user_id or "1")
    img_path = await mm.generate_image_from_text(prompt or "", user_id=uid, width=width, height=height, steps=steps)
    return img_path

# -------------------------
# IMAGE: inpaint
# -------------------------
async def img_inpaint_fn(user_id: str, image_path: str | None, mask_path: str | None, prompt: str):
    uid = int(user_id or "1")
    if not image_path:
        return None
    out_path = await mm.edit_image_inpaint(
        _GradioFile(image_path),
        _GradioFile(mask_path) if mask_path else None,
        prompt=prompt or "",
        user_id=uid,
    )
    return out_path

# -------------------------
# VIDEO: process
# -------------------------
async def video_fn(user_id: str, video_path: str | None, max_frames: int):
    uid = int(user_id or "1")
    if not video_path:
        return {"duration": 0, "fps": 0, "transcription": "", "captions": []}
    result = await mm.process_video(_GradioFile(video_path), user_id=uid, max_frames=max_frames)
    return result

# -------------------------
# FILE: process (pdf/docx/txt/csv)
# -------------------------
async def file_fn(user_id: str, file_path: str | None):
    uid = int(user_id or "1")
    if not file_path:
        return {"summary": "", "length": 0, "type": ""}
    result = await mm.process_file(_GradioFile(file_path), user_id=uid)
    return result

# -------------------------
# CODE: complete
# -------------------------
async def code_complete_fn(prompt: str, max_tokens: int, temperature: float):
    return await mm.code_complete(prompt or "", max_tokens=max_tokens, temperature=temperature)

# -------------------------
# CODE: execute (DANGEROUS)
# -------------------------
async def code_exec_fn(code: str, timeout: int):
    # Your module already time-limits; still, treat as unsafe
    result = await mm.execute_python_code(code or "", timeout=timeout)
    # Present nicely
    if "error" in result:
        return f"ERROR: {result['error']}"
    out = []
    if result.get("stdout"):
        out.append(f"[stdout]\n{result['stdout']}")
    if result.get("stderr"):
        out.append(f"[stderr]\n{result['stderr']}")
    return "\n".join(out).strip() or "(no output)"

with gr.Blocks(title="Multimodal Space") as demo:
    gr.Markdown("# ๐Ÿ”ฎ Multimodal Space")
    with gr.Row():
        user_id = gr.Textbox(label="User ID", value="1", scale=1)
        lang = gr.Textbox(label="Language code (e.g., en, fr, es)", value="en", scale=1)

    with gr.Tab("๐Ÿ’ฌ Chat"):
        msg_in = gr.Textbox(label="Message")
        msg_out = gr.Textbox(label="Response", interactive=False)
        gr.Button("Send").click(chat_fn, [user_id, msg_in, lang], msg_out)

    with gr.Tab("๐Ÿ—ฃ๏ธ Voice โ†’ Text (+ Emotion)"):
        audio_in = gr.Audio(sources=["microphone", "upload"], type="filepath", label="Upload/record voice (ogg/wav/mp3)")
        voice_json = gr.JSON(label="Result")
        gr.Button("Transcribe & Analyze").click(voice_fn, [user_id, audio_in], voice_json)

    with gr.Tab("๐Ÿ”Š TTS"):
        tts_text = gr.Textbox(label="Text to speak")
        tts_fmt = gr.Dropdown(choices=["ogg", "wav", "mp3"], value="ogg", label="Format")
        tts_audio = gr.Audio(label="Generated Audio", interactive=False)
        gr.Button("Generate Voice Reply").click(tts_fn, [user_id, tts_text, tts_fmt], tts_audio)

    with gr.Tab("๐Ÿ–ผ๏ธ Image Caption"):
        img_in = gr.Image(type="filepath", label="Image")
        caption_out = gr.Textbox(label="Caption", interactive=False)
        gr.Button("Caption").click(img_caption_fn, [user_id, img_in], caption_out)

    with gr.Tab("๐ŸŽจ Text โ†’ Image"):
        ti_prompt = gr.Textbox(label="Prompt")
        with gr.Row():
            ti_w = gr.Slider(256, 768, value=512, step=64, label="Width")
            ti_h = gr.Slider(256, 768, value=512, step=64, label="Height")
            ti_steps = gr.Slider(10, 50, value=30, step=1, label="Steps")
        ti_out = gr.Image(label="Generated Image", interactive=False, type="filepath")
        gr.Button("Generate").click(img_generate_fn, [user_id, ti_prompt, ti_w, ti_h, ti_steps], ti_out)

    with gr.Tab("๐Ÿฉน Inpaint"):
        base_img = gr.Image(type="filepath", label="Base image")
        mask_img = gr.Image(type="filepath", label="Mask (white = keep, black = edit)")  # gr.Image has no 'optional' kwarg; leaving it empty simply passes None
        inpaint_prompt = gr.Textbox(label="Prompt")
        inpaint_out = gr.Image(label="Edited Image", interactive=False, type="filepath")
        gr.Button("Inpaint").click(img_inpaint_fn, [user_id, base_img, mask_img, inpaint_prompt], inpaint_out)

    with gr.Tab("๐ŸŽž๏ธ Video"):
        vid_in = gr.Video(label="Video file")
        max_frames = gr.Slider(1, 12, value=4, step=1, label="Max keyframes to sample")
        vid_json = gr.JSON(label="Result (duration/fps/transcript/captions)")
        gr.Button("Process Video").click(video_fn, [user_id, vid_in, max_frames], vid_json)

    with gr.Tab("๐Ÿ“„ File"):
        file_in = gr.File(label="Upload file (pdf/docx/txt/csv)", type="filepath")
        file_json = gr.JSON(label="Summary")
        gr.Button("Process File").click(file_fn, [user_id, file_in], file_json)

    with gr.Tab("๐Ÿ‘จโ€๐Ÿ’ป Code"):
        cc_prompt = gr.Textbox(label="Completion prompt")
        cc_tokens = gr.Slider(16, 1024, value=256, step=16, label="Max tokens")
        cc_temp = gr.Slider(0.0, 1.0, value=0.2, step=0.05, label="Temperature")
        cc_out = gr.Code(label="Completion")
        gr.Button("Complete").click(code_complete_fn, [cc_prompt, cc_tokens, cc_temp], cc_out)

        ce_code = gr.Code(label="Execute Python (sandboxed, time-limited)")
        ce_timeout = gr.Slider(1, 10, value=5, step=1, label="Timeout (s)")
        ce_out = gr.Code(label="Exec output")
        gr.Button("Run Code").click(code_exec_fn, [ce_code, ce_timeout], ce_out)

# Make API-callable and Space-visible
# Gradio 4 uses default_concurrency_limit in place of the older concurrency_count
demo.queue(default_concurrency_limit=2, max_size=32).launch(server_name="0.0.0.0")
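
# Optional client-side check (a minimal sketch, assuming the Space is running and
# gradio_client is installed; "YOUR_USERNAME/YOUR_SPACE" is a placeholder, and the
# api_name relies on Gradio's default of naming endpoints after the handler function):
#   from gradio_client import Client
#   client = Client("YOUR_USERNAME/YOUR_SPACE")
#   print(client.predict("1", "Hello!", "en", api_name="/chat_fn"))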