|
import requests |
|
import os |
|
import gradio as gr |
|
from huggingface_hub import update_repo_visibility, whoami, upload_folder, create_repo, upload_file, update_repo_visibility, file_exists |
|
import subprocess |
|
|
|
import gradio as gr |
|
import re |
|
import uuid |
|
from typing import Optional |
|
import json |
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler |
|
from huggingface_hub import Repository, HfApi |
|
# Shared Hugging Face Hub client used by the scheduled restart job below.
api = HfApi()


def restart_space():
    """Ask the Hub to restart the ``civitaiarchive/civitai-to-hf-uploader`` Space.

    The request is authenticated with the ``HF_TOKEN`` environment variable.

    Raises:
        KeyError: if ``HF_TOKEN`` is not set in the environment.
    """
    hub_token = os.environ["HF_TOKEN"]
    api.restart_space(
        repo_id="civitaiarchive/civitai-to-hf-uploader",
        token=hub_token,
    )
|
|
|
|
|
def download_file(url, file_path, folder, api_key=None):
    """Download ``url`` to ``folder/file_path`` by shelling out to ``curl``.

    Missing parent directories of the destination are created first. When no
    ``api_key`` was supplied and the first attempt is rejected with HTTP
    401/403, the download is retried once using the key stored in the
    ``CIVITAI_API_KEY`` environment variable, if that variable is set.

    Args:
        url: Source URL to fetch (redirects are followed).
        file_path: Destination path, relative to ``folder``.
        folder: Base directory for the download.
        api_key: Optional bearer token sent in an ``Authorization`` header.

    Raises:
        gr.Error: if ``curl`` exits with an error or cannot be started.
    """
    full_path = os.path.join(folder, file_path)
    parent_dir = os.path.dirname(full_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # -sS keeps curl's error line but drops the progress meter from stderr.
    # Without it the captured stderr is full of byte counters, which both
    # pollutes the error message and can contain "401"/"403" by accident.
    base_cmd = ['curl', '--fail', '-sS', '-L', '-o', full_path, url]

    def _run_curl(bearer_token):
        """Run curl once, adding the Authorization header when a token is given."""
        cmd = list(base_cmd)
        if bearer_token:
            # NOTE(review): the token is visible in the process list while curl runs.
            cmd.extend(['-H', f'Authorization: Bearer {bearer_token}'])
        subprocess.run(cmd, check=True, capture_output=True, text=True)

    try:
        _run_curl(api_key)
    except subprocess.CalledProcessError as e:
        stderr = e.stderr or ""
        auth_rejected = '401' in stderr or '403' in stderr
        # Only fall back to the server-side key when the caller gave none and
        # one is actually configured (avoids sending "Bearer None").
        fallback_key = None if api_key else os.environ.get("CIVITAI_API_KEY")
        if not (auth_rejected and fallback_key):
            raise gr.Error(f"Error downloading file: {stderr}") from e
        try:
            _run_curl(fallback_key)
        except subprocess.CalledProcessError as retry_error:
            raise gr.Error(
                f"Error downloading file with authorization: {retry_error.stderr}"
            ) from retry_error
    except Exception as e:
        # e.g. curl is not installed (FileNotFoundError).
        raise gr.Error(f"Error downloading file: {str(e)}") from e
|
|
|
|
|
def get_files_by_username(username, api_key=None):
    """List every downloadable file of every model published by a CivitAI user.

    Walks the paginated ``/api/v1/models`` endpoint (100 models per page,
    NSFW included) by following ``metadata.nextPage`` until it is absent.

    Args:
        username: CivitAI user name to query.
        api_key: Optional CivitAI API key, sent as a bearer token.

    Returns:
        dict mapping ``"<model id>/<version id>/<file name>"`` to a dict with
        the keys ``downloadUrl``, ``modelId``, ``modelUrl``, ``author``,
        ``authorUrl`` and ``mirrorUrl``.

    Raises:
        gr.Error: if a request fails or returns an HTTP error status.
    """
    headers = {}
    if api_key:
        headers['Authorization'] = f'Bearer {api_key}'

    # Let requests build (and URL-encode) the query string for the first page.
    url = "https://civitai.com/api/v1/models"
    params = {'username': username, 'limit': 100, 'nsfw': 'true'}
    output = {}

    try:
        while url:
            response = requests.get(url, headers=headers, params=params, timeout=30)
            # Fail loudly on 4xx/5xx instead of hitting KeyError on an error payload.
            response.raise_for_status()
            data = response.json()

            for model in data.get('items', []):
                for version in model['modelVersions']:
                    for file in version['files']:
                        key = f"{model['id']}/{version['id']}/{file['name']}"
                        output[key] = {
                            'downloadUrl': file['downloadUrl'],
                            'modelId': model['name'] + ' - ' + version['name'],
                            'modelUrl': f"https://civitai.com/models/{model['id']}?modelVersionId={version['id']}",
                            'author': model['creator']['username'],
                            'authorUrl': f"https://civitai.com/user/{model['creator']['username']}",
                            'mirrorUrl': f"https://civitaiarchive.com/models/{model['id']}?modelVersionId={version['id']}",
                        }

            url = data.get('metadata', {}).get('nextPage')
            # nextPage is used as the complete URL of the following request.
            params = None
    except requests.exceptions.RequestException as e:
        raise gr.Error("Something went wrong in fetching CivitAI API") from e

    return output
|
|
|
def get_files_by_model_id(model_id, api_key=None):
    """List every downloadable file of every version of one CivitAI model.

    Args:
        model_id: CivitAI model id, inserted into ``/api/v1/models/<id>``.
        api_key: Optional CivitAI API key, sent as a bearer token.

    Returns:
        dict mapping ``"<model id>/<version id>/<file name>"`` to a dict with
        the keys ``downloadUrl``, ``modelId``, ``modelUrl``, ``author``,
        ``authorUrl`` and ``mirrorUrl``.

    Raises:
        gr.Error: if the request fails or returns an HTTP error status.
    """
    api_url = f"https://civitai.com/api/v1/models/{model_id}"
    headers = {}
    if api_key:
        headers['Authorization'] = f'Bearer {api_key}'

    try:
        # Same 30 s timeout as the per-user listing; without one a stalled
        # connection would block this worker indefinitely.
        response = requests.get(api_url, headers=headers, timeout=30)
        response.raise_for_status()
        model = response.json()
    except requests.exceptions.RequestException as e:
        raise gr.Error("Something went wrong in fetching CivitAI API") from e

    output = {}
    for version in model['modelVersions']:
        for file in version['files']:
            key = f"{model['id']}/{version['id']}/{file['name']}"
            output[key] = {
                'downloadUrl': file['downloadUrl'],
                'modelId': model['name'] + ' - ' + version['name'],
                'modelUrl': f"https://civitai.com/models/{model['id']}?modelVersionId={version['id']}",
                'author': model['creator']['username'],
                'authorUrl': f"https://civitai.com/user/{model['creator']['username']}",
                'mirrorUrl': f"https://civitaiarchive.com/models/{model['id']}?modelVersionId={version['id']}",
            }
    return output
|
|
|
def process_url(url, profile, user_repo_id, oauth_token, folder, api_key=None):
    """Copy the files behind a CivitAI model or user URL into a Hub repo.

    For each file found: skip it if it already exists in ``user_repo_id``,
    otherwise download it below ``folder``, write a small ``README.md`` with
    attribution links next to it, and upload that version folder to the repo.
    A failure on one file is reported as a warning and the loop continues.

    Args:
        url: ``https://civitai.com/models/<id>...`` or
            ``https://civitai.com/user/<name>...``.
        profile: Unused here; kept so existing callers keep working.
        user_repo_id: Target Hub repo id (``"<user>/<repo>"``).
        oauth_token: Hub token string used for ``file_exists``/``upload_folder``.
        folder: Local scratch directory for downloads.
        api_key: Optional CivitAI API key.

    Returns:
        dict of the entries that are in the repo after this call (freshly
        uploaded or already present), keyed by repo path. It is empty when
        every file failed, so callers can detect a run that copied nothing.

    Raises:
        gr.Error: if ``url`` is not a recognised CivitAI model/user URL.
    """
    # Drop fragment and query string so ".../models/123?modelVersionId=4"
    # yields "123" rather than "123?modelVersionId=4".
    normalized = (url or "").strip().split('#', 1)[0].split('?', 1)[0]
    segments = normalized.split('/')
    identifier = segments[4] if len(segments) > 4 else ""
    is_model = normalized.startswith("https://civitai.com/models/")
    is_user = normalized.startswith("https://civitai.com/user/")

    if not identifier or not (is_model or is_user):
        raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL")

    if is_model:
        files = get_files_by_model_id(identifier, api_key)
    else:
        files = get_files_by_username(identifier, api_key)

    gr.Info(f"Found {len(files)} files to download")

    mirrored_files = {}
    total_files = len(files)
    for current_file, (dl_path, data) in enumerate(files.items(), start=1):
        try:
            download_url = data['downloadUrl']
            filename = dl_path.split('/')[-1]
            if file_exists(repo_id=user_repo_id, filename=dl_path, token=oauth_token):
                gr.Info(f"Skipping {filename}, folder exists {dl_path}")
                mirrored_files[dl_path] = data
                continue

            gr.Info(f"Downloading {filename} ({current_file}/{total_files})")
            download_file(download_url, dl_path, folder, api_key)

            gr.Info(f"Uploading {filename} ({current_file}/{total_files})")
            version_dir = os.path.dirname(dl_path)
            base_folder = os.path.join(folder, version_dir)

            readme = (
                f"\nAuthor: [{data['author']}]({data['authorUrl']})\n\n"
                f"Model: [{data['modelUrl']}]({data['modelUrl']})\n\n"
                f"Mirror: [{data['mirrorUrl']}]({data['mirrorUrl']})\n"
            )
            # Author names are free text; do not depend on the locale encoding.
            with open(os.path.join(base_folder, "README.md"), "w", encoding="utf-8") as f:
                f.write(readme)

            upload_folder(
                folder_path=base_folder,
                repo_id=user_repo_id,
                repo_type="model",
                path_in_repo=version_dir,
                token=oauth_token,
            )
            mirrored_files[dl_path] = data
        except Exception as e:
            gr.Warning(f"Failed to download {dl_path}: {str(e)}")
        else:
            # Only reached after a successful upload (not on skip or failure).
            # Delete the local copy now so a large batch does not pile up on
            # disk and is not picked up again by the next upload_folder call
            # for the same version folder.
            try:
                os.remove(os.path.join(folder, dl_path))
            except OSError:
                pass  # best effort; the caller deletes the whole scratch folder

    return mirrored_files
|
|
|
|
|
|
|
def add_mirror(repo_id):
    """Register ``repo_id`` as a Hugging Face mirror on CivitaiArchive.com.

    POSTs ``{"type": "huggingface", "url": repo_id}`` to the mirrors API,
    authenticated with the ``CIVITAIARCHIVE_API_KEY`` environment variable.
    This step is best effort: it runs after the files are already uploaded,
    so any failure is shown as a warning instead of raising.

    Args:
        repo_id: Hub repo id (``"<user>/<repo>"``) to register.
    """
    failure_message = "Failed to add mirror to CivitaiArchive.com"

    archive_key = os.environ.get('CIVITAIARCHIVE_API_KEY')
    if not archive_key:
        gr.Warning(failure_message)
        return

    try:
        response = requests.post(
            "https://civitaiarchive.com/api/mirrors",
            headers={
                "Authorization": f"Bearer {archive_key}",
                "Content-Type": "application/json",
            },
            json={
                "type": "huggingface",
                "url": repo_id,
            },
            timeout=30,  # never hang the request on an unresponsive API
        )
    except requests.exceptions.RequestException as e:
        print(e)
        gr.Warning(failure_message)
        return

    if response.ok:
        gr.Info("Added mirror to CivitaiArchive.com")
    else:
        # The original built a gr.Error without raising it, which shows
        # nothing. gr.Warning displays the message without failing the upload.
        gr.Warning(failure_message)
|
|
|
|
|
|
|
def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token: gr.OAuthToken, url, destination_repo, civitai_api_key=None):
    """Gradio handler: copy CivitAI files into ``<user>/<destination_repo>``.

    Validates the login and inputs, creates the target repo (or reuses an
    existing one) and makes it public, copies the files via ``process_url``,
    registers the repo with ``add_mirror`` and returns a Markdown summary.
    The local scratch folder is always removed afterwards.

    Args:
        profile: OAuth profile of the logged-in user, ``None`` when logged out.
        oauth_token: OAuth token of the logged-in user.
        url: CivitAI model URL or user profile URL.
        destination_repo: Repo name (letters, digits, ``_`` and ``-`` only).
        civitai_api_key: Optional CivitAI API key supplied by the user.

    Returns:
        Markdown string linking to the repository.

    Raises:
        gr.Error: on invalid input, when not logged in, or when any step of
            the upload fails.
    """
    import shutil

    # profile/oauth_token are None when nobody is logged in; check before
    # touching attributes so the user gets the hint instead of AttributeError.
    if profile is None or oauth_token is None or not profile.name:
        raise gr.Error("Are you sure you are logged in?")

    if not destination_repo:
        raise gr.Error("Please provide a destination repository name")

    # fullmatch: unlike match() with "$", this also rejects a trailing newline.
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', destination_repo):
        raise gr.Error("Destination repository name must contain only alphanumeric characters, underscores, and hyphens")

    # Reject unusable URLs before any repository is created or made public.
    url = (url or "").strip()
    if not url.startswith(("https://civitai.com/models/", "https://civitai.com/user/")):
        raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL")

    folder = str(uuid.uuid4())
    os.makedirs(folder, exist_ok=False)
    gr.Info(f"Starting download from {url}")

    try:
        user_repo_id = f"{profile.username}/{destination_repo}"

        try:
            create_repo(repo_id=user_repo_id, private=True, exist_ok=False, token=oauth_token.token)
            gr.Info(f"Created new repository {user_repo_id}")
        except Exception:
            # Any creation failure is treated as "already exists"; a real
            # problem (e.g. bad token) will surface in the next Hub call.
            gr.Info(f"Repository {user_repo_id} already exists, will update it")
        # NOTE(review): assumed to run for new and existing repos alike, since
        # the mirror registered below needs a public repo; confirm.
        update_repo_visibility(repo_id=user_repo_id, private=False, token=oauth_token.token)

        files = process_url(url, profile, user_repo_id, oauth_token.token, folder, civitai_api_key)
        if not files:
            raise gr.Error("No files were copied. Something went wrong.")

        gr.Info(f"Copied {len(files)} files")

        add_mirror(user_repo_id)

        return "# Models uploaded to 🤗!\n" + f"## [{user_repo_id}](https://huggingface.co/{user_repo_id})"

    except Exception as e:
        print(e)
        raise gr.Error(f"Error during upload process: {str(e)}") from e
    finally:
        # Best-effort cleanup; must not mask the real outcome of the upload.
        shutil.rmtree(folder, ignore_errors=True)
|
|
|
# Custom CSS: full-width login button and a greyed-out, non-clickable style
# for elements carrying the "disabled_upload" id.
css = '''
#login {
    width: 100% !important;
    margin: 0 auto;
}
#disabled_upload{
    opacity: 0.5;
    pointer-events:none;
}
'''

# UI definition: login button, three inputs, a submit button and a Markdown
# area that receives the string returned by upload_civit_to_hf.
with gr.Blocks(css=css) as demo:
    gr.Markdown('''# Upload CivitAI models to HuggingFace

You can upload either:
- A single model by providing a CivitAI model URL (e.g., https://civitai.com/models/144684)
- All models from a user by providing their profile URL (e.g., https://civitai.com/user/username)

This will create a new HuggingFace repository under your username if it doesn't exist.
Once uploaded, it will add this repository to CivitaiArchive.com as a mirror.
''')

    gr.LoginButton(elem_id="login")

    with gr.Column():
        submit_source_civit = gr.Textbox(
            placeholder="https://civitai.com/models/144684 or https://civitai.com/user/username",
            label="CivitAI URL",
            info="Enter either a model URL or user profile URL",
        )
        destination_repo = gr.Textbox(
            placeholder="my-awesome-model",
            label="HF Repo Name",
            info="Name for the HuggingFace repository (a new one will be created if it doesn't exist)",
        )
        civitai_api_key = gr.Textbox(
            placeholder="Your CivitAI API key (optional)",
            label="CivitAI API Key",
            info="Optional: Provide your own CivitAI API key to avoid rate limits. If not provided, a default key will be used.",
            # The key is a secret: mask it instead of echoing it in clear text.
            type="password",
        )

        instructions = gr.HTML("")
        submit_button_civit = gr.Button("Upload to Hugging Face", interactive=True)
        output = gr.Markdown(label="Upload Progress")

    # Only the three textboxes are wired as inputs; the handler's OAuth
    # profile/token parameters are not listed here.
    submit_button_civit.click(
        fn=upload_civit_to_hf,
        inputs=[submit_source_civit, destination_repo, civitai_api_key],
        outputs=[output],
    )
|
|
|
# Background job: call restart_space once every hour for as long as the
# process is running.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, trigger='interval', hours=1)
scheduler.start()

# Enable the request queue (default concurrency limit of 50 per event),
# then start serving the app.
demo.queue(default_concurrency_limit=50).launch()