multimodalart HF Staff commited on
Commit
0ec77e4
·
verified ·
1 Parent(s): 6e50c0f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +121 -123
app.py CHANGED
@@ -25,7 +25,12 @@ def get_json_data(url: str) -> Optional[Dict[str, Any]]:
25
  if url.isdigit():
26
  model_id = url
27
  else:
28
- return None
 
 
 
 
 
29
  else:
30
  model_id = url_split[4]
31
 
@@ -83,16 +88,18 @@ def extract_info(json_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
83
  "SD 2.1 768": "stabilityai/stable-diffusion-2-1", "SD 2.0 768": "stabilityai/stable-diffusion-2",
84
  "SD 3": "stabilityai/stable-diffusion-3-medium-diffusers",
85
  "SD 3.5": "stabilityai/stable-diffusion-3-medium",
86
- "SD 3.5 Large": "stabilityai/stable-diffusion-3-medium", # Adjusted to medium as large might not be public LoRA base
87
  "SD 3.5 Medium": "stabilityai/stable-diffusion-3-medium",
88
- "SD 3.5 Large Turbo": "stabilityai/stable-diffusion-3-medium-turbo", # Placeholder
89
  "Flux.1 D": "black-forest-labs/FLUX.1-dev", "Flux.1 S": "black-forest-labs/FLUX.1-schnell",
90
  "LTXV": "Lightricks/LTX-Video-0.9.7-dev",
91
- "Hunyuan Video": "hunyuanvideo-community/HunyuanVideo", # Default T2V
92
  "Wan Video 1.3B t2v": "Wan-AI/Wan2.1-T2V-1.3B-Diffusers",
93
  "Wan Video 14B t2v": "Wan-AI/Wan2.1-T2V-14B-Diffusers",
94
  "Wan Video 14B i2v 480p": "Wan-AI/Wan2.1-I2V-14B-480P-Diffusers",
95
  "Wan Video 14B i2v 720p": "Wan-AI/Wan2.1-I2V-14B-720P-Diffusers",
 
 
96
  }
97
 
98
  for model_version in json_data.get("modelVersions", []):
@@ -114,18 +121,17 @@ def extract_info(json_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
114
 
115
  if not primary_file_found: continue
116
 
117
- for media_data in model_version.get("images", []): # CivitAI uses 'images' for both images and videos
118
  if media_data.get("nsfwLevel", 0) > 5: continue
119
 
120
- media_url_parts = media_data["url"].split("/")
121
- if not media_url_parts: continue
122
 
123
  filename_part = media_url_parts[-1]
124
- # Robustly extract ID: try to get it before the first dot or before query params
125
  id_candidate = filename_part.split(".")[0].split("?")[0]
126
 
127
  prompt, negative_prompt = "", ""
128
- if media_data.get("hasMeta", False) and media_data.get("type") == "image": # Prompts mainly for images
129
  if id_candidate.isdigit():
130
  try:
131
  prompt, negative_prompt = get_prompts_from_image(int(id_candidate))
@@ -143,12 +149,15 @@ def extract_info(json_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
143
  "is_video": is_video_file
144
  })
145
 
146
- # Ensure 'allowCommercialUse' is processed correctly
147
- allow_commercial_use = json_data.get("allowCommercialUse", "Sell") # Default
148
- if isinstance(allow_commercial_use, list):
149
- allow_commercial_use = allow_commercial_use[0] if allow_commercial_use else "Sell"
150
- elif not isinstance(allow_commercial_use, str): # If boolean or other, convert to expected string
151
- allow_commercial_use = "Sell" if allow_commercial_use else "None"
 
 
 
152
 
153
 
154
  info_dict = {
@@ -160,7 +169,7 @@ def extract_info(json_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
160
  "creator": json_data.get("creator", {}).get("username", "Unknown Creator"),
161
  "tags": json_data.get("tags", []),
162
  "allowNoCredit": json_data.get("allowNoCredit", True),
163
- "allowCommercialUse": allow_commercial_use,
164
  "allowDerivatives": json_data.get("allowDerivatives", True),
165
  "allowDifferentLicense": json_data.get("allowDifferentLicense", True)
166
  }
@@ -171,24 +180,21 @@ def download_file_from_url(url: str, filename: str, folder: str = "."):
171
  headers = {}
172
  local_filepath = os.path.join(folder, filename)
173
  try:
174
- # Add a User-Agent to mimic a browser, as some CDNs might block default requests User-Agent
175
  headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
176
- if "CIVITAI_API_TOKEN" in os.environ and os.environ["CIVITAI_API_TOKEN"]: # Check for token existence and value
177
- headers['Authorization'] = f'Bearer {os.environ["CIVITAI_API_TOKEN"]}'
 
178
 
179
- response = requests.get(url, headers=headers, stream=True, timeout=120) # Increased timeout
180
  response.raise_for_status()
181
 
182
  with open(local_filepath, 'wb') as f:
183
  for chunk in response.iter_content(chunk_size=8192):
184
  f.write(chunk)
185
- # print(f"Successfully downloaded {filename} to {folder}")
186
 
187
  except requests.exceptions.HTTPError as e_http:
188
- # If 401/403 and no token was used, it's a clear auth issue.
189
- # If token was used and still 401/403, token might be invalid or insufficient.
190
- if e_http.response.status_code in [401, 403] and not headers.get('Authorization'):
191
- print(f"Authorization error downloading {url}. Consider setting CIVITAI_API_TOKEN for restricted files.")
192
  raise gr.Error(f"HTTP Error downloading {filename}: {e_http.response.status_code} {e_http.response.reason}. URL: {url}")
193
  except requests.exceptions.RequestException as e_req:
194
  raise gr.Error(f"Request Error downloading {filename}: {e_req}. URL: {url}")
@@ -199,13 +205,11 @@ def download_files(info: Dict[str, Any], folder: str = ".") -> Dict[str, List[An
199
  downloaded_weights: List[str] = []
200
 
201
  for item in info["urls_to_download"]:
202
- filename_to_save = item["filename"]
203
-
204
- # Sanitize filename (though os.path.basename usually handles paths well)
205
- filename_to_save = re.sub(r'[<>:"/\\|?*]', '_', filename_to_save) # Basic sanitization
206
- if not filename_to_save: # Handle case where filename becomes empty
207
- filename_to_save = f"downloaded_file_{uuid.uuid4().hex[:8]}" + os.path.splitext(item["url"])[1]
208
-
209
 
210
  gr.Info(f"Downloading {filename_to_save}...")
211
  download_file_from_url(item["url"], filename_to_save, folder)
@@ -266,12 +270,16 @@ def create_readme(info: Dict[str, Any], downloaded_files: Dict[str, List[Any]],
266
  if is_video_model:
267
  default_tags.append("video")
268
  default_tags.append("image-to-video" if is_i2v_model else "text-to-video")
269
- default_tags.append("template:video-lora") # Added a template tag for video
270
  else:
271
  default_tags.extend(["text-to-image", "stable-diffusion", "template:sd-lora"])
272
 
273
- civit_tags = [t.replace(":", "").strip() for t in info.get("tags", []) if t.replace(":", "").strip() and t.replace(":", "").strip() not in default_tags]
274
- tags = default_tags + civit_tags
 
 
 
 
275
  unpacked_tags = "\n- ".join(sorted(list(set(tags))))
276
 
277
  trained_words = [word for word in info.get('trainedWords', []) if word]
@@ -283,16 +291,24 @@ def create_readme(info: Dict[str, Any], downloaded_files: Dict[str, List[Any]],
283
  if not media_items_for_widget:
284
  widget_content = "# No example media available for widget.\n"
285
  else:
286
- for media_item in media_items_for_widget[:5]: # Limit to 5 examples for widget
287
- prompt = media_item["prompt"]
288
- negative_prompt = media_item["negative_prompt"]
289
  filename = media_item["filename"]
290
 
291
- escaped_prompt = prompt.replace("'", "''").replace("\n", " ") # Escape and remove newlines
292
- negative_prompt_content = f"""parameters:
293
- negative_prompt: '{negative_prompt.replace("'", "''").replace("\n", " ")}'""" if negative_prompt else ""
 
 
 
 
 
 
 
 
294
  widget_content += f"""- text: '{escaped_prompt if escaped_prompt else ' ' }'
295
- {negative_prompt_content}
296
  output:
297
  url: >-
298
  {filename}
@@ -301,18 +317,18 @@ def create_readme(info: Dict[str, Any], downloaded_files: Dict[str, List[Any]],
301
  dtype = "torch.bfloat16" if info["baseModel"] in flux_models_bf16 else "torch.float16"
302
 
303
  pipeline_import = "AutoPipelineForText2Image"
304
- pipeline_call_example = f"image = pipeline('{formatted_words if formatted_words else 'Your custom prompt'}').images[0]"
305
  example_prompt_for_pipeline = formatted_words if formatted_words else 'Your custom prompt'
306
  if media_items_for_widget and media_items_for_widget[0]["prompt"]:
307
  example_prompt_for_pipeline = media_items_for_widget[0]["prompt"]
308
- pipeline_call_example = f"image = pipeline('{example_prompt_for_pipeline.replace ciclo '','' ')').images[0]"
309
-
 
310
 
311
  if is_video_model:
312
  pipeline_import = "DiffusionPipeline"
313
- video_prompt_example = example_prompt_for_pipeline
314
 
315
- pipeline_call_example = f"# Example prompt for video generation\nprompt = \"{video_prompt_example.replace ciclico '','' ')}\"\n"
316
  pipeline_call_example += "# Adjust parameters like num_frames, num_inference_steps, height, width as needed for the specific pipeline.\n"
317
  pipeline_call_example += "# video_frames = pipeline(prompt, num_frames=16, guidance_scale=7.5, num_inference_steps=25).frames # Example parameters"
318
  if "LTX-Video" in info["baseModel"]:
@@ -341,14 +357,18 @@ pipeline.to(device)
341
  # Load LoRA weights
342
  pipeline.load_lora_weights('{user_repo_id}', weight_name='{weight_name}')
343
 
344
- # For some pipelines, you might need to fuse LoRA layers:
345
- # pipeline.fuse_lora() # or pipeline.unfuse_lora()
 
346
 
347
  # Example generation call (adjust parameters as needed for the specific pipeline)
348
  {pipeline_call_example}
 
 
 
349
  ```"""
350
 
351
- commercial_use_val = info["allowCommercialUse"] # Already processed in extract_info
352
 
353
  content = f"""---
354
  license: other
@@ -386,7 +406,6 @@ For more details, including weighting, merging and fusing LoRAs, check the [docu
386
  readme_path = os.path.join(folder, "README.md")
387
  with open(readme_path, "w", encoding="utf-8") as file:
388
  file.write(content)
389
- # print(f"README.md created at {readme_path}")
390
 
391
 
392
  # --- Hugging Face Profile / Authorship ---
@@ -438,14 +457,15 @@ def check_civit_link(profile_state: Optional[gr.OAuthProfile], url_input: str):
438
 
439
  try:
440
  info, _ = process_url(url_input, profile_state, do_download=False)
441
- if not info:
442
- return "Could not process this CivitAI URL. Model might be unsupported.", gr.update(interactive=False), gr.update(visible=True), gr.update(visible=False)
443
- except gr.Error as e:
444
- return str(e), gr.update(interactive=False), gr.update(visible=True), gr.update(visible=False)
445
- except Exception as e:
446
- print(f"Unexpected error in check_civit_link: {e}\n{traceback.format_exc()}")
447
- return f"An unexpected error occurred: {str(e)}", gr.update(interactive=False), gr.update(visible=True), gr.update(visible=False)
448
-
 
449
  civitai_creator_username = info['creator']
450
  hf_username_on_civitai = extract_huggingface_username(civitai_creator_username)
451
 
@@ -459,7 +479,7 @@ def check_civit_link(profile_state: Optional[gr.OAuthProfile], url_input: str):
459
  f'Example: <br/><img width="60%" src="https://i.imgur.com/hCbo9uL.png" alt="CivitAI profile settings example"/><br/>'
460
  f'(If you are not "{civitai_creator_username}", you cannot submit their model at this time.)'
461
  )
462
- return no_username_text, gr.update(interactive=False, visible=False), gr.update(visible=True), gr.update(visible=False) # Hide upload, show try_again
463
 
464
  if profile_state.username.lower() != hf_username_on_civitai.lower():
465
  unmatched_username_text = (
@@ -468,17 +488,16 @@ def check_civit_link(profile_state: Optional[gr.OAuthProfile], url_input: str):
468
  f'Please update it on <a href="https://civitai.com/user/account" target="_blank">CivitAI</a> or log in to Hugging Face as "{hf_username_on_civitai}".<br/>'
469
  f'<img src="https://i.imgur.com/hCbo9uL.png" alt="CivitAI profile settings example"/>'
470
  )
471
- return unmatched_username_text, gr.update(interactive=False, visible=False), gr.update(visible=True), gr.update(visible=False) # Hide upload, show try_again
472
 
473
- return f'Authorship verified for "{civitai_creator_username}" (🤗 {profile_state.username}). Ready to upload!', gr.update(interactive=True, visible=True), gr.update(visible=False), gr.update(visible=True) # Show upload, hide try_again
474
 
475
- def handle_auth_change(profile: Optional[gr.OAuthProfile]):
476
- # This function is called by demo.load when auth state changes
477
- # It updates the visibility of UI areas and clears inputs.
478
  if profile: # Logged in
479
- return gr.update(visible=False), gr.update(visible=True), "", gr.update(value=""), gr.update(interactive=False, visible=False), gr.update(visible=False)
480
  else: # Logged out
481
- return gr.update(visible=True), gr.update(visible=False), "", gr.update(value=""), gr.update(interactive=False, visible=False), gr.update(visible=False)
482
 
483
  def show_output_area():
484
  return gr.update(visible=True)
@@ -486,19 +505,18 @@ def show_output_area():
486
  def list_civit_models(username: str) -> str:
487
  if not username.strip(): return ""
488
 
489
- url = f"https://civitai.com/api/v1/models?username={username}&limit=100&sort=Newest"
490
  json_models_list = []
491
- page_count, max_pages = 0, 5 # Limit pages
492
 
493
  gr.Info(f"Fetching LoRAs for CivitAI user: {username}...")
494
  while url and page_count < max_pages:
495
  try:
496
- response = requests.get(url, timeout=10)
497
  response.raise_for_status()
498
  data = response.json()
499
 
500
  current_items = data.get('items', [])
501
- # Filter for LORAs and ensure they have a name for slugify
502
  json_models_list.extend(item for item in current_items if item.get("type") == "LORA" and item.get("name"))
503
 
504
  metadata = data.get('metadata', {})
@@ -529,7 +547,6 @@ def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token_obj: gr.O
529
  hf_auth_token = oauth_token_obj.token
530
 
531
  folder_uuid = str(uuid.uuid4())
532
- # Create a unique subfolder in a general 'temp_uploads' directory
533
  base_temp_dir = "temp_uploads"
534
  os.makedirs(base_temp_dir, exist_ok=True)
535
  folder_path = os.path.join(base_temp_dir, folder_uuid)
@@ -540,16 +557,20 @@ def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token_obj: gr.O
540
  try:
541
  info, downloaded_data = process_url(url, profile, do_download=True, folder=folder_path)
542
  if not info or not downloaded_data:
 
543
  raise gr.Error("Failed to process URL or download files after initial checks.")
544
 
545
  slug_name = slugify(info["name"])
546
  user_repo_id = f"{profile.username}/{slug_name}"
547
 
548
- is_author = False # Default
549
- hf_username_on_civitai = extract_huggingface_username(info["creator"])
 
 
 
550
  if profile.username in TRUSTED_UPLOADERS or \
551
  (hf_username_on_civitai and profile.username.lower() == hf_username_on_civitai.lower()):
552
- is_author = True # Or at least authorized to upload as/for them
553
 
554
  create_readme(info, downloaded_data, user_repo_id, link_civit_checkbox_val, is_author=is_author, folder=folder_path)
555
 
@@ -573,11 +594,9 @@ def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token_obj: gr.O
573
  print(f"Error during Hugging Face repo operations for {url}: {e}\n{traceback.format_exc()}")
574
  raise gr.Error(f"Upload failed for {url}: {str(e)}. Token might be expired. Try re-logging or check server logs.")
575
  finally:
576
- # Cleanup local folder
577
  try:
578
  if os.path.exists(folder_path):
579
  shutil.rmtree(folder_path)
580
- # print(f"Cleaned up temporary folder: {folder_path}")
581
  except Exception as e_clean:
582
  print(f"Error cleaning up folder {folder_path}: {e_clean}")
583
 
@@ -597,14 +616,13 @@ def bulk_upload(profile: Optional[gr.OAuthProfile], oauth_token_obj: gr.OAuthTok
597
  for i, url in enumerate(urls):
598
  gr.Info(f"Processing model {i+1}/{total_urls}: {url}")
599
  try:
600
- # Each call to upload_civit_to_hf will handle its own folder creation/cleanup
601
  result_message = upload_civit_to_hf(profile, oauth_token_obj, url, link_civit_checkbox_val)
602
  upload_results.append(result_message)
603
  gr.Info(f"Successfully processed {url}")
604
- except gr.Error as ge:
605
  gr.Warning(f"Skipping model {url} due to error: {str(ge)}")
606
  upload_results.append(f"Failed to upload {url}: {str(ge)}")
607
- except Exception as e:
608
  gr.Warning(f"Unhandled error uploading model {url}: {str(e)}")
609
  upload_results.append(f"Failed to upload {url}: Unhandled exception - {str(e)}")
610
  print(f"Unhandled exception during bulk upload for {url}: {e}\n{traceback.format_exc()}")
@@ -619,21 +637,20 @@ css = '''
619
  .gr-html ol { list-style-type: decimal; margin-left: 20px; }
620
  .gr-html a { color: #007bff; text-decoration: underline; }
621
  .gr-html img { max-width: 100%; height: auto; margin-top: 5px; margin-bottom: 5px; border: 1px solid #ddd; }
 
622
  '''
623
 
624
  with gr.Blocks(css=css, title="CivitAI to Hugging Face LoRA Uploader") as demo:
625
- # States to hold authentication info globally within the Blocks context
626
- auth_profile_state = gr.State()
627
- # oauth_token_state = gr.State() # Token string will be passed directly from gr.OAuthToken
628
 
629
  gr.Markdown('''# Upload your CivitAI LoRA to Hugging Face 🤗
630
  By uploading your LoRAs to Hugging Face you get diffusers compatibility, a free GPU-based Inference Widget, you'll be listed in [LoRA Studio](https://lorastudio.co/models) after a short review, and get the possibility to submit your model to the [LoRA the Explorer](https://huggingface.co/spaces/multimodalart/LoraTheExplorer) ✨
631
  ''')
632
 
633
  with gr.Row(elem_id="login_button_area"):
634
- login_button = gr.LoginButton() # Default uses HF OAuth
 
635
 
636
- # This column is visible when the user is NOT logged in
637
  with gr.Column(visible=True, elem_id="disabled_upload_area") as disabled_area:
638
  gr.HTML("<h3>Please log in with Hugging Face to enable uploads.</h3>")
639
  gr.Textbox(
@@ -642,7 +659,6 @@ By uploading your LoRAs to Hugging Face you get diffusers compatibility, a free
642
  interactive=False
643
  )
644
 
645
- # This column is visible when the user IS logged in
646
  with gr.Column(visible=False) as enabled_area:
647
  gr.HTML("<h3 style='color:green;'>Logged in! You can now upload models.</h3>")
648
 
@@ -653,17 +669,15 @@ By uploading your LoRAs to Hugging Face you get diffusers compatibility, a free
653
  label="CivitAI Model URL or ID",
654
  info="Enter the full URL or just the numeric ID of the CivitAI LoRA model page.",
655
  )
656
- instructions_html = gr.HTML(elem_id="instructions_area")
657
-
658
  try_again_button = gr.Button("I've updated my CivitAI profile (Re-check Authorship)", visible=False)
659
-
660
  link_civit_checkbox_single = gr.Checkbox(label="Add a link back to CivitAI in the README?", value=True, visible=True)
661
  submit_button_single_model = gr.Button("Upload This Model to Hugging Face", interactive=False, visible=False, variant="primary")
662
 
663
  with gr.TabItem("Bulk Upload"):
664
  civit_username_to_bulk = gr.Textbox(
665
  label="Your CivitAI Username (Optional)",
666
- info="Enter your CivitAI username to auto-populate the list below with your LoRAs (up to 50 newest)."
667
  )
668
  submit_bulk_civit_urls = gr.Textbox(
669
  label="CivitAI Model URLs or IDs (One per line)",
@@ -676,41 +690,23 @@ By uploading your LoRAs to Hugging Face you get diffusers compatibility, a free
676
  output_markdown_area = gr.Markdown(label="Upload Progress & Results", visible=False)
677
 
678
  # --- Event Handlers Wiring ---
679
-
680
- # Handle login/logout and initial load
681
- # login_button.login() or logout() implicitly triggers demo.load()
682
- # The .load event is triggered when the Gradio app starts or when login/logout happens.
683
- # It receives profile and token from the gr.LoginButton's state.
684
- # Inputs to handle_auth_change must match how gr.LoginButton provides them.
685
- # LoginButton provides profile (OAuthProfile) and token (OAuthToken)
686
- # These are implicitly passed to the function called by demo.load if it's the only .load.
687
- # Using gr.State() for auth_profile_state.
688
-
689
- # This demo.load will be triggered by login/logout from gr.LoginButton
690
- # and also on initial page load.
691
  demo.load(
692
- fn=handle_auth_change,
693
- inputs=[auth_profile_state], # Pass the state which will be updated by login
694
- outputs=[disabled_area, enabled_area, instructions_html, submit_source_civit_enabled, submit_button_single_model, try_again_button],
695
- api_name=False, queue=False
696
- ).then(
697
- # After login/logout, update the auth_profile_state
698
- # This is a bit of a workaround to get profile into a state for other functions
699
- lambda profile: profile, # Identity function
700
- inputs=[gr.Variable()], # This will receive the profile from LoginButton
701
- outputs=[auth_profile_state],
702
  api_name=False, queue=False
703
  )
704
 
705
- # When CivitAI URL changes (in the enabled area)
706
  submit_source_civit_enabled.change(
707
  fn=check_civit_link,
708
  inputs=[auth_profile_state, submit_source_civit_enabled],
709
- outputs=[instructions_html, submit_button_single_model, try_again_button, submit_button_single_model],
710
  api_name=False
711
  )
712
 
713
- # When "Try Again" button is clicked
714
  try_again_button.click(
715
  fn=check_civit_link,
716
  inputs=[auth_profile_state, submit_source_civit_enabled],
@@ -718,15 +714,13 @@ By uploading your LoRAs to Hugging Face you get diffusers compatibility, a free
718
  api_name=False
719
  )
720
 
721
- # When CivitAI username for bulk input changes
722
- civit_username_to_bulk.submit( # Use .submit for when user presses Enter or blurs
723
  fn=list_civit_models,
724
  inputs=[civit_username_to_bulk],
725
  outputs=[submit_bulk_civit_urls],
726
  api_name=False
727
  )
728
 
729
- # Single model upload button
730
  submit_button_single_model.click(
731
  fn=show_output_area, inputs=[], outputs=[output_markdown_area], api_name=False
732
  ).then(
@@ -736,7 +730,6 @@ By uploading your LoRAs to Hugging Face you get diffusers compatibility, a free
736
  api_name="upload_single_model"
737
  )
738
 
739
- # Bulk model upload button
740
  bulk_upload_button.click(
741
  fn=show_output_area, inputs=[], outputs=[output_markdown_area], api_name=False
742
  ).then(
@@ -746,9 +739,14 @@ By uploading your LoRAs to Hugging Face you get diffusers compatibility, a free
746
  api_name="upload_bulk_models"
747
  )
748
 
749
- demo.queue(default_concurrency_limit=3, max_size=10) # Adjusted concurrency
750
  if __name__ == "__main__":
751
- # For local testing, you might need to set COOKIE_INFO and CIVITAI_API_TOKEN
752
- # os.environ["COOKIE_INFO"] = "your_civitai_cookie_string_here"
753
- # os.environ["CIVITAI_API_TOKEN"] = "your_civitai_api_token_here_if_needed"
 
 
 
 
 
754
  demo.launch(debug=True, share=os.environ.get("GRADIO_SHARE") == "true")
 
25
  if url.isdigit():
26
  model_id = url
27
  else:
28
+ # Check if it's a slugified URL without /models/ part
29
+ match = re.search(r'(\d+)(?:/[^/]+)?$', url)
30
+ if match:
31
+ model_id = match.group(1)
32
+ else:
33
+ return None
34
  else:
35
  model_id = url_split[4]
36
 
 
88
  "SD 2.1 768": "stabilityai/stable-diffusion-2-1", "SD 2.0 768": "stabilityai/stable-diffusion-2",
89
  "SD 3": "stabilityai/stable-diffusion-3-medium-diffusers",
90
  "SD 3.5": "stabilityai/stable-diffusion-3-medium",
91
+ "SD 3.5 Large": "stabilityai/stable-diffusion-3-medium",
92
  "SD 3.5 Medium": "stabilityai/stable-diffusion-3-medium",
93
+ "SD 3.5 Large Turbo": "stabilityai/stable-diffusion-3-medium-turbo",
94
  "Flux.1 D": "black-forest-labs/FLUX.1-dev", "Flux.1 S": "black-forest-labs/FLUX.1-schnell",
95
  "LTXV": "Lightricks/LTX-Video-0.9.7-dev",
96
+ "Hunyuan Video": "hunyuanvideo-community/HunyuanVideo",
97
  "Wan Video 1.3B t2v": "Wan-AI/Wan2.1-T2V-1.3B-Diffusers",
98
  "Wan Video 14B t2v": "Wan-AI/Wan2.1-T2V-14B-Diffusers",
99
  "Wan Video 14B i2v 480p": "Wan-AI/Wan2.1-I2V-14B-480P-Diffusers",
100
  "Wan Video 14B i2v 720p": "Wan-AI/Wan2.1-I2V-14B-720P-Diffusers",
101
+ "Pony": "SG161222/RealVisXL_V4.0",
102
+ "Illustrious": "artificialguybr/LogoRedmond", # Example, could be "stabilityai/stable-diffusion-xl-base-1.0"
103
  }
104
 
105
  for model_version in json_data.get("modelVersions", []):
 
121
 
122
  if not primary_file_found: continue
123
 
124
+ for media_data in model_version.get("images", []):
125
  if media_data.get("nsfwLevel", 0) > 5: continue
126
 
127
+ media_url_parts = media_data.get("url","").split("/") # Add default "" for url
128
+ if not media_url_parts or not media_url_parts[-1]: continue # Ensure URL and filename part exist
129
 
130
  filename_part = media_url_parts[-1]
 
131
  id_candidate = filename_part.split(".")[0].split("?")[0]
132
 
133
  prompt, negative_prompt = "", ""
134
+ if media_data.get("hasMeta", False) and media_data.get("type") == "image":
135
  if id_candidate.isdigit():
136
  try:
137
  prompt, negative_prompt = get_prompts_from_image(int(id_candidate))
 
149
  "is_video": is_video_file
150
  })
151
 
152
+ allow_commercial_use_raw = json_data.get("allowCommercialUse", "Sell")
153
+ if isinstance(allow_commercial_use_raw, list):
154
+ allow_commercial_use_processed = allow_commercial_use_raw[0] if allow_commercial_use_raw else "Sell"
155
+ elif isinstance(allow_commercial_use_raw, bool):
156
+ allow_commercial_use_processed = "Sell" if allow_commercial_use_raw else "None"
157
+ elif isinstance(allow_commercial_use_raw, str):
158
+ allow_commercial_use_processed = allow_commercial_use_raw
159
+ else: # Fallback for unexpected types
160
+ allow_commercial_use_processed = "Sell"
161
 
162
 
163
  info_dict = {
 
169
  "creator": json_data.get("creator", {}).get("username", "Unknown Creator"),
170
  "tags": json_data.get("tags", []),
171
  "allowNoCredit": json_data.get("allowNoCredit", True),
172
+ "allowCommercialUse": allow_commercial_use_processed,
173
  "allowDerivatives": json_data.get("allowDerivatives", True),
174
  "allowDifferentLicense": json_data.get("allowDifferentLicense", True)
175
  }
 
180
  headers = {}
181
  local_filepath = os.path.join(folder, filename)
182
  try:
 
183
  headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
184
+ civitai_token = os.environ.get("CIVITAI_API_TOKEN")
185
+ if civitai_token:
186
+ headers['Authorization'] = f'Bearer {civitai_token}'
187
 
188
+ response = requests.get(url, headers=headers, stream=True, timeout=120)
189
  response.raise_for_status()
190
 
191
  with open(local_filepath, 'wb') as f:
192
  for chunk in response.iter_content(chunk_size=8192):
193
  f.write(chunk)
 
194
 
195
  except requests.exceptions.HTTPError as e_http:
196
+ if e_http.response.status_code in [401, 403] and not headers.get('Authorization') and not civitai_token:
197
+ print(f"Authorization error (401/403) downloading {url}. Consider setting CIVITAI_API_TOKEN for restricted files.")
 
 
198
  raise gr.Error(f"HTTP Error downloading {filename}: {e_http.response.status_code} {e_http.response.reason}. URL: {url}")
199
  except requests.exceptions.RequestException as e_req:
200
  raise gr.Error(f"Request Error downloading {filename}: {e_req}. URL: {url}")
 
205
  downloaded_weights: List[str] = []
206
 
207
  for item in info["urls_to_download"]:
208
+ filename_to_save_raw = item["filename"]
209
+ filename_to_save = re.sub(r'[<>:"/\\|?*]', '_', filename_to_save_raw)
210
+ if not filename_to_save:
211
+ base, ext = os.path.splitext(item["url"])
212
+ filename_to_save = f"downloaded_file_{uuid.uuid4().hex[:8]}{ext if ext else '.bin'}"
 
 
213
 
214
  gr.Info(f"Downloading {filename_to_save}...")
215
  download_file_from_url(item["url"], filename_to_save, folder)
 
270
  if is_video_model:
271
  default_tags.append("video")
272
  default_tags.append("image-to-video" if is_i2v_model else "text-to-video")
273
+ default_tags.append("template:video-lora")
274
  else:
275
  default_tags.extend(["text-to-image", "stable-diffusion", "template:sd-lora"])
276
 
277
+ civit_tags_raw = info.get("tags", [])
278
+ civit_tags_processed = []
279
+ if isinstance(civit_tags_raw, list):
280
+ civit_tags_processed = [str(t).replace(":", "").strip() for t in civit_tags_raw if str(t).replace(":", "").strip() and str(t).replace(":", "").strip() not in default_tags]
281
+
282
+ tags = default_tags + civit_tags_processed
283
  unpacked_tags = "\n- ".join(sorted(list(set(tags))))
284
 
285
  trained_words = [word for word in info.get('trainedWords', []) if word]
 
291
  if not media_items_for_widget:
292
  widget_content = "# No example media available for widget.\n"
293
  else:
294
+ for media_item in media_items_for_widget[:5]:
295
+ prompt_text = media_item["prompt"]
296
+ negative_prompt_text = media_item["negative_prompt"]
297
  filename = media_item["filename"]
298
 
299
+ escaped_prompt = prompt_text.replace("'", "''").replace("\n", " ")
300
+
301
+ negative_prompt_cleaned_and_escaped = ""
302
+ if negative_prompt_text:
303
+ negative_prompt_cleaned_and_escaped = negative_prompt_text.replace("'", "''").replace("\n", " ") # Correct
304
+
305
+ negative_prompt_widget_entry = ""
306
+ if negative_prompt_cleaned_and_escaped: # Only add if non-empty
307
+ negative_prompt_widget_entry = f"""parameters:
308
+ negative_prompt: '{negative_prompt_cleaned_and_escaped}'"""
309
+
310
  widget_content += f"""- text: '{escaped_prompt if escaped_prompt else ' ' }'
311
+ {negative_prompt_widget_entry}
312
  output:
313
  url: >-
314
  {filename}
 
317
  dtype = "torch.bfloat16" if info["baseModel"] in flux_models_bf16 else "torch.float16"
318
 
319
  pipeline_import = "AutoPipelineForText2Image"
 
320
  example_prompt_for_pipeline = formatted_words if formatted_words else 'Your custom prompt'
321
  if media_items_for_widget and media_items_for_widget[0]["prompt"]:
322
  example_prompt_for_pipeline = media_items_for_widget[0]["prompt"]
323
+
324
+ cleaned_example_pipeline_prompt = example_prompt_for_pipeline.replace("'", "\\'").replace("\n", " ")
325
+ pipeline_call_example = f"image = pipeline('{cleaned_example_pipeline_prompt}').images[0]"
326
 
327
  if is_video_model:
328
  pipeline_import = "DiffusionPipeline"
329
+ video_prompt_example = cleaned_example_pipeline_prompt
330
 
331
+ pipeline_call_example = f"# Example prompt for video generation\nprompt = \"{video_prompt_example}\"\n"
332
  pipeline_call_example += "# Adjust parameters like num_frames, num_inference_steps, height, width as needed for the specific pipeline.\n"
333
  pipeline_call_example += "# video_frames = pipeline(prompt, num_frames=16, guidance_scale=7.5, num_inference_steps=25).frames # Example parameters"
334
  if "LTX-Video" in info["baseModel"]:
 
357
  # Load LoRA weights
358
  pipeline.load_lora_weights('{user_repo_id}', weight_name='{weight_name}')
359
 
360
+ # For some pipelines, you might need to fuse LoRA layers before inference
361
+ # and unfuse them after, or apply scaling. Check model card.
362
+ # Example: pipeline.fuse_lora() or pipeline.set_adapters(["default"], adapter_weights=[0.8])
363
 
364
  # Example generation call (adjust parameters as needed for the specific pipeline)
365
  {pipeline_call_example}
366
+
367
+ # If using fused LoRA:
368
+ # pipeline.unfuse_lora()
369
  ```"""
370
 
371
+ commercial_use_val = info["allowCommercialUse"]
372
 
373
  content = f"""---
374
  license: other
 
406
  readme_path = os.path.join(folder, "README.md")
407
  with open(readme_path, "w", encoding="utf-8") as file:
408
  file.write(content)
 
409
 
410
 
411
  # --- Hugging Face Profile / Authorship ---
 
457
 
458
  try:
459
  info, _ = process_url(url_input, profile_state, do_download=False)
460
+ if not info: # Should be caught by process_url, but as a safeguard
461
+ return "Could not process this CivitAI URL. Model might be unsupported or invalid.", gr.update(interactive=False, visible=True), gr.update(visible=False), gr.update(visible=False)
462
+ except gr.Error as e: # Catch errors from process_url (like NSFW, unsupported, API fetch failed)
463
+ return str(e), gr.update(interactive=False, visible=True), gr.update(visible=False), gr.update(visible=False)
464
+ except Exception as e: # Catch any other unexpected error during processing check
465
+ print(f"Unexpected error in check_civit_link during process_url: {e}\n{traceback.format_exc()}")
466
+ return f"An unexpected error occurred: {str(e)}", gr.update(interactive=False, visible=True), gr.update(visible=False), gr.update(visible=False)
467
+
468
+ # If model is processable, then check authorship
469
  civitai_creator_username = info['creator']
470
  hf_username_on_civitai = extract_huggingface_username(civitai_creator_username)
471
 
 
479
  f'Example: <br/><img width="60%" src="https://i.imgur.com/hCbo9uL.png" alt="CivitAI profile settings example"/><br/>'
480
  f'(If you are not "{civitai_creator_username}", you cannot submit their model at this time.)'
481
  )
482
+ return no_username_text, gr.update(interactive=False, visible=False), gr.update(visible=True), gr.update(visible=False)
483
 
484
  if profile_state.username.lower() != hf_username_on_civitai.lower():
485
  unmatched_username_text = (
 
488
  f'Please update it on <a href="https://civitai.com/user/account" target="_blank">CivitAI</a> or log in to Hugging Face as "{hf_username_on_civitai}".<br/>'
489
  f'<img src="https://i.imgur.com/hCbo9uL.png" alt="CivitAI profile settings example"/>'
490
  )
491
+ return unmatched_username_text, gr.update(interactive=False, visible=False), gr.update(visible=True), gr.update(visible=False)
492
 
493
+ return f'Authorship verified for "{civitai_creator_username}" (🤗 {profile_state.username}). Ready to upload!', gr.update(interactive=True, visible=True), gr.update(visible=False), gr.update(visible=True)
494
 
495
+ def handle_auth_change_and_update_state(profile: Optional[gr.OAuthProfile]):
496
+ # This function now returns the profile to update the state
 
497
  if profile: # Logged in
498
+ return profile, gr.update(visible=False), gr.update(visible=True), "", gr.update(value=""), gr.update(interactive=False, visible=False), gr.update(visible=False)
499
  else: # Logged out
500
+ return None, gr.update(visible=True), gr.update(visible=False), "", gr.update(value=""), gr.update(interactive=False, visible=False), gr.update(visible=False)
501
 
502
  def show_output_area():
503
  return gr.update(visible=True)
 
505
  def list_civit_models(username: str) -> str:
506
  if not username.strip(): return ""
507
 
508
+ url = f"https://civitai.com/api/v1/models?username={username}&limit=100&sort=Newest" # Max limit is 100 per page on CivitAI
509
  json_models_list = []
510
+ page_count, max_pages = 0, 1 # Limit to 1 page (100 models) for now to be quicker, can be increased
511
 
512
  gr.Info(f"Fetching LoRAs for CivitAI user: {username}...")
513
  while url and page_count < max_pages:
514
  try:
515
+ response = requests.get(url, timeout=15) # Increased timeout
516
  response.raise_for_status()
517
  data = response.json()
518
 
519
  current_items = data.get('items', [])
 
520
  json_models_list.extend(item for item in current_items if item.get("type") == "LORA" and item.get("name"))
521
 
522
  metadata = data.get('metadata', {})
 
547
  hf_auth_token = oauth_token_obj.token
548
 
549
  folder_uuid = str(uuid.uuid4())
 
550
  base_temp_dir = "temp_uploads"
551
  os.makedirs(base_temp_dir, exist_ok=True)
552
  folder_path = os.path.join(base_temp_dir, folder_uuid)
 
557
  try:
558
  info, downloaded_data = process_url(url, profile, do_download=True, folder=folder_path)
559
  if not info or not downloaded_data:
560
+ # process_url should raise gr.Error, but this is a fallback.
561
  raise gr.Error("Failed to process URL or download files after initial checks.")
562
 
563
  slug_name = slugify(info["name"])
564
  user_repo_id = f"{profile.username}/{slug_name}"
565
 
566
+ is_author = False
567
+ # Re-verify authorship just before upload, using info from processed model
568
+ civitai_creator_username_from_model = info.get('creator', 'Unknown Creator')
569
+ hf_username_on_civitai = extract_huggingface_username(civitai_creator_username_from_model)
570
+
571
  if profile.username in TRUSTED_UPLOADERS or \
572
  (hf_username_on_civitai and profile.username.lower() == hf_username_on_civitai.lower()):
573
+ is_author = True
574
 
575
  create_readme(info, downloaded_data, user_repo_id, link_civit_checkbox_val, is_author=is_author, folder=folder_path)
576
 
 
594
  print(f"Error during Hugging Face repo operations for {url}: {e}\n{traceback.format_exc()}")
595
  raise gr.Error(f"Upload failed for {url}: {str(e)}. Token might be expired. Try re-logging or check server logs.")
596
  finally:
 
597
  try:
598
  if os.path.exists(folder_path):
599
  shutil.rmtree(folder_path)
 
600
  except Exception as e_clean:
601
  print(f"Error cleaning up folder {folder_path}: {e_clean}")
602
 
 
616
  for i, url in enumerate(urls):
617
  gr.Info(f"Processing model {i+1}/{total_urls}: {url}")
618
  try:
 
619
  result_message = upload_civit_to_hf(profile, oauth_token_obj, url, link_civit_checkbox_val)
620
  upload_results.append(result_message)
621
  gr.Info(f"Successfully processed {url}")
622
+ except gr.Error as ge: # Catch Gradio specific errors to display them
623
  gr.Warning(f"Skipping model {url} due to error: {str(ge)}")
624
  upload_results.append(f"Failed to upload {url}: {str(ge)}")
625
+ except Exception as e: # Catch any other unhandled exception
626
  gr.Warning(f"Unhandled error uploading model {url}: {str(e)}")
627
  upload_results.append(f"Failed to upload {url}: Unhandled exception - {str(e)}")
628
  print(f"Unhandled exception during bulk upload for {url}: {e}\n{traceback.format_exc()}")
 
637
  .gr-html ol { list-style-type: decimal; margin-left: 20px; }
638
  .gr-html a { color: #007bff; text-decoration: underline; }
639
  .gr-html img { max-width: 100%; height: auto; margin-top: 5px; margin-bottom: 5px; border: 1px solid #ddd; }
640
+ #instructions_area { padding: 10px; border: 1px solid #eee; border-radius: 5px; margin-top: 10px; background-color: #f9f9f9; }
641
  '''
642
 
643
  with gr.Blocks(css=css, title="CivitAI to Hugging Face LoRA Uploader") as demo:
644
+ auth_profile_state = gr.State() # Stores the gr.OAuthProfile object
 
 
645
 
646
  gr.Markdown('''# Upload your CivitAI LoRA to Hugging Face 🤗
647
  By uploading your LoRAs to Hugging Face you get diffusers compatibility, a free GPU-based Inference Widget, you'll be listed in [LoRA Studio](https://lorastudio.co/models) after a short review, and get the possibility to submit your model to the [LoRA the Explorer](https://huggingface.co/spaces/multimodalart/LoraTheExplorer) ✨
648
  ''')
649
 
650
  with gr.Row(elem_id="login_button_area"):
651
+ # LoginButton updates auth_profile_state via the .then() chain on demo.load
652
+ login_button = gr.LoginButton()
653
 
 
654
  with gr.Column(visible=True, elem_id="disabled_upload_area") as disabled_area:
655
  gr.HTML("<h3>Please log in with Hugging Face to enable uploads.</h3>")
656
  gr.Textbox(
 
659
  interactive=False
660
  )
661
 
 
662
  with gr.Column(visible=False) as enabled_area:
663
  gr.HTML("<h3 style='color:green;'>Logged in! You can now upload models.</h3>")
664
 
 
669
  label="CivitAI Model URL or ID",
670
  info="Enter the full URL or just the numeric ID of the CivitAI LoRA model page.",
671
  )
672
+ instructions_html = gr.HTML(elem_id="instructions_area") # For feedback
 
673
  try_again_button = gr.Button("I've updated my CivitAI profile (Re-check Authorship)", visible=False)
 
674
  link_civit_checkbox_single = gr.Checkbox(label="Add a link back to CivitAI in the README?", value=True, visible=True)
675
  submit_button_single_model = gr.Button("Upload This Model to Hugging Face", interactive=False, visible=False, variant="primary")
676
 
677
  with gr.TabItem("Bulk Upload"):
678
  civit_username_to_bulk = gr.Textbox(
679
  label="Your CivitAI Username (Optional)",
680
+ info="Enter your CivitAI username to auto-populate the list below with your LoRAs (up to 100 newest)."
681
  )
682
  submit_bulk_civit_urls = gr.Textbox(
683
  label="CivitAI Model URLs or IDs (One per line)",
 
690
  output_markdown_area = gr.Markdown(label="Upload Progress & Results", visible=False)
691
 
692
  # --- Event Handlers Wiring ---
693
+ # This demo.load is triggered by login/logout from gr.LoginButton (which is a client-side component that calls this on auth change)
694
+ # and also on initial page load (where profile will be None if not logged in via cookies).
695
+ # The first input to demo.load for LoginButton is the profile.
 
 
 
 
 
 
 
 
 
696
  demo.load(
697
+ fn=handle_auth_change_and_update_state,
698
+ inputs=gr.Variable(), # This will receive the profile from LoginButton
699
+ outputs=[auth_profile_state, disabled_area, enabled_area, instructions_html, submit_source_civit_enabled, submit_button_single_model, try_again_button],
 
 
 
 
 
 
 
700
  api_name=False, queue=False
701
  )
702
 
 
703
  submit_source_civit_enabled.change(
704
  fn=check_civit_link,
705
  inputs=[auth_profile_state, submit_source_civit_enabled],
706
+ outputs=[instructions_html, submit_button_single_model, try_again_button, submit_button_single_model], # submit_button_single_model is repeated to control both interactivity and visibility
707
  api_name=False
708
  )
709
 
 
710
  try_again_button.click(
711
  fn=check_civit_link,
712
  inputs=[auth_profile_state, submit_source_civit_enabled],
 
714
  api_name=False
715
  )
716
 
717
+ civit_username_to_bulk.submit(
 
718
  fn=list_civit_models,
719
  inputs=[civit_username_to_bulk],
720
  outputs=[submit_bulk_civit_urls],
721
  api_name=False
722
  )
723
 
 
724
  submit_button_single_model.click(
725
  fn=show_output_area, inputs=[], outputs=[output_markdown_area], api_name=False
726
  ).then(
 
730
  api_name="upload_single_model"
731
  )
732
 
 
733
  bulk_upload_button.click(
734
  fn=show_output_area, inputs=[], outputs=[output_markdown_area], api_name=False
735
  ).then(
 
739
  api_name="upload_bulk_models"
740
  )
741
 
742
+ demo.queue(default_concurrency_limit=3, max_size=10)
743
  if __name__ == "__main__":
744
+ # For local testing, you might need to set these environment variables:
745
+ # os.environ["COOKIE_INFO"] = "your_civitai_session_cookie_here" # For creator verification
746
+ # os.environ["CIVITAI_API_TOKEN"] = "your_civitai_api_key_here" # For potentially restricted downloads
747
+ # os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0" # To make it accessible on local network
748
+
749
+ # To enable OAuth locally, you might need to set HF_HUB_DISABLE_OAUTH_CHECKMESSAGES="1"
750
+ # and ensure your HF OAuth app is configured for http://localhost:7860 or http://127.0.0.1:7860
751
+
752
  demo.launch(debug=True, share=os.environ.get("GRADIO_SHARE") == "true")