File size: 20,423 Bytes
453ed2e
aa0d34a
1a833ba
aa0d34a
 
 
 
6914f7a
aa0d34a
 
 
 
 
 
 
 
 
 
 
 
 
 
453ed2e
 
aa0d34a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
453ed2e
aa0d34a
 
 
 
a29e3ba
aa0d34a
 
 
7391723
aa0d34a
 
 
 
 
 
 
 
 
 
 
c000f9c
 
aa0d34a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e07df8b
aa0d34a
b770306
aa0d34a
 
32c01db
aa0d34a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c000f9c
aa0d34a
 
 
 
 
 
c000f9c
453ed2e
 
aa0d34a
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
import gradio as gr
import os
import random
import httpx
import asyncio
from dataclasses import dataclass, field
from typing import Any

# Constants
# Status code that generate_image() maps to the "content not compliant" message.
HTTP_STATUS_CENSORED = 451
HTTP_STATUS_OK = 200
# Upper bound for the seed slider and for randomly drawn seeds.
MAX_SEED = 2147483647 # (2**31 - 1)
# Bounds used by validate_dimensions() and by the width/height sliders.
MAX_IMAGE_SIZE = 2048
MIN_IMAGE_SIZE = 256 # NOTE(review): chosen to match the API's limits according to the original author - confirm.

# Debug mode: enabled when the DEBUG_MODE environment variable equals "true" (case-insensitive).
DEBUG_MODE = os.environ.get("DEBUG_MODE", "false").lower() == "true"

# Maps the model names offered in the UI to the checkpoint file names sent as ``ckpt_name``.
MODEL_CONFIGS = {
    "ep3": "ep3.pth",
    "ep3latest": "ep3latest.pth"
}

def validate_dimensions(width: int, height: int) -> tuple[int, int]:
    """Clamp both sides to the allowed size range, then snap each down to a multiple of 32.

    Args:
        width: Requested width; converted with ``int()`` before clamping.
        height: Requested height; converted with ``int()`` before clamping.

    Returns:
        The adjusted ``(width, height)`` pair.
    """

    def _normalize(side: int) -> int:
        # Cap at the maximum first, then lift to the minimum, exactly as a
        # max(min(...)) clamp would.
        bounded = min(int(side), MAX_IMAGE_SIZE)
        bounded = max(MIN_IMAGE_SIZE, bounded)
        # Dropping the remainder is the same as flooring to a multiple of 32.
        return bounded - bounded % 32

    return _normalize(width), _normalize(height)

@dataclass
class LuminaConfig:
    """Optional Lumina sampling overrides for a generation request.

    Fields left as ``None`` are omitted from the request's ``client_args``.
    """
    # Checkpoint file name; sent as ``ckpt_name``.
    model_name: str | None = None
    # Guidance scale; sent as a string under ``cfg``.
    cfg: float | None = None
    # Number of sampling steps; sent as a string under ``steps``.
    step: int | None = None

@dataclass
class ImageGenerationConfig:
    """Parameters describing a single image generation request."""
    # Prompt entries forwarded as the payload's ``rawPrompt``.
    prompts: list[dict[str, Any]] = field(default_factory=list)
    width: int = 1024
    height: int = 1024
    # Forwarded unchanged as the payload's ``seed``.
    seed: int | None = None
    # Forwarded as ``advanced_translator``; not exposed in the UI, so it stays False.
    use_polish: bool = False
    # When True, the overrides in ``lumina_config`` are added to the payload as ``client_args``.
    is_lumina: bool = True
    lumina_config: LuminaConfig = field(default_factory=LuminaConfig)

class ImageClient:
    """Client for the remote Lumina image-generation HTTP API.

    A request is submitted with a POST to ``lumina_api_url``. The response body
    is treated as a task id, which is then polled at ``lumina_task_status_url``
    until the task reaches a terminal status or the polling budget
    (``max_polling_attempts`` x ``polling_interval`` seconds) is used up.
    """

    # Stand-in token that lets the app load when API_TOKEN is missing.
    _PLACEHOLDER_TOKEN = "YOUR_API_TOKEN_PLACEHOLDER"
    # Status code reported to the user as "server busy / concurrency limit".
    _HTTP_STATUS_BUSY = 433
    # Task statuses that end polling with an error.
    _FAILED_TASK_STATUSES = ("FAILURE", "ILLEGAL_IMAGE", "TIMEOUT")
    # Progress fraction already reported by generate_image() when polling starts.
    _POLL_PROGRESS_START = 0.2

    def __init__(self) -> None:
        """Read ``API_TOKEN`` from the environment and set up endpoints, limits and headers."""
        self.x_token = os.environ.get("API_TOKEN", "")
        if not self.x_token:
            print("Warning: API_TOKEN environment variable not set. Using a placeholder. API calls will likely fail.")
            # generate_image() refuses to call the API while this placeholder is in use.
            self.x_token = self._PLACEHOLDER_TOKEN

        self.lumina_api_url = "https://ops.api.talesofai.cn/v3/make_image"
        self.lumina_task_status_url = "https://ops.api.talesofai.cn/v1/artifact/task/{task_uuid}"
        self.max_polling_attempts = 100
        self.polling_interval = 3.0  # seconds slept between two status polls
        self.default_headers = {
            "Content-Type": "application/json",
            "x-platform": "nieta-app/web",
            "X-Token": self.x_token,
        }

    def _redacted_headers(self) -> dict[str, str]:
        """Return a copy of the default headers with the token masked, safe for debug output."""
        return {**self.default_headers, "X-Token": "***"}

    @staticmethod
    def _extract_error_text(response: httpx.Response) -> str:
        """Return the ``message`` field of a JSON error body, or the raw body text otherwise."""
        try:
            body = response.json()
        except ValueError:
            # Body is not valid JSON; the raw text is the best description available.
            return response.text
        if isinstance(body, dict) and body.get("message"):
            return str(body["message"])
        return response.text

    def _prepare_prompt_data(self, prompt: str, negative_prompt: str = "") -> list[dict[str, Any]]:
        """Build the ``rawPrompt`` list for a request.

        Args:
            prompt: Positive prompt, added as free text with weight 1.0.
            negative_prompt: Optional negative prompt, added as free text with
                weight -1.0 when non-empty.

        Returns:
            The prompt entries followed by the fixed "lumina1" elementum entry.
        """
        prompts_data = [{"type": "freetext", "value": prompt, "weight": 1.0}]
        if negative_prompt:
            prompts_data.append({"type": "freetext", "value": negative_prompt, "weight": -1.0})
        # Fixed elementum entry appended to every request.
        # NOTE(review): presumably selects the Lumina model on the server side - confirm.
        prompts_data.append({
            "type": "elementum", "value": "b5edccfe-46a2-4a14-a8ff-f4d430343805",
            "uuid": "b5edccfe-46a2-4a14-a8ff-f4d430343805", "weight": 1.0, "name": "lumina1",
            "img_url": "https://oss.talesofai.cn/picture_s/1y7f53e6itfn_0.jpeg",
            "domain": "", "parent": "", "label": None, "sort_index": 0, "status": "IN_USE",
            "polymorphi_values": {}, "sub_type": None,
        })
        return prompts_data

    def _build_payload(self, config: ImageGenerationConfig) -> dict[str, Any]:
        """Translate ``config`` into the JSON payload for the generation endpoint.

        ``client_args`` is only added for Lumina requests and only contains the
        overrides that are actually set; ``cfg`` and ``steps`` are sent as strings.
        """
        payload = {
            "storyId": "", "jobType": "universal", "width": config.width, "height": config.height,
            "rawPrompt": config.prompts, "seed": config.seed, "meta": {"entrance": "PICTURE,PURE"},
            # The negative prompt travels inside rawPrompt, so this field stays empty.
            "context_model_series": None, "negative_freetext": "",
            "advanced_translator": config.use_polish,
        }
        if config.is_lumina:
            lumina = config.lumina_config
            client_args = {}
            if lumina.model_name:
                client_args["ckpt_name"] = lumina.model_name
            if lumina.cfg is not None:
                client_args["cfg"] = str(lumina.cfg)
            if lumina.step is not None:
                client_args["steps"] = str(lumina.step)
            if client_args:
                payload["client_args"] = client_args
        return payload

    async def _poll_task_status(self, task_uuid: str, progress: gr.Progress | None = None) -> dict[str, Any]:
        """Poll the task status endpoint until the task finishes or the budget runs out.

        Args:
            task_uuid: Task id returned by the generation endpoint.
            progress: Optional Gradio progress callback.

        Returns:
            ``{"success": True, "image_url": ...}`` on success, otherwise
            ``{"success": False, "error": ...}`` with a non-empty error string.
        """
        status_url = self.lumina_task_status_url.format(task_uuid=task_uuid)
        last_status_message = ""
        remaining_span = 1 - self._POLL_PROGRESS_START
        # The timeout applies to each individual poll request.
        async with httpx.AsyncClient(timeout=30.0) as client:
            for attempt in range(self.max_polling_attempts):
                if progress:
                    # Continue from the fraction already shown instead of jumping back to 0.
                    fraction = self._POLL_PROGRESS_START + remaining_span * attempt / self.max_polling_attempts
                    progress(fraction, desc=f"Polling task status ({attempt+1}/{self.max_polling_attempts})... {last_status_message}")

                try:
                    response = await client.get(status_url, headers=self.default_headers)
                    response.raise_for_status()
                    result = response.json()
                except httpx.HTTPStatusError as e:
                    return {"success": False, "error": f"获取任务状态失败: {e.response.status_code} - {e.response.text}"}
                except httpx.RequestError as e:
                    # Network errors and timeouts of this single poll request.
                    return {"success": False, "error": f"网络请求错误: {str(e)}"}
                except Exception as e:
                    # JSON decoding problems or anything else unexpected.
                    return {"success": False, "error": f"任务状态响应处理失败: {str(e)}"}

                task_status = result.get("task_status")
                last_status_message = f"Status: {task_status}"
                if DEBUG_MODE:
                    print(f"DEBUG: Poll {attempt+1}, Task {task_uuid}, Status: {task_status}, Result: {result}")

                if task_status == "SUCCESS":
                    artifacts = result.get("artifacts") or []
                    first = artifacts[0] if artifacts else None
                    # A missing, null or empty URL is reported as an error, not as success.
                    image_url = first.get("url") if isinstance(first, dict) else None
                    if image_url:
                        return {"success": True, "image_url": image_url}
                    return {"success": False, "error": "任务成功但未找到图像URL。"}

                if task_status in self._FAILED_TASK_STATUSES:
                    # Prefer the API's own message, but never hand back an empty error.
                    fallback = f"任务失败,状态: {task_status}"
                    error_msg = result.get("error_message") or result.get("error") or fallback
                    return {"success": False, "error": str(error_msg)}

                # Any other status (pending, running, unknown, ...) keeps polling.
                await asyncio.sleep(self.polling_interval)

        # Derive the advertised budget from the configured limits (5 minutes by default).
        budget_minutes = self.max_polling_attempts * self.polling_interval / 60
        return {"success": False, "error": f"⏳ 生图任务轮询超时({budget_minutes:g}分钟),请稍后重试。"}

    async def generate_image(
        self,
        prompt_str: str,
        negative_prompt_str: str,
        seed_val: int,
        width_val: int,
        height_val: int,
        cfg_val: float,
        steps_val: int,
        model_name_str: str = "ep3",
        progress: gr.Progress | None = None,
    ) -> tuple[str | None, str | None]:
        """Submit a generation request and wait for its result.

        Args:
            prompt_str: Positive prompt text.
            negative_prompt_str: Negative prompt text; may be empty.
            seed_val: Seed forwarded to the API.
            width_val: Image width in pixels.
            height_val: Image height in pixels.
            cfg_val: Guidance scale.
            steps_val: Number of sampling steps.
            model_name_str: Key into ``MODEL_CONFIGS``; unknown keys fall back to "ep3".
            progress: Optional Gradio progress callback.

        Returns:
            ``(image_url, None)`` on success or ``(None, error_message)`` on
            failure. Errors are returned rather than raised.
        """
        if not self.x_token or self.x_token == self._PLACEHOLDER_TOKEN:
            return None, "API_TOKEN未配置。请在环境变量中设置API_TOKEN以使用此功能。"
        try:
            if progress:
                progress(0.05, desc="准备请求...")
            model_path = MODEL_CONFIGS.get(model_name_str, MODEL_CONFIGS["ep3"])
            config = ImageGenerationConfig(
                prompts=self._prepare_prompt_data(prompt_str, negative_prompt_str),
                width=width_val, height=height_val, seed=seed_val,
                lumina_config=LuminaConfig(model_name=model_path, cfg=cfg_val, step=steps_val)
            )
            payload = self._build_payload(config)
            if DEBUG_MODE:
                # The token is masked so that debug logs never contain the credential.
                print(f"DEBUG: API Payload: {payload}, Headers: {self._redacted_headers()}")

            if progress:
                progress(0.1, desc="发送生成请求...")
            # The timeout applies to the initial POST only.
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(self.lumina_api_url, json=payload, headers=self.default_headers)

            if DEBUG_MODE:
                print(f"DEBUG: API Initial Response: {response.status_code}, {response.text[:500]}")

            if response.status_code == HTTP_STATUS_CENSORED:
                return None, "内容不合规,请修改提示词。"
            if response.status_code == self._HTTP_STATUS_BUSY:
                return None, "⏳ 服务器繁忙(达到并发上限),请稍后重试。"

            try:
                response.raise_for_status()
            except httpx.HTTPStatusError as e:
                err_text = self._extract_error_text(e.response)
                return None, f"API请求失败: {e.response.status_code} - {err_text[:200]}"

            # The task id arrives as the response body, possibly wrapped in quotes.
            task_uuid = response.text.strip().replace('"', "")
            if not task_uuid or len(task_uuid) < 10:
                return None, f"未能获取有效的任务ID。API响应: {response.text[:200]}"

            if progress:
                progress(self._POLL_PROGRESS_START, desc=f"任务已提交 (ID: {task_uuid[:8]}...), 开始轮询状态...")
            poll_result = await self._poll_task_status(task_uuid, progress)
            if poll_result["success"]:
                if progress:
                    progress(1, desc="图片生成成功!")
                return poll_result["image_url"], None
            return None, poll_result["error"]
        except httpx.TimeoutException:
            return None, "API请求超时,请检查网络连接或稍后再试。"
        except httpx.RequestError as e:
            return None, f"网络请求错误: {str(e)}"
        except Exception as e:
            # Last-resort boundary: report the problem to the caller instead of raising.
            if DEBUG_MODE:
                import traceback
                traceback.print_exc()
            return None, f"生成图片时发生意外错误: {str(e)}"

# Initialize the shared module-level client. If construction raises, the error is
# printed and ``image_client`` is set to None, which infer() checks before use.
try:
    image_client = ImageClient()
except Exception as e: # Catch any init error
    print(f"Failed to initialize ImageClient: {e}")
    image_client = None 

# Example prompts
# Short captions shown on the example buttons, in display order.
example_titles = [
    "A stylized female demon with red hair and glitch effects",
    "A young man relaxes on a hazy urban rooftop", 
    "A gentle, freckled girl embraces a goat in a meadow"
]
# Full prompt text written into the prompt box when an example button is clicked,
# keyed by the button caption.
full_prompts = {
    example_titles[0]: "Stylized anime illustration of a female demon or supernatural character with vibrant red hair in twintails/pigtails and glowing purple eyes. Character has black horns and features bandage-like cross markings on face. Subject wears a black sleeveless top and holds a pink bubblegum or candy sphere near mouth. Digital glitch effects create pixelated elements in her hair and around background. Dramatic lighting with stark white/black contrasting background featuring cracks or lightning patterns. Character has gold/yellow accessories including bracelets and hair decorations. Modern anime art style with sharp contrast and vivid colors. Portrait composition showing three-quarter view of character with confident or playful expression. Color palette dominated by reds, blacks, whites, purple and pink accents. Surreal or otherworldly atmosphere enhanced by particle effects and lighting. Professional digital illustration combining traditional anime aesthetics with contemporary glitch art elements. Character design suggests edgy or alternative styling with possible cyberpunk or modern demon girl influences.",
    example_titles[1]: "Atmospheric anime illustration of young man with messy brown hair on urban rooftop during overcast day. Character wears white dress shirt and dark trousers, leaning back against railing while holding canned drink. Scene set on building rooftop with industrial elements like water tower, power lines, and metal structures visible. Cityscape background shows apartment buildings and urban architecture through soft hazy lighting. Subject has relaxed pose suggesting brief break or moment of contemplation. Color palette uses muted whites, grays, and industrial tones creating realistic urban atmosphere. Art style combines detailed architectural elements with soft, painterly technique. Composition emphasizes vertical lines of city buildings and metal structures. Professional digital artwork capturing slice-of-life moment in urban setting. Scene suggests peaceful solitude amid busy city environment. Lighting creates gentle, overcast mood with subtle shadows and highlights. Character design and setting reflect contemporary Japanese salary-man or office worker aesthetic.",
    example_titles[2]: "Enchanting anime illustration of a gentle, freckled girl with long, wavy orange hair and elegant ram horns, tenderly embracing a white baby goat in a sunlit meadow. The composition is a close-up, focusing on the upper body and faces of both the girl and the goat, capturing an intimate and heartwarming moment. She wears a vintage-inspired dress with a high collar, puffed sleeves, and a delicate white headband, adorned with golden ribbons and lace details. The sunlight bathes the scene in warm, golden tones, casting soft shadows and creating a dreamy, pastoral atmosphere. The background is filled with lush green grass and scattered white flowers, enhancing the idyllic countryside setting. The art style is painterly and vibrant, with expressive brushwork and a focus on light and texture, evoking a sense of peace, innocence, and connection with nature."
}

async def infer(
    prompt_text, seed_val, randomize_seed_val, width_val, height_val,
    cfg_val, steps_val, model_name_val, progress=gr.Progress(track_tqdm=True)
):
    """Gradio event handler: validate the inputs, generate one image, return it with its seed.

    Args:
        prompt_text: Prompt entered by the user; must contain non-whitespace text.
        seed_val: Seed from the slider; ignored when ``randomize_seed_val`` is set.
        randomize_seed_val: When truthy, a fresh seed in ``[0, MAX_SEED]`` is drawn.
        width_val: Requested width; adjusted by ``validate_dimensions``.
        height_val: Requested height; adjusted by ``validate_dimensions``.
        cfg_val: Guidance scale, must lie in ``[1.0, 20.0]``.
        steps_val: Sampling steps, must lie in ``[1, 50]``.
        model_name_val: Model name passed through to the client.
        progress: Gradio progress tracker.

    Returns:
        ``(image_url, seed_used)``.

    Raises:
        gr.Error: On invalid input, an uninitialised client, or any generation
            failure, including a result that carries neither image nor error.
    """
    if image_client is None:
        raise gr.Error("ImageClient 未正确初始化。请检查应用日志和API_TOKEN配置。")
    # Treat a missing prompt (None) the same as an empty one instead of crashing on .strip().
    if not prompt_text or not prompt_text.strip():
        raise gr.Error("提示词不能为空。请输入您想生成的图像描述。")

    # Only parse the slider value when it is actually used.
    if randomize_seed_val:
        current_seed = random.randint(0, MAX_SEED)
    else:
        current_seed = int(seed_val)

    width_val, height_val = validate_dimensions(width_val, height_val)

    if not (1.0 <= float(cfg_val) <= 20.0):
        raise gr.Error("CFG Scale 必须在 1.0 到 20.0 之间。")
    if not (1 <= int(steps_val) <= 50):
        raise gr.Error("Steps 必须在 1 到 50 之间。")

    progress(0, desc="开始生成...")
    image_url, error = await image_client.generate_image(
        prompt_str=prompt_text, negative_prompt_str="",  # negative prompt is not exposed in the UI
        seed_val=current_seed, width_val=width_val, height_val=height_val,
        cfg_val=float(cfg_val), steps_val=int(steps_val), model_name_str=model_name_val,
        progress=progress
    )

    # A result without an image is a failure even when the error text is empty or None;
    # otherwise the UI would silently show nothing.
    if error or not image_url:
        detail = str(error) if error else "未返回图像"
        # Messages containing these markers are already user-friendly and shown verbatim.
        friendly_markers = ("API请求失败", "内容不合规", "服务器繁忙", "任务轮询超时", "API_TOKEN")
        if any(marker in detail for marker in friendly_markers):
            raise gr.Error(detail)
        # Less clear errors get a generic wrapper; the detail is logged in debug mode.
        if DEBUG_MODE:
            print(f"Internal error during image generation: {detail}")
        raise gr.Error(f"图片生成失败: {detail}. 请稍后再试或检查提示词。")

    return image_url, current_seed


# Links rendered in the HTML header. The two link targets can be overridden through
# environment variables of the same name; the defaults are example values.
DISCORD_LINK = os.environ.get("DISCORD_LINK", "https://discord.gg/your-community") # Example
APP_INDEX_LINK = os.environ.get("APP_INDEX_LINK", "https://huggingface.co/spaces") # Example
# Icon shown next to the "More Apps" link.
APP_INDEX_ICON = "https://huggingface.co/front/assets/huggingface_logo-noborder.svg" # Using HF logo


# Gradio UI: a header with external links, a controls column and an output column.
with gr.Blocks(theme=gr.themes.Soft(), title="Lumina Image Playground") as demo:
    # External links open in a new tab; rel="noopener noreferrer" keeps the opened
    # page from getting a handle on this one.
    gr.HTML(f"""
        <div style="display: flex; justify-content: flex-end; align-items: center; gap: 15px; margin-bottom: 10px; padding: 5px;">
            <a href="{DISCORD_LINK}" target="_blank" rel="noopener noreferrer" style="text-decoration: none; color: #5865F2; font-weight: 500; display: inline-flex; align-items: center; gap: 5px;">
                <img src="https://assets-global.website-files.com/6257adef93867e50d84d30e2/636e0a69f118df70ad7828d4_icon_clyde_blurple_RGB.svg" alt="Discord" style="height: 20px;">
                Join Discord
            </a>
            <a href="{APP_INDEX_LINK}" target="_blank" rel="noopener noreferrer" style="text-decoration: none; color: #333; font-weight: 500; display: inline-flex; align-items: center; gap: 5px;">
                <img src="{APP_INDEX_ICON}" alt="App Index" style="height: 20px; border-radius: 3px;">
                More Apps
            </a>
        </div>
    """)

    gr.Markdown("<h1>🎨 Lumina Text-to-Image Playground</h1>")
    gr.Markdown("Describe your vision and let the AI bring it to life! Uses an external API for image generation.")

    with gr.Row(variant="panel"):
        with gr.Column(scale=2):  # Controls panel
            gr.Markdown("## ⚙️ Generation Controls")
            prompt = gr.Textbox(
                label="Prompt", lines=5,
                placeholder="e.g., A majestic dragon soaring through a cyberpunk city skyline, neon lights reflecting off its scales, intricate details.",
                info="Describe the image you want to create."
            )

            with gr.Accordion("🔧 Advanced Settings", open=True):
                model_name = gr.Dropdown(
                    label="Model Version", choices=list(MODEL_CONFIGS.keys()), value="ep3",
                    info="Select the generation model."
                )
                with gr.Row():
                    cfg = gr.Slider(label="CFG Scale", minimum=1.0, maximum=20.0, step=0.1, value=5.5, info="Guidance strength. Higher values adhere more to prompt.")
                    steps = gr.Slider(label="Sampling Steps", minimum=1, maximum=50, step=1, value=30, info="Number of steps. More steps can improve quality but take longer.")

                with gr.Row():
                    width = gr.Slider(label="Width", minimum=MIN_IMAGE_SIZE, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
                    height = gr.Slider(label="Height", minimum=MIN_IMAGE_SIZE, maximum=MAX_IMAGE_SIZE, step=32, value=1024)

                with gr.Row():
                    seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=random.randint(0, MAX_SEED))
                    randomize_seed = gr.Checkbox(label="Randomize Seed", value=True, info="Use a new random seed for each generation if checked.")

            # scale=0 keeps the button from stretching to the full column width.
            run_button = gr.Button("🚀 Generate Image", variant="primary", scale=0)

            with gr.Group():
                gr.Markdown("### ✨ Example Prompts")
                for title in example_titles:
                    btn = gr.Button(title)
                    # Bind the title as a default argument so each button keeps its own
                    # prompt (avoids the late-binding closure pitfall).
                    btn.click(lambda t=title: full_prompts[t], outputs=[prompt])

        with gr.Column(scale=3):  # Output panel
            gr.Markdown("## 🖼️ Generated Image")
            result_image = gr.Image(
                label="Output Image", show_label=False, type="filepath",
                height=600,  # maximum display height
                show_download_button=True, interactive=False,
                elem_id="result_image_display"  # hook for CSS targeting
            )
            generated_seed_info = gr.Textbox(label="Seed Used", interactive=False, placeholder="The seed for the generated image will appear here.")

    # Event handlers: the button and pressing Enter in the prompt box both run infer().
    inputs_list = [prompt, seed, randomize_seed, width, height, cfg, steps, model_name]
    outputs_list = [result_image, generated_seed_info]

    run_button.click(fn=infer, inputs=inputs_list, outputs=outputs_list, api_name="generate_image")
    prompt.submit(fn=infer, inputs=inputs_list, outputs=outputs_list, api_name="generate_image_submit")


# Script entry point: print start-up notices, then launch the Gradio app.
if __name__ == "__main__":
    if DEBUG_MODE:
        print("DEBUG_MODE is enabled.")
    # Warn when no token is configured; the app still starts without one.
    if not os.environ.get("API_TOKEN"):
        print("**************************************************************************************")
        print("WARNING: API_TOKEN environment variable is not set locally.")
        print("The application will run, but image generation will fail until API_TOKEN is provided.")
        print("You can set it by running: export API_TOKEN='your_actual_token_here'")
        print("Or if using a .env file, ensure it's loaded or API_TOKEN is set in your run config.")
        print("**************************************************************************************")
    
    demo.launch(debug=DEBUG_MODE, show_error=True)