File size: 17,825 Bytes
c66bf1b
 
 
c0f7c5e
 
79af715
 
c66bf1b
 
 
 
849764e
c66bf1b
dea7543
c66bf1b
 
 
dea7543
c66bf1b
dea7543
0da8126
dea7543
 
 
 
 
0da8126
 
 
dea7543
 
 
 
 
0da8126
 
dea7543
0da8126
 
c66bf1b
 
dea7543
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0da8126
dea7543
 
 
 
 
 
0da8126
dea7543
 
 
 
 
 
 
 
c0f7c5e
0da8126
dea7543
 
 
 
0da8126
dea7543
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0da8126
13591d5
dea7543
0da8126
dea7543
0da8126
 
 
 
 
dea7543
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0da8126
 
dea7543
 
 
 
 
 
 
 
 
75e5f7e
dea7543
c0e2a00
cbc73b6
dea7543
9abfd61
dea7543
9abfd61
 
 
 
 
cbc73b6
c66bf1b
dea7543
1d4a062
 
 
b631296
dea7543
1d4a062
 
 
 
 
 
 
 
fe71f92
1d4a062
 
 
 
 
cbc73b6
1d4a062
cbc73b6
 
1d4a062
 
 
 
 
cbc73b6
dea7543
0da8126
dea7543
fe71f92
 
849764e
f89e3c9
0da8126
dea7543
fe71f92
c66bf1b
0da8126
fe71f92
dea7543
 
849764e
 
c66bf1b
 
dea7543
c0f7c5e
dea7543
c0f7c5e
13591d5
c0f7c5e
 
dea7543
13591d5
c0f7c5e
 
 
 
dea7543
c0f7c5e
 
 
 
 
 
 
 
 
 
dea7543
c0f7c5e
dea7543
 
 
c0f7c5e
 
 
 
dea7543
c0f7c5e
 
dea7543
c0f7c5e
dea7543
13591d5
 
 
 
dea7543
13591d5
 
 
dea7543
c0f7c5e
 
dea7543
 
13591d5
c0f7c5e
 
13591d5
dea7543
0da8126
dea7543
0da8126
c66bf1b
0da8126
849764e
0da8126
849764e
 
13591d5
c0f7c5e
 
13591d5
 
c66bf1b
0da8126
13591d5
c66bf1b
13591d5
c66bf1b
849764e
c66bf1b
f89e3c9
849764e
c66bf1b
13591d5
c66bf1b
 
0da8126
f89e3c9
c66bf1b
dea7543
0da8126
dea7543
0da8126
c66bf1b
0da8126
849764e
0da8126
849764e
0da8126
13591d5
 
0da8126
c0f7c5e
13591d5
c0f7c5e
0da8126
c0f7c5e
 
c66bf1b
 
 
13591d5
0da8126
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
import os
import re
import tempfile
import shutil
import git
import re

from huggingface_hub import (
    create_repo,
    upload_folder,
    list_repo_files,
    Repository,
    whoami,
    hf_hub_download,
)
import logging
from pathlib import Path
from PIL import Image

# --- Logging setup ---
# Configure logging BEFORE anything emits a record. The module-level
# logging.warning() helper implicitly configures an unconfigured root logger,
# after which logging.basicConfig() is a no-op; running basicConfig first keeps
# the INFO level and the format below in effect even when the optional import
# fails.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# --- Import functions from keylock_decode ---
# keylock_decode is optional. When it cannot be imported the three function
# names are bound to None and KEYLOCK_DECODE_AVAILABLE is False, so callers can
# test availability instead of hitting a NameError.
try:
    from keylock_decode import (
        decode_data_from_image_pil,
        save_decoded_data_locally_encrypted,
        load_decoded_data_locally_encrypted
    )
    KEYLOCK_DECODE_AVAILABLE = True
except ImportError:
    KEYLOCK_DECODE_AVAILABLE = False
    decode_data_from_image_pil = None
    save_decoded_data_locally_encrypted = None
    load_decoded_data_locally_encrypted = None
    logger.warning("keylock-decode library not available or missing functions. KeyLock Wallet features will be disabled.")

# --- Global variable for in-memory keys and local storage path ---
# In-memory cache of the decrypted key/value pairs. Functions in this module
# rebind it through `global` on each load attempt (to {} whenever loading fails).
_loaded_local_keys = {}
# Path of the encrypted key file handed to the keylock_decode save/load helpers.
LOCAL_KEYS_FILE = "/data/.space_keys.enc" # Persistent storage in Hugging Face Spaces

# --- Helper Function: Load Local Keys into Memory ---
def _load_keys_into_memory(password: str) -> list[str]:
    """Decrypt the local key file and cache the result in ``_loaded_local_keys``.

    Every failure path leaves ``_loaded_local_keys`` as an empty dict.

    Args:
        password: Password passed to ``load_decoded_data_locally_encrypted``.

    Returns:
        Human-readable status lines describing what happened. Values whose key
        name contains TOKEN, KEY, SECRET or PASS are masked in the report.
    """
    global _loaded_local_keys

    # Pre-flight checks: the first one that fails decides the message.
    if not (KEYLOCK_DECODE_AVAILABLE and load_decoded_data_locally_encrypted):
        precheck_failure = "KeyLock-Decode library not available for local loading."
    elif not password:
        precheck_failure = "Error: Password required to load local keys."
    elif not Path(LOCAL_KEYS_FILE).exists():
        precheck_failure = f"Info: Local key file not found at `{LOCAL_KEYS_FILE}`. No keys loaded."
    else:
        precheck_failure = None

    if precheck_failure is not None:
        _loaded_local_keys = {}
        return [precheck_failure]

    report = []
    sensitive_markers = ['TOKEN', 'KEY', 'SECRET', 'PASS']
    try:
        _loaded_local_keys = load_decoded_data_locally_encrypted(password, LOCAL_KEYS_FILE)

        if not _loaded_local_keys:
            report.append(f"Info: Local key file `{LOCAL_KEYS_FILE}` was empty or contained no valid data after decryption.")
            _loaded_local_keys = {}
        else:
            report.append(f"Successfully loaded {len(_loaded_local_keys)} keys from local storage.")
            # Build a display copy in which secret-looking entries are hidden.
            redacted = {}
            for name, value in _loaded_local_keys.items():
                looks_sensitive = any(marker in name.upper() for marker in sensitive_markers)
                redacted[name] = '********' if looks_sensitive else value
            report.append(f"Loaded keys: {redacted}")
            logger.info(f"Keys loaded into memory from {LOCAL_KEYS_FILE}")

    except FileNotFoundError:
        # The file can vanish between the exists() check and the read.
        report.append(f"Info: Local key file not found at {LOCAL_KEYS_FILE}. No keys loaded.")
        _loaded_local_keys = {}
    except ValueError as exc:
        report.append(f"Error loading local keys: {exc}. Password may be incorrect.")
        _loaded_local_keys = {}
    except IOError as exc:
        report.append(f"IO Error loading local keys: {exc}")
        _loaded_local_keys = {}
    except Exception as exc:
        logger.exception(f"Unexpected error loading local keys from {LOCAL_KEYS_FILE}:")
        report.append(f"Unexpected error loading local keys: {str(exc)}")
        _loaded_local_keys = {}

    return report


# --- Helper Function: Get API Token ---
def _get_api_token(ui_token_from_textbox=None):
    """Pick the Hugging Face token to use, by source priority.

    Sources are tried in order: the in-memory key cache, the ``HF_TOKEN``
    environment variable, then the value typed into the UI.

    Returns:
        ``(token, None)`` for the first non-empty source, otherwise
        ``(None, error_message)``.
    """
    prioritized_sources = (
        ("Using HF_TOKEN from in-memory loaded keys.", _loaded_local_keys.get('HF_TOKEN')),
        ("Using HF_TOKEN from environment variable.", os.getenv('HF_TOKEN')),
        ("Using HF_TOKEN from UI textbox.", ui_token_from_textbox),
    )
    for log_line, candidate in prioritized_sources:
        if candidate:
            logger.info(log_line)
            return candidate, None

    return None, "Error: Hugging Face API token not provided."


# --- Main Function: Process KeyLock Image and Store Locally ---
def process_keylock_image_and_store_locally(image_pil_object: Image.Image, password: str):
    """Decode a KeyLock Wallet image, save the data encrypted, and cache it in memory.

    Args:
        image_pil_object: PIL image to decode; ``None`` is reported as an error.
        password: Password given to the decoder and reused for the local
            encrypted file; must be non-empty.

    Returns:
        A newline-joined status report for display. Values whose key name
        contains TOKEN, KEY, SECRET or PASS are masked in the report.

    Side effects:
        Writes ``LOCAL_KEYS_FILE`` on success and rebinds ``_loaded_local_keys``
        (to ``{}`` when the library is missing or decoding/saving fails).
    """
    # Declared exactly once, before any use: repeating `global` after the name
    # has been read or assigned in the same function is a SyntaxError.
    global _loaded_local_keys
    status_messages_display = []

    if not KEYLOCK_DECODE_AVAILABLE or not decode_data_from_image_pil or \
       not save_decoded_data_locally_encrypted or not load_decoded_data_locally_encrypted:
        status_messages_display.append("Error: KeyLock-Decode library not available.")
        _loaded_local_keys = {}
        return "\n".join(status_messages_display)

    if image_pil_object is None:
        status_messages_display.append("Error: No KeyLock Wallet image provided.")
        return "\n".join(status_messages_display)

    if not password:
        status_messages_display.append("Error: Password cannot be empty.")
        return "\n".join(status_messages_display)

    try:
        logger.info("Attempting to decode from KeyLock Wallet image...")
        decoded_data, status_msgs_from_lib = decode_data_from_image_pil(image_pil_object, password)
        status_messages_display.extend(status_msgs_from_lib)

        if decoded_data:
            status_messages_display.append("\n**Decoded Data Summary (sensitive values masked):**")
            for key, value in decoded_data.items():
                display_value = '********' if any(k_word in key.upper() for k_word in ['TOKEN', 'KEY', 'SECRET', 'PASS']) else value
                status_messages_display.append(f"- {key}: {display_value}")

            try:
                save_decoded_data_locally_encrypted(decoded_data, password, LOCAL_KEYS_FILE)
                status_messages_display.append(f"\n**SUCCESS: Decoded data saved to encrypted local file:** `{LOCAL_KEYS_FILE}`")

                # Reload from disk so the in-memory cache reflects what was persisted.
                load_status = _load_keys_into_memory(password)
                status_messages_display.extend(load_status)
                if not _loaded_local_keys:
                    status_messages_display.append("Warning: No keys were loaded into memory after saving.")

            except Exception as e:
                status_messages_display.append(f"Error saving or loading data locally: {str(e)}")
                logger.error(f"Error during local save/load after decoding: {e}")
                _loaded_local_keys = {}

        elif not status_msgs_from_lib:
            # The decoder returned neither data nor messages; say so explicitly.
            status_messages_display.append("Info: Decoding process completed, but no data was extracted.")

    except ValueError as e:
        status_messages_display.append(f"**Decoding Error:** {e}. Please check password and image.")
        _loaded_local_keys = {}
    except Exception as e:
        status_messages_display.append(f"**Unexpected error during processing:** {str(e)}")
        logger.exception("Unexpected error during keylock image processing:")
        _loaded_local_keys = {}

    return "\n".join(status_messages_display)

# --- Function: Load Keys from Local File ---
def load_keys_from_local_file(password: str) -> str:
    """Load the encrypted local key file into memory and describe the outcome.

    Returns the loader's status lines joined by newlines. If the loader
    reported nothing, a summary is derived from whether any keys are cached.
    """
    report = _load_keys_into_memory(password)
    if report:
        return "\n".join(report)

    # No messages from the loader: fall back to inspecting the cache itself.
    if _loaded_local_keys:
        return f"Keys successfully loaded from `{LOCAL_KEYS_FILE}`."
    return f"Attempted to load keys from `{LOCAL_KEYS_FILE}`, but no keys were loaded. Check password."


# --- Markdown Processing Functions ---
def process_commented_markdown(commented_input):
    """Split markdown into lines, un-commenting it when it was '# '-prefixed.

    The input is treated as commented-out only if some line contains the
    marker ``# # Space:``. In that case a leading ``"# "`` is removed from each
    line that has one; otherwise the lines are returned unchanged.

    Returns:
        The list of lines of the stripped input.
    """
    marker = "# # Space:"
    comment_prefix = "# "
    raw_lines = commented_input.strip().split("\n")

    if not any(marker in raw.strip() for raw in raw_lines):
        return raw_lines

    uncommented = []
    for raw in raw_lines:
        if raw.startswith(comment_prefix):
            uncommented.append(raw[len(comment_prefix):])
        else:
            uncommented.append(raw)
    return uncommented

def parse_markdown(markdown_input):
    """Extract the Space name and file contents from builder markdown.

    Recognised structure (after optional un-commenting):
      * ``# Space: owner/name`` or ``# Space: name`` before the first file header.
      * ``### File: <path>`` starts a file; the lines that follow are its content.
      * Lines starting with a code fence are dropped from file content.

    Returns:
        A dict with ``repo_name_md``, ``owner_md`` and ``files``, where
        ``files`` is a list of ``{"path", "content"}`` dicts. File headers with
        an empty path are not emitted.
    """
    file_header = "### File:"
    space_header = "# Space:"
    fence = "```"

    parsed = {"repo_name_md": "", "owner_md": "", "files": []}
    active_path = None   # None until the first file header has been seen
    body_lines = []

    def flush_active_file():
        # Only files with a non-empty path are recorded.
        if active_path:
            parsed["files"].append({"path": active_path, "content": "\n".join(body_lines)})

    for raw_line in process_commented_markdown(markdown_input):
        trimmed = raw_line.strip()

        if trimmed.startswith(file_header):
            flush_active_file()
            active_path = trimmed.replace(file_header, "").strip()
            body_lines = []
        elif active_path is None:
            # Preamble: the only thing of interest is the Space declaration.
            if trimmed.startswith(space_header):
                full_name = trimmed.replace(space_header, "").strip()
                owner_part, slash, repo_part = full_name.partition("/")
                if slash:
                    parsed["owner_md"], parsed["repo_name_md"] = owner_part, repo_part
                else:
                    parsed["repo_name_md"] = full_name
        elif not trimmed.startswith(fence):
            body_lines.append(raw_line)

    flush_active_file()
    return parsed

# --- Helper Function: Determine Repository ID ---
def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
    # Determines the full repo ID (owner/space_name)
    if not space_name_ui: return None, "Error: Space Name cannot be empty."
    if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field."
    final_owner = owner_ui; error_message = None
    if not final_owner:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return None, token_err
        if not resolved_api_token: return None, "Error: API token required for auto owner determination."
        try:
            user_info = whoami(token=resolved_api_token)
            if user_info and 'name' in user_info: final_owner = user_info['name']
            else: error_message = "Error: Could not retrieve username. Check token/permissions."
        except Exception as e: error_message = f"Error retrieving username: {str(e)}."
        if error_message: return None, error_message
    if not final_owner: return None, "Error: Owner could not be determined."
    return f"{final_owner}/{space_name_ui}", None

# --- Function: Fetch File Content from Hub ---
def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
    """Download one file from a Space and return its text.

    Returns:
        ``(content, None)`` on success, otherwise ``(None, error_message)``.
        Failures whose text contains "404" or "not found" are reported as a
        missing file; anything else is reported as a generic fetch error.
    """
    # Best-effort label for messages until the real repo id has been resolved.
    if owner_ui:
        repo_label = f"{owner_ui}/{space_name_ui}"
    else:
        repo_label = space_name_ui

    try:
        token, token_problem = _get_api_token(ui_api_token_from_textbox)
        if token_problem:
            return None, token_problem

        repo_id, repo_problem = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if repo_problem:
            return None, repo_problem
        repo_label = repo_id

        if not file_path_in_repo:
            return None, "Error: File path cannot be empty."

        logger.info(f"Attempting to download file: {file_path_in_repo} from Space: {repo_id}")
        local_copy = Path(
            hf_hub_download(
                repo_id=repo_id,
                filename=file_path_in_repo,
                repo_type="space",
                token=token,
            )
        )
        text = local_copy.read_text(encoding="utf-8")
        logger.info(f"Successfully downloaded and read file: {file_path_in_repo}")
        return text, None

    except Exception as exc:
        detail = str(exc)
        looks_missing = "404" in detail or "not found" in detail.lower()
        if not looks_missing:
            logger.exception(f"Error fetching file content for {file_path_in_repo}:")
            return None, f"Error fetching file content: {str(exc)}"
        logger.warning(f"File not found {file_path_in_repo} in {repo_label}: {exc}")
        return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_label}'."

# --- Function: List Space Files ---
def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """List the files of a Space.

    Returns:
        ``(files, None)`` when files were found, ``([], message)`` when the
        Space has none, and ``(None, error_message)`` on any failure.
    """
    # Best-effort label for messages until the real repo id has been resolved.
    repo_label = space_name_ui if not owner_ui else f"{owner_ui}/{space_name_ui}"
    try:
        token, token_problem = _get_api_token(ui_api_token_from_textbox)
        if token_problem:
            return None, token_problem

        repo_id, repo_problem = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if repo_problem:
            return None, repo_problem
        repo_label = repo_id

        listing = list_repo_files(repo_id=repo_id, token=token, repo_type="space")
        if listing:
            return listing, None
        return [], f"No files found in Space `{repo_id}`."
    except Exception as exc:
        logger.exception(f"Error listing files for {repo_label}:")
        return None, f"Error listing files for `{repo_label}`: {str(exc)}"

# --- Core Function: Create/Update Space ---
def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input):
    """Create a Space if needed and upload the files described in the markdown.

    Args:
        ui_api_token_from_textbox: Token typed in the UI; used only if no other
            token source is available.
        space_name_ui: Space name without an owner prefix.
        owner_ui: Owner/namespace; looked up from the token when empty.
        sdk_ui: Value forwarded to ``create_repo`` as ``space_sdk``.
        markdown_input: Builder markdown containing ``### File:`` sections.

    Returns:
        A status string: a success message with a link to the Space, or an
        error message. Exceptions are logged and returned as text.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return token_err
        repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err:
            return err
        repo_id_for_error_logging = repo_id

        space_info = parse_markdown(markdown_input)
        if not space_info["files"]:
            return "Error: No files found in markdown."

        with tempfile.TemporaryDirectory() as temp_dir:
            repo_staging_path = Path(temp_dir) / "repo_staging_content"
            repo_staging_path.mkdir(exist_ok=True)
            staging_root = repo_staging_path.resolve()

            for file_info in space_info["files"]:
                relative_path = file_info.get("path")
                if not relative_path:
                    continue
                # Paths come from user-supplied markdown: an absolute path or
                # one with ".." would otherwise be written outside the staging
                # directory, so anything not strictly inside it is rejected.
                file_path_abs = (repo_staging_path / relative_path).resolve()
                if staging_root not in file_path_abs.parents:
                    return f"Error: Invalid file path '{relative_path}' in markdown; paths must stay inside the Space repository."
                file_path_abs.parent.mkdir(parents=True, exist_ok=True)
                with open(file_path_abs, "w", encoding="utf-8") as f:
                    f.write(file_info["content"])

            try:
                # exist_ok=True makes "repo already exists" a non-error, which
                # replaces guessing from the exception text.
                create_repo(repo_id=repo_id, token=resolved_api_token, repo_type="space", space_sdk=sdk_ui, private=False, exist_ok=True)
            except Exception as e:
                return f"Error creating Space '{repo_id}': {str(e)}"

            upload_folder(repo_id=repo_id, folder_path=str(repo_staging_path), path_in_repo=".", token=resolved_api_token, repo_type="space", commit_message=f"Initial Space setup of {repo_id} via Builder")
            return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})"
    except Exception as e:
        logger.exception(f"Error in create_space for {repo_id_for_error_logging}:")
        return f"Error during Space creation/update: {str(e)}"

# --- Core Function: Update Space File ---
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """Write one file into a Space by cloning it, editing the file, and pushing.

    Args:
        ui_api_token_from_textbox: Token typed in the UI; used only if no other
            token source is available.
        space_name_ui: Space name without an owner prefix.
        owner_ui: Owner/namespace; looked up from the token when empty.
        file_path_in_repo: Repository-relative path of the file to write.
            Leading slashes are dropped; ``..`` components are rejected.
        file_content: Text written to the file (UTF-8).
        commit_message_ui: Commit message; a default is generated when empty.

    Returns:
        A status string: a success message with a link to the Space, or an
        error message. Exceptions are logged and returned as text.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return token_err
        repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err:
            return err
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo:
            return "Error: File Path to update cannot be empty."
        file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
        # An input such as "/" normalizes to nothing; catch it before cloning.
        if not file_path_in_repo:
            return "Error: File Path to update cannot be empty."
        # The path is user input: ".." components would let the write land
        # outside the cloned repository, so reject them before any network work.
        if ".." in Path(file_path_in_repo).parts:
            return f"Error: Invalid file path '{file_path_in_repo}'; it must stay inside the Space repository."

        commit_message_ui = commit_message_ui or f"Update {file_path_in_repo} via Space Builder"
        with tempfile.TemporaryDirectory() as temp_dir_for_update:
            repo_local_clone_path = Path(temp_dir_for_update) / "update_clone"
            # NOTE(review): the git_email value looks like an e-mail obfuscation
            # placeholder rather than a real address - confirm the intended value.
            cloned_repo = Repository(local_dir=str(repo_local_clone_path), clone_from=f"https://huggingface.co/spaces/{repo_id}", repo_type="space", use_auth_token=resolved_api_token, git_user="Space Builder Bot", git_email="[email protected]")
            full_local_file_path = Path(cloned_repo.local_dir) / file_path_in_repo
            full_local_file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(full_local_file_path, "w", encoding="utf-8") as f:
                f.write(file_content)
            cloned_repo.push_to_hub(commit_message=commit_message_ui)
            return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})"
    except Exception as e:
        logger.exception(f"Error in update_space_file for {repo_id_for_error_logging}, file {file_path_in_repo}:")
        return f"Error updating file for `{repo_id_for_error_logging}`: {str(e)}"