# broadfield-dev's picture
# Update build_logic.py
# dea3ee4 verified
# raw
# history blame
# 46.5 kB
import os
import re
import tempfile
import shutil
import logging
from pathlib import Path
from huggingface_hub import (
create_repo,
upload_folder,
list_repo_files,
whoami,
hf_hub_download,
delete_file as hf_delete_file,
HfApi,
duplicate_repo as hf_duplicate_repo,
list_repos as hf_list_repos
)
from huggingface_hub.hf_api import CommitOperationDelete, CommitOperationAdd, CommitOperation
from huggingface_hub.utils import HfHubHTTPError
# Configure the root logger once at import time: INFO level, with timestamp,
# logger name and level in front of every message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
def _get_api_token(ui_token_from_textbox=None):
    """Resolve the Hugging Face API token to use for a request.

    The ``HF_TOKEN`` environment variable takes precedence over the value
    typed into the UI textbox.  Surrounding whitespace is removed from both
    sources, and a value that is empty after stripping counts as missing, so
    callers never receive an empty token together with ``error=None``.

    Args:
        ui_token_from_textbox: Token entered in the UI, or ``None``.

    Returns:
        A ``(token, error)`` tuple; exactly one of the two is ``None``.
    """
    env_token = (os.getenv('HF_TOKEN') or "").strip()
    if env_token:
        logger.debug("Using HF_TOKEN from environment variable.")
        return env_token, None

    ui_token = (ui_token_from_textbox or "").strip()
    if ui_token:
        logger.debug("Using HF_TOKEN from UI textbox.")
        return ui_token, None

    logger.warning("Hugging Face API token not provided in UI or HF_TOKEN env var.")
    return None, "Error: Hugging Face API token not provided in UI or HF_TOKEN env var."
def _determine_repo_id(ui_api_token_from_textbox, owner_ui, space_name_ui):
    """Build the ``owner/space`` repo id, auto-detecting the owner if needed.

    When ``owner_ui`` is empty, the owner is looked up with ``whoami`` using
    the token resolved by ``_get_api_token``.

    Returns:
        ``(repo_id, None)`` on success, ``(None, error_message)`` otherwise.
    """
    if not space_name_ui:
        return None, "Error: Space Name cannot be empty."
    if "/" in space_name_ui:
        return None, "Error: Space Name should not contain '/'. Use Owner field for the owner part."

    owner = owner_ui
    if not owner:
        logger.info("Owner not specified, attempting to auto-detect from token.")
        token, token_problem = _get_api_token(ui_api_token_from_textbox)
        if token_problem:
            return None, f"Error auto-detecting owner: {token_problem}"
        if not token:
            return None, "Error: API token required for auto owner determination if Owner field is empty."

        failure = None
        try:
            account = whoami(token=token)
            if account and 'name' in account:
                owner = account['name']
                logger.info(f"Auto-detected owner: {owner}")
            else:
                failure = "Error: Could not retrieve username from token. Check token permissions or specify Owner."
                logger.error(failure)
        except Exception as e:
            failure = f"Error retrieving username from token: {str(e)}. Specify Owner or check token."
            logger.exception("Error retrieving username from token:")
        if failure:
            return None, failure

    # whoami may have returned an empty name; treat that like a missing owner.
    if not owner:
        return None, "Error: Owner could not be determined. Please specify it in the Owner field."

    repo_id = f"{owner}/{space_name_ui}"
    logger.info(f"Determined repo_id: {repo_id}")
    return repo_id, None
def parse_markdown(markdown_input):
    """Parse the Space-description markdown into owner, repo name and files.

    Recognised structure:

    * ``# Space: owner/name`` (or just ``name``) sets ``owner_md`` and
      ``repo_name_md``.
    * ``## File Structure`` followed by a fenced block is skipped entirely,
      so nothing inside the listing is mistaken for a header.
    * ``### File: path`` starts a file.  Lines inside the fenced code
      block(s) that follow become its content; placeholder lines such as
      ``[Binary file ...`` are kept verbatim even outside a fence.

    The three header kinds are also accepted when commented out with a
    leading ``# ``; that prefix is removed before parsing.

    Args:
        markdown_input: Markdown text.  ``None`` is treated as empty input.

    Returns:
        ``{"repo_name_md": str, "owner_md": str, "files": [{"path", "content"}]}``.
    """
    commented_headers = ("# ### File:", "# ## File Structure", "# # Space:")
    placeholder_prefixes = ("[Binary file", "[Error loading content:", "[Binary or Skipped file]")
    file_header_re = re.compile(r"### File:\s*(?P<filename_line>[^\n]+)")

    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    current_file_path = None
    current_file_content_lines = []
    in_file_definition = False
    in_code_block = False
    file_parsing_errors = []

    # Un-comment the known headers; every other line is kept untouched so
    # file content (including its indentation) is preserved.
    lines = []
    for raw_line in (markdown_input or "").strip().split("\n"):
        stripped = raw_line.strip()
        lines.append(stripped[2:] if stripped.startswith(commented_headers) else raw_line)

    def _is_fence(index):
        return lines[index].strip().startswith("```")

    # Index of the closing fence of a File Structure block.  Reassigning the
    # loop variable of a ``for`` loop does not skip iterations, so the skip is
    # tracked explicitly.
    skip_through = -1

    for i, line_content_orig in enumerate(lines):
        if i <= skip_through:
            continue
        line_content_stripped = line_content_orig.strip()
        line_num = i + 1

        file_match = file_header_re.match(line_content_stripped)
        if file_match:
            # A new header closes the file that was being collected.
            if current_file_path is not None and in_file_definition:
                content_to_save = "\n".join(current_file_content_lines).strip()
                space_info["files"].append({"path": current_file_path, "content": content_to_save})

            # Drop a trailing "(...)" annotation and surrounding quotes/backticks.
            current_file_path = file_match.group("filename_line").strip()
            current_file_path = re.split(r'\s*\(', current_file_path, maxsplit=1)[0].strip()
            current_file_path = current_file_path.strip('`\'"').strip()
            if not current_file_path:
                file_parsing_errors.append(f"Line {line_num}: Found '### File:' but filename is empty or invalid.")
                current_file_path = None
                in_file_definition = False
                continue

            current_file_content_lines = []
            in_file_definition = True
            in_code_block = False
            logger.debug(f"Parsed file header: {current_file_path}")
            continue

        if not in_file_definition:
            if line_content_stripped.startswith("# Space:"):
                full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
                owner_part, separator, name_part = full_space_name_md.partition("/")
                if separator:
                    space_info["owner_md"] = owner_part.strip()
                    space_info["repo_name_md"] = name_part.strip()
                else:
                    space_info["repo_name_md"] = full_space_name_md
                logger.debug(f"Parsed space header: {space_info['owner_md']}/{space_info['repo_name_md']}")
            elif line_content_stripped.startswith("## File Structure"):
                # Look for the opening fence, but never search past a file
                # header: a fence found after one belongs to that file.
                fence_open = i + 1
                while (
                    fence_open < len(lines)
                    and not _is_fence(fence_open)
                    and not file_header_re.match(lines[fence_open].strip())
                ):
                    fence_open += 1
                if fence_open < len(lines) and _is_fence(fence_open):
                    fence_close = fence_open + 1
                    while fence_close < len(lines) and not _is_fence(fence_close):
                        fence_close += 1
                    # Only skip when the block is properly closed.
                    if fence_close < len(lines):
                        logger.debug(f"Skipping File Structure block from line {i+1} to {fence_close+1}")
                        skip_through = fence_close
            # Anything else before the first file header is ignored.
            continue

        # From here on we are inside a file definition.
        if line_content_stripped.startswith("```"):
            in_code_block = not in_code_block
            logger.debug(f"Toggled code block to {in_code_block} at line {line_num}")
            continue
        if in_code_block:
            current_file_content_lines.append(line_content_orig)
        elif line_content_stripped.startswith(placeholder_prefixes):
            current_file_content_lines.append(line_content_orig)
            logger.debug(f"Parsed binary/error marker for {current_file_path} at line {line_num}")

    # Flush the last file, which no following header closed.
    if current_file_path is not None and in_file_definition:
        content_to_save = "\n".join(current_file_content_lines).strip()
        space_info["files"].append({"path": current_file_path, "content": content_to_save})

    space_info["files"] = [f for f in space_info["files"] if f.get("path")]
    space_info["owner_md"] = space_info["owner_md"].strip()
    space_info["repo_name_md"] = space_info["repo_name_md"].strip()
    if file_parsing_errors:
        logger.warning(f"Markdown parsing encountered errors: {file_parsing_errors}")
    logger.info(f"Parsed markdown. Found {len(space_info['files'])} files.")
    return space_info
def get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Fetch the SDK name and the file list of a Space.

    The primary path uses ``HfApi.repo_info``.  HTTP errors are reported
    directly; any other failure triggers a fallback to ``list_repo_files``,
    in which case the returned error is a warning describing the fallback.

    Returns:
        ``(sdk, files, error)``; ``error`` is ``None`` when files were loaded
        through the primary path.
    """
    target_label = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    sdk, files, error = None, [], None
    logger.info(f"Attempting to get repo info for {target_label}")

    try:
        token, token_problem = _get_api_token(ui_api_token_from_textbox)
        if token_problem:
            return None, [], token_problem
        repo_id, repo_problem = _determine_repo_id(token, owner_ui, space_name_ui)
        if repo_problem:
            return None, [], repo_problem
        target_label = repo_id

        space_details = HfApi(token=token).repo_info(repo_id=repo_id, repo_type="space", timeout=20)
        # sdk is recorded before the file list so it survives a fallback.
        sdk = space_details.sdk
        files = [entry.rfilename for entry in space_details.siblings if entry.rfilename]
        if space_details.siblings and not files:
            logger.warning(f"Repo {repo_id} has siblings but no rfilenames extracted. Total siblings: {len(space_details.siblings)}")
        logger.info(f"Successfully got repo info for {repo_id}. SDK: {sdk}, Files found: {len(files)}")
    except HfHubHTTPError as http_exc:
        shown = target_label or 'unknown repo'
        logger.error(f"HTTP error getting repo info for {shown}: {http_exc}")
        detail = str(http_exc)
        code = None if http_exc.response is None else http_exc.response.status_code
        if code == 404:
            error = f"Space '{shown}' not found (404)."
        elif code in (401, 403):
            error = f"Access denied for '{shown}' ({code}). Check token permissions."
        else:
            error = f"HTTP Error {code or 'unknown'} for '{shown}': {detail}"
    except Exception as e:
        shown = target_label or 'unknown repo'
        logger.warning(f"Could not get full repo_info for {shown}, attempting list_repo_files fallback: {e}")
        error = f"Error retrieving Space info for `{shown}`: {str(e)}. Attempting file list fallback."
        try:
            fb_token, fb_token_problem = _get_api_token(ui_api_token_from_textbox)
            if fb_token_problem:
                return None, [], f"{error}\nAPI Token Error during fallback: {fb_token_problem}"
            fb_repo_id, fb_repo_problem = _determine_repo_id(fb_token, owner_ui, space_name_ui)
            if fb_repo_problem:
                return None, [], f"{error}\nRepo ID Error during fallback: {fb_repo_problem}"
            files = list_repo_files(repo_id=fb_repo_id, token=fb_token, repo_type="space", timeout=20)
            error = f"Warning: Could not fetch full Space info (SDK etc.) for `{shown}`: {str(e)}. File list loaded via fallback."
            logger.info(f"Fallback list_repo_files successful for {fb_repo_id}. Files found: {len(files)}")
        except HfHubHTTPError as fb_http_exc:
            logger.error(f"HTTP error during fallback list_repo_files for {shown}: {fb_http_exc}")
            fb_detail = str(fb_http_exc)
            fb_code = None if fb_http_exc.response is None else fb_http_exc.response.status_code
            if fb_code == 404:
                error = f"Space '{shown}' not found during fallback (404)."
            else:
                error = f"HTTP Error {fb_code or 'unknown'} for '{shown}' during fallback: {fb_detail}"
            files = []
        except Exception as e2:
            logger.exception(f"Error listing files for {shown} during fallback: {e2}")
            error = f"{error}\nError listing files during fallback for `{shown}`: {str(e2)}"
            files = []

    if target_label is not None and not files and not error:
        error = f"No files found in Space `{target_label or 'unknown repo'}`."
    return sdk, files, error
def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Return only ``(files, error)`` from ``get_space_repository_info``."""
    _sdk, file_list, problem = get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui)
    return file_list, problem
def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
    """Download one file from a Space and return its UTF-8 text.

    Args:
        ui_api_token_from_textbox: Token typed in the UI (may be empty).
        space_name_ui: Space name without the owner part.
        owner_ui: Owner; auto-detected from the token when empty.
        file_path_in_repo: Path of the file inside the Space repository.

    Returns:
        ``(content, None)`` on success, ``(None, error_message)`` otherwise.
        Files that are not valid UTF-8 are reported as an error.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    logger.info(f"Attempting to get content for file '{file_path_in_repo}' from {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return None, token_err
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return None, err_repo_id
        repo_id_for_error_logging = repo_id
        if not file_path_in_repo:
            return None, "Error: File path cannot be empty."
        # Repository paths always use forward slashes.
        file_path_in_repo = file_path_in_repo.replace("\\", "/")
        # hf_hub_download has no `timeout` keyword; passing one raises
        # TypeError before any request is made.  `etag_timeout` is the
        # documented option for bounding the metadata request.
        downloaded_file_path = hf_hub_download(
            repo_id=repo_id,
            filename=file_path_in_repo,
            repo_type="space",
            token=resolved_api_token,
            local_dir_use_symlinks=False,
            cache_dir=None,
            etag_timeout=20,
        )
        content = Path(downloaded_file_path).read_text(encoding="utf-8")
        logger.info(f"Successfully downloaded and read content for '{file_path_in_repo}'.")
        return content, None
    except FileNotFoundError:
        logger.error(f"FileNotFoundError for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
    except UnicodeDecodeError:
        logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}'. Likely binary.")
        return None, f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error fetching file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
        if status_code in (401, 403):
            return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return None, f"HTTP Error {status_code or 'unknown'} fetching file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Error fetching file content: {str(e)}"
def _staged_http_error_details(e_http):
    """Return ``(status_code, detail_text)`` describing an ``HfHubHTTPError``.

    The response is compared against ``None`` explicitly: a ``requests``
    response evaluates to ``False`` for 4xx/5xx statuses, so a plain
    truthiness test would throw away the status code of every real error.
    """
    response = getattr(e_http, "response", None)
    if response is None:
        return 'N/A', str(e_http)
    return response.status_code, (response.text or str(e_http))


def apply_staged_changes(ui_api_token_from_textbox, owner_ui, space_name_ui, changeset):
    """Apply a list of staged change dicts to a Space and report the outcome.

    Every entry of ``changeset`` is a dict with a ``'type'`` key:

    * ``DUPLICATE_SPACE`` / ``DELETE_SPACE`` are exclusive: if one is present
      only it is processed and the function returns.  Deletion is only
      allowed for the Space identified by ``owner_ui``/``space_name_ui``.
    * ``CREATE_SPACE`` creates the Space (``exist_ok=True``).
    * ``CREATE_FILE`` / ``UPDATE_FILE`` are staged in a temporary directory
      and uploaded in one commit; ``DELETE_FILE`` entries are committed
      together before the upload.
    * ``SET_PRIVACY`` is applied last.

    Failures of individual steps are collected as messages instead of
    aborting the remaining steps.

    Returns:
        One status string; individual messages are joined with ``" | "``.
    """
    changeset = changeset or []
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    status_messages = []
    logger.info(f"Attempting to apply {len(changeset)} staged changes to {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id
        api = HfApi(token=resolved_api_token)

        # --- Handle Exclusive Actions First ---
        exclusive_action = next((c for c in changeset if c['type'] in ('DUPLICATE_SPACE', 'DELETE_SPACE')), None)
        if exclusive_action:
            if exclusive_action['type'] == 'DUPLICATE_SPACE':
                # Expected to be intercepted by the confirm_changes handler so
                # the UI can load the new space; reaching here is a logic error.
                status_messages.append("Internal Error: DUPLICATE_SPACE action should have been handled exclusively.")
                logger.error("Internal Error: DUPLICATE_SPACE action was passed to apply_staged_changes unexpectedly.")
            elif exclusive_action['type'] == 'DELETE_SPACE':
                delete_owner = exclusive_action.get('owner') or owner_ui
                delete_space = exclusive_action.get('space_name') or space_name_ui
                delete_repo_id_target = f"{delete_owner}/{delete_space}" if delete_owner and delete_space else repo_id
                if not delete_repo_id_target:
                    status_messages.append("DELETE_SPACE Error: Target repo_id not specified.")
                elif delete_repo_id_target != repo_id:
                    # Safety net: never delete anything but the loaded space.
                    status_messages.append(f"DELETE_SPACE Error: AI requested deletion of '{delete_repo_id_target}', but this action is only permitted for the currently loaded space '{repo_id}'. Action blocked.")
                    logger.warning(f"Blocked DELETE_SPACE action in apply_staged_changes: requested '{delete_repo_id_target}', current '{repo_id}'.")
                else:
                    logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for {delete_repo_id_target}")
                    try:
                        api.delete_repo(repo_id=delete_repo_id_target, repo_type='space')
                        status_messages.append(f"DELETE_SPACE: Successfully deleted space `{delete_repo_id_target}`.")
                        logger.warning(f"Successfully deleted space {delete_repo_id_target}.")
                    except HfHubHTTPError as e_http:
                        http_status, http_text = _staged_http_error_details(e_http)
                        status_messages.append(f"DELETE_SPACE HTTP Error ({http_status}): {http_text}. Check token/permissions.")
                        logger.error(f"HTTP error deleting space {delete_repo_id_target}: {e_http}")
                    except Exception as e:
                        status_messages.append(f"DELETE_SPACE Error: {str(e)}. Check logs.")
                        logger.exception(f"Error deleting space {delete_repo_id_target}:")
            # An exclusive action was found: nothing else may run.
            final_status = " | ".join(status_messages) if status_messages else "Exclusive operation attempted."
            logger.info(f"Exclusive action processed. Final status: {final_status}")
            return final_status

        # --- Handle Non-Exclusive Actions and File Changes ---
        create_space_op = next((c for c in changeset if c['type'] == 'CREATE_SPACE'), None)
        if create_space_op:
            try:
                api.create_repo(repo_id=repo_id, repo_type="space", space_sdk=create_space_op.get('sdk', 'gradio'), private=create_space_op.get('private', False), exist_ok=True)
                status_messages.append(f"CREATE_SPACE: Successfully created or ensured space [{repo_id}](https://huggingface.co/spaces/{repo_id}) exists.")
                logger.info(f"Successfully created or ensured space {repo_id} exists.")
            except HfHubHTTPError as e_http:
                http_status, http_text = _staged_http_error_details(e_http)
                status_messages.append(f"CREATE_SPACE HTTP Error ({http_status}): {http_text}. Check logs.")
                logger.error(f"HTTP error creating space {repo_id}: {e_http}")
            except Exception as e:
                status_messages.append(f"CREATE_SPACE Error: {e}")
                logger.error(f"Error creating space {repo_id}: {e}")

        temp_dir = None
        paths_to_upload = {}
        delete_operations = []
        placeholder_prefixes = ("[Binary file", "[Error loading content:", "[Binary or Skipped file]")
        try:
            temp_dir = tempfile.TemporaryDirectory()
            repo_staging_path = Path(temp_dir.name) / "repo_staging_content"
            repo_staging_path.mkdir(exist_ok=True)

            # NOTE(review): .gitattributes is staged unconditionally, so an
            # upload commit is made for every non-exclusive changeset and an
            # existing .gitattributes in the Space is overwritten -- confirm
            # this is intended.
            gitattributes_path_local = repo_staging_path / ".gitattributes"
            try:
                with open(gitattributes_path_local, "w", encoding="utf-8") as f:
                    f.write("* text=auto eol=lf\n")
                paths_to_upload[str(gitattributes_path_local)] = ".gitattributes"
            except Exception as e:
                status_messages.append(f"Warning: Could not stage .gitattributes file: {e}")
                logger.warning(f"Could not stage .gitattributes: {e}")

            for change in changeset:
                if change['type'] in ('UPDATE_FILE', 'CREATE_FILE'):
                    file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
                    if not file_path_in_repo:
                        status_messages.append(f"Skipping {change['type']} operation: empty path.")
                        continue
                    # The changeset is AI-generated: refuse paths that would
                    # be written outside the staging directory.
                    if '..' in file_path_in_repo.split('/'):
                        status_messages.append(f"Skipping {change['type']} for '{file_path_in_repo}': path must stay inside the repository.")
                        logger.warning(f"Skipping {change['type']} operation for '{file_path_in_repo}': path escapes the staging directory.")
                        continue
                    # A missing or None content is staged as an empty file.
                    content_to_write = change.get('content') or ''
                    if content_to_write.startswith(placeholder_prefixes):
                        status_messages.append(f"Skipping {change['type']} for '{file_path_in_repo}': Content is a binary/error placeholder.")
                        logger.warning(f"Skipping {change['type']} operation for '{file_path_in_repo}': Content is binary/error placeholder.")
                        continue
                    file_path_local = repo_staging_path / file_path_in_repo
                    file_path_local.parent.mkdir(parents=True, exist_ok=True)
                    try:
                        with open(file_path_local, "w", encoding="utf-8") as f:
                            f.write(content_to_write)
                        paths_to_upload[str(file_path_local)] = file_path_in_repo
                        logger.debug(f"Staged file for {change['type']}: {file_path_in_repo}")
                    except Exception as file_write_error:
                        status_messages.append(f"Error staging file {file_path_in_repo} for {change['type']}: {file_write_error}")
                        logger.error(f"Error writing file {file_path_in_repo} during staging for {change['type']}: {file_write_error}")
                elif change['type'] == 'DELETE_FILE':
                    file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
                    if not file_path_in_repo:
                        status_messages.append(f"Skipping DELETE_FILE operation: empty path.")
                        continue
                    delete_operations.append(CommitOperationDelete(path_in_repo=file_path_in_repo))
                    logger.debug(f"Added DELETE_FILE operation for: {file_path_in_repo}")

            # Deletions go first, in their own commit.
            if delete_operations:
                try:
                    commit_message_delete = f"AI Space Builder: Deleted {len(delete_operations)} files."
                    logger.info(f"Performing delete commit for {repo_id_for_error_logging}: {commit_message_delete}")
                    api.create_commit(
                        repo_id=repo_id,
                        repo_type="space",
                        operations=delete_operations,
                        commit_message=commit_message_delete
                    )
                    status_messages.append(f"File Deletions: Successfully committed {len(delete_operations)} deletions.")
                    logger.info("Delete commit successful.")
                except HfHubHTTPError as e_http:
                    http_status, http_text = _staged_http_error_details(e_http)
                    status_messages.append(f"File Deletion HTTP Error ({http_status}): {http_text}. Check logs.")
                    logger.error(f"HTTP error during delete commit for {repo_id}: {e_http}")
                except Exception as e_delete_commit:
                    status_messages.append(f"File Deletion Error: {str(e_delete_commit)}. Check logs.")
                    logger.exception(f"Error during delete commit for {repo_id}:")

            if paths_to_upload:
                try:
                    commit_message_upload = f"AI Space Builder: Updated Space content for {repo_id}"
                    logger.info(f"Uploading staged files from {str(repo_staging_path)} to {repo_id}...")
                    upload_folder(
                        repo_id=repo_id,
                        folder_path=str(repo_staging_path),
                        path_in_repo=".",
                        token=resolved_api_token,
                        repo_type="space",
                        commit_message=commit_message_upload,
                        allow_patterns=["*"],
                    )
                    status_messages.append(f"File Uploads/Updates: Successfully uploaded/updated {len(paths_to_upload)} files.")
                    logger.info("Upload/Update commit successful.")
                except HfHubHTTPError as e_http:
                    http_status, http_text = _staged_http_error_details(e_http)
                    status_messages.append(f"File Upload/Update HTTP Error ({http_status}): {http_text}. Check logs.")
                    logger.error(f"HTTP error during upload_folder for {repo_id}: {e_http}")
                except Exception as e_upload:
                    status_messages.append(f"File Upload/Update Error: {str(e_upload)}. Check logs.")
                    logger.exception(f"Error during upload_folder for {repo_id}:")
            elif not delete_operations:
                # Only claim "no changes" when nothing was deleted either.
                status_messages.append("No file changes (create/update/delete) to commit.")
                logger.info("No file changes to commit.")
        finally:
            if temp_dir:
                try:
                    temp_dir.cleanup()
                    logger.info("Cleaned up temporary staging directory.")
                except Exception as e:
                    logger.error(f"Error cleaning up temp dir: {e}")

        for change in changeset:
            if change['type'] != 'SET_PRIVACY':
                continue
            target_repo_id = change.get('repo_id', repo_id)
            try:
                if not target_repo_id:
                    status_messages.append("SET_PRIVACY Error: Target repo_id not specified.")
                    continue
                api.update_repo_visibility(repo_id=target_repo_id, private=change['private'], repo_type='space')
                status_messages.append(f"SET_PRIVACY: Successfully set `{target_repo_id}` to `private={change['private']}`.")
                logger.info(f"Successfully set privacy for {target_repo_id} to {change['private']}.")
            except HfHubHTTPError as e_http:
                http_status, http_text = _staged_http_error_details(e_http)
                status_messages.append(f"SET_PRIVACY HTTP Error ({http_status}): {http_text}. Check token/permissions.")
                logger.error(f"HTTP error setting privacy for {target_repo_id}: {e_http}")
            except Exception as e:
                status_messages.append(f"SET_PRIVACY Error: {str(e)}. Check logs.")
                logger.exception(f"Error setting privacy for {target_repo_id}:")
        # Note: DELETE_SPACE and DUPLICATE_SPACE are handled as exclusive actions at the top
    except HfHubHTTPError as e_http:
        logger.error(f"Top-level HTTP error during apply_staged_changes for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        http_status, http_text = _staged_http_error_details(e_http)
        status_messages.append(f"API HTTP Error ({http_status}): {http_text}")
    except Exception as e:
        logger.exception(f"Top-level error during apply_staged_changes for {repo_id_for_error_logging or 'unknown repo'}:")
        status_messages.append(f"An unexpected error occurred during apply staged changes: {str(e)}")

    final_status = " | ".join(status_messages) if status_messages else "No operations were applied."
    logger.info(f"Finished applying staged changes. Final status: {final_status}")
    return final_status
def delete_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, commit_message_ui=None):
    """Delete a single file from a Space in its own commit.

    Args:
        ui_api_token_from_textbox: Token typed in the UI (may be empty).
        space_name_ui: Space name without the owner part.
        owner_ui: Owner; auto-detected from the token when empty.
        file_path_in_repo: Path of the file inside the Space repository.
        commit_message_ui: Optional commit message; a default is generated.

    Returns:
        A human-readable status string (success or error).
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    logger.info(f"Attempting manual file deletion for '{file_path_in_repo}' from {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id
        if not file_path_in_repo:
            return "Error: File path cannot be empty for deletion."
        file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
        # A path made only of slashes is empty after normalisation.
        if not file_path_in_repo:
            return "Error: File path cannot be empty for deletion."
        effective_commit_message = commit_message_ui or f"Deleted file: {file_path_in_repo} via AI Space Editor UI"
        # delete_file takes no `timeout` keyword; passing one raises TypeError
        # before any request is made.
        hf_delete_file(
            path_in_repo=file_path_in_repo,
            repo_id=repo_id,
            repo_type="space",
            token=resolved_api_token,
            commit_message=effective_commit_message,
        )
        logger.info(f"Successfully deleted file: {file_path_in_repo}")
        return f"Successfully deleted file: `{file_path_in_repo}`"
    except FileNotFoundError:
        logger.error(f"FileNotFoundError during manual delete for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' for deletion (404)."
        if status_code in (401, 403):
            return f"Delete Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return f"Delete HTTP Error {status_code or 'unknown'} deleting file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return f"Delete Error deleting file '{file_path_in_repo}': {str(e)}"
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """Create or overwrite one text file in a Space.

    The content is written to a local temporary file as UTF-8 and uploaded
    with ``HfApi.upload_file``; the temporary file is always removed.

    Args:
        ui_api_token_from_textbox: Token typed in the UI (may be empty).
        space_name_ui: Space name without the owner part.
        owner_ui: Owner; auto-detected from the token when empty.
        file_path_in_repo: Destination path inside the Space repository.
        file_content: New text content of the file.
        commit_message_ui: Commit message; a default is generated when empty.

    Returns:
        A human-readable status string (success or error).
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    logger.info(f"Attempting manual file update for '{file_path_in_repo}' in {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id
        if not file_path_in_repo:
            return "Update Error: File Path to update cannot be empty."
        file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
        # A path made only of slashes is empty after normalisation.
        if not file_path_in_repo:
            return "Update Error: File Path to update cannot be empty."
        commit_msg = commit_message_ui or f"Update {file_path_in_repo} via AI Space Editor UI"
        api = HfApi(token=resolved_api_token)
        tmp_file_path = None
        try:
            with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8') as tmp_file_obj:
                # Record the name before writing so the file is still removed
                # in `finally` when write() itself fails.
                tmp_file_path = tmp_file_obj.name
                tmp_file_obj.write(file_content)
            # upload_file takes no `timeout` keyword; passing one raises
            # TypeError before any request is made.
            api.upload_file(
                path_or_fileobj=tmp_file_path,
                path_in_repo=file_path_in_repo,
                repo_id=repo_id,
                repo_type="space",
                commit_message=commit_msg,
            )
            logger.info(f"Successfully updated file: {file_path_in_repo}")
            return f"Successfully updated `{file_path_in_repo}`"
        finally:
            if tmp_file_path and os.path.exists(tmp_file_path):
                os.remove(tmp_file_path)
    except FileNotFoundError:
        logger.error(f"FileNotFoundError during manual update for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return f"Update Error: Local temporary file not found during upload for '{file_path_in_repo}'."
    except UnicodeError:
        # Writing text raises UnicodeEncodeError (not UnicodeDecodeError);
        # UnicodeError is the common base class of both.
        logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}' during manual update.")
        return f"Update Error: Content for '{file_path_in_repo}' is not valid UTF-8 text. Cannot edit this way."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return f"Update Error: Space '{repo_id_for_error_logging or 'unknown repo'}' or file '{file_path_in_repo}' not found (404)."
        if status_code in (401, 403):
            return f"Update Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return f"Update HTTP Error {status_code or 'unknown'} updating file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}:")
        return f"Update Error updating file for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}"
def get_space_runtime_status(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Fetch the runtime status of a Space.

    Returns:
        ``(status_details, None)`` on success, where ``status_details`` is a
        dict with the keys ``stage``, ``hardware``, ``requested_hardware``,
        ``error_message``, ``status`` and ``full_log_link``; or
        ``(None, error_message)`` on failure.  ``error_message`` inside the
        dict is only filled when the stage is ``"ERRORED"``.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    logger.info(f"Fetching runtime status for Space: {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return None, f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return None, f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id
        api = HfApi(token=resolved_api_token)
        # get_space_runtime takes no `timeout` keyword; passing one raises
        # TypeError before any request is made.
        runtime_info = api.get_space_runtime(repo_id=repo_id)
        logger.info(f"Received runtime info for {repo_id}. Stage: {runtime_info.stage}")
        status_details = {
            "stage": runtime_info.stage,
            "hardware": runtime_info.hardware,
            "requested_hardware": getattr(runtime_info, 'requested_hardware', None),
            "error_message": None,
            "status": getattr(runtime_info, 'status', None),
            "full_log_link": f"https://huggingface.co/spaces/{repo_id}/logs" if repo_id else "#"
        }
        if runtime_info.stage == "ERRORED":
            # Guard against a missing/None raw payload before the `in` tests.
            raw = getattr(runtime_info, 'raw', None) or {}
            error_content = None
            if getattr(runtime_info, 'error', None):
                error_content = str(runtime_info.error)
            # Prefer a build error, then a run error, then a top-level message
            # that looks like an error.
            build_info = raw.get('build')
            run_info = raw.get('run')
            raw_message = raw.get('message')
            if isinstance(build_info, dict) and build_info.get('status') == 'error':
                error_content = f"Build Error: {build_info.get('message', error_content or 'Unknown build error')}"
            elif isinstance(run_info, dict) and run_info.get('status') == 'error':
                error_content = f"Runtime Error: {run_info.get('message', error_content or 'Unknown runtime error')}"
            elif isinstance(raw_message, str) and ('error' in raw_message.lower() or raw_message.strip().endswith('!')):
                error_content = raw_message
            status_details["error_message"] = error_content if error_content else "Space is in an errored state. Check logs for details."
        logger.info(f"Runtime status details for {repo_id}: {status_details}")
        return status_details, None
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return None, f"Status Error: Space '{repo_id_for_error_logging or 'unknown repo'}' not found or has no active runtime status (404)."
        if status_code in (401, 403):
            return None, f"Status Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return None, f"Status HTTP Error {status_code or 'unknown'} fetching runtime status for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"
    except Exception as e:
        logger.exception(f"Error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Status Error fetching runtime status: {str(e)}"
def build_logic_set_space_privacy(hf_api_key, repo_id, private: bool):
    """Set the visibility of a Hugging Face Space.

    Args:
        hf_api_key: Token supplied by the UI. It is resolved through
            ``_get_api_token``, which prefers the ``HF_TOKEN`` environment
            variable when that is set.
        repo_id: Full Space identifier in ``owner/space_name`` form.
        private: ``True`` to make the Space private, ``False`` for public.

    Returns:
        A human-readable status string. Failures are reported in the
        returned string rather than raised.
    """
    logger.info(f"Attempting to set privacy for '{repo_id}' to {private}.")
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error setting privacy: {err or 'Token not found'}")
            return f"Error getting token: {err or 'Token not found.'}"
        api = HfApi(token=token)
        api.update_repo_visibility(repo_id=repo_id, private=private, repo_type='space')
        logger.info(f"Successfully set privacy for {repo_id} to {private}.")
        return f"Successfully set privacy for `{repo_id}` to `{private}`."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error setting privacy for {repo_id}: {e_http}")
        # Compare against None explicitly: a requests.Response evaluates to
        # False for 4xx/5xx statuses, so a plain truthiness test would throw
        # away the status code and body of exactly the responses reported here.
        response = e_http.response
        has_response = response is not None
        status_code = response.status_code if has_response else 'N/A'
        detail = response.text if has_response else str(e_http)
        return f"HTTP Error ({status_code}) setting privacy for `{repo_id}`: {detail}"
    except Exception as e:
        logger.exception(f"Error setting privacy for {repo_id}:")
        return f"Error setting privacy for `{repo_id}`: {e}"
def build_logic_delete_space(hf_api_key, owner, space_name):
    """Permanently delete a Hugging Face Space.

    Args:
        hf_api_key: Token supplied by the UI. It is resolved through
            ``_get_api_token``, which prefers the ``HF_TOKEN`` environment
            variable when that is set.
        owner: User or organisation that owns the Space.
        space_name: Name of the Space, without the owner prefix.

    Returns:
        A human-readable status string. Failures are reported in the
        returned string rather than raised.
    """
    repo_id = f"{owner}/{space_name}"
    logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for '{repo_id}'.")
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error deleting space: {err or 'Token not found'}")
            return f"Error getting token: {err or 'Token not found.'}"
        api = HfApi(token=token)
        api.delete_repo(repo_id=repo_id, repo_type='space')
        logger.warning(f"Successfully deleted space {repo_id}.")
        return f"Successfully deleted space `{repo_id}`."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error deleting space {repo_id}: {e_http}")
        # Compare against None explicitly: a requests.Response evaluates to
        # False for 4xx/5xx statuses, so a plain truthiness test would throw
        # away the status code and body of exactly the responses reported here.
        response = e_http.response
        has_response = response is not None
        status_code = response.status_code if has_response else 'N/A'
        detail = response.text if has_response else str(e_http)
        return f"HTTP Error ({status_code}) deleting space `{repo_id}`: {detail}"
    except Exception as e:
        logger.exception(f"Error deleting space {repo_id}:")
        return f"Error deleting space `{repo_id}`: {e}"
def duplicate_space(hf_api_key, source_repo_id, target_repo_id, private: bool = False):
    """Duplicate a Hugging Face Space into another Space repository.

    Args:
        hf_api_key: Token supplied by the UI. It is resolved through
            ``_get_api_token``, which prefers the ``HF_TOKEN`` environment
            variable when that is set.
        source_repo_id: Space to copy from, in ``owner/space_name`` form.
        target_repo_id: Destination, either ``owner/space_name`` or a bare
            space name. For a bare name the owner is taken from the account
            the token belongs to (via ``whoami``).
        private: Whether the duplicated Space should be private.

    Returns:
        A human-readable status string. Failures are reported in the
        returned string rather than raised.
    """
    logger.info(f"Attempting to duplicate '{source_repo_id}' to '{target_repo_id}' (private={private}).")
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error duplicating space: {err or 'Token not found'}")
            return f"Error getting token: {err or 'Token not found.'}"

        # Reject a missing target up front; otherwise it would be expanded
        # to the malformed id "<owner>/" further down.
        if not target_repo_id:
            return "Error: Target repository ID cannot be empty. Use '<owner>/<space_name>' format."

        if '/' in target_repo_id:
            # Fully qualified id: require exactly one non-empty owner and name.
            target_owner, target_space_name = target_repo_id.split('/', 1)
            if not target_owner or not target_space_name or '/' in target_space_name:
                return f"Error: Invalid target repository ID format '{target_repo_id}'. Must be '<owner>/<space_name>'."
        else:
            # Bare space name: fall back to the token's own account as owner.
            target_space_name = target_repo_id
            try:
                user_info = whoami(token=token)
                target_owner = user_info.get('name')
                if not target_owner:
                    raise ValueError("Could not determine owner from token.")
                target_repo_id = f"{target_owner}/{target_space_name}"
            except Exception as e:
                logger.error(f"Could not determine target owner from token: {e}")
                return f"Error: Target repository ID '{target_repo_id}' is missing owner, and owner could not be determined from token ({e}). Use '<owner>/<space_name>' format or set the Owner field."

        # HfApi.duplicate_space is the documented Hub call for copying a Space.
        # exist_ok=True only suppresses the "already exists" error for the
        # target; it does not overwrite an existing Space.
        HfApi(token=token).duplicate_space(
            from_id=source_repo_id,
            to_id=target_repo_id,
            private=private,
            exist_ok=True,
        )
        logger.info(f"Successfully duplicated space from {source_repo_id} to {target_repo_id}.")
        return f"Successfully duplicated space from `{source_repo_id}` to `{target_repo_id}`."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error duplicating space from {source_repo_id} to {target_repo_id}: {e_http}")
        # Compare against None explicitly: a requests.Response evaluates to
        # False for 4xx/5xx statuses, so a plain truthiness test would throw
        # away the status code and body of exactly the responses reported here.
        response = e_http.response
        has_response = response is not None
        status_code = response.status_code if has_response else 'N/A'
        detail = response.text if has_response else str(e_http)
        return f"HTTP Error ({status_code}) duplicating space: {detail}"
    except Exception as e:
        logger.exception(f"Error duplicating space from {source_repo_id} to {target_repo_id}:")
        return f"Error duplicating space: {e}"
def list_user_spaces(hf_api_key, owner=None):
    """List the Spaces belonging to an owner or to the authenticated user.

    Args:
        hf_api_key: Token supplied by the UI. It is resolved through
            ``_get_api_token``, which prefers the ``HF_TOKEN`` environment
            variable when that is set.
        owner: User or organisation whose Spaces are listed. When empty, the
            account the token belongs to is used (via ``whoami``).

    Returns:
        A ``(space_ids, error)`` tuple. On success ``space_ids`` is a list of
        ``owner/space_name`` strings and ``error`` is ``None``. On failure
        ``error`` is a message and ``space_ids`` is ``[]`` for 404/401/403
        HTTP responses, ``None`` otherwise.
    """
    logger.info(f"Attempting to list spaces for owner: {owner or 'authenticated user'}.")
    # Bound before the try block so the exception handlers can always name
    # the owner that was actually queried, including an auto-detected one.
    effective_owner = owner
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error listing spaces: {err or 'Token not found'}")
            return None, f"Error getting token: {err or 'Token not found.'}"

        if not effective_owner:
            # No owner given: list the Spaces of the token's own account.
            try:
                user_info = whoami(token=token)
                effective_owner = user_info.get('name')
                if not effective_owner:
                    raise ValueError("Could not determine owner from token.")
                logger.info(f"Listing spaces for auto-detected owner: {effective_owner}")
            except Exception as e:
                logger.error(f"Could not determine owner from token for listing: {e}")
                return None, f"Error auto-detecting owner for listing: {e}. Please specify Owner field."

        # HfApi.list_spaces is the documented Hub call for enumerating Spaces.
        # Each returned SpaceInfo.id is already the full "owner/space_name"
        # identifier, so it must not be prefixed with the author again.
        spaces = HfApi(token=token).list_spaces(author=effective_owner)
        space_ids = [space.id for space in spaces]
        logger.info(f"Successfully listed {len(space_ids)} spaces for {effective_owner}.")
        return space_ids, None
    except HfHubHTTPError as e_http:
        owner_label = effective_owner or 'authenticated user'
        logger.error(f"HTTP error listing spaces for {owner_label}: {e_http}")
        # Compare against None explicitly: a requests.Response evaluates to
        # False for 4xx/5xx statuses, so a plain truthiness test would make
        # the status-specific branches below unreachable.
        response = e_http.response
        status_code = response.status_code if response is not None else None
        if status_code == 404:
            # The owner does not exist, or has no Spaces visible to this token.
            return [], f"HTTP Error ({status_code}): Owner '{owner_label}' not found or has no accessible spaces."
        if status_code in (401, 403):
            return [], f"HTTP Error ({status_code}): Access denied or authentication required for listing spaces for '{owner_label}'. Check token permissions."
        detail = response.text if response is not None else str(e_http)
        shown_status = status_code if status_code is not None else 'N/A'
        return None, f"HTTP Error ({shown_status}) listing spaces for '{owner_label}': {detail}"
    except Exception as e:
        logger.exception(f"Error listing spaces for {effective_owner or 'authenticated user'}:")
        return None, f"Error listing spaces: {e}"