Spaces:
Sleeping
Sleeping
File size: 15,314 Bytes
c66bf1b 13591d5 c66bf1b 849764e c66bf1b fe71f92 c66bf1b 13591d5 0da8126 849764e 13591d5 0da8126 c66bf1b fe71f92 0da8126 13591d5 0da8126 13591d5 0da8126 13591d5 fe71f92 0da8126 13591d5 0da8126 13591d5 fe71f92 0da8126 13591d5 849764e 13591d5 0da8126 13591d5 849764e 0da8126 13591d5 849764e 0da8126 13591d5 c66bf1b f89e3c9 13591d5 c66bf1b fe71f92 f89e3c9 fe71f92 13591d5 fe71f92 13591d5 fe71f92 13591d5 fe71f92 13591d5 fe71f92 13591d5 fe71f92 13591d5 0da8126 c66bf1b 849764e 0da8126 fe71f92 849764e f89e3c9 0da8126 849764e fe71f92 c66bf1b 0da8126 fe71f92 849764e c66bf1b 13591d5 0da8126 13591d5 0da8126 c66bf1b 0da8126 849764e 0da8126 849764e 13591d5 849764e 13591d5 c66bf1b 0da8126 13591d5 c66bf1b 13591d5 c66bf1b 849764e c66bf1b f89e3c9 849764e c66bf1b 13591d5 c66bf1b 0da8126 f89e3c9 c66bf1b 13591d5 0da8126 13591d5 0da8126 c66bf1b 0da8126 849764e 13591d5 0da8126 849764e 0da8126 13591d5 0da8126 13591d5 0da8126 13591d5 c66bf1b 13591d5 c66bf1b 13591d5 0da8126 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
import os
import re
import tempfile
import shutil # For rmtree
import git # Used by Repository indirectly
from huggingface_hub import (
create_repo,
upload_folder,
list_repo_files,
Repository,
whoami,
)
import logging
from pathlib import Path
from PIL import Image
# Attempt to import keylock_decode
try:
from keylock_decode import decode_from_image_pil
KEYLOCK_DECODE_AVAILABLE = True
except ImportError:
KEYLOCK_DECODE_AVAILABLE = False
decode_from_image_pil = None
logging.warning("keylock-decode library not found. KeyLock Wallet image feature will be disabled.")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# --- Helper Function to Get API Token (Unchanged) ---
def _get_api_token(ui_token_from_textbox=None):
env_token = os.getenv('HF_TOKEN')
if env_token:
logger.info("Using HF_TOKEN from environment.")
return env_token, None
if ui_token_from_textbox:
logger.info("Using API token from UI textbox.")
return ui_token_from_textbox, None
logger.warning("HF API token not found in environment or UI textbox.")
return None, "Error: Hugging Face API token not provided. Please enter it or load from a KeyLock Wallet image."
# --- `load_token_from_image_and_set_env` (Unchanged from previous debug version, ensure debug lines are suitable) ---
def load_token_from_image_and_set_env(image_pil_object: Image.Image, password: str):
if not KEYLOCK_DECODE_AVAILABLE:
return "Error: KeyLock-Decode library is not installed. This feature is disabled."
if image_pil_object is None: return "Error: No KeyLock Wallet image provided for decoding."
if not password: return "Error: Password cannot be empty for image decoding."
status_messages_display = []
# Optional: Keep debug saving if still needed, otherwise remove for cleaner output
# debug_image_path_str = "Not saved."
# try: ... debug save logic ...
# except Exception as save_exc: ...
try:
logger.info(f"Attempting to decode from KeyLock Wallet image...")
decoded_data, status_msgs_from_lib = decode_from_image_pil(image_pil_object, password, set_environment_variables=True)
status_messages_display.extend(status_msgs_from_lib)
if decoded_data:
status_messages_display.append("\n**Decoded Data Summary (sensitive values masked):**")
for key, value in decoded_data.items():
display_value = '********' if any(k_word in key.upper() for k_word in ['TOKEN', 'KEY', 'SECRET', 'PASS']) else value
status_messages_display.append(f"- {key}: {display_value}")
if os.getenv('HF_TOKEN'):
status_messages_display.append(f"\n**SUCCESS: HF_TOKEN was found and set in environment from KeyLock Wallet image.**")
elif 'HF_TOKEN' in decoded_data:
status_messages_display.append(f"\nWarning: HF_TOKEN decoded but os.getenv('HF_TOKEN') not found (unexpected).")
else:
status_messages_display.append("\nNote: HF_TOKEN not specifically found in decoded KeyLock Wallet image data.")
except ValueError as e: # Specific errors from keylock-decode
status_messages_display.append(f"**Decoding Error (e.g., bad password, corrupted data):** {e}")
except Exception as e:
logger.exception("Unexpected error during KeyLock Wallet image decoding:")
status_messages_display.append(f"**An unexpected error occurred during decoding:** {str(e)}")
return "\n".join(status_messages_display)
# --- `parse_markdown` (Unchanged from previous corrected version) ---
def parse_markdown(markdown_input):
space_info = {"repo_name_md": "", "owner_md": "", "files": []}
current_file_path = None; current_file_content_lines = []
in_file_definition = False; in_code_block = False
lines = markdown_input.strip().split("\n")
for line_content_orig in lines:
line_content_stripped = line_content_orig.strip()
if line_content_stripped.startswith("### File:"):
if current_file_path and in_file_definition:
space_info["files"].append({"path": current_file_path, "content": "\n".join(current_file_content_lines)})
current_file_path = line_content_stripped.replace("### File:", "").strip()
current_file_content_lines = []
in_file_definition = True; in_code_block = False
continue
if not in_file_definition:
if line_content_stripped.startswith("# Space:"):
full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
if "/" in full_space_name_md: space_info["owner_md"], space_info["repo_name_md"] = full_space_name_md.split("/", 1)
else: space_info["repo_name_md"] = full_space_name_md
continue
if line_content_stripped.startswith("```"):
in_code_block = not in_code_block
continue
current_file_content_lines.append(line_content_orig)
if current_file_path and in_file_definition:
space_info["files"].append({"path": current_file_path, "content": "\n".join(current_file_content_lines)})
space_info["files"] = [f for f in space_info["files"] if f.get("path")]
return space_info
# --- `_determine_repo_id` (Unchanged) ---
def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
if not space_name_ui: return None, "Error: Space Name cannot be empty."
if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field."
final_owner = owner_ui; error_message = None
if not final_owner:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, token_err
if not resolved_api_token: return None, "Error: API token required for auto owner determination."
try:
user_info = whoami(token=resolved_api_token)
if user_info and 'name' in user_info: final_owner = user_info['name']
else: error_message = "Error: Could not retrieve username. Check token/permissions or specify Owner."
except Exception as e: error_message = f"Error retrieving username: {str(e)}. Specify Owner."
if error_message: return None, error_message
if not final_owner: return None, "Error: Owner could not be determined."
return f"{final_owner}/{space_name_ui}", None
# --- New/Modified Functions for File Browsing and Editing ---
CLONE_BASE_DIR = Path(tempfile.gettempdir()) / "space_builder_active_clones"
CLONE_BASE_DIR.mkdir(parents=True, exist_ok=True)
def _cleanup_old_clones(repo_id_slug_to_keep=None):
"""Clean up old clone directories, optionally keeping one specific repo's clone."""
try:
for item in CLONE_BASE_DIR.iterdir():
if item.is_dir():
if repo_id_slug_to_keep and item.name == repo_id_slug_to_keep:
continue
logger.info(f"Cleaning up old clone: {item}")
shutil.rmtree(item)
except Exception as e:
logger.error(f"Error during old clone cleanup: {e}")
def get_space_local_clone_path(ui_api_token_from_textbox, space_name_ui, owner_ui, force_refresh=False):
"""Clones a Space locally, manages cleanup, and returns the path."""
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, token_err
repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
if err: return None, err
repo_id_for_error_logging = repo_id
repo_id_slug = repo_id.replace("/", "_") # Sanitize for directory name
_cleanup_old_clones(repo_id_slug_to_keep=repo_id_slug)
local_clone_dir = CLONE_BASE_DIR / repo_id_slug
if force_refresh and local_clone_dir.exists():
logger.info(f"Force refresh: Removing existing clone at {local_clone_dir}")
shutil.rmtree(local_clone_dir)
if not local_clone_dir.exists():
local_clone_dir.mkdir(parents=True, exist_ok=True) # Ensure parent exists before Repository tries to use it
logger.info(f"Cloning Space {repo_id} to {local_clone_dir}...")
try:
Repository(
local_dir=str(local_clone_dir),
clone_from=f"https://huggingface.co/spaces/{repo_id}",
repo_type="space", use_auth_token=resolved_api_token,
)
logger.info(f"Successfully cloned {repo_id} to {local_clone_dir}")
except Exception as clone_exc:
logger.exception(f"Error cloning {repo_id}:")
if local_clone_dir.exists(): shutil.rmtree(local_clone_dir) # Cleanup partial clone
return None, f"Error cloning Space '{repo_id}': {str(clone_exc)}"
else:
logger.info(f"Using existing clone for {repo_id} at {local_clone_dir}")
# For a true refresh of an existing clone, you'd `git pull` here.
# `force_refresh=True` handles this by re-cloning.
return str(local_clone_dir), None
except Exception as e:
logger.exception(f"Error in get_space_local_clone_path for {repo_id_for_error_logging}:")
return None, f"Error preparing local clone: {str(e)}"
def read_file_from_local_path(absolute_local_file_path_str: str):
"""Reads content of a file given its absolute local path."""
try:
file_path = Path(absolute_local_file_path_str)
if not file_path.is_file():
return None, f"Error: Path is not a file or does not exist: {absolute_local_file_path_str}"
content = file_path.read_text(encoding="utf-8")
return content, None
except Exception as e:
logger.exception(f"Error reading local file {absolute_local_file_path_str}:")
return None, f"Error reading file content: {str(e)}"
# --- Core Functions: `create_space`, `update_space_file` (view_space_files can be deprecated) ---
def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input):
# (Unchanged from previous correct version)
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return token_err
repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
if err: return err
repo_id_for_error_logging = repo_id
space_info = parse_markdown(markdown_input)
if not space_info["files"]: return "Error: No files found in markdown. Use '### File: path/to/file.ext'."
with tempfile.TemporaryDirectory() as temp_dir: # Temp dir for staging files from markdown
repo_staging_path = Path(temp_dir) / "repo_staging_content"
repo_staging_path.mkdir(exist_ok=True)
for file_info in space_info["files"]:
if not file_info.get("path"): continue
file_path_abs = repo_staging_path / file_info["path"]
file_path_abs.parent.mkdir(parents=True, exist_ok=True)
with open(file_path_abs, "w", encoding="utf-8") as f: f.write(file_info["content"])
try:
create_repo(repo_id=repo_id, token=resolved_api_token, repo_type="space", space_sdk=sdk_ui, private=False)
except Exception as e:
err_str = str(e).lower()
if not ("already exists" in err_str or "you already created this repo" in err_str or "exists" in err_str):
return f"Error creating Space '{repo_id}': {str(e)}"
logger.info(f"Space {repo_id} already exists or creation confirmed, proceeding with upload.")
upload_folder(repo_id=repo_id, folder_path=str(repo_staging_path), path_in_repo=".", token=resolved_api_token, repo_type="space", commit_message=f"Initial Space setup of {repo_id} via Builder")
return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})"
except Exception as e:
logger.exception(f"Error in create_space for {repo_id_for_error_logging}:")
return f"Error during Space creation/update: {str(e)}"
# view_space_files can be removed or kept as a simple alternative list view
# def view_space_files(...)
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
# This function is now simpler as it expects file_path_in_repo to be the correct relative path.
# It still does its own clone for transactional integrity.
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return token_err
repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
if err: return err
repo_id_for_error_logging = repo_id
if not file_path_in_repo: return "Error: File Path to update cannot be empty."
# Sanitize file_path_in_repo: remove leading slashes, ensure forward slashes
file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
commit_message_ui = commit_message_ui or f"Update {file_path_in_repo} via Space Builder"
with tempfile.TemporaryDirectory() as temp_dir_for_update: # Fresh temp dir for this update operation
repo_local_clone_path = Path(temp_dir_for_update) / "update_clone"
cloned_repo = Repository(
local_dir=str(repo_local_clone_path),
clone_from=f"https://huggingface.co/spaces/{repo_id}",
repo_type="space", use_auth_token=resolved_api_token,
git_user="Space Builder Bot", git_email="[email protected]" # Optional
)
logger.info(f"Cloned Space {repo_id} to {repo_local_clone_path} for update operation.")
# The file_path_in_repo is relative to the cloned_repo.local_dir
full_local_file_path = Path(cloned_repo.local_dir) / file_path_in_repo
full_local_file_path.parent.mkdir(parents=True, exist_ok=True) # Create parent dirs if needed
with open(full_local_file_path, "w", encoding="utf-8") as f:
f.write(file_content)
logger.info(f"Wrote updated content to {full_local_file_path} for commit.")
cloned_repo.push_to_hub(commit_message=commit_message_ui)
logger.info(f"Pushed update for {file_path_in_repo} to {repo_id}")
return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})"
except Exception as e:
logger.exception(f"Error in update_space_file for {repo_id_for_error_logging}, file {file_path_in_repo}:")
return f"Error updating file for `{repo_id_for_error_logging}`: {str(e)}" |