from flask import Flask, request, jsonify, render_template, send_from_directory, send_file
import cv2, json, base64, io, os, tempfile, logging, re
import numpy as np
from unstructured.partition.pdf import partition_pdf
from PIL import Image
# from imutils.perspective import four_point_transform
from dotenv import load_dotenv
import pytesseract
from werkzeug.utils import secure_filename
from langchain_groq import ChatGroq
from langgraph.prebuilt import create_react_agent
from pdf2image import convert_from_path, convert_from_bytes
from concurrent.futures import ThreadPoolExecutor
from pdf2image.exceptions import PDFInfoNotInstalledError
from typing import Dict, TypedDict, Optional, Any
from langgraph.graph import StateGraph, END
import uuid
import shutil, time, functools
from langchain_experimental.open_clip.open_clip import OpenCLIPEmbeddings
from langchain_core.utils.utils import secret_from_env
# from matplotlib.offsetbox import OffsetImage, AnnotationBbox
from io import BytesIO
from pathlib import Path
from utils.block_relation_builder import block_builder, separate_scripts, transform_logic_to_action_flow, analyze_opcode_counts
from langchain_openai import ChatOpenAI  # supersedes the deprecated langchain.chat_models import
from pydantic import Field, SecretStr
from difflib import get_close_matches
import torch
from transformers import AutoImageProcessor, AutoModel
# --- Config (tune threads as needed) ---
DINOV2_MODEL = "facebook/dinov2-small"  # small = best CPU latency/quality tradeoff
DEVICE = torch.device("cpu")
torch.set_num_threads(4)  # tune for your CPU

# --- Globals for single-shot model load ---
_dinov2_processor = None
_dinov2_model = None
# os.environ["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY", "default_key_or_placeholder")
# class ChatOpenRouter(ChatOpenAI):
#     openai_api_key: Optional[SecretStr] = Field(
#         alias="api_key",
#         default_factory=secret_from_env("OPENROUTER_API_KEY", default=None),
#     )
#     @property
#     def lc_secrets(self) -> dict[str, str]:
#         return {"openai_api_key": "OPENROUTER_API_KEY"}
#     def __init__(self,
#                  openai_api_key: Optional[str] = None,
#                  **kwargs):
#         openai_api_key = (
#             openai_api_key or os.environ.get("OPENROUTER_API_KEY")
#         )
#         super().__init__(
#             base_url="https://openrouter.ai/api/v1",
#             openai_api_key=openai_api_key,
#             **kwargs
#         )
# llm2 = ChatOpenRouter(
#     # model_name="deepseek/deepseek-r1-0528:free",
#     # model_name="google/gemini-2.0-flash-exp:free",
#     # model_name="deepseek/deepseek-v3-base:free",
#     model_name="deepseek/deepseek-r1:free"
# )
def log_execution_time(func):
    @functools.wraps(func)  # preserve the wrapped function's name for the log line
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        logger.info(f"⏱ {func.__name__} executed in {end_time - start_time:.2f} seconds")
        return result
    return wrapper
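# Illustrative use of the decorator (a sketch; `process_pdf` is a hypothetical name):
#
#     @log_execution_time
#     def process_pdf(path):
#         ...
#
# Each call then logs "⏱ process_pdf executed in X.XX seconds".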
# global pdf_doc
# ============================== #
#    INITIALIZE CLIP EMBEDDER    #
# ============================== #
# clip_embd = OpenCLIPEmbeddings()

# Configure logging (create the log directory first so the FileHandler can open its file)
os.makedirs("/app/logs", exist_ok=True)
logging.basicConfig(
    level=logging.DEBUG,  # Use INFO or ERROR in production
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("/app/logs/app.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
load_dotenv()
# os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY")
groq_api_key = os.getenv("GROQ_API_KEY")
llm = ChatGroq(
    model="meta-llama/llama-4-scout-17b-16e-instruct",
    temperature=0,
    max_tokens=None,
    api_key=groq_api_key,  # falls back to the GROQ_API_KEY env var when None
)
app = Flask(__name__)
# ============================== #
#    TESSERACT CONFIGURATION     #
# ============================== #
# Set the Tesseract executable path
# pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'
# Set the TESSDATA_PREFIX environment variable to the directory containing the 'tessdata' folder.
# This is crucial for Tesseract to find its language data files (e.g., eng.traineddata).
# os.environ['TESSDATA_PREFIX'] = r'C:\Program Files\Tesseract-OCR'
# poppler_path = r"C:\poppler\Library\bin"

# Legacy Windows-style relative paths (superseded by the Path constants below)
backdrop_images_path = r"app\blocks\Backdrops"
sprite_images_path = r"app\blocks\sprites"
code_blocks_image_path = r"app\blocks\code_blocks"
count = 0

BASE_DIR = Path("/app")
BLOCKS_DIR = BASE_DIR / "blocks"
STATIC_DIR = BASE_DIR / "static"
GEN_PROJECT_DIR = BASE_DIR / "generated_projects"
BACKDROP_DIR = BLOCKS_DIR / "Backdrops"
SPRITE_DIR = BLOCKS_DIR / "sprites"
CODE_BLOCKS_DIR = BLOCKS_DIR / "code_blocks"
SOUND_DIR = BLOCKS_DIR / "sound"
# === new: outputs rooted under BASE_DIR ===
OUTPUT_DIR = BASE_DIR / "outputs"
# DETECTED_IMAGE_DIR = OUTPUT_DIR / "DETECTED_IMAGE"
# SCANNED_IMAGE_DIR = OUTPUT_DIR / "SCANNED_IMAGE"
# JSON_DIR = OUTPUT_DIR / "EXTRACTED_JSON"

# make all of them in one go
for d in (
    BLOCKS_DIR,
    STATIC_DIR,
    GEN_PROJECT_DIR,
    BACKDROP_DIR,
    SPRITE_DIR,
    CODE_BLOCKS_DIR,
    OUTPUT_DIR,
    # DETECTED_IMAGE_DIR,
    # SCANNED_IMAGE_DIR,
    # JSON_DIR,
):
    d.mkdir(parents=True, exist_ok=True)
class GameState(TypedDict):
    project_json: dict
    description: str
    project_id: str
    project_image: str
    pseudo_code: dict
    action_plan: Optional[Dict]
    temporary_node: Optional[Dict]
    page_count: int
    processing: bool
    temp_pseudo_code: list
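# Illustrative initial state (a sketch; the concrete values are assumptions):
#
#     state: GameState = {
#         "project_json": {"targets": []},
#         "description": "",
#         "project_id": str(uuid.uuid4()),
#         "project_image": "",
#         "pseudo_code": {},
#         "action_plan": None,
#         "temporary_node": None,
#         "page_count": 0,
#         "processing": False,
#         "temp_pseudo_code": [],
#     }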
# Refined SYSTEM_PROMPT with more explicit Scratch JSON rules, especially for variables
SYSTEM_PROMPT = """
You are GameScratchAgent, an expert in Scratch 3.0 game development.
Your task is to process OCR-extracted text from images of Scratch 3.0 code blocks and produce **precisely formatted pseudocode JSON**.
### Core Role
- Treat this as an OCR refinement task: the input may contain typos, spacing issues, or incomplete tokens.
- Intelligently correct such OCR mistakes to align with valid Scratch 3.0 block syntax.
- Always generate logically valid pseudocode consistent with Scratch semantics.
### Responsibilities
1. **Code-block detection**
   - If no Scratch code-blocks are detected → return `"No Code-blocks"`.
   - If code-blocks are present → refine into pseudocode.
2. **Script ownership**
   - Extract the value from `"Script for:"`.
   - If it exactly matches any costume in `Stage_costumes`, set `"name_variable": "Stage"`.
   - Otherwise, keep the detected target name.
3. **Pseudocode refinement**
   - Correct OCR artifacts automatically (e.g., “when cliked” → “when green flag clicked”).
   - Apply strict Scratch rules for variables, values, dropdowns, reporters, and booleans.
   - Ensure indentation of nested blocks (4 spaces).
   - Every hat block must end with `end`.
   - Do not include explanations or comments.
   - **The pseudocode must always be returned as a single string separated by `\\n` (not as a list of strings).**
   - **Never use string concatenation (`+`) or arrays—only one continuous string.**
   - **Every nested control structure (forever, repeat, if, if-else, etc.) must also have its own `end` placed at the correct depth, ensuring proper closure of each block. The placement of `end` is critical for differentiating script meaning (e.g., Case 1 vs Case 2 nesting).**
4. **Formatting precautions**
   - Numbers → `(5)`, `(-130)`
   - Text/strings → `(hello)`
   - Variables → `[score v]`
   - Dropdowns → `[space v]`, `[Game Over v]`
   - Reporter blocks → `((x position))`
   - Boolean logic → `<condition>`, `<<cond1> and <cond2>>`, `<not <cond>>`
   - Operators → explicit, e.g. `(([speed v]) * (1.1))`
### Critical Notes
- Be robust to OCR noise: missing characters, misread symbols, or accidental merges.
- Never output raw OCR mistakes—always normalize to valid Scratch pseudocode.
- Output only the JSON object, nothing else.
"""
# SYSTEM_PROMPT_JSON_CORRECTOR = """
# You are an assistant that outputs JSON responses strictly following the given schema.
# If the JSON you produce has any formatting errors, missing required fields, or invalid structure, you must identify the problems and correct them.
# Always return only valid JSON that fully conforms to the schema below, enclosed in triple backticks (```), without any extra text or explanation.
# If you receive an invalid or incomplete JSON response, fix it by:
# - Adding any missing required fields with appropriate values.
# - Correcting syntax errors such as missing commas, brackets, or quotes.
# - Ensuring the JSON structure matches the schema exactly.
# - Ensuring `"pseudocode"` is always a **single JSON string with embedded `\n` newlines** (never arrays, never concatenated with `+`).
# - Removing any invalid concatenation artifacts (`+`, `"string1" + "string2"`).
# - Never output explanations, comments, or extra text — only the corrected JSON.
# - **Every nested control structure (forever, repeat, if, if-else, etc.) must also have its own `end` placed at the correct depth, ensuring proper closure of each block. The placement of `end` is critical for differentiating script meaning (e.g., Case 1 vs Case 2 nesting).**
# Remember: Your output must be valid JSON only, ready to be parsed without errors.
# """
SYSTEM_PROMPT_JSON_CORRECTOR = """
You are a JSON correction assistant. Your ONLY task is to fix malformed JSON and return it in the correct format.
REQUIRED OUTPUT FORMAT:
{
    "refined_logic": {
        "name_variable": "sprite_name_here",
        "pseudocode": "pseudocode_string_here"
    }
}
RULES:
1. Extract the sprite name and pseudocode from the input
2. Return ONLY valid JSON in the exact format above
3. No explanations, no extra text, no other fields
4. If you can't find the data, use "Unknown" for name_variable and "No pseudocode found" for pseudocode
"""
# Debugger and resolver agents for Scratch 3.0
# Main agent of the system for Scratch 3.0
agent = create_react_agent(
    model=llm,
    tools=[],  # No specific tools are defined here, but could be added later
    prompt=SYSTEM_PROMPT
)
# agent_2 = create_react_agent(
#     model=llm2,
#     tools=[],  # No specific tools are defined here, but could be added later
#     prompt=SYSTEM_PROMPT
# )
agent_json_resolver = create_react_agent(
    model=llm,
    tools=[],  # No specific tools are defined here, but could be added later
    prompt=SYSTEM_PROMPT_JSON_CORRECTOR
)
def init_dinov2(model_name: str = DINOV2_MODEL, device: torch.device = DEVICE):
    """
    Lazy-initialize DINOv2 processor & model (call once before embedding).
    """
    global _dinov2_processor, _dinov2_model
    if _dinov2_processor is None or _dinov2_model is None:
        _dinov2_processor = AutoImageProcessor.from_pretrained(model_name)
        _dinov2_model = AutoModel.from_pretrained(model_name)
        _dinov2_model.eval().to(device)
def embed_bytesio_list(bytesio_list, batch_size: int = 8):
    """
    Accepts a list of BytesIO objects (each contains an image).
    Returns: np.ndarray shape (N, D) of L2-normalized embeddings (dtype float32).
    """
    if _dinov2_processor is None or _dinov2_model is None:
        init_dinov2()
    imgs = []
    for b in bytesio_list:
        with Image.open(b) as original_img:
            # Create a new image with a white background in RGB mode
            final_img = Image.new("RGB", original_img.size, (255, 255, 255))
            # Paste the original image onto the white background, using the alpha channel as a mask if it exists
            if original_img.mode == 'RGBA':
                final_img.paste(original_img, mask=original_img.split()[-1])
            else:
                final_img.paste(original_img)
            imgs.append(final_img.copy())
    embs = []
    for i in range(0, len(imgs), batch_size):
        batch = imgs[i: i + batch_size]
        inputs = _dinov2_processor(images=batch, return_tensors="pt")
        inputs = {k: v.to(DEVICE) for k, v in inputs.items()}
        with torch.no_grad():
            out = _dinov2_model(**inputs)
        cls = out.last_hidden_state[:, 0, :]  # (B, D) CLS token per image
        cls = torch.nn.functional.normalize(cls, p=2, dim=1)
        embs.append(cls.cpu().numpy())
    if not embs:
        return np.zeros((0, _dinov2_model.config.hidden_size), dtype=np.float32)
    return np.vstack(embs).astype(np.float32)
def l2_normalize_rows(a: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """
    Row-wise L2 normalization for numpy arrays.
    """
    norm = np.linalg.norm(a, axis=1, keepdims=True)
    return a / (norm + eps)
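# Illustrative nearest-neighbour search over the embeddings (a sketch;
# `query_img` and `catalog_imgs` are hypothetical BytesIO inputs):
#
#     query_emb = embed_bytesio_list([query_img])       # shape (1, D)
#     catalog_embs = embed_bytesio_list(catalog_imgs)   # shape (N, D)
#     sims = query_emb @ catalog_embs.T                 # cosine similarity (rows are L2-normalized)
#     best_idx = int(np.argmax(sims))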
# Helper function to load the block catalog from a JSON file
def _load_block_catalog(block_type: str) -> Dict:
    """
    Loads the Scratch block catalog named '{block_type}.json'
    from the <project_root>/blocks/ folder. Returns {} on any error.
    """
    catalog_path = BLOCKS_DIR / f"{block_type}.json"
    try:
        text = catalog_path.read_text()   # raises FileNotFoundError if missing
        catalog = json.loads(text)        # raises JSONDecodeError if malformed
        logger.info(f"Successfully loaded block catalog from {catalog_path}")
        return catalog
    except FileNotFoundError:
        logger.error(f"Error: Block catalog file not found at {catalog_path}")
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from {catalog_path}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error loading {catalog_path}: {e}")
    return {}  # fall through on any error so callers always get a dict
def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None:
    """
    Search a single catalog (with keys "description" and "blocks": List[dict])
    for a block whose 'op_code' matches the given opcode.
    Returns the block dict or None if not found.
    """
    for block in catalog_data.get("blocks", []):
        if block.get("op_code") == opcode:
            return block
    return None
# Helper function to find a block in all catalogs by opcode
def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None:
    """
    Search across multiple catalogs for a given opcode.
    Returns the first matching block dict or None.
    """
    for catalog in all_catalogs:
        blk = get_block_by_opcode(catalog, opcode)
        if blk is not None:
            return blk
    return None
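# Illustrative lookup across several loaded catalogs (a sketch; the catalog
# variables are the ones loaded further below in this module):
#
#     blk = find_block_in_all("control_forever", [c_block_data, stack_block_data])
#     if blk is not None:
#         print(blk.get("op_code"))  # -> "control_forever"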
def variable_intialization(project_data):
    """
    Updates variable and broadcast definitions in a Scratch project JSON,
    populating the 'variables' and 'broadcasts' sections of the Stage target
    and extracting initial values for variables.
    Args:
        project_data (dict): The loaded JSON data of the Scratch project.
    Returns:
        dict: The updated project JSON data.
    """
    stage_target = None
    for target in project_data['targets']:
        if target.get('isStage'):
            stage_target = target
            break
    if stage_target is None:
        print("Error: Stage target not found in the project data.")
        return project_data
    # Ensure 'variables' and 'broadcasts' exist in the Stage target
    if "variables" not in stage_target:
        stage_target["variables"] = {}
    if "broadcasts" not in stage_target:
        stage_target["broadcasts"] = {}
    # Helper function to recursively find and update variable/broadcast fields
    def process_dict(obj):
        if isinstance(obj, dict):
            # Check for "data_setvariableto" opcode to extract initial values
            if obj.get("opcode") == "data_setvariableto":
                variable_field = obj.get("fields", {}).get("VARIABLE")
                value_input = obj.get("inputs", {}).get("VALUE")
                if variable_field and isinstance(variable_field, list) and len(variable_field) == 2:
                    var_name = variable_field[0]
                    var_id = variable_field[1]
                    initial_value = ""
                    if value_input and isinstance(value_input, list) and len(value_input) > 1 and \
                            isinstance(value_input[1], list) and len(value_input[1]) > 1:
                        if value_input[1][0] == 10:
                            initial_value = str(value_input[1][1])
                        elif value_input[1][0] == 12 and len(value_input) > 2 and isinstance(value_input[2], list) and value_input[2][0] == 10:
                            initial_value = str(value_input[2][1])
                    # guard the scalar case so a missing VALUE input cannot raise
                    elif value_input and isinstance(value_input, list) and len(value_input) > 1 and isinstance(value_input[1], (str, int, float)):
                        initial_value = str(value_input[1])
                    stage_target["variables"][var_id] = [var_name, initial_value]
            for key, value in obj.items():
                # Process broadcast definitions in 'inputs' (BROADCAST_INPUT)
                if key == "BROADCAST_INPUT" and isinstance(value, list) and len(value) == 2 and \
                        isinstance(value[1], list) and len(value[1]) == 3 and value[1][0] == 11:
                    broadcast_name = value[1][1]
                    broadcast_id = value[1][2]
                    stage_target["broadcasts"][broadcast_id] = broadcast_name
                # Process broadcast definitions in 'fields' (BROADCAST_OPTION)
                elif key == "BROADCAST_OPTION" and isinstance(value, list) and len(value) == 2:
                    broadcast_name = value[0]
                    broadcast_id = value[1]
                    stage_target["broadcasts"][broadcast_id] = broadcast_name
                # Recursively call for nested dictionaries or lists
                process_dict(value)
        elif isinstance(obj, list):
            for i, item in enumerate(obj):
                # Process variable references in 'inputs' (like [12, "score", "id"])
                if isinstance(item, list) and len(item) == 3 and item[0] == 12:
                    var_name = item[1]
                    var_id = item[2]
                    if var_id not in stage_target["variables"]:
                        stage_target["variables"][var_id] = [var_name, ""]
                process_dict(item)
    # Iterate through all targets to process their blocks
    for target in project_data['targets']:
        if "blocks" in target:
            for block_id, block_data in target["blocks"].items():
                process_dict(block_data)
    return project_data
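# Illustrative effect (a sketch with a made-up variable ID): a block such as
#
#     {"opcode": "data_setvariableto",
#      "fields": {"VARIABLE": ["score", "var_id_1"]},
#      "inputs": {"VALUE": [1, [10, "5"]]}}
#
# results in stage_target["variables"]["var_id_1"] == ["score", "5"].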
def deduplicate_variables(project_data):
    """
    Removes duplicate variable entries in the 'variables' dictionary of the Stage target,
    prioritizing entries with non-empty values.
    Args:
        project_data (dict): The loaded JSON data of the Scratch project.
    Returns:
        dict: The updated project JSON data with deduplicated variables.
    """
    stage_target = None
    for target in project_data['targets']:
        if target.get('isStage'):
            stage_target = target
            break
    if stage_target is None:
        print("Error: Stage target not found in the project data.")
        return project_data
    if "variables" not in stage_target:
        return project_data  # No variables to deduplicate
    # Use a temporary dictionary to store the preferred variable entry by name
    # Format: {variable_name: [variable_id, variable_name, variable_value]}
    resolved_variables = {}
    for var_id, var_info in stage_target["variables"].items():
        var_name = var_info[0]
        var_value = var_info[1]
        if var_name not in resolved_variables:
            # If the variable name is not yet seen, add it
            resolved_variables[var_name] = [var_id, var_name, var_value]
        else:
            # If the variable name is already seen, decide which one to keep
            existing_id, existing_name, existing_value = resolved_variables[var_name]
            # Prioritize the entry with a non-empty value
            if var_value != "" and existing_value == "":
                resolved_variables[var_name] = [var_id, var_name, var_value]
            # If both have non-empty values, or both are empty, keep the current one (arbitrary choice, but consistent).
            # The current logic will effectively keep the last one encountered that has a value,
            # or the very last one if all are empty.
            elif var_value != "" and existing_value != "":
                # If there are multiple non-empty values for the same variable name,
                # this keeps the one from the most recent iteration.
                # For the given example, this will correctly keep "5".
                resolved_variables[var_name] = [var_id, var_name, var_value]
            elif var_value == "" and existing_value == "":
                # If both are empty, just keep the current one (arbitrary)
                resolved_variables[var_name] = [var_id, var_name, var_value]
    # Reconstruct the 'variables' dictionary using the resolved entries
    new_variables_dict = {}
    for var_name, var_data in resolved_variables.items():
        var_id_to_keep = var_data[0]
        var_name_to_keep = var_data[1]
        var_value_to_keep = var_data[2]
        new_variables_dict[var_id_to_keep] = [var_name_to_keep, var_value_to_keep]
    stage_target["variables"] = new_variables_dict
    return project_data
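# Illustrative dedup (a sketch with made-up IDs): Stage entries
# {"id1": ["score", ""], "id2": ["score", "5"]} collapse to {"id2": ["score", "5"]},
# because the entry with a non-empty initial value wins.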
def variable_adder_main(project_data):
    try:
        declare_variable_json = variable_intialization(project_data)
        print("declare_variable_json------->", declare_variable_json)
    except Exception as e:
        print(f"Error in variable initialization: {e}")
        return project_data  # fall back to the unmodified project
    try:
        processed_json = deduplicate_variables(declare_variable_json)
        print("processed_json------->", processed_json)
        return processed_json
    except Exception as e:
        print(f"Error in variable deduplication: {e}")
        return declare_variable_json  # return the initialized (non-deduplicated) project
# --- Global variable for the block catalog ---
ALL_SCRATCH_BLOCKS_CATALOG = {}
BLOCK_CATALOG_PATH = "blocks"             # Define the path to your JSON file
HAT_BLOCKS_PATH = "hat_blocks"            # Path to the hat blocks JSON file
STACK_BLOCKS_PATH = "stack_blocks"        # Path to the stack blocks JSON file
REPORTER_BLOCKS_PATH = "reporter_blocks"  # Path to the reporter blocks JSON file
BOOLEAN_BLOCKS_PATH = "boolean_blocks"    # Path to the boolean blocks JSON file
C_BLOCKS_PATH = "c_blocks"                # Path to the C blocks JSON file
CAP_BLOCKS_PATH = "cap_blocks"            # Path to the cap blocks JSON file
# Load the block catalogs from their respective JSON files.
# Each catalog contributes a description plus one formatted line per block
# (opcode + standalone example) for the LLM prompts below. Using .get() lets a
# missing or malformed catalog degrade gracefully instead of raising KeyError.
def _describe_catalog(block_data: dict) -> tuple[str, str]:
    """Return (description, formatted opcode/example lines) for one catalog."""
    description = block_data.get("description", "No description available")
    blocks = block_data.get("blocks")
    if isinstance(blocks, list):
        lines = "\n".join(
            f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
            for block in blocks
        )
    else:
        lines = " No blocks information available."
    return description, lines

hat_block_data = _load_block_catalog(HAT_BLOCKS_PATH)
hat_description, hat_opcodes_functionalities = _describe_catalog(hat_block_data)
print("Hat blocks loaded successfully.", hat_description)
boolean_block_data = _load_block_catalog(BOOLEAN_BLOCKS_PATH)
boolean_description, boolean_opcodes_functionalities = _describe_catalog(boolean_block_data)
c_block_data = _load_block_catalog(C_BLOCKS_PATH)
c_description, c_opcodes_functionalities = _describe_catalog(c_block_data)
cap_block_data = _load_block_catalog(CAP_BLOCKS_PATH)
cap_description, cap_opcodes_functionalities = _describe_catalog(cap_block_data)
reporter_block_data = _load_block_catalog(REPORTER_BLOCKS_PATH)
reporter_description, reporter_opcodes_functionalities = _describe_catalog(reporter_block_data)
stack_block_data = _load_block_catalog(STACK_BLOCKS_PATH)
stack_description, stack_opcodes_functionalities = _describe_catalog(stack_block_data)
# This makes ALL_SCRATCH_BLOCKS_CATALOG available globally
ALL_SCRATCH_BLOCKS_CATALOG = _load_block_catalog(BLOCK_CATALOG_PATH)
# Helper function to extract JSON from LLM response
# def extract_json_from_llm_response(raw_response: str) -> dict:
#     # --- 1) Pull out the JSON code‑block if present ---
#     md = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw_response)
#     json_string = md.group(1).strip() if md else raw_response
#     # --- 2) Trim to the outermost { … } so we drop any prefix/suffix junk ---
#     first, last = json_string.find('{'), json_string.rfind('}')
#     if 0 <= first < last:
#         json_string = json_string[first:last+1]
#     # --- 3) PRE‑CLEANUP: remove stray assistant{…}, rogue assistant keys, fix boolean quotes ---
#     json_string = re.sub(r'\b\w+\s*{', '{', json_string)
#     json_string = re.sub(r'"assistant"\s*:', '', json_string)
#     json_string = re.sub(r'\b(false|true)"', r'\1', json_string)
#     logger.debug("Ran pre‑cleanup for stray tokens and boolean quotes.")
#     # --- 3.1) Fix stray inner quotes at start of name/list values ---
#     # e.g., { "name": " \"recent_scoress\"", ... } → "recent_scoress"
#     json_string = re.sub(
#         r'("name"\s*:\s*")\s*"',
#         r'\1',
#         json_string
#     )
#     # --- 4) Escape all embedded quotes in any `logic` value up to the next key ---
#     def _esc(m):
#         prefix, body = m.group(1), m.group(2)
#         return prefix + body.replace('"', r'\"')
#     json_string = re.sub(
#         r'("logic"\s*:\s*")([\s\S]+?)(?=",\s*"[A-Za-z_]\w*"\s*:\s*)',
#         _esc,
#         json_string
#     )
#     logger.debug("Escaped embedded quotes in logic fields.")
#     logger.debug("Quoted unquoted keys.")
#     # --- 6) Remove trailing commas before } or ] ---
#     json_string = re.sub(r',\s*(?=[}\],])', '', json_string)
#     json_string = re.sub(r',\s*,', ',', json_string)
#     logger.debug("Removed trailing commas.")
#     # --- 7) Balance braces: drop extra } at end if needed ---
#     ob, cb = json_string.count('{'), json_string.count('}')
#     if cb > ob:
#         excess = cb - ob
#         json_string = json_string.rstrip()[:-excess]
#         logger.debug(f"Stripped {excess} extra closing brace(s).")
#     # --- 8) Escape literal newlines in *all* string values ---
#     json_string = re.sub(
#         r'"((?:[^"\\]|\\.)*?)"',
#         lambda m: '"' + m.group(1).replace('\n', '\\n').replace('\r', '\\r') + '"',
#         json_string,
#         flags=re.DOTALL
#     )
#     logger.debug("Escaped newlines in strings.")
#     # --- 9) Final parse attempt ---
#     try:
#         return json.loads(json_string)
#     except json.JSONDecodeError:
#         logger.error("Sanitized JSON still invalid:\n%s", json_string)
#         raise
def extract_json_from_llm_response(raw_response: str) -> dict:
    """
    Finds and parses the first valid JSON object from a raw LLM response string.
    """
    logger.debug("Attempting to extract JSON from raw LLM response...")
    # 1. Look for a JSON markdown block first
    match = re.search(r"```(?:json)?\s*({[\s\S]*?})\s*```", raw_response)
    if match:
        json_string = match.group(1)
        logger.debug("Found JSON inside a markdown block.")
        try:
            return json.loads(json_string)
        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse JSON from markdown block: {e}")
            # Fall through to the next method if parsing fails
    # 2. If no block is found (or it failed), find the outermost braces
    logger.debug("Markdown block not found or failed. Searching for outermost braces.")
    try:
        first_brace = raw_response.find('{')
        last_brace = raw_response.rfind('}')
        if first_brace != -1 and last_brace != -1 and first_brace < last_brace:
            json_string = raw_response[first_brace : last_brace + 1]
            return json.loads(json_string)
        else:
            logger.error("Could not find a valid JSON structure (outermost braces).")
            raise json.JSONDecodeError("No valid JSON object found in the response.", raw_response, 0)
    except json.JSONDecodeError as e:
        logger.error(f"Final JSON parsing attempt failed: {e}")
        # Re-raise the exception to be caught by the calling logic (to invoke the corrector agent)
        raise
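# Illustrative behavior (a sketch):
#
#     raw = 'Sure!\n```json\n{"refined_logic": {"name_variable": "Stage", "pseudocode": "..."}}\n```'
#     extract_json_from_llm_response(raw)["refined_logic"]["name_variable"]  # -> "Stage"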
def reduce_image_size_to_limit(clean_b64_str: str, max_kb: int = 4000) -> str:
    """
    Input: clean_b64_str = BASE64 STRING (no data: prefix)
    Output: BASE64 STRING (no data: prefix), sized as close as possible to max_kb KB.
    Guarantees: returns a valid base64 string (never None). May still be larger than max_kb
    if saving at lowest quality cannot get under the limit.
    """
    # sanitize
    clean = re.sub(r"\s+", "", clean_b64_str).strip()
    # fix padding
    missing = len(clean) % 4
    if missing:
        clean += "=" * (4 - missing)
    try:
        image_data = base64.b64decode(clean)
    except Exception as e:
        raise ValueError("Invalid base64 input to reduce_image_size_to_limit") from e
    try:
        img = Image.open(io.BytesIO(image_data))
        img.load()
    except Exception as e:
        raise ValueError("Could not open image from base64") from e
    # convert alpha -> RGB because JPEG doesn't support alpha
    if img.mode in ("RGBA", "LA") or (img.mode == "P" and "transparency" in img.info):
        background = Image.new("RGB", img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[-1] if img.mode != "RGB" else None)
        img = background
    elif img.mode != "RGB":
        img = img.convert("RGB")
    low, high = 20, 95
    best_bytes = None
    # binary search for the best JPEG quality that fits the size budget
    while low <= high:
        mid = (low + high) // 2
        buf = io.BytesIO()
        try:
            img.save(buf, format="JPEG", quality=mid, optimize=True)
        except OSError:
            # some PIL builds/channels may throw on optimize=True; fall back without optimize
            buf = io.BytesIO()
            img.save(buf, format="JPEG", quality=mid)
        size_kb = len(buf.getvalue()) / 1024.0
        if size_kb <= max_kb:
            best_bytes = buf.getvalue()
            low = mid + 1
        else:
            high = mid - 1
    # if we never found a quality <= max_kb, use the smallest we can create (quality = 20)
    if best_bytes is None:
        buf = io.BytesIO()
        try:
            img.save(buf, format="JPEG", quality=20, optimize=True)
        except OSError:
            buf = io.BytesIO()
            img.save(buf, format="JPEG", quality=20)
        best_bytes = buf.getvalue()
    return base64.b64encode(best_bytes).decode("utf-8")
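# Illustrative round-trip (a sketch; `big_b64` is a hypothetical base64 payload):
#
#     small_b64 = reduce_image_size_to_limit(big_b64, max_kb=4000)
#     # Usually <= ~4000 KB after decoding; may still exceed the limit if even
#     # quality 20 cannot reach it.
#     print(len(base64.b64decode(small_b64)) / 1024)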
def clean_base64_for_model(raw_b64, max_bytes_threshold=4000000) -> str:
    """
    Accepts: raw_b64 can be:
      - a data URI 'data:image/png;base64,...'
      - a plain base64 string
      - a PIL Image
      - a list containing the above (take first)
    Returns: a data URI string 'data:<mime>;base64,<base64>' guaranteed to be syntactically valid.
    """
    # normalize input
    if not raw_b64:
        return ""
    if isinstance(raw_b64, list):
        raw_b64 = raw_b64[0] if raw_b64 else ""
    if not raw_b64:
        return ""
    if isinstance(raw_b64, Image.Image):
        buf = io.BytesIO()
        # convert to RGB and save as JPEG to keep consistent
        img = raw_b64.convert("RGB")
        img.save(buf, format="JPEG")
        clean_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
        mime = "image/jpeg"
        return f"data:{mime};base64,{clean_b64}"
    if not isinstance(raw_b64, str):
        raise TypeError(f"Expected base64 string or PIL Image, got {type(raw_b64)}")
    # detect mime if present; otherwise default to png
    m = re.match(r"^data:(image\/[a-zA-Z0-9.+-]+);base64,(.+)$", raw_b64, flags=re.DOTALL)
    if m:
        mime = m.group(1)
        clean_b64 = m.group(2)
    else:
        # no prefix; assume png by default (you can change to jpeg if you prefer)
        mime = "image/png"
        clean_b64 = raw_b64
    # sanitize base64 string
    clean_b64 = re.sub(r"\s+", "", clean_b64).strip()
    missing = len(clean_b64) % 4
    if missing:
        clean_b64 += "=" * (4 - missing)
    original_size_bytes = len(clean_b64.encode("utf-8"))
    # debug print
    print(f"Original base64 size (bytes): {original_size_bytes}, mime: {mime}")
    if original_size_bytes > max_bytes_threshold:
        # reduce and return a JPEG-prefixed data URI (JPEG tends to compress better for photos)
        reduced_clean = reduce_image_size_to_limit(clean_b64, max_kb=4000)
        # reduced_clean is plain base64 (no prefix); report the reduced size, not the original
        print(f"Reduced base64 size (bytes): {len(reduced_clean.encode('utf-8'))}, mime: image/jpeg")
        return f"data:image/jpeg;base64,{reduced_clean}"
    # otherwise return original with its mime prefix (ensure prefix exists)
    return f"data:{mime};base64,{clean_b64}"
SCRATCH_OPCODES = [
    'motion_movesteps', 'motion_turnright', 'motion_turnleft', 'motion_goto',
    'motion_gotoxy', 'motion_glideto', 'motion_glidesecstoxy', 'motion_pointindirection',
    'motion_pointtowards', 'motion_changexby', 'motion_setx', 'motion_changeyby',
    'motion_sety', 'motion_ifonedgebounce', 'motion_setrotationstyle', 'looks_sayforsecs',
    'looks_say', 'looks_thinkforsecs', 'looks_think', 'looks_switchcostumeto',
    'looks_nextcostume', 'looks_switchbackdropto', 'looks_switchbackdroptowait',
    'looks_nextbackdrop', 'looks_changesizeby', 'looks_setsizeto', 'looks_changeeffectby',
    'looks_seteffectto', 'looks_cleargraphiceffects', 'looks_show', 'looks_hide',
    'looks_gotofrontback', 'looks_goforwardbackwardlayers', 'sound_playuntildone',
    'sound_play', 'sound_stopallsounds', 'sound_changevolumeby', 'sound_setvolumeto',
    'event_broadcast', 'event_broadcastandwait', 'control_wait', 'control_wait_until',
    'control_stop', 'control_create_clone_of', 'control_delete_this_clone',
    'data_setvariableto', 'data_changevariableby', 'data_addtolist', 'data_deleteoflist',
    'data_insertatlist', 'data_replaceitemoflist', 'data_showvariable', 'data_hidevariable',
    'data_showlist', 'data_hidelist', 'sensing_askandwait', 'sensing_resettimer',
    'sensing_setdragmode', 'procedures_call', 'operator_lt', 'operator_equals',
    'operator_gt', 'operator_and', 'operator_or', 'operator_not', 'operator_contains',
    'sensing_touchingobject', 'sensing_touchingcolor', 'sensing_coloristouchingcolor',
    'sensing_keypressed', 'sensing_mousedown', 'data_listcontainsitem', 'control_repeat',
    'control_forever', 'control_if', 'control_if_else', 'control_repeat_until',
    'motion_xposition', 'motion_yposition', 'motion_direction', 'looks_costumenumbername',
    'looks_size', 'looks_backdropnumbername', 'sound_volume', 'sensing_distanceto',
    'sensing_answer', 'sensing_mousex', 'sensing_mousey', 'sensing_loudness',
    'sensing_timer', 'sensing_of', 'sensing_current', 'sensing_dayssince2000',
    'sensing_username', 'operator_add', 'operator_subtract', 'operator_multiply',
    'operator_divide', 'operator_random', 'operator_join', 'operator_letterof',
    'operator_length', 'operator_mod', 'operator_round', 'operator_mathop',
    'data_variable', 'data_list', 'data_itemoflist', 'data_lengthoflist',
    'data_itemnumoflist', 'event_whenflagclicked', 'event_whenkeypressed',
    'event_whenthisspriteclicked', 'event_whenbackdropswitchesto', 'event_whengreaterthan',
    'event_whenbroadcastreceived', 'control_start_as_clone', 'procedures_definition'
]
def validate_and_fix_opcodes(opcode_counts):
    """
    Ensures all opcodes are valid. If an opcode is invalid, replace it with the closest match.
    """
    corrected_list = []
    for item in opcode_counts:
        opcode = item.get("opcode")
        count = item.get("count", 1)
        if not opcode:
            continue  # skip malformed entries with no opcode at all
        if opcode not in SCRATCH_OPCODES:
            # Find the closest match (case-sensitive)
            match = get_close_matches(opcode, SCRATCH_OPCODES, n=1, cutoff=0.6)
            if match:
                print(f"Opcode '{opcode}' not found. Replacing with '{match[0]}'")
                opcode = match[0]
            else:
                print(f"Opcode '{opcode}' not recognized and no close match found. Skipping.")
                continue
        corrected_list.append({"opcode": opcode, "count": count})
    # Merge duplicates after correction
    merged = {}
    for item in corrected_list:
        merged[item["opcode"]] = merged.get(item["opcode"], 0) + item["count"]
    return [{"opcode": k, "count": v} for k, v in merged.items()]
def format_scratch_pseudo_code(code_string):
    """
    Parses and formats Scratch pseudo-code with correct indentation (4 spaces
    per level), specifically handling if/else/end structures correctly.
    Args:
        code_string (str): A string containing Scratch pseudo-code with
                           potentially inconsistent indentation.
    Returns:
        str: The correctly formatted and indented pseudo-code string.
    """
    lines = code_string.strip().split('\n')
    formatted_lines = []
    indent_level = 0
    # Keywords that increase indentation for the NEXT line
    indent_keywords = ('when', 'forever', 'if', 'repeat')
    for line in lines:
        stripped_line = line.strip()
        if not stripped_line:
            continue
        # Match on the first token only; substring checks would misfire on
        # lines like "show variable [life v]" (which contains "if").
        first_word = stripped_line.split()[0]
        if first_word == 'else':
            # 'else' aligns with its 'if'; the level is unchanged for its body
            indentation = '    ' * max(0, indent_level - 1)
            formatted_lines.append(indentation + stripped_line)
            continue
        if first_word == 'end':
            # 'end' closes the current block, so un-indent before formatting
            indent_level = max(0, indent_level - 1)
        indentation = '    ' * indent_level
        formatted_lines.append(indentation + stripped_line)
        if first_word in indent_keywords:
            indent_level += 1
    return '\n'.join(formatted_lines)
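# Illustrative re-indentation (a sketch):
#
#     raw = "when green flag clicked\nforever\nif <touching [edge v]?> then\nturn right (15) degrees\nend\nend\nend"
#     print(format_scratch_pseudo_code(raw))
#     # when green flag clicked
#     #     forever
#     #         if <touching [edge v]?> then
#     #             turn right (15) degrees
#     #         end
#     #     end
#     # end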
# Node 1: pseudo-code generation from code-block images
def pseudo_generator_node(state: GameState):
    logger.info("--- Running pseudo_generator_node ---")
    image = state.get("project_image", "")
    project_json = state["project_json"]
    cnt = state["page_count"]
    print(f"The page number received at the pseudo_generator node:-----> {cnt}")
    # MODIFICATION 1: Include 'Stage' in the list of names to plan for.
    # It's crucial to ensure 'Stage' is always present for its global role.
    target_names = [t["name"] for t in project_json["targets"]]
    stage_names = [t["name"] for t in project_json["targets"] if t.get("isStage")]
    sprite_names = [t["name"] for t in project_json["targets"] if not t.get("isStage")]
    # Get costumes separately for Stage and Sprites
    stage_costumes = [
        c["name"]
        for t in project_json["targets"] if t.get("isStage")
        for c in t.get("costumes", [])
    ]
| refinement_prompt = f""" | |
| You are an expert in Scratch 3.0 game development, specializing in understanding block relationships (stacked, nested). | |
| "Analyze the Scratch code-block image and generate Pseudo-Code for what this logic appears to be doing." | |
| From Image, you also have to detect a value of Key given in Text form "Script for: ". Below is the example | |
| Example: "Script for: Bear", "Script for:" is a key and "Bear" is value and check if there is related target name available. | |
| **Special Rule for Stage Costumes:** | |
| If the value of "Script for:" exactly matches ANY costume name in the Stage_costumes list given below, | |
| then the `"name_variable"` in the output JSON must be set to `"Stage"` (not the costume name). | |
| **Targets in Game (Sprites and Stage) available in project_json:** | |
| Sprite_name:{', '.join(sprite_names)} | |
| Stage_name: Stage | |
| Stage_costumes: {', '.join(stage_costumes)} | |
| --- Scratch 3.0 Block Reference --- | |
| ### Hat Blocks | |
| Description: {hat_description} | |
| Blocks: | |
| {hat_opcodes_functionalities} | |
| ### Boolean Blocks | |
| Description: {boolean_description} | |
| Blocks: | |
| {boolean_opcodes_functionalities} | |
| ### C Blocks | |
| Description: {c_description} | |
| Blocks: | |
| {c_opcodes_functionalities} | |
| ### Cap Blocks | |
| Description: {cap_description} | |
| Blocks: | |
| {cap_opcodes_functionalities} | |
| ### Reporter Blocks | |
| Description: {reporter_description} | |
| Blocks: | |
| {reporter_opcodes_functionalities} | |
| ### Stack Blocks | |
| Description: {stack_description} | |
| Blocks: | |
| {stack_opcodes_functionalities} | |
| ----------------------------------- | |
| Your task is to: | |
| If you don't find any "Code-Blocks" then, | |
| **Don't generate Pseudo Code, and pass the message "No Code-blocks" | |
| If you find any "Code-Blocks" then, | |
| 1. **Refine the 'logic'**: Make it precise, accurate, and fully aligned with the Game Description. Use Scratch‑consistent verbs and phrasing. **Do NOT** use raw double‑quotes inside the logic string. | |
| 2. **Structural requirements**: | |
| - **Numeric values** `(e.g., 0, 5, 0.2, -130)` **must** be in parentheses: `(0)`, `(5)`, `(0.2)`, `(-130)`. | |
| - **AlphaNumeric values** `(e.g., hello, say 5, 4, hi!)` **must** be in parentheses: `(hello)`, `(say 5)`, `(4)`, `(hi!)`. | |
| - **Variables** must be in the form `[variable v]` (e.g., `[score v]`), even when used inside expressions two example use `set [score v] to (1)` or `show variable ([speed v])`. | |
| - **Dropdown options** must be in the form `[option v]` (e.g., `[Game Start v]`, `[blue sky v]`). example use `when [space v] key pressed`. | |
| - **Reporter blocks** used as inputs must be double‑wrapped: `((x position))`, `((y position))`. example use `if <((y position)) = (-130)> then` or `(((x position)) * (1))`. | |
| - **Boolean blocks** in conditions must be inside `< >`, including nested ones: `<not <condition>>`, `<<cond1> and <cond2>>`,`<<cond1> or <cond2>>`. | |
| - **Other Boolean blocks** in conditions must be inside `< >`, including nested ones or values or variables: `<(block/value/variable) * (block/value/variable)>`,`<(block/value/variable) < (block/value/variable)>`, and example of another variable`<[apple v] contains [a v]?>`. | |
| - **Operator expressions** must use explicit Scratch operator blocks, e.g.: | |
| ``` | |
| (([ballSpeed v]) * (1.1)) | |
| ``` | |
| - **Every hat block script must end** with a final `end` on its own line. | |
| - **[critical important]:Every nested control structure (forever, repeat, if, if-else, etc.) must also have its own `end` placed at the correct depth, ensuring proper closure of each block. The placement of `end` is critical for differentiating script meaning (e.g., Case 1 vs Case 2 nesting).** | |
| 3. **Pseudo‑code formatting**: | |
| - Represent each block or nested block on its own line. | |
| - Auto-correct invalid OCR phrases to valid Scratch 3.0 block names using the **Scratch 3.0 Block Reference** above. | |
| - **Indent nested blocks by 4 spaces under their parent (`forever`, `if`, etc.).This is a critical requirement.** | |
| - No comments or explanatory text—just the block sequence. | |
| - a natural language breakdown of each step taken after the event, formatted as a multi-line string representing pseudo-code. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence. | |
| - **The pseudocode must be returned as a single string separated by `\n` (not as a list of strings).** | |
| - **Never use string concatenation (`+`) or arrays—only one continuous string.** | |
| - **Every nested control structure (forever, repeat, if, if-else, etc.) must also have its own `end` placed at the correct depth, ensuring proper closure of each block. The placement of `end` is critical for differentiating script meaning (e.g., Case 1 vs Case 2 nesting).** | |
| 4. **Logic content**: | |
| - Build clear flow for mechanics (movement, jumping, flying, scoring, collisions). | |
| - Match each action closely to a Scratch block or tight sequence. | |
| - Do **NOT** include any justification or comments—only the raw logic. | |
5. **Examples for reference**:
**Correct** pattern for a simple start script:
```
when green flag clicked
    switch backdrop to [blue sky v]
    set [score v] to (0)
    show variable [score v]
    broadcast [Game Start v]
end
```
**Correct** pattern for updating the high score variable:
```
when I receive [Game Over v]
    if <((score)) > (([High Score v]))> then
        set [High Score v] to ([score v])
    end
    switch backdrop to [Game Over v]
end
```
**Correct** pattern for level up and increasing difficulty:
```
when I receive [Level Up v]
    change [level v] by (1)
    set [ballSpeed v] to ((([ballSpeed v]) * (1.1)))
end
```
**Correct** pattern for jumping mechanics:
```
when [space v] key pressed
    if <((y position)) = (-100)> then
        repeat (5)
            change y by (100)
            wait (0.1) seconds
            change y by (-100)
            wait (0.1) seconds
        end
    end
end
```
**Correct** pattern for continuously moving objects:
```
when green flag clicked
    go to x: (240) y: (-100)
    set [speed v] to (-5)
    show variable [speed v]
    forever
        change x by ([speed v])
        if <((x position)) < (-240)> then
            go to x: (240) y: (-100)
        end
    end
end
```
6. **Do not** add any explanation of the logic or comments to justify it; just put the logic content in the JSON.
7. **Output**:
Return **only** a JSON object, using double quotes everywhere:
```json
{{
    "refined_logic": {{
        "name_variable": "Value of 'Script for: '",
        "pseudocode": "…your fully‑formatted pseudo‑code here…"
    }}
}}
```
"""
    image_input = {
        "type": "image_url",
        "image_url": {
            # "url": f"data:image/png;base64,{image}"
            "url": clean_base64_for_model(image[cnt])
        }
    }
    content = [
        {"type": "text", "text": refinement_prompt},
        image_input
    ]
    try:
        # Invoke the main agent for logic refinement and relationship identification
        response = agent.invoke({"messages": [{"role": "user", "content": content}]})
        llm_output_raw = response["messages"][-1].content.strip()
        print(f"llm_output_raw: {response}")
        parsed_llm_output = extract_json_from_llm_response(llm_output_raw)
        result = parsed_llm_output
        print(f"result:\n\n {result}")
    except json.JSONDecodeError as error_json:
        # If JSON parsing fails, use the json resolver agent
        # correction_prompt = (
        #     "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
        #     "It must be a JSON object with `refined_logic` (string) and `block_relationships` (array of objects).\n"
        #     f"- **Error Details**: {error_json}\n\n"
        #     "**Strict Instructions for your response:**\n"
        #     "1. **ONLY** output the corrected JSON. Do not include any other text or explanations.\n"
        #     "2. Ensure all keys and string values are enclosed in **double quotes**. Escape internal quotes (`\\`).\n"
        #     "3. No trailing commas. Correct nesting.\n\n"
        #     "Here is the problematic JSON string to correct:\n"
        #     f"```json\n{llm_output_raw}\n```\n"
        #     "Corrected JSON:\n"
        # )
        correction_prompt = f"""
Fix this malformed response and return only the corrected JSON:
Input: {llm_output_raw if 'llm_output_raw' in locals() else 'No response available'}
Extract the sprite name and pseudocode, then return in this exact format:
{{
    "refined_logic": {{
        "name_variable": "sprite_name",
        "pseudocode": "pseudocode_here"
    }}
}}
"""
        try:
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            corrected_output = extract_json_from_llm_response(correction_response['messages'][-1].content)
            # corrected_output = extract_json_from_llm_response(llm_output_raw)
            # block_relationships = corrected_output.get("block_relationships", [])
            result = corrected_output
            print(f"result:\n\n {result}")
        except Exception as e_corr:
            logger.error(f"Failed to correct JSON output even after retry: {e_corr}")
            result = {
                "refined_logic": {
                    "name_variable": "Error",
                    "pseudocode": "Processing failed"
                }
            }
    # Update the original action_plan in the state with the refined version
    state["pseudo_code"] = result
    state["temp_pseudo_code"] += [result]
    Data = state["temp_pseudo_code"]
    # with open("debug_state.json", "w", encoding="utf-8") as f:
    #     json.dump(state, f, indent=2, ensure_ascii=False)
    print(f"[OVERALL REFINED PSEUDO CODE LOGIC]: {result}")
    print(f"[OVERALL LISTS OF LOGICS]: {Data}")
    logger.info("Plan refinement and block relation analysis completed for all plans.")
    return state
# Node 2: Node Optimizer node
# (Dict and Any are already imported from typing at the top of the module.)
| def node_optimizer(state: Dict[str, Any]) -> Dict[str, Any]: | |
| logger.info("--- Running Node Optimizer Node ---") | |
| # safe fetches | |
| project_json = state.get("project_json", {}) if isinstance(state.get("project_json", {}), dict) else {} | |
| # ensure pseudo_code exists so we can write into it safely | |
| raw = state.setdefault("pseudo_code", {}) | |
| refined_logic_data = raw.get("refined_logic", {}) or {} | |
| # keep scalar sprite_name (if present) and build a separate map | |
| sprite_name = refined_logic_data.get("name_variable", "<unknown>") | |
| sprite_name_map: Dict[str, str] = {} | |
| project_json_targets = project_json.get("targets", []) if isinstance(project_json, dict) else [] | |
| for target in project_json_targets: | |
| if isinstance(target, dict): | |
| name = target.get("name") | |
| if name: | |
| sprite_name_map[name] = name | |
| try: | |
| # ensure we read the pseudocode from the same key consistently | |
| pseudo_text = refined_logic_data.get("pseudocode", "") | |
| refined_logic_data["pseudocode"] = separate_scripts(str(pseudo_text)) | |
| # write refined logic back into state safely | |
| state["pseudo_code"]["refined_logic"] = refined_logic_data | |
| logger.debug("[The pseudo_code generated here]: %s", state["pseudo_code"]) | |
| # transform to action flow (expecting transform_logic_to_action_flow to accept the pseudo_code dict) | |
| state["action_plan"] = transform_logic_to_action_flow(state["pseudo_code"]) | |
| logger.debug("[The action plan generated here]: %s", state["action_plan"]) | |
| action_flow = state.get("action_plan", {}) | |
| # Normalize plan_data into a list of (sprite, sprite_data) pairs | |
| plan_data = [] | |
| if isinstance(action_flow, dict): | |
| ao = action_flow.get("action_overall_flow") | |
| if isinstance(ao, dict) and ao: | |
| plan_data = list(ao.items()) | |
| else: | |
| # fall back to the top-level dict items | |
| plan_data = list(action_flow.items()) | |
| elif isinstance(action_flow, list): | |
| # possible structure: list of dicts each holding sprite info | |
| for item in action_flow: | |
| if isinstance(item, tuple) and len(item) == 2: | |
| plan_data.append(item) | |
| elif isinstance(item, dict): | |
| # try to detect common shapes | |
| if "name" in item and "plans" in item: | |
| plan_data.append((item["name"], item)) | |
| else: | |
| plan_data = [] | |
| refined_flow: Dict[str, Any] = {} | |
| for sprite, sprite_data in plan_data: | |
| if sprite is None: | |
| continue | |
| sprite_data = sprite_data or {} | |
| refined_plans = [] | |
| for plan in sprite_data.get("plans", []): | |
| logic = plan.get("logic", "") | |
| # analyze_opcode_counts should accept a string | |
| plan["opcode_counts"] = analyze_opcode_counts(str(logic)) | |
| refined_plans.append(plan) | |
| refined_flow[str(sprite)] = { | |
| "description": sprite_data.get("description", ""), | |
| "plans": refined_plans | |
| } | |
| if refined_flow: | |
| state["action_plan"] = refined_flow | |
| logger.info("Node Optimization completed.") | |
| return state | |
| except Exception: | |
| # log full stacktrace for debugging but return state so caller won't break | |
| logger.exception("Error in Node Optimizer Node — returning current state for inspection") | |
| return state | |
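| # Illustrative (assumed) shape of state["action_plan"] after this node runs, | |
| # inferred from how refined_flow is built above rather than a formal schema: | |
| # { | |
| #     "Sprite1": { | |
| #         "description": "...", | |
| #         "plans": [ | |
| #             {"logic": "...", "opcode_counts": {"event_whenflagclicked": 1}} | |
| #         ] | |
| #     } | |
| # } | |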
| # Node 5: overall block builder node | |
| def overall_block_builder_node_2(state: GameState): | |
| logger.info("--- Running OverallBlockBuilderNode ---") | |
| print("--- Running OverallBlockBuilderNode ---") | |
| project_json = state["project_json"] | |
| targets = project_json["targets"] | |
| # --- Sprite and Stage Target Mapping --- | |
| sprite_map = {target["name"]: target for target in targets if not target["isStage"]} | |
| stage_target = next((target for target in targets if target["isStage"]), None) | |
| if stage_target: | |
| sprite_map[stage_target["name"]] = stage_target | |
| action_plan = state.get("action_plan", {}) | |
| print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2)) | |
| if not action_plan: | |
| logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.") | |
| return state | |
| # Initialize offsets for script placement on the Scratch canvas | |
| # script_y_offset = {} | |
| # script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()} | |
| # --- Set initial fixed position --- | |
| INITIAL_X = 382 | |
| INITIAL_Y = -1089 | |
| # Initialize offsets for script placement on the Scratch canvas | |
| script_y_offset = {name: INITIAL_Y for name in sprite_map.keys()} | |
| script_x_offset_per_sprite = {name: INITIAL_X for name in sprite_map.keys()} | |
| # This handles potential variations in the action_plan structure. | |
| if action_plan.get("action_overall_flow", {}) == {}: | |
| plan_data = action_plan.items() | |
| else: | |
| plan_data = action_plan.get("action_overall_flow", {}).items() | |
| # --- Extract global project context for LLM --- | |
| all_sprite_names = list(sprite_map.keys()) | |
| all_variable_names = {} | |
| all_list_names = {} | |
| all_broadcast_messages = {} | |
| for target in targets: | |
| for var_id, var_info in target.get("variables", {}).items(): | |
| all_variable_names[var_info[0]] = var_id # Store name -> ID mapping (e.g., "myVariable": "myVarId123") | |
| for list_id, list_info in target.get("lists", {}).items(): | |
| all_list_names[list_info[0]] = list_id # Store name -> ID mapping | |
| for broadcast_id, broadcast_name in target.get("broadcasts", {}).items(): | |
| all_broadcast_messages[broadcast_name] = broadcast_id # Store name -> ID mapping | |
| # --- Process each sprite's action plan --- | |
| for sprite_name, sprite_actions_data in plan_data: | |
| if sprite_name in sprite_map: | |
| current_sprite_target = sprite_map[sprite_name] | |
| if "blocks" not in current_sprite_target: | |
| current_sprite_target["blocks"] = {} | |
| # if sprite_name not in script_y_offset: | |
| # script_y_offset[sprite_name] = 0 | |
| for plan_entry in sprite_actions_data.get("plans", []): | |
| logic_sequence = str(plan_entry["logic"]) | |
| opcode_counts = plan_entry.get("opcode_counts", {}) | |
| refined_indent_logic = format_scratch_pseudo_code(logic_sequence) | |
| print(f"\n--------------------------- refined indent logic: {refined_indent_logic}-------------------------------\n") | |
| try: | |
| generated_blocks = block_builder(opcode_counts, refined_indent_logic) | |
| # Ensure generated_blocks is a dictionary | |
| if not isinstance(generated_blocks, dict): | |
| logger.error(f"block_builder for sprite '{sprite_name}' returned non-dict type: {type(generated_blocks)}. Skipping block update.") | |
| continue # Skip to next plan_entry if output is not a dictionary | |
| if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict): | |
| logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.") | |
| generated_blocks = generated_blocks["blocks"] | |
| # Update block positions for top-level script | |
| # for block_id, block_data in generated_blocks.items(): | |
| # if block_data.get("topLevel"): | |
| # block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0) | |
| # block_data["y"] = script_y_offset[sprite_name] | |
| # script_y_offset[sprite_name] += 150 # Increment for next script | |
| # Update block positions for top-level script | |
| for block_id, block_data in generated_blocks.items(): | |
| if block_data.get("topLevel"): | |
| block_data["x"] = INITIAL_X | |
| block_data["y"] = script_y_offset[sprite_name] | |
| script_y_offset[sprite_name] += 150 # increment only Y | |
| current_sprite_target["blocks"].update(generated_blocks) | |
| print(f"[current_sprite_target block updated]: {current_sprite_target['blocks']}") | |
| state["iteration_count"] = 0 | |
| logger.info(f"Action blocks added for sprite '{sprite_name}' by OverallBlockBuilderNode.") | |
| except Exception as e: | |
| logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}") | |
| # Consider adding more specific error handling here if a malformed output | |
| # from block_builder should cause a specific state change, but generally | |
| # avoid nulling the entire project_json. | |
| state["project_json"] = project_json | |
| # with open("debug_state.json", "w", encoding="utf-8") as f: | |
| # json.dump(state, f, indent=2, ensure_ascii=False) | |
| return state | |
| # Node 6: variable adder node | |
| def variable_adder_node(state: GameState): | |
| logger.info("--- Running Variable Adder Node ---") | |
| project_json = state["project_json"] | |
| try: | |
| updated_project_json = variable_adder_main(project_json) | |
| if updated_project_json is not None: | |
| print("Variable added inside the project successfully!") | |
| state["project_json"]=updated_project_json | |
| else: | |
| print("Variable adder unable to add any variable inside the project!") | |
| state["project_json"]=project_json | |
| state["page_count"] +=1 | |
| return state | |
| except Exception as e: | |
| logger.error(f"Error in variable adder node while updating project_json': {e}") | |
| raise | |
| # Node 7: layer order correction node | |
| def layer_order_correction(state: GameState): | |
| """ | |
| Ensures that all sprites (isStage: false) have unique layerOrder values >= 1. | |
| If duplicates are found, they are reassigned sequentially. | |
| """ | |
| logger.info("--- Running Layer Order Correction Node ---") | |
| try: | |
| project_json = state.get("project_json", {}) | |
| targets = project_json.get("targets", []) | |
| # Collect all sprites (ignore Stage) | |
| sprites = [t for t in targets if not t.get("isStage", False)] | |
| # Reassign layerOrder sequentially (starting from 1) | |
| for idx, sprite in enumerate(sprites, start=1): | |
| old_lo = sprite.get("layerOrder", None) | |
| sprite["layerOrder"] = idx | |
| logger.debug(f"Sprite '{sprite.get('name')}' layerOrder: {old_lo} -> {idx}") | |
| # Stage always remains 0 | |
| for target in targets: | |
| if target.get("isStage", False): | |
| target["layerOrder"] = 0 | |
| # Update state | |
| state["project_json"]["targets"] = targets | |
| logger.info("Layer Order Correction completed successfully.") | |
| return state | |
| except Exception as e: | |
| logger.error(f"Error in Layer Order Correction Node: {e}") | |
| return state | |
| # Node 8: processed page node | |
| def processed_page_node(state: GameState): | |
| logger.info("--- Processing the Pages Node ---") | |
| image = state.get("project_image", "") | |
| cnt =state["page_count"] | |
| print(f"The page processed for page:--------------> {cnt}") | |
| if cnt<len(image): | |
| state["processing"]= True | |
| else: | |
| state["processing"]= False | |
| return state | |
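| # Loop-control sketch: this node, together with decide_next_step below, drives | |
| # one pass of the graph per PDF page image; "processing" stays True until | |
| # page_count (incremented in variable_adder_node) reaches len(project_image). | |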
| # Prepare manipulated sprite JSON structure | |
| # { changes: "pdf_stream" in place of "pdf_path" | |
| def extract_images_from_pdf(pdf_stream: io.BytesIO): | |
| ''' Extract images from PDF and generate structured sprite JSON ''' | |
| manipulated_json = {} | |
| img_elements = [] | |
| try: | |
| # { | |
| # pdf_path = Path(pdf_path) | |
| # pdf_filename = pdf_path.stem # e.g., "scratch_crab" | |
| # pdf_dir_path = str(pdf_path.parent).replace("/", "\\") | |
| # print("-------------------------------pdf_filename-------------------------------",pdf_filename) | |
| # print("-------------------------------pdf_dir_path-------------------------------",pdf_dir_path) | |
| if isinstance(pdf_stream, io.BytesIO): | |
| # use a random ID since there's no filename | |
| pdf_id = uuid.uuid4().hex | |
| else: | |
| pdf_id = os.path.splitext(os.path.basename(pdf_stream))[0] | |
| # extracted_image_subdir = DETECTED_IMAGE_DIR / pdf_filename | |
| # json_subdir = JSON_DIR / pdf_filename | |
| # extracted_image_subdir.mkdir(parents=True, exist_ok=True) | |
| # json_subdir.mkdir(parents=True, exist_ok=True) | |
| # print("-------------------------------extracted_image_subdir-------------------------------",extracted_image_subdir) | |
| # print("-------------------------------json_subdir-------------------------------",json_subdir) | |
| # # Output paths (now using Path objects directly) | |
| # output_json_path = json_subdir / "extracted.json" | |
| # final_json_path = json_subdir / "extracted_sprites.json" # Path to extracted_sprites.json | |
| # final_json_path_2 = json_subdir / "extracted_sprites_2.json" | |
| # print("-------------------------------output_json_path-------------------------------",output_json_path) | |
| # print("-------------------------------final_json_path-------------------------------",final_json_path) | |
| # print("-------------------------------final_json_path_2-------------------------------",final_json_path_2) | |
| # } | |
| try: | |
| elements = partition_pdf( | |
| # filename=str(pdf_path), # partition_pdf might expect a string | |
| file=pdf_stream, # 'file=', inplace of 'filename' | |
| strategy="hi_res", | |
| extract_image_block_types=["Image"], | |
| hi_res_model_name="yolox", | |
| extract_image_block_to_payload=True, | |
| ) | |
| print(f"ELEMENTS") | |
| except Exception as e: | |
| raise RuntimeError( | |
| f"❌ Failed to extract images from PDF: {str(e)}") | |
| file_elements = [element.to_dict() for element in elements] | |
| #{ | |
| # try: | |
| # with open(output_json_path, "w") as f: | |
| # json.dump([element.to_dict() | |
| # for element in elements], f, indent=4) | |
| # except Exception as e: | |
| # raise RuntimeError(f"❌ Failed to write extracted.json: {str(e)}") | |
| # try: | |
| # # Display extracted images | |
| # with open(output_json_path, 'r') as file: | |
| # file_elements = json.load(file) | |
| # except Exception as e: | |
| # raise RuntimeError(f"❌ Failed to read extracted.json: {str(e)}") | |
| # } | |
| sprite_count = 1 | |
| for el in file_elements: | |
| img_b64 = el["metadata"].get("image_base64") | |
| if not img_b64: | |
| continue | |
| manipulated_json[f"Sprite {sprite_count}"] = { | |
| # "id":auto_id, | |
| # "name": name, | |
| "base64": el["metadata"]["image_base64"], | |
| "file-path": pdf_id, | |
| # "description": description | |
| } | |
| sprite_count += 1 | |
| return manipulated_json | |
| except Exception as e: | |
| raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}") | |
| # def similarity_matching(input_json_path: str, project_folder: str) -> str: | |
| def similarity_matching(sprites_data: dict, project_folder: str) -> str: | |
| logger.info("🔍 Running similarity matching…") | |
| os.makedirs(project_folder, exist_ok=True) | |
| # ---------------------------------------- | |
| # CHANGED: define normalized base-paths so startswith() checks work | |
| backdrop_base_path = os.path.normpath(str(BACKDROP_DIR)) | |
| sprite_base_path = os.path.normpath(str(SPRITE_DIR)) | |
| code_blocks_path = os.path.normpath(str(CODE_BLOCKS_DIR)) | |
| # ---------------------------------------- | |
| project_json_path = os.path.join(project_folder, "project.json") | |
| # ============================== | |
| # READ SPRITE METADATA | |
| # ============================== | |
| # with open(input_json_path, 'r') as f: | |
| # sprites_data = json.load(f) | |
| sprite_ids, sprite_base64 = [], [] | |
| for sid, sprite in sprites_data.items(): | |
| sprite_ids.append(sid) | |
| # texts.append("This is " + sprite.get("description", sprite.get("name", ""))) | |
| sprite_base64.append(sprite["base64"]) | |
| sprite_images_bytes = [] | |
| for b64 in sprite_base64: | |
| img = Image.open(BytesIO(base64.b64decode(b64.split(",")[-1]))).convert("RGB") | |
| buffer = BytesIO() | |
| img.save(buffer, format="PNG") | |
| buffer.seek(0) | |
| sprite_images_bytes.append(buffer) | |
| # ========================================= | |
| # Build the list of all candidate images | |
| # ========================================= | |
| folder_image_paths = [ | |
| BACKDROP_DIR/"Baseball 2.sb3"/"7be1f5b3e682813dac1f297e52ff7dca.png", | |
| BACKDROP_DIR/"Beach Malibu.sb3"/"050615fe992a00d6af0e664e497ebf53.png", | |
| BACKDROP_DIR/"Bedroom 3.sb3"/"8cc0b88d53345b3e337e8f028a32a4e7.png", | |
| BACKDROP_DIR/"Blue Sky.sb3"/"e7c147730f19d284bcd7b3f00af19bb6.png", | |
| BACKDROP_DIR/"Castle 2.sb3"/"951765ee7f7370f120c9df20b577c22f.png", | |
| BACKDROP_DIR/"Colorful City.sb3"/"04d18ddd1b85f0ea30beb14b8da49f60.png", | |
| BACKDROP_DIR/"Hall.sb3"/"ea86ca30b346f27ca5faf1254f6a31e3.png", | |
| BACKDROP_DIR/"Jungle.sb3"/"f4f908da19e2753f3ed679d7b37650ca.png", | |
| BACKDROP_DIR/"Soccer.sb3"/"04a63154f04b09494354090f7cc2f1b9.png", | |
| BACKDROP_DIR/"Theater.sb3"/"c2b097bc5cdb6a14ef5485202bc5ee76.png", | |
| SPRITE_DIR/"Batter.sprite3"/"592ee9ab2aeefe65cb4fb95fcd046f33.png", | |
| SPRITE_DIR/"Batter.sprite3"/"9d193bef6e3d6d8eba6d1470b8bf9351.png", | |
| SPRITE_DIR/"Batter.sprite3"/"baseball_sprite_motion_1.png", | |
| SPRITE_DIR/"Batter.sprite3"/"bd4fc003528acfa847e45ff82f346eee.png", | |
| SPRITE_DIR/"Batter.sprite3"/"fdfde4bcbaca0f68e83fdf3f4ef0c660.png", | |
| SPRITE_DIR/"Bear.sprite3"/"6f303e972f33fcb7ef36d0d8012d0975.png", | |
| SPRITE_DIR/"Bear.sprite3"/"bear_motion_2.png", | |
| SPRITE_DIR/"Bear.sprite3"/"deef1eaa96d550ae6fc11524a1935024.png", | |
| SPRITE_DIR/"Beetle.sprite3"/"46d0dfd4ae7e9bfe3a6a2e35a4905eae.png", | |
| SPRITE_DIR/"Butterfly 1.sprite3"/"34b76c1835c6a7fc2c47956e49bb0f52.png", | |
| SPRITE_DIR/"Butterfly 1.sprite3"/"49c9f952007d870a046cff93b6e5e098.png", | |
| SPRITE_DIR/"Butterfly 1.sprite3"/"fe98df7367e314d9640bfaa54fc239be.png", | |
| SPRITE_DIR/"Cat.sprite3"/"0fb9be3e8397c983338cb71dc84d0b25.png", | |
| SPRITE_DIR/"Cat.sprite3"/"bcf454acf82e4504149f7ffe07081dbc.png", | |
| SPRITE_DIR/"Centaur.sprite3"/"2373556e776cad3ba4d6ee04fc34550b.png", | |
| SPRITE_DIR/"Centaur.sprite3"/"c00ffa6c5dd0baf9f456b897ff974377.png", | |
| SPRITE_DIR/"Centaur.sprite3"/"d722329bd9373ad80625e5be6d52f3ed.png", | |
| SPRITE_DIR/"Centaur.sprite3"/"d7aa990538915b7ef1f496d7e8486ade.png", | |
| SPRITE_DIR/"City Bus.sprite3"/"7d7e26014a346b894db8ab1819f2167f.png", | |
| SPRITE_DIR/"City Bus.sprite3"/"e9694adbff9422363e2ea03166015393.png", | |
| SPRITE_DIR/"Crab.sprite3"/"49839aa1b0feed02a3c759db5f8dee71.png", | |
| SPRITE_DIR/"Crab.sprite3"/"bear_element.png", | |
| SPRITE_DIR/"Crab.sprite3"/"f7cdd2acbc6d7559d33be8675059c79e.png", | |
| SPRITE_DIR/"Glow-G.sprite3"/"56839bc48957869d980c6f9b6f5a2a91.png", | |
| SPRITE_DIR/"Jordyn.sprite3"/"00c8c464c19460df693f8d5ae69afdab.png", | |
| SPRITE_DIR/"Jordyn.sprite3"/"768c4601174f0dfcb96b3080ccc3a192.png", | |
| SPRITE_DIR/"Jordyn.sprite3"/"a7cc1e5f02b58ecc8095cfc18eef0289.png", | |
| SPRITE_DIR/"Jordyn.sprite3"/"db4d97cbf24e2b8af665bfbf06f67fa0.png", | |
| SPRITE_DIR/"Soccer Ball.sprite3"/"5d973d7a3a8be3f3bd6e1cd0f73c32b5.png", | |
| SPRITE_DIR/"Soccer Ball.sprite3"/"cat_football.png", | |
| SPRITE_DIR/"Star.sprite3"/"551629f2a64c1f3703e57aaa133effa6.png", | |
| SPRITE_DIR/"Wizard.sprite3"/"55ba51188af86ca16ef30267e874c1ed.png", | |
| SPRITE_DIR/"Wizard.sprite3"/"91d495085eb4d02a375c42f6318071e7.png", | |
| SPRITE_DIR/"Wizard.sprite3"/"df943c9894ee4b9df8c5893ce30c2a5f.png", | |
| # CODE_BLOCKS_DIR/"client_code_block_1.jpg", | |
| # CODE_BLOCKS_DIR/"client_code_block_2.jpg", | |
| CODE_BLOCKS_DIR/"script1.JPG", | |
| CODE_BLOCKS_DIR/"script2.JPG", | |
| CODE_BLOCKS_DIR/"script3.JPG", | |
| CODE_BLOCKS_DIR/"script4.JPG", | |
| CODE_BLOCKS_DIR/"script5.JPG", | |
| CODE_BLOCKS_DIR/"script6.JPG", | |
| CODE_BLOCKS_DIR/"script7.JPG", | |
| CODE_BLOCKS_DIR/"script8.JPG", | |
| CODE_BLOCKS_DIR/"script9.JPG", | |
| CODE_BLOCKS_DIR/"static_white.png"] | |
| folder_image_paths = [os.path.normpath(str(p)) for p in folder_image_paths] | |
| # ========================================= | |
| # ----------------------------------------- | |
| # Load reference embeddings from JSON | |
| # ----------------------------------------- | |
| with open(f"{BLOCKS_DIR}/dinov2_embeddings.json", "r") as f: | |
| embedding_json = json.load(f) | |
| # ========================================= | |
| # Decode & embed each sprite image | |
| # ========================================= | |
| # sprite_features = [] | |
| # for b64 in sprite_base64: | |
| # if "," in b64: | |
| # b64 = b64.split(",", 1)[1] | |
| # img_bytes = base64.b64decode(b64) | |
| # pil_img = Image.open(BytesIO(img_bytes)).convert("RGB") | |
| # buf = BytesIO() | |
| # pil_img.save(buf, format="PNG") | |
| # buf.seek(0) | |
| # feats = clip_embd.embed_image([buf])[0] | |
| # sprite_features.append(feats) | |
| # ============================== # | |
| # EMBED SPRITE IMAGES # | |
| # ============================== # | |
| # ensure model is initialized (fast no-op after first call) | |
| init_dinov2() | |
| # embed the incoming sprite BytesIO images (same data structure you already use) | |
| sprite_matrix = embed_bytesio_list(sprite_images_bytes, batch_size=8) # shape (N, D) | |
| # load reference embeddings from JSON (they must be numeric lists) | |
| img_matrix = np.array([img["embeddings"] for img in embedding_json], dtype=np.float32) | |
| # normalize both sides (important — stored embeddings may not be normalized) | |
| sprite_matrix = l2_normalize_rows(sprite_matrix) | |
| img_matrix = l2_normalize_rows(img_matrix) | |
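| # With both matrices L2-normalized row-wise, the matrix product below equals | |
| # pairwise cosine similarity: similarity[i, j] = cos(sprite_i, reference_j). | |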
| # ========================================= | |
| # Compute similarities & pick best match | |
| # ========================================= | |
| similarity = np.matmul(sprite_matrix, img_matrix.T) | |
| most_similar_indices = np.argmax(similarity, axis=1) | |
| # ========================================= | |
| # Copy matched sprite assets + collect data | |
| # ========================================= | |
| project_data = [] | |
| copied_folders = set() | |
| for sprite_idx, matched_idx in enumerate(most_similar_indices): | |
| matched_image_path = folder_image_paths[matched_idx] | |
| matched_folder = os.path.dirname(matched_image_path) | |
| # CHANGED: use our new normalized sprite_base_path | |
| if not matched_folder.startswith(sprite_base_path): | |
| continue | |
| if matched_folder in copied_folders: | |
| continue | |
| copied_folders.add(matched_folder) | |
| logger.info(f"Matched sprite: {matched_image_path}") | |
| sprite_json_path = os.path.join(matched_folder, 'sprite.json') | |
| if not os.path.exists(sprite_json_path): | |
| logger.warning(f"No sprite.json in {matched_folder}") | |
| continue | |
| with open(sprite_json_path, 'r') as f: | |
| sprite_info = json.load(f) | |
| # copy all non‐matched files | |
| for fname in os.listdir(matched_folder): | |
| if fname in (os.path.basename(matched_image_path), 'sprite.json'): | |
| continue | |
| shutil.copy2(os.path.join(matched_folder, fname), | |
| os.path.join(project_folder, fname)) | |
| project_data.append(sprite_info) | |
| # ========================================= | |
| # Copy matched backdrop assets + collect | |
| # ========================================= | |
| backdrop_data = [] | |
| copied_backdrop_folders = set() | |
| for backdrop_idx, matched_idx in enumerate(most_similar_indices): | |
| matched_image_path = folder_image_paths[matched_idx] | |
| matched_folder = os.path.dirname(matched_image_path) | |
| matched_filename = os.path.basename(matched_image_path) | |
| # CHANGED: use our new normalized backdrop_base_path | |
| if not matched_folder.startswith(backdrop_base_path): | |
| continue | |
| # skip if backdrop folder already processed | |
| if matched_folder in copied_backdrop_folders: | |
| continue | |
| copied_backdrop_folders.add(matched_folder) | |
| logger.info(f"Matched backdrop: {matched_image_path}") | |
| # 1) Copy the matched backdrop image itself | |
| try: | |
| shutil.copy2( | |
| matched_image_path, | |
| os.path.join(project_folder, matched_filename) | |
| ) | |
| logger.info(f"✅ Copied matched backdrop image {matched_filename} to {project_folder}") | |
| except Exception as e: | |
| logger.error(f"❌ Failed to copy matched backdrop {matched_image_path}: {e}") | |
| # copy non‐matched files | |
| for fname in os.listdir(matched_folder): | |
| # if fname in (os.path.basename(matched_image_path), 'project.json'): | |
| if fname in {matched_filename, 'project.json'}: | |
| continue | |
| # shutil.copy2(os.path.join(matched_folder, fname), | |
| # os.path.join(project_folder, fname)) | |
| src = os.path.join(matched_folder, fname) | |
| dst = os.path.join(project_folder, fname) | |
| if os.path.isfile(src): | |
| try: | |
| shutil.copy2(src, dst) | |
| logger.info(f"Copied additional backdrop asset {fname} to project folder") | |
| except Exception as e: | |
| logger.error(f"Failed to copy {src}: {e}") | |
| # append the stage‐target from its project.json | |
| pj = os.path.join(matched_folder, 'project.json') | |
| if os.path.exists(pj): | |
| with open(pj, 'r') as f: | |
| bd_json = json.load(f) | |
| for tgt in bd_json.get("targets", []): | |
| if tgt.get("isStage"): | |
| backdrop_data.append(tgt) | |
| else: | |
| logger.warning(f"No project.json in {matched_folder}") | |
| # ========================================= | |
| # Merge into final Scratch project.json | |
| # ========================================= | |
| final_project = { | |
| "targets": [], "monitors": [], "extensions": [], | |
| "meta": { | |
| "semver": "3.0.0", | |
| "vm": "11.3.0", | |
| "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" | |
| } | |
| } | |
| # sprites first | |
| for spr in project_data: | |
| if not spr.get("isStage", False): | |
| final_project["targets"].append(spr) | |
| # then backdrop as the Stage | |
| if backdrop_data: | |
| all_costumes, sounds = [], [] | |
| seen_costumes = set() | |
| for i, bd in enumerate(backdrop_data): | |
| for costume in bd.get("costumes", []): | |
| # Create a unique key for the costume | |
| key = (costume.get("name"), costume.get("assetId")) | |
| if key not in seen_costumes: | |
| seen_costumes.add(key) | |
| all_costumes.append(costume) | |
| if i == 0: | |
| sounds = bd.get("sounds", []) | |
| stage_obj={ | |
| "isStage": True, | |
| "name": "Stage", | |
| "objName": "Stage", | |
| "variables": {}, | |
| "lists": {}, | |
| "broadcasts": {}, | |
| "blocks": {}, | |
| "comments": {}, | |
| "currentCostume": 1 if len(all_costumes) > 1 else 0, | |
| "costumes": all_costumes, | |
| "sounds": sounds, | |
| "volume": 100, | |
| "layerOrder": 0, | |
| "tempo": 60, | |
| "videoTransparency": 50, | |
| "videoState": "on", | |
| "textToSpeechLanguage": None | |
| } | |
| final_project["targets"].insert(0, stage_obj) | |
| else: | |
| logger.warning("⚠️ No backdrop matched. Using default static backdrop.") | |
| default_backdrop_path = BACKDROP_DIR / "cd21514d0531fdffb22204e0ec5ed84a.svg" | |
| default_backdrop_name = "cd21514d0531fdffb22204e0ec5ed84a.svg" | |
| default_backdrop_sound = BACKDROP_DIR / "83a9787d4cb6f3b7632b4ddfebf74367.wav" | |
| default_backdrop_sound_name = "cd21514d0531fdffb22204e0ec5ed84a.svg" | |
| try: | |
| shutil.copy2(default_backdrop_path, os.path.join(project_folder, default_backdrop_name)) | |
| logger.info(f"✅ Default backdrop copied to project: {default_backdrop_name}") | |
| shutil.copy2(default_backdrop_sound, os.path.join(project_folder, default_backdrop_sound_name)) | |
| logger.info(f"✅ Default backdrop sound copied to project: {default_backdrop_sound_name}") | |
| except Exception as e: | |
| logger.error(f"❌ Failed to copy default backdrop: {e}") | |
| stage_obj={ | |
| "isStage": True, | |
| "name": "Stage", | |
| "objName": "Stage", | |
| "variables": {}, | |
| "lists": {}, | |
| "broadcasts": {}, | |
| "blocks": {}, | |
| "comments": {}, | |
| "currentCostume": 0, | |
| "costumes": [ | |
| { | |
| "assetId": default_backdrop_name.split(".")[0], | |
| "name": "defaultBackdrop", | |
| "md5ext": default_backdrop_name, | |
| "dataFormat": "svg", | |
| "rotationCenterX": 240, | |
| "rotationCenterY": 180 | |
| } | |
| ], | |
| "sounds": [ | |
| { | |
| "name": "pop", | |
| "assetId": "83a9787d4cb6f3b7632b4ddfebf74367", | |
| "dataFormat": "wav", | |
| "format": "", | |
| "rate": 48000, | |
| "sampleCount": 1123, | |
| "md5ext": "83a9787d4cb6f3b7632b4ddfebf74367.wav" | |
| } | |
| ], | |
| "volume": 100, | |
| "layerOrder": 0, | |
| "tempo": 60, | |
| "videoTransparency": 50, | |
| "videoState": "on", | |
| "textToSpeechLanguage": None | |
| } | |
| final_project["targets"].insert(0, stage_obj) | |
| with open(project_json_path, 'w') as f: | |
| json.dump(final_project, f, indent=2) | |
| return project_json_path | |
| def convert_pdf_stream_to_images(pdf_stream: io.BytesIO, dpi=300): | |
| # Ensure we are at the start of the stream | |
| pdf_stream.seek(0) | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf: | |
| tmp_pdf.write(pdf_stream.read()) | |
| tmp_pdf_path = tmp_pdf.name | |
| # Now use convert_from_path on the temp file, cleaning it up afterwards | |
| try: | |
| images = convert_from_path(tmp_pdf_path, dpi=dpi) | |
| finally: | |
| os.remove(tmp_pdf_path)  # remove the temporary PDF copy | |
| return images | |
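| # A minimal alternative sketch: pdf2image's convert_from_bytes (already | |
| # imported above) can skip the temporary file entirely: | |
| # images = convert_from_bytes(pdf_stream.getvalue(), dpi=dpi) | |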
| def delay_for_tpm_node(state: GameState): | |
| logger.info("--- Running DelayForTPMNode ---") | |
| time.sleep(10) # Adjust the delay as needed | |
| logger.info("Delay completed.") | |
| return state | |
| # Build the LangGraph workflow | |
| workflow = StateGraph(GameState) | |
| # Add all nodes to the workflow | |
| # workflow.add_node("time_delay_1", delay_for_tpm_node) | |
| # workflow.add_node("time_delay_2", delay_for_tpm_node) | |
| # workflow.add_node("time_delay_3", delay_for_tpm_node) | |
| # workflow.add_node("time_delay_4", delay_for_tpm_node) | |
| workflow.add_node("pseudo_generator", pseudo_generator_node) | |
| #workflow.add_node("plan_generator", overall_planner_node) | |
| workflow.add_node("Node_optimizer", node_optimizer) | |
| workflow.add_node("layer_optimizer", layer_order_correction) | |
| #workflow.add_node("logic_alignment", plan_logic_aligner_node) | |
| #workflow.add_node("plan_verifier", plan_verification_node) | |
| #workflow.add_node("refined_planner", refined_planner_node) # Refines the action plan | |
| #workflow.add_node("opcode_counter", plan_opcode_counter_node) | |
| workflow.add_node("block_builder", overall_block_builder_node_2) | |
| workflow.add_node("variable_initializer", variable_adder_node) | |
| workflow.add_node("page_processed", processed_page_node) | |
| workflow.set_entry_point("page_processed") | |
| # Conditional branching from the start | |
| def decide_next_step(state: GameState): | |
| if state.get("processing", False): | |
| return "pseudo_generator" | |
| else: | |
| return "layer_optimizer"#END | |
| workflow.add_conditional_edges( | |
| "page_processed", | |
| decide_next_step, | |
| { | |
| "pseudo_generator": "pseudo_generator", | |
| "layer_optimizer": "layer_optimizer" | |
| #str(END): END # str(END) is '__end__' | |
| } | |
| ) | |
| # Main chain | |
| # workflow.add_edge("pseudo_generator", "time_delay_1") | |
| workflow.add_edge("pseudo_generator", "Node_optimizer") | |
| # workflow.add_edge("time_delay_1", "plan_generator") | |
| # workflow.add_edge("plan_generator", "time_delay_2") | |
| # workflow.add_edge("time_delay_2", "refined_planner") | |
| # workflow.add_edge("refined_planner", "time_delay_3") | |
| # workflow.add_edge("time_delay_3", "opcode_counter") | |
| workflow.add_edge("Node_optimizer", "block_builder") | |
| workflow.add_edge("block_builder", "variable_initializer") | |
| # After last node, check again | |
| workflow.add_edge("variable_initializer", "page_processed") | |
| workflow.add_edge("layer_optimizer", END) | |
| app_graph = workflow.compile() | |
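| # Compiled graph flow (summary of the edges above): | |
| # page_processed -> pseudo_generator -> Node_optimizer -> block_builder | |
| # -> variable_initializer -> page_processed (repeats once per page), | |
| # then page_processed -> layer_optimizer -> END once all pages are done. | |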
| # ============== Helper function to Upscale an Image ============== # | |
| def upscale_image(image: Image.Image, scale: int = 2) -> Image.Image: | |
| """ | |
| Upscales a PIL image by a given scale factor. | |
| """ | |
| try: | |
| width, height = image.size | |
| new_size = (width * scale, height * scale) | |
| upscaled_image = image.resize(new_size, Image.LANCZOS) | |
| logger.info(f"✅ Upscaled image to {new_size}") | |
| return upscaled_image | |
| except Exception as e: | |
| logger.error(f"❌ Error during image upscaling: {str(e)}") | |
| return image | |
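| # Example usage (illustrative): double the resolution of the first PDF page | |
| # page_hi_res = upscale_image(images[0], scale=2) | |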
| def create_sb3_archive(project_folder, project_id): | |
| """ | |
| Zips the project folder and renames it to an .sb3 file. | |
| Args: | |
| project_folder (str): The path to the directory containing the project.json and assets. | |
| project_id (str): The unique ID for the project, used for naming the .sb3 file. | |
| Returns: | |
| str: The path to the created .sb3 file, or None if an error occurred. | |
| """ | |
| print(" --------------------------------------- create_sb3_archive INITIALIZE ---------------------------------------") | |
| # output_filename = os.path.join("outputs", project_id) | |
| #output_filename = OUTPUT_DIR / project_id | |
| output_filename = GEN_PROJECT_DIR / project_id | |
| print(" --------------------------------------- output_filename ---------------------------------------",output_filename) | |
| zip_path = None | |
| sb3_path = None | |
| try: | |
| zip_path = shutil.make_archive(output_filename, 'zip', root_dir=project_folder) | |
| print(" --------------------------------------- zip_path_str ---------------------------------------", output_filename, project_folder) | |
| logger.info(f"Project folder zipped to: {zip_path}") | |
| # 2. Rename the .zip file to .sb3 | |
| sb3_path = f"{output_filename}.sb3" | |
| os.rename(zip_path, sb3_path) | |
| print(" --------------------------------------- rename paths ---------------------------------------", zip_path, sb3_path) | |
| logger.info(f"Renamed {zip_path} to {sb3_path}") | |
| return sb3_path | |
| except Exception as e: | |
| logger.error(f"Error creating SB3 archive for {project_id}: {e}") | |
| # Clean up any partial files if an error occurs | |
| if zip_path and os.path.exists(zip_path): | |
| os.remove(zip_path) | |
| if sb3_path and os.path.exists(sb3_path): | |
| os.remove(sb3_path) | |
| return None | |
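| # Note: an .sb3 file is just a ZIP archive of project.json plus its media | |
| # assets, which is why zipping the folder and renaming the extension suffices. | |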
| #{ changes -> pdf_stream replacement of pdf_path | |
| # def save_pdf_to_generated_dir(pdf_path: str, project_id: str) -> str: | |
| def save_pdf_to_generated_dir(pdf_stream: io.BytesIO, project_id: str) -> str: | |
| """ | |
| Writes the PDF from `pdf_stream` into GEN_PROJECT_DIR/project_id/, | |
| naming it <project_id>.pdf. | |
| Args: | |
| pdf_stream (io.BytesIO): Any existing stream to a PDF file. | |
| project_id (str): Your unique project identifier. | |
| Returns: | |
| str: Path to the copied PDF in the generated directory, | |
| or None if something went wrong. | |
| """ | |
| # } | |
| try: | |
| # 1) Build the destination directory and base filename | |
| output_dir = GEN_PROJECT_DIR / project_id | |
| output_dir.mkdir(parents=True, exist_ok=True) | |
| print(f"\n--------------------------------output_dir {output_dir}") | |
| # 2) Define the target PDF path | |
| target_pdf = output_dir / f"{project_id}.pdf" | |
| print(f"\n--------------------------------target_pdf {target_pdf}") | |
| # 3) Copy the PDF | |
| # { | |
| # shutil.copy2(pdf_path, target_pdf) | |
| if isinstance(pdf_stream, io.BytesIO): | |
| with open(target_pdf, "wb") as f: | |
| f.write(pdf_stream.getbuffer()) | |
| else: | |
| shutil.copy2(pdf_stream, target_pdf) | |
| print(f"Copied PDF from {pdf_stream} → {target_pdf}") | |
| logger.info(f"Copied PDF from {pdf_stream} → {target_pdf}") | |
| # } | |
| return str(target_pdf) | |
| except Exception as e: | |
| logger.error(f"Failed to save PDF to generated dir: {e}", exc_info=True) | |
| return None | |
| @app.route('/')  # route path assumed; the decorator is missing from the listing | |
| def index(): | |
| return render_template('app_index.html') | |
| @app.route('/download_sb3/<project_id>')  # path inferred from download_url below | |
| def download_sb3(project_id): | |
| sb3_path = GEN_PROJECT_DIR / f"{project_id}.sb3" | |
| if not sb3_path.exists(): | |
| return jsonify({"error": "Scratch project file not found"}), 404 | |
| return send_file( | |
| sb3_path, | |
| as_attachment=True, | |
| download_name=sb3_path.name | |
| ) | |
| @app.route('/download_pdf/<project_id>')  # path inferred from pdf_url below | |
| def download_pdf(project_id): | |
| pdf_path = GEN_PROJECT_DIR / project_id / f"{project_id}.pdf" | |
| if not pdf_path.exists(): | |
| return jsonify({"error": "Scratch project pdf file not found"}), 404 | |
| return send_file( | |
| pdf_path, | |
| as_attachment=True, | |
| download_name=pdf_path.name | |
| ) | |
| @app.route('/download_sound/<sound_id>')  # route path assumed | |
| def download_sound(sound_id): | |
| sound_path = SOUND_DIR / f"{sound_id}.wav" | |
| if not sound_path.exists(): | |
| return jsonify({"error": "Scratch project sound file not found"}), 404 | |
| return send_file( | |
| sound_path, | |
| as_attachment=True, | |
| download_name=sound_path.name | |
| ) | |
| # API endpoint | |
| @app.route('/process_pdf', methods=['POST'])  # route path assumed | |
| def process_pdf(): | |
| try: | |
| logger.info("Received request to process PDF.") | |
| if 'pdf_file' not in request.files: | |
| logger.warning("No PDF file found in request.") | |
| return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400 | |
| pdf_file = request.files['pdf_file'] | |
| if pdf_file.filename == '': | |
| return jsonify({"error": "Empty filename"}), 400 | |
| # ================================================= # | |
| # Generate Random UUID for project folder name # | |
| # ================================================= # | |
| project_id = str(uuid.uuid4()).replace('-', '') | |
| # project_folder = os.path.join("outputs", f"{project_id}") | |
| project_folder = OUTPUT_DIR / project_id | |
| # =========================================================================== # | |
| # Create empty json in project_{random_id} folder # | |
| # =========================================================================== # | |
| #os.makedirs(project_folder, exist_ok=True) | |
| # { | |
| # Save the uploaded PDF temporarily | |
| # filename = secure_filename(pdf_file.filename) | |
| # temp_dir = tempfile.mkdtemp() | |
| # saved_pdf_path = os.path.join(temp_dir, filename) | |
| # pdf_file.save(saved_pdf_path) | |
| # pdf_doc = saved_pdf_path | |
| pdf_bytes = pdf_file.read() | |
| pdf_stream = io.BytesIO(pdf_bytes) | |
| logger.info(f"Saved uploaded PDF to: {pdf_stream}") | |
| # pdf= save_pdf_to_generated_dir(saved_pdf_path, project_id) | |
| start_time = time.time() | |
| pdf= save_pdf_to_generated_dir(pdf_stream, project_id) | |
| # logger.info(f"Created project folder: {project_folder}") | |
| # logger.info(f"Saved uploaded PDF to: {saved_pdf_path}") | |
| logger.info(f"Saved uploaded PDF to: {pdf_file}: {pdf}") | |
| print("--------------------------------pdf_file_path---------------------",pdf_file,pdf_stream) | |
| total_time = time.time() - start_time | |
| print(f"-----------------------------Execution Time save_pdf_to_generated_dir() : {total_time}-----------------------------\n") | |
| # } | |
| # Save uploaded file to disk | |
| # pdf_path = os.path.join("/tmp", secure_filename(file.filename)) | |
| # file.save(pdf_path) | |
| # compressed_pages = pdf_to_images_with_size_check(pdf_path, "/tmp/compressed_pages", size_limit_mb=4) | |
| # { | |
| # Extract & process | |
| # output_path, result = extract_images_from_pdf(saved_pdf_path) | |
| start_time = time.time() | |
| output_path = extract_images_from_pdf(pdf_stream) | |
| print(" --------------------------------------- zip_path_str ---------------------------------------", output_path) | |
| total_time = time.time() - start_time | |
| print(f"-----------------------------Execution Time extract_images_from_pdf() : {total_time}-----------------------------\n") | |
| # } | |
| # Check extracted_sprites.json for "scratch block" in any 'name' | |
| # extracted_dir = os.path.join(JSON_DIR, os.path.splitext(filename)[0]) | |
| # extracted_sprites_json = os.path.join(extracted_dir, "extracted_sprites.json") | |
| # if not os.path.exists(extracted_sprites_json): | |
| # return jsonify({"error": "No extracted_sprites.json found"}), 500 | |
| # with open(extracted_sprites_json, 'r') as f: | |
| # sprite_data = json.load(f) | |
| start_time = time.time() | |
| project_output = similarity_matching(output_path, project_folder) | |
| logger.info("Received request to process PDF.") | |
| total_time = time.time() - start_time | |
| print(f"-----------------------------Execution Time similarity_matching() : {total_time}-----------------------------\n") | |
| with open(project_output, 'r') as f: | |
| project_skeleton = json.load(f) | |
| # images = convert_from_path(pdf_stream, dpi=300) | |
| # print(type) | |
| # page = images[0] | |
| # # img_base64 = base64.b64encode(images).decode("utf-8") | |
| # buf = BytesIO() | |
| # page.save(buf, format="PNG") | |
| # img_bytes = buf.getvalue() | |
| # img_b64 = base64.b64encode(img_bytes).decode("utf-8") | |
| #image_paths = await convert_pdf_to_images_async(saved_pdf_path) | |
| # images = convert_bytes_to_image(pdf_stream, dpi=250) | |
| # print("PDF converted to images:", images) | |
| if isinstance(pdf_stream, io.BytesIO): | |
| images = convert_pdf_stream_to_images(pdf_stream, dpi=300) | |
| else: | |
| images = convert_from_path(pdf_stream, dpi=300) | |
| #updating logic here [Dev Patel] | |
| initial_state_dict = { | |
| "project_json": project_skeleton, | |
| "description": "The pseudo code for the script", | |
| "project_id": project_id, | |
| # "project_image": img_b64, | |
| "project_image": images, | |
| "action_plan": {}, | |
| "pseudo_code": {}, | |
| "temporary_node": {}, | |
| "processing":True, | |
| "page_count": 0, | |
| "temp_pseudo_code":[], | |
| } | |
| #final_state_dict = app_graph.invoke(initial_state_dict) # Pass dictionary | |
| final_state_dict = app_graph.invoke(initial_state_dict,config={"recursion_limit": 200}) | |
| final_project_json = final_state_dict['project_json'] # Access as dict | |
| # final_project_json = project_skeleton | |
| # Save the *final* filled project JSON, overwriting the skeleton | |
| with open(project_output, "w") as f: | |
| json.dump(final_project_json, f, indent=2) | |
| logger.info(f"Final project JSON saved to {project_output}") | |
| # --- Call the new function to create the .sb3 file --- | |
| sb3_file_path = create_sb3_archive(project_folder, project_id) | |
| if sb3_file_path: | |
| logger.info(f"Successfully created SB3 file: {sb3_file_path}") | |
| # Instead of returning the local path, return a URL to the download endpoint | |
| download_url = f"https://prthm11-scratch-vision-game.hf.space/download_sb3/{project_id}" | |
| pdf_url = f"https://prthm11-scratch-vision-game.hf.space/download_pdf/{project_id}" | |
| print(f"DOWNLOAD_URL: {download_url}") | |
| print(f"PDF_URL: {pdf_url}") | |
| # return jsonify({"message": "Procesed PDF and Game sb3 generated successfully", "project_id": project_id, "download_url": download_url}) | |
| return jsonify({ | |
| "message": "✅ PDF processed successfully", | |
| "isError": False, | |
| "output_json": "output_path", | |
| "sprites": "result", | |
| "project_output_json": "project_output", | |
| "test_url": download_url | |
| }) | |
| else: | |
| return jsonify({ | |
| "message": "❌ Scanned images are not clear, please retry!", | |
| "isError": True, | |
| "output_json": "output_path", | |
| "sprites": "result", | |
| "project_output_json": "project_output", | |
| "test_url": None  # no download URL exists when SB3 creation fails | |
| }), 500 | |
| except Exception as e: | |
| logger.error(f"Error during processing the pdf workflow for project ID {project_id}: {e}", exc_info=True) | |
| return jsonify({ | |
| "message": "❌ Scanned images are not clear please retry!", | |
| "isError": True, | |
| "output_json": "output_path", | |
| "sprites": "result", | |
| "project_output_json": "project_output", | |
| "test_url": "download_url" | |
| }), 500 | |
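| # Illustrative request (assumes the /process_pdf route above and the default | |
| # port from app.run below): | |
| # curl -F "pdf_file=@worksheet.pdf" http://localhost:7860/process_pdf | |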
| if __name__ == '__main__': | |
| # os.makedirs("outputs", exist_ok=True) #== commented by P | |
| app.run(host='0.0.0.0', port=7860, debug=True) |