|
import requests |
|
import os |
|
import gradio as gr |
|
from huggingface_hub import update_repo_visibility, whoami, upload_folder, create_repo, upload_file, update_repo_visibility, file_exists |
|
import subprocess |
|
|
|
import gradio as gr |
|
import re |
|
import uuid |
|
from typing import Optional |
|
import json |
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler |
|
from huggingface_hub import Repository, HfApi |
|
api = HfApi() |
|
def restart_space(): |
|
api.restart_space(repo_id="civitaiarchive/civitai-to-hf-uploader", token=os.environ["HF_TOKEN"]) |
|
|
|
BOOLPARM = { |
|
|
|
"Default": None, |
|
"Include": True, |
|
"Exclude": False, |
|
|
|
True: "true", |
|
False: "false", |
|
None: "", |
|
} |
|
|
|
def download_file(url, file_path, folder, api_key=None): |
|
headers = {} |
|
full_path = os.path.join(folder, file_path) |
|
os.makedirs(os.path.dirname(full_path), exist_ok=True) |
|
|
|
curl_cmd = ['curl', '--fail', '-L', '-o', full_path, url] |
|
|
|
if api_key: |
|
curl_cmd.extend(['-H', f'Authorization: Bearer {api_key}']) |
|
try: |
|
result = subprocess.run(curl_cmd, check=True, capture_output=True, text=True) |
|
except subprocess.CalledProcessError as e: |
|
if ('401' in e.stderr or '403' in e.stderr) and not api_key: |
|
|
|
api_key = os.environ.get("CIVITAI_API_KEY") |
|
curl_cmd.extend(['-H', f'Authorization: Bearer {api_key}']) |
|
try: |
|
result = subprocess.run(curl_cmd, check=True, capture_output=True, text=True) |
|
except subprocess.CalledProcessError as e: |
|
raise gr.Error(f"Error downloading file with authorization: {e.stderr}") |
|
else: |
|
raise gr.Error(f"Error downloading file: {e.stderr}") |
|
except Exception as e: |
|
raise gr.Error(f"Error downloading file: {str(e)}") |
|
|
|
def get_model_meta(model): |
|
"""Read model metadata from json (api/models format). |
|
|
|
Includes all versions, files and images therein. |
|
""" |
|
dfiles = dict() |
|
dimages = dict() |
|
for version in model['modelVersions']: |
|
mainfile = "" |
|
for file in version['files']: |
|
if mainfile == "": |
|
mainfile = os.path.splitext(file['name'])[0] |
|
dfiles[str(model['id']) + '/' + str(version['id']) + '/' + file['name']] = { |
|
'rawModelId': model['id'], |
|
'rawVersionId': version['id'], |
|
'rawFilename': file['name'], |
|
'downloadUrl': file['downloadUrl'], |
|
'modelId': model['name'] + ' - ' + version['name'], |
|
'modelUrl': f"https://civitai.com/models/{model['id']}?modelVersionId={version['id']}", |
|
'author': model['creator']['username'], |
|
'authorUrl': f"https://civitai.com/user/{model['creator']['username']}", |
|
'mirrorUrl': f"https://civitaiarchive.com/models/{model['id']}?modelVersionId={version['id']}", |
|
} |
|
for image in version['images']: |
|
|
|
|
|
vext = os.path.splitext(image['url'])[-1] |
|
imagename = mainfile + '-ID' + str(image['id']) + '.preview' + vext |
|
dimages[str(model['id']) + '/' + str(version['id']) + '/' + imagename] = { |
|
'rawModelId': model['id'], |
|
'rawVersionId': version['id'], |
|
'rawFilename': imagename, |
|
'downloadUrl': image['url'], |
|
'modelId': model['name'] + ' - ' + version['name'], |
|
'modelUrl': f"https://civitai.com/models/{model['id']}?modelVersionId={version['id']}", |
|
'author': model['creator']['username'], |
|
'authorUrl': f"https://civitai.com/user/{model['creator']['username']}", |
|
'mirrorUrl': f"https://civitaiarchive.com/models/{model['id']}?modelVersionId={version['id']}", |
|
|
|
'nsfwLevel': image['nsfwLevel'], |
|
'poi': image['poi'], |
|
'type': image['type'], |
|
} |
|
return (dfiles, dimages) |
|
|
|
def get_files_by_username(username, api_key=None, nsfw=None, hidden=None): |
|
url = f"https://civitai.com/api/v1/models?username={username}&limit=100" |
|
output = {} |
|
images = {} |
|
headers = {} |
|
if nsfw is not None: |
|
url = url + f"&nsfw={BOOLPARM[nsfw]}" |
|
if hidden is not None: |
|
url = url + f"&hidden={BOOLPARM[hidden]}" |
|
gr.Info(f"SBM url: {url}") |
|
if api_key: |
|
headers['Authorization'] = f'Bearer {api_key}' |
|
|
|
while url: |
|
response = requests.get(url, headers=headers, timeout=180) |
|
data = response.json() |
|
|
|
for model in data['items']: |
|
(dfiles, dimages) = get_model_meta(model) |
|
output.update(dfiles) |
|
images.update(dimages) |
|
|
|
metadata = data.get('metadata', {}) |
|
url = metadata.get('nextPage', None) |
|
return (output, images) |
|
|
|
def get_files_by_model_id(model_id, api_key=None): |
|
api_url = f"https://civitai.com/api/v1/models/{model_id}" |
|
headers = {} |
|
if api_key: |
|
headers['Authorization'] = f'Bearer {api_key}' |
|
|
|
try: |
|
response = requests.get(api_url, headers=headers) |
|
response.raise_for_status() |
|
model = response.json() |
|
|
|
output = {} |
|
images = {} |
|
(dfiles, dimages) = get_model_meta(model) |
|
output.update(dfiles) |
|
images.update(dimages) |
|
return (output, images) |
|
|
|
except requests.exceptions.RequestException as e: |
|
raise gr.Error("Something went wrong in fetching CivitAI API") |
|
|
|
def process_url(url, profile, user_repo_id, oauth_token, folder, api_key=None, nsfw=None, hidden=None, indimages = False): |
|
if url.startswith("https://civitai.com/models/"): |
|
model_id = url.split('/')[4] |
|
(files, images) = get_files_by_model_id(model_id, api_key) |
|
elif url.startswith("https://civitai.com/user/"): |
|
username = url.split('/')[4] |
|
(files, images) = get_files_by_username(username, api_key, nsfw, hidden) |
|
else: |
|
raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL") |
|
|
|
total_files = len(files) |
|
total_images = len(images) |
|
|
|
gr.Info(f"Found {total_files} files to download, {total_images} images") |
|
|
|
downloaded_files = {} |
|
current_file = 1 |
|
for dl_path, data in files.items(): |
|
try: |
|
download_url = data['downloadUrl'] |
|
filename = data["rawFilename"] |
|
if file_exists( |
|
repo_id = user_repo_id, |
|
filename = dl_path, |
|
token = oauth_token |
|
): |
|
gr.Info(f"Skipping {filename}, folder exists {dl_path}") |
|
continue |
|
gr.Info(f"Downloading {filename} ({current_file}/{total_files})") |
|
download_file(download_url, dl_path, folder, api_key) |
|
|
|
gr.Info(f"Uploading {filename} ({current_file}/{total_files})") |
|
base_folder = os.path.join(folder, os.path.dirname(dl_path)) |
|
|
|
|
|
readme = f""" |
|
Author: [{data['author']}]({data['authorUrl']}) |
|
|
|
Model: [{data['modelUrl']}]({data['modelUrl']}) |
|
|
|
Mirror: [{data['mirrorUrl']}]({data['mirrorUrl']}) |
|
""" |
|
with open(os.path.join(base_folder, "README.md"), "w") as f: |
|
f.write(readme) |
|
|
|
upload_folder( |
|
folder_path=base_folder, |
|
repo_id=user_repo_id, |
|
repo_type="model", |
|
path_in_repo=os.path.dirname(dl_path), |
|
token=oauth_token |
|
) |
|
downloaded_files[dl_path] = download_url |
|
except Exception as e: |
|
gr.Warning(f"Failed to download {dl_path}: {str(e)}") |
|
finally: |
|
current_file += 1 |
|
|
|
|
|
downloaded_images = {} |
|
current_image = 1 |
|
gr.Info(f"SBM get images {indimages}") |
|
if not indimages: |
|
gr.Info(f"Skipping all images.") |
|
else: |
|
for dl_path, data in images.items(): |
|
try: |
|
download_url = data['downloadUrl'] |
|
filename = data["rawFilename"] |
|
checkread = os.path.join(os.path.dirname(dl_path), "README.md") |
|
if not file_exists( |
|
|
|
repo_id = user_repo_id, |
|
filename = checkread, |
|
token = oauth_token |
|
): |
|
gr.Info(f"Skipping {filename}, model was not created {checkread}") |
|
continue |
|
if file_exists( |
|
repo_id = user_repo_id, |
|
filename = dl_path, |
|
token = oauth_token |
|
): |
|
gr.Info(f"Skipping {filename}, folder exists {dl_path}") |
|
continue |
|
|
|
gr.Info(f"Downloading {filename} ({current_image}/{total_images})") |
|
download_file(download_url, dl_path, folder, api_key) |
|
|
|
gr.Info(f"Uploading {filename} ({current_image}/{total_images})") |
|
base_folder = os.path.join(folder, os.path.dirname(dl_path)) |
|
upload_folder( |
|
folder_path=base_folder, |
|
repo_id=user_repo_id, |
|
repo_type="model", |
|
path_in_repo=os.path.dirname(dl_path), |
|
token=oauth_token |
|
) |
|
downloaded_images[dl_path] = download_url |
|
except Exception as e: |
|
gr.Warning(f"Failed to download {dl_path}: {str(e)}") |
|
finally: |
|
current_image += 1 |
|
|
|
return (files, images) |
|
|
|
|
|
def add_mirror(repo_id): |
|
response = requests.post("https://civitaiarchive.com/api/mirrors", |
|
headers={ |
|
"Authorization": f"Bearer {os.environ['CIVITAIARCHIVE_API_KEY']}", |
|
"Content-Type": "application/json" |
|
}, |
|
json={ |
|
"type": "huggingface", |
|
"url": repo_id |
|
}) |
|
if response.status_code == 200: |
|
gr.Info("Added mirror to CivitaiArchive.com") |
|
else: |
|
gr.Error("Failed to add mirror to CivitaiArchive.com") |
|
|
|
|
|
|
|
def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token: gr.OAuthToken, url, destination_repo, civitai_api_key=None, nsfw=None, hidden=None, indimages=False): |
|
if not profile.name: |
|
raise gr.Error("Are you sure you are logged in?") |
|
|
|
if not destination_repo: |
|
raise gr.Error("Please provide a destination repository name") |
|
|
|
|
|
if not re.match(r'^[a-zA-Z0-9_-]+$', destination_repo): |
|
raise gr.Error("Destination repository name must contain only alphanumeric characters, underscores, and hyphens") |
|
|
|
|
|
nsfw = BOOLPARM[nsfw] |
|
hidden = BOOLPARM[hidden] |
|
|
|
folder = str(uuid.uuid4()) |
|
os.makedirs(folder, exist_ok=False) |
|
gr.Info(f"Starting download from {url}") |
|
|
|
try: |
|
user_repo_id = f"{profile.username}/{destination_repo}" |
|
|
|
|
|
try: |
|
create_repo(repo_id=user_repo_id, private=True, exist_ok=False, token=oauth_token.token) |
|
gr.Info(f"Created new repository {user_repo_id}") |
|
except Exception as e: |
|
gr.Info(f"Repository {user_repo_id} already exists, will update it") |
|
update_repo_visibility(repo_id=user_repo_id, private=False, token=oauth_token.token) |
|
|
|
gr.Info(f"SBM get images upload {indimages}") |
|
(files, images) = process_url(url, profile, user_repo_id, oauth_token.token, folder, civitai_api_key, nsfw, hidden, indimages) |
|
if not files or len(files.keys()) == 0: |
|
raise gr.Error("No files were copied. Something went wrong.") |
|
|
|
gr.Info(f"Copied {len(files)} files") |
|
if indimages: |
|
gr.Info(f"Copied {len(images)} images") |
|
|
|
results = [] |
|
|
|
results.append(f"## [{user_repo_id}](https://huggingface.co/{user_repo_id})") |
|
|
|
if not results: |
|
raise gr.Error("Failed to upload any models. Please check the logs for details.") |
|
|
|
add_mirror(user_repo_id) |
|
|
|
return "# Models uploaded to 🤗!\n" + "\n".join(results) |
|
|
|
except Exception as e: |
|
print(e) |
|
raise gr.Error(f"Error during upload process: {str(e)}") |
|
finally: |
|
|
|
if os.path.exists(folder): |
|
import shutil |
|
shutil.rmtree(folder) |
|
|
|
css = ''' |
|
#login { |
|
width: 100% !important; |
|
margin: 0 auto; |
|
} |
|
#disabled_upload{ |
|
opacity: 0.5; |
|
pointer-events:none; |
|
} |
|
''' |
|
|
|
with gr.Blocks(css=css) as demo: |
|
gr.Markdown('''# Upload CivitAI models to HuggingFace |
|
|
|
You can upload either: |
|
- A single model by providing a CivitAI model URL (e.g., https://civitai.com/models/144684) |
|
- All models from a user by providing their profile URL (e.g., https://civitai.com/user/username) |
|
|
|
This will create a new HuggingFace repository under your username if it doesn't exist. |
|
Once uploaded, it will add this repository to CivitaiArchive.com as a mirror. |
|
''') |
|
|
|
gr.LoginButton(elem_id="login") |
|
|
|
|
|
with gr.Column() : |
|
submit_source_civit = gr.Textbox( |
|
placeholder="https://civitai.com/models/144684 or https://civitai.com/user/username", |
|
label="CivitAI URL", |
|
info="Enter either a model URL or user profile URL", |
|
) |
|
destination_repo = gr.Textbox( |
|
placeholder="my-awesome-model", |
|
label="HF Repo Name", |
|
info="Name for the HuggingFace repository (a new one will be created if it doesn't exist)", |
|
) |
|
civitai_api_key = gr.Textbox( |
|
placeholder="Your CivitAI API key (optional)", |
|
label="CivitAI API Key", |
|
info="Optional: Provide your own CivitAI API key to avoid rate limits. If not provided, a default key will be used.", |
|
) |
|
include_nsfw = gr.Dropdown( |
|
choices=["Default", "Include", "Exclude"], |
|
value="Include", |
|
label="Nsfw models", |
|
info="Optional: Include, exclude or do not specify inclusion of nsfw models.", |
|
) |
|
include_hidden = gr.Dropdown( |
|
choices=["Default", "Include", "Exclude"], |
|
value="Default", |
|
label="Hidden models", |
|
info="Optional: Include, exclude or do not specify inclusion of hidden models.", |
|
) |
|
upload_images = gr.Checkbox( |
|
value=True, |
|
interactive=True, |
|
label="Upload images", |
|
info="Optional: Upload images alongside the models for future access. Beware content restrictions." |
|
) |
|
|
|
instructions = gr.HTML("") |
|
submit_button_civit = gr.Button("Upload to Hugging Face", interactive=True) |
|
output = gr.Markdown(label="Upload Progress") |
|
|
|
|
|
|
|
submit_button_civit.click( |
|
fn=upload_civit_to_hf, |
|
inputs=[submit_source_civit, destination_repo, civitai_api_key, include_nsfw, include_hidden, upload_images], |
|
outputs=[output] |
|
) |
|
|
|
scheduler = BackgroundScheduler() |
|
scheduler.add_job(restart_space, 'interval', seconds=3600) |
|
scheduler.start() |
|
|
|
demo.queue(default_concurrency_limit=50) |
|
demo.launch() |