File size: 12,483 Bytes
723683f
 
 
99397a6
 
723683f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99397a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1862389
5e1c9fb
1862389
 
 
 
9a354ff
5e1c9fb
9a354ff
 
 
 
 
 
 
 
 
 
5e1c9fb
9a354ff
 
 
 
5e1c9fb
9a354ff
 
 
5e1c9fb
9a354ff
 
5e1c9fb
9a354ff
5e1c9fb
9a354ff
 
 
 
 
 
 
 
 
 
 
5e1c9fb
9a354ff
5e1c9fb
9a354ff
 
 
 
 
5e1c9fb
9a354ff
5e1c9fb
9a354ff
5e1c9fb
9a354ff
 
5e1c9fb
 
9a354ff
723683f
 
9a354ff
 
723683f
9a354ff
 
723683f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a354ff
 
723683f
 
 
 
 
9a354ff
 
723683f
 
9a354ff
 
723683f
 
9a354ff
723683f
 
 
 
9a354ff
723683f
 
 
9a354ff
 
723683f
 
 
 
 
 
9a354ff
 
723683f
 
99397a6
 
9a354ff
723683f
 
 
 
 
 
 
 
99397a6
 
9a354ff
723683f
99397a6
 
9a354ff
723683f
9a354ff
 
 
723683f
9a354ff
 
723683f
99397a6
 
9a354ff
 
723683f
 
 
9a354ff
723683f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e1c9fb
9a354ff
1862389
 
 
 
 
 
 
 
 
 
 
9a354ff
 
5063ec4
5e1c9fb
 
9a354ff
 
 
 
 
5e1c9fb
9a354ff
 
5e1c9fb
 
9a354ff
 
 
5e1c9fb
 
 
9a354ff
723683f
9a354ff
 
5e1c9fb
9a354ff
 
1862389
9a354ff
 
723683f
 
 
 
 
 
 
dc81eda
 
723683f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
import cv2
from mtcnn.mtcnn import MTCNN
from utils import *
import requests
from urllib.parse import urlparse


# Example data for the UI, loaded once at import time. The loader helpers are
# not defined in this file (presumably they come from `from utils import *`).
# cloth_examples / cloth_hr_examples feed the two clothing example galleries,
# pose_examples feeds the photo gallery, and tip1 / tip2 are used as <img>
# sources inside the "upload tips" accordion.
cloth_examples = get_cloth_examples(hr=0)
cloth_hr_examples = get_cloth_examples(hr=1)
pose_examples = get_pose_examples()
tip1, tip2 = get_tips()
# One shared MTCNN instance; onClick uses it to check that the uploaded photo
# contains a face before a try-on task is submitted.
face_detector = MTCNN()

# Description
# `title` and `description` are raw HTML snippets rendered through gr.Markdown
# at the top of the page.
title = r"""
<h1 align="center">Outfit Anyway: Best customer try-on You ever See</h1>
"""

description = r"""
<b>Join discord to know more about </b> <a href='https://discord.com/invite/QgJWCtSG58' target='_blank'><b> heybeauty prebuy vton solution</b></a>.<br>
"""


def is_http_resource_accessible(url):
    """Report whether ``url`` answers a HEAD request with status 200.

    Anything that is not a non-empty ``str`` with an ``http``/``https``
    scheme is rejected without touching the network. The HEAD request
    follows redirects and waits at most 5 seconds. Any exception raised
    while parsing or requesting is printed and treated as "not accessible".

    Returns:
        True only when the final response status code is exactly 200,
        False in every other case.
    """
    if not (url and isinstance(url, str)):
        return False

    try:
        # Only plain web URLs are worth probing.
        scheme = urlparse(url).scheme
        if scheme != 'http' and scheme != 'https':
            return False

        # HEAD is enough to learn whether the resource exists.
        head_response = requests.head(url, allow_redirects=True, timeout=5)
        return head_response.status_code == 200
    except Exception as e:
        print(f"Error checking resource accessibility: {e}")
        return False


def onPoseChange(prompt_text, source_image, token_input, request: gr.Request):
    """Submit a pose-change job for the "pose changer" panel and wait for it.

    Args:
        prompt_text: Pose description(s); a built-in default prompt is used
            when this is empty or only whitespace.
        source_image: Local file path or ``http...`` URL of the source photo.
            Local paths are uploaded first.
        token_input: Access token typed by the user; must equal ``POSEToken``.
        request: Incoming request, used to determine the client address
            (the ``x-forwarded-for`` header wins over the socket peer).

    Returns:
        A 4-tuple ``(info message, image 1, image 2, image 3)``. The image
        slots are ``None`` unless the job succeeded and the corresponding
        ``output1``..``output3`` field is a non-blank string; successful
        URLs get a ``?t=<milliseconds>`` suffix appended.
    """
    def _message_only(message):
        # Every non-success outcome clears all three result images.
        return message, None, None, None

    # Guard clauses: token first, then the source image.
    if token_input != POSEToken:
        return _message_only("please input the correct token!")
    if source_image is None:
        return _message_only("Please provide source image first!")

    if not (prompt_text and prompt_text.strip()):
        prompt_text = "Change the pose: two hands on hips.#Change the pose: arms extended to show outfit."

    try:
        client_ip = request.client.host
        forwarded_for = dict(request.headers).get('x-forwarded-for')
        client_ip = forwarded_for or client_ip

        # Anything that is not already an http(s) URL string gets uploaded.
        is_local_path = isinstance(source_image, str) and not source_image.startswith('http')
        image_url = source_image
        if is_local_path:
            upload_id = int(str(time.time()).replace(".", "")) + random.randint(1000, 9999)
            image_url = upload_pose_img(client_ip, upload_id, source_image)
            if not image_url:
                return _message_only("Image upload failed!")

        job = public_pose_changer(image_url, prompt_text)
        if job is None:
            return _message_only("Pose change request failed!")

        # Poll once per second, 120 attempts at most.
        poll_interval_s = 1
        for _ in range(120):
            time.sleep(poll_interval_s)
            result = get_pose_changer_res(job['id'])
            if result is None:
                continue

            status = result['status']
            if status == 'SUCCEED':
                images = []
                for key in ('output1', 'output2', 'output3'):
                    url = result.get(key)
                    if url and url.strip():
                        stamp = int(time.time() * 1000)
                        images.append(url + f"?t={stamp}")
                    else:
                        images.append(None)
                first, second, third = images
                return f"Pose change completed! {result.get('msg', '')}", first, second, third
            if status == 'FAILED':
                return _message_only(f"Pose change failed: {result.get('msg', '')}")
            # 'PROCESSING' (or any other status): keep waiting.

        return _message_only("Pose change timeout!")

    except Exception as e:
        print(f"Pose change exception: {e}")
        return _message_only(f"Processing exception: {str(e)}")

def _accessible_mid_result(public_res):
    """Return the task's ``mid_result`` URL if it is currently reachable, else None."""
    mid_url = public_res['mid_result']
    return mid_url if is_http_resource_accessible(mid_url) else None


def onClick(cloth_image, pose_image, high_resolution, request: gr.Request):
    """Run one virtual try-on job and stream its progress to the UI.

    This is a generator. Every ``yield`` produces the 4-tuple
    ``(result image, runtime information, markdown text, pose-changer source
    image)`` in the order of the outputs wired to the Run button. Because the
    consumer only sees yielded values, every terminal state (including the
    polling timeout) is yielded before the generator stops; ``return`` carries
    no value.

    Args:
        cloth_image: File path of the chosen clothing example. The file stem
            must be an integer; it is sent as the clothing id.
        pose_image: File path of the person photo to dress.
        high_resolution: When truthy the task is published with ``is_hr=1``,
            otherwise ``is_hr=0``.
        request: Incoming request, used to determine the client address
            (the ``x-forwarded-for`` header wins over the socket peer).

    Yields:
        Progress tuples as described above. While the task is running the
        image slots hold the intermediate-result URL (with a ``?t=<ms>``
        suffix) when that URL is reachable, otherwise ``None``.

    Raises:
        ValueError: If the clothing file stem is not an integer.
        Exception: Any error raised after validation is printed and re-raised
            unchanged.
    """
    if pose_image is None:
        yield None, "no pose image found !", "", None
        return
    if cloth_image is None:
        yield None, "no cloth image found !", "", None
        return

    cloth_id = int(os.path.basename(cloth_image).split(".")[0])

    try:
        client_ip = request.client.host
        x_forwarded_for = dict(request.headers).get('x-forwarded-for')
        if x_forwarded_for:
            client_ip = x_forwarded_for

        # cv2.imread signals an unreadable/unsupported file by returning None
        # rather than raising, so check before slicing.
        pose_np = cv2.imread(pose_image)
        if pose_np is None:
            print(client_ip, 'failed to read pose image! ', flush=True)
            yield None, "Fatal Error !!! Failed to read the photo !!! Please upload a valid image file!!!", "", None
            return

        # OpenCV loads BGR; reverse the channel axis before face detection.
        faces = face_detector.detect_faces(pose_np[:, :, ::-1])
        if not faces:
            print(client_ip, 'faces num is 0! ', flush=True)
            yield None, "Fatal Error !!! No face detected !!! You must upload a human photo!!! Not clothing photo!!!", "", None
            return

        # Reject headshots: the first detected face may span at most 1/3.3 of
        # the image in either dimension.
        x, y, w, h = faces[0]["box"]
        H, W = pose_np.shape[:2]
        max_face_ratio = 1 / 3.3
        if w / W > max_face_ratio or h / H > max_face_ratio:
            yield None, "Fatal Error !!! Headshot is not allowed !!! You must upload a full-body or half-body photo!!!", "", None
            return

        if not check_region_warp(client_ip):
            yield None, "Failed !!! Our server is under maintenance, please try again later", "", None
            return

        yield None, "begin to upload ", "", None

        timeId = int(str(time.time()).replace(".", "")) + random.randint(1000, 9999)
        upload_url = upload_pose_img(client_ip, timeId, pose_image)
        # Truthiness check covers both an empty string and None.
        if not upload_url:
            yield None, "fail to upload", "", None
            return

        yield None, "begin to public task ", "", None

        public_res = publicClothSwap(upload_url, cloth_id, is_hr=1 if high_resolution else 0)
        if public_res is None:
            yield None, "fail to public you task", "", None
            return

        print(client_ip, public_res['mid_result'])
        # Only show the intermediate result if it can actually be fetched.
        mid_result = _accessible_mid_result(public_res)
        yield mid_result, f"task is processing, task id: {public_res['id']}, {public_res['msg']}", "", mid_result

        # Poll every 0.5 s, 360 attempts at most.
        max_try = 120 * 3
        wait_s = 0.5
        for i in range(max_try):
            time.sleep(wait_s)
            state = getInfRes(public_res['id'])
            # Cache-busting suffix so the browser reloads the image each time.
            timestamp = int(time.time() * 1000)
            status = None if state is None else state['status']

            if status == 'SUCCEED':
                result_image = state['output1'] + f"?t={timestamp}"
                yield result_image, f"task finished, {state['msg']}", "", result_image
                return
            if status == 'FAILED':
                yield None, f"task failed, {state['msg']}", "", None
                return

            # Still running (or the query failed): refresh the preview.
            if state is None:
                info = "task query failed,"
            elif status == 'PROCESSING':
                info = f"task is processing, query {i}"
            else:
                info = f"task is on processing, query {i}"
            # Re-checked every round because the preview may only become
            # reachable after the task has started.
            mid_result = _accessible_mid_result(public_res)
            result_url = mid_result + f"?t={timestamp}" if mid_result else None
            yield result_url, info, "", result_url

        # Polling budget exhausted; yield so the message reaches the UI.
        yield None, "no machine...", "", None
    except Exception as e:
        print(e)
        raise

# Build the whole page. Components are laid out in creation order inside the
# nested Row/Column/Accordion contexts, so statement order here is the layout.
with gr.Blocks() as demo:
    gr.Markdown(title)
    gr.Markdown(description)

    # Collapsed-by-default panel showing the two tip images side by side.
    with gr.Accordion('upload tips', open=False):
        with gr.Row():
            gr.HTML(f"<img src=\"{tip1}\" >")
            gr.HTML(f"<img src=\"{tip2}\" >")

    # Main try-on area: clothing picker | person photo | controls and result.
    with gr.Row():
        with gr.Column():
            # interactive=False: the clothing can only be picked from the
            # example galleries below, not uploaded.
            cloth_image = gr.Image(value=None, interactive=False, type="filepath", label="choose a clothing")
            example = gr.Examples(inputs=cloth_image,examples_per_page=20,examples=cloth_examples, label="clothing")
            hr_example = gr.Examples(inputs=cloth_image,examples_per_page=9,examples=cloth_hr_examples, label="invalid clothing")

        with gr.Column():
            # Person photo: may be uploaded or picked from the pose examples.
            pose_image = gr.Image(value=None, type="filepath", label="choose/upload a photo")
            example_pose = gr.Examples(inputs=pose_image,
                                      examples_per_page=20,
                                      examples=pose_examples)

        with gr.Column():
            with gr.Column():
                # size_slider = gr.Slider(-3, 3, value=1, interactive=True, label="clothes size")
                high_resolution = gr.Checkbox(value=False, label="high resolution", interactive=True)

                run_button = gr.Button(value="Run")
                # Read-only status line updated by onClick while the task runs.
                info_text = gr.Textbox(value="", interactive=False,
                    label='runtime information')
                res_image = gr.Image(label="result image", value=None, type="filepath")
                # Third onClick output; onClick always sends an empty string here.
                MK01 = gr.Markdown()

    # Add pose changer module
    with gr.Accordion('pose changer', open=False):
        # Top: token input
        with gr.Row():
            token_input = gr.Textbox(
                value="",
                label="Access Token",
                placeholder="请输入token...",
                type="password",
                scale=1
            )

        # Middle: text box and button
        with gr.Row():
            pose_prompt = gr.Textbox(
                value="Change the pose: hands on hips.#Change the pose: arms extended.",
                label="Pose Change Prompt",
                placeholder="Enter pose change description...",
                lines=2,
                scale=4
            )
            change_button = gr.Button(value="Change", scale=1)

        # Bottom: source image on left, result images on right
        with gr.Row():
            with gr.Column(scale=1):
                pose_changer_image = gr.Image(value=None, type="filepath", label="Source Image", interactive=True)
                pose_change_info = gr.Textbox(value="", interactive=False, label="Processing Info")

            with gr.Column(scale=2):
                with gr.Row():
                    pose_result_1 = gr.Image(label="Result 1", value=None, type="filepath")
                    pose_result_2 = gr.Image(label="Result 2", value=None, type="filepath")
                    pose_result_3 = gr.Image(label="Result 3", value=None, type="filepath")

    # Run button: onClick's 4-tuple maps onto these outputs in order. The
    # fourth slot writes the try-on image into the pose changer's source
    # image, so a result can be fed straight into a pose change.
    run_button.click(fn=onClick, inputs=[cloth_image, pose_image, high_resolution],
                     outputs=[res_image, info_text, MK01, pose_changer_image])

    # Bind pose changer change button
    # onPoseChange returns (info message, image 1, image 2, image 3).
    change_button.click(
        fn=onPoseChange,
        inputs=[pose_prompt, pose_changer_image, token_input],
        outputs=[pose_change_info, pose_result_1, pose_result_2, pose_result_3]
    )


# Script entry point: only start the server when this file is run directly.
if __name__ == "__main__":

    # Enable the request queue (configured with max_size=50) before launching.
    demo.queue(max_size=50)
    # Alternative queue/launch configurations kept for reference:
    # demo.queue(concurrency_count=60)
    # demo.launch(server_name='0.0.0.0', server_port=225)
    # demo.launch(server_name='0.0.0.0')
    # Launch with Gradio's default host and port.
    demo.launch()