nevreal commited on
Commit
ecf897a
·
verified ·
1 Parent(s): 71140be

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +112 -211
app.py CHANGED
@@ -1,224 +1,125 @@
1
- import requests
2
- import os
3
  import gradio as gr
4
- from huggingface_hub import update_repo_visibility, whoami, upload_folder, create_repo, upload_file
5
- from slugify import slugify
6
- import re
7
- import uuid
8
- from typing import Optional
9
- import json
10
-
11
- # List of trusted uploaders
12
- TRUSTED_UPLOADERS = [
13
- "KappaNeuro", "CiroN2022", "multimodalart", "Norod78", "joachimsallstrom",
14
- "blink7630", "e-n-v-y", "DoctorDiffusion", "RalFinger", "artificialguybr", "nevreal"
15
- ]
16
-
17
- # Fetch JSON data from CivitAI
18
- def get_json_data(url):
19
- url_split = url.split('/')
20
- api_url = f"https://civitai.com/api/v1/models/{url_split[4]}"
21
  try:
22
- response = requests.get(api_url)
23
- response.raise_for_status()
24
- return response.json()
25
- except requests.exceptions.RequestException as e:
26
- print(f"Error fetching JSON data: {e}")
27
  return None
28
 
29
- # Fetch prompt data from images using image IDs
30
- def get_prompts_from_image(image_id):
31
- url = f'https://civitai.com/api/trpc/image.getGenerationData?input={{"json":{{"id":{image_id}}}}}'
32
- response = requests.get(url)
33
- prompt = ""
34
- negative_prompt = ""
35
- if response.status_code == 200:
36
- data = response.json()
37
- result = data['result']['data']['json']
38
- if "prompt" in result['meta']:
39
- prompt = result['meta']['prompt']
40
- if "negativePrompt" in result['meta']:
41
- negative_prompt = result["meta"]["negativePrompt"]
42
- return prompt, negative_prompt
43
-
44
- # Extract model information from JSON data
45
- def extract_info(json_data):
46
- if json_data["type"] == "LORA":
47
- for model_version in json_data["modelVersions"]:
48
- if model_version["baseModel"] in [
49
- "SDXL 1.0", "SDXL 0.9", "SD 1.5", "SD 1.4", "SD 2.1", "SD 2.0",
50
- "SD 2.0 768", "SD 2.1 768", "SD 3", "Flux.1 D", "Flux.1 S"
51
- ]:
52
- for file in model_version["files"]:
53
- if "primary" in file:
54
- urls_to_download = [{
55
- "url": file["downloadUrl"],
56
- "filename": file["name"],
57
- "type": "weightName"
58
- }]
59
-
60
- for image in model_version["images"]:
61
- image_id = image["url"].split("/")[-1].split(".")[0]
62
- prompt, negative_prompt = get_prompts_from_image(image_id)
63
- urls_to_download.append({
64
- "url": image["url"],
65
- "filename": os.path.basename(image["url"]),
66
- "type": "imageName",
67
- "prompt": prompt,
68
- "negative_prompt": negative_prompt
69
- })
70
- model_mapping = {
71
- "SDXL 1.0": "stabilityai/stable-diffusion-xl-base-1.0",
72
- "SDXL 0.9": "stabilityai/stable-diffusion-xl-base-1.0",
73
- "SD 1.5": "runwayml/stable-diffusion-v1-5",
74
- "SD 1.4": "CompVis/stable-diffusion-v1-4",
75
- "SD 2.1": "stabilityai/stable-diffusion-2-1-base",
76
- "SD 2.0": "stabilityai/stable-diffusion-2-base",
77
- "SD 2.1 768": "stabilityai/stable-diffusion-2-1",
78
- "SD 2.0 768": "stabilityai/stable-diffusion-2",
79
- "SD 3": "stabilityai/stable-diffusion-3-medium-diffusers",
80
- "Flux.1 D": "black-forest-labs/FLUX.1-dev",
81
- "Flux.1 S": "black-forest-labs/FLUX.1-schnell"
82
- }
83
- base_model = model_mapping[model_version["baseModel"]]
84
- info = {
85
- "urls_to_download": urls_to_download,
86
- "id": model_version["id"],
87
- "baseModel": base_model,
88
- "modelId": model_version.get("modelId", ""),
89
- "name": json_data["name"],
90
- "description": json_data["description"],
91
- "trainedWords": model_version.get("trainedWords", []),
92
- "creator": json_data["creator"]["username"],
93
- "tags": json_data["tags"],
94
- "allowNoCredit": json_data["allowNoCredit"],
95
- "allowCommercialUse": json_data["allowCommercialUse"],
96
- "allowDerivatives": json_data["allowDerivatives"],
97
- "allowDifferentLicense": json_data["allowDifferentLicense"]
98
- }
99
- return info
100
- return None
101
-
102
- # Function to download files
103
- def download_files(info, folder="."):
104
- downloaded_files = {
105
- "imageName": [],
106
- "imagePrompt": [],
107
- "imageNegativePrompt": [],
108
- "weightName": []
109
  }
110
- for item in info["urls_to_download"]:
111
- download_file(item["url"], item["filename"], folder)
112
- downloaded_files[item["type"]].append(item["filename"])
113
- if item["type"] == "imageName":
114
- prompt_clean = re.sub(r'<.*?>', '', item["prompt"])
115
- negative_prompt_clean = re.sub(r'<.*?>', '', item["negative_prompt"])
116
- downloaded_files["imagePrompt"].append(prompt_clean)
117
- downloaded_files["imageNegativePrompt"].append(negative_prompt_clean)
118
- return downloaded_files
119
-
120
- # Function to download individual files
121
- def download_file(url, filename, folder="."):
122
- headers = {}
123
- try:
124
- response = requests.get(url, headers=headers)
125
- response.raise_for_status()
126
- except requests.exceptions.HTTPError as e:
127
- print(e)
128
- if response.status_code == 401:
129
- headers['Authorization'] = f'Bearer {os.environ["CIVITAI_API"]}'
130
- try:
131
- response = requests.get(url, headers=headers)
132
- response.raise_for_status()
133
- except requests.exceptions.RequestException as e:
134
- raise gr.Error(f"Error downloading file: {e}")
135
- else:
136
- raise gr.Error(f"Error downloading file: {e}")
137
- except requests.exceptions.RequestException as e:
138
- raise gr.Error(f"Error downloading file: {e}")
139
-
140
- with open(f"{folder}/{filename}", 'wb') as f:
141
- f.write(response.content)
142
-
143
- # Main function to process URL and extract necessary info
144
- def process_url(url, profile, do_download=True, folder="."):
145
- json_data = get_json_data(url)
146
- if json_data:
147
- if check_nsfw(json_data, profile):
148
- info = extract_info(json_data)
149
- if info:
150
- if do_download:
151
- downloaded_files = download_files(info, folder)
152
- else:
153
- downloaded_files = []
154
- return info, downloaded_files
155
- else:
156
- raise gr.Error("Only SDXL LoRAs are supported for now")
157
  else:
158
- raise gr.Error("This model has content tagged as unsafe by CivitAI")
 
 
 
 
159
  else:
160
- raise gr.Error("Something went wrong in fetching CivitAI API")
161
 
162
- # Creating the web UI using Gradio
163
- with gr.Blocks() as demo:
164
- gr.Markdown("# Upload your CivitAI LoRA to Hugging Face 🤗")
165
-
166
- # Login Button
167
- gr.LoginButton()
168
-
169
- # URL input and submit button
170
- submit_source_civit = gr.Textbox(
171
- placeholder="https://civitai.com/models/144684/pixelartredmond-pixel-art-loras-for-sd-xl",
172
- label="CivitAI model URL",
173
- info="URL of the CivitAI LoRA"
174
- )
175
- submit_button_civit = gr.Button("Upload model to Hugging Face and submit")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
176
 
177
- # Output section for progress and result display
178
- output = gr.Markdown(label="Output progress", visible=False)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
179
 
180
- # Functionality to handle button click events
181
- def upload_model_to_hf(url, profile, oauth_token):
182
- if not profile.name:
183
- return gr.Error("Are you sure you are logged in?")
184
-
185
- folder = str(uuid.uuid4())
186
- os.makedirs(folder, exist_ok=False)
187
- info, downloaded_files = process_url(url, profile, folder=folder)
188
- slug_name = slugify(info["name"])
189
- user_repo_id = f"{profile.username}/{slug_name}"
190
-
191
- # Create repository on Hugging Face
192
- try:
193
- create_repo(repo_id=user_repo_id, repo_type="model", space_sdk="diffusers", private=True, token=oauth_token)
194
- except Exception as e:
195
- raise gr.Error(f"Error creating Hugging Face repo: {e}")
196
-
197
- try:
198
- upload_folder(
199
- folder_path=folder,
200
- repo_id=user_repo_id,
201
- repo_type="model",
202
- token=oauth_token
203
- )
204
- except Exception as e:
205
- raise gr.Error(f"Error uploading to Hugging Face: {e}")
206
-
207
- update_repo_visibility(repo_id=user_repo_id, private=False, token=oauth_token)
208
-
209
- # Clean up downloaded files
210
- for filename in downloaded_files:
211
- os.remove(os.path.join(folder, filename))
212
- os.rmdir(folder)
213
-
214
- return f"Model '{info['name']}' uploaded to {user_repo_id}"
215
-
216
- # Define the submit button click action
217
- submit_button_civit.click(
218
- upload_model_to_hf,
219
- inputs=[submit_source_civit, gr.OAuthProfile(hub_token=gr.Token())],
220
- outputs=output
221
  )
222
 
223
- # Launch the Gradio interface
224
  demo.launch()
 
 
 
1
  import gradio as gr
2
+ import os
3
+ import time
4
+ import urllib.request
5
+ from pathlib import Path
6
+ from urllib.parse import urlparse, parse_qs, unquote
7
+
8
+ CHUNK_SIZE = 1638400
9
+ TOKEN_FILE = Path.home() / '.civitai' / 'config'
10
+ USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
11
+
12
+ def get_token():
 
 
 
 
 
 
13
  try:
14
+ with open(TOKEN_FILE, 'r') as file:
15
+ token = file.read()
16
+ return token
17
+ except Exception:
 
18
  return None
19
 
20
+ def store_token(token: str):
21
+ TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True)
22
+ with open(TOKEN_FILE, 'w') as file:
23
+ file.write(token)
24
+
25
+ def download_file(url: str, output_path: str, token: str):
26
+ headers = {
27
+ 'Authorization': f'Bearer {token}',
28
+ 'User-Agent': USER_AGENT,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  }
30
+
31
+ class NoRedirection(urllib.request.HTTPErrorProcessor):
32
+ def http_response(self, request, response):
33
+ return response
34
+ https_response = http_response
35
+
36
+ request = urllib.request.Request(url, headers=headers)
37
+ opener = urllib.request.build_opener(NoRedirection)
38
+ response = opener.open(request)
39
+
40
+ if response.status in [301, 302, 303, 307, 308]:
41
+ redirect_url = response.getheader('Location')
42
+ parsed_url = urlparse(redirect_url)
43
+ query_params = parse_qs(parsed_url.query)
44
+ content_disposition = query_params.get('response-content-disposition', [None])[0]
45
+
46
+ if content_disposition:
47
+ filename = unquote(content_disposition.split('filename=')[1].strip('"'))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
  else:
49
+ return None, f"ERROR: Unable to determine filename"
50
+
51
+ response = urllib.request.urlopen(redirect_url)
52
+ elif response.status == 404:
53
+ return None, f"ERROR: File not found"
54
  else:
55
+ return None, f"ERROR: No redirect found, something went wrong"
56
 
57
+ total_size = response.getheader('Content-Length')
58
+ if total_size is not None:
59
+ total_size = int(total_size)
60
+
61
+ output_file = os.path.join(output_path, filename)
62
+
63
+ with open(output_file, 'wb') as f:
64
+ downloaded = 0
65
+ start_time = time.time()
66
+
67
+ while True:
68
+ chunk_start_time = time.time()
69
+ buffer = response.read(CHUNK_SIZE)
70
+ chunk_end_time = time.time()
71
+
72
+ if not buffer:
73
+ break
74
+
75
+ downloaded += len(buffer)
76
+ f.write(buffer)
77
+ chunk_time = chunk_end_time - chunk_start_time
78
+
79
+ if chunk_time > 0:
80
+ speed = len(buffer) / chunk_time / (1024 ** 2) # Speed in MB/s
81
+
82
+ if total_size is not None:
83
+ progress = downloaded / total_size
84
+ gr.Progress.update(f'Downloading: {filename} [{progress*100:.2f}%] - {speed:.2f} MB/s')
85
 
86
+ end_time = time.time()
87
+ time_taken = end_time - start_time
88
+ hours, remainder = divmod(time_taken, 3600)
89
+ minutes, seconds = divmod(remainder, 60)
90
+
91
+ if hours > 0:
92
+ time_str = f'{int(hours)}h {int(minutes)}m {int(seconds)}s'
93
+ elif minutes > 0:
94
+ time_str = f'{int(minutes)}m {int(seconds)}s'
95
+ else:
96
+ time_str = f'{int(seconds)}s'
97
+
98
+ return output_file, f"Download completed. File saved as: {filename}. Downloaded in {time_str}"
99
+
100
+ def run_downloader(url, output_path, token_input):
101
+ token = get_token() or token_input
102
+ if not token:
103
+ return None, "ERROR: Token not provided."
104
+
105
+ store_token(token)
106
+ file_path, message = download_file(url, output_path, token)
107
+ return file_path, message
108
+
109
+ with gr.Blocks() as demo:
110
+ gr.Markdown("# CivitAI Downloader")
111
+ with gr.Row():
112
+ url_input = gr.Textbox(label="CivitAI Download URL")
113
+ output_path_input = gr.Textbox(label="Output Path", value="./") # Default value set to current directory
114
+ token_input = gr.Textbox(label="CivitAI API Token (Optional)")
115
+ download_button = gr.Button("Download")
116
+ output_file = gr.File(label="Downloaded Model")
117
+ message_output = gr.Textbox(label="Status Message", interactive=False)
118
 
119
+ download_button.click(
120
+ run_downloader,
121
+ inputs=[url_input, output_path_input, token_input],
122
+ outputs=[output_file, message_output]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
  )
124
 
 
125
  demo.launch()