File size: 13,099 Bytes
c66bf1b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
import os
import re
import tempfile
import shutil
import git
from huggingface_hub import (
    create_repo,
    upload_folder,
    list_repo_files,
    # delete_file, # Not used in the provided code, can be removed if not planned
    Repository,
    whoami,
)
import logging
from pathlib import Path

# Configure logging.
# basicConfig() runs at import time and sets the root logger level to INFO
# (it does nothing if the root logger already has handlers). Functions in this
# module report progress and errors through the module-level `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Function to parse markdown input
def parse_markdown(markdown_input):
    """Parse markdown input to extract space details and file structure.

    Recognised markers (matched at the very start of a line):

    * ``# Space: owner/name`` (or just ``name``): informational only.
    * ``## File Structure``: section heading, skipped.
    * Lines starting with the page or folder emoji: tree-listing entries.
      They never carry content; they only end the file being collected.
    * ``### File: path``: starts a file. Its content is the next fenced code
      block (fence lines excluded) or, when no fence follows, the plain
      lines up to the next marker.

    Inside a fenced block every line is kept verbatim, so content that
    happens to look like a marker (for example a README containing
    ``## File Structure``) is preserved. The only exception is a new
    ``### File:`` header, which still starts the next file so that an
    unterminated fence cannot swallow the rest of the document.

    Args:
        markdown_input: Markdown text. ``None`` is treated as empty input.

    Returns:
        dict with keys ``"repo_name_md"`` and ``"owner_md"`` (strings, empty
        when absent) and ``"files"``, a list of
        ``{"path": str, "content": str}`` dicts in document order.
    """
    space_info = {"repo_name_md": "", "owner_md": "", "files": []}

    # Page and folder emoji written as escapes so the match does not depend
    # on how this source file is encoded or re-encoded.
    listing_prefixes = ("\U0001F4C4", "\U0001F4C1")

    current_file = None      # path announced by the last "### File:" header
    file_content = []        # lines collected for current_file
    in_file_content = False  # True between a header and the end of its content
    fence = None             # backtick run of the open fence, None outside one

    def finish_file(keep_empty=False):
        """Store the file being collected (if any) and reset the state."""
        nonlocal current_file, file_content, in_file_content, fence
        if current_file and (file_content or keep_empty):
            space_info["files"].append(
                {"path": current_file, "content": "\n".join(file_content)}
            )
        current_file = None
        file_content = []
        in_file_content = False
        fence = None

    for line in (markdown_input or "").strip().split("\n"):
        stripped = line.strip()

        # Inside a fenced block: everything is content until the closing
        # fence (only backticks, at least as many as the opener).
        if fence is not None and not line.startswith("### File:"):
            if set(stripped) == {"`"} and len(stripped) >= len(fence):
                # An explicitly fenced block yields a file even when empty.
                finish_file(keep_empty=True)
            else:
                file_content.append(line)
            continue

        if line.startswith("# Space:"):
            full_space_name_md = line[len("# Space:"):].strip()
            if "/" in full_space_name_md:
                owner, name = full_space_name_md.split("/", 1)
                space_info["owner_md"] = owner
                space_info["repo_name_md"] = name
            else:
                space_info["repo_name_md"] = full_space_name_md
        elif line.startswith("## File Structure"):
            continue
        elif line.startswith(listing_prefixes):
            # Listing entries only delimit; content always comes from a
            # "### File:" section.
            finish_file()
        elif line.startswith("### File:"):
            # If the previous fence was never closed, keep what it collected.
            finish_file(keep_empty=fence is not None)
            current_file = line[len("### File:"):].strip()
            in_file_content = True
        elif in_file_content:
            only_blank_so_far = not any(part.strip() for part in file_content)
            if stripped.startswith("```") and only_blank_so_far:
                # Opening fence: remember its backtick run and drop both the
                # fence line and any blank lines that preceded it.
                fence = stripped[: len(stripped) - len(stripped.lstrip("`"))]
                file_content = []
            else:
                file_content.append(line)

    # Flush whatever was still being collected at end of input.
    finish_file(keep_empty=fence is not None)
    return space_info

def _determine_repo_id(api_token, space_name_ui, owner_ui):
    """Build the ``owner/name`` repo id from the UI fields.

    Surrounding whitespace in both fields is ignored. When the owner field
    is empty, the owner is looked up from the API token via ``whoami``.

    Args:
        api_token: Hugging Face API token; only required when ``owner_ui``
            is empty.
        space_name_ui: Space name only, without an ``owner/`` prefix.
        owner_ui: Namespace from the UI's owner field; may be empty or None.

    Returns:
        ``(repo_id, None)`` on success, or ``(None, error_message)`` when the
        input is invalid or the owner cannot be determined. Never raises for
        ``whoami`` failures; they are reported through ``error_message``.
    """
    space_name = (space_name_ui or "").strip()
    if not space_name:
        return None, "Error: Space Name cannot be empty."
    if "/" in space_name:
        return None, "Error: Space Name should not contain '/'. Please use the Owner field for the namespace."

    final_owner = (owner_ui or "").strip()

    if not final_owner:  # Owner field left empty: fall back to the token's user
        if not api_token:
            return None, "Error: API token is required to automatically determine owner when Owner field is empty."
        try:
            user_info = whoami(token=api_token)
        except Exception as e:
            logger.error("Error calling whoami for owner: %s", e)
            return None, f"Error retrieving username from API token: {str(e)}. Please specify Owner manually."

        if not user_info or 'name' not in user_info:
            # Log only the shape of the response: the payload itself can
            # contain account details that do not belong in logs.
            seen = sorted(user_info) if isinstance(user_info, dict) else type(user_info).__name__
            logger.error("whoami(token=...) response had no 'name' field (got: %s).", seen)
            return None, "Error: Could not retrieve username from API token. Ensure token is valid and has 'Read profile' permissions. Or, specify Owner manually."
        final_owner = user_info['name']

    if not final_owner:  # e.g. whoami returned an empty name
        return None, "Error: Owner could not be determined. Please provide an owner or ensure your API token is valid."

    return f"{final_owner}/{space_name}", None

# Function to create and populate a Space
def create_space(api_token, space_name_ui, owner_ui, sdk_ui, markdown_input):
    """Create a Hugging Face Space and populate it with files from markdown input.

    The files described in ``markdown_input`` are written to a temporary
    directory, the Space is created if it does not exist yet, and the
    directory is uploaded in a single commit.

    Args:
        api_token: Hugging Face API token with write access.
        space_name_ui: Space name (without owner prefix).
        owner_ui: Owner/namespace; when empty it is derived from the token.
        sdk_ui: Space SDK passed to ``create_repo`` as ``space_sdk``.
        markdown_input: Markdown describing the files (``### File:`` blocks).

    Returns:
        A human-readable status string: a success message with a link to the
        Space, or a message starting with ``"Error"``. Exceptions are caught
        and reported in the returned string rather than raised.
    """
    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, err = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if err:
            return err

        # The "# Space:" line in the markdown is informational only; the repo
        # id always comes from the UI fields above.
        space_info = parse_markdown(markdown_input)
        if not space_info["files"]:
            return "Error: No files found in the markdown input. Ensure '### File: path/to/file.ext' markers are used."

        with tempfile.TemporaryDirectory() as temp_dir:
            # Fixed local directory name so the Space name never has to be
            # filesystem-safe. Resolved so the containment check below
            # compares like with like (temp dirs can sit behind symlinks).
            repo_root = (Path(temp_dir) / "repo").resolve()
            repo_root.mkdir(parents=True, exist_ok=True)

            for file_info in space_info["files"]:
                rel_path = file_info.get("path")
                if not rel_path:
                    logger.warning(f"Skipping file with no path: {file_info}")
                    continue

                # Paths come from user-supplied markdown: refuse absolute
                # paths and ".." segments that would write outside repo_root.
                target = (repo_root / rel_path).resolve()
                if repo_root not in target.parents:
                    return f"Error: Invalid file path '{rel_path}'. File paths must be relative and stay inside the Space."

                target.parent.mkdir(parents=True, exist_ok=True)
                target.write_text(file_info["content"], encoding="utf-8")
                logger.info("Wrote file: %s", target)

            # exist_ok=True makes this a no-op for an existing Space, so
            # re-running updates the Space instead of failing.
            try:
                create_repo(
                    repo_id=repo_id,
                    token=api_token,
                    repo_type="space",
                    space_sdk=sdk_ui,
                    private=False,  # Or make this an option
                    exist_ok=True,
                )
                logger.info("Space %s is ready (created or already existed).", repo_id)
            except Exception as e:
                return f"Error creating Space '{repo_id}': {str(e)}"

            # upload_folder creates the commit itself. repo_type must be given
            # explicitly: without it the upload targets a *model* repo.
            upload_folder(
                repo_id=repo_id,
                repo_type="space",
                folder_path=str(repo_root),
                path_in_repo=".",
                token=api_token,
                commit_message=f"Initial Space setup of {repo_id}",
            )

            return f"Successfully created/updated Space: [{repo_id}](https://huggingface.co/spaces/{repo_id})"

    except Exception as e:
        logger.exception("Error in create_space: %s", e)
        return f"Error: {str(e)}"

# Function to view Space files
def view_space_files(api_token, space_name_ui, owner_ui):
    """List files in a Hugging Face Space.

    Args:
        api_token: Hugging Face API token.
        space_name_ui: Space name (without owner prefix).
        owner_ui: Owner/namespace; when empty it is derived from the token.

    Returns:
        A markdown string: a bullet list of file paths, a "no files" notice,
        or a message starting with ``"Error"``. Exceptions are caught and
        reported in the returned string rather than raised.
    """
    # Bound before the try block so the error handler can always read it,
    # even when the failure happens before the repo id is resolved.
    repo_id = None
    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, err = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if err:
            return err

        files = list_repo_files(repo_id=repo_id, token=api_token, repo_type="space")
        if not files:
            return f"No files found in the Space `{repo_id}`."

        listing = "\n".join(f"- `{name}`" for name in files)
        return f"Files in `{repo_id}`:\n\n" + listing
    except Exception as e:
        logger.error("Error in view_space_files: %s", e)
        # Best-effort label built from the raw UI values; f-string formatting
        # cannot fail if either of them is None.
        label = repo_id or (f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui)
        return f"Error listing files for `{label}`: {str(e)}"

# Function to update a Space file
def update_space_file(api_token, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """Update a file in a Hugging Face Space with a commit.

    The Space is cloned into a temporary directory, the file is written (or
    created) there, and the change is committed and pushed.

    Args:
        api_token: Hugging Face API token with write access.
        space_name_ui: Space name (without owner prefix).
        owner_ui: Owner/namespace; when empty it is derived from the token.
        file_path_in_repo: Path of the file relative to the repo root.
        file_content: New text content of the file; None is written as empty.
        commit_message_ui: Commit message; a default is used when empty.

    Returns:
        A markdown status string: a success message with a link to the
        Space, or a message starting with ``"Error"``. Exceptions are caught
        and reported in the returned string rather than raised.
    """
    # Bound before the try block so the error handler can always read it.
    repo_id = None
    try:
        if not api_token:
            return "Error: Please provide a valid Hugging Face API token."

        repo_id, err = _determine_repo_id(api_token, space_name_ui, owner_ui)
        if err:
            return err

        if not file_path_in_repo:
            return "Error: File Path cannot be empty."
        if not commit_message_ui:
            commit_message_ui = f"Update {file_path_in_repo} via Space Builder"

        with tempfile.TemporaryDirectory() as temp_dir:
            # Fixed directory name so the Space name never has to be
            # filesystem-safe.
            clone_root = Path(temp_dir) / "cloned_repo"

            cloned_repo = Repository(
                local_dir=str(clone_root),
                clone_from=f"https://huggingface.co/spaces/{repo_id}",
                repo_type="space",
                use_auth_token=api_token,
                git_user="Space Builder Bot",  # Optional: Git committer info
                # NOTE(review): this address looks like an obfuscated
                # placeholder left by a scrape; confirm the intended value.
                git_email="[email protected]",  # Optional
            )

            # Validate after cloning so symlinks inside the clone are resolved
            # too. The path is user input: it must stay inside the clone and
            # must not touch git's own metadata (.git/config, hooks, ...).
            resolved_root = clone_root.resolve()
            target = (resolved_root / file_path_in_repo).resolve()
            inside_clone = resolved_root in target.parents
            if not inside_clone or any(
                part.lower() == ".git" for part in target.relative_to(resolved_root).parts
            ):
                return f"Error: Invalid file path '{file_path_in_repo}'. File paths must be relative and stay inside the Space."

            # Build the path from our own Path object; Repository.local_dir
            # is not guaranteed to support the "/" operator.
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text("" if file_content is None else file_content, encoding="utf-8")

            # push_to_hub stages, commits and pushes the working-tree changes.
            cloned_repo.push_to_hub(commit_message=commit_message_ui)

            return f"Successfully updated `{file_path_in_repo}` in Space [{repo_id}](https://huggingface.co/spaces/{repo_id})"

    except Exception as e:
        logger.error("Error in update_space_file: %s", e)
        # Best-effort label built from the raw UI values; f-string formatting
        # cannot fail if either of them is None.
        label = repo_id or (f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui)
        return f"Error updating file for `{label}`: {str(e)}"