Spaces:
Running
Running
| import json | |
| import copy | |
| import re | |
| from collections import defaultdict | |
| import secrets | |
| import string | |
| from typing import Dict, Any, TypedDict | |
| from plan_generator_10 import generate_plan,generate_blocks_from_opcodes,all_block_definitions | |
| ################################################################################################################################################################# | |
| #--------------------------------------------------[Security key id generation for the better understanding of keys]--------------------------------------------- | |
| ################################################################################################################################################################# | |
| def generate_secure_token(length=20): | |
| charset = string.ascii_letters + string.digits + "!@#$%^&*()[]{}=+-_~" | |
| return ''.join(secrets.choice(charset) for _ in range(length)) | |
| ################################################################################################################################################################# | |
| #--------------------------------------------------[Processed the two Skelton as input and generate refined skelton json]---------------------------------------- | |
| ################################################################################################################################################################# | |
| def process_scratch_blocks(all_generated_blocks, generated_output_json): | |
| processed_blocks = {} | |
| # Initialize dictionaries to store and reuse generated unique IDs | |
| # This prevents creating multiple unique IDs for the same variable/broadcast across different blocks | |
| variable_id_map = defaultdict(lambda: generate_secure_token(20)) | |
| broadcast_id_map = defaultdict(lambda: generate_secure_token(20)) | |
| for block_id, gen_block_data in generated_output_json.items(): | |
| processed_block = {} | |
| all_gen_block_data = all_generated_blocks.get(block_id, {}) | |
| # Copy and update fields, inputs, next, parent, shadow, topLevel, mutation, and opcode | |
| processed_block["opcode"] = all_gen_block_data.get("op_code", gen_block_data.get("op_code")) | |
| processed_block["inputs"] = {} | |
| processed_block["fields"] = {} | |
| processed_block["shadow"] = all_gen_block_data.get("shadow", gen_block_data.get("shadow")) | |
| processed_block["topLevel"] = all_gen_block_data.get("topLevel", gen_block_data.get("topLevel")) | |
| processed_block["parent"] = all_gen_block_data.get("parent", gen_block_data.get("parent")) | |
| processed_block["next"] = all_gen_block_data.get("next", gen_block_data.get("next")) | |
| if "mutation" in all_gen_block_data: | |
| processed_block["mutation"] = all_gen_block_data["mutation"] | |
| # Process inputs | |
| if "inputs" in all_gen_block_data: | |
| for input_name, input_data in all_gen_block_data["inputs"].items(): | |
| if input_name in ["SUBSTACK", "CONDITION"]: | |
| # These should always be type 2 | |
| if isinstance(input_data, list) and len(input_data) == 2: | |
| processed_block["inputs"][input_name] = [2, input_data[1]] | |
| elif isinstance(input_data, dict) and input_data.get("kind") == "block": | |
| processed_block["inputs"][input_name] = [2, input_data.get("block")] | |
| else: # Fallback for unexpected formats, try to use the original if possible | |
| processed_block["inputs"][input_name] = gen_block_data["inputs"].get(input_name, [2, None]) | |
| elif isinstance(input_data, dict): | |
| if input_data.get("kind") == "value": | |
| # Case 1: Direct value input | |
| processed_block["inputs"][input_name] = [ | |
| 1, | |
| [ | |
| 4, | |
| str(input_data.get("value", "")) | |
| ] | |
| ] | |
| elif input_data.get("kind") == "block": | |
| # Case 3: Nested block input | |
| existing_shadow_value = "" | |
| if input_name in gen_block_data.get("inputs", {}) and \ | |
| isinstance(gen_block_data["inputs"][input_name], list) and \ | |
| len(gen_block_data["inputs"][input_name]) > 2 and \ | |
| isinstance(gen_block_data["inputs"][input_name][2], list) and \ | |
| len(gen_block_data["inputs"][input_name][2]) > 1: | |
| existing_shadow_value = gen_block_data["inputs"][input_name][2][1] | |
| processed_block["inputs"][input_name] = [ | |
| 3, | |
| input_data.get("block", ""), | |
| [ | |
| 10, # Assuming 10 for number/string shadow | |
| existing_shadow_value | |
| ] | |
| ] | |
| elif input_data.get("kind") == "menu": | |
| # Handle menu inputs like in event_broadcast | |
| menu_option = input_data.get("option", "") | |
| # Generate or retrieve a unique ID for the broadcast message | |
| broadcast_id = broadcast_id_map[menu_option] # Use defaultdict for unique IDs | |
| processed_block["inputs"][input_name] = [ | |
| 1, | |
| [ | |
| 11, # This is typically the code for menu dropdowns | |
| menu_option, | |
| broadcast_id | |
| ] | |
| ] | |
| elif isinstance(input_data, list): | |
| # For cases like TOUCHINGOBJECTMENU, where input_data is a list [1, "block_id"] | |
| processed_block["inputs"][input_name] = input_data | |
| # Process fields | |
| if "fields" in all_gen_block_data: | |
| for field_name, field_value in all_gen_block_data["fields"].items(): | |
| if field_name == "VARIABLE" and isinstance(field_value, list) and len(field_value) > 0: | |
| # Generate or retrieve a unique ID for the variable | |
| variable_name = field_value[0] | |
| unique_id = variable_id_map[variable_name] # Use defaultdict for unique IDs | |
| processed_block["fields"][field_name] = [ | |
| variable_name, | |
| unique_id | |
| ] | |
| elif field_name == "STOP_OPTION": | |
| processed_block["fields"][field_name] = [ | |
| field_value[0], | |
| None | |
| ] | |
| elif field_name == "TOUCHINGOBJECTMENU": | |
| referenced_menu_block_id = all_gen_block_data["inputs"].get("TOUCHINGOBJECTMENU", [None, None])[1] | |
| if referenced_menu_block_id and referenced_menu_block_id in all_generated_blocks: | |
| menu_block = all_generated_blocks[referenced_menu_block_id] | |
| menu_value = menu_block.get("fields", {}).get("TOUCHINGOBJECTMENU", ["", None])[0] | |
| processed_block["fields"][field_name] = [menu_value, None] | |
| else: | |
| processed_block["fields"][field_name] = [field_value[0], None] | |
| else: | |
| processed_block["fields"][field_name] = field_value | |
| # Remove unwanted keys from the processed block | |
| keys_to_remove = ["functionality", "block_shape", "id", "block_name", "block_type"] | |
| for key in keys_to_remove: | |
| if key in processed_block: | |
| del processed_block[key] | |
| processed_blocks[block_id] = processed_block | |
| return processed_blocks | |
| ################################################################################################################################################################# | |
| #--------------------------------------------------[Unique secret key for skelton json to make sure it donot overwrite each other]------------------------------- | |
| ################################################################################################################################################################# | |
| def rename_blocks(block_json: dict, opcode_count: dict) -> tuple[dict, dict]: | |
| """ | |
| Replace each block key in block_json and each identifier in opcode_count | |
| with a newly generated secure token. | |
| Args: | |
| block_json: Mapping of block_key -> block_data. | |
| opcode_count: Mapping of opcode -> list of block_keys. | |
| Returns: | |
| A tuple of (new_block_json, new_opcode_count) with updated keys. | |
| """ | |
| # Step 1: Generate a secure token mapping for every existing block key | |
| token_map = {} | |
| for old_key in block_json.keys(): | |
| # Ensure uniqueness in the unlikely event of a collision | |
| while True: | |
| new_key = generate_secure_token() | |
| if new_key not in token_map.values(): | |
| break | |
| token_map[old_key] = new_key | |
| # Step 2: Rebuild block_json with new keys | |
| new_block_json = {} | |
| for old_key, block in block_json.items(): | |
| new_key = token_map[old_key] | |
| new_block_json[new_key] = block.copy() | |
| # Update parent and next references | |
| if 'parent' in block and block['parent'] in token_map: | |
| new_block_json[new_key]['parent'] = token_map[block['parent']] | |
| if 'next' in block and block['next'] in token_map: | |
| new_block_json[new_key]['next'] = token_map[block['next']] | |
| # Update inputs if they reference blocks | |
| for inp_key, inp_val in block.get('inputs', {}).items(): | |
| if isinstance(inp_val, list) and len(inp_val) == 2: | |
| idx, ref = inp_val | |
| if idx in (2, 3) and isinstance(ref, str) and ref in token_map: | |
| new_block_json[new_key]['inputs'][inp_key] = [idx, token_map[ref]] | |
| # Step 3: Update opcode count map | |
| new_opcode_count = {} | |
| for opcode, key_list in opcode_count.items(): | |
| new_opcode_count[opcode] = [token_map.get(k, k) for k in key_list] | |
| return new_block_json, new_opcode_count | |
| ################################################################################################################################################################# | |
| #--------------------------------------------------[Helper function to add Variables and Broadcasts [USed in main app file for main projectjson]]---------------- | |
| ################################################################################################################################################################# | |
| def variable_intialization(project_data): | |
| """ | |
| Updates variable and broadcast definitions in a Scratch project JSON, | |
| populating the 'variables' and 'broadcasts' sections of the Stage target | |
| and extracting initial values for variables. | |
| Args: | |
| project_data (dict): The loaded JSON data of the Scratch project. | |
| Returns: | |
| dict: The updated project JSON data. | |
| """ | |
| stage_target = None | |
| for target in project_data['targets']: | |
| if target.get('isStage'): | |
| stage_target = target | |
| break | |
| if stage_target is None: | |
| print("Error: Stage target not found in the project data.") | |
| return project_data | |
| # Ensure 'variables' and 'broadcasts' exist in the Stage target | |
| if "variables" not in stage_target: | |
| stage_target["variables"] = {} | |
| if "broadcasts" not in stage_target: | |
| stage_target["broadcasts"] = {} | |
| # Helper function to recursively find and update variable/broadcast fields | |
| def process_dict(obj): | |
| if isinstance(obj, dict): | |
| # Check for "data_setvariableto" opcode to extract initial values | |
| if obj.get("opcode") == "data_setvariableto": | |
| variable_field = obj.get("fields", {}).get("VARIABLE") | |
| value_input = obj.get("inputs", {}).get("VALUE") | |
| if variable_field and isinstance(variable_field, list) and len(variable_field) == 2: | |
| var_name = variable_field[0] | |
| var_id = variable_field[1] | |
| initial_value = "" | |
| if value_input and isinstance(value_input, list) and len(value_input) > 1 and \ | |
| isinstance(value_input[1], list) and len(value_input[1]) > 1: | |
| # Extract value from various formats, e.g., [1, [10, "0"]] or [3, [12, "score", "id"], [10, "0"]] | |
| if value_input[1][0] == 10: # Direct value like [10, "0"] | |
| initial_value = str(value_input[1][1]) | |
| elif value_input[1][0] == 12 and len(value_input) > 2 and isinstance(value_input[2], list) and value_input[2][0] == 10: # Variable reference with initial value block | |
| initial_value = str(value_input[2][1]) | |
| elif isinstance(value_input[1], (str, int, float)): # For direct number/string inputs | |
| initial_value = str(value_input[1]) | |
| # Add/update the variable in the Stage's 'variables' with its initial value | |
| stage_target["variables"][var_id] = [var_name, initial_value] | |
| for key, value in obj.items(): | |
| # Process variable definitions in 'fields' (for blocks that define variables like 'show variable') | |
| if key == "VARIABLE" and isinstance(value, list) and len(value) == 2: | |
| var_name = value[0] | |
| var_id = value[1] | |
| # Only add if not already defined with an initial value from set_variableto | |
| if var_id not in stage_target["variables"]: | |
| stage_target["variables"][var_id] = [var_name, ""] # Default to empty string if no initial value found yet | |
| elif stage_target["variables"][var_id][0] != var_name: # Update name if ID exists but name is different | |
| stage_target["variables"][var_id][0] = var_name | |
| # Process broadcast definitions in 'inputs' (BROADCAST_INPUT) | |
| elif key == "BROADCAST_INPUT" and isinstance(value, list) and len(value) == 2 and \ | |
| isinstance(value[1], list) and len(value[1]) == 3 and value[1][0] == 11: | |
| broadcast_name = value[1][1] | |
| broadcast_id = value[1][2] | |
| # Add/update the broadcast in the Stage's 'broadcasts' | |
| stage_target["broadcasts"][broadcast_id] = broadcast_name | |
| # Process broadcast definitions in 'fields' (BROADCAST_OPTION) | |
| elif key == "BROADCAST_OPTION" and isinstance(value, list) and len(value) == 2: | |
| broadcast_name = value[0] | |
| broadcast_id = value[1] | |
| # Add/update the broadcast in the Stage's 'broadcasts' | |
| stage_target["broadcasts"][broadcast_id] = broadcast_name | |
| # Recursively call for nested dictionaries or lists | |
| process_dict(value) | |
| elif isinstance(obj, list): | |
| for i, item in enumerate(obj): | |
| # Process variable references in 'inputs' (like [12, "score", "id"]) | |
| if isinstance(item, list) and len(item) == 3 and item[0] == 12: | |
| var_name = item[1] | |
| var_id = item[2] | |
| # Only add if not already defined with an initial value from set_variableto | |
| if var_id not in stage_target["variables"]: | |
| stage_target["variables"][var_id] = [var_name, ""] # Default to empty string if no initial value found yet | |
| elif stage_target["variables"][var_id][0] != var_name: # Update name if ID exists but name is different | |
| stage_target["variables"][var_id][0] = var_name | |
| process_dict(item) | |
| # Iterate through all targets to process their blocks | |
| for target in project_data['targets']: | |
| if "blocks" in target: | |
| for block_id, block_data in target["blocks"].items(): | |
| process_dict(block_data) | |
| return project_data | |
| def deduplicate_variables(project_data): | |
| """ | |
| Removes duplicate variable entries in the 'variables' dictionary of the Stage target, | |
| prioritizing entries with non-empty values. | |
| Args: | |
| project_data (dict): The loaded JSON data of the Scratch project. | |
| Returns: | |
| dict: The updated project JSON data with deduplicated variables. | |
| """ | |
| stage_target = None | |
| for target in project_data['targets']: | |
| if target.get('isStage'): | |
| stage_target = target | |
| break | |
| if stage_target is None: | |
| print("Error: Stage target not found in the project data.") | |
| return project_data | |
| if "variables" not in stage_target: | |
| return project_data # No variables to deduplicate | |
| # Use a temporary dictionary to store the preferred variable entry by name | |
| # Format: {variable_name: [variable_id, variable_name, variable_value]} | |
| resolved_variables = {} | |
| for var_id, var_info in stage_target["variables"].items(): | |
| var_name = var_info[0] | |
| var_value = var_info[1] | |
| if var_name not in resolved_variables: | |
| # If the variable name is not yet seen, add it | |
| resolved_variables[var_name] = [var_id, var_name, var_value] | |
| else: | |
| # If the variable name is already seen, decide which one to keep | |
| existing_id, existing_name, existing_value = resolved_variables[var_name] | |
| # Prioritize the entry with a non-empty value | |
| if var_value != "" and existing_value == "": | |
| resolved_variables[var_name] = [var_id, var_name, var_value] | |
| # If both have non-empty values, or both are empty, keep the current one (arbitrary choice, but consistent) | |
| # The current logic will effectively keep the last one encountered that has a value, | |
| # or the very last one if all are empty. | |
| elif var_value != "" and existing_value != "": | |
| # If there are multiple non-empty values for the same variable name | |
| # this keeps the one from the most recent iteration. | |
| # For the given example, this will correctly keep "5". | |
| resolved_variables[var_name] = [var_id, var_name, var_value] | |
| elif var_value == "" and existing_value == "": | |
| # If both are empty, just keep the current one (arbitrary) | |
| resolved_variables[var_name] = [var_id, var_name, var_value] | |
| # Reconstruct the 'variables' dictionary using the resolved entries | |
| new_variables_dict = {} | |
| for var_name, var_data in resolved_variables.items(): | |
| var_id_to_keep = var_data[0] | |
| var_name_to_keep = var_data[1] | |
| var_value_to_keep = var_data[2] | |
| new_variables_dict[var_id_to_keep] = [var_name_to_keep, var_value_to_keep] | |
| stage_target["variables"] = new_variables_dict | |
| return project_data | |
| def variable_adder_main(project_data): | |
| try: | |
| declare_variable_json= variable_intialization(project_data) | |
| except Exception as e: | |
| print(f"Error error in the variable initialization opcodes: {e}") | |
| try: | |
| processed_json= deduplicate_variables(declare_variable_json) | |
| return | |
| except Exception as e: | |
| print(f"Error error in the variable initialization opcodes: {e}") | |
| ################################################################################################################################################################# | |
| #--------------------------------------------------[Helper main function]---------------------------------------------------------------------------------------- | |
| ################################################################################################################################################################# | |
| def block_builder(opcode_count,pseudo_code): | |
| try: | |
| generated_output_json, initial_opcode_occurrences = generate_blocks_from_opcodes(opcode_count, all_block_definitions) | |
| except Exception as e: | |
| print(f"Error generating blocks from opcodes: {e}") | |
| return {} | |
| try: | |
| all_generated_blocks = generate_plan(generated_output_json, initial_opcode_occurrences, pseudo_code) | |
| except Exception as e: | |
| print(f"Error generating plan from blocks: {e}") | |
| return {} | |
| try: | |
| processed_blocks= process_scratch_blocks(all_generated_blocks, generated_output_json) | |
| except Exception as e: | |
| print(f"Error processing Scratch blocks: {e}") | |
| return {} | |
| renamed_blocks, renamed_counts = rename_blocks(processed_blocks, initial_opcode_occurrences) | |
| return renamed_blocks | |
| ################################################################################################################################################################# | |
| #--------------------------------------------------[Example use of the function here]---------------------------------------------------------------------------- | |
| ################################################################################################################################################################# | |
| initial_opcode_counts = [ | |
| { | |
| "opcode": "event_whenflagclicked", | |
| "count": 1 | |
| }, | |
| { | |
| "opcode": "data_setvariableto", | |
| "count": 2 | |
| }, | |
| { | |
| "opcode": "data_showvariable", | |
| "count": 2 | |
| }, | |
| { | |
| "opcode": "event_broadcast", | |
| "count": 1 | |
| } | |
| ] | |
| pseudo_code=""" | |
| when green flag clicked | |
| set [score v] to (0) | |
| set [lives v] to (3) | |
| show variable [score v] | |
| show variable [lives v] | |
| broadcast [Game Start v] | |
| """ | |
| generated_output_json, initial_opcode_occurrences = generate_blocks_from_opcodes(initial_opcode_counts, all_block_definitions) | |
| all_generated_blocks = generate_plan(generated_output_json, initial_opcode_occurrences, pseudo_code) | |
| processed_blocks= process_scratch_blocks(all_generated_blocks, generated_output_json) | |
| print(all_generated_blocks) | |
| print("--------------\n\n") | |
| print(processed_blocks) | |
| print("--------------\n\n") | |
| print(initial_opcode_occurrences) |