# Page header captured together with this file (not part of the program):
# Spaces status: Runtime error | File size: 7,609 Bytes | e9c50ee (presumably the commit hash)
import requests
import os
import gradio as gr
from huggingface_hub import update_repo_visibility, whoami, upload_folder, create_repo, upload_file, update_repo_visibility
import gradio as gr
import re
import uuid
from typing import Optional
import json
def download_file(url, file_path, folder):
    """Download ``url`` and store it at ``folder/file_path``.

    The request is first sent without credentials. If the server answers
    401, it is retried once with the ``CIVITAI_API_KEY`` environment variable
    as a bearer token. The body is streamed to disk in chunks so the whole
    file is never held in memory.

    Args:
        url: Direct download URL.
        file_path: Destination path relative to ``folder``; missing parent
            directories are created.
        folder: Root directory the file is written under.

    Raises:
        gr.Error: If the destination would fall outside ``folder``, if the
            API key is required but not set, or if the HTTP request fails.
    """
    gr.Info("Downloading file from "+ url + " to "+ file_path)

    # file_path is assembled from remote API metadata, so refuse anything
    # that would resolve outside the target folder.
    root = os.path.abspath(folder)
    target = os.path.abspath(os.path.join(root, file_path))
    if os.path.commonpath([root, target]) != root:
        raise gr.Error(f"Error downloading file: unsafe destination path {file_path}")

    # (connect, read) timeouts so a stalled server cannot hang the worker.
    timeout = (10, 120)
    headers = {}
    try:
        response = requests.get(url, headers=headers, stream=True, timeout=timeout)
        if response.status_code == 401:
            # Anonymous access was refused: retry once with the API key.
            print(f"401 Unauthorized for {url}, retrying with API key")
            response.close()
            api_key = os.environ.get("CIVITAI_API_KEY")
            if not api_key:
                raise gr.Error(
                    "Error downloading file: authentication required but CIVITAI_API_KEY is not set"
                )
            headers['Authorization'] = f'Bearer {api_key}'
            response = requests.get(url, headers=headers, stream=True, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        raise gr.Error(f"Error downloading file: {e}") from e

    os.makedirs(os.path.dirname(target), exist_ok=True)
    completed = False
    try:
        with response, open(target, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)
        completed = True
    except requests.exceptions.RequestException as e:
        raise gr.Error(f"Error downloading file: {e}") from e
    finally:
        # Never leave a truncated file behind: everything in the folder is
        # uploaded later by the caller.
        if not completed and os.path.exists(target):
            os.remove(target)

    gr.Info("Downloaded "+ file_path)
def get_files_by_username(username):
    """Collect the download URLs of every file published by a CivitAI user.

    Walks all result pages of the CivitAI models endpoint by following the
    ``metadata.nextPage`` URL returned with each page.

    Args:
        username: CivitAI username whose models are listed.

    Returns:
        dict mapping ``"<model id>/<version id>/<file name>"`` to the file's
        ``downloadUrl``.

    Raises:
        gr.Error: If a request to the CivitAI API fails.
    """
    url = f"https://civitai.com/api/v1/models?username={username}&limit=100"
    output = {}
    while url:
        try:
            response = requests.get(url)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            raise gr.Error("Something went wrong in fetching CivitAI API") from e

        # Add current page items to the mapping
        for model in data.get('items', []):
            for version in model['modelVersions']:
                for file in version['files']:
                    output[str(model['id']) + '/' + str(version['id']) + '/' + file['name']] = file['downloadUrl']

        # Advance to the next page; a missing/empty nextPage ends the loop.
        next_url = (data.get('metadata') or {}).get('nextPage')
        if next_url == url:
            # Defensive: never request the same page twice in a row.
            break
        url = next_url
    return output
def get_files_by_model_id(model_id):
    """Look up one CivitAI model and list the download URL of each of its files.

    Args:
        model_id: Identifier interpolated into the CivitAI model endpoint.

    Returns:
        dict mapping ``"<model id>/<version id>/<file name>"`` to the file's
        ``downloadUrl``.

    Raises:
        gr.Error: If the request to the CivitAI API fails.
    """
    endpoint = f"https://civitai.com/api/v1/models/{model_id}"
    try:
        reply = requests.get(endpoint)
        reply.raise_for_status()
        payload = reply.json()

        collected = {}
        for release in payload['modelVersions']:
            for entry in release['files']:
                link = entry['downloadUrl']
                key = "/".join((str(payload['id']), str(release['id']), entry['name']))
                collected[key] = link
        return collected
    except requests.exceptions.RequestException:
        raise gr.Error("Something went wrong in fetching CivitAI API")
def process_url(url, profile, do_download=True, folder="."):
    """Resolve a CivitAI model or user URL to its files and optionally download them.

    Args:
        url: ``https://civitai.com/models/<id>...`` or
            ``https://civitai.com/user/<username>...``.
        profile: Not used by this function; kept for caller compatibility.
        do_download: When true, download every file into ``folder``.
        folder: Directory the files are downloaded into.

    Returns:
        dict mapping relative file path to download URL. With
        ``do_download`` it contains only the files that downloaded
        successfully; otherwise every file that was found.

    Raises:
        gr.Error: If the URL is neither a model URL nor a user URL, or
            carries no ID/username.
    """
    url = url.strip()
    is_model = url.startswith("https://civitai.com/models/")
    is_user = url.startswith("https://civitai.com/user/")
    if not (is_model or is_user):
        raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL")

    # The ID/username is the 5th '/'-separated piece; drop any query string
    # or fragment glued to it (e.g. "144684?modelVersionId=1").
    identifier = url.split('/')[4].split('?')[0].split('#')[0].strip()
    if not identifier:
        raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL")

    if is_model:
        files = get_files_by_model_id(identifier)
    else:
        files = get_files_by_username(identifier)

    if not do_download:
        return files

    downloaded_files = {}
    for dl_path, download_url in files.items():
        try:
            download_file(download_url, dl_path, folder)
            downloaded_files[dl_path] = download_url
        except Exception as e:
            # Best effort: one failed file must not abort the others.
            gr.Warning(f"Failed to download {dl_path}: {str(e)}")
    return downloaded_files
def add_mirror(repo_id):
    """Register ``repo_id`` as a Hugging Face mirror on CivitaiArchive.com.

    Sends an authenticated POST using the ``CIVITAIARCHIVE_API_KEY``
    environment variable and reports the outcome through a Gradio toast.

    Args:
        repo_id: Repository identifier sent as the mirror ``url``.
    """
    response = requests.post(
        "https://civitaiarchive.com/api/mirrors",
        headers={
            "Authorization": f"Bearer {os.environ['CIVITAIARCHIVE_API_KEY']}",
            "Content-Type": "application/json"
        },
        json={
            "type": "huggingface",
            "url": repo_id
        },
    )
    if response.status_code == 200:
        gr.Info("Added mirror to CivitaiArchive.com")
    else:
        # gr.Error is an exception and shows nothing unless raised. A warning
        # toast reports the failure without aborting the caller.
        gr.Warning("Failed to add mirror to CivitaiArchive.com")
def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token: gr.OAuthToken, url, destination_repo):
    """Download files from a CivitAI URL and upload them to a Hugging Face repo.

    Files are downloaded into a temporary, uniquely named folder, uploaded
    to ``<username>/<destination_repo>`` (created privately when possible),
    the repo is then made public and registered as a mirror. The temporary
    folder is always removed afterwards.

    Args:
        profile: OAuth profile of the logged-in user, or ``None``.
        oauth_token: OAuth token used for all Hugging Face Hub calls.
        url: CivitAI model or user URL.
        destination_repo: Repository name (letters, digits, ``_`` and ``-``).

    Returns:
        Markdown string linking to the uploaded repository.

    Raises:
        gr.Error: On missing login, invalid repository name, no downloaded
            files, or any failure during download/upload.
    """
    import shutil

    # Validation failures must be raised: a returned gr.Error is never shown
    # as an error by Gradio.
    if profile is None or not profile.name:
        raise gr.Error("Are you sure you are logged in?")
    if not destination_repo:
        raise gr.Error("Please provide a destination repository name")
    # validate destination repo is alphanumeric (fullmatch also rejects a trailing newline)
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', destination_repo):
        raise gr.Error("Destination repository name must contain only alphanumeric characters, underscores, and hyphens")

    folder = str(uuid.uuid4())
    os.makedirs(folder, exist_ok=False)
    try:
        gr.Info(f"Starting download from {url}")
        files = process_url(url, profile, folder=folder)
        if not files:
            raise gr.Error("No files were downloaded. Please check the URL and try again.")

        user_repo_id = f"{profile.username}/{destination_repo}"

        # Try to create repo only if it doesn't exist
        try:
            create_repo(repo_id=user_repo_id, private=True, exist_ok=False, token=oauth_token.token)
            gr.Info(f"Created new repository {user_repo_id}")
        except Exception as e:
            gr.Info(f"Repository {user_repo_id} already exists, will update it:"+ str(e))

        gr.Info(f"Uploading models to {user_repo_id}...")
        # Upload everything that was downloaded into the temporary folder
        upload_folder(
            folder_path=folder,
            repo_id=user_repo_id,
            repo_type="model",
            token=oauth_token.token
        )
        update_repo_visibility(repo_id=user_repo_id, private=False, token=oauth_token.token)
        add_mirror(user_repo_id)
        return "# Models uploaded to 🤗!\n" + f"## [{user_repo_id}](https://huggingface.co/{user_repo_id})"
    except gr.Error:
        # Already a user-facing message; do not wrap it a second time.
        raise
    except Exception as e:
        print(e)
        raise gr.Error(f"Error during upload process: {str(e)}") from e
    finally:
        # Cleanup
        if os.path.exists(folder):
            shutil.rmtree(folder)
# Custom CSS: full-width login button and a dimmed, non-clickable style for
# an element with id "disabled_upload".
css = '''
#login {
width: 100% !important;
margin: 0 auto;
}
#disabled_upload{
opacity: 0.5;
pointer-events:none;
}
'''

# NOTE(review): the original indentation was lost; the nesting of the
# `with` blocks below is reconstructed and should be confirmed.
with gr.Blocks(css=css) as demo:
    gr.Markdown('''# Upload CivitAI models to HuggingFace
You can upload either:
- A single model by providing a CivitAI model URL (e.g., https://civitai.com/models/144684)
- All models from a user by providing their profile URL (e.g., https://civitai.com/user/username)
This will create a new HuggingFace repository under your username if it doesn't exist.
Once uploaded, it will add this repository to CivitaiArchive.com as a mirror.
''')
    gr.LoginButton(elem_id="login")
    with gr.Column():
        submit_source_civit = gr.Textbox(
            placeholder="https://civitai.com/models/144684 or https://civitai.com/user/username",
            label="CivitAI URL",
            info="Enter either a model URL or user profile URL",
        )
        destination_repo = gr.Textbox(
            placeholder="my-awesome-model",
            label="HF Repo Name",
            info="Name for the HuggingFace repository (a new one will be created if it doesn't exist)",
        )
        instructions = gr.HTML("")
        submit_button_civit = gr.Button("Upload to Hugging Face", interactive=True)
        output = gr.Markdown(label="Upload Progress")

    # Only the two textboxes are listed as inputs; the handler's first two
    # parameters are annotated with Gradio OAuth types.
    submit_button_civit.click(
        fn=upload_civit_to_hf,
        inputs=[submit_source_civit, destination_repo],
        outputs=[output]
    )

demo.queue(default_concurrency_limit=50)
# The stray trailing "|" after this call was extraction residue and a syntax error.
demo.launch()