Spaces:
Running
Running
File size: 39,712 Bytes
2aeb649 6e50c0f 2aeb649 9071ed9 2aeb649 6e50c0f 2a43fc6 6e50c0f 65351e7 6e50c0f 65351e7 6e50c0f 9669215 6e50c0f 2aeb649 6e50c0f 2aeb649 6e50c0f 2aeb649 6e50c0f 2aeb649 6e50c0f bf3802a 6e50c0f 2aeb649 6e50c0f e460389 6e50c0f e460389 c5b9a3f 6e50c0f 2aeb649 6e50c0f 2aeb649 6e50c0f a2a0ec8 6e50c0f 2aeb649 6e50c0f 2aeb649 6e50c0f a2a0ec8 6e50c0f 2aeb649 e48859b 2aeb649 6e50c0f 2aeb649 6e50c0f 2aeb649 6e50c0f 2aeb649 6e50c0f 2aeb649 6e50c0f 6389ef8 2aeb649 6e50c0f 9071ed9 6e50c0f 1d06c07 6e50c0f 1d06c07 6e50c0f 53b0d0a 4f448b7 6e50c0f 30a8f33 4f448b7 6e50c0f 4f448b7 6e50c0f b9ca2c3 6e50c0f 0b1a6cb c52b2b1 6e50c0f 3f71e88 6531480 4d371f5 6e50c0f 3f71e88 4f448b7 2aeb649 f5a2481 2aeb649 4f448b7 2aeb649 f5a2481 1d06c07 2aeb649 8d92190 53b0d0a 1d06c07 6e50c0f 1d06c07 f9f9336 6e50c0f 1d06c07 6e50c0f b9ca2c3 1d06c07 6e50c0f 2a43fc6 6e50c0f 2a43fc6 6e50c0f 2a43fc6 6e50c0f 2a43fc6 6e50c0f 2a43fc6 6e50c0f 2a43fc6 6e50c0f 2a43fc6 6e50c0f 2a43fc6 6e50c0f c50ed1f 6e50c0f b189c01 6e50c0f 78994f5 6e50c0f 78994f5 0227cff 6e50c0f 0227cff 6e50c0f 0817061 6e50c0f 0817061 7950bc5 6e50c0f d6bfdd6 6e50c0f 9329734 6e50c0f 49bde08 6e50c0f 0817061 6e50c0f 7256938 6e50c0f 2aeb649 6e50c0f 2aeb649 6e50c0f da93b9f cfe8a79 e6d2c96 6e50c0f 2aeb649 6e50c0f 4d344de 6e50c0f 0227cff 6e50c0f 0227cff 6e50c0f 0817061 6e50c0f 7468c99 6e50c0f 7468c99 6e50c0f da93b9f 6e50c0f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 |
import requests
import os
import gradio as gr
from huggingface_hub import update_repo_visibility, upload_folder, create_repo, upload_file
from slugify import slugify
import re
import uuid
from typing import Optional, Dict, Any, List
import json
import shutil # For cleaning up local folders
import traceback # For debugging
TRUSTED_UPLOADERS = [
"KappaNeuro", "CiroN2022", "multimodalart", "Norod78", "joachimsallstrom",
"blink7630", "e-n-v-y", "DoctorDiffusion", "RalFinger", "artificialguybr"
]
# --- Helper Functions (CivitAI API, Data Extraction, File Handling) ---
def get_json_data(url: str) -> Optional[Dict[str, Any]]:
url_split = url.split('/')
if len(url_split) < 5 or not url_split[4].isdigit(): # Check if model ID is present and numeric
print(f"Error: Invalid CivitAI URL format or missing model ID: {url}")
# Try to extract model ID if it's just the ID
if url.isdigit():
model_id = url
else:
return None
else:
model_id = url_split[4]
api_url = f"https://civitai.com/api/v1/models/{model_id}"
try:
response = requests.get(api_url, timeout=15)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error fetching JSON data from {api_url}: {e}")
return None
def check_nsfw(json_data: Dict[str, Any], profile: Optional[gr.OAuthProfile]) -> bool:
if json_data.get("nsfw", False):
print(f"Model {json_data.get('id', 'Unknown')} flagged as NSFW at model level.")
return False
if profile and profile.username in TRUSTED_UPLOADERS:
print(f"Trusted uploader {profile.username}, bypassing strict image NSFW check for model {json_data.get('id', 'Unknown')}.")
return True
for model_version in json_data.get("modelVersions", []):
for image_media in model_version.get("images", []): # 'images' can contain videos
if image_media.get("nsfwLevel", 0) > 5: # Allow 0-5 (None, Soft, Moderate, Mature, X)
print(f"Model {json_data.get('id', 'Unknown')} version {model_version.get('id')} has media with nsfwLevel > 5.")
return False
return True
def get_prompts_from_image(image_id: int) -> (str, str):
url = f'https://civitai.com/api/trpc/image.getGenerationData?input={{"json":{{"id":{image_id}}}}}'
prompt = ""
negative_prompt = ""
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
result = data.get('result', {}).get('data', {}).get('json', {})
if result and result.get('meta') is not None:
prompt = result['meta'].get('prompt', "")
negative_prompt = result['meta'].get('negativePrompt', "")
# else:
# print(f"Prompt fetch for {image_id}: Status {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Error fetching prompt data for image_id {image_id}: {e}")
return prompt, negative_prompt
def extract_info(json_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
if json_data.get("type") != "LORA":
return None
model_mapping = {
"SDXL 1.0": "stabilityai/stable-diffusion-xl-base-1.0", "SDXL 0.9": "stabilityai/stable-diffusion-xl-base-1.0",
"SD 1.5": "runwayml/stable-diffusion-v1-5", "SD 1.4": "CompVis/stable-diffusion-v1-4",
"SD 2.1": "stabilityai/stable-diffusion-2-1-base", "SD 2.0": "stabilityai/stable-diffusion-2-base",
"SD 2.1 768": "stabilityai/stable-diffusion-2-1", "SD 2.0 768": "stabilityai/stable-diffusion-2",
"SD 3": "stabilityai/stable-diffusion-3-medium-diffusers",
"SD 3.5": "stabilityai/stable-diffusion-3-medium",
"SD 3.5 Large": "stabilityai/stable-diffusion-3-medium", # Adjusted to medium as large might not be public LoRA base
"SD 3.5 Medium": "stabilityai/stable-diffusion-3-medium",
"SD 3.5 Large Turbo": "stabilityai/stable-diffusion-3-medium-turbo", # Placeholder
"Flux.1 D": "black-forest-labs/FLUX.1-dev", "Flux.1 S": "black-forest-labs/FLUX.1-schnell",
"LTXV": "Lightricks/LTX-Video-0.9.7-dev",
"Hunyuan Video": "hunyuanvideo-community/HunyuanVideo", # Default T2V
"Wan Video 1.3B t2v": "Wan-AI/Wan2.1-T2V-1.3B-Diffusers",
"Wan Video 14B t2v": "Wan-AI/Wan2.1-T2V-14B-Diffusers",
"Wan Video 14B i2v 480p": "Wan-AI/Wan2.1-I2V-14B-480P-Diffusers",
"Wan Video 14B i2v 720p": "Wan-AI/Wan2.1-I2V-14B-720P-Diffusers",
}
for model_version in json_data.get("modelVersions", []):
civic_base_model_name = model_version.get("baseModel")
if civic_base_model_name in model_mapping:
base_model_hf_name = model_mapping[civic_base_model_name]
urls_to_download: List[Dict[str, Any]] = []
primary_file_found = False
for file_data in model_version.get("files", []):
if file_data.get("primary") and file_data.get("type") == "Model":
urls_to_download.append({
"url": file_data["downloadUrl"],
"filename": os.path.basename(file_data["name"]),
"type": "weightName", "is_video": False
})
primary_file_found = True
break
if not primary_file_found: continue
for media_data in model_version.get("images", []): # CivitAI uses 'images' for both images and videos
if media_data.get("nsfwLevel", 0) > 5: continue
media_url_parts = media_data["url"].split("/")
if not media_url_parts: continue
filename_part = media_url_parts[-1]
# Robustly extract ID: try to get it before the first dot or before query params
id_candidate = filename_part.split(".")[0].split("?")[0]
prompt, negative_prompt = "", ""
if media_data.get("hasMeta", False) and media_data.get("type") == "image": # Prompts mainly for images
if id_candidate.isdigit():
try:
prompt, negative_prompt = get_prompts_from_image(int(id_candidate))
except ValueError:
print(f"Warning: Non-integer ID '{id_candidate}' for prompt fetching.")
except Exception as e:
print(f"Warning: Prompt fetch failed for ID {id_candidate}: {e}")
is_video_file = media_data.get("type") == "video"
media_type_key = "videoName" if is_video_file else "imageName"
urls_to_download.append({
"url": media_data["url"], "filename": os.path.basename(filename_part),
"type": media_type_key, "prompt": prompt, "negative_prompt": negative_prompt,
"is_video": is_video_file
})
# Ensure 'allowCommercialUse' is processed correctly
allow_commercial_use = json_data.get("allowCommercialUse", "Sell") # Default
if isinstance(allow_commercial_use, list):
allow_commercial_use = allow_commercial_use[0] if allow_commercial_use else "Sell"
elif not isinstance(allow_commercial_use, str): # If boolean or other, convert to expected string
allow_commercial_use = "Sell" if allow_commercial_use else "None"
info_dict = {
"urls_to_download": urls_to_download, "id": model_version.get("id"),
"baseModel": base_model_hf_name, "modelId": model_version.get("modelId", json_data.get("id")),
"name": json_data.get("name", "Untitled LoRA"),
"description": json_data.get("description", "No description provided."),
"trainedWords": model_version.get("trainedWords", []),
"creator": json_data.get("creator", {}).get("username", "Unknown Creator"),
"tags": json_data.get("tags", []),
"allowNoCredit": json_data.get("allowNoCredit", True),
"allowCommercialUse": allow_commercial_use,
"allowDerivatives": json_data.get("allowDerivatives", True),
"allowDifferentLicense": json_data.get("allowDifferentLicense", True)
}
return info_dict
return None
def download_file_from_url(url: str, filename: str, folder: str = "."):
headers = {}
local_filepath = os.path.join(folder, filename)
try:
# Add a User-Agent to mimic a browser, as some CDNs might block default requests User-Agent
headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
if "CIVITAI_API_TOKEN" in os.environ and os.environ["CIVITAI_API_TOKEN"]: # Check for token existence and value
headers['Authorization'] = f'Bearer {os.environ["CIVITAI_API_TOKEN"]}'
response = requests.get(url, headers=headers, stream=True, timeout=120) # Increased timeout
response.raise_for_status()
with open(local_filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# print(f"Successfully downloaded {filename} to {folder}")
except requests.exceptions.HTTPError as e_http:
# If 401/403 and no token was used, it's a clear auth issue.
# If token was used and still 401/403, token might be invalid or insufficient.
if e_http.response.status_code in [401, 403] and not headers.get('Authorization'):
print(f"Authorization error downloading {url}. Consider setting CIVITAI_API_TOKEN for restricted files.")
raise gr.Error(f"HTTP Error downloading {filename}: {e_http.response.status_code} {e_http.response.reason}. URL: {url}")
except requests.exceptions.RequestException as e_req:
raise gr.Error(f"Request Error downloading {filename}: {e_req}. URL: {url}")
def download_files(info: Dict[str, Any], folder: str = ".") -> Dict[str, List[Any]]:
downloaded_media_items: List[Dict[str, Any]] = []
downloaded_weights: List[str] = []
for item in info["urls_to_download"]:
filename_to_save = item["filename"]
# Sanitize filename (though os.path.basename usually handles paths well)
filename_to_save = re.sub(r'[<>:"/\\|?*]', '_', filename_to_save) # Basic sanitization
if not filename_to_save: # Handle case where filename becomes empty
filename_to_save = f"downloaded_file_{uuid.uuid4().hex[:8]}" + os.path.splitext(item["url"])[1]
gr.Info(f"Downloading {filename_to_save}...")
download_file_from_url(item["url"], filename_to_save, folder)
if item["type"] == "weightName":
downloaded_weights.append(filename_to_save)
elif item["type"] in ["imageName", "videoName"]:
prompt_clean = re.sub(r'<.*?>', '', item.get("prompt", ""))
negative_prompt_clean = re.sub(r'<.*?>', '', item.get("negative_prompt", ""))
downloaded_media_items.append({
"filename": filename_to_save, "prompt": prompt_clean,
"negative_prompt": negative_prompt_clean, "is_video": item.get("is_video", False)
})
return {"media_items": downloaded_media_items, "weightName": downloaded_weights}
def process_url(url: str, profile: Optional[gr.OAuthProfile], do_download: bool = True, folder: str = ".") -> (Optional[Dict[str, Any]], Optional[Dict[str, List[Any]]]):
json_data = get_json_data(url)
if json_data:
if check_nsfw(json_data, profile):
info = extract_info(json_data)
if info:
downloaded_files_dict = None
if do_download:
downloaded_files_dict = download_files(info, folder)
return info, downloaded_files_dict
else:
model_type = json_data.get("type", "Unknown type")
base_models_in_json = [mv.get("baseModel", "Unknown base") for mv in json_data.get("modelVersions", [])]
error_message = f"This LoRA is not supported. Details:\n"
error_message += f"- Model Type: {model_type} (expected LORA)\n"
if base_models_in_json:
error_message += f"- Detected Base Models in CivitAI: {', '.join(list(set(base_models_in_json)))}\n"
error_message += "Ensure it's a LORA for a supported base (SD, SDXL, Pony, Flux, LTXV, Hunyuan, Wan) and has primary files."
raise gr.Error(error_message)
else:
raise gr.Error("This model is flagged as NSFW by CivitAI or its media exceeds the allowed NSFW level (max level 5).")
else:
raise gr.Error("Could not fetch CivitAI API data. Check URL or model ID. Example: https://civitai.com/models/12345 or just 12345")
# --- README Creation ---
def create_readme(info: Dict[str, Any], downloaded_files: Dict[str, List[Any]], user_repo_id: str, link_civit: bool = False, is_author: bool = True, folder: str = "."):
original_url = f"https://civitai.com/models/{info['modelId']}"
link_civit_disclaimer = f'([CivitAI]({original_url}))'
non_author_disclaimer = f'This model was originally uploaded on [CivitAI]({original_url}), by [{info["creator"]}](https://civitai.com/user/{info["creator"]}/models). The information below was provided by the author on CivitAI:'
is_video_model = False
video_base_models_hf = [
"Lightricks/LTX-Video-0.9.7-dev", "hunyuanvideo-community/HunyuanVideo",
"hunyuanvideo-community/HunyuanVideo-I2V", "Wan-AI/Wan2.1-T2V-1.3B-Diffusers",
"Wan-AI/Wan2.1-T2V-14B-Diffusers", "Wan-AI/Wan2.1-I2V-14B-480P-Diffusers",
"Wan-AI/Wan2.1-I2V-14B-720P-Diffusers"
]
if info["baseModel"] in video_base_models_hf: is_video_model = True
is_i2v_model = "i2v" in info["baseModel"].lower()
default_tags = ["lora", "diffusers", "migrated"]
if is_video_model:
default_tags.append("video")
default_tags.append("image-to-video" if is_i2v_model else "text-to-video")
default_tags.append("template:video-lora") # Added a template tag for video
else:
default_tags.extend(["text-to-image", "stable-diffusion", "template:sd-lora"])
civit_tags = [t.replace(":", "").strip() for t in info.get("tags", []) if t.replace(":", "").strip() and t.replace(":", "").strip() not in default_tags]
tags = default_tags + civit_tags
unpacked_tags = "\n- ".join(sorted(list(set(tags))))
trained_words = [word for word in info.get('trainedWords', []) if word]
formatted_words = ', '.join(f'`{word}`' for word in trained_words)
trigger_words_section = f"## Trigger words\nYou should use {formatted_words} to trigger the generation." if formatted_words else ""
widget_content = ""
media_items_for_widget = downloaded_files.get("media_items", [])
if not media_items_for_widget:
widget_content = "# No example media available for widget.\n"
else:
for media_item in media_items_for_widget[:5]: # Limit to 5 examples for widget
prompt = media_item["prompt"]
negative_prompt = media_item["negative_prompt"]
filename = media_item["filename"]
escaped_prompt = prompt.replace("'", "''").replace("\n", " ") # Escape and remove newlines
negative_prompt_content = f"""parameters:
negative_prompt: '{negative_prompt.replace("'", "''").replace("\n", " ")}'""" if negative_prompt else ""
widget_content += f"""- text: '{escaped_prompt if escaped_prompt else ' ' }'
{negative_prompt_content}
output:
url: >-
{filename}
"""
flux_models_bf16 = ["black-forest-labs/FLUX.1-dev", "black-forest-labs/FLUX.1-schnell"]
dtype = "torch.bfloat16" if info["baseModel"] in flux_models_bf16 else "torch.float16"
pipeline_import = "AutoPipelineForText2Image"
pipeline_call_example = f"image = pipeline('{formatted_words if formatted_words else 'Your custom prompt'}').images[0]"
example_prompt_for_pipeline = formatted_words if formatted_words else 'Your custom prompt'
if media_items_for_widget and media_items_for_widget[0]["prompt"]:
example_prompt_for_pipeline = media_items_for_widget[0]["prompt"]
pipeline_call_example = f"image = pipeline('{example_prompt_for_pipeline.replace ciclo '','' ')').images[0]"
if is_video_model:
pipeline_import = "DiffusionPipeline"
video_prompt_example = example_prompt_for_pipeline
pipeline_call_example = f"# Example prompt for video generation\nprompt = \"{video_prompt_example.replace ciclico '','' ')}\"\n"
pipeline_call_example += "# Adjust parameters like num_frames, num_inference_steps, height, width as needed for the specific pipeline.\n"
pipeline_call_example += "# video_frames = pipeline(prompt, num_frames=16, guidance_scale=7.5, num_inference_steps=25).frames # Example parameters"
if "LTX-Video" in info["baseModel"]:
pipeline_call_example += "\n# LTX-Video uses a specific setup. Check its model card on Hugging Face."
elif "HunyuanVideo" in info["baseModel"]:
pipeline_call_example += "\n# HunyuanVideo often uses custom pipeline scripts or specific classes (e.g., HunyuanDiTPipeline). Check its HF model card."
elif "Wan-AI" in info["baseModel"]:
pipeline_call_example += "\n# Wan-AI models (e.g., WanVideoTextToVideoPipeline) require specific pipeline classes. Check model card for usage."
weight_name = (downloaded_files["weightName"][0] if downloaded_files.get("weightName")
else "your_lora_weights.safetensors")
diffusers_code_block = f"""```py
from diffusers import {pipeline_import}
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
# Note: The pipeline class '{pipeline_import}' is a general suggestion.
# For specific video models (LTX, Hunyuan, Wan), you will likely need a dedicated pipeline class
# (e.g., TextToVideoSDPipeline, HunyuanDiTPipeline, WanVideoTextToVideoPipeline, etc.).
# Please refer to the documentation of the base model '{info["baseModel"]}' on Hugging Face for precise usage.
pipeline = {pipeline_import}.from_pretrained('{info["baseModel"]}', torch_dtype={dtype})
pipeline.to(device)
# Load LoRA weights
pipeline.load_lora_weights('{user_repo_id}', weight_name='{weight_name}')
# For some pipelines, you might need to fuse LoRA layers:
# pipeline.fuse_lora() # or pipeline.unfuse_lora()
# Example generation call (adjust parameters as needed for the specific pipeline)
{pipeline_call_example}
```"""
commercial_use_val = info["allowCommercialUse"] # Already processed in extract_info
content = f"""---
license: other
license_name: bespoke-lora-trained-license
license_link: https://multimodal.art/civitai-licenses?allowNoCredit={info["allowNoCredit"]}&allowCommercialUse={commercial_use_val}&allowDerivatives={info["allowDerivatives"]}&allowDifferentLicense={info["allowDifferentLicense"]}
tags:
- {unpacked_tags}
base_model: {info["baseModel"]}
instance_prompt: {trained_words[0] if trained_words else ''}
widget:
{widget_content}
---
# {info["name"]}
<Gallery />
{non_author_disclaimer if not is_author else ''}
{link_civit_disclaimer if link_civit else ''}
## Model description
{info["description"]}
{trigger_words_section}
## Download model
Weights for this model are available in Safetensors format.
[Download](/{user_repo_id}/tree/main/{weight_name}) the LoRA in the Files & versions tab.
## Use it with the [🧨 diffusers library](https://github.com/huggingface/diffusers)
{diffusers_code_block}
For more details, including weighting, merging and fusing LoRAs, check the [documentation on loading LoRAs in diffusers](https://huggingface.co/docs/diffusers/main/en/using-diffusers/loading_adapters).
"""
readme_path = os.path.join(folder, "README.md")
with open(readme_path, "w", encoding="utf-8") as file:
file.write(content)
# print(f"README.md created at {readme_path}")
# --- Hugging Face Profile / Authorship ---
def get_creator(username: str) -> Dict:
if "COOKIE_INFO" not in os.environ or not os.environ["COOKIE_INFO"]:
print("Warning: COOKIE_INFO env var not set. Cannot fetch CivitAI creator's HF username.")
return {"result": {"data": {"json": {"links": []}}}}
url = f"https://civitai.com/api/trpc/user.getCreator?input=%7B%22json%22%3A%7B%22username%22%3A%22{username}%22%2C%22authed%22%3Atrue%7D%7D"
headers = {
"authority": "civitai.com", "accept": "*/*", "accept-language": "en-US,en;q=0.9",
"content-type": "application/json", "cookie": os.environ["COOKIE_INFO"],
"referer": f"https://civitai.com/user/{username}/models",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/537.36"
}
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"Error fetching CivitAI creator data for {username}: {e}")
return {"result": {"data": {"json": {"links": []}}}}
def extract_huggingface_username(civitai_username: str) -> Optional[str]:
data = get_creator(civitai_username)
try:
links = data.get('result', {}).get('data', {}).get('json', {}).get('links', [])
if not isinstance(links, list): return None
for link in links:
if not isinstance(link, dict): continue
url = link.get('url', '')
if isinstance(url, str) and \
(url.startswith('https://huggingface.co/') or url.startswith('https://www.huggingface.co/')):
hf_username = url.split('/')[-1].split('?')[0].split('#')[0]
if hf_username: return hf_username
except Exception as e:
print(f"Error parsing CivitAI creator data for HF username: {e}")
return None
# --- Gradio UI Logic Functions ---
def check_civit_link(profile_state: Optional[gr.OAuthProfile], url_input: str):
url_input = url_input.strip()
if not url_input:
return "", gr.update(interactive=False, visible=False), gr.update(visible=False), gr.update(visible=False)
if not profile_state:
return "Please log in with Hugging Face first.", gr.update(interactive=False, visible=False), gr.update(visible=False), gr.update(visible=False)
try:
info, _ = process_url(url_input, profile_state, do_download=False)
if not info:
return "Could not process this CivitAI URL. Model might be unsupported.", gr.update(interactive=False), gr.update(visible=True), gr.update(visible=False)
except gr.Error as e:
return str(e), gr.update(interactive=False), gr.update(visible=True), gr.update(visible=False)
except Exception as e:
print(f"Unexpected error in check_civit_link: {e}\n{traceback.format_exc()}")
return f"An unexpected error occurred: {str(e)}", gr.update(interactive=False), gr.update(visible=True), gr.update(visible=False)
civitai_creator_username = info['creator']
hf_username_on_civitai = extract_huggingface_username(civitai_creator_username)
if profile_state.username in TRUSTED_UPLOADERS:
return f'Welcome, trusted uploader {profile_state.username}! You can upload this model by "{civitai_creator_username}".', gr.update(interactive=True, visible=True), gr.update(visible=False), gr.update(visible=True)
if not hf_username_on_civitai:
no_username_text = (
f'If you are "{civitai_creator_username}" on CivitAI, hi! Your CivitAI profile does not seem to have a Hugging Face username linked. '
f'Please visit <a href="https://civitai.com/user/account" target="_blank">your CivitAI account settings</a> and add your 🤗 username ({profile_state.username}). '
f'Example: <br/><img width="60%" src="https://i.imgur.com/hCbo9uL.png" alt="CivitAI profile settings example"/><br/>'
f'(If you are not "{civitai_creator_username}", you cannot submit their model at this time.)'
)
return no_username_text, gr.update(interactive=False, visible=False), gr.update(visible=True), gr.update(visible=False) # Hide upload, show try_again
if profile_state.username.lower() != hf_username_on_civitai.lower():
unmatched_username_text = (
f'The Hugging Face username on "{civitai_creator_username}"\'s CivitAI profile ("{hf_username_on_civitai}") '
f'does not match your logged-in Hugging Face account ("{profile_state.username}"). '
f'Please update it on <a href="https://civitai.com/user/account" target="_blank">CivitAI</a> or log in to Hugging Face as "{hf_username_on_civitai}".<br/>'
f'<img src="https://i.imgur.com/hCbo9uL.png" alt="CivitAI profile settings example"/>'
)
return unmatched_username_text, gr.update(interactive=False, visible=False), gr.update(visible=True), gr.update(visible=False) # Hide upload, show try_again
return f'Authorship verified for "{civitai_creator_username}" (🤗 {profile_state.username}). Ready to upload!', gr.update(interactive=True, visible=True), gr.update(visible=False), gr.update(visible=True) # Show upload, hide try_again
def handle_auth_change(profile: Optional[gr.OAuthProfile]):
# This function is called by demo.load when auth state changes
# It updates the visibility of UI areas and clears inputs.
if profile: # Logged in
return gr.update(visible=False), gr.update(visible=True), "", gr.update(value=""), gr.update(interactive=False, visible=False), gr.update(visible=False)
else: # Logged out
return gr.update(visible=True), gr.update(visible=False), "", gr.update(value=""), gr.update(interactive=False, visible=False), gr.update(visible=False)
def show_output_area():
return gr.update(visible=True)
def list_civit_models(username: str) -> str:
if not username.strip(): return ""
url = f"https://civitai.com/api/v1/models?username={username}&limit=100&sort=Newest"
json_models_list = []
page_count, max_pages = 0, 5 # Limit pages
gr.Info(f"Fetching LoRAs for CivitAI user: {username}...")
while url and page_count < max_pages:
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()
current_items = data.get('items', [])
# Filter for LORAs and ensure they have a name for slugify
json_models_list.extend(item for item in current_items if item.get("type") == "LORA" and item.get("name"))
metadata = data.get('metadata', {})
url = metadata.get('nextPage', None)
page_count += 1
except requests.RequestException as e:
gr.Warning(f"Failed to fetch page {page_count + 1} for {username}: {e}")
break
if not json_models_list:
gr.Info(f"No suitable LoRA models found for {username} or failed to fetch.")
return ""
urls_text = "\n".join(
f'https://civitai.com/models/{model["id"]}/{slugify(model["name"])}'
for model in json_models_list
)
gr.Info(f"Found {len(json_models_list)} LoRA models for {username}.")
return urls_text.strip()
# --- Main Upload Functions ---
def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token_obj: gr.OAuthToken, url: str, link_civit_checkbox_val: bool):
if not profile or not profile.username:
raise gr.Error("User profile not available. Please log in.")
if not oauth_token_obj or not oauth_token_obj.token:
raise gr.Error("Hugging Face token not available. Please log in again.")
hf_auth_token = oauth_token_obj.token
folder_uuid = str(uuid.uuid4())
# Create a unique subfolder in a general 'temp_uploads' directory
base_temp_dir = "temp_uploads"
os.makedirs(base_temp_dir, exist_ok=True)
folder_path = os.path.join(base_temp_dir, folder_uuid)
os.makedirs(folder_path, exist_ok=True)
gr.Info(f"Starting processing of model {url}")
try:
info, downloaded_data = process_url(url, profile, do_download=True, folder=folder_path)
if not info or not downloaded_data:
raise gr.Error("Failed to process URL or download files after initial checks.")
slug_name = slugify(info["name"])
user_repo_id = f"{profile.username}/{slug_name}"
is_author = False # Default
hf_username_on_civitai = extract_huggingface_username(info["creator"])
if profile.username in TRUSTED_UPLOADERS or \
(hf_username_on_civitai and profile.username.lower() == hf_username_on_civitai.lower()):
is_author = True # Or at least authorized to upload as/for them
create_readme(info, downloaded_data, user_repo_id, link_civit_checkbox_val, is_author=is_author, folder=folder_path)
repo_url_huggingface = f"https://huggingface.co/{user_repo_id}"
gr.Info(f"Creating/updating repository {user_repo_id} on Hugging Face...")
create_repo(repo_id=user_repo_id, private=True, exist_ok=True, token=hf_auth_token)
gr.Info(f"Starting upload to {repo_url_huggingface}...")
upload_folder(
folder_path=folder_path, repo_id=user_repo_id, repo_type="model",
token=hf_auth_token, commit_message=f"Upload LoRA: {info['name']} from CivitAI ID {info['modelId']}"
)
update_repo_visibility(repo_id=user_repo_id, private=False, token=hf_auth_token)
gr.Info(f"Model uploaded successfully!")
return f'''# Model uploaded to 🤗!
## Access it here [{user_repo_id}]({repo_url_huggingface}) '''
except Exception as e:
print(f"Error during Hugging Face repo operations for {url}: {e}\n{traceback.format_exc()}")
raise gr.Error(f"Upload failed for {url}: {str(e)}. Token might be expired. Try re-logging or check server logs.")
finally:
# Cleanup local folder
try:
if os.path.exists(folder_path):
shutil.rmtree(folder_path)
# print(f"Cleaned up temporary folder: {folder_path}")
except Exception as e_clean:
print(f"Error cleaning up folder {folder_path}: {e_clean}")
def bulk_upload(profile: Optional[gr.OAuthProfile], oauth_token_obj: gr.OAuthToken, urls_text: str, link_civit_checkbox_val: bool):
if not profile or not oauth_token_obj or not oauth_token_obj.token:
raise gr.Error("Authentication missing for bulk upload. Please log in.")
urls = [url.strip() for url in urls_text.splitlines() if url.strip()]
if not urls:
return "No URLs provided for bulk upload."
upload_results = []
total_urls = len(urls)
gr.Info(f"Starting bulk upload for {total_urls} models.")
for i, url in enumerate(urls):
gr.Info(f"Processing model {i+1}/{total_urls}: {url}")
try:
# Each call to upload_civit_to_hf will handle its own folder creation/cleanup
result_message = upload_civit_to_hf(profile, oauth_token_obj, url, link_civit_checkbox_val)
upload_results.append(result_message)
gr.Info(f"Successfully processed {url}")
except gr.Error as ge:
gr.Warning(f"Skipping model {url} due to error: {str(ge)}")
upload_results.append(f"Failed to upload {url}: {str(ge)}")
except Exception as e:
gr.Warning(f"Unhandled error uploading model {url}: {str(e)}")
upload_results.append(f"Failed to upload {url}: Unhandled exception - {str(e)}")
print(f"Unhandled exception during bulk upload for {url}: {e}\n{traceback.format_exc()}")
return "\n\n---\n\n".join(upload_results) if upload_results else "No URLs were processed or all failed."
# --- Gradio UI Definition ---
css = '''
#login_button_area { margin-bottom: 10px; }
#disabled_upload_area { opacity: 0.6; pointer-events: none; }
.gr-html ul { list-style-type: disc; margin-left: 20px; }
.gr-html ol { list-style-type: decimal; margin-left: 20px; }
.gr-html a { color: #007bff; text-decoration: underline; }
.gr-html img { max-width: 100%; height: auto; margin-top: 5px; margin-bottom: 5px; border: 1px solid #ddd; }
'''
with gr.Blocks(css=css, title="CivitAI to Hugging Face LoRA Uploader") as demo:
# States to hold authentication info globally within the Blocks context
auth_profile_state = gr.State()
# oauth_token_state = gr.State() # Token string will be passed directly from gr.OAuthToken
gr.Markdown('''# Upload your CivitAI LoRA to Hugging Face 🤗
By uploading your LoRAs to Hugging Face you get diffusers compatibility, a free GPU-based Inference Widget, you'll be listed in [LoRA Studio](https://lorastudio.co/models) after a short review, and get the possibility to submit your model to the [LoRA the Explorer](https://huggingface.co/spaces/multimodalart/LoraTheExplorer) ✨
''')
with gr.Row(elem_id="login_button_area"):
login_button = gr.LoginButton() # Default uses HF OAuth
# This column is visible when the user is NOT logged in
with gr.Column(visible=True, elem_id="disabled_upload_area") as disabled_area:
gr.HTML("<h3>Please log in with Hugging Face to enable uploads.</h3>")
gr.Textbox(
placeholder="e.g., https://civitai.com/models/12345/my-lora or just 12345",
label="CivitAI Model URL or ID (Log in to enable)",
interactive=False
)
# This column is visible when the user IS logged in
with gr.Column(visible=False) as enabled_area:
gr.HTML("<h3 style='color:green;'>Logged in! You can now upload models.</h3>")
with gr.Tabs():
with gr.TabItem("Single Model Upload"):
submit_source_civit_enabled = gr.Textbox(
placeholder="e.g., https://civitai.com/models/12345/my-lora or just 12345",
label="CivitAI Model URL or ID",
info="Enter the full URL or just the numeric ID of the CivitAI LoRA model page.",
)
instructions_html = gr.HTML(elem_id="instructions_area")
try_again_button = gr.Button("I've updated my CivitAI profile (Re-check Authorship)", visible=False)
link_civit_checkbox_single = gr.Checkbox(label="Add a link back to CivitAI in the README?", value=True, visible=True)
submit_button_single_model = gr.Button("Upload This Model to Hugging Face", interactive=False, visible=False, variant="primary")
with gr.TabItem("Bulk Upload"):
civit_username_to_bulk = gr.Textbox(
label="Your CivitAI Username (Optional)",
info="Enter your CivitAI username to auto-populate the list below with your LoRAs (up to 50 newest)."
)
submit_bulk_civit_urls = gr.Textbox(
label="CivitAI Model URLs or IDs (One per line)",
info="Paste multiple CivitAI model page URLs or just IDs here, one on each line.",
lines=8,
)
link_civit_checkbox_bulk = gr.Checkbox(label="Add a link back to CivitAI in READMEs?", value=True)
bulk_upload_button = gr.Button("Start Bulk Upload", variant="primary")
output_markdown_area = gr.Markdown(label="Upload Progress & Results", visible=False)
# --- Event Handlers Wiring ---
# Handle login/logout and initial load
# login_button.login() or logout() implicitly triggers demo.load()
# The .load event is triggered when the Gradio app starts or when login/logout happens.
# It receives profile and token from the gr.LoginButton's state.
# Inputs to handle_auth_change must match how gr.LoginButton provides them.
# LoginButton provides profile (OAuthProfile) and token (OAuthToken)
# These are implicitly passed to the function called by demo.load if it's the only .load.
# Using gr.State() for auth_profile_state.
# This demo.load will be triggered by login/logout from gr.LoginButton
# and also on initial page load.
demo.load(
fn=handle_auth_change,
inputs=[auth_profile_state], # Pass the state which will be updated by login
outputs=[disabled_area, enabled_area, instructions_html, submit_source_civit_enabled, submit_button_single_model, try_again_button],
api_name=False, queue=False
).then(
# After login/logout, update the auth_profile_state
# This is a bit of a workaround to get profile into a state for other functions
lambda profile: profile, # Identity function
inputs=[gr.Variable()], # This will receive the profile from LoginButton
outputs=[auth_profile_state],
api_name=False, queue=False
)
# When CivitAI URL changes (in the enabled area)
submit_source_civit_enabled.change(
fn=check_civit_link,
inputs=[auth_profile_state, submit_source_civit_enabled],
outputs=[instructions_html, submit_button_single_model, try_again_button, submit_button_single_model],
api_name=False
)
# When "Try Again" button is clicked
try_again_button.click(
fn=check_civit_link,
inputs=[auth_profile_state, submit_source_civit_enabled],
outputs=[instructions_html, submit_button_single_model, try_again_button, submit_button_single_model],
api_name=False
)
# When CivitAI username for bulk input changes
civit_username_to_bulk.submit( # Use .submit for when user presses Enter or blurs
fn=list_civit_models,
inputs=[civit_username_to_bulk],
outputs=[submit_bulk_civit_urls],
api_name=False
)
# Single model upload button
submit_button_single_model.click(
fn=show_output_area, inputs=[], outputs=[output_markdown_area], api_name=False
).then(
fn=upload_civit_to_hf,
inputs=[auth_profile_state, gr.OAuthToken(scopes=["write_repository","read_repository"]), submit_source_civit_enabled, link_civit_checkbox_single],
outputs=[output_markdown_area],
api_name="upload_single_model"
)
# Bulk model upload button
bulk_upload_button.click(
fn=show_output_area, inputs=[], outputs=[output_markdown_area], api_name=False
).then(
fn=bulk_upload,
inputs=[auth_profile_state, gr.OAuthToken(scopes=["write_repository","read_repository"]), submit_bulk_civit_urls, link_civit_checkbox_bulk],
outputs=[output_markdown_area],
api_name="upload_bulk_models"
)
demo.queue(default_concurrency_limit=3, max_size=10) # Adjusted concurrency
if __name__ == "__main__":
# For local testing, you might need to set COOKIE_INFO and CIVITAI_API_TOKEN
# os.environ["COOKIE_INFO"] = "your_civitai_cookie_string_here"
# os.environ["CIVITAI_API_TOKEN"] = "your_civitai_api_token_here_if_needed"
demo.launch(debug=True, share=os.environ.get("GRADIO_SHARE") == "true") |