Spaces:
Running
Running
import cv2 | |
from mtcnn.mtcnn import MTCNN | |
from utils import * | |
import requests | |
from urllib.parse import urlparse | |
cloth_examples = get_cloth_examples(hr=0) | |
cloth_hr_examples = get_cloth_examples(hr=1) | |
pose_examples = get_pose_examples() | |
tip1, tip2 = get_tips() | |
face_detector = MTCNN() | |
# Description | |
title = r""" | |
<h1 align="center">Outfit Anyway: Best customer try-on You ever See</h1> | |
""" | |
description = r""" | |
<b>Join discord to know more about </b> <a href='https://discord.com/invite/QgJWCtSG58' target='_blank'><b> heybeauty prebuy vton solution</b></a>.<br> | |
""" | |
def is_http_resource_accessible(url): | |
""" | |
Check if HTTP resource is accessible | |
Returns True if accessible, False otherwise | |
""" | |
if not url or not isinstance(url, str): | |
return False | |
try: | |
# Parse URL to check if it's a valid HTTP/HTTPS URL | |
parsed = urlparse(url) | |
if parsed.scheme not in ['http', 'https']: | |
return False | |
# Make a HEAD request to check if resource exists | |
response = requests.head(url, timeout=5, allow_redirects=True) | |
return response.status_code == 200 | |
except Exception as e: | |
print(f"Error checking resource accessibility: {e}") | |
return False | |
def onPoseChange(prompt_text, source_image, token_input, request: gr.Request): | |
"""Handle pose change request""" | |
# Check token first | |
if token_input != POSEToken: | |
return "please input the correct token!", None, None, None | |
if source_image is None: | |
return "Please provide source image first!", None, None, None | |
if not prompt_text or prompt_text.strip() == "": | |
prompt_text = "Change the pose: two hands on hips.#Change the pose: arms extended to show outfit." | |
try: | |
client_ip = request.client.host | |
x_forwarded_for = dict(request.headers).get('x-forwarded-for') | |
if x_forwarded_for: | |
client_ip = x_forwarded_for | |
# If source_image is a local file path, upload it first | |
if isinstance(source_image, str) and not source_image.startswith('http'): | |
timeId = int(str(time.time()).replace(".", "")) + random.randint(1000, 9999) | |
image_url = upload_pose_img(client_ip, timeId, source_image) | |
if not image_url: | |
return "Image upload failed!", None, None, None | |
else: | |
image_url = source_image | |
# Initiate pose change request | |
pose_result = public_pose_changer(image_url, prompt_text) | |
if pose_result is None: | |
return "Pose change request failed!", None, None, None | |
# Poll for results | |
max_try = 120 | |
wait_s = 1 | |
for i in range(max_try): | |
time.sleep(wait_s) | |
result = get_pose_changer_res(pose_result['id']) | |
if result is None: | |
continue | |
elif result['status'] == 'PROCESSING': | |
continue | |
elif result['status'] == 'SUCCEED': | |
# Extract the first 3 valid output images | |
output_images = [None, None, None] | |
for j in range(1, 4): # output1 to output3 | |
output_key = f'output{j}' | |
if output_key in result and result[output_key] and result[output_key].strip(): | |
timestamp = int(time.time() * 1000) | |
output_images[j-1] = result[output_key] + f"?t={timestamp}" | |
return f"Pose change completed! {result.get('msg', '')}", output_images[0], output_images[1], output_images[2] | |
elif result['status'] == 'FAILED': | |
return f"Pose change failed: {result.get('msg', '')}", None, None, None | |
return "Pose change timeout!", None, None, None | |
except Exception as e: | |
print(f"Pose change exception: {e}") | |
return f"Processing exception: {str(e)}", None, None, None | |
def onClick(cloth_image, pose_image, high_resolution, request: gr.Request): | |
if pose_image is None: | |
yield None, "no pose image found !", "", None | |
return None, "no pose image found !", "", None | |
if cloth_image is None: | |
yield None, "no cloth image found !", "", None | |
return None, "no cloth image found !", "", None | |
pose_id = os.path.basename(pose_image).split(".")[0] | |
cloth_id = int(os.path.basename(cloth_image).split(".")[0]) | |
try: | |
client_ip = request.client.host | |
x_forwarded_for = dict(request.headers).get('x-forwarded-for') | |
if x_forwarded_for: | |
client_ip = x_forwarded_for | |
pose_np = cv2.imread(pose_image) | |
faces = face_detector.detect_faces(pose_np[:,:,::-1]) | |
if len(faces)==0: | |
print(client_ip, 'faces num is 0! ', flush=True) | |
yield None, "Fatal Error !!! No face detected !!! You must upload a human photo!!! Not clothing photo!!!", "", None | |
return None, "Fatal Error !!! No face detected !!! You must upload a human photo!!! Not clothing photo!!!", "", None | |
else: | |
x, y, w, h = faces[0]["box"] | |
H, W = pose_np.shape[:2] | |
max_face_ratio = 1/3.3 | |
if w/W>max_face_ratio or h/H>max_face_ratio: | |
yield None, "Fatal Error !!! Headshot is not allowed !!! You must upload a full-body or half-body photo!!!", "", None | |
return None, "Fatal Error !!! Headshot is not allowed !!! You must upload a full-body or half-body photo!!!", "", None | |
if not check_region_warp(client_ip): | |
yield None, "Failed !!! Our server is under maintenance, please try again later", "", None | |
return None, "Failed !!! Our server is under maintenance, please try again later", "", None | |
# client_ip = '8.8.8.8' | |
yield None, "begin to upload ", "", None | |
timeId = int( str(time.time()).replace(".", "") )+random.randint(1000, 9999) | |
upload_url = upload_pose_img(client_ip, timeId, pose_image) | |
# exit(0) | |
yield None, "begin to public task ", "", None | |
# return None, "begin to public task ", "" | |
if len(upload_url)==0: | |
yield None, "fail to upload", "", None | |
return None, "fail to upload", "", None | |
if high_resolution: | |
public_res = publicClothSwap(upload_url, cloth_id, is_hr=1) | |
else: | |
public_res = publicClothSwap(upload_url, cloth_id, is_hr=0) | |
if public_res is None: | |
yield None, "fail to public you task", "", None | |
return None, "fail to public you task", "", None | |
print(client_ip, public_res['mid_result']) | |
# Check if mid_result resource is accessible | |
mid_result = public_res['mid_result'] if is_http_resource_accessible(public_res['mid_result']) else None | |
yield mid_result, f"task is processing, task id: {public_res['id']}, {public_res['msg']}", "", mid_result | |
max_try = 120*3 | |
wait_s = 0.5 | |
for i in range(max_try): | |
time.sleep(wait_s) | |
state = getInfRes(public_res['id']) | |
timestamp = int(time.time() * 1000) | |
if state is None: | |
mid_result = public_res['mid_result'] if is_http_resource_accessible(public_res['mid_result']) else None | |
result_url = mid_result + f"?t={timestamp}" if mid_result else None | |
yield result_url, "task query failed,", "", result_url | |
elif state['status']=='PROCESSING': | |
mid_result = public_res['mid_result'] if is_http_resource_accessible(public_res['mid_result']) else None | |
result_url = mid_result + f"?t={timestamp}" if mid_result else None | |
yield result_url, f"task is processing, query {i}", "", result_url | |
elif state['status']=='SUCCEED': | |
result_image = state['output1'] + f"?t={timestamp}" | |
yield result_image, f"task finished, {state['msg']}", "", result_image | |
return result_image, f"task finished, {state['msg']}", "", result_image | |
elif state['status']=='FAILED': | |
yield None, f"task failed, {state['msg']}", "", None | |
return None, f"task failed, {state['msg']}", "", None | |
else: | |
mid_result = public_res['mid_result'] if is_http_resource_accessible(public_res['mid_result']) else None | |
result_url = mid_result + f"?t={timestamp}" if mid_result else None | |
yield result_url, f"task is on processing, query {i}", "", result_url | |
return None, "no machine...", "", None | |
except Exception as e: | |
print(e) | |
raise e | |
return None, "fail to create task", "", None | |
with gr.Blocks() as demo: | |
gr.Markdown(title) | |
gr.Markdown(description) | |
with gr.Accordion('upload tips', open=False): | |
with gr.Row(): | |
gr.HTML(f"<img src=\"{tip1}\" >") | |
gr.HTML(f"<img src=\"{tip2}\" >") | |
with gr.Row(): | |
with gr.Column(): | |
cloth_image = gr.Image(value=None, interactive=False, type="filepath", label="choose a clothing") | |
example = gr.Examples(inputs=cloth_image,examples_per_page=20,examples=cloth_examples, label="clothing") | |
hr_example = gr.Examples(inputs=cloth_image,examples_per_page=9,examples=cloth_hr_examples, label="invalid clothing") | |
with gr.Column(): | |
pose_image = gr.Image(value=None, type="filepath", label="choose/upload a photo") | |
example_pose = gr.Examples(inputs=pose_image, | |
examples_per_page=20, | |
examples=pose_examples) | |
with gr.Column(): | |
with gr.Column(): | |
# size_slider = gr.Slider(-3, 3, value=1, interactive=True, label="clothes size") | |
high_resolution = gr.Checkbox(value=False, label="high resolution", interactive=True) | |
run_button = gr.Button(value="Run") | |
info_text = gr.Textbox(value="", interactive=False, | |
label='runtime information') | |
res_image = gr.Image(label="result image", value=None, type="filepath") | |
MK01 = gr.Markdown() | |
# Add pose changer module | |
with gr.Accordion('pose changer', open=False): | |
# Top: token input | |
with gr.Row(): | |
token_input = gr.Textbox( | |
value="", | |
label="Access Token", | |
placeholder="请输入token...", | |
type="password", | |
scale=1 | |
) | |
# Middle: text box and button | |
with gr.Row(): | |
pose_prompt = gr.Textbox( | |
value="Change the pose: hands on hips.#Change the pose: arms extended.", | |
label="Pose Change Prompt", | |
placeholder="Enter pose change description...", | |
lines=2, | |
scale=4 | |
) | |
change_button = gr.Button(value="Change", scale=1) | |
# Bottom: source image on left, result images on right | |
with gr.Row(): | |
with gr.Column(scale=1): | |
pose_changer_image = gr.Image(value=None, type="filepath", label="Source Image", interactive=True) | |
pose_change_info = gr.Textbox(value="", interactive=False, label="Processing Info") | |
with gr.Column(scale=2): | |
with gr.Row(): | |
pose_result_1 = gr.Image(label="Result 1", value=None, type="filepath") | |
pose_result_2 = gr.Image(label="Result 2", value=None, type="filepath") | |
pose_result_3 = gr.Image(label="Result 3", value=None, type="filepath") | |
run_button.click(fn=onClick, inputs=[cloth_image, pose_image, high_resolution], | |
outputs=[res_image, info_text, MK01, pose_changer_image]) | |
# Bind pose changer change button | |
change_button.click( | |
fn=onPoseChange, | |
inputs=[pose_prompt, pose_changer_image, token_input], | |
outputs=[pose_change_info, pose_result_1, pose_result_2, pose_result_3] | |
) | |
if __name__ == "__main__": | |
demo.queue(max_size=50) | |
# demo.queue(concurrency_count=60) | |
# demo.launch(server_name='0.0.0.0', server_port=225) | |
# demo.launch(server_name='0.0.0.0') | |
demo.launch() | |