File size: 39,328 Bytes
b80b022
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
db7e1e2
3103a1e
b80b022
 
 
 
 
 
 
 
 
db7e1e2
3103a1e
db7e1e2
 
3103a1e
db7e1e2
 
b80b022
 
3103a1e
b80b022
 
 
 
 
 
 
db7e1e2
b80b022
3103a1e
b80b022
 
 
 
 
db7e1e2
b80b022
 
db7e1e2
b80b022
 
db7e1e2
b80b022
 
 
db7e1e2
 
 
b80b022
 
 
 
 
 
 
3103a1e
b80b022
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
db7e1e2
b80b022
db7e1e2
b80b022
db7e1e2
 
3103a1e
db7e1e2
 
b80b022
db7e1e2
 
b80b022
3103a1e
b80b022
db7e1e2
 
3103a1e
 
 
b80b022
 
 
3103a1e
db7e1e2
3103a1e
b80b022
 
 
 
 
 
 
 
 
3103a1e
b80b022
 
db7e1e2
 
 
 
 
 
 
 
 
 
 
 
3103a1e
db7e1e2
b80b022
 
 
 
 
db7e1e2
3103a1e
b80b022
 
 
 
 
db7e1e2
 
 
b80b022
 
db7e1e2
 
 
3103a1e
b80b022
 
 
db7e1e2
 
b80b022
db7e1e2
b80b022
 
 
 
 
 
 
3103a1e
b80b022
db7e1e2
 
b80b022
 
db7e1e2
b80b022
3103a1e
db7e1e2
3103a1e
b80b022
 
3103a1e
b80b022
 
 
 
db7e1e2
 
 
 
b80b022
3103a1e
b80b022
 
 
 
 
 
 
 
 
 
 
3103a1e
b80b022
3103a1e
b80b022
 
 
3103a1e
 
 
b80b022
3103a1e
b80b022
db7e1e2
b80b022
 
 
 
 
 
 
 
 
3103a1e
b80b022
 
 
 
3103a1e
b80b022
db7e1e2
 
b80b022
 
 
 
db7e1e2
b80b022
 
 
 
 
db7e1e2
b80b022
 
 
3103a1e
b80b022
3103a1e
db7e1e2
b80b022
 
 
 
 
 
 
 
3103a1e
 
 
b80b022
 
db7e1e2
b80b022
 
db7e1e2
3103a1e
b80b022
db7e1e2
b80b022
 
 
 
 
 
 
 
 
 
 
 
 
 
db7e1e2
b80b022
 
db7e1e2
 
 
 
b80b022
 
3103a1e
db7e1e2
3103a1e
 
db7e1e2
b80b022
db7e1e2
 
 
 
 
 
3103a1e
db7e1e2
3103a1e
 
 
db7e1e2
 
 
3103a1e
db7e1e2
3103a1e
 
b80b022
db7e1e2
 
 
b80b022
 
db7e1e2
3103a1e
 
 
 
 
 
 
db7e1e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3103a1e
db7e1e2
 
 
 
 
3103a1e
db7e1e2
 
 
 
 
 
 
 
 
 
3103a1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b80b022
db7e1e2
 
 
b80b022
 
db7e1e2
 
 
 
 
 
 
 
 
 
 
3103a1e
db7e1e2
 
 
 
 
 
 
3103a1e
db7e1e2
 
 
 
 
 
3103a1e
 
db7e1e2
 
 
 
 
 
 
 
 
3103a1e
db7e1e2
 
 
 
 
3103a1e
db7e1e2
3103a1e
db7e1e2
 
 
 
b80b022
db7e1e2
 
 
 
 
3103a1e
b80b022
db7e1e2
 
 
b80b022
db7e1e2
 
 
 
 
 
 
3103a1e
db7e1e2
3103a1e
b80b022
db7e1e2
3103a1e
b80b022
db7e1e2
b80b022
db7e1e2
 
 
 
 
 
3103a1e
db7e1e2
 
 
 
 
 
3103a1e
 
db7e1e2
b80b022
 
db7e1e2
 
3103a1e
b80b022
3103a1e
 
b80b022
db7e1e2
3103a1e
b80b022
 
 
 
db7e1e2
b80b022
 
3103a1e
 
 
 
b80b022
3103a1e
 
db7e1e2
b80b022
 
 
db7e1e2
b80b022
db7e1e2
 
 
 
b80b022
 
 
 
 
db7e1e2
3103a1e
b80b022
db7e1e2
 
b80b022
db7e1e2
b80b022
 
 
db7e1e2
3103a1e
b80b022
db7e1e2
3103a1e
b80b022
 
 
 
 
3103a1e
b80b022
3103a1e
 
b80b022
 
3103a1e
b80b022
 
 
 
db7e1e2
b80b022
 
 
3103a1e
b80b022
3103a1e
b80b022
 
 
db7e1e2
 
 
b80b022
 
 
3103a1e
 
 
db7e1e2
b80b022
 
 
 
 
 
 
 
 
3103a1e
 
b80b022
 
 
db7e1e2
b80b022
 
3103a1e
b80b022
 
 
 
 
3103a1e
b80b022
3103a1e
 
b80b022
 
 
3103a1e
db7e1e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3103a1e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
import os
import re
import tempfile
import shutil
import logging
from pathlib import Path

from huggingface_hub import (
    create_repo,
    upload_folder,
    list_repo_files,
    whoami,
    hf_hub_download,
    delete_file as hf_delete_file,
    HfApi
)
from huggingface_hub.hf_api import CommitOperationDelete, CommitOperationAdd, CommitOperation
from huggingface_hub.utils import HfHubHTTPError

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

def _get_api_token(ui_token_from_textbox=None):
    env_token = os.getenv('HF_TOKEN')
    if env_token:
        logger.debug("Using HF_TOKEN from environment variable.")
        return env_token, None
    if ui_token_from_textbox:
        logger.debug("Using HF_TOKEN from UI textbox.")
        return ui_token_from_textbox.strip(), None
    logger.warning("Hugging Face API token not provided in UI or HF_TOKEN env var.")
    return None, "Error: Hugging Face API token not provided in UI or HF_TOKEN env var."

def _determine_repo_id(ui_api_token_from_textbox, owner_ui, space_name_ui):
    if not space_name_ui: return None, "Error: Space Name cannot be empty."
    if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field for the owner part."

    final_owner = owner_ui
    error_message = None

    if not final_owner:
        logger.info("Owner not specified, attempting to auto-detect from token.")
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return None, f"Error auto-detecting owner: {token_err}"
        if not resolved_api_token: return None, "Error: API token required for auto owner determination if Owner field is empty."
        try:
            user_info = whoami(token=resolved_api_token)
            if user_info and 'name' in user_info:
                final_owner = user_info['name']
                logger.info(f"Auto-detected owner: {final_owner}")
            else:
                error_message = "Error: Could not retrieve username from token. Check token permissions or specify Owner."
                logger.error(error_message)
        except Exception as e:
            error_message = f"Error retrieving username from token: {str(e)}. Specify Owner or check token."
            logger.exception("Error retrieving username from token:")
        if error_message: return None, error_message

    if not final_owner: return None, "Error: Owner could not be determined. Please specify it in the Owner field."
    repo_id = f"{final_owner}/{space_name_ui}"
    logger.info(f"Determined repo_id: {repo_id}")
    return repo_id, None

def parse_markdown(markdown_input):
    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    current_file_path = None
    current_file_content_lines = []
    in_file_definition = False
    in_code_block = False
    file_parsing_errors = []

    lines = markdown_input.strip().split("\n")

    cleaned_lines = []
    for line_content_orig in lines:
         if line_content_orig.strip().startswith("# "):
             if line_content_orig.strip().startswith("# ### File:") or \
                line_content_orig.strip().startswith("# ## File Structure") or \
                line_content_orig.strip().startswith("# # Space:"):
                 cleaned_lines.append(line_content_orig.strip()[2:])
             else:
                 cleaned_lines.append(line_content_orig)
         else:
              cleaned_lines.append(line_content_orig)

    lines = cleaned_lines


    for i, line_content_orig in enumerate(lines):
        line_content_stripped = line_content_orig.strip()
        line_num = i + 1

        file_match = re.match(r"### File:\s*(?P<filename_line>[^\n]+)", line_content_stripped)
        if file_match:
            if current_file_path is not None and in_file_definition:
                content_to_save = "\n".join(current_file_content_lines).strip()
                space_info["files"].append({"path": current_file_path, "content": content_to_save})

            filename_line = file_match.group("filename_line").strip()
            current_file_path = filename_line
            current_file_path = re.split(r'\s*\(', current_file_path, 1)[0].strip()
            current_file_path = current_file_path.strip('`\'"').strip()

            if not current_file_path:
                file_parsing_errors.append(f"Line {line_num}: Found '### File:' but filename is empty or invalid.")
                current_file_path = None
                in_file_definition = False
                continue

            current_file_content_lines = []
            in_file_definition = True
            in_code_block = False
            logger.debug(f"Parsed file header: {current_file_path}")
            continue

        if not in_file_definition:
            if line_content_stripped.startswith("# Space:"):
                full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
                if "/" in full_space_name_md:
                    parts = full_space_name_md.split("/", 1)
                    if len(parts) == 2:
                         space_info["owner_md"], space_info["repo_name_md"] = parts[0].strip(), parts[1].strip()
                    else:
                         space_info["repo_name_md"] = full_space_name_md
                else:
                    space_info["repo_name_md"] = full_space_name_md
                logger.debug(f"Parsed space header: {space_info['owner_md']}/{space_info['repo_name_md']}")
                continue
            if line_content_stripped.startswith("## File Structure"):
                 structure_block_start = i + 1
                 while structure_block_start < len(lines) and not lines[structure_block_start].strip().startswith("```"):
                      structure_block_start += 1
                 if structure_block_start < len(lines) and lines[structure_block_start].strip().startswith("```"):
                      structure_block_end = structure_block_start + 1
                      while structure_block_end < len(lines) and not lines[structure_block_end].strip().startswith("```"):
                           structure_block_end += 1
                      if structure_block_end < len(lines) and lines[structure_block_end].strip().startswith("```"):
                           logger.debug(f"Skipping File Structure block from line {i+1} to {structure_block_end+1}")
                           i = structure_block_end
                           continue
            continue

        if in_file_definition:
            if line_content_stripped.startswith("```"):
                in_code_block = not in_code_block
                logger.debug(f"Toggled code block to {in_code_block} at line {line_num}")
                continue

            if in_code_block:
                current_file_content_lines.append(line_content_orig)
            elif line_content_stripped.startswith("[Binary file") or line_content_stripped.startswith("[Error loading content:") or line_content_stripped.startswith("[Binary or Skipped file]"):
                 current_file_content_lines.append(line_content_orig)
                 logger.debug(f"Parsed binary/error marker for {current_file_path} at line {line_num}")
            else:
                 pass

    if current_file_path is not None and in_file_definition:
        content_to_save = "\n".join(current_file_content_lines).strip()
        space_info["files"].append({"path": current_file_path, "content": content_to_save})

    space_info["files"] = [f for f in space_info["files"] if f.get("path")]
    space_info["owner_md"] = space_info["owner_md"].strip()
    space_info["repo_name_md"] = space_info["repo_name_md"].strip()

    if file_parsing_errors:
         logger.warning(f"Markdown parsing encountered errors: {file_parsing_errors}")

    logger.info(f"Parsed markdown. Found {len(space_info['files'])} files.")
    return space_info

def get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui):
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    sdk = None
    files = []
    error = None
    repo_id = None

    logger.info(f"Attempting to get repo info for {repo_id_for_error_logging}")

    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return None, [], token_err

        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id: return None, [], err_repo_id
        repo_id_for_error_logging = repo_id

        api = HfApi(token=resolved_api_token)
        repo_info_obj = api.repo_info(repo_id=repo_id, repo_type="space", timeout=20)
        sdk = repo_info_obj.sdk
        files = [sibling.rfilename for sibling in repo_info_obj.siblings if sibling.rfilename]

        if not files and repo_info_obj.siblings:
            logger.warning(f"Repo {repo_id} has siblings but no rfilenames extracted. Total siblings: {len(repo_info_obj.siblings)}")

        logger.info(f"Successfully got repo info for {repo_id}. SDK: {sdk}, Files found: {len(files)}")


    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error getting repo info for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None

        if status_code == 404:
            error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found (404)."
        elif status_code in (401,403):
            error = f"Access denied for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        else:
            error = f"HTTP Error {status_code or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"

    except Exception as e:
        logger.warning(f"Could not get full repo_info for {repo_id_for_error_logging or 'unknown repo'}, attempting list_repo_files fallback: {e}")
        error = f"Error retrieving Space info for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. Attempting file list fallback."

        try:
            resolved_api_token_fb, token_err_fb = _get_api_token(ui_api_token_from_textbox)
            if token_err_fb: return None, [], f"{error}\nAPI Token Error during fallback: {token_err_fb}"
            repo_id_fb, err_repo_id_fb = _determine_repo_id(resolved_api_token_fb, owner_ui, space_name_ui)
            if err_repo_id_fb: return None, [], f"{error}\nRepo ID Error during fallback: {err_repo_id_fb}"

            files = list_repo_files(repo_id=repo_id_fb, token=resolved_api_token_fb, repo_type="space", timeout=20)
            error = f"Warning: Could not fetch full Space info (SDK etc.) for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. File list loaded via fallback."
            logger.info(f"Fallback list_repo_files successful for {repo_id_fb}. Files found: {len(files)}")

        except HfHubHTTPError as e2_http:
            logger.error(f"HTTP error during fallback list_repo_files for {repo_id_for_error_logging or 'unknown repo'}: {e2_http}")
            error_message_fb = str(e2_http)
            status_code_fb = e2_http.response.status_code if e2_http.response is not None else None
            if status_code_fb == 404:
                error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found during fallback (404)."
            else:
                error = f"HTTP Error {status_code_fb or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}' during fallback: {error_message_fb}"
            files = []

        except Exception as e2:
            logger.exception(f"Error listing files for {repo_id_for_error_logging or 'unknown repo'} during fallback: {e2}")
            error = f"{error}\nError listing files during fallback for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e2)}"
            files = []

    if not files and not error and (repo_id_for_error_logging is not None):
         error = f"No files found in Space `{repo_id_for_error_logging or 'unknown repo'}`."

    return sdk, files, error

def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
    files, err = get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui)[1:]
    return files, err

def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    logger.info(f"Attempting to get content for file '{file_path_in_repo}' from {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return None, token_err
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id: return None, err_repo_id
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo: return None, "Error: File path cannot be empty."
        file_path_in_repo = file_path_in_repo.replace("\\", "/")

        downloaded_file_path = hf_hub_download(
            repo_id=repo_id,
            filename=file_path_in_repo,
            repo_type="space",
            token=resolved_api_token,
            local_dir_use_symlinks=False,
            cache_dir=None,
            timeout=20
        )
        content = Path(downloaded_file_path).read_text(encoding="utf-8")
        logger.info(f"Successfully downloaded and read content for '{file_path_in_repo}'.")
        return content, None
    except FileNotFoundError:
         logger.error(f"FileNotFoundError for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
         return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
    except UnicodeDecodeError:
        logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}'. Likely binary.")
        return None, f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error fetching file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
        if status_code in (401, 403):
             return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return None, f"HTTP Error {status_code or 'unknown'} fetching file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Error fetching file content: {str(e)}"

def apply_staged_changes(ui_api_token_from_textbox, owner_ui, space_name_ui, changeset):
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    status_messages = []

    logger.info(f"Attempting to apply {len(changeset)} staged changes to {repo_id_for_error_logging}")

    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return f"API Token Error: {token_err}"

        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id: return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id

        api = HfApi(token=resolved_api_token)

        create_space_op = next((c for c in changeset if c['type'] == 'CREATE_SPACE'), None)
        if create_space_op:
            try:
                 api.create_repo(repo_id=repo_id, repo_type="space", space_sdk=create_space_op.get('sdk', 'gradio'), private=create_space_op.get('private', False), exist_ok=True)
                 status_messages.append(f"CREATE_SPACE: Successfully created or ensured space [{repo_id}](https://huggingface.co/spaces/{repo_id}) exists.")
                 logger.info(f"Successfully created or ensured space {repo_id} exists.")
            except HfHubHTTPError as e_http:
                 status_messages.append(f"CREATE_SPACE HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
                 logger.error(f"HTTP error creating space {repo_id}: {e_http}")
            except Exception as e:
                 status_messages.append(f"CREATE_SPACE Error: {e}")
                 logger.error(f"Error creating space {repo_id}: {e}")

        temp_dir = None
        paths_to_upload = {}
        delete_operations = []

        try:
            temp_dir = tempfile.TemporaryDirectory()
            repo_staging_path = Path(temp_dir.name) / "repo_staging_content"
            repo_staging_path.mkdir(exist_ok=True)

            gitattributes_path_local = repo_staging_path / ".gitattributes"
            try:
                with open(gitattributes_path_local, "w", encoding="utf-8") as f:
                    f.write("* text=auto eol=lf\n")
                paths_to_upload[str(gitattributes_path_local)] = ".gitattributes"
            except Exception as e:
                status_messages.append(f"Warning: Could not stage .gitattributes file: {e}")
                logger.warning(f"Could not stage .gitattributes: {e}")


            for change in changeset:
                if change['type'] == 'UPDATE_FILE' or change['type'] == 'CREATE_FILE':
                    file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
                    if not file_path_in_repo:
                         status_messages.append(f"Skipping {change['type']} operation: empty path.")
                         continue

                    content_to_write = change.get('content', '')
                    if content_to_write.startswith("[Binary file") or content_to_write.startswith("[Error loading content:") or content_to_write.startswith("[Binary or Skipped file]"):
                         status_messages.append(f"Skipping {change['type']} for '{file_path_in_repo}': Content is a binary/error placeholder.")
                         logger.warning(f"Skipping {change['type']} operation for '{file_path_in_repo}': Content is binary/error placeholder.")
                         continue

                    file_path_local = repo_staging_path / file_path_in_repo
                    file_path_local.parent.mkdir(parents=True, exist_ok=True)

                    try:
                        with open(file_path_local, "w", encoding="utf-8") as f:
                             f.write(content_to_write)
                        paths_to_upload[str(file_path_local)] = file_path_in_repo
                        logger.debug(f"Staged file for {change['type']}: {file_path_in_repo}")
                    except Exception as file_write_error:
                        status_messages.append(f"Error staging file {file_path_in_repo} for {change['type']}: {file_write_error}")
                        logger.error(f"Error writing file {file_path_in_repo} during staging for {change['type']}: {file_write_error}")


                elif change['type'] == 'DELETE_FILE':
                    file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
                    if not file_path_in_repo:
                        status_messages.append(f"Skipping DELETE_FILE operation: empty path.")
                        continue
                    delete_operations.append(CommitOperationDelete(path_in_repo=file_path_in_repo))
                    logger.debug(f"Added DELETE_FILE operation for: {file_path_in_repo}")


            if delete_operations:
                 try:
                      commit_message_delete = f"AI Space Builder: Deleted {len(delete_operations)} files."
                      logger.info(f"Performing delete commit for {repo_id_for_error_logging}: {commit_message_delete}")
                      api.create_commit(
                         repo_id=repo_id,
                         repo_type="space",
                         operations=delete_operations,
                         commit_message=commit_message_delete
                      )
                      status_messages.append(f"File Deletions: Successfully committed {len(delete_operations)} deletions.")
                      logger.info("Delete commit successful.")
                 except HfHubHTTPError as e_http:
                      status_messages.append(f"File Deletion HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
                      logger.error(f"HTTP error during delete commit for {repo_id}: {e_http}")
                 except Exception as e_delete_commit:
                      status_messages.append(f"File Deletion Error: {str(e_delete_commit)}. Check logs.")
                      logger.exception(f"Error during delete commit for {repo_id}:")


            if paths_to_upload:
                 try:
                      commit_message_upload = f"AI Space Builder: Updated Space content for {repo_id}"
                      logger.info(f"Uploading staged files from {str(repo_staging_path)} to {repo_id}...")
                      upload_folder(
                         repo_id=repo_id,
                         folder_path=str(repo_staging_path),
                         path_in_repo=".",
                         token=resolved_api_token,
                         repo_type="space",
                         commit_message=commit_message_upload,
                         allow_patterns=["*"],
                      )
                      status_messages.append(f"File Uploads/Updates: Successfully uploaded/updated {len(paths_to_upload)} files.")
                      logger.info("Upload/Update commit successful.")
                 except HfHubHTTPError as e_http:
                      status_messages.append(f"File Upload/Update HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
                      logger.error(f"HTTP error during upload_folder for {repo_id}: {e_http}")
                 except Exception as e_upload:
                      status_messages.append(f"File Upload/Update Error: {str(e_upload)}. Check logs.")
                      logger.exception(f"Error during upload_folder for {repo_id}:")

            else:
                status_messages.append("No file changes (create/update/delete) to commit.")
                logger.info("No file changes to commit.")


        finally:
            if temp_dir:
                try:
                    temp_dir.cleanup()
                    logger.info("Cleaned up temporary staging directory.")
                except Exception as e:
                    logger.error(f"Error cleaning up temp dir: {e}")

        for change in changeset:
            if change['type'] == 'SET_PRIVACY':
                 try:
                      target_repo_id = change.get('repo_id', repo_id)
                      if not target_repo_id:
                           status_messages.append("SET_PRIVACY Error: Target repo_id not specified.")
                           continue
                      api.update_repo_visibility(repo_id=target_repo_id, private=change['private'], repo_type='space')
                      status_messages.append(f"SET_PRIVACY: Successfully set `{target_repo_id}` to `private={change['private']}`.")
                      logger.info(f"Successfully set privacy for {target_repo_id} to {change['private']}.")
                 except HfHubHTTPError as e_http:
                      status_messages.append(f"SET_PRIVACY HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check token/permissions.")
                      logger.error(f"HTTP error setting privacy for {target_repo_id}: {e_http}")
                 except Exception as e:
                      status_messages.append(f"SET_PRIVACY Error: {str(e)}. Check logs.")
                      logger.exception(f"Error setting privacy for {target_repo_id}:")

            elif change['type'] == 'DELETE_SPACE':
                 delete_owner = change.get('owner') or owner_ui
                 delete_space = change.get('space_name') or space_name_ui
                 delete_repo_id = f"{delete_owner}/{delete_space}" if delete_owner and delete_space else repo_id

                 if not delete_repo_id:
                      status_messages.append("DELETE_SPACE Error: Target repo_id not specified.")
                      continue

                 if delete_repo_id != repo_id:
                      status_messages.append(f"DELETE_SPACE Error: AI requested deletion of '{delete_repo_id}', but this action is only permitted for the currently loaded space '{repo_id}'. Action blocked.")
                      logger.warning(f"Blocked DELETE_SPACE action: requested '{delete_repo_id}', current '{repo_id}'.")
                      continue

                 logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for {delete_repo_id}")
                 try:
                      api.delete_repo(repo_id=delete_repo_id, repo_type='space')
                      status_messages.append(f"DELETE_SPACE: Successfully deleted space `{delete_repo_id}`.")
                      logger.warning(f"Successfully deleted space {delete_repo_id}.")
                 except HfHubHTTPError as e_http:
                      status_messages.append(f"DELETE_SPACE HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check token/permissions.")
                      logger.error(f"HTTP error deleting space {delete_repo_id}: {e_http}")
                 except Exception as e:
                      status_messages.append(f"DELETE_SPACE Error: {str(e)}. Check logs.")
                      logger.exception(f"Error deleting space {delete_repo_id}:")

    except HfHubHTTPError as e_http:
        logger.error(f"Top-level HTTP error during apply_staged_changes for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        status_messages.append(f"API HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}")
    except Exception as e:
        logger.exception(f"Top-level error during apply_staged_changes for {repo_id_for_error_logging or 'unknown repo'}:")
        status_messages.append(f"An unexpected error occurred during apply staged changes: {str(e)}")

    final_status = " | ".join(status_messages) if status_messages else "No operations were applied."
    logger.info(f"Finished applying staged changes. Final status: {final_status}")
    return final_status

def delete_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, commit_message_ui=None):
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    logger.info(f"Attempting manual file deletion for '{file_path_in_repo}' from {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id: return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo: return "Error: File path cannot be empty for deletion."
        file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')

        effective_commit_message = commit_message_ui or f"Deleted file: {file_path_in_repo} via AI Space Editor UI"

        hf_delete_file(
            path_in_repo=file_path_in_repo,
            repo_id=repo_id,
            repo_type="space",
            token=resolved_api_token,
            commit_message=effective_commit_message,
            timeout=20
        )
        logger.info(f"Successfully deleted file: {file_path_in_repo}")
        return f"Successfully deleted file: `{file_path_in_repo}`"

    except FileNotFoundError:
         logger.error(f"FileNotFoundError during manual delete for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
         return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None

        if status_code == 404:
             return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' for deletion (404)."
        if status_code in (401, 403):
             return f"Delete Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return f"Delete HTTP Error {status_code or 'unknown'} deleting file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return f"Delete Error deleting file '{file_path_in_repo}': {str(e)}"

def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    logger.info(f"Attempting manual file update for '{file_path_in_repo}' in {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id: return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo: return "Update Error: File Path to update cannot be empty."
        file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
        commit_msg = commit_message_ui or f"Update {file_path_in_repo} via AI Space Editor UI"

        api = HfApi(token=resolved_api_token)

        tmp_file_path = None
        try:
            with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8') as tmp_file_obj:
                tmp_file_obj.write(file_content)
                tmp_file_path = tmp_file_obj.name

            api.upload_file(
                path_or_fileobj=tmp_file_path,
                path_in_repo=file_path_in_repo,
                repo_id=repo_id,
                repo_type="space",
                commit_message=commit_msg,
                timeout=20
            )
            logger.info(f"Successfully updated file: {file_path_in_repo}")
            return f"Successfully updated `{file_path_in_repo}`"
        finally:
            if tmp_file_path and os.path.exists(tmp_file_path):
                 os.remove(tmp_file_path)

    except FileNotFoundError:
         logger.error(f"FileNotFoundError during manual update for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
         return f"Update Error: Local temporary file not found during upload for '{file_path_in_repo}'."
    except UnicodeDecodeError:
        logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}' during manual update.")
        return f"Update Error: Content for '{file_path_in_repo}' is not valid UTF-8 text. Cannot edit this way."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return f"Update Error: Space '{repo_id_for_error_logging or 'unknown repo'}' or file '{file_path_in_repo}' not found (404)."
        if status_code in (401, 403):
             return f"Update Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return f"Update HTTP Error {status_code or 'unknown'} updating file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}:")
        return f"Update Error updating file for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}"

def get_space_runtime_status(ui_api_token_from_textbox, space_name_ui, owner_ui):
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    logger.info(f"Fetching runtime status for Space: {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return None, f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id: return None, f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id

        api = HfApi(token=resolved_api_token)

        runtime_info = api.get_space_runtime(repo_id=repo_id, timeout=20)
        logger.info(f"Received runtime info for {repo_id}. Stage: {runtime_info.stage}")

        status_details = {
            "stage": runtime_info.stage,
            "hardware": runtime_info.hardware,
            "requested_hardware": runtime_info.requested_hardware if hasattr(runtime_info, 'requested_hardware') else None,
            "error_message": None,
            "status": runtime_info.status if hasattr(runtime_info, 'status') else None,
            "full_log_link": f"https://huggingface.co/spaces/{repo_id}/logs" if repo_id else "#"
        }

        if runtime_info.stage == "ERRORED":
            error_content = None
            if hasattr(runtime_info, 'error') and runtime_info.error: error_content = str(runtime_info.error)
            if 'build' in runtime_info.raw and isinstance(runtime_info.raw['build'], dict) and runtime_info.raw['build'].get('status') == 'error':
                 error_content = f"Build Error: {runtime_info.raw['build'].get('message', error_content or 'Unknown build error')}"
            elif 'run' in runtime_info.raw and isinstance(runtime_info.raw['run'], dict) and runtime_info.raw['run'].get('status') == 'error':
                 error_content = f"Runtime Error: {runtime_info.raw['run'].get('message', error_content or 'Unknown runtime error')}"
            elif 'message' in runtime_info.raw and isinstance(runtime_info.raw['message'], str) and ('error' in runtime_info.raw['message'].lower() or runtime_info.raw['message'].strip().endswith('!')):
                 error_content = runtime_info.raw['message']

            status_details["error_message"] = error_content if error_content else "Space is in an errored state. Check logs for details."

        logger.info(f"Runtime status details for {repo_id}: {status_details}")
        return status_details, None

    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None

        if status_code == 404:
            return None, f"Status Error: Space '{repo_id_for_error_logging or 'unknown repo'}' not found or has no active runtime status (404)."
        if status_code in (401, 403):
             return None, f"Status Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return None, f"Status HTTP Error {status_code or 'unknown'} fetching runtime status for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"

    except Exception as e:
        logger.exception(f"Error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Status Error fetching runtime status: {str(e)}"

def build_logic_set_space_privacy(hf_api_key, repo_id, private: bool):
    logger.info(f"Attempting to set privacy for '{repo_id}' to {private}.")
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error setting privacy: {err or 'Token not found'}")
            return f"Error getting token: {err or 'Token not found.'}"
        api = HfApi(token=token)
        api.update_repo_visibility(repo_id=repo_id, private=private, repo_type='space')
        logger.info(f"Successfully set privacy for {repo_id} to {private}.")
        return f"Successfully set privacy for `{repo_id}` to `{private}`."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error setting privacy for {repo_id}: {e_http}")
        status_code = e_http.response.status_code if e_http.response else 'N/A'
        return f"HTTP Error ({status_code}) setting privacy for `{repo_id}`: {e_http.response.text if e_http.response else str(e_http)}"
    except Exception as e:
        logger.exception(f"Error setting privacy for {repo_id}:")
        return f"Error setting privacy for `{repo_id}`: {e}"

def build_logic_delete_space(hf_api_key, owner, space_name):
    repo_id = f"{owner}/{space_name}"
    logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for '{repo_id}'.")
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error deleting space: {err or 'Token not found'}")
            return f"Error getting token: {err or 'Token not found.'}"
        api = HfApi(token=token)
        api.delete_repo(repo_id=repo_id, repo_type='space')
        logger.warning(f"Successfully deleted space {repo_id}.")
        return f"Successfully deleted space `{repo_id}`."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error deleting space {repo_id}: {e_http}")
        status_code = e_http.response.status_code if e_http.response else 'N/A'
        return f"HTTP Error ({status_code}) deleting space `{repo_id}`: {e_http.response.text if e_http.response else str(e_http)}"
    except Exception as e:
        logger.exception(f"Error deleting space {repo_id}:")
        return f"Error deleting space `{repo_id}`: {e}"