import requests import os import gradio as gr from huggingface_hub import update_repo_visibility, whoami, upload_folder, create_repo, upload_file, update_repo_visibility import gradio as gr import re import uuid from typing import Optional import json def download_file(url, file_path, folder): headers = {} gr.Info("Downloading file from "+ url + " to "+ file_path) try: response = requests.get(url, headers=headers) response.raise_for_status() except requests.exceptions.HTTPError as e: print(e) if response.status_code == 401: headers['Authorization'] = f'Bearer {os.environ["CIVITAI_API_KEY"]}' try: response = requests.get(url, headers=headers) response.raise_for_status() except requests.exceptions.RequestException as e: raise gr.Error(f"Error downloading file: {e}") else: raise gr.Error(f"Error downloading file: {e}") except requests.exceptions.RequestException as e: raise gr.Error(f"Error downloading file: {e}") os.makedirs(os.path.join(folder, os.path.dirname(file_path)), exist_ok=True) with open(os.path.join(folder, file_path), 'wb') as f: f.write(response.content) gr.Info("Downloaded "+ file_path) def get_files_by_username(username): url = f"https://civitai.com/api/v1/models?username={username}&limit=100" output = {} while url: response = requests.get(url) data = response.json() # Add current page items to the list for model in data['items']: for version in model['modelVersions']: for file in version['files']: output[str(model['id']) + '/' + str(version['id']) + '/' + file['name']] = file['downloadUrl'] return output def get_files_by_model_id(model_id): api_url = f"https://civitai.com/api/v1/models/{model_id}" try: response = requests.get(api_url) response.raise_for_status() model = response.json() output = {} for version in model['modelVersions']: for file in version['files']: output[str(model['id']) + '/' + str(version['id']) + '/' + file['name']] = file['downloadUrl'] return output except requests.exceptions.RequestException as e: raise gr.Error("Something went wrong in fetching CivitAI API") def process_url(url, profile, do_download=True, folder="."): if url.startswith("https://civitai.com/models/"): model_id = url.split('/')[4] files = get_files_by_model_id(model_id) elif url.startswith("https://civitai.com/user/"): username = url.split('/')[4] files = get_files_by_username(username) else: raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL") if do_download: downloaded_files = {} for dl_path, download_url in files.items(): try: download_file(download_url, dl_path, folder) downloaded_files[dl_path] = download_url except Exception as e: gr.Warning(f"Failed to download {dl_path}: {str(e)}") return downloaded_files return files def add_mirror(repo_id): response = requests.post("https://civitaiarchive.com/api/mirrors", headers={ "Authorization": f"Bearer {os.environ['CIVITAIARCHIVE_API_KEY']}", "Content-Type": "application/json" }, json={ "type": "huggingface", "url": repo_id }) if response.status_code == 200: gr.Info("Added mirror to CivitaiArchive.com") else: gr.Error("Failed to add mirror to CivitaiArchive.com") def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token: gr.OAuthToken, url, destination_repo): if not profile.name: return gr.Error("Are you sure you are logged in?") if not destination_repo: return gr.Error("Please provide a destination repository name") # validate destination repo is alphanumeric if not re.match(r'^[a-zA-Z0-9_-]+$', destination_repo): return gr.Error("Destination repository name must contain only alphanumeric characters, underscores, and hyphens") folder = str(uuid.uuid4()) os.makedirs(folder, exist_ok=False) gr.Info(f"Starting download from {url}") try: files = process_url(url, profile, folder=folder) if not files or len(files.keys()) == 0: return gr.Error("No files were downloaded. Please check the URL and try again.") results = [] user_repo_id = f"{profile.username}/{destination_repo}" # Try to create repo only if it doesn't exist try: create_repo(repo_id=user_repo_id, private=True, exist_ok=False, token=oauth_token.token) gr.Info(f"Created new repository {user_repo_id}") except Exception as e: gr.Info(f"Repository {user_repo_id} already exists, will update it:"+ str(e)) gr.Info(f"Uploading models to {user_repo_id}...") # Upload the model and card upload_folder( folder_path=folder, repo_id=user_repo_id, repo_type="model", token=oauth_token.token ) update_repo_visibility(repo_id=user_repo_id, private=False, token=oauth_token.token) results.append(f"## [{user_repo_id}](https://huggingface.co/{user_repo_id})") if not results: return gr.Error("Failed to upload any models. Please check the logs for details.") add_mirror(user_repo_id) return "# Models uploaded to 🤗!\n" + "\n".join(results) except Exception as e: print(e) raise gr.Error(f"Error during upload process: {str(e)}") finally: # Cleanup if os.path.exists(folder): import shutil shutil.rmtree(folder) css = ''' #login { width: 100% !important; margin: 0 auto; } #disabled_upload{ opacity: 0.5; pointer-events:none; } ''' with gr.Blocks(css=css) as demo: gr.Markdown('''# Upload CivitAI models to HuggingFace You can upload either: - A single model by providing a CivitAI model URL (e.g., https://civitai.com/models/144684) - All models from a user by providing their profile URL (e.g., https://civitai.com/user/username) This will create a new HuggingFace repository under your username if it doesn't exist. Once uploaded, it will add this repository to CivitaiArchive.com as a mirror. ''') gr.LoginButton(elem_id="login") with gr.Column() : submit_source_civit = gr.Textbox( placeholder="https://civitai.com/models/144684 or https://civitai.com/user/username", label="CivitAI URL", info="Enter either a model URL or user profile URL", ) destination_repo = gr.Textbox( placeholder="my-awesome-model", label="HF Repo Name", info="Name for the HuggingFace repository (a new one will be created if it doesn't exist)", ) instructions = gr.HTML("") submit_button_civit = gr.Button("Upload to Hugging Face", interactive=True) output = gr.Markdown(label="Upload Progress") submit_button_civit.click( fn=upload_civit_to_hf, inputs=[submit_source_civit, destination_repo], outputs=[output] ) demo.queue(default_concurrency_limit=50) demo.launch()