# Wan2.2-S2V / app.py
# Author: kelseye
# Last change: "Update app.py" (commit a82a0b1, verified)
# app.py — Wan2.2-S2V Gradio demo.
import os
import oss2
import shutil
import subprocess
import sys
import time
import uuid

import gradio as gr
import requests

# dashscope is not bundled with the Space image.  Import it, and only shell
# out to pip when the import actually fails — the original unconditionally ran
# `pip install` via os.system() on every startup, even when already installed.
try:
    import dashscope
except ImportError:
    subprocess.run([sys.executable, "-m", "pip", "install", "dashscope"], check=True)
    import dashscope
from dashscope.utils.oss_utils import check_and_upload_local

# DashScope credentials come from the environment (Space secret).
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")
dashscope.api_key = DASHSCOPE_API_KEY
class WanS2VApp:
    """Thin client around the DashScope wan2.2-s2v asynchronous video API."""

    # Seconds to wait between two consecutive task-status polls.
    POLL_INTERVAL = 5
    # Per-request network timeout (seconds) so a stuck connection cannot
    # hang the Gradio worker forever (original requests had no timeout).
    REQUEST_TIMEOUT = 60

    def __init__(self):
        pass

    def predict(
        self,
        ref_img,
        audio,
        resolution="480P",
        style="speech",
    ):
        """Generate a video from a reference image and a driving audio clip.

        Args:
            ref_img: Local path to the reference image (uploaded to OSS if local).
            audio: Local path to the driving audio file (uploaded to OSS if local).
            resolution: Inference resolution, "480P" or "720P".
            style: Generation style forwarded to the model (default "speech").

        Returns:
            str: URL of the generated video.

        Raises:
            Exception: If the submit request fails, no task id is returned,
                a status poll fails, or the task ends in FAILED state.
        """
        # Upload files to OSS if needed and get URLs
        _, image_url = check_and_upload_local("wan2.2-s2v", ref_img, DASHSCOPE_API_KEY)
        _, audio_url = check_and_upload_local("wan2.2-s2v", audio, DASHSCOPE_API_KEY)

        # Prepare the request payload
        payload = {
            "model": "wan2.2-s2v",
            "input": {
                "image_url": image_url,
                "audio_url": audio_url,
            },
            "parameters": {
                "style": style,
                "resolution": resolution,
            },
        }
        headers = {
            # Submit as an asynchronous task; the result is fetched by polling.
            "X-DashScope-Async": "enable",
            "X-DashScope-OssResourceResolve": "enable",
            "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
            "Content-Type": "application/json",
        }

        # Submit the generation task.
        url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/image2video/video-synthesis/"
        response = requests.post(
            url, json=payload, headers=headers, timeout=self.REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            raise Exception(f"Initial request failed with status code {response.status_code}: {response.text}")

        result = response.json()
        task_id = result.get("output", {}).get("task_id")
        if not task_id:
            raise Exception("Failed to get task ID from response")

        # Poll the task endpoint until it reaches a terminal state.
        get_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"
        headers = {
            "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
            "Content-Type": "application/json",
        }
        while True:
            response = requests.get(get_url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                raise Exception(f"Failed to get task status: {response.status_code}: {response.text}")
            result = response.json()
            print(result)
            task_status = result.get("output", {}).get("task_status")
            if task_status == "SUCCEEDED":
                # Task completed successfully, return video URL
                return result["output"]["results"]["video_url"]
            elif task_status == "FAILED":
                # Task failed, raise an exception with error message
                error_msg = result.get("output", {}).get("message", "Unknown error")
                raise Exception(f"Task failed: {error_msg}")
            else:
                # Task is still running, wait and retry
                time.sleep(self.POLL_INTERVAL)
def start_app():
import argparse
parser = argparse.ArgumentParser(description="Wan2.2-S2V 视频生成工具")
args = parser.parse_args()
app = WanS2VApp()
with gr.Blocks(title="Wan2.2-S2V 视频生成") as demo:
# gr.Markdown("# Wan2.2-S2V 视频生成工具")
gr.HTML("""
<div style="text-align: center; font-size: 32px; font-weight: bold; margin-bottom: 20px;">
Wan2.2-S2V
</div>
""")
gr.Markdown("Generate video from audio and a reference image. This app uses a distilled model; for the full version, deploy [the open-source model](https://huggingface.co/Wan-AI/Wan2.2-S2V-14B).")
with gr.Row():
with gr.Column():
ref_img = gr.Image(
label="Input image(输入图像)",
type="filepath",
sources=["upload"],
)
audio = gr.Audio(
label="Audio(音频文件)",
type="filepath",
sources=["upload"],
)
resolution = gr.Dropdown(
label="Resolution(分辨率)",
choices=["480P", "720P"],
value="480P",
info="Inference Resolution, default: 480P(推理分辨率,默认480P)"
)
run_button = gr.Button("Generate Video(生成视频)")
with gr.Column():
output_video = gr.Video(label="Output Video(输出视频)")
run_button.click(
fn=app.predict,
inputs=[
ref_img,
audio,
resolution,
],
outputs=[output_video],
)
examples_dir = "examples"
if os.path.exists(examples_dir):
example_data = []
files_dict = {}
for file in os.listdir(examples_dir):
file_path = os.path.join(examples_dir, file)
name, ext = os.path.splitext(file)
if ext.lower() in [".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".webp"]:
if name not in files_dict:
files_dict[name] = {}
files_dict[name]["image"] = file_path
elif ext.lower() in [".mp3", ".wav"]:
if name not in files_dict:
files_dict[name] = {}
files_dict[name]["audio"] = file_path
for name, files in files_dict.items():
if "image" in files and "audio" in files:
example_data.append([
files["image"],
files["audio"],
"480P"
])
if example_data:
gr.Examples(
examples=example_data,
inputs=[ref_img, audio, resolution],
outputs=output_video,
fn=app.predict,
cache_examples=False,
)
demo.launch(
server_name="0.0.0.0",
server_port=7860
)
# Script entry point: build the UI and start the Gradio server.
if __name__ == "__main__":
    start_app()