Spaces:
Running
Running
| import json | |
| import uuid | |
| import logging | |
| from typing import Dict, Any, List | |
| # Assume GameState is a TypedDict or similar for clarity | |
| # from typing import TypedDict | |
| # class GameState(TypedDict): | |
| # description: str | |
| # project_json: Dict[str, Any] | |
| # action_plan: Dict[str, Any] | |
| # sprite_initial_positions: Dict[str, Any] | |
| # Placeholder for actual GameState in your application | |
| GameState = Dict[str, Any] | |
| logger = logging.getLogger(__name__) | |
| # --- Mock Agent for demonstration --- | |
| class MockAgent: | |
| def invoke(self, payload: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Mocks an LLM agent invocation. In a real scenario, this would call your | |
| actual LLM API (e.g., through LangChain, LlamaIndex, etc.). | |
| """ | |
| user_message = payload["messages"][-1]["content"] | |
| print(f"\n--- Mock Agent Received Prompt (partial) ---\n{user_message[:500]}...\n------------------------------------------") | |
| # Simplified mock responses for demonstration purposes | |
| # In a real scenario, the LLM would generate actual Scratch block JSON | |
| if "Propose a high-level action flow" in user_message: | |
| return { | |
| "messages": [{ | |
| "content": json.dumps({ | |
| "action_overall_flow": { | |
| "Sprite1": { | |
| "description": "Basic movement and interaction", | |
| "plans": [ | |
| { | |
| "event": "when flag clicked", | |
| "logic": "forever loop: move 10 steps, if touching Edge then turn 15 degrees" | |
| }, | |
| { | |
| "event": "when space key pressed", | |
| "logic": "say Hello! for 2 seconds" | |
| } | |
| ] | |
| }, | |
| "Ball": { | |
| "description": "Simple bouncing behavior", | |
| "plans": [ | |
| { | |
| "event": "when flag clicked", | |
| "logic": "move 5 steps, if on edge bounce" | |
| } | |
| ] | |
| } | |
| } | |
| }) | |
| }] | |
| } | |
| elif "You are an AI assistant generating Scratch 3.0 block JSON" in user_message: | |
| # This mock response is highly simplified. A real LLM would generate | |
| # valid Scratch blocks based on the provided relevant catalog and plan. | |
| # We're just demonstrating the *mechanism* of filtering the catalog. | |
| if "Sprite1" in user_message: | |
| return { | |
| "messages": [{ | |
| "content": json.dumps({ | |
| f"block_id_{generate_block_id()}": { | |
| "opcode": "event_whenflagclicked", | |
| "next": f"block_id_{generate_block_id()}_forever", | |
| "parent": None, | |
| "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100 | |
| }, | |
| f"block_id_{generate_block_id()}_forever": { | |
| "opcode": "control_forever", | |
| "next": None, | |
| "parent": f"block_id_{generate_block_id()}", | |
| "inputs": { | |
| "SUBSTACK": [2, f"block_id_{generate_block_id()}_move"] | |
| }, "fields": {}, "shadow": False, "topLevel": False | |
| }, | |
| f"block_id_{generate_block_id()}_move": { | |
| "opcode": "motion_movesteps", | |
| "next": f"block_id_{generate_block_id()}_if", | |
| "parent": f"block_id_{generate_block_id()}_forever", | |
| "inputs": { | |
| "STEPS": [1, [4, "10"]] | |
| }, "fields": {}, "shadow": False, "topLevel": False | |
| }, | |
| f"block_id_{generate_block_id()}_if": { | |
| "opcode": "control_if", | |
| "next": None, | |
| "parent": f"block_id_{generate_block_id()}_forever", | |
| "inputs": { | |
| "CONDITION": [2, f"block_id_{generate_block_id()}_touching"], | |
| "SUBSTACK": [2, f"block_id_{generate_block_id()}_turn"] | |
| }, "fields": {}, "shadow": False, "topLevel": False | |
| }, | |
| f"block_id_{generate_block_id()}_touching": { | |
| "opcode": "sensing_touchingobject", | |
| "next": None, | |
| "parent": f"block_id_{generate_block_id()}_if", | |
| "inputs": { | |
| "TOUCHINGOBJECTMENU": [1, f"block_id_{generate_block_id()}_touching_menu"] | |
| }, "fields": {}, "shadow": False, "topLevel": False | |
| }, | |
| f"block_id_{generate_block_id()}_touching_menu": { | |
| "opcode": "sensing_touchingobjectmenu", | |
| "next": None, | |
| "parent": f"block_id_{generate_block_id()}_touching", | |
| "inputs": {}, | |
| "fields": {"TOUCHINGOBJECTMENU": ["_edge_", None]}, | |
| "shadow": True, "topLevel": False | |
| }, | |
| f"block_id_{generate_block_id()}_turn": { | |
| "opcode": "motion_turnright", | |
| "next": None, | |
| "parent": f"block_id_{generate_block_id()}_if", | |
| "inputs": { | |
| "DEGREES": [1, [4, "15"]] | |
| }, "fields": {}, "shadow": False, "topLevel": False | |
| }, | |
| f"block_id_{generate_block_id()}_say": { | |
| "opcode": "looks_sayforsecs", | |
| "next": None, | |
| "parent": None, # This block would typically be part of a separate script | |
| "inputs": { | |
| "MESSAGE": [1, [10, "Hello!"]], | |
| "SECS": [1, [4, "2"]] | |
| }, "fields": {}, "shadow": False, "topLevel": True, "x": 300, "y": 100 | |
| } | |
| }) | |
| }] | |
| } | |
| elif "Ball" in user_message: | |
| return { | |
| "messages": [{ | |
| "content": json.dumps({ | |
| f"block_id_{generate_block_id()}": { | |
| "opcode": "event_whenflagclicked", | |
| "next": f"block_id_{generate_block_id()}_moveball", | |
| "parent": None, | |
| "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100 | |
| }, | |
| f"block_id_{generate_block_id()}_moveball": { | |
| "opcode": "motion_movesteps", | |
| "next": f"block_id_{generate_block_id()}_edgebounce", | |
| "parent": f"block_id_{generate_block_id()}", | |
| "inputs": { | |
| "STEPS": [1, [4, "5"]] | |
| }, "fields": {}, "shadow": False, "topLevel": False | |
| }, | |
| f"block_id_{generate_block_id()}_edgebounce": { | |
| "opcode": "motion_ifonedgebounce", | |
| "next": None, | |
| "parent": f"block_id_{generate_block_id()}_moveball", | |
| "inputs": {}, "fields": {}, "shadow": False, "topLevel": False | |
| } | |
| }) | |
| }] | |
| } | |
| return {"messages": [{"content": "[]"}]} # Default empty response | |
| agent = MockAgent() | |
| # Helper function to generate a unique block ID | |
| def generate_block_id(): | |
| return str(uuid.uuid4())[:10].replace('-', '') # Shorten for readability, ensure uniqueness | |
| # Placeholder for your extract_json_from_llm_response function | |
| def extract_json_from_llm_response(response_string): | |
| try: | |
| # Assuming the LLM response is ONLY the JSON string within triple backticks | |
| json_match = response_string.strip().replace("```json", "").replace("```", "").strip() | |
| return json.loads(json_match) | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Failed to decode JSON from LLM response: {e}") | |
| logger.error(f"Raw response: {response_string}") | |
| raise ValueError("Invalid JSON response from LLM") | |
| # --- GLOBAL CATALOG OF ALL SCRATCH BLOCKS --- | |
| # This is where you would load your block_content.json | |
| # For demonstration, I'm using your provided snippets and adding some common ones. | |
| # In a real application, you'd load this once at startup. | |
| ALL_SCRATCH_BLOCKS_CATALOG = { | |
| "motion_movesteps": { | |
| "opcode": "motion_movesteps", "next": None, "parent": None, | |
| "inputs": {"STEPS": [1, [4, "10"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 464, "y": -416 | |
| }, | |
| "motion_turnright": { | |
| "opcode": "motion_turnright", "next": None, "parent": None, | |
| "inputs": {"DEGREES": [1, [4, "15"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 467, "y": -316 | |
| }, | |
| "motion_ifonedgebounce": { | |
| "opcode": "motion_ifonedgebounce", "next": None, "parent": None, | |
| "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 467, "y": -316 | |
| }, | |
| "event_whenflagclicked": { | |
| "opcode": "event_whenflagclicked", "next": None, "parent": None, | |
| "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
| }, | |
| "event_whenkeypressed": { | |
| "opcode": "event_whenkeypressed", "next": None, "parent": None, | |
| "inputs": {}, "fields": {"KEY_OPTION": ["space", None]}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
| }, | |
| "control_forever": { | |
| "opcode": "control_forever", "next": None, "parent": None, | |
| "inputs": {"SUBSTACK": [2, "some_id"]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
| }, | |
| "control_if": { | |
| "opcode": "control_if", "next": None, "parent": None, | |
| "inputs": {"CONDITION": [2, "some_id"], "SUBSTACK": [2, "some_id_2"]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
| }, | |
| "looks_sayforsecs": { | |
| "opcode": "looks_sayforsecs", "next": None, "parent": None, | |
| "inputs": {"MESSAGE": [1, [10, "Hello!"]], "SECS": [1, [4, "2"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
| }, | |
| "looks_say": { | |
| "opcode": "looks_say", "next": None, "parent": None, | |
| "inputs": {"MESSAGE": [1, [10, "Hello!"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
| }, | |
| "sensing_touchingobject": { | |
| "opcode": "sensing_touchingobject", "next": None, "parent": None, | |
| "inputs": {"TOUCHINGOBJECTMENU": [1, "some_id"]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
| }, | |
| "sensing_touchingobjectmenu": { | |
| "opcode": "sensing_touchingobjectmenu", "next": None, "parent": None, | |
| "inputs": {}, "fields": {"TOUCHINGOBJECTMENU": ["_mouse_", None]}, "shadow": True, "topLevel": True, "x": 10, "y": 10 | |
| }, | |
| # Add more blocks from your block_content.json here... | |
| } | |
| # --- Heuristic-based block selection --- | |
| def get_relevant_blocks_for_plan(action_plan: Dict[str, Any], all_blocks_catalog: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Analyzes the natural language action plan and selects relevant Scratch blocks | |
| from the comprehensive catalog. This is a heuristic approach and might need | |
| to be refined based on your specific use cases and LLM capabilities. | |
| """ | |
| relevant_opcodes = set() | |
| # Always include common event blocks | |
| relevant_opcodes.add("event_whenflagclicked") | |
| relevant_opcodes.add("event_whenkeypressed") # Could be more specific if key is mentioned | |
| # Keyword to opcode mapping (can be expanded) | |
| keyword_map = { | |
| "move": "motion_movesteps", | |
| "steps": "motion_movesteps", | |
| "turn": "motion_turnright", | |
| "rotate": "motion_turnright", | |
| "bounce": "motion_ifonedgebounce", | |
| "edge": "motion_ifonedgebounce", | |
| "forever": "control_forever", | |
| "loop": "control_forever", | |
| "if": "control_if", | |
| "condition": "control_if", | |
| "say": "looks_say", | |
| "hello": "looks_say", # Simple example, might need more context | |
| "touching": "sensing_touchingobject", | |
| "mouse pointer": "sensing_touchingobjectmenu", | |
| "edge": "sensing_touchingobjectmenu", # For touching edge | |
| } | |
| # Iterate through the action plan to find keywords | |
| for sprite_name, sprite_actions in action_plan.get("action_overall_flow", {}).items(): | |
| for plan in sprite_actions.get("plans", []): | |
| event_logic = plan.get("event", "").lower() + " " + plan.get("logic", "").lower() | |
| # Check for direct opcode matches (if the LLM somehow outputs opcodes in its plan) | |
| for opcode in all_blocks_catalog.keys(): | |
| if opcode in event_logic: | |
| relevant_opcodes.add(opcode) | |
| # Check for keywords | |
| for keyword, opcode in keyword_map.items(): | |
| if keyword in event_logic: | |
| relevant_opcodes.add(opcode) | |
| # Add associated shadow blocks if known | |
| if opcode == "sensing_touchingobject": | |
| relevant_opcodes.add("sensing_touchingobjectmenu") | |
| if opcode == "event_whenkeypressed": | |
| relevant_opcodes.add("event_whenkeypressed") # It's already there but good to be explicit | |
| # Construct the filtered catalog | |
| relevant_blocks_catalog = { | |
| opcode: all_blocks_catalog[opcode] | |
| for opcode in relevant_opcodes if opcode in all_blocks_catalog | |
| } | |
| return relevant_blocks_catalog | |
| # --- New Action Planning Node --- | |
| def plan_sprite_actions(state: GameState): | |
| logger.info("--- Running PlanSpriteActionsNode ---") | |
| planning_prompt = ( | |
| f"You are an AI assistant tasked with planning Scratch 3.0 block code for a game. " | |
| f"The game description is: '{state['description']}'.\n\n" | |
| f"Here are the sprites currently in the project: {', '.join(target['name'] for target in state['project_json']['targets'] if not target['isStage']) if len(state['project_json']['targets']) > 1 else 'None'}.\n" | |
| f"Initial positions: {json.dumps(state.get('sprite_initial_positions', {}), indent=2)}\n\n" | |
| f"Consider the main actions and interactions required for each sprite. " | |
| f"Think step-by-step about what each sprite needs to *do*, *when* it needs to do it (events), " | |
| f"and if any actions need to *repeat* or depend on *conditions*.\n\n" | |
| f"Propose a high-level action flow for each sprite in the following JSON format. " | |
| f"Do NOT generate Scratch block JSON yet. Only describe the logic using natural language or simplified pseudo-code.\n\n" | |
| f"Example format:\n" | |
| f"```json\n" | |
| f"{{\n" | |
| f" \"action_overall_flow\": {{\n" | |
| f" \"Sprite1\": {{\n" | |
| f" \"description\": \"Main character actions\",\n" | |
| f" \"plans\": [\n" | |
| f" {{\n" | |
| f" \"event\": \"when flag clicked\",\n" | |
| f" \"logic\": \"forever loop: move 10 steps, if on edge bounce\"\n" | |
| f" }},\n" | |
| f" {{\n" | |
| f" \"event\": \"when space key pressed\",\n" | |
| f" \"logic\": \"change y by 10, wait 0.1 seconds, change y by -10\"\n" | |
| f" }}\n" | |
| f" ]\n" | |
| f" }},\n" | |
| f" \"Ball\": {{\n" | |
| f" \"description\": \"Projectile movement\",\n" | |
| f" \"plans\": [\n" | |
| f" {{\n" | |
| f" \"event\": \"when I start as a clone\",\n" | |
| f" \"logic\": \"glide 1 sec to random position, if touching Sprite1 then stop this script\"\n" | |
| f" }}\n" | |
| f" ]\n" | |
| f" }}\n" | |
| f" }}\n" | |
| f"}}\n" | |
| f"```\n\n" | |
| f"Return ONLY the JSON object for the action overall flow." | |
| ) | |
| try: | |
| response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]}) | |
| raw_response = response["messages"][-1].content | |
| print("Raw response from LLM [PlanSpriteActionsNode]:", raw_response) | |
| action_plan = extract_json_from_llm_response(raw_response) | |
| logger.info("Sprite action plan generated by PlanSpriteActionsNode.") | |
| return {"action_plan": action_plan} | |
| except Exception as e: | |
| logger.error(f"Error in PlanSpriteActionsNode: {e}") | |
| raise | |
| # --- Updated Action Node Builder (to consume the plan and build blocks) --- | |
| def build_action_nodes(state: GameState): | |
| logger.info("--- Running ActionNodeBuilder ---") | |
| action_plan = state.get("action_plan", {}) | |
| if not action_plan: | |
| raise ValueError("No action plan found in state. Run PlanSpriteActionsNode first.") | |
| # Convert the Scratch project JSON to a mutable Python object | |
| project_json = state["project_json"] | |
| targets = project_json["targets"] | |
| # We need a way to map sprite names to their actual target objects in project_json | |
| sprite_map = {target["name"]: target for target in targets if not target["isStage"]} | |
| # --- NEW: Get only the relevant blocks for the entire action plan --- | |
| relevant_scratch_blocks_catalog = get_relevant_blocks_for_plan(action_plan, ALL_SCRATCH_BLOCKS_CATALOG) | |
| logger.info(f"Filtered {len(relevant_scratch_blocks_catalog)} relevant blocks out of {len(ALL_SCRATCH_BLOCKS_CATALOG)} total.") | |
| # Iterate through the planned actions for each sprite | |
| for sprite_name, sprite_actions in action_plan.get("action_overall_flow", {}).items(): | |
| if sprite_name in sprite_map: | |
| current_sprite_target = sprite_map[sprite_name] | |
| # Ensure 'blocks' field exists for the sprite | |
| if "blocks" not in current_sprite_target: | |
| current_sprite_target["blocks"] = {} | |
| # Generate block JSON based on the detailed action plan for this sprite | |
| # This is where the LLM's role becomes crucial: translating logic to blocks | |
| llm_block_generation_prompt = ( | |
| f"You are an AI assistant generating Scratch 3.0 block JSON based on a provided plan. " | |
| f"The current sprite is '{sprite_name}'.\n" | |
| f"Its planned actions are:\n" | |
| f"```json\n{json.dumps(sprite_actions, indent=2)}\n```\n\n" | |
| f"Here is a **curated catalog of only the most relevant Scratch 3.0 blocks** for this plan:\n" | |
| f"```json\n{json.dumps(relevant_scratch_blocks_catalog, indent=2)}\n```\n\n" | |
| f"Current Scratch project JSON (for context, specifically this sprite's existing blocks if any):\n" | |
| f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n" | |
| f"**Instructions:**\n" | |
| f"1. For each planned event and its associated logic, generate the corresponding Scratch 3.0 block JSON.\n" | |
| f"2. **Generate unique block IDs** for every new block. Use a format like 'block_id_abcdef12'.\n" # Updated ID format hint | |
| f"3. Properly link blocks using `next` and `parent` fields to form execution stacks. Hat blocks (`topLevel: true`, `parent: null`).\n" | |
| f"4. Correctly fill `inputs` and `fields` based on the catalog and the plan's logic (e.g., specific values for motion, keys for events, conditions for controls).\n" | |
| f"5. For C-blocks (like `control_repeat`, `control_forever`, `control_if`), use the `SUBSTACK` input to link to the first block inside its loop/conditional.\n" | |
| f"6. If the plan involves operators (e.g., 'if touching Sprite1'), use the appropriate operator blocks from the catalog and link them correctly as `CONDITION` inputs.\n" | |
| f"7. Ensure that any shadow blocks (e.g., for dropdowns like `motion_goto_menu`, `sensing_touchingobjectmenu`) are generated with `shadow: true` and linked correctly as inputs to their parent block.\n" | |
| f"8. Return ONLY the **updated 'blocks' dictionary** for this specific sprite. Do NOT return the full project JSON. ONLY the `blocks` dictionary." | |
| ) | |
| try: | |
| response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]}) | |
| raw_response = response["messages"][-1].content | |
| print(f"Raw response from LLM [ActionNodeBuilder - {sprite_name}]:", raw_response) | |
| generated_blocks = extract_json_from_llm_response(raw_response) | |
| current_sprite_target["blocks"].update(generated_blocks) # Merge new blocks | |
| logger.info(f"Action blocks added for sprite '{sprite_name}' by ActionNodeBuilder.") | |
| except Exception as e: | |
| logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}") | |
| # Depending on robustness needed, you might continue or re-raise | |
| raise | |
| return {"project_json": project_json} | |
| # --- Example Usage (to demonstrate the flow) --- | |
| if __name__ == "__main__": | |
| # Initialize a mock game state | |
| initial_game_state = { | |
| "description": "A simple game where a sprite moves and says hello.", | |
| "project_json": { | |
| "targets": [ | |
| {"isStage": True, "name": "Stage", "blocks": {}}, | |
| {"isStage": False, "name": "Sprite1", "blocks": {}}, | |
| {"isStage": False, "name": "Ball", "blocks": {}} | |
| ] | |
| }, | |
| "sprite_initial_positions": {} | |
| } | |
| # Step 1: Plan Sprite Actions | |
| try: | |
| state_after_planning = plan_sprite_actions(initial_game_state) | |
| initial_game_state.update(state_after_planning) | |
| print("\n--- Game State After Planning ---") | |
| print(json.dumps(initial_game_state, indent=2)) | |
| except Exception as e: | |
| print(f"Planning failed: {e}") | |
| exit() | |
| # Step 2: Build Action Nodes (Generate Blocks) | |
| try: | |
| state_after_building = build_action_nodes(initial_game_state) | |
| initial_game_state.update(state_after_building) | |
| print("\n--- Game State After Building Blocks ---") | |
| # Print only the blocks for a specific sprite to keep output manageable | |
| for target in initial_game_state["project_json"]["targets"]: | |
| if not target["isStage"]: | |
| print(f"\nBlocks for {target['name']}:") | |
| print(json.dumps(target.get('blocks', {}), indent=2)) | |
| except Exception as e: | |
| print(f"Building blocks failed: {e}") |