Spaces:
Runtime error
Runtime error
import requests | |
import os | |
import gradio as gr | |
from huggingface_hub import update_repo_visibility, whoami, upload_folder, create_repo, upload_file, update_repo_visibility | |
import gradio as gr | |
import re | |
import uuid | |
from typing import Optional | |
import json | |
def download_file(url, file_path, folder): | |
headers = {} | |
gr.Info("Downloading file from "+ url + " to "+ file_path) | |
try: | |
response = requests.get(url, headers=headers) | |
response.raise_for_status() | |
except requests.exceptions.HTTPError as e: | |
print(e) | |
if response.status_code == 401: | |
headers['Authorization'] = f'Bearer {os.environ["CIVITAI_API_KEY"]}' | |
try: | |
response = requests.get(url, headers=headers) | |
response.raise_for_status() | |
except requests.exceptions.RequestException as e: | |
raise gr.Error(f"Error downloading file: {e}") | |
else: | |
raise gr.Error(f"Error downloading file: {e}") | |
except requests.exceptions.RequestException as e: | |
raise gr.Error(f"Error downloading file: {e}") | |
os.makedirs(os.path.join(folder, os.path.dirname(file_path)), exist_ok=True) | |
with open(os.path.join(folder, file_path), 'wb') as f: | |
f.write(response.content) | |
gr.Info("Downloaded "+ file_path) | |
def get_files_by_username(username): | |
url = f"https://civitai.com/api/v1/models?username={username}&limit=100" | |
output = {} | |
while url: | |
response = requests.get(url) | |
data = response.json() | |
# Add current page items to the list | |
for model in data['items']: | |
for version in model['modelVersions']: | |
for file in version['files']: | |
output[str(model['id']) + '/' + str(version['id']) + '/' + file['name']] = file['downloadUrl'] | |
return output | |
def get_files_by_model_id(model_id): | |
api_url = f"https://civitai.com/api/v1/models/{model_id}" | |
try: | |
response = requests.get(api_url) | |
response.raise_for_status() | |
model = response.json() | |
output = {} | |
for version in model['modelVersions']: | |
for file in version['files']: | |
output[str(model['id']) + '/' + str(version['id']) + '/' + file['name']] = file['downloadUrl'] | |
return output | |
except requests.exceptions.RequestException as e: | |
raise gr.Error("Something went wrong in fetching CivitAI API") | |
def process_url(url, profile, do_download=True, folder="."): | |
if url.startswith("https://civitai.com/models/"): | |
model_id = url.split('/')[4] | |
files = get_files_by_model_id(model_id) | |
elif url.startswith("https://civitai.com/user/"): | |
username = url.split('/')[4] | |
files = get_files_by_username(username) | |
else: | |
raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL") | |
if do_download: | |
downloaded_files = {} | |
for dl_path, download_url in files.items(): | |
try: | |
download_file(download_url, dl_path, folder) | |
downloaded_files[dl_path] = download_url | |
except Exception as e: | |
gr.Warning(f"Failed to download {dl_path}: {str(e)}") | |
return downloaded_files | |
return files | |
def add_mirror(repo_id): | |
response = requests.post("https://civitaiarchive.com/api/mirrors", | |
headers={ | |
"Authorization": f"Bearer {os.environ['CIVITAIARCHIVE_API_KEY']}", | |
"Content-Type": "application/json" | |
}, | |
json={ | |
"type": "huggingface", | |
"url": repo_id | |
}) | |
if response.status_code == 200: | |
gr.Info("Added mirror to CivitaiArchive.com") | |
else: | |
gr.Error("Failed to add mirror to CivitaiArchive.com") | |
def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token: gr.OAuthToken, url, destination_repo): | |
if not profile.name: | |
return gr.Error("Are you sure you are logged in?") | |
if not destination_repo: | |
return gr.Error("Please provide a destination repository name") | |
# validate destination repo is alphanumeric | |
if not re.match(r'^[a-zA-Z0-9_-]+$', destination_repo): | |
return gr.Error("Destination repository name must contain only alphanumeric characters, underscores, and hyphens") | |
folder = str(uuid.uuid4()) | |
os.makedirs(folder, exist_ok=False) | |
gr.Info(f"Starting download from {url}") | |
try: | |
files = process_url(url, profile, folder=folder) | |
if not files or len(files.keys()) == 0: | |
return gr.Error("No files were downloaded. Please check the URL and try again.") | |
results = [] | |
user_repo_id = f"{profile.username}/{destination_repo}" | |
# Try to create repo only if it doesn't exist | |
try: | |
create_repo(repo_id=user_repo_id, private=True, exist_ok=False, token=oauth_token.token) | |
gr.Info(f"Created new repository {user_repo_id}") | |
except Exception as e: | |
gr.Info(f"Repository {user_repo_id} already exists, will update it:"+ str(e)) | |
gr.Info(f"Uploading models to {user_repo_id}...") | |
# Upload the model and card | |
upload_folder( | |
folder_path=folder, | |
repo_id=user_repo_id, | |
repo_type="model", | |
token=oauth_token.token | |
) | |
update_repo_visibility(repo_id=user_repo_id, private=False, token=oauth_token.token) | |
results.append(f"## [{user_repo_id}](https://huggingface.co/{user_repo_id})") | |
if not results: | |
return gr.Error("Failed to upload any models. Please check the logs for details.") | |
add_mirror(user_repo_id) | |
return "# Models uploaded to 🤗!\n" + "\n".join(results) | |
except Exception as e: | |
print(e) | |
raise gr.Error(f"Error during upload process: {str(e)}") | |
finally: | |
# Cleanup | |
if os.path.exists(folder): | |
import shutil | |
shutil.rmtree(folder) | |
css = ''' | |
#login { | |
width: 100% !important; | |
margin: 0 auto; | |
} | |
#disabled_upload{ | |
opacity: 0.5; | |
pointer-events:none; | |
} | |
''' | |
with gr.Blocks(css=css) as demo: | |
gr.Markdown('''# Upload CivitAI models to HuggingFace | |
You can upload either: | |
- A single model by providing a CivitAI model URL (e.g., https://civitai.com/models/144684) | |
- All models from a user by providing their profile URL (e.g., https://civitai.com/user/username) | |
This will create a new HuggingFace repository under your username if it doesn't exist. | |
Once uploaded, it will add this repository to CivitaiArchive.com as a mirror. | |
''') | |
gr.LoginButton(elem_id="login") | |
with gr.Column() : | |
submit_source_civit = gr.Textbox( | |
placeholder="https://civitai.com/models/144684 or https://civitai.com/user/username", | |
label="CivitAI URL", | |
info="Enter either a model URL or user profile URL", | |
) | |
destination_repo = gr.Textbox( | |
placeholder="my-awesome-model", | |
label="HF Repo Name", | |
info="Name for the HuggingFace repository (a new one will be created if it doesn't exist)", | |
) | |
instructions = gr.HTML("") | |
submit_button_civit = gr.Button("Upload to Hugging Face", interactive=True) | |
output = gr.Markdown(label="Upload Progress") | |
submit_button_civit.click( | |
fn=upload_civit_to_hf, | |
inputs=[submit_source_civit, destination_repo], | |
outputs=[output] | |
) | |
demo.queue(default_concurrency_limit=50) | |
demo.launch() |