Spaces:
Sleeping
Sleeping
File size: 13,099 Bytes
c66bf1b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
import os
import re
import tempfile
import shutil
import git
from huggingface_hub import (
create_repo,
upload_folder,
list_repo_files,
# delete_file, # Not used in the provided code, can be removed if not planned
Repository,
whoami,
)
import logging
from pathlib import Path
# Configure logging: set up the root logger at INFO level and create the
# module-level logger used by every function in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Function to parse markdown input
def parse_markdown(markdown_input):
    """Parse markdown input to extract space details and file structure.

    Recognised markers (each must start its line):

    * ``# Space: owner/name`` or ``# Space: name`` - informational header.
    * ``## File Structure`` - section heading, ignored.
    * File-tree entries (lines starting with a tree marker) - listing only;
      they end any unfenced file in progress but never create a file.
    * ``### File: path`` - starts a file. If the first non-blank line after
      it opens a fenced code block, the file content is exactly the lines
      between the opening and closing fence (fence lines excluded, and
      marker-looking lines inside the fence are kept verbatim). Otherwise
      the following plain lines are the content, up to the next marker.

    A fence closes on a line made only of backticks that is at least as long
    as the opening fence, so a nested bare fence of the same length ends the
    block early.

    Args:
        markdown_input: The markdown text; ``None`` or empty yields no files.

    Returns:
        dict with keys ``"repo_name_md"`` and ``"owner_md"`` (strings, empty
        when absent) and ``"files"``, a list of
        ``{"path": str, "content": str}`` dicts in document order.
    """
    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    if not markdown_input:
        return space_info

    space_prefix = "# Space:"
    file_prefix = "### File:"
    # NOTE(review): "π" looks like a mis-decoded emoji from the original
    # source; the document/folder emojis are accepted alongside it - confirm.
    tree_markers = ("π", "📄", "📁")

    files = space_info["files"]
    current_file = None   # path of the file currently being collected
    file_content = []     # lines collected for current_file
    collecting = False    # True between "### File:" and the end of its content
    fence = None          # backtick run that opened the current fenced block

    # Normalise Windows line endings so "\r" never leaks into file content.
    lines = markdown_input.strip().replace("\r\n", "\n").split("\n")
    for line in lines:
        stripped = line.strip()

        if fence is not None:
            # Inside a fenced block everything is content until the fence closes.
            is_closing = len(stripped) >= len(fence) and not stripped.strip("`")
            if is_closing:
                if current_file:
                    files.append({"path": current_file, "content": "\n".join(file_content)})
                current_file, file_content = None, []
                collecting, fence = False, None
            else:
                file_content.append(line)
            continue

        if line.startswith(space_prefix):
            # Informational only; callers do not build the repo_id from this.
            full_space_name_md = line[len(space_prefix):].strip()
            if "/" in full_space_name_md:
                owner, name = full_space_name_md.split("/", 1)
                space_info["owner_md"], space_info["repo_name_md"] = owner, name
            else:
                space_info["repo_name_md"] = full_space_name_md
        elif line.startswith("## File Structure"):
            continue
        elif line.startswith(tree_markers):
            # A tree entry ends any unfenced file in progress; content only
            # ever comes from "### File:" sections.
            if current_file and file_content:
                files.append({"path": current_file, "content": "\n".join(file_content)})
            current_file, file_content, collecting = None, [], False
        elif line.startswith(file_prefix):
            if current_file and file_content:
                files.append({"path": current_file, "content": "\n".join(file_content)})
            current_file = line[len(file_prefix):].strip()
            file_content = []
            collecting = True
        elif collecting:
            only_blank_so_far = not any(prev.strip() for prev in file_content)
            if stripped.startswith("```") and only_blank_so_far:
                # Opening fence: drop blank lines seen before it and remember
                # the backtick run so the matching closing fence can be found.
                fence = stripped[: len(stripped) - len(stripped.lstrip("`"))]
                file_content = []
            else:
                file_content.append(line)

    # Flush whatever was still being collected when the input ended.
    if fence is not None:
        # Unterminated fence: keep what was collected rather than dropping it.
        if current_file:
            files.append({"path": current_file, "content": "\n".join(file_content)})
    elif current_file and file_content:
        files.append({"path": current_file, "content": "\n".join(file_content)})

    return space_info
def _determine_repo_id(api_token, space_name_ui, owner_ui):
    """Determine the final owner and build the ``owner/name`` repo_id.

    Surrounding whitespace in the UI values is ignored, so a stray space in a
    text box cannot produce an invalid repo id.

    Args:
        api_token: Hugging Face token; only needed when the owner is empty.
        space_name_ui: Space name only, never ``owner/name``.
        owner_ui: Value of the UI owner field; when empty the owner is looked
            up from the token via ``whoami``.

    Returns:
        ``(repo_id, None)`` on success or ``(None, error_message)`` on failure.
    """
    space_name = (space_name_ui or "").strip()
    if not space_name:
        return None, "Error: Space Name cannot be empty."
    if "/" in space_name:
        return None, "Error: Space Name should not contain '/'. Please use the Owner field for the namespace."

    owner = (owner_ui or "").strip()
    if owner:
        return f"{owner}/{space_name}", None

    # Owner field is empty: fall back to the account that owns the token.
    if not api_token:
        return None, "Error: API token is required to automatically determine owner when Owner field is empty."
    try:
        user_info = whoami(token=api_token)
        has_name = bool(user_info) and "name" in user_info
    except Exception as e:
        logger.error(f"Error calling whoami for owner: {str(e)}")
        return None, f"Error retrieving username from API token: {str(e)}. Please specify Owner manually."

    if not has_name:
        logger.error(f"whoami(token=...) returned: {user_info} when trying to determine owner.")
        return None, "Error: Could not retrieve username from API token. Ensure token is valid and has 'Read profile' permissions. Or, specify Owner manually."

    owner = user_info["name"]
    if not owner:
        return None, "Error: Owner could not be determined. Please provide an owner or ensure your API token is valid."
    return f"{owner}/{space_name}", None
# Function to create and populate a Space
def create_space(api_token, space_name_ui, owner_ui, sdk_ui, markdown_input):
    """Create a Hugging Face Space and populate it with files from markdown input.

    The files described in ``markdown_input`` are written to a temporary
    directory, the Space is created if it does not exist yet, and the
    directory is uploaded as a single commit.

    Args:
        api_token: Hugging Face token with write access.
        space_name_ui: Space name (without owner).
        owner_ui: Owner/namespace; empty means "owner of the token".
        sdk_ui: Space SDK passed to ``create_repo`` as ``space_sdk``.
        markdown_input: Markdown describing the files (see ``parse_markdown``).

    Returns:
        A markdown status string: a success message with a link to the Space,
        or a message starting with ``"Error"``. Exceptions are not propagated.
    """
    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, err = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if err:
            return err

        # Owner/name found in the markdown are informational only; the repo_id
        # always comes from the UI fields.
        space_info = parse_markdown(markdown_input)
        if not space_info["files"]:
            return "Error: No files found in the markdown input. Ensure '### File: path/to/file.ext' markers are used."

        with tempfile.TemporaryDirectory() as temp_dir:
            # Fixed directory name so special characters in the Space name
            # never reach the filesystem. Resolved so the containment check
            # below compares canonical paths.
            repo_root = Path(temp_dir).resolve() / "repo"
            repo_root.mkdir(parents=True, exist_ok=True)

            for file_info in space_info["files"]:
                relative_path = file_info.get("path")
                if not relative_path:
                    logger.warning(f"Skipping file with no path: {file_info}")
                    continue
                # Paths come from user-supplied markdown: refuse absolute
                # paths and ".." segments that would escape the temp dir.
                target = (repo_root / relative_path).resolve()
                if repo_root not in target.parents:
                    return (
                        f"Error: Invalid file path '{relative_path}'. "
                        "Paths must be relative and stay inside the Space repository."
                    )
                target.parent.mkdir(parents=True, exist_ok=True)
                with open(target, "w", encoding="utf-8") as f:
                    f.write(file_info["content"])
                logger.info(f"Wrote file: {target}")

            try:
                create_repo(
                    repo_id=repo_id,
                    token=api_token,
                    repo_type="space",
                    space_sdk=sdk_ui,
                    private=False,  # Or make this an option
                    # An existing Space is updated in place instead of being
                    # detected by matching on the error text.
                    exist_ok=True,
                )
                logger.info(f"Space {repo_id} is ready (created or already existing).")
            except Exception as e:
                return f"Error creating Space '{repo_id}': {str(e)}"

            upload_folder(
                repo_id=repo_id,
                # Without repo_type the upload targets a *model* repo of the
                # same name rather than the Space.
                repo_type="space",
                folder_path=str(repo_root),
                path_in_repo=".",
                token=api_token,
                commit_message=f"Initial Space setup of {repo_id}",
            )

        return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})"
    except Exception as e:
        logger.exception(f"Error in create_space: {str(e)}")
        return f"Error: {str(e)}"
# Function to view Space files
def view_space_files(api_token, space_name_ui, owner_ui):
    """List files in a Hugging Face Space.

    Args:
        api_token: Hugging Face token.
        space_name_ui: Space name (without owner).
        owner_ui: Owner/namespace; empty means "owner of the token".

    Returns:
        A markdown string: a bullet list of the files, a "no files" notice,
        or a message starting with ``"Error"``. Exceptions are not propagated.
    """
    # Bound before the try block so the error handler can always refer to it.
    repo_id = None
    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, err = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if err:
            return err

        files = list_repo_files(repo_id=repo_id, token=api_token, repo_type="space")
        if not files:
            return f"No files found in the Space `{repo_id}`."
        listing = "\n".join(f"- `{name}`" for name in files)
        return f"Files in `{repo_id}`:\n\n" + listing
    except Exception as e:
        logger.error(f"Error in view_space_files: {str(e)}")
        # Fall back to the raw UI values when the repo_id was never resolved;
        # skip empty/None parts so building the label cannot raise.
        label = repo_id or "/".join(str(part) for part in (owner_ui, space_name_ui) if part)
        return f"Error listing files for `{label}`: {str(e)}"
# Function to update a Space file
def update_space_file(api_token, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """Update a file in a Hugging Face Space with a commit.

    The Space is cloned into a temporary directory, the file is written
    (created if missing) and the change is committed and pushed.

    Args:
        api_token: Hugging Face token with write access.
        space_name_ui: Space name (without owner).
        owner_ui: Owner/namespace; empty means "owner of the token".
        file_path_in_repo: Path of the file relative to the repository root.
        file_content: New text content of the file.
        commit_message_ui: Commit message; a default is used when empty.

    Returns:
        A markdown status string: a success message with a link to the Space,
        or a message starting with ``"Error"``. Exceptions are not propagated.
    """
    # Bound before the try block so the error handler can always refer to it.
    repo_id = None
    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, err = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if err:
            return err

        if not file_path_in_repo:
            return "Error: File Path cannot be empty."
        if not commit_message_ui:
            commit_message_ui = f"Update {file_path_in_repo} via Space Builder"

        with tempfile.TemporaryDirectory() as temp_dir:
            # Fixed directory name; resolved so the containment check below
            # compares canonical paths.
            clone_root = Path(temp_dir).resolve() / "cloned_repo"

            # Checked before cloning: the path is user input and must not
            # escape the clone (absolute paths, ".." segments).
            target = (clone_root / file_path_in_repo).resolve()
            if clone_root not in target.parents:
                return (
                    f"Error: Invalid file path '{file_path_in_repo}'. "
                    "Paths must be relative and stay inside the Space repository."
                )

            cloned_repo = Repository(
                local_dir=clone_root,
                clone_from=f"https://huggingface.co/spaces/{repo_id}",
                repo_type="space",
                use_auth_token=api_token,
                git_user="Space Builder Bot",  # Optional: Git committer info
                # NOTE(review): this literal looks like a scraper-redacted
                # e-mail address - confirm the intended value.
                git_email="[email protected]",  # Optional
            )

            # Build the path from our own Path object: Repository.local_dir is
            # a plain string, so dividing it by a string raises TypeError.
            target.parent.mkdir(parents=True, exist_ok=True)
            with open(target, "w", encoding="utf-8") as f:
                f.write(file_content)

            # push_to_hub stages, commits and pushes the working tree.
            cloned_repo.push_to_hub(commit_message=commit_message_ui)

        return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})"
    except Exception as e:
        logger.error(f"Error in update_space_file: {str(e)}")
        # Fall back to the raw UI values when the repo_id was never resolved;
        # skip empty/None parts so building the label cannot raise.
        label = repo_id or "/".join(str(part) for part in (owner_ui, space_name_ui) if part)
        return f"Error updating file for `{label}`: {str(e)}"