Spaces:
Sleeping
Sleeping
File size: 15,836 Bytes
c66bf1b 849764e c66bf1b 849764e c66bf1b 0da8126 c66bf1b 0da8126 849764e 0da8126 c66bf1b 0da8126 849764e 0da8126 849764e 0da8126 849764e 0da8126 849764e 0da8126 849764e 0da8126 849764e 0da8126 849764e 0da8126 849764e 0da8126 849764e 0da8126 849764e 0da8126 849764e 0da8126 849764e c66bf1b f89e3c9 c66bf1b 0da8126 c66bf1b f89e3c9 0da8126 f89e3c9 0da8126 f89e3c9 0da8126 f89e3c9 0da8126 f89e3c9 0da8126 f89e3c9 0da8126 f89e3c9 0da8126 f89e3c9 c66bf1b f89e3c9 0da8126 c66bf1b f89e3c9 0da8126 c66bf1b f89e3c9 0da8126 c66bf1b f89e3c9 0da8126 c66bf1b 0da8126 c66bf1b 849764e 0da8126 c66bf1b 0da8126 c66bf1b 849764e f89e3c9 0da8126 849764e c66bf1b 0da8126 c66bf1b 0da8126 c66bf1b 849764e c66bf1b 849764e c66bf1b 849764e 0da8126 c66bf1b 0da8126 849764e 0da8126 849764e c66bf1b 849764e c66bf1b 0da8126 f89e3c9 c66bf1b 0da8126 f89e3c9 c66bf1b 0da8126 c66bf1b 849764e c66bf1b f89e3c9 849764e c66bf1b 849764e c66bf1b 0da8126 f89e3c9 c66bf1b 0da8126 c66bf1b 0da8126 849764e 0da8126 849764e 0da8126 849764e c66bf1b 0da8126 c66bf1b 0da8126 849764e 0da8126 849764e 0da8126 c66bf1b 0da8126 849764e 0da8126 c66bf1b 0da8126 c66bf1b 0da8126 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
import os
import re
import tempfile
import git
from huggingface_hub import (
create_repo,
upload_folder,
list_repo_files,
Repository,
whoami,
)
import logging
from pathlib import Path
from PIL import Image # For type hinting from gr.Image(type="pil")
# Configure logging BEFORE anything can log. A module-level logging call made
# while the root logger has no handlers installs a default handler, after which
# logging.basicConfig() does nothing and the level/format below would be
# silently ignored (this happened whenever the optional import failed).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Attempt to import keylock_decode. It is optional: when it is missing the
# image-decoding feature is disabled via KEYLOCK_DECODE_AVAILABLE and the
# decoder name is bound to None so later references do not raise NameError.
try:
    from keylock_decode import decode_from_image_pil
    KEYLOCK_DECODE_AVAILABLE = True
except ImportError:
    KEYLOCK_DECODE_AVAILABLE = False
    decode_from_image_pil = None
    logger.warning("keylock-decode library not found. Image decoding feature will be disabled.")
# --- Helper Function to Get API Token (Unchanged from previous version) ---
def _get_api_token(ui_token_from_textbox=None):
    """Pick the Hugging Face API token to use for a request.

    The ``HF_TOKEN`` environment variable wins over the value typed into the
    UI textbox.

    Returns:
        A ``(token, error)`` pair: ``(token, None)`` when a token was found,
        otherwise ``(None, error_message)``.
    """
    # Ordered by priority; the first non-empty source is used.
    sources = (
        (os.getenv('HF_TOKEN'), "Using HF_TOKEN from environment."),
        (ui_token_from_textbox, "Using API token from UI textbox."),
    )
    for token, note in sources:
        if token:
            logger.info(note)
            return token, None

    logger.warning("HF API token not found in environment or UI textbox.")
    return None, "Error: Hugging Face API token not provided. Please enter it in the textbox or load it from an image."
# --- Updated Function for KeyLock-Decode Integration with Debugging ---
def load_token_from_image_and_set_env(image_pil_object: Image.Image, password: str) -> str:
    """Decode a KeyLock payload from an image and report what happened.

    The image is first saved to a temporary debug location (best effort), then
    handed to ``decode_from_image_pil`` with ``set_environment_variables=True``.

    Args:
        image_pil_object: The image to decode (expected to be a PIL image).
        password: Password passed to the decoder; must be non-empty.

    Returns:
        A newline-joined status report. Decoded values whose key contains
        TOKEN, KEY, SECRET or PASS are masked. Validation failures return a
        single ``"Error: ..."`` line. Decoding errors are caught and reported
        in the text rather than raised.
    """
    if not KEYLOCK_DECODE_AVAILABLE:
        return "Error: KeyLock-Decode library is not installed. This feature is disabled."
    if image_pil_object is None:
        return "Error: No image provided for decoding."
    if not password:
        return "Error: Password cannot be empty for image decoding."

    status_messages_display = []

    # --- DEBUGGING: Check image properties and save it ---
    # NOTE(review): this writes the (secret-carrying) image to a fixed file name
    # in the shared temp directory, so concurrent requests overwrite each other.
    # Confirm this debug step should stay enabled outside of troubleshooting.
    debug_image_path_str = "Not saved (error or no image)."
    try:
        # The image is known to be non-None here (validated above), so the
        # properties are logged and the file saved unconditionally.
        original_mode = getattr(image_pil_object, 'mode', 'N/A')
        # Format might be None for a PIL object that was not read from a file.
        original_format = getattr(image_pil_object, 'format', 'N/A')
        original_size = getattr(image_pil_object, 'size', 'N/A')
        logger.info(f"Received PIL Image from Gradio. Mode: {original_mode}, Original Format: {original_format}, Size: {original_size}")
        status_messages_display.append(f"[DEBUG] Gradio provided PIL Image - Mode: {original_mode}, Original Format: {original_format}, Size: {original_size}")

        # Save the received image so it can be checked with the standalone CLI.
        temp_dir = Path(tempfile.gettempdir()) / "gradio_image_debug"
        temp_dir.mkdir(parents=True, exist_ok=True)
        debug_image_path = temp_dir / "image_from_gradio.png"
        # PNG is lossless, so the pixel data is kept as received.
        image_pil_object.save(debug_image_path, "PNG")
        debug_image_path_str = str(debug_image_path)
        status_messages_display.append(f"**[DEBUG ACTION REQUIRED]** Image received from Gradio has been saved to: `{debug_image_path_str}` in the Space's temporary filesystem. Please try decoding this *specific file* using the standalone `keylock-decode` CLI to verify its integrity. E.g., `keylock-decode {debug_image_path_str} \"YOUR_PASSWORD\"`")
        logger.info(f"Debug image from Gradio saved to: {debug_image_path_str}")
    except Exception as save_exc:
        # Debug saving is best effort: report the problem and still try to decode.
        logger.exception("Error during debug-saving of image from Gradio:")
        status_messages_display.append(f"[DEBUG] Error saving image for debugging: {str(save_exc)}. Path was: {debug_image_path_str}")
    # --- END DEBUGGING ---

    sensitive_words = ('TOKEN', 'KEY', 'SECRET', 'PASS')
    try:
        logger.info("Attempting to decode from image using KeyLock-Decode...")
        decoded_data, status_msgs_from_lib = decode_from_image_pil(
            image_pil_object,  # Use the PIL object directly
            password,
            set_environment_variables=True
        )
        # Tolerate a decoder that returns None instead of an empty message list.
        status_messages_display.extend(status_msgs_from_lib or [])

        if decoded_data:
            status_messages_display.append("\n**Decoded Data Summary (sensitive values masked):**")
            for key, value in decoded_data.items():
                is_sensitive = any(word in key.upper() for word in sensitive_words)
                display_value = '********' if is_sensitive else value
                status_messages_display.append(f"- {key}: {display_value}")

            # NOTE(review): this reads the process environment, so an HF_TOKEN
            # that was already set before decoding also reports success here.
            if os.getenv('HF_TOKEN'):
                status_messages_display.append("\n**SUCCESS: HF_TOKEN was found and has been set in the current process environment.** It will be prioritized for operations.")
            elif 'HF_TOKEN' in decoded_data:
                status_messages_display.append("\nWarning: HF_TOKEN was in decoded data but os.getenv('HF_TOKEN') is not picking it up. This is unexpected if 'set_environment_variables=True' worked.")
            else:
                status_messages_display.append("\nNote: HF_TOKEN was not specifically found in the decoded data from the image.")
    except ValueError as e:
        logger.error(f"KeyLock-Decode ValueError: {e}")
        status_messages_display.append(f"**Decoding Error (e.g., bad password, corrupted data):** {e}")
    except Exception as e:
        logger.exception("An unexpected error occurred during image decoding with KeyLock-Decode:")
        status_messages_display.append(f"**An unexpected error occurred during decoding:** {str(e)}")

    return "\n".join(status_messages_display)
# --- `parse_markdown` (Unchanged) ---
def parse_markdown(markdown_input):
    """Parse markdown input to extract space details and file structure.

    Recognised header lines (matched after stripping whitespace):

    * ``# Space: owner/name`` or ``# Space: name`` - sets the space identity.
    * ``## File Structure`` - ends any file being collected.
    * ``### File: path`` - starts collecting the content of ``path``.

    Every line after a ``### File:`` header is collected as that file's
    content until the next header. Lines inside a fenced code block are
    always content, even when they look like a header (for example a Python
    comment ``# Space: ...``).

    Args:
        markdown_input: The markdown text to parse.

    Returns:
        A dict with keys ``"repo_name_md"``, ``"owner_md"`` and ``"files"``,
        where ``"files"`` is a list of ``{"path": ..., "content": ...}`` dicts.
        Files with no collected content lines are omitted.
    """
    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    header_prefixes = ("### File:", "## File Structure", "# Space:")
    current_file = None
    file_content = []
    in_file_content = False
    in_code_block = False

    def store_current_file():
        # Record the file being collected, if it has a path and any content.
        if current_file and file_content:
            space_info["files"].append({"path": current_file, "content": "\n".join(file_content)})

    for raw_line in markdown_input.strip().split("\n"):
        stripped = raw_line.strip()

        if in_file_content:
            if stripped.startswith("```"):
                # NOTE(review): the fence line itself is kept in the file
                # content - confirm consumers expect the fences to be present.
                in_code_block = not in_code_block
                file_content.append(raw_line)
                continue
            if in_code_block or not stripped.startswith(header_prefixes):
                # Plain content. Skipping the header checks below is what
                # keeps header-looking lines inside a fence from ending the file.
                file_content.append(raw_line)
                continue
            # A real header outside any fence: fall through to handle it.

        if stripped.startswith("# Space:"):
            store_current_file()
            full_space_name_md = stripped.replace("# Space:", "").strip()
            if "/" in full_space_name_md:
                space_info["owner_md"], space_info["repo_name_md"] = full_space_name_md.split("/", 1)
            else:
                space_info["repo_name_md"] = full_space_name_md
            current_file = None
            file_content = []
            in_file_content = False
            in_code_block = False
        elif stripped.startswith("## File Structure"):
            store_current_file()
            current_file = None
            file_content = []
            in_file_content = False
            in_code_block = False
        elif stripped.startswith("### File:"):
            store_current_file()
            current_file = stripped.replace("### File:", "").strip()
            file_content = []
            in_file_content = True
            in_code_block = False

    # The last file has no following header to trigger storing it.
    store_current_file()
    space_info["files"] = [f for f in space_info["files"] if f.get("path")]
    return space_info
# --- `_determine_repo_id` (Unchanged) ---
def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
if not space_name_ui:
return None, "Error: Space Name cannot be empty."
if "/" in space_name_ui:
return None, "Error: Space Name should not contain '/'. Please use the Owner field for the namespace."
final_owner = owner_ui; error_message = None
if not final_owner:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, token_err
if not resolved_api_token: return None, "Error: API token required for auto owner determination (internal check failed)."
try:
user_info = whoami(token=resolved_api_token)
if user_info and 'name' in user_info:
final_owner = user_info['name']
logger.info(f"Determined owner: {final_owner} using API token.")
else:
error_message = "Error: Could not retrieve username from API token. Check permissions or specify Owner."
except Exception as e:
error_message = f"Error retrieving username from API token: {str(e)}. Specify Owner manually."
if error_message: return None, error_message
if not final_owner: return None, "Error: Owner could not be determined."
return f"{final_owner}/{space_name_ui}", None
# --- Core Functions: `create_space`, `view_space_files`, `update_space_file` (Unchanged from previous correct version) ---
def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input):
    """Create (or update) a Space from files described in markdown.

    The markdown is parsed with ``parse_markdown``. Each file is written into
    a temporary folder, the Space repo is created if needed, and the folder is
    uploaded in one commit.

    Args:
        ui_api_token_from_textbox: Token typed in the UI (may be empty).
        space_name_ui: Space name without a namespace.
        owner_ui: Namespace; resolved from the token when empty.
        sdk_ui: Value passed as ``space_sdk`` to ``create_repo``.
        markdown_input: Markdown containing ``### File: path`` sections.

    Returns:
        A status string: a success message with a link to the Space, or a
        message starting with ``"Error"``. Exceptions are logged and reported
        in the returned string rather than raised.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return token_err
        repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err:
            return err
        repo_id_for_error_logging = repo_id

        space_info = parse_markdown(markdown_input)
        if not space_info["files"]:
            return "Error: No files found in markdown. Use '### File: path/to/file.ext'."

        with tempfile.TemporaryDirectory() as temp_dir:
            # Resolve the root so the containment check below compares real paths.
            repo_local_path = Path(temp_dir).resolve() / "repo_upload_content"
            repo_local_path.mkdir(exist_ok=True)
            for file_info in space_info["files"]:
                relative_path = file_info.get("path")
                if not relative_path:
                    continue
                file_path_abs = (repo_local_path / relative_path).resolve()
                # Paths come from user-supplied markdown: refuse absolute paths
                # and '..' segments that would write outside the upload folder.
                if repo_local_path not in file_path_abs.parents:
                    return f"Error: Invalid file path '{relative_path}' in markdown. Paths must be relative and stay inside the Space repository."
                file_path_abs.parent.mkdir(parents=True, exist_ok=True)
                with open(file_path_abs, "w", encoding="utf-8") as f:
                    f.write(file_info["content"])

            try:
                # exist_ok=True lets an already-existing Space pass through
                # without guessing from the error text.
                create_repo(repo_id=repo_id, token=resolved_api_token, repo_type="space", space_sdk=sdk_ui, private=False, exist_ok=True)
            except Exception as e:
                return f"Error creating Space '{repo_id}': {str(e)}"

            upload_folder(repo_id=repo_id, folder_path=str(repo_local_path), path_in_repo=".", token=resolved_api_token, repo_type="space", commit_message=f"Initial Space setup of {repo_id} via Builder")
        return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})"
    except Exception as e:
        logger.exception(f"Error in create_space for {repo_id_for_error_logging}:")
        return f"Error during Space creation/update: {str(e)}"
def view_space_files(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """List the files of a Space as a markdown bullet list.

    Returns:
        A status string: the file listing, a "no files" notice, or an error
        message. Exceptions are logged and reported in the returned string.
    """
    # Best-effort label for error messages until the real repo id is known.
    if owner_ui:
        repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}"
    else:
        repo_id_for_error_logging = space_name_ui

    try:
        token, token_problem = _get_api_token(ui_api_token_from_textbox)
        if token_problem:
            return token_problem

        repo_id, repo_problem = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if repo_problem:
            return repo_problem
        repo_id_for_error_logging = repo_id

        listing = list_repo_files(repo_id=repo_id, token=token, repo_type="space")
        if not listing:
            return f"No files found in Space `{repo_id}`."
        bullets = "\n".join(f"- `{name}`" for name in listing)
        return f"Files in `{repo_id}`:\n\n" + bullets
    except Exception as exc:
        logger.exception(f"Error in view_space_files for {repo_id_for_error_logging}:")
        return f"Error listing files for `{repo_id_for_error_logging}`: {str(exc)}"
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """Write one file into a Space repo and push the change.

    The Space is cloned into a temporary directory, the file is written
    (parent folders are created as needed), and the clone is pushed.

    Args:
        ui_api_token_from_textbox: Token typed in the UI (may be empty).
        space_name_ui: Space name without a namespace.
        owner_ui: Namespace; resolved from the token when empty.
        file_path_in_repo: Relative path of the file inside the repo.
        file_content: Text to write to the file.
        commit_message_ui: Commit message; a default is used when empty.

    Returns:
        A status string: a success message with a link to the Space, or a
        message starting with ``"Error"``. Exceptions are logged and reported
        in the returned string rather than raised.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return token_err
        repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err:
            return err
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo:
            return "Error: File Path cannot be empty."
        commit_message_ui = commit_message_ui or f"Update {file_path_in_repo} via Space Builder"

        with tempfile.TemporaryDirectory() as temp_dir:
            repo_local_clone_path = Path(temp_dir) / "cloned_space_repo_for_update"
            # NOTE(review): the git_email value looks like an e-mail obfuscation
            # placeholder rather than a real address - confirm the intended value.
            cloned_repo = Repository(local_dir=str(repo_local_clone_path), clone_from=f"https://huggingface.co/spaces/{repo_id}", repo_type="space", use_auth_token=resolved_api_token, git_user="Space Builder Bot", git_email="[email protected]")

            clone_root = Path(cloned_repo.local_dir).resolve()
            full_local_file_path = (clone_root / file_path_in_repo).resolve()
            # The path is user input: refuse absolute paths and '..' segments
            # that would write outside the cloned repository.
            if clone_root not in full_local_file_path.parents:
                return f"Error: Invalid File Path `{file_path_in_repo}`. It must be a relative path inside the Space repository."

            full_local_file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(full_local_file_path, "w", encoding="utf-8") as f:
                f.write(file_content)
            cloned_repo.push_to_hub(commit_message=commit_message_ui)
        return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})"
    except Exception as e:
        logger.exception(f"Error in update_space_file for {repo_id_for_error_logging}:")
        return f"Error updating file for `{repo_id_for_error_logging}`: {str(e)}"