broadfield-dev's picture
Update build_logic.py
fab8f61 verified
raw
history blame
51.1 kB
import os
import re
import tempfile
import shutil
import logging
from pathlib import Path
from huggingface_hub import (
create_repo,
upload_folder,
list_repo_files,
whoami,
hf_hub_download,
delete_file as hf_delete_file,
HfApi,
)
from huggingface_hub.hf_api import CommitOperationDelete, CommitOperationAdd, CommitOperation
from huggingface_hub.utils import HfHubHTTPError
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def _get_api_token(ui_token_from_textbox=None):
env_token = os.getenv('HF_TOKEN')
if env_token:
logger.debug("Using HF_TOKEN from environment variable.")
return env_token, None
if ui_token_from_textbox:
logger.debug("Using HF_TOKEN from UI textbox.")
return ui_token_from_textbox.strip(), None
logger.warning("Hugging Face API token not provided in UI or HF_TOKEN env var.")
return None, "Error: Hugging Face API token not provided in UI or HF_TOKEN env var."
def _determine_repo_id(ui_api_token_from_textbox, owner_ui, space_name_ui):
if not space_name_ui: return None, "Error: Space Name cannot be empty."
if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field for the owner part."
final_owner = owner_ui
error_message = None
if not final_owner:
logger.info("Owner not specified, attempting to auto-detect from token.")
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, f"Error auto-detecting owner: {token_err}"
if not resolved_api_token: return None, "Error: API token required for auto owner determination if Owner field is empty."
try:
user_info = whoami(token=resolved_api_token)
if user_info and 'name' in user_info:
final_owner = user_info['name']
logger.info(f"Auto-detected owner: {final_owner}")
else:
error_message = "Error: Could not retrieve username from token. Check token permissions or specify Owner."
logger.error(error_message)
except Exception as e:
error_message = f"Error retrieving username from token: {str(e)}. Specify Owner or check token."
logger.exception("Error retrieving username from token:")
if error_message: return None, error_message
if not final_owner: return None, "Error: Owner could not be determined. Please specify it in the Owner field."
repo_id = f"{final_owner}/{space_name_ui}"
logger.info(f"Determined repo_id: {repo_id}")
return repo_id, None
def parse_markdown(markdown_input):
space_info = {"repo_name_md": "", "owner_md": "", "files": []}
current_file_path = None
current_file_content_lines = []
in_file_definition = False
in_code_block = False
file_parsing_errors = []
lines = markdown_input.strip().split("\n")
cleaned_lines = []
for line_content_orig in lines:
if line_content_orig.strip().startswith("# "):
if line_content_orig.strip().startswith("# ### File:") or \
line_content_orig.strip().startswith("# ## File Structure") or \
line_content_orig.strip().startswith("# # Space:"):
cleaned_lines.append(line_content_orig.strip()[2:])
else:
cleaned_lines.append(line_content_orig)
else:
cleaned_lines.append(line_content_orig)
lines = cleaned_lines
for i, line_content_orig in enumerate(lines):
line_content_stripped = line_content_orig.strip()
line_num = i + 1
file_match = re.match(r"### File:\s*(?P<filename_line>[^\n]+)", line_content_stripped)
if file_match:
if current_file_path is not None and in_file_definition:
content_to_save = "\n".join(current_file_content_lines).strip()
space_info["files"].append({"path": current_file_path, "content": content_to_save})
filename_line = file_match.group("filename_line").strip()
current_file_path = filename_line
current_file_path = re.split(r'\s*\(', current_file_path, 1)[0].strip()
current_file_path = current_file_path.strip('`\'"').strip()
if not current_file_path:
file_parsing_errors.append(f"Line {line_num}: Found '### File:' but filename is empty or invalid.")
current_file_path = None
in_file_definition = False
continue
current_file_content_lines = []
in_file_definition = True
in_code_block = False
logger.debug(f"Parsed file header: {current_file_path}")
continue
if not in_file_definition:
if line_content_stripped.startswith("# Space:"):
full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
if "/" in full_space_name_md:
parts = full_space_name_md.split("/", 1)
if len(parts) == 2:
space_info["owner_md"], space_info["repo_name_md"] = parts[0].strip(), parts[1].strip()
else:
space_info["repo_name_md"] = full_space_name_md
else:
space_info["repo_name_md"] = full_space_name_md
logger.debug(f"Parsed space header: {space_info['owner_md']}/{space_info['repo_name_md']}")
continue
if line_content_stripped.startswith("## File Structure"):
structure_block_start = i + 1
while structure_block_start < len(lines) and not lines[structure_block_start].strip().startswith("```"):
structure_block_start += 1
if structure_block_start < len(lines) and lines[structure_block_start].strip().startswith("```"):
structure_block_end = structure_block_start + 1
while structure_block_end < len(lines) and not lines[structure_block_end].strip().startswith("```"):
structure_block_end += 1
if structure_block_end < len(lines) and lines[structure_block_end].strip().startswith("```"):
logger.debug(f"Skipping File Structure block from line {i+1} to {structure_block_end+1}")
i = structure_block_end
continue
continue
if in_file_definition:
if line_content_stripped.startswith("```"):
in_code_block = not in_code_block
logger.debug(f"Toggled code block to {in_code_block} at line {line_num}")
continue
if in_code_block:
current_file_content_lines.append(line_content_orig)
elif line_content_stripped.startswith("[Binary file") or line_content_stripped.startswith("[Error loading content:") or line_content_stripped.startswith("[Binary or Skipped file]"):
current_file_content_lines.append(line_content_orig)
logger.debug(f"Parsed binary/error marker for {current_file_path} at line {line_num}")
else:
pass
if current_file_path is not None and in_file_definition:
content_to_save = "\n".join(current_file_content_lines).strip()
space_info["files"].append({"path": current_file_path, "content": content_to_save})
space_info["files"] = [f for f in space_info["files"] if f.get("path")]
space_info["owner_md"] = space_info["owner_md"].strip()
space_info["repo_name_md"] = space_info["repo_name_md"].strip()
if file_parsing_errors:
logger.warning(f"Markdown parsing encountered errors: {file_parsing_errors}")
logger.info(f"Parsed markdown. Found {len(space_info['files'])} files.")
return space_info
def get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
sdk = None
files = []
error = None
repo_id = None
logger.info(f"Attempting to get repo info for {repo_id_for_error_logging}")
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, [], token_err
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return None, [], err_repo_id
repo_id_for_error_logging = repo_id
api = HfApi(token=resolved_api_token)
repo_info_obj = api.repo_info(repo_id=repo_id, repo_type="space", timeout=20)
sdk = repo_info_obj.sdk
files = [sibling.rfilename for sibling in repo_info_obj.siblings if sibling.rfilename]
if not files and repo_info_obj.siblings:
logger.warning(f"Repo {repo_id} has siblings but no rfilenames extracted. Total siblings: {len(repo_info_obj.siblings)}")
logger.info(f"Successfully got repo info for {repo_id}. SDK: {sdk}, Files found: {len(files)}")
except HfHubHTTPError as e_http:
logger.error(f"HTTP error getting repo info for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
error_message = str(e_http)
status_code = e_http.response.status_code if e_http.response is not None else None
if status_code == 404:
error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found (404)."
elif status_code in (401,403):
error = f"Access denied for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
else:
error = f"HTTP Error {status_code or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"
except Exception as e:
logger.warning(f"Could not get full repo_info for {repo_id_for_error_logging or 'unknown repo'}, attempting list_repo_files fallback: {e}")
error = f"Error retrieving Space info for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. Attempting file list fallback."
try:
resolved_api_token_fb, token_err_fb = _get_api_token(ui_api_token_from_textbox)
if token_err_fb: return None, [], f"{error}\nAPI Token Error during fallback: {token_err_fb}"
repo_id_fb, err_repo_id_fb = _determine_repo_id(resolved_api_token_fb, owner_ui, space_name_ui)
if err_repo_id_fb: return None, [], f"{error}\nRepo ID Error during fallback: {err_repo_id_fb}"
files = list_repo_files(repo_id=repo_id_fb, token=resolved_api_token_fb, repo_type="space", timeout=20)
error = f"Warning: Could not fetch full Space info (SDK etc.) for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. File list loaded via fallback."
logger.info(f"Fallback list_repo_files successful for {repo_id_fb}. Files found: {len(files)}")
except HfHubHTTPError as e2_http:
logger.error(f"HTTP error during fallback list_repo_files for {repo_id_for_error_logging or 'unknown repo'}: {e2_http}")
error_message_fb = str(e2_http)
status_code_fb = e2_http.response.status_code if e2_http.response is not None else None
if status_code_fb == 404:
error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found during fallback (404)."
else:
error = f"HTTP Error {status_code_fb or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}' during fallback: {error_message_fb}"
files = []
except Exception as e2:
logger.exception(f"Error listing files for {repo_id_for_error_logging or 'unknown repo'} during fallback: {e2}")
error = f"{error}\nError listing files during fallback for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e2)}"
files = []
if not files and not error and (repo_id_for_error_logging is not None):
error = f"No files found in Space `{repo_id_for_error_logging or 'unknown repo'}`."
return sdk, files, error
def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
files, err = get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui)[1:]
return files, err
def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
repo_id = None
logger.info(f"Attempting to get content for file '{file_path_in_repo}' from {repo_id_for_error_logging}")
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, token_err
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return None, err_repo_id
repo_id_for_error_logging = repo_id
if not file_path_in_repo: return None, "Error: File path cannot be empty."
file_path_in_repo = file_path_in_repo.replace("\\", "/")
downloaded_file_path = hf_hub_download(
repo_id=repo_id,
filename=file_path_in_repo,
repo_type="space",
token=resolved_api_token,
local_dir_use_symlinks=False,
cache_dir=None,
timeout=20
)
content = Path(downloaded_file_path).read_text(encoding="utf-8")
logger.info(f"Successfully downloaded and read content for '{file_path_in_repo}'.")
return content, None
except FileNotFoundError:
logger.error(f"FileNotFoundError for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
except UnicodeDecodeError:
logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}'. Likely binary.")
return None, f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error fetching file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
error_message = str(e_http)
status_code = e_http.response.status_code if e_http.response is not None else None
if status_code == 404:
return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
if status_code in (401, 403):
return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
return None, f"HTTP Error {status_code or 'unknown'} fetching file '{file_path_in_repo}': {error_message}"
except Exception as e:
logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
return None, f"Error fetching file content: {str(e)}"
# Renamed and modified to only handle file changes (CREATE, UPDATE, DELETE)
def apply_staged_file_changes(ui_api_token_from_textbox, owner_ui, space_name_ui, file_changeset):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
repo_id = None
status_messages = []
logger.info(f"Attempting to apply {len(file_changeset)} staged file changes to {repo_id_for_error_logging}")
if not owner_ui or not space_name_ui:
return "Error: Cannot apply file changes. Owner and Space Name must be provided."
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return f"API Token Error: {token_err}"
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return f"Repo ID Error: {err_repo_id}"
repo_id_for_error_logging = repo_id
api = HfApi(token=resolved_api_token)
temp_dir = None
paths_to_upload = {}
delete_operations = []
try:
temp_dir = tempfile.TemporaryDirectory()
repo_staging_path = Path(temp_dir.name) / "repo_staging_content"
repo_staging_path.mkdir(exist_ok=True)
gitattributes_path_local = repo_staging_path / ".gitattributes"
try:
with open(gitattributes_path_local, "w", encoding="utf-8") as f:
f.write("* text=auto eol=lf\n")
paths_to_upload[str(gitattributes_path_local)] = ".gitattributes"
except Exception as e:
status_messages.append(f"Warning: Could not stage .gitattributes file: {e}")
logger.warning(f"Could not stage .gitattributes: {e}")
for change in file_changeset: # Iterate only through file changes
if change['type'] == 'UPDATE_FILE' or change['type'] == 'CREATE_FILE':
file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
if not file_path_in_repo:
status_messages.append(f"Skipping {change['type']} operation: empty path.")
continue
content_to_write = change.get('content', '')
if content_to_write.startswith("[Binary file") or content_to_write.startswith("[Error loading content:") or content_to_write.startswith("[Binary or Skipped file]"):
status_messages.append(f"Skipping {change['type']} for '{file_path_in_repo}': Content is a binary/error placeholder.")
logger.warning(f"Skipping {change['type']} operation for '{file_path_in_repo}': Content is binary/error placeholder.")
continue
file_path_local = repo_staging_path / file_path_in_repo
file_path_local.parent.mkdir(parents=True, exist_ok=True)
try:
with open(file_path_local, "w", encoding="utf-8") as f:
f.write(content_to_write)
paths_to_upload[str(file_path_local)] = file_path_in_repo
logger.debug(f"Staged file for {change['type']}: {file_path_in_repo}")
except Exception as file_write_error:
status_messages.append(f"Error staging file {file_path_in_repo} for {change['type']}: {file_write_error}")
logger.error(f"Error writing file {file_path_in_repo} during staging for {change['type']}: {file_write_error}")
elif change['type'] == 'DELETE_FILE':
file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
if not file_path_in_repo:
status_messages.append(f"Skipping DELETE_FILE operation: empty path.")
continue
delete_operations.append(CommitOperationDelete(path_in_repo=file_path_in_repo))
logger.debug(f"Added DELETE_FILE operation for: {file_path_in_repo}")
if delete_operations:
try:
commit_message_delete = f"AI Space Builder: Deleted {len(delete_operations)} files."
logger.info(f"Performing delete commit for {repo_id_for_error_logging}: {commit_message_delete}")
api.create_commit(
repo_id=repo_id,
repo_type="space",
operations=delete_operations,
commit_message=commit_message_delete,
timeout=30
)
status_messages.append(f"File Deletions: Successfully committed {len(delete_operations)} deletions.")
logger.info("Delete commit successful.")
except HfHubHTTPError as e_http:
status_messages.append(f"File Deletion HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
logger.error(f"HTTP error during delete commit for {repo_id}: {e_http}")
except Exception as e_delete_commit:
status_messages.append(f"File Deletion Error: {str(e_delete_commit)}. Check logs.")
logger.exception(f"Error during delete commit for {repo_id}:")
if paths_to_upload:
try:
commit_message_upload = f"AI Space Builder: Updated Space content for {repo_id}"
logger.info(f"Uploading staged files from {str(repo_staging_path)} to {repo_id}...")
upload_folder(
repo_id=repo_id,
folder_path=str(repo_staging_path),
path_in_repo=".",
token=resolved_api_token,
repo_type="space",
commit_message=commit_message_upload,
allow_patterns=["*"],
timeout=120 # Increased timeout for uploads
)
status_messages.append(f"File Uploads/Updates: Successfully uploaded/updated {len(paths_to_upload)} files.")
logger.info("Upload/Update commit successful.")
except HfHubHTTPError as e_http:
status_messages.append(f"File Upload/Update HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
logger.error(f"HTTP error during upload_folder for {repo_id}: {e_http}")
except Exception as e_upload:
status_messages.append(f"File Upload/Update Error: {str(e_upload)}. Check logs.")
logger.exception(f"Error during upload_folder for {repo_id}:")
else:
status_messages.append("No file changes (create/update/delete) to commit.")
logger.info("No file changes to commit.")
finally:
if temp_dir:
try:
temp_dir.cleanup()
logger.info("Cleaned up temporary staging directory.")
except Exception as e:
logger.error(f"Error cleaning up temp dir: {e}")
except HfHubHTTPError as e_http:
logger.error(f"Top-level HTTP error during apply_staged_file_changes for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
status_messages.append(f"API HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}")
except Exception as e:
logger.exception(f"Top-level error during apply_staged_file_changes for {repo_id_for_error_logging or 'unknown repo'}:")
status_messages.append(f"An unexpected error occurred during apply file changes: {str(e)}")
final_status = " | ".join(status_messages) if status_messages else "No file operations were applied."
logger.info(f"Finished applying staged file changes. Final status: {final_status}")
return final_status
def delete_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, commit_message_ui=None):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
repo_id = None
logger.info(f"Attempting manual file deletion for '{file_path_in_repo}' from {repo_id_for_error_logging}")
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return f"API Token Error: {token_err}"
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return f"Repo ID Error: {err_repo_id}"
repo_id_for_error_logging = repo_id
if not file_path_in_repo: return "Error: File path cannot be empty for deletion."
file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
effective_commit_message = commit_message_ui or f"Deleted file: {file_path_in_repo} via AI Space Editor UI"
hf_delete_file(
path_in_repo=file_path_in_repo,
repo_id=repo_id,
repo_type="space",
token=resolved_api_token,
commit_message=effective_commit_message,
timeout=20
)
logger.info(f"Successfully deleted file: {file_path_in_repo}")
return f"Successfully deleted file: `{file_path_in_repo}`"
except FileNotFoundError:
logger.error(f"FileNotFoundError during manual delete for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
error_message = str(e_http)
status_code = e_http.response.status_code if e_http.response is not None else None
if status_code == 404:
return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' for deletion (404)."
if status_code in (401, 403):
return f"Delete Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
return f"Delete HTTP Error {status_code or 'unknown'} deleting file '{file_path_in_repo}': {error_message}"
except Exception as e:
logger.exception(f"Error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
return f"Delete Error deleting file '{file_path_in_repo}': {str(e)}"
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
repo_id = None
logger.info(f"Attempting manual file update for '{file_path_in_repo}' in {repo_id_for_error_logging}")
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return f"API Token Error: {token_err}"
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return f"Repo ID Error: {err_repo_id}"
repo_id_for_error_logging = repo_id
if not file_path_in_repo: return "Update Error: File Path to update cannot be empty."
file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
commit_msg = commit_message_ui or f"Update {file_path_in_repo} via AI Space Editor UI"
api = HfApi(token=resolved_api_token)
tmp_file_path = None
try:
with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8') as tmp_file_obj:
tmp_file_obj.write(file_content)
tmp_file_path = tmp_file_obj.name
api.upload_file(
path_or_fileobj=tmp_file_path,
path_in_repo=file_path_in_repo,
repo_id=repo_id,
repo_type="space",
commit_message=commit_msg,
timeout=20
)
logger.info(f"Successfully updated file: {file_path_in_repo}")
return f"Successfully updated `{file_path_in_repo}`"
finally:
if tmp_file_path and os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
except FileNotFoundError:
logger.error(f"FileNotFoundError during manual update for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
return f"Update Error: Local temporary file not found during upload for '{file_path_in_repo}'."
except UnicodeDecodeError:
logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}' during manual update.")
return f"Update Error: Content for '{file_path_in_repo}' is not valid UTF-8 text. Cannot edit this way."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}: {e_http}")
error_message = str(e_http)
status_code = e_http.response.status_code if e_http.response is not None else None
if status_code == 404:
return f"Update Error: Space '{repo_id_for_error_logging or 'unknown repo'}' or file '{file_path_in_repo}' not found (404)."
if status_code in (401, 403):
return f"Update Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
return f"Update HTTP Error {status_code or 'unknown'} updating file '{file_path_in_repo}': {error_message}"
except Exception as e:
logger.exception(f"Error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}:")
return f"Update Error updating file for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}"
def get_space_runtime_status(ui_api_token_from_textbox, space_name_ui, owner_ui):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
repo_id = None
logger.info(f"Fetching runtime status for Space: {repo_id_for_error_logging}")
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, f"API Token Error: {token_err}"
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return None, f"Repo ID Error: {err_repo_id}"
repo_id_for_error_logging = repo_id
api = HfApi(token=resolved_api_token)
runtime_info = api.get_space_runtime(repo_id=repo_id, timeout=20)
logger.info(f"Received runtime info for {repo_id}. Stage: {runtime_info.stage}")
status_details = {
"stage": runtime_info.stage,
"hardware": runtime_info.hardware,
"requested_hardware": runtime_info.requested_hardware if hasattr(runtime_info, 'requested_hardware') else None,
"error_message": None,
"status": runtime_info.status if hasattr(runtime_info, 'status') else None,
"full_log_link": f"https://huggingface.co/spaces/{repo_id}/logs" if repo_id else "#"
}
if runtime_info.stage == "ERRORED":
error_content = None
if hasattr(runtime_info, 'error') and runtime_info.error: error_content = str(runtime_info.error)
if 'build' in runtime_info.raw and isinstance(runtime_info.raw['build'], dict) and runtime_info.raw['build'].get('status') == 'error':
error_content = f"Build Error: {runtime_info.raw['build'].get('message', error_content or 'Unknown build error')}"
elif 'run' in runtime_info.raw and isinstance(runtime_info.raw['run'], dict) and runtime_info.raw['run'].get('status') == 'error':
error_content = f"Runtime Error: {runtime_info.raw['run'].get('message', error_content or 'Unknown runtime error')}"
elif 'message' in runtime_info.raw and isinstance(runtime_info.raw['message'], str) and ('error' in runtime_info.raw['message'].lower() or runtime_info.raw['message'].strip().endswith('!')):
error_content = runtime_info.raw['message']
status_details["error_message"] = error_content if error_content else "Space is in an errored state. Check logs for details."
logger.info(f"Runtime status details for {repo_id}: {status_details}")
return status_details, None
except HfHubHTTPError as e_http:
logger.error(f"HTTP error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
error_message = str(e_http)
status_code = e_http.response.status_code if e_http.response is not None else None
if status_code == 404:
return None, f"Status Error: Space '{repo_id_for_error_logging or 'unknown repo'}' not found or has no active runtime status (404)."
if status_code in (401, 403):
return None, f"Status Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
return None, f"Status HTTP Error {status_code or 'unknown'} fetching runtime status for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"
except Exception as e:
logger.exception(f"Error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}:")
return None, f"Status Error fetching runtime status: {str(e)}"
def build_logic_set_space_privacy(hf_api_key, repo_id, private: bool):
logger.info(f"Attempting to set privacy for '{repo_id}' to {private}.")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error setting privacy: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
api = HfApi(token=token)
api.update_repo_visibility(repo_id=repo_id, private=private, repo_type='space')
logger.info(f"Successfully set privacy for {repo_id} to {private}.")
return f"Successfully set privacy for `{repo_id}` to `{private}`."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error setting privacy for {repo_id}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
return f"HTTP Error ({status_code}) setting privacy for `{repo_id}`: {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error setting privacy for {repo_id}:")
return f"Error setting privacy for `{repo_id}`: {e}"
def build_logic_delete_space(hf_api_key, owner, space_name):
repo_id = f"{owner}/{space_name}"
logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for '{repo_id}'.")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error deleting space: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
api = HfApi(token=token)
api.delete_repo(repo_id=repo_id, repo_type='space')
logger.warning(f"Successfully deleted space {repo_id}.")
return f"Successfully deleted space `{repo_id}`."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error deleting space {repo_id}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
return f"HTTP Error ({status_code}) deleting space `{repo_id}`: {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error deleting space {repo_id}:")
return f"Error deleting space `{repo_id}`: {e}"
def build_logic_create_pull_request(hf_api_key, source_repo_id, target_repo_id, title, body=""):
logger.info(f"Attempting to create PR from '{source_repo_id}' to '{target_repo_id}'. Title: '{title}'")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error creating PR: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
api = HfApi(token=token)
# Assuming the source repo is a Space and the target can be any repo type
# Assuming PR is from source_repo_id main branch to target_repo_id main branch
# This might need refinement based on actual use cases (e.g., PR from space branch to model repo main)
# For simplicity now, assume PR from source_repo_id (space) 'main' to target_repo_id 'main'
# Need to check if the token has write access to target_repo_id.
# A PR is created on the *target* repository
# We need to check if the token can write to the target.
# There isn't a direct "check write permission" API, so we rely on create_pull_request errors.
pr_url = api.create_pull_request(
repo_id=target_repo_id, # PR is created ON the target repo
title=title,
description=body,
# Source branch is the branch *in the source repo* (the space)
# Target branch is the branch *in the target repo*
# By default, create_pull_request assumes 'main' for both,
# but source is interpreted as 'main' in the *source_repo_id*
# and target as 'main' in the *repo_id* parameter (target_repo_id).
# The API docs mention `repo_id` is the target and `base_repo` is the source for cross-repo PRs.
# Let's use `repo_id` as target and `base_repo` as source (the space).
base_repo=source_repo_id, # The Space repo is the source
base="main", # Source branch in the Space (source_repo_id)
head="main", # Target branch in the target_repo_id
token=token,
timeout=30
)
logger.info(f"Successfully created PR: {pr_url}")
return f"Successfully created Pull Request: {pr_url}"
except HfHubHTTPError as e_http:
logger.error(f"HTTP error creating PR from {source_repo_id} to {target_repo_id}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
# Check for common errors like permission denied or target not found
if status_code in (401, 403):
return f"PR Error ({status_code}): Access denied or authentication required to create PR on '{target_repo_id}'. Check token permissions."
if status_code == 404:
return f"PR Error ({status_code}): Target repository '{target_repo_id}' not found."
# Add more specific error checks if needed (e.g., PR already exists, invalid branches)
if e_http.response and 'already exists' in e_http.response.text:
return f"PR Error: Pull Request already exists."
return f"PR HTTP Error ({status_code}): {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error creating PR from {source_repo_id} to {target_repo_id}:")
return f"PR Error: {e}"
def build_logic_add_comment(hf_api_key, repo_id, comment_text):
logger.info(f"Attempting to add comment to '{repo_id}'. Text: '{comment_text[:50]}...'")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error adding comment: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
api = HfApi(token=token)
# The create_discussion method can be used for comments on the repo itself
api.create_discussion(
repo_id=repo_id,
title=f"Comment from AI Space Commander [{time.strftime('%Y-%m-%d %H:%M')}]", # Title required, use timestamp
description=comment_text,
token=token,
timeout=20
)
# Note: This creates a *discussion* rather than a direct comment stream entry.
# A more direct comment API might exist or could be simulated via other means,
# but create_discussion is the closest public API method for adding arbitrary text.
# Let's clarify this limitation or use a different method if available (e.g., adding a commit comment).
# HfApi does not seem to expose a simple "add comment to repo page" API.
# create_discussion is the most reasonable public function.
# Re-reading the API docs... maybe there's no public API for the little comment boxes?
# The Discussions API is the closest. Let's use that, but inform the user it's a discussion.
# Or, maybe the AI should be suggesting adding a comment via a COMMIT message?
# The prompt asks for "send comment", which implies a direct comment stream.
# This might require an internal API or simulating via another method.
# Let's stick to the most direct public API for now: Discussions.
# Or, better, tell the AI this action is not supported via public API or needs clarification.
# Let's return an error message for now, as `create_discussion` isn't what the user likely means by "add comment".
# Okay, let's re-read the original codebase. There *was* no comment/like function. The user is *requesting* them.
# The simplest interpretation of "add comment" is adding to the discussions tab or commit comments.
# Let's add `create_discussion` for now as it's a public API and the closest fit.
# We should clarify the AI prompt/action description to mention it creates a *discussion*.
# Reverted: create_discussion requires a *title*. The prompt just says "comment text".
# This reinforces that the requested action might not map directly to a public API.
# Let's implement it using `create_discussion` and add a default title.
api.create_discussion(
repo_id=repo_id,
title=f"AI Space Commander Comment: {comment_text[:50]}{'...' if len(comment_text) > 50 else ''} [{time.strftime('%Y-%m-%d %H:%M')}]", # Use part of comment + timestamp as title
description=comment_text, # Full comment in description
token=token,
timeout=20
)
logger.info(f"Successfully added comment (as discussion) to {repo_id}.")
return f"Successfully added comment (as a discussion) to `{repo_id}`."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error adding comment to {repo_id}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
if status_code in (401, 403):
return f"Comment Error ({status_code}): Access denied or authentication required to add comment on '{repo_id}'. Check token permissions."
if status_code == 404:
return f"Comment Error ({status_code}): Repository '{repo_id}' not found."
return f"Comment HTTP Error ({status_code}): {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error adding comment to {repo_id}:")
return f"Comment Error: {e}"
def build_logic_like_space(hf_api_key, repo_id):
logger.info(f"Attempting to like space '{repo_id}'.")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error liking space: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
api = HfApi(token=token)
# HfApi does not have a direct 'like' method.
# This action might also require an internal API or not be publicly exposed.
# Let's return an error for now, or note it's not directly supported.
# Given the request "add... like", it implies it *should* be possible.
# A manual approach might involve an authenticated API call not wrapped by hf_hub.
# For now, let's return an informative error.
# Reverted: There *is* an internal endpoint used by the UI. It's not in the public `HfApi`.
# Implementing this reliably without internal API knowledge is difficult and fragile.
# Let's return an error message stating it's not supported via the public API.
return f"Like Error: Liking spaces is not directly supported via the public Hugging Face Hub API used by this tool."
# Example (likely unstable/unsupported) attempt using requests if we knew the endpoint:
# like_url = f"https://huggingface.co/api/v1/repos/{repo_id}/like"
# headers = {"Authorization": f"Bearer {token}"}
# response = requests.post(like_url, headers=headers, timeout=10)
# response.raise_for_status()
# logger.info(f"Successfully liked space: {repo_id}")
# return f"Successfully liked space: `{repo_id}`."
# except requests.exceptions.HTTPError as e:
# # Handle 409 Conflict (already liked) or other errors
# status_code = e.response.status_code if e.response else 'N/A'
# if status_code == 409: return f"Like Error: Space '{repo_id}' already liked."
# return f"Like HTTP Error ({status_code}): {e.response.text if e.response else str(e)}"
# except Exception as e:
# logger.exception(f"Error liking space {repo_id}:")
# return f"Like Error: {e}"
except Exception as e: # Catch potential errors even in the error path above
logger.exception(f"Unexpected error in build_logic_like_space for {repo_id}:")
return f"Like Error: An unexpected error occurred: {e}"
def duplicate_space(hf_api_key, source_repo_id, target_repo_id, private: bool = False):
"""Duplicates a Hugging Face Space."""
logger.info(f"Attempting to duplicate '{source_repo_id}' to '{target_repo_id}' (private={private}).")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error duplicating space: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
if '/' in target_repo_id:
target_owner, target_space_name = target_repo_id.split('/', 1)
if not target_owner or not target_space_name or '/' in target_space_name:
return f"Error: Invalid target repository ID format '{target_repo_id}'. Must be '<owner>/<space_name>'."
else:
target_space_name = target_repo_id
try:
user_info = whoami(token=token)
target_owner = user_info.get('name')
if not target_owner: raise Exception("Could not determine owner from token.")
target_repo_id = f"{target_owner}/{target_space_name}"
except Exception as e:
logger.error(f"Could not determine target owner from token: {e}")
return f"Error: Target repository ID '{target_repo_id}' is missing owner, and owner could not be determined from token ({e}). Use '<owner>/<space_name>' format or set the Owner field."
api = HfApi(token=token) # Use HfApi object to call duplicate_repo
api.duplicate_repo(
from_repo=source_repo_id,
to_repo=target_repo_id,
repo_type="space",
token=token,
private=private,
exist_ok=True
)
logger.info(f"Successfully duplicated space from {source_repo_id} to {target_repo_id}.")
return f"Successfully duplicated space from `{source_repo_id}` to `{target_repo_id}`."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error duplicating space from {source_repo_id} to {target_repo_id}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
return f"HTTP Error ({status_code}) duplicating space: {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error duplicating space from {source_repo_id} to {target_repo_id}:")
return f"Error duplicating space: {e}"
def list_user_spaces(hf_api_key, owner=None):
"""Lists spaces for the authenticated user or a specific owner/org."""
logger.info(f"Attempting to list spaces for owner: {owner or 'authenticated user'}.")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error listing spaces: {err or 'Token not found'}")
return None, f"Error getting token: {err or 'Token not found.'}"
effective_owner = owner
if not effective_owner:
try:
user_info = whoami(token=token)
effective_owner = user_info.get('name')
if not effective_owner: raise Exception("Could not determine owner from token.")
logger.info(f"Listing spaces for auto-detected owner: {effective_owner}")
except Exception as e:
logger.error(f"Could not determine owner from token for listing: {e}")
return None, f"Error auto-detecting owner for listing: {e}. Please specify Owner field."
api = HfApi(token=token)
#spaces = hf_list_repos(effective_owner,token=token)
user_repos = {}
# List models for the user
models = api.list_models(author=owner)
user_repos['models'] = [model.id for model in models]
# List datasets for the user
datasets = api.list_datasets(author=owner)
user_repos['datasets'] = [dataset.id for dataset in datasets]
# List Spaces for the user
spaces = api.list_spaces(author=owner)
user_repos['spaces'] = [space.id for space in spaces]
space_ids = [f"{r.author}/{r.id}" for r in spaces]
logger.info(f"Successfully listed {len(space_ids)} spaces for {effective_owner}.")
return space_ids, None
except HfHubHTTPError as e_http:
logger.error(f"HTTP error listing spaces for {owner or 'authenticated user'}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
if status_code == 404:
return [], f"HTTP Error ({status_code}): Owner '{owner}' not found or has no accessible spaces."
if status_code in (401, 403):
return [], f"HTTP Error ({status_code}): Access denied or authentication required for listing spaces for '{owner}'. Check token permissions."
return None, f"HTTP Error ({status_code}) listing spaces for '{owner or 'authenticated user'}': {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error listing spaces for {owner or 'authenticated user'}:")
return None, f"Error listing spaces: {e}"
def list_user_spaces(username):
api = HfApi()
user_repos = {}
# List models for the user
models = api.list_models(author=username)
user_repos['models'] = [model.id for model in models]
# List datasets for the user
datasets = api.list_datasets(author=username)
user_repos['datasets'] = [dataset.id for dataset in datasets]
# List Spaces for the user
spaces = api.list_spaces(author=username)
user_repos['spaces'] = [space.id for space in spaces]
return user_repos