Spaces:
Sleeping
Sleeping
File size: 17,825 Bytes
c66bf1b c0f7c5e 79af715 c66bf1b 849764e c66bf1b dea7543 c66bf1b dea7543 c66bf1b dea7543 0da8126 dea7543 0da8126 dea7543 0da8126 dea7543 0da8126 c66bf1b dea7543 0da8126 dea7543 0da8126 dea7543 c0f7c5e 0da8126 dea7543 0da8126 dea7543 0da8126 13591d5 dea7543 0da8126 dea7543 0da8126 dea7543 0da8126 dea7543 75e5f7e dea7543 c0e2a00 cbc73b6 dea7543 9abfd61 dea7543 9abfd61 cbc73b6 c66bf1b dea7543 1d4a062 b631296 dea7543 1d4a062 fe71f92 1d4a062 cbc73b6 1d4a062 cbc73b6 1d4a062 cbc73b6 dea7543 0da8126 dea7543 fe71f92 849764e f89e3c9 0da8126 dea7543 fe71f92 c66bf1b 0da8126 fe71f92 dea7543 849764e c66bf1b dea7543 c0f7c5e dea7543 c0f7c5e 13591d5 c0f7c5e dea7543 13591d5 c0f7c5e dea7543 c0f7c5e dea7543 c0f7c5e dea7543 c0f7c5e dea7543 c0f7c5e dea7543 c0f7c5e dea7543 13591d5 dea7543 13591d5 dea7543 c0f7c5e dea7543 13591d5 c0f7c5e 13591d5 dea7543 0da8126 dea7543 0da8126 c66bf1b 0da8126 849764e 0da8126 849764e 13591d5 c0f7c5e 13591d5 c66bf1b 0da8126 13591d5 c66bf1b 13591d5 c66bf1b 849764e c66bf1b f89e3c9 849764e c66bf1b 13591d5 c66bf1b 0da8126 f89e3c9 c66bf1b dea7543 0da8126 dea7543 0da8126 c66bf1b 0da8126 849764e 0da8126 849764e 0da8126 13591d5 0da8126 c0f7c5e 13591d5 c0f7c5e 0da8126 c0f7c5e c66bf1b 13591d5 0da8126 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 |
import os
import re
import tempfile
import shutil
import git
import re
from huggingface_hub import (
create_repo,
upload_folder,
list_repo_files,
Repository,
whoami,
hf_hub_download,
)
import logging
from pathlib import Path
from PIL import Image
# --- Import functions from keylock_decode ---
try:
from keylock_decode import (
decode_data_from_image_pil,
save_decoded_data_locally_encrypted,
load_decoded_data_locally_encrypted
)
KEYLOCK_DECODE_AVAILABLE = True
except ImportError:
KEYLOCK_DECODE_AVAILABLE = False
decode_data_from_image_pil = None
save_decoded_data_locally_encrypted = None
load_decoded_data_locally_encrypted = None
logging.warning("keylock-decode library not available or missing functions. KeyLock Wallet features will be disabled.")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# --- Global variable for in-memory keys and local storage path ---
_loaded_local_keys = {}
LOCAL_KEYS_FILE = "/data/.space_keys.enc" # Persistent storage in Hugging Face Spaces
# --- Helper Function: Load Local Keys into Memory ---
def _load_keys_into_memory(password: str) -> list[str]:
# Loads and decrypts keys from the local file into memory
global _loaded_local_keys
status_messages = []
if not KEYLOCK_DECODE_AVAILABLE or not load_decoded_data_locally_encrypted:
status_messages.append("KeyLock-Decode library not available for local loading.")
_loaded_local_keys = {}
return status_messages
if not password:
status_messages.append("Error: Password required to load local keys.")
_loaded_local_keys = {}
return status_messages
if not Path(LOCAL_KEYS_FILE).exists():
status_messages.append(f"Info: Local key file not found at `{LOCAL_KEYS_FILE}`. No keys loaded.")
_loaded_local_keys = {}
return status_messages
try:
_loaded_local_keys = load_decoded_data_locally_encrypted(password, LOCAL_KEYS_FILE)
if _loaded_local_keys:
status_messages.append(f"Successfully loaded {len(_loaded_local_keys)} keys from local storage.")
masked_keys = {k: ('********' if any(k_word in k.upper() for k_word in ['TOKEN', 'KEY', 'SECRET', 'PASS']) else v) for k, v in _loaded_local_keys.items()}
status_messages.append(f"Loaded keys: {masked_keys}")
logger.info(f"Keys loaded into memory from {LOCAL_KEYS_FILE}")
else:
status_messages.append(f"Info: Local key file `{LOCAL_KEYS_FILE}` was empty or contained no valid data after decryption.")
_loaded_local_keys = {}
except FileNotFoundError:
status_messages.append(f"Info: Local key file not found at {LOCAL_KEYS_FILE}. No keys loaded.")
_loaded_local_keys = {}
except ValueError as e:
status_messages.append(f"Error loading local keys: {e}. Password may be incorrect.")
_loaded_local_keys = {}
except IOError as e:
status_messages.append(f"IO Error loading local keys: {e}")
_loaded_local_keys = {}
except Exception as e:
logger.exception(f"Unexpected error loading local keys from {LOCAL_KEYS_FILE}:")
status_messages.append(f"Unexpected error loading local keys: {str(e)}")
_loaded_local_keys = {}
return status_messages
# --- Helper Function: Get API Token ---
def _get_api_token(ui_token_from_textbox=None):
# Retrieves token from in-memory keys, env vars, or textbox
in_memory_token = _loaded_local_keys.get('HF_TOKEN')
if in_memory_token:
logger.info("Using HF_TOKEN from in-memory loaded keys.")
return in_memory_token, None
env_token = os.getenv('HF_TOKEN')
if env_token:
logger.info("Using HF_TOKEN from environment variable.")
return env_token, None
if ui_token_from_textbox:
logger.info("Using HF_TOKEN from UI textbox.")
return ui_token_from_textbox, None
return None, "Error: Hugging Face API token not provided."
# --- Main Function: Process KeyLock Image and Store Locally ---
def process_keylock_image_and_store_locally(image_pil_object: Image.Image, password: str):
# Decodes image, saves data encrypted locally, and loads into memory
status_messages_display = []
if not KEYLOCK_DECODE_AVAILABLE or not decode_data_from_image_pil or \
not save_decoded_data_locally_encrypted or not load_decoded_data_locally_encrypted:
status_messages_display.append("Error: KeyLock-Decode library not available.")
global _loaded_local_keys
_loaded_local_keys = {}
return "\n".join(status_messages_display)
if image_pil_object is None:
status_messages_display.append("Error: No KeyLock Wallet image provided.")
return "\n".join(status_messages_display)
if not password:
status_messages_display.append("Error: Password cannot be empty.")
return "\n".join(status_messages_display)
decoded_data = None
try:
logger.info(f"Attempting to decode from KeyLock Wallet image...")
decoded_data, status_msgs_from_lib = decode_data_from_image_pil(image_pil_object, password)
status_messages_display.extend(status_msgs_from_lib)
if decoded_data:
status_messages_display.append("\n**Decoded Data Summary (sensitive values masked):**")
for key, value in decoded_data.items():
display_value = '********' if any(k_word in key.upper() for k_word in ['TOKEN', 'KEY', 'SECRET', 'PASS']) else value
status_messages_display.append(f"- {key}: {display_value}")
try:
save_decoded_data_locally_encrypted(decoded_data, password, LOCAL_KEYS_FILE)
status_messages_display.append(f"\n**SUCCESS: Decoded data saved to encrypted local file:** `{LOCAL_KEYS_FILE}`")
load_status = _load_keys_into_memory(password)
status_messages_display.extend(load_status)
if not _loaded_local_keys:
status_messages_display.append("Warning: No keys were loaded into memory after saving.")
except Exception as e:
status_messages_display.append(f"Error saving or loading data locally: {str(e)}")
logger.error(f"Error during local save/load after decoding: {e}")
global _loaded_local_keys
_loaded_local_keys = {}
elif not status_msgs_from_lib:
status_messages_display.append("Info: Decoding process completed, but no data was extracted.")
except ValueError as e:
status_messages_display.append(f"**Decoding Error:** {e}. Please check password and image.")
global _loaded_local_keys
_loaded_local_keys = {}
except Exception as e:
status_messages_display.append(f"**Unexpected error during processing:** {str(e)}")
logger.exception("Unexpected error during keylock image processing:")
global _loaded_local_keys
_loaded_local_keys = {}
return "\n".join(status_messages_display)
# --- Function: Load Keys from Local File ---
def load_keys_from_local_file(password: str) -> str:
# Triggers loading keys from the encrypted local file into memory
status_messages = _load_keys_into_memory(password)
if not status_messages:
if _loaded_local_keys:
return f"Keys successfully loaded from `{LOCAL_KEYS_FILE}`."
else:
return f"Attempted to load keys from `{LOCAL_KEYS_FILE}`, but no keys were loaded. Check password."
return "\n".join(status_messages)
# --- Markdown Processing Functions ---
def process_commented_markdown(commented_input):
# Processes markdown by stripping comments if a specific marker is present
lines = commented_input.strip().split("\n")
if any( "# # Space:" in line.strip() for line in lines):
cleaned_lines = [line[2:] if line.startswith("# ") else line for line in lines]
return cleaned_lines
return lines
def parse_markdown(markdown_input):
# Parses markdown input to extract space info and file content
space_info = {"repo_name_md": "", "owner_md": "", "files": []}
current_file_path = None; current_file_content_lines = []
in_file_definition = False; in_code_block = False
lines = process_commented_markdown(markdown_input)
for line_content_orig in lines:
line_content_stripped = line_content_orig.strip()
if line_content_stripped.startswith("### File:"):
if current_file_path and in_file_definition:
space_info["files"].append({"path": current_file_path, "content": "\n".join(current_file_content_lines)})
current_file_path = line_content_stripped.replace("### File:", "").strip()
current_file_content_lines = []
in_file_definition = True; in_code_block = False
continue
if not in_file_definition:
if line_content_stripped.startswith("# Space:"):
full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
if "/" in full_space_name_md: space_info["owner_md"], space_info["repo_name_md"] = full_space_name_md.split("/", 1)
else: space_info["repo_name_md"] = full_space_name_md
continue
if line_content_stripped.startswith("```"):
in_code_block = not in_code_block
continue
current_file_content_lines.append(line_content_orig)
if current_file_path and in_file_definition:
space_info["files"].append({"path": current_file_path, "content": "\n".join(current_file_content_lines)})
space_info["files"] = [f for f in space_info["files"] if f.get("path")]
return space_info
# --- Helper Function: Determine Repository ID ---
def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
# Determines the full repo ID (owner/space_name)
if not space_name_ui: return None, "Error: Space Name cannot be empty."
if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field."
final_owner = owner_ui; error_message = None
if not final_owner:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, token_err
if not resolved_api_token: return None, "Error: API token required for auto owner determination."
try:
user_info = whoami(token=resolved_api_token)
if user_info and 'name' in user_info: final_owner = user_info['name']
else: error_message = "Error: Could not retrieve username. Check token/permissions."
except Exception as e: error_message = f"Error retrieving username: {str(e)}."
if error_message: return None, error_message
if not final_owner: return None, "Error: Owner could not be determined."
return f"{final_owner}/{space_name_ui}", None
# --- Function: Fetch File Content from Hub ---
def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
# Fetches content of a specific file from a Hugging Face Space
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err:
return None, token_err
repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
if err:
return None, err
repo_id_for_error_logging = repo_id
if not file_path_in_repo:
return None, "Error: File path cannot be empty."
logger.info(f"Attempting to download file: {file_path_in_repo} from Space: {repo_id}")
downloaded_file_path = hf_hub_download(
repo_id=repo_id,
filename=file_path_in_repo,
repo_type="space",
token=resolved_api_token,
)
content = Path(downloaded_file_path).read_text(encoding="utf-8")
logger.info(f"Successfully downloaded and read file: {file_path_in_repo}")
return content, None
except Exception as e:
if "404" in str(e) or "not found" in str(e).lower():
logger.warning(f"File not found {file_path_in_repo} in {repo_id_for_error_logging}: {e}")
return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging}'."
logger.exception(f"Error fetching file content for {file_path_in_repo}:")
return None, f"Error fetching file content: {str(e)}"
# --- Function: List Space Files ---
def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
# Lists files in a Hugging Face Space
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, token_err
repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
if err: return None, err
repo_id_for_error_logging = repo_id
files = list_repo_files(repo_id=repo_id, token=resolved_api_token, repo_type="space")
if not files:
return [], f"No files found in Space `{repo_id}`."
return files, None
except Exception as e:
logger.exception(f"Error listing files for {repo_id_for_error_logging}:")
return None, f"Error listing files for `{repo_id_for_error_logging}`: {str(e)}"
# --- Core Function: Create/Update Space ---
def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input):
# Creates or updates a Hugging Face Space with files from markdown
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return token_err
repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
if err: return err
repo_id_for_error_logging = repo_id
space_info = parse_markdown(markdown_input)
if not space_info["files"]: return "Error: No files found in markdown."
with tempfile.TemporaryDirectory() as temp_dir:
repo_staging_path = Path(temp_dir) / "repo_staging_content"
repo_staging_path.mkdir(exist_ok=True)
for file_info in space_info["files"]:
if not file_info.get("path"): continue
file_path_abs = repo_staging_path / file_info["path"]
file_path_abs.parent.mkdir(parents=True, exist_ok=True)
with open(file_path_abs, "w", encoding="utf-8") as f: f.write(file_info["content"])
try:
create_repo(repo_id=repo_id, token=resolved_api_token, repo_type="space", space_sdk=sdk_ui, private=False)
except Exception as e:
err_str = str(e).lower()
if not ("already exists" in err_str or "you already created this repo" in err_str or "exists" in err_str):
return f"Error creating Space '{repo_id}': {str(e)}"
upload_folder(repo_id=repo_id, folder_path=str(repo_staging_path), path_in_repo=".", token=resolved_api_token, repo_type="space", commit_message=f"Initial Space setup of {repo_id} via Builder")
return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})"
except Exception as e:
logger.exception(f"Error in create_space for {repo_id_for_error_logging}:")
return f"Error during Space creation/update: {str(e)}"
# --- Core Function: Update Space File ---
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
# Updates a specific file in a Hugging Face Space
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return token_err
repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
if err: return err
repo_id_for_error_logging = repo_id
if not file_path_in_repo: return "Error: File Path to update cannot be empty."
file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
commit_message_ui = commit_message_ui or f"Update {file_path_in_repo} via Space Builder"
with tempfile.TemporaryDirectory() as temp_dir_for_update:
repo_local_clone_path = Path(temp_dir_for_update) / "update_clone"
cloned_repo = Repository(local_dir=str(repo_local_clone_path), clone_from=f"https://huggingface.co/spaces/{repo_id}", repo_type="space", use_auth_token=resolved_api_token, git_user="Space Builder Bot", git_email="[email protected]")
full_local_file_path = Path(cloned_repo.local_dir) / file_path_in_repo
full_local_file_path.parent.mkdir(parents=True, exist_ok=True)
with open(full_local_file_path, "w", encoding="utf-8") as f: f.write(file_content)
cloned_repo.push_to_hub(commit_message=commit_message_ui)
return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})"
except Exception as e:
logger.exception(f"Error in update_space_file for {repo_id_for_error_logging}, file {file_path_in_repo}:")
return f"Error updating file for `{repo_id_for_error_logging}`: {str(e)}" |