# Scrape residue from the Hugging Face file-viewer page (kept as comments so the module stays valid Python):
# broadfield-dev's picture
# Update build_logic.py
# db7e1e2 verified
# raw
# history blame
# 51.3 kB
import os
import re
import tempfile
import shutil
import logging
from pathlib import Path
from huggingface_hub import (
create_repo,
upload_folder,
list_repo_files,
whoami,
hf_hub_download,
delete_file as hf_delete_file,
HfApi
)
from huggingface_hub.hf_api import CommitOperationDelete, CommitOperationAdd, CommitOperation
# Import the general HTTP error from huggingface_hub.utils
from huggingface_hub.utils import HfHubHTTPError # For catching specific HF HTTP errors
# Setup basic logging
# Configure root logging once at import time so all handlers/formatters are in place.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-level logger, one per module per the standard logging convention.
logger = logging.getLogger(__name__)
# --- Helper Function to Get API Token ---
def _get_api_token(ui_token_from_textbox=None):
env_token = os.getenv('HF_TOKEN')
if env_token:
logger.info("Using HF_TOKEN from environment variable.")
return env_token, None
if ui_token_from_textbox:
logger.info("Using HF_TOKEN from UI textbox.")
return ui_token_from_textbox.strip(), None
logger.warning("Hugging Face API token not provided in UI or HF_TOKEN env var.")
return None, "Error: Hugging Face API token not provided in UI or HF_TOKEN env var."
# --- Helper Function to Determine Repo ID ---
def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
if not space_name_ui: return None, "Error: Space Name cannot be empty."
if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field for the owner part."
final_owner = owner_ui
error_message = None
if not final_owner:
logger.info("Owner not specified, attempting to auto-detect from token.")
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, token_err
if not resolved_api_token: return None, "Error: API token required for auto owner determination if Owner field is empty."
try:
user_info = whoami(token=resolved_api_token)
if user_info and 'name' in user_info:
final_owner = user_info['name']
logger.info(f"Auto-detected owner: {final_owner}")
else:
error_message = "Error: Could not retrieve username from token. Check token permissions or specify Owner."
logger.error(error_message)
except Exception as e:
error_message = f"Error retrieving username from token: {str(e)}. Specify Owner or check token."
logger.exception("Error retrieving username from token:")
if error_message: return None, error_message
if not final_owner: return None, "Error: Owner could not be determined. Please specify it in the Owner field."
repo_id = f"{final_owner}/{space_name_ui}"
logger.info(f"Determined repo_id: {repo_id}")
return repo_id, None
# --- Corrected Markdown Parsing ---
# This function remains mostly the same as its purpose is just to parse the *AI's output format*
# into a structured format, not necessarily to represent the *current state* of a Space.
# The app.py logic will use this output and combine it with the current Space state.
def parse_markdown(markdown_input):
    """Parse the AI's markdown output format into a structured dict.

    Recognised structures:
      - `# Space: owner/repo` (or just `repo`) header;
      - `## File Structure` followed by a fenced block (skipped entirely);
      - `### File: path` headers, each followed by fenced code blocks whose
        contents become the file's content. Binary/error placeholder lines
        are kept as content even outside fences.

    BUG FIX: the original skipped the "File Structure" fenced block by
    assigning to the `for` loop variable, which does not advance iteration in
    Python. Because the `### File:` regex is checked before the
    in-file-definition guard, `### File:` lines *inside* the structure block
    were misparsed as real files. An explicit `while` index makes the skip
    actually work.

    Args:
        markdown_input: The raw markdown string produced by the AI.

    Returns:
        dict with keys "owner_md", "repo_name_md" and "files"
        (a list of {"path": ..., "content": ...} dicts).
    """
    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    current_file_path = None
    current_file_content_lines = []
    in_file_definition = False
    in_code_block = False
    file_parsing_errors = []  # Collected parsing issues, logged at the end.

    lines = markdown_input.strip().split("\n")

    # Clean up a potential leading '# ' that Gradio's Markdown sometimes adds,
    # but only on headings belonging to our format; other lines stay verbatim.
    cleaned_lines = []
    for line_content_orig in lines:
        stripped = line_content_orig.strip()
        if stripped.startswith(("# ### File:", "# ## File Structure", "# # Space:")):
            cleaned_lines.append(stripped[2:])
        else:
            cleaned_lines.append(line_content_orig)
    lines = cleaned_lines

    i = 0
    while i < len(lines):
        line_content_orig = lines[i]
        line_content_stripped = line_content_orig.strip()
        line_num = i + 1

        # A file header starts a new file block. This is deliberately checked
        # even inside a code fence, to tolerate AI output with a missing
        # closing fence (preserves the original parser's behavior).
        file_match = re.match(r"### File:\s*(?P<filename_line>[^\n]+)", line_content_stripped)
        if file_match:
            # Save the previous file's content before starting a new one.
            if current_file_path is not None and in_file_definition:
                content_to_save = "\n".join(current_file_content_lines).strip()
                space_info["files"].append({"path": current_file_path, "content": content_to_save})
            filename_line = file_match.group("filename_line").strip()
            # Drop trailing descriptions like "(main application)" and
            # surrounding backticks/quotes.
            current_file_path = re.split(r'\s*\(', filename_line, maxsplit=1)[0].strip()
            current_file_path = current_file_path.strip('`\'"').strip()
            if not current_file_path:
                file_parsing_errors.append(f"Line {line_num}: Found '### File:' but filename is empty or invalid.")
                current_file_path = None
                in_file_definition = False
                i += 1
                continue
            current_file_content_lines = []
            in_file_definition = True
            in_code_block = False
            logger.debug(f"Parsed file header: {current_file_path}")
            i += 1
            continue

        if not in_file_definition:
            if line_content_stripped.startswith("# Space:"):
                full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
                if "/" in full_space_name_md:
                    owner_part, repo_part = full_space_name_md.split("/", 1)
                    space_info["owner_md"] = owner_part.strip()
                    space_info["repo_name_md"] = repo_part.strip()
                else:
                    space_info["repo_name_md"] = full_space_name_md
                logger.debug(f"Parsed space header: {space_info['owner_md']}/{space_info['repo_name_md']}")
                i += 1
                continue
            if line_content_stripped.startswith("## File Structure"):
                # Skip the fenced block illustrating the file tree; its lines
                # (which may include '### File:') are not real definitions.
                fence_open = i + 1
                while fence_open < len(lines) and not lines[fence_open].strip().startswith("```"):
                    fence_open += 1
                if fence_open < len(lines):
                    fence_close = fence_open + 1
                    while fence_close < len(lines) and not lines[fence_close].strip().startswith("```"):
                        fence_close += 1
                    if fence_close < len(lines):
                        logger.debug(f"Skipping File Structure block from line {i+1} to {fence_close+1}")
                        i = fence_close + 1
                        continue
                # Unfenced/unclosed structure section: just skip the heading.
                i += 1
                continue
            # Any other line outside a file block is ignored.
            i += 1
            continue

        # From here on we are inside a file definition.
        if line_content_stripped.startswith("```"):
            in_code_block = not in_code_block
            logger.debug(f"Toggled code block to {in_code_block} at line {line_num}")
        elif in_code_block:
            # Preserve the original spacing of code lines.
            current_file_content_lines.append(line_content_orig)
        elif (line_content_stripped.startswith("[Binary file")
              or line_content_stripped.startswith("[Error loading content:")
              or line_content_stripped.startswith("[Binary or Skipped file]")):
            # Binary/error placeholders count as content even outside fences.
            current_file_content_lines.append(line_content_orig)
            logger.debug(f"Parsed binary/error marker for {current_file_path} at line {line_num}")
        # Other prose lines inside a file block (descriptions, blanks) are ignored.
        i += 1

    # Save the last file, if we ended while inside a file definition.
    if current_file_path is not None and in_file_definition:
        content_to_save = "\n".join(current_file_content_lines).strip()
        space_info["files"].append({"path": current_file_path, "content": content_to_save})

    # Drop entries without a valid path and normalise whitespace.
    space_info["files"] = [f for f in space_info["files"] if f.get("path")]
    space_info["owner_md"] = space_info["owner_md"].strip()
    space_info["repo_name_md"] = space_info["repo_name_md"].strip()

    if file_parsing_errors:
        logger.warning(f"Markdown parsing encountered errors: {file_parsing_errors}")
    logger.info(f"Parsed markdown. Found {len(space_info['files'])} files.")
    return space_info
# --- Function to Get Space SDK and Files ---
def get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Return (sdk, files, error) for a Space.

    Primary path uses `HfApi.repo_info` (which also yields the SDK). If that
    raises a non-HTTP error, a `list_repo_files` fallback still tries to
    recover the file list, downgrading `error` to a warning string when the
    fallback succeeds. HTTP errors (404/401/403/...) are mapped to
    human-readable messages and get no fallback.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    sdk = None
    files = []
    error = None
    repo_id = None  # Defined early so error paths below can reference it safely
    logger.info(f"Attempting to get repo info for {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return None, [], token_err
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id: return None, [], err_repo_id
        repo_id_for_error_logging = repo_id  # Use the resolved id in messages from here on
        api = HfApi(token=resolved_api_token)
        # repo_info is preferred over list_repo_files because it also reports the SDK.
        repo_info_obj = api.repo_info(repo_id=repo_id, repo_type="space", timeout=20)
        sdk = repo_info_obj.sdk
        files = [sibling.rfilename for sibling in repo_info_obj.siblings if sibling.rfilename]
        if not files and repo_info_obj.siblings:
            logger.warning(f"Repo {repo_id} has siblings but no rfilenames extracted. Total siblings: {len(repo_info_obj.siblings)}")
            # Empty repos exist, and listing can partially fail -- continue
            # with an empty files list rather than erroring.
        logger.info(f"Successfully got repo info for {repo_id}. SDK: {sdk}, Files found: {len(files)}")
    except HfHubHTTPError as e_http:  # Specific HF HTTP errors: map to a message, no fallback
        logger.error(f"HTTP error getting repo info for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found (404)."
        elif status_code in (401,403):
            error = f"Access denied for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        else:
            error = f"HTTP Error {status_code or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"
    except Exception as e:
        # Any other failure (network, API mismatch, ...): fall back to a bare
        # file listing, which loses the SDK but keeps the UI usable.
        logger.warning(f"Could not get full repo_info for {repo_id_for_error_logging or 'unknown repo'}, attempting list_repo_files fallback: {e}")
        error = f"Error retrieving Space info for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. Attempting file list fallback."
        try:
            # Token/repo id are re-resolved because the failure above may have
            # occurred before they were established.
            resolved_api_token_fb, token_err_fb = _get_api_token(ui_api_token_from_textbox)
            if token_err_fb: return None, [], f"{error}\nAPI Token Error during fallback: {token_err_fb}"
            repo_id_fb, err_repo_id_fb = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
            if err_repo_id_fb: return None, [], f"{error}\nRepo ID Error during fallback: {err_repo_id_fb}"
            # NOTE(review): `timeout` may not be an accepted kwarg of
            # list_repo_files in all huggingface_hub versions; if it raises
            # TypeError, the generic handler below swallows it -- verify
            # against the installed huggingface_hub version.
            files = list_repo_files(repo_id=repo_id_fb, token=resolved_api_token_fb, repo_type="space", timeout=20)
            # Fallback worked: downgrade the error to a warning about repo_info.
            error = f"Warning: Could not fetch full Space info (SDK etc.) for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. File list loaded via fallback."
            logger.info(f"Fallback list_repo_files successful for {repo_id_fb}. Files found: {len(files)}")
        except HfHubHTTPError as e2_http:
            logger.error(f"HTTP error during fallback list_repo_files for {repo_id_for_error_logging or 'unknown repo'}: {e2_http}")
            error_message_fb = str(e2_http)
            status_code_fb = e2_http.response.status_code if e2_http.response is not None else None
            if status_code_fb == 404:
                error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found during fallback (404)."
            else:
                error = f"HTTP Error {status_code_fb or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}' during fallback: {error_message_fb}"
            files = []  # No partial results on fallback error
        except Exception as e2:
            logger.exception(f"Error listing files for {repo_id_for_error_logging or 'unknown repo'} during fallback: {e2}")
            error = f"{error}\nError listing files during fallback for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e2)}"
            files = []  # No partial results on fallback error
    # With no files and no error recorded, report an explicit "no files" message.
    if not files and not error and (repo_id_for_error_logging is not None):
        error = f"No files found in Space `{repo_id_for_error_logging or 'unknown repo'}`."
    return sdk, files, error
# --- Function to list files ---
def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Return (files, error) for the given Space, discarding the SDK value."""
    _sdk, file_list, error_msg = get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui)
    return file_list, error_msg
# --- Function to Fetch File Content from Hub ---
def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
    """Download one file from a Space and return its text content.

    Returns (content, error): on success `content` is the file's UTF-8 text
    and `error` is None; on failure `content` is None and `error` is a
    human-readable message (missing file, binary file, HTTP error, ...).
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    logger.info(f"Attempting to get content for file '{file_path_in_repo}' from {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return None, token_err
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id: return None, err_repo_id
        repo_id_for_error_logging = repo_id  # Use the resolved id in messages from here on
        if not file_path_in_repo: return None, "Error: File path cannot be empty."
        # Hub paths always use forward slashes.
        file_path_in_repo = file_path_in_repo.replace("\\", "/")
        # hf_hub_download caches the file locally before we read it.
        # NOTE(review): `timeout` is not a documented hf_hub_download parameter
        # (the related one is `etag_timeout`); on versions that reject unknown
        # kwargs the TypeError is reported via the generic handler below --
        # verify against the installed huggingface_hub version.
        downloaded_file_path = hf_hub_download(
            repo_id=repo_id,
            filename=file_path_in_repo,
            repo_type="space",
            token=resolved_api_token,
            local_dir_use_symlinks=False,  # Avoid symlink issues on some platforms
            cache_dir=None,  # Use the default cache dir
            timeout=20
        )
        content = Path(downloaded_file_path).read_text(encoding="utf-8")
        logger.info(f"Successfully downloaded and read content for '{file_path_in_repo}'.")
        return content, None
    except FileNotFoundError:
        # hf_hub_download commonly maps a 404 for a missing file to FileNotFoundError.
        logger.error(f"FileNotFoundError for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
    except UnicodeDecodeError:
        # read_text failed: the file is binary or not UTF-8 encoded text.
        logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}'. Likely binary.")
        return None, f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error fetching file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
        if status_code in (401, 403):
            return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return None, f"HTTP Error {status_code or 'unknown'} fetching file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Error fetching file content: {str(e)}"
# --- Create/Update Space from Staged Changes ---
# This function is modified to take a list of operations (changeset) instead of markdown
# It's designed to be called by handle_confirm_changes
def apply_staged_changes(ui_api_token_from_textbox, owner_ui, space_name_ui, changeset):
    """Apply a list of staged change operations to a Hugging Face Space.

    Supported change dicts in `changeset` (keyed by 'type'):
      CREATE_SPACE, CREATE_FILE, UPDATE_FILE, DELETE_FILE, SET_PRIVACY,
      DELETE_SPACE. File creates/updates are staged to a temp dir and
      uploaded with `upload_folder`; deletes are committed separately via
      `create_commit`; privacy/space-deletion run after the file commits.

    Returns a single status string summarising every operation (errors
    included); it reports failures rather than raising.

    BUG FIXES vs. the original:
      - `_determine_repo_id` was called with `(owner_ui, space_name_ui)`
        although its signature is `(token, space_name, owner)`, producing a
        reversed "space/owner" repo id. The arguments are now in the correct
        order (matching every other call site in this module).
      - Early error returns were lists (`[token_err]`) while the normal
        return is a string; they now return the error string directly.
      - `create_space_op['repo_id']` could raise KeyError; `.get()` is used.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    status_messages = []
    logger.info(f"Attempting to apply {len(changeset)} staged changes to {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return token_err
        # NOTE: argument order is (token, space_name, owner).
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id:
            return err_repo_id
        repo_id_for_error_logging = repo_id
        api = HfApi(token=resolved_api_token)

        # 1) Space creation, if requested. The target repo is always the one
        #    derived from the UI fields; the AI-specified repo_id is only logged.
        create_space_op = next((c for c in changeset if c['type'] == 'CREATE_SPACE'), None)
        if create_space_op:
            logger.info(f"Detected CREATE_SPACE action for {create_space_op.get('repo_id', repo_id)}. Proceeding with creation of {repo_id_for_error_logging} based on UI fields.")
            try:
                api.create_repo(repo_id=repo_id, repo_type="space", space_sdk=create_space_op.get('sdk', 'gradio'), private=create_space_op.get('private', False), exist_ok=True)
                status_messages.append(f"CREATE_SPACE: Successfully created or ensured space [{repo_id}](https://huggingface.co/spaces/{repo_id}) exists with SDK '{create_space_op.get('sdk', 'gradio')}' and private={create_space_op.get('private', False)}.")
                logger.info(f"Successfully created or ensured space {repo_id} exists.")
            except Exception as e:
                # Report and keep going: later file ops may still succeed if
                # the repo exists (exist_ok=True best-effort semantics).
                status_messages.append(f"CREATE_SPACE Error: {e}")
                logger.error(f"Error creating space {repo_id}: {e}")

        # 2) Stage file creates/updates into a temp dir; collect delete operations.
        temp_dir = None
        operations = []          # CommitOperationDelete objects
        paths_to_upload = {}     # local temp path -> path in repo
        try:
            temp_dir = tempfile.TemporaryDirectory()
            repo_staging_path = Path(temp_dir.name) / "repo_staging_content"
            repo_staging_path.mkdir(exist_ok=True)
            # Always stage .gitattributes to ensure consistent line endings.
            gitattributes_path_local = repo_staging_path / ".gitattributes"
            with open(gitattributes_path_local, "w", encoding="utf-8") as f:
                f.write("* text=auto eol=lf\n")
            paths_to_upload[str(gitattributes_path_local)] = ".gitattributes"

            for change in changeset:
                if change['type'] in ('UPDATE_FILE', 'CREATE_FILE'):
                    file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
                    if not file_path_in_repo:
                        status_messages.append(f"Skipping {change['type']} operation: empty path.")
                        continue
                    content_to_write = change.get('content', '')
                    # Never write back binary/error placeholders produced at load time.
                    if content_to_write.startswith("[Binary file") or content_to_write.startswith("[Error loading content:") or content_to_write.startswith("[Binary or Skipped file]"):
                        status_messages.append(f"Skipping {change['type']} for '{file_path_in_repo}': Content is a binary/error placeholder.")
                        logger.warning(f"Skipping {change['type']} operation for '{file_path_in_repo}': Content is binary/error placeholder.")
                        continue
                    file_path_local = repo_staging_path / file_path_in_repo
                    file_path_local.parent.mkdir(parents=True, exist_ok=True)
                    try:
                        with open(file_path_local, "w", encoding="utf-8") as f:
                            f.write(content_to_write)
                        paths_to_upload[str(file_path_local)] = file_path_in_repo
                        logger.info(f"Staged file for {change['type']}: {file_path_in_repo}")
                    except Exception as file_write_error:
                        status_messages.append(f"Error staging file {file_path_in_repo} for {change['type']}: {file_write_error}")
                        logger.error(f"Error writing file {file_path_in_repo} during staging for {change['type']}: {file_write_error}")
                elif change['type'] == 'DELETE_FILE':
                    file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
                    if not file_path_in_repo:
                        status_messages.append(f"Skipping DELETE_FILE operation: empty path.")
                        continue
                    operations.append(CommitOperationDelete(path_in_repo=file_path_in_repo))
                    logger.info(f"Added DELETE_FILE operation for: {file_path_in_repo}")
                # SET_PRIVACY / DELETE_SPACE are handled after the file commit.

            # 3) Commit deletes first, then uploads (two commits when both exist:
            #    upload_folder cannot carry delete operations).
            if paths_to_upload or operations:
                logger.info(f"Committing file changes to {repo_id_for_error_logging}. Uploads: {len(paths_to_upload)}, Deletes: {len(operations)}")
                delete_operations = [op for op in operations if isinstance(op, CommitOperationDelete)]
                if delete_operations:
                    try:
                        commit_message_delete = f"AI Space Builder: Deleted {len(delete_operations)} files."
                        logger.info(f"Performing delete commit for {repo_id_for_error_logging}: {commit_message_delete}")
                        api.create_commit(
                            repo_id=repo_id,
                            repo_type="space",
                            operations=delete_operations,
                            commit_message=commit_message_delete
                        )
                        status_messages.append(f"File Deletions: Successfully committed {len(delete_operations)} deletions.")
                        logger.info("Delete commit successful.")
                    except HfHubHTTPError as e_http:
                        status_messages.append(f"File Deletion Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
                        logger.error(f"HTTP error during delete commit for {repo_id}: {e_http}")
                    except Exception as e_delete_commit:
                        status_messages.append(f"File Deletion Error: {str(e_delete_commit)}. Check logs.")
                        logger.exception(f"Error during delete commit for {repo_id}:")
                if paths_to_upload:
                    try:
                        commit_message_upload = f"AI Space Builder: Updated Space content for {repo_id}"
                        logger.info(f"Uploading staged files from {str(repo_staging_path)} to {repo_id}...")
                        upload_folder(
                            repo_id=repo_id,
                            folder_path=str(repo_staging_path),
                            path_in_repo=".",  # Upload to the repository root
                            token=resolved_api_token,
                            repo_type="space",
                            commit_message=commit_message_upload,
                            allow_patterns=["*"],  # Consider every staged file
                        )
                        status_messages.append(f"File Uploads/Updates: Successfully uploaded/updated {len(paths_to_upload)} files.")
                        logger.info("Upload/Update commit successful.")
                    except HfHubHTTPError as e_http:
                        status_messages.append(f"File Upload/Update Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
                        logger.error(f"HTTP error during upload_folder for {repo_id}: {e_http}")
                    except Exception as e_upload:
                        status_messages.append(f"File Upload/Update Error: {str(e_upload)}. Check logs.")
                        logger.exception(f"Error during upload_folder for {repo_id}:")
            else:
                status_messages.append("No file changes (create/update/delete) to commit.")
                logger.info("No file changes to commit.")
        finally:
            # Always remove the temporary staging directory.
            if temp_dir:
                try:
                    temp_dir.cleanup()
                    logger.info("Cleaned up temporary staging directory.")
                except Exception as e:
                    logger.error(f"Error cleaning up temp dir: {e}")

        # 4) Privacy changes and space deletion run after the file commits.
        for change in changeset:
            if change['type'] == 'SET_PRIVACY':
                # Bound before the try so the except handlers can reference it.
                target_repo_id = change.get('repo_id', repo_id)
                try:
                    if not target_repo_id:
                        status_messages.append("SET_PRIVACY Error: Target repo_id not specified.")
                        continue
                    api.update_repo_visibility(repo_id=target_repo_id, private=change['private'], repo_type='space')
                    status_messages.append(f"SET_PRIVACY: Successfully set `{target_repo_id}` to `private={change['private']}`.")
                    logger.info(f"Successfully set privacy for {target_repo_id} to {change['private']}.")
                except HfHubHTTPError as e_http:
                    status_messages.append(f"SET_PRIVACY Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check token/permissions.")
                    logger.error(f"HTTP error setting privacy for {target_repo_id}: {e_http}")
                except Exception as e:
                    status_messages.append(f"SET_PRIVACY Error: {str(e)}. Check logs.")
                    logger.exception(f"Error setting privacy for {target_repo_id}:")
            elif change['type'] == 'DELETE_SPACE':
                # Destructive: only ever allowed on the currently loaded space.
                delete_owner = change.get('owner') or owner_ui
                delete_space = change.get('space_name') or space_name_ui
                delete_repo_id = f"{delete_owner}/{delete_space}" if delete_owner and delete_space else repo_id
                if not delete_repo_id:
                    status_messages.append("DELETE_SPACE Error: Target repo_id not specified.")
                    continue
                if delete_repo_id != repo_id:
                    # Safeguard: block deletion of anything but the loaded space.
                    status_messages.append(f"DELETE_SPACE Error: AI requested deletion of '{delete_repo_id}', but this action is only permitted for the currently loaded space '{repo_id}'. Action blocked.")
                    logger.warning(f"Blocked DELETE_SPACE action: requested '{delete_repo_id}', current '{repo_id}'.")
                    continue
                logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for {delete_repo_id}")
                try:
                    api.delete_repo(repo_id=delete_repo_id, repo_type='space')
                    status_messages.append(f"DELETE_SPACE: Successfully deleted space `{delete_repo_id}`.")
                    logger.info(f"Successfully deleted space {delete_repo_id}.")
                except HfHubHTTPError as e_http:
                    status_messages.append(f"DELETE_SPACE Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check token/permissions.")
                    logger.error(f"HTTP error deleting space {delete_repo_id}: {e_http}")
                except Exception as e:
                    status_messages.append(f"DELETE_SPACE Error: {str(e)}. Check logs.")
                    logger.exception(f"Error deleting space {delete_repo_id}:")
    except HfHubHTTPError as e_http:
        logger.error(f"Top-level HTTP error during apply_staged_changes for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        status_messages.append(f"API HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}")
    except Exception as e:
        logger.exception(f"Top-level error during apply_staged_changes for {repo_id_for_error_logging or 'unknown repo'}:")
        status_messages.append(f"An unexpected error occurred: {str(e)}")

    final_status = " | ".join(status_messages) if status_messages else "No operations were applied."
    logger.info(f"Finished applying staged changes. Final status: {final_status}")
    return final_status
# --- Delete Single File (Manual UI Trigger) ---
# This function remains for direct UI file deletion, distinct from the AI-driven workflow
def delete_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, commit_message_ui=None):
    """Delete a single file from a Hugging Face Space (manual UI trigger).

    Distinct from the AI-driven staged-changes workflow.

    Args:
        ui_api_token_from_textbox: Optional HF token from the UI (the HF_TOKEN
            env var takes precedence inside _get_api_token).
        space_name_ui: Space name without the owner part.
        owner_ui: Owner/org; auto-detected from the token when empty.
        file_path_in_repo: Repo-relative path of the file to delete.
        commit_message_ui: Optional commit message; a default is generated.

    Returns:
        A human-readable status string (success or error description).
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    logger.info(f"Attempting manual file deletion for '{file_path_in_repo}' from {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id: return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id  # Fully-qualified repo id known from here on
        if not file_path_in_repo: return "Error: File path cannot be empty for deletion."
        # Normalize to a Hub-style forward-slash relative path.
        file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
        # No guard against deleting .gitattributes / README.md: anything the
        # user selected in the dropdown is allowed.
        effective_commit_message = commit_message_ui or f"Deleted file: {file_path_in_repo} via AI Space Editor UI"
        # BUGFIX: hf_delete_file (huggingface_hub.delete_file) has no `timeout`
        # parameter; passing timeout=20 raised TypeError on every call.
        hf_delete_file(
            path_in_repo=file_path_in_repo,
            repo_id=repo_id,
            repo_type="space",
            token=resolved_api_token,
            commit_message=effective_commit_message
        )
        logger.info(f"Successfully deleted file: {file_path_in_repo}")
        return f"Successfully deleted file: `{file_path_in_repo}`"
    except FileNotFoundError:
        logger.error(f"FileNotFoundError during manual delete for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
    except HfHubHTTPError as e_http:
        # EntryNotFoundError and friends are subclasses of HfHubHTTPError, so
        # 404/401/403 all land here.
        logger.error(f"HTTP error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' for deletion (404)."
        if status_code in (401, 403):
            return f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return f"HTTP Error {status_code or 'unknown'} deleting file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return f"Error deleting file '{file_path_in_repo}': {str(e)}"
# --- Update Single File (Manual UI Trigger) ---
# This function remains for direct UI file editing, distinct from the AI-driven workflow
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """Create or update a single text file in a Hugging Face Space (manual UI trigger).

    Distinct from the AI-driven staged-changes workflow.

    Args:
        ui_api_token_from_textbox: Optional HF token from the UI (the HF_TOKEN
            env var takes precedence inside _get_api_token).
        space_name_ui: Space name without the owner part.
        owner_ui: Owner/org; auto-detected from the token when empty.
        file_path_in_repo: Repo-relative destination path.
        file_content: New file content (must be UTF-8 encodable text).
        commit_message_ui: Optional commit message; a default is generated.

    Returns:
        A human-readable status string (success or error description).
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    logger.info(f"Attempting manual file update for '{file_path_in_repo}' in {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return token_err
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id: return err_repo_id
        repo_id_for_error_logging = repo_id  # Fully-qualified repo id known from here on
        if not file_path_in_repo: return "Error: File Path to update cannot be empty."
        # Normalize to a Hub-style forward-slash relative path.
        file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
        commit_msg = commit_message_ui or f"Update {file_path_in_repo} via AI Space Editor UI"
        api = HfApi(token=resolved_api_token)
        # Stage the content in a temporary file so upload_file can stream it.
        tmp_file_path = None
        try:
            with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8') as tmp_file_obj:
                tmp_file_obj.write(file_content)
                tmp_file_path = tmp_file_obj.name
            # BUGFIX: HfApi.upload_file has no `timeout` parameter; passing
            # timeout=20 raised TypeError on every call.
            api.upload_file(
                path_or_fileobj=tmp_file_path,
                path_in_repo=file_path_in_repo,
                repo_id=repo_id,
                repo_type="space",
                commit_message=commit_msg
            )
            logger.info(f"Successfully updated file: {file_path_in_repo}")
            return f"Successfully updated `{file_path_in_repo}`"
        finally:
            # Always remove the temporary file, even on upload failure.
            if tmp_file_path and os.path.exists(tmp_file_path):
                os.remove(tmp_file_path)
    except FileNotFoundError:
        logger.error(f"FileNotFoundError during manual update for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return f"Error: Local temporary file not found during upload for '{file_path_in_repo}'."
    except UnicodeError:
        # BUGFIX: writing non-UTF-8-encodable text raises UnicodeEncodeError,
        # not UnicodeDecodeError; UnicodeError is the common parent and
        # catches both, so this branch is no longer dead code.
        logger.warning(f"Unicode error for '{file_path_in_repo}' during manual update.")
        return f"Error: Content for '{file_path_in_repo}' is not valid UTF-8 text. Cannot edit this way."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return f"Error: Space '{repo_id_for_error_logging or 'unknown repo'}' or file '{file_path_in_repo}' not found (404)."
        if status_code in (401, 403):
            return f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return f"HTTP Error {status_code or 'unknown'} updating file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}:")
        return f"Error updating file for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}"
# --- Get Space Runtime Status ---
def get_space_runtime_status(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Fetch the runtime status (stage, hardware, error info) of a Space.

    Args:
        ui_api_token_from_textbox: Optional HF token from the UI (the HF_TOKEN
            env var takes precedence inside _get_api_token).
        space_name_ui: Space name without the owner part.
        owner_ui: Owner/org; auto-detected from the token when empty.

    Returns:
        (status_details, None) on success, where status_details is a dict with
        keys stage/hardware/requested_hardware/error_message/status/full_log_link;
        (None, error_message) on failure.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    logger.info(f"Fetching runtime status for Space: {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err: return None, f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id: return None, f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id  # Fully-qualified repo id known from here on
        api = HfApi(token=resolved_api_token)
        # BUGFIX: HfApi.get_space_runtime has no `timeout` parameter; passing
        # timeout=20 raised TypeError on every call.
        runtime_info = api.get_space_runtime(repo_id=repo_id)
        logger.info(f"Received runtime info for {repo_id}. Stage: {runtime_info.stage}")
        # Structure the details for display. Some attributes are not present
        # on all huggingface_hub versions, hence the getattr defaults.
        status_details = {
            "stage": runtime_info.stage,
            "hardware": runtime_info.hardware,
            "requested_hardware": getattr(runtime_info, 'requested_hardware', None),
            "error_message": None,  # Filled in below when stage == "ERRORED"
            "status": getattr(runtime_info, 'status', None),
            "full_log_link": f"https://huggingface.co/spaces/{repo_id}/logs" if repo_id else "#"
        }
        if runtime_info.stage == "ERRORED":
            error_content = None
            # Prefer the explicit error attribute, then build/run error details
            # from the raw payload, then a raw message that looks like an error.
            if getattr(runtime_info, 'error', None): error_content = str(runtime_info.error)
            raw = runtime_info.raw if isinstance(getattr(runtime_info, 'raw', None), dict) else {}
            if isinstance(raw.get('build'), dict) and raw['build'].get('status') == 'error':
                error_content = f"Build Error: {raw['build'].get('message', error_content or 'Unknown build error')}"
            elif isinstance(raw.get('run'), dict) and raw['run'].get('status') == 'error':
                error_content = f"Runtime Error: {raw['run'].get('message', error_content or 'Unknown runtime error')}"
            elif isinstance(raw.get('message'), str) and ('error' in raw['message'].lower() or raw['message'].strip().endswith('!')):
                # Heuristic: treat messages containing 'error' (or ending in '!')
                # as error indicators.
                error_content = raw['message']
            status_details["error_message"] = error_content if error_content else "Space is in an errored state. Check logs for details."
        logger.info(f"Runtime status details for {repo_id}: {status_details}")
        return status_details, None
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            # A 404 can mean the Space does not exist or has no recorded runtime state.
            return None, f"Error: Space '{repo_id_for_error_logging or 'unknown repo'}' not found or has no active runtime status (404)."
        if status_code in (401, 403):
            return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return None, f"HTTP Error {status_code or 'unknown'} fetching runtime status for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"
    except Exception as e:
        logger.exception(f"Error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Error fetching runtime status: {str(e)}"
# --- Function to set space privacy ---
def build_logic_set_space_privacy(hf_api_key, repo_id, private: bool):
    """Toggle a Space's visibility between private and public.

    Returns a human-readable status string describing the outcome.
    """
    logger.info(f"Attempting to set privacy for '{repo_id}' to {private}.")
    try:
        token, token_error = _get_api_token(hf_api_key)
        if token_error or not token:
            logger.error(f"Token error setting privacy: {token_error or 'Token not found'}")
            return f"Error getting token: {token_error or 'Token not found.'}"
        hub_client = HfApi(token=token)
        hub_client.update_repo_visibility(repo_id=repo_id, private=private, repo_type='space')
    except HfHubHTTPError as http_err:
        logger.error(f"HTTP error setting privacy for {repo_id}: {http_err}")
        response = http_err.response
        code = response.status_code if response else 'N/A'
        detail = response.text if response else str(http_err)
        return f"HTTP Error ({code}) setting privacy for `{repo_id}`: {detail}"
    except Exception as exc:
        logger.exception(f"Error setting privacy for {repo_id}:")
        return f"Error setting privacy for `{repo_id}`: {exc}"
    logger.info(f"Successfully set privacy for {repo_id} to {private}.")
    return f"Successfully set privacy for `{repo_id}` to `{private}`."
# --- Function to delete an entire space ---
def build_logic_delete_space(hf_api_key, owner, space_name):
    """Permanently delete a whole Hugging Face Space (destructive, irreversible).

    Returns a human-readable status string describing the outcome.
    """
    repo_id = f"{owner}/{space_name}"
    logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for '{repo_id}'.")
    try:
        token, token_error = _get_api_token(hf_api_key)
        if token_error or not token:
            logger.error(f"Token error deleting space: {token_error or 'Token not found'}")
            return f"Error getting token: {token_error or 'Token not found.'}"
        hub_client = HfApi(token=token)
        hub_client.delete_repo(repo_id=repo_id, repo_type='space')
    except HfHubHTTPError as http_err:
        logger.error(f"HTTP error deleting space {repo_id}: {http_err}")
        response = http_err.response
        code = response.status_code if response else 'N/A'
        detail = response.text if response else str(http_err)
        return f"HTTP Error ({code}) deleting space `{repo_id}`: {detail}"
    except Exception as exc:
        logger.exception(f"Error deleting space {repo_id}:")
        return f"Error deleting space `{repo_id}`: {exc}"
    logger.warning(f"Successfully deleted space {repo_id}.")
    return f"Successfully deleted space `{repo_id}`."