from flask import Flask, request, jsonify, render_template, send_from_directory, send_file
import cv2, json, base64, io, os, tempfile, logging, re
import numpy as np
from unstructured.partition.pdf import partition_pdf
from PIL import Image
# from imutils.perspective import four_point_transform
from dotenv import load_dotenv
import pytesseract
from werkzeug.utils import secure_filename
from langchain_groq import ChatGroq
from langgraph.prebuilt import create_react_agent
from pdf2image import convert_from_path, convert_from_bytes
from concurrent.futures import ThreadPoolExecutor
from pdf2image.exceptions import PDFInfoNotInstalledError
from typing import Dict, TypedDict, Optional, Any
from langgraph.graph import StateGraph, END
import uuid
import shutil, time, functools
from langchain_experimental.open_clip.open_clip import OpenCLIPEmbeddings
from langchain_core.utils.utils import secret_from_env
# from matplotlib.offsetbox import OffsetImage, AnnotationBbox
from io import BytesIO
from pathlib import Path
from utils.block_relation_builder import block_builder, separate_scripts, transform_logic_to_action_flow, analyze_opcode_counts
from langchain_openai import ChatOpenAI
from pydantic import Field, SecretStr
from difflib import get_close_matches
import torch
from transformers import AutoImageProcessor, AutoModel
# --- Config (tune threads as needed) ---
DINOV2_MODEL = "facebook/dinov2-small" # small = best CPU latency/quality tradeoff
DEVICE = torch.device("cpu")
torch.set_num_threads(4) # tune for your CPU
# --- Globals for single-shot model load ---
_dinov2_processor = None
_dinov2_model = None
# os.environ["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY", "default_key_or_placeholder")
# class ChatOpenRouter(ChatOpenAI):
# openai_api_key: Optional[SecretStr] = Field(
# alias="api_key",
# default_factory=secret_from_env("OPENROUTER_API_KEY", default=None),
# )
# @property
# def lc_secrets(self) -> dict[str, str]:
# return {"openai_api_key": "OPENROUTER_API_KEY"}
# def __init__(self,
# openai_api_key: Optional[str] = None,
# **kwargs):
# openai_api_key = (
# openai_api_key or os.environ.get("OPENROUTER_API_KEY")
# )
# super().__init__(
# base_url="https://openrouter.ai/api/v1",
# openai_api_key=openai_api_key,
# **kwargs
# )
# llm2 = ChatOpenRouter(
# #model_name="deepseek/deepseek-r1-0528:free",
# #model_name="google/gemini-2.0-flash-exp:free",
# #model_name="deepseek/deepseek-v3-base:free",
# model_name="deepseek/deepseek-r1:free"
# )
def log_execution_time(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
logger.info(f"⏱ {func.__name__} executed in {end_time - start_time:.2f} seconds")
return result
return wrapper
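# Illustrative use of the decorator (hypothetical function name):
#
#     @log_execution_time
#     def parse_pdf(path):
#         ...
#
# Each call then logs a line like "⏱ parse_pdf executed in 1.23 seconds".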
# global pdf_doc
# ============================== #
# INITIALIZE CLIP EMBEDDER #
# ============================== #
# clip_embd = OpenCLIPEmbeddings()
# Configure logging
os.makedirs("/app/logs", exist_ok=True)  # FileHandler fails if the log directory is missing
logging.basicConfig(
    level=logging.DEBUG,  # Use INFO or ERROR in production
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("/app/logs/app.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
load_dotenv()
# os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY")
groq_api_key = os.getenv("GROQ_API_KEY")  # ChatGroq also reads GROQ_API_KEY from the environment by default
llm = ChatGroq(
model="meta-llama/llama-4-scout-17b-16e-instruct",
temperature=0,
max_tokens=None,
)
app = Flask(__name__)
# ============================== #
# TESSERACT CONFIGURATION #
# ============================== #
# Set the Tesseract executable path
# pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
pytesseract.pytesseract.tesseract_cmd = r"/usr/bin/tesseract"
# Set the TESSDATA_PREFIX environment variable to the directory containing the 'tessdata' folder
# This is crucial for Tesseract to find its language data files (e.g., eng.traineddata)
# os.environ['TESSDATA_PREFIX'] = r'C:\Program Files\Tesseract-OCR'
# poppler_path = r"C:\poppler\Library\bin"
# POSIX-style paths (the original Windows-style backslash strings do not resolve in the Linux container)
backdrop_images_path = "app/blocks/Backdrops"
sprite_images_path = "app/blocks/sprites"
code_blocks_image_path = "app/blocks/code_blocks"
count = 0
BASE_DIR = Path("/app")
BLOCKS_DIR = BASE_DIR / "blocks"
STATIC_DIR = BASE_DIR / "static"
GEN_PROJECT_DIR = BASE_DIR / "generated_projects"
BACKDROP_DIR = BLOCKS_DIR / "Backdrops"
SPRITE_DIR = BLOCKS_DIR / "sprites"
CODE_BLOCKS_DIR = BLOCKS_DIR / "code_blocks"
SOUND_DIR = BLOCKS_DIR / "sound"
# === new: outputs rooted under BASE_DIR ===
OUTPUT_DIR = BASE_DIR / "outputs"
# DETECTED_IMAGE_DIR = OUTPUT_DIR / "DETECTED_IMAGE"
# SCANNED_IMAGE_DIR = OUTPUT_DIR / "SCANNED_IMAGE"
# JSON_DIR = OUTPUT_DIR / "EXTRACTED_JSON"
# make all of them in one go
for d in (
BLOCKS_DIR,
STATIC_DIR,
GEN_PROJECT_DIR,
BACKDROP_DIR,
SPRITE_DIR,
CODE_BLOCKS_DIR,
OUTPUT_DIR,
# DETECTED_IMAGE_DIR,
# SCANNED_IMAGE_DIR,
# JSON_DIR,
):
d.mkdir(parents=True, exist_ok=True)
class GameState(TypedDict):
project_json: dict
description: str
project_id: str
    project_image: list  # one base64-encoded image per PDF page (indexed by page_count)
pseudo_code: dict
action_plan: Optional[Dict]
temporary_node: Optional[Dict]
page_count: int
processing: bool
temp_pseudo_code: list
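# Illustrative initial state (hypothetical values) showing the shape the graph
# nodes below expect; `project_image` holds one base64 string per PDF page:
#
#     initial_state: GameState = {
#         "project_json": {"targets": []},
#         "description": "",
#         "project_id": "demo-123",
#         "project_image": ["<base64 page 1>", "<base64 page 2>"],
#         "pseudo_code": {},
#         "action_plan": None,
#         "temporary_node": None,
#         "page_count": 0,
#         "processing": True,
#         "temp_pseudo_code": [],
#     }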
# Refined SYSTEM_PROMPT with more explicit Scratch JSON rules, especially for variables
# Raw string so the literal "\n" formatting rule below reaches the model unescaped
SYSTEM_PROMPT = r"""
You are GameScratchAgent, an expert in Scratch 3.0 game development.
Your task is to process OCR-extracted text from images of Scratch 3.0 code blocks and produce **precisely formatted pseudocode JSON**.
### Core Role
- Treat this as an OCR refinement task: the input may contain typos, spacing issues, or incomplete tokens.
- Intelligently correct such OCR mistakes to align with valid Scratch 3.0 block syntax.
- Always generate logically valid pseudocode consistent with Scratch semantics.
### Responsibilities
1. **Code-block detection**
- If no Scratch code-blocks are detected → return `"No Code-blocks"`.
- If code-blocks are present → refine into pseudocode.
2. **Script ownership**
- Extract the value from `"Script for:"`.
- If it exactly matches any costume in `Stage_costumes`, set `"name_variable": "Stage"`.
- Otherwise, keep the detected target name.
3. **Pseudocode refinement**
- Correct OCR artifacts automatically (e.g., “when cliked” → “when green flag clicked”).
- Apply strict Scratch rules for variables, values, dropdowns, reporters, and booleans.
- Ensure indentation of nested blocks (4 spaces).
- Every hat block must end with `end`.
- Do not include explanations or comments.
- **The pseudocode must always be returned as a single string separated by `\n` (not as a list of strings).**
- **Never use string concatenation (`+`) or arrays—only one continuous string.**
- **Every nested control structure (forever, repeat, if, if-else, etc.) must also have its own `end` placed at the correct depth, ensuring proper closure of each block. The placement of `end` is critical for differentiating script meaning (e.g., Case 1 vs Case 2 nesting).**
4. **Formatting precautions**
- Numbers → `(5)`, `(-130)`
- Text/strings → `(hello)`
- Variables → `[score v]`
- Dropdowns → `[space v]`, `[Game Over v]`
- Reporter blocks → `((x position))`
- Boolean logic → `<condition>`, `<<cond1> and <cond2>>`, `<not <cond>>`
- Operators → explicit, e.g. `(([speed v]) * (1.1))`
### Critical Notes
- Be robust to OCR noise: missing characters, misread symbols, or accidental merges.
- Never output raw OCR mistakes—always normalize to valid Scratch pseudocode.
- Output only the JSON object, nothing else.
"""
# SYSTEM_PROMPT_JSON_CORRECTOR = """
# You are an assistant that outputs JSON responses strictly following the given schema.
# If the JSON you produce has any formatting errors, missing required fields, or invalid structure, you must identify the problems and correct them.
# Always return only valid JSON that fully conforms to the schema below, enclosed in triple backticks (```), without any extra text or explanation.
# If you receive an invalid or incomplete JSON response, fix it by:
# - Adding any missing required fields with appropriate values.
# - Correcting syntax errors such as missing commas, brackets, or quotes.
# - Ensuring the JSON structure matches the schema exactly.
# - Ensuring `"pseudocode"` is always a **single JSON string with embedded `\n` newlines** (never arrays, never concatenated with `+`).
# - Removing any invalid concatenation artifacts (`+`, `"string1" + "string2"`).
# - Never output explanations, comments, or extra text — only the corrected JSON.
# - **Every nested control structure (forever, repeat, if, if-else, etc.) must also have its own `end` placed at the correct depth, ensuring proper closure of each block. The placement of `end` is critical for differentiating script meaning (e.g., Case 1 vs Case 2 nesting).**
# Remember: Your output must be valid JSON only, ready to be parsed without errors.
# """
SYSTEM_PROMPT_JSON_CORRECTOR = """
You are a JSON correction assistant. Your ONLY task is to fix malformed JSON and return it in the correct format.
REQUIRED OUTPUT FORMAT:
{
"refined_logic": {
"name_variable": "sprite_name_here",
"pseudocode": "pseudocode_string_here"
}
}
RULES:
1. Extract the sprite name and pseudocode from the input
2. Return ONLY valid JSON in the exact format above
3. No explanations, no extra text, no other fields
4. If you can't find the data, use "Unknown" for name_variable and "No pseudocode found" for pseudocode
"""
# debugger and resolver agent for Scratch 3.0
# Main agent of the system agent for Scratch 3.0
agent = create_react_agent(
model=llm,
tools=[], # No specific tools are defined here, but could be added later
prompt=SYSTEM_PROMPT
)
# agent_2 = create_react_agent(
# model=llm2,
# tools=[], # No specific tools are defined here, but could be added later
# prompt=SYSTEM_PROMPT
# )
agent_json_resolver = create_react_agent(
model=llm,
tools=[], # No specific tools are defined here, but could be added later
prompt=SYSTEM_PROMPT_JSON_CORRECTOR
)
def init_dinov2(model_name: str = DINOV2_MODEL, device: torch.device = DEVICE):
"""
Lazy-initialize DINOv2 processor & model (call once before embedding).
"""
global _dinov2_processor, _dinov2_model
if _dinov2_processor is None or _dinov2_model is None:
_dinov2_processor = AutoImageProcessor.from_pretrained(model_name)
_dinov2_model = AutoModel.from_pretrained(model_name)
_dinov2_model.eval().to(device)
def embed_bytesio_list(bytesio_list, batch_size: int = 8):
"""
Accepts a list of BytesIO objects (each contains an image).
Returns: np.ndarray shape (N, D) of L2-normalized embeddings (dtype float32).
"""
if _dinov2_processor is None or _dinov2_model is None:
init_dinov2()
imgs = []
for b in bytesio_list:
with Image.open(b) as original_img:
# Create a new image with a white background in RGB mode
final_img = Image.new("RGB", original_img.size, (255, 255, 255))
# Paste the original image onto the white background, using the alpha channel as a mask if it exists
if original_img.mode == 'RGBA':
final_img.paste(original_img, mask=original_img.split()[-1])
else:
final_img.paste(original_img)
imgs.append(final_img.copy())
embs = []
for i in range(0, len(imgs), batch_size):
batch = imgs[i: i + batch_size]
inputs = _dinov2_processor(images=batch, return_tensors="pt")
inputs = {k: v.to(DEVICE) for k, v in inputs.items()}
with torch.no_grad():
out = _dinov2_model(**inputs)
cls = out.last_hidden_state[:, 0, :] # (B, D)
cls = torch.nn.functional.normalize(cls, p=2, dim=1)
embs.append(cls.cpu().numpy())
if not embs:
return np.zeros((0, _dinov2_model.config.hidden_size), dtype=np.float32)
return np.vstack(embs).astype(np.float32)
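# Minimal usage sketch (file names are hypothetical). Because the embeddings
# are L2-normalized, cosine similarity reduces to a dot product:
#
#     with open("a.png", "rb") as f1, open("b.png", "rb") as f2:
#         embs = embed_bytesio_list([io.BytesIO(f1.read()), io.BytesIO(f2.read())])
#     cosine_sim = float(embs[0] @ embs[1])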
def l2_normalize_rows(a: np.ndarray, eps: float = 1e-12) -> np.ndarray:
"""
Row-wise L2 normalization for numpy arrays.
"""
norm = np.linalg.norm(a, axis=1, keepdims=True)
return a / (norm + eps)
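# Example: every row comes out with unit length, e.g.
#     l2_normalize_rows(np.array([[3.0, 4.0]]))  # -> [[0.6, 0.8]]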
# Helper function to load the block catalog from a JSON file
def _load_block_catalog(block_type: str) -> Dict:
    """
    Loads the Scratch block catalog named '{block_type}.json'
    from the <project_root>/blocks/ folder. Returns {} on any error.
    """
    catalog_path = BLOCKS_DIR / f"{block_type}.json"
    try:
        text = catalog_path.read_text()   # raises FileNotFoundError if missing
        catalog = json.loads(text)        # raises JSONDecodeError if malformed
        logger.info(f"Successfully loaded block catalog from {catalog_path}")
        return catalog
    except FileNotFoundError:
        logger.error(f"Error: Block catalog file not found at {catalog_path}")
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from {catalog_path}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error loading {catalog_path}: {e}")
    return {}  # keep the documented contract: empty dict on any error
def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None:
"""
Search a single catalog (with keys "description" and "blocks": List[dict])
for a block whose 'op_code' matches the given opcode.
Returns the block dict or None if not found.
"""
    for block in catalog_data.get("blocks", []):
if block.get("op_code") == opcode:
return block
return None
# Helper function to find a block in all catalogs by opcode
def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None:
"""
Search across multiple catalogs for a given opcode.
Returns the first matching block dict or None.
"""
for catalog in all_catalogs:
blk = get_block_by_opcode(catalog, opcode)
if blk is not None:
return blk
return None
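# Illustrative lookup across the catalogs loaded further below:
#
#     blk = find_block_in_all("motion_movesteps", [stack_block_data, hat_block_data])
#     if blk:
#         print(blk.get("op_code"), blk.get("example_standalone"))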
def variable_intialization(project_data):
"""
Updates variable and broadcast definitions in a Scratch project JSON,
populating the 'variables' and 'broadcasts' sections of the Stage target
and extracting initial values for variables.
Args:
project_data (dict): The loaded JSON data of the Scratch project.
Returns:
dict: The updated project JSON data.
"""
stage_target = None
for target in project_data['targets']:
if target.get('isStage'):
stage_target = target
break
if stage_target is None:
print("Error: Stage target not found in the project data.")
return project_data
# Ensure 'variables' and 'broadcasts' exist in the Stage target
if "variables" not in stage_target:
stage_target["variables"] = {}
if "broadcasts" not in stage_target:
stage_target["broadcasts"] = {}
# Helper function to recursively find and update variable/broadcast fields
def process_dict(obj):
if isinstance(obj, dict):
# Check for "data_setvariableto" opcode to extract initial values
if obj.get("opcode") == "data_setvariableto":
variable_field = obj.get("fields", {}).get("VARIABLE")
value_input = obj.get("inputs", {}).get("VALUE")
if variable_field and isinstance(variable_field, list) and len(variable_field) == 2:
var_name = variable_field[0]
var_id = variable_field[1]
initial_value = ""
if value_input and isinstance(value_input, list) and len(value_input) > 1 and \
isinstance(value_input[1], list) and len(value_input[1]) > 1:
if value_input[1][0] == 10:
initial_value = str(value_input[1][1])
elif value_input[1][0] == 12 and len(value_input) > 2 and isinstance(value_input[2], list) and value_input[2][0] == 10:
initial_value = str(value_input[2][1])
elif isinstance(value_input[1], (str, int, float)):
initial_value = str(value_input[1])
stage_target["variables"][var_id] = [var_name, initial_value]
for key, value in obj.items():
# Process broadcast definitions in 'inputs' (BROADCAST_INPUT)
if key == "BROADCAST_INPUT" and isinstance(value, list) and len(value) == 2 and \
isinstance(value[1], list) and len(value[1]) == 3 and value[1][0] == 11:
broadcast_name = value[1][1]
broadcast_id = value[1][2]
stage_target["broadcasts"][broadcast_id] = broadcast_name
# Process broadcast definitions in 'fields' (BROADCAST_OPTION)
elif key == "BROADCAST_OPTION" and isinstance(value, list) and len(value) == 2:
broadcast_name = value[0]
broadcast_id = value[1]
stage_target["broadcasts"][broadcast_id] = broadcast_name
# Recursively call for nested dictionaries or lists
process_dict(value)
elif isinstance(obj, list):
for i, item in enumerate(obj):
# Process variable references in 'inputs' (like [12, "score", "id"])
if isinstance(item, list) and len(item) == 3 and item[0] == 12:
var_name = item[1]
var_id = item[2]
if var_id not in stage_target["variables"]:
stage_target["variables"][var_id] = [var_name, ""]
process_dict(item)
# Iterate through all targets to process their blocks
for target in project_data['targets']:
if "blocks" in target:
for block_id, block_data in target["blocks"].items():
process_dict(block_data)
return project_data
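# Illustrative effect on a simplified project (hedged example): a block like
#     {"opcode": "data_setvariableto",
#      "fields": {"VARIABLE": ["score", "scoreId"]},
#      "inputs": {"VALUE": [1, [10, "0"]]}}
# causes the Stage target to gain the entry
#     "variables": {"scoreId": ["score", "0"]}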
def deduplicate_variables(project_data):
"""
Removes duplicate variable entries in the 'variables' dictionary of the Stage target,
prioritizing entries with non-empty values.
Args:
project_data (dict): The loaded JSON data of the Scratch project.
Returns:
dict: The updated project JSON data with deduplicated variables.
"""
stage_target = None
for target in project_data['targets']:
if target.get('isStage'):
stage_target = target
break
if stage_target is None:
print("Error: Stage target not found in the project data.")
return project_data
if "variables" not in stage_target:
return project_data # No variables to deduplicate
# Use a temporary dictionary to store the preferred variable entry by name
# Format: {variable_name: [variable_id, variable_name, variable_value]}
resolved_variables = {}
for var_id, var_info in stage_target["variables"].items():
var_name = var_info[0]
var_value = var_info[1]
if var_name not in resolved_variables:
# If the variable name is not yet seen, add it
resolved_variables[var_name] = [var_id, var_name, var_value]
else:
# If the variable name is already seen, decide which one to keep
existing_id, existing_name, existing_value = resolved_variables[var_name]
# Prioritize the entry with a non-empty value
if var_value != "" and existing_value == "":
resolved_variables[var_name] = [var_id, var_name, var_value]
# If both have non-empty values, or both are empty, keep the current one (arbitrary choice, but consistent)
# The current logic will effectively keep the last one encountered that has a value,
# or the very last one if all are empty.
elif var_value != "" and existing_value != "":
# If there are multiple non-empty values for the same variable name
# this keeps the one from the most recent iteration.
# For the given example, this will correctly keep "5".
resolved_variables[var_name] = [var_id, var_name, var_value]
elif var_value == "" and existing_value == "":
# If both are empty, just keep the current one (arbitrary)
resolved_variables[var_name] = [var_id, var_name, var_value]
# Reconstruct the 'variables' dictionary using the resolved entries
new_variables_dict = {}
for var_name, var_data in resolved_variables.items():
var_id_to_keep = var_data[0]
var_name_to_keep = var_data[1]
var_value_to_keep = var_data[2]
new_variables_dict[var_id_to_keep] = [var_name_to_keep, var_value_to_keep]
stage_target["variables"] = new_variables_dict
return project_data
def variable_adder_main(project_data):
    try:
        declare_variable_json = variable_intialization(project_data)
        print("declare_variable_json------->", declare_variable_json)
    except Exception as e:
        print(f"Error in variable initialization: {e}")
        return project_data
    try:
        processed_json = deduplicate_variables(declare_variable_json)
        print("processed_json------->", processed_json)
        return processed_json
    except Exception as e:
        print(f"Error in variable deduplication: {e}")
        return declare_variable_json
# --- Global variable for the block catalog ---
ALL_SCRATCH_BLOCKS_CATALOG = {}
BLOCK_CATALOG_PATH = "blocks" # Define the path to your JSON file
HAT_BLOCKS_PATH = "hat_blocks" # Path to the hat blocks JSON file
STACK_BLOCKS_PATH = "stack_blocks" # Path to the stack blocks JSON file
REPORTER_BLOCKS_PATH = "reporter_blocks" # Path to the reporter blocks JSON file
BOOLEAN_BLOCKS_PATH = "boolean_blocks" # Path to the boolean blocks JSON file
C_BLOCKS_PATH = "c_blocks" # Path to the C blocks JSON file
CAP_BLOCKS_PATH = "cap_blocks" # Path to the cap blocks JSON file
# Load the block catalogs from their respective JSON files
hat_block_data = _load_block_catalog(HAT_BLOCKS_PATH)
hat_description = hat_block_data.get("description", "No description available")
# hat_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in hat_block_data["blocks"]])
hat_opcodes_functionalities = "\n".join([
# f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
for block in hat_block_data.get("blocks", [])
]) if isinstance(hat_block_data.get("blocks"), list) else " No blocks information available."
#hat_opcodes_functionalities = os.path.join(BLOCKS_DIR, "hat_blocks.txt")
print("Hat blocks loaded successfully.", hat_description)
boolean_block_data = _load_block_catalog(BOOLEAN_BLOCKS_PATH)
boolean_description = boolean_block_data.get("description", "No description available")
# boolean_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in boolean_block_data["blocks"]])
boolean_opcodes_functionalities = "\n".join([
# f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
for block in boolean_block_data.get("blocks", [])
]) if isinstance(boolean_block_data.get("blocks"), list) else " No blocks information available."
#boolean_opcodes_functionalities = os.path.join(BLOCKS_DIR, "boolean_blocks.txt")
c_block_data = _load_block_catalog(C_BLOCKS_PATH)
c_description = c_block_data.get("description", "No description available")
# c_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in c_block_data["blocks"]])
c_opcodes_functionalities = "\n".join([
# f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
for block in c_block_data.get("blocks", [])
]) if isinstance(c_block_data.get("blocks"), list) else " No blocks information available."
#c_opcodes_functionalities = os.path.join(BLOCKS_DIR, "c_blocks.txt")
cap_block_data = _load_block_catalog(CAP_BLOCKS_PATH)
cap_description = cap_block_data.get("description", "No description available")
# cap_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in cap_block_data["blocks"]])
cap_opcodes_functionalities = "\n".join([
# f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
for block in cap_block_data.get("blocks", [])
]) if isinstance(cap_block_data.get("blocks"), list) else " No blocks information available."
#cap_opcodes_functionalities = os.path.join(BLOCKS_DIR, "cap_blocks.txt")
reporter_block_data = _load_block_catalog(REPORTER_BLOCKS_PATH)
reporter_description = reporter_block_data.get("description", "No description available")
# reporter_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in reporter_block_data["blocks"]])
reporter_opcodes_functionalities = "\n".join([
# f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
for block in reporter_block_data.get("blocks", [])
]) if isinstance(reporter_block_data.get("blocks"), list) else " No blocks information available."
#reporter_opcodes_functionalities = os.path.join(BLOCKS_DIR, "reporter_blocks.txt")
stack_block_data = _load_block_catalog(STACK_BLOCKS_PATH)
stack_description = stack_block_data.get("description", "No description available")
# stack_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']} example: standalone use: {block['example_standalone']}" for block in stack_block_data["blocks"]])
stack_opcodes_functionalities = "\n".join([
# f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
f" - Opcode: {block.get('op_code', 'N/A')}, example: standalone use {block.get('example_standalone', 'N/A')}"
for block in stack_block_data.get("blocks", [])
]) if isinstance(stack_block_data.get("blocks"), list) else " No blocks information available."
#stack_opcodes_functionalities = os.path.join(BLOCKS_DIR, "stack_blocks.txt")
# This makes ALL_SCRATCH_BLOCKS_CATALOG available globally
ALL_SCRATCH_BLOCKS_CATALOG = _load_block_catalog(BLOCK_CATALOG_PATH)
# Helper function to extract JSON from LLM response
# def extract_json_from_llm_response(raw_response: str) -> dict:
# # --- 1) Pull out the JSON code‑block if present ---
# md = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw_response)
# json_string = md.group(1).strip() if md else raw_response
# # --- 2) Trim to the outermost { … } so we drop any prefix/suffix junk ---
# first, last = json_string.find('{'), json_string.rfind('}')
# if 0 <= first < last:
# json_string = json_string[first:last+1]
# # --- 3) PRE‑CLEANUP: remove stray assistant{…}, rogue assistant keys, fix boolean quotes ---
# json_string = re.sub(r'\b\w+\s*{', '{', json_string)
# json_string = re.sub(r'"assistant"\s*:', '', json_string)
# json_string = re.sub(r'\b(false|true)"', r'\1', json_string)
# logger.debug("Ran pre‑cleanup for stray tokens and boolean quotes.")
# # --- 3.1) Fix stray inner quotes at start of name/list values ---
# # e.g., { "name": " \"recent_scoress\"", ... } → "recent_scoress"
# json_string = re.sub(
# r'("name"\s*:\s*")\s*"',
# r'\1',
# json_string
# )
# # --- 4) Escape all embedded quotes in any `logic` value up to the next key ---
# def _esc(m):
# prefix, body = m.group(1), m.group(2)
# return prefix + body.replace('"', r'\"')
# json_string = re.sub(
# r'("logic"\s*:\s*")([\s\S]+?)(?=",\s*"[A-Za-z_]\w*"\s*:\s*)',
# _esc,
# json_string
# )
# logger.debug("Escaped embedded quotes in logic fields.")
# logger.debug("Quoted unquoted keys.")
# # --- 6) Remove trailing commas before } or ] ---
# json_string = re.sub(r',\s*(?=[}\],])', '', json_string)
# json_string = re.sub(r',\s*,', ',', json_string)
# logger.debug("Removed trailing commas.")
# # --- 7) Balance braces: drop extra } at end if needed ---
# ob, cb = json_string.count('{'), json_string.count('}')
# if cb > ob:
# excess = cb - ob
# json_string = json_string.rstrip()[:-excess]
# logger.debug(f"Stripped {excess} extra closing brace(s).")
# # --- 8) Escape literal newlines in *all* string values ---
# json_string = re.sub(
# r'"((?:[^"\\]|\\.)*?)"',
# lambda m: '"' + m.group(1).replace('\n', '\\n').replace('\r', '\\r') + '"',
# json_string,
# flags=re.DOTALL
# )
# logger.debug("Escaped newlines in strings.")
# # --- 9) Final parse attempt ---
# try:
# return json.loads(json_string)
# except json.JSONDecodeError:
# logger.error("Sanitized JSON still invalid:\n%s", json_string)
# raise
def extract_json_from_llm_response(raw_response: str) -> dict:
"""
Finds and parses the first valid JSON object from a raw LLM response string.
"""
logger.debug("Attempting to extract JSON from raw LLM response...")
# 1. Look for a JSON markdown block first
match = re.search(r"```(?:json)?\s*({[\s\S]*?})\s*```", raw_response)
if match:
json_string = match.group(1)
logger.debug("Found JSON inside a markdown block.")
try:
return json.loads(json_string)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON from markdown block: {e}")
# Fall through to the next method if parsing fails
# 2. If no block is found (or it failed), find the outermost braces
logger.debug("Markdown block not found or failed. Searching for outermost braces.")
try:
first_brace = raw_response.find('{')
last_brace = raw_response.rfind('}')
if first_brace != -1 and last_brace != -1 and first_brace < last_brace:
json_string = raw_response[first_brace : last_brace + 1]
return json.loads(json_string)
else:
logger.error("Could not find a valid JSON structure (outermost braces).")
raise json.JSONDecodeError("No valid JSON object found in the response.", raw_response, 0)
except json.JSONDecodeError as e:
logger.error(f"Final JSON parsing attempt failed: {e}")
# Re-raise the exception to be caught by the calling logic (to invoke the corrector agent)
raise
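# Example (illustrative): fenced and bare responses parse the same way.
#
#     extract_json_from_llm_response('```json\n{"a": 1}\n```')   # -> {"a": 1}
#     extract_json_from_llm_response('noise {"a": 1} noise')     # -> {"a": 1}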
def reduce_image_size_to_limit(clean_b64_str: str, max_kb: int = 4000) -> str:
"""
Input: clean_b64_str = BASE64 STRING (no data: prefix)
Output: BASE64 STRING (no data: prefix), sized as close as possible to max_kb KB.
Guarantees: returns a valid base64 string (never None). May still be larger than max_kb
if saving at lowest quality cannot get under the limit.
"""
# sanitize
clean = re.sub(r"\s+", "", clean_b64_str).strip()
# fix padding
missing = len(clean) % 4
if missing:
clean += "=" * (4 - missing)
try:
image_data = base64.b64decode(clean)
except Exception as e:
raise ValueError("Invalid base64 input to reduce_image_size_to_limit") from e
try:
img = Image.open(io.BytesIO(image_data))
img.load()
except Exception as e:
raise ValueError("Could not open image from base64") from e
# convert alpha -> RGB because JPEG doesn't support alpha
if img.mode in ("RGBA", "LA") or (img.mode == "P" and "transparency" in img.info):
background = Image.new("RGB", img.size, (255, 255, 255))
background.paste(img, mask=img.split()[-1] if img.mode != "RGB" else None)
img = background
elif img.mode != "RGB":
img = img.convert("RGB")
low, high = 20, 95
best_bytes = None
# binary search for best quality
while low <= high:
mid = (low + high) // 2
buf = io.BytesIO()
try:
img.save(buf, format="JPEG", quality=mid, optimize=True)
except OSError:
# some PIL builds/channels may throw on optimize=True; fallback without optimize
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=mid)
size_kb = len(buf.getvalue()) / 1024.0
if size_kb <= max_kb:
best_bytes = buf.getvalue()
low = mid + 1
else:
high = mid - 1
# if never found a quality <= max_kb, use the smallest we created (quality = 20)
if best_bytes is None:
buf = io.BytesIO()
try:
img.save(buf, format="JPEG", quality=20, optimize=True)
except OSError:
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=20)
best_bytes = buf.getvalue()
return base64.b64encode(best_bytes).decode("utf-8")
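# Usage sketch (hypothetical input): shrink an oversized base64 image to ~2 MB.
#
#     small_b64 = reduce_image_size_to_limit(big_b64, max_kb=2000)
#     # returns plain base64 (no "data:" prefix), JPEG-encoded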
def clean_base64_for_model(raw_b64, max_bytes_threshold=4000000) -> str:
"""
Accepts: raw_b64 can be:
- a data URI 'data:image/png;base64,...'
- a plain base64 string
- a PIL Image
- a list containing the above (take first)
Returns: a data URI string 'data:<mime>;base64,<base64>' guaranteed to be syntactically valid.
"""
# normalize input
if not raw_b64:
return ""
if isinstance(raw_b64, list):
raw_b64 = raw_b64[0] if raw_b64 else ""
if not raw_b64:
return ""
if isinstance(raw_b64, Image.Image):
buf = io.BytesIO()
# convert to RGB and save as JPEG to keep consistent
img = raw_b64.convert("RGB")
img.save(buf, format="JPEG")
clean_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
mime = "image/jpeg"
return f"data:{mime};base64,{clean_b64}"
if not isinstance(raw_b64, str):
raise TypeError(f"Expected base64 string or PIL Image, got {type(raw_b64)}")
# detect mime if present; otherwise default to png
m = re.match(r"^data:(image\/[a-zA-Z0-9.+-]+);base64,(.+)$", raw_b64, flags=re.DOTALL)
if m:
mime = m.group(1)
clean_b64 = m.group(2)
else:
# no prefix; assume png by default (you can change to jpeg if you prefer)
mime = "image/png"
clean_b64 = raw_b64
# sanitize base64 string
clean_b64 = re.sub(r"\s+", "", clean_b64).strip()
missing = len(clean_b64) % 4
if missing:
clean_b64 += "=" * (4 - missing)
original_size_bytes = len(clean_b64.encode("utf-8"))
# debug print
print(f"Original base64 size (bytes): {original_size_bytes}, mime: {mime}")
if original_size_bytes > max_bytes_threshold:
# reduce and return JPEG prefixed data URI (JPEG tends to compress better for photos)
reduced_clean = reduce_image_size_to_limit(clean_b64, max_kb=4000)
# reduced_clean is plain base64 (no prefix)
print(f"Reduced base64 size (bytes): {original_size_bytes}, mime: {mime}")
return f"data:image/jpeg;base64,{reduced_clean}"
# otherwise return original with its mime prefix (ensure prefix exists)
return f"data:{mime};base64,{clean_b64}"
SCRATCH_OPCODES = [
'motion_movesteps', 'motion_turnright', 'motion_turnleft', 'motion_goto',
'motion_gotoxy', 'motion_glideto', 'motion_glidesecstoxy', 'motion_pointindirection',
'motion_pointtowards', 'motion_changexby', 'motion_setx', 'motion_changeyby',
'motion_sety', 'motion_ifonedgebounce', 'motion_setrotationstyle', 'looks_sayforsecs',
'looks_say', 'looks_thinkforsecs', 'looks_think', 'looks_switchcostumeto',
'looks_nextcostume', 'looks_switchbackdropto', 'looks_switchbackdroptowait',
'looks_nextbackdrop', 'looks_changesizeby', 'looks_setsizeto', 'looks_changeeffectby',
'looks_seteffectto', 'looks_cleargraphiceffects', 'looks_show', 'looks_hide',
'looks_gotofrontback', 'looks_goforwardbackwardlayers', 'sound_playuntildone',
'sound_play', 'sound_stopallsounds', 'sound_changevolumeby', 'sound_setvolumeto',
'event_broadcast', 'event_broadcastandwait', 'control_wait', 'control_wait_until',
'control_stop', 'control_create_clone_of', 'control_delete_this_clone',
'data_setvariableto', 'data_changevariableby', 'data_addtolist', 'data_deleteoflist',
'data_insertatlist', 'data_replaceitemoflist', 'data_showvariable', 'data_hidevariable',
'data_showlist', 'data_hidelist', 'sensing_askandwait', 'sensing_resettimer',
'sensing_setdragmode', 'procedures_call', 'operator_lt', 'operator_equals',
'operator_gt', 'operator_and', 'operator_or', 'operator_not', 'operator_contains',
'sensing_touchingobject', 'sensing_touchingcolor', 'sensing_coloristouchingcolor',
'sensing_keypressed', 'sensing_mousedown', 'data_listcontainsitem', 'control_repeat',
'control_forever', 'control_if', 'control_if_else', 'control_repeat_until',
'motion_xposition', 'motion_yposition', 'motion_direction', 'looks_costumenumbername',
'looks_size', 'looks_backdropnumbername', 'sound_volume', 'sensing_distanceto',
'sensing_answer', 'sensing_mousex', 'sensing_mousey', 'sensing_loudness',
'sensing_timer', 'sensing_of', 'sensing_current', 'sensing_dayssince2000',
'sensing_username', 'operator_add', 'operator_subtract', 'operator_multiply',
'operator_divide', 'operator_random', 'operator_join', 'operator_letterof',
'operator_length', 'operator_mod', 'operator_round', 'operator_mathop',
'data_variable', 'data_list', 'data_itemoflist', 'data_lengthoflist',
'data_itemnumoflist', 'event_whenflagclicked', 'event_whenkeypressed',
'event_whenthisspriteclicked', 'event_whenbackdropswitchesto', 'event_whengreaterthan',
'event_whenbroadcastreceived', 'control_start_as_clone', 'procedures_definition'
]
def validate_and_fix_opcodes(opcode_counts):
"""
Ensures all opcodes are valid. If an opcode is invalid, replace with closest match.
"""
corrected_list = []
for item in opcode_counts:
        opcode = item.get("opcode")
        count = item.get("count", 1)
        if not opcode:
            print("Skipping opcode entry with no opcode value.")
            continue
        if opcode not in SCRATCH_OPCODES:
            # Find closest match (case-sensitive)
            match = get_close_matches(opcode, SCRATCH_OPCODES, n=1, cutoff=0.6)
if match:
print(f"Opcode '{opcode}' not found. Replacing with '{match[0]}'")
opcode = match[0]
else:
print(f"Opcode '{opcode}' not recognized and no close match found. Skipping.")
continue
corrected_list.append({"opcode": opcode, "count": count})
# Merge duplicates after correction
merged = {}
for item in corrected_list:
merged[item["opcode"]] = merged.get(item["opcode"], 0) + item["count"]
return [{"opcode": k, "count": v} for k, v in merged.items()]
def format_scratch_pseudo_code(code_string):
"""
Parses and formats Scratch pseudo-code with correct indentation,
specifically handling if/else/end structures correctly.
Args:
code_string (str): A string containing Scratch pseudo-code with
potentially inconsistent indentation.
Returns:
str: The correctly formatted and indented pseudo-code string.
"""
lines = code_string.strip().split('\n')
formatted_lines = []
indent_level = 0
# Keywords that increase indentation for the NEXT line
indent_keywords = ['when', 'forever', 'if', 'repeat', 'else']
# Keywords that decrease indentation for the CURRENT line
unindent_keywords = ['end', 'else']
for line in lines:
stripped_line = line.strip()
if not stripped_line:
continue
# Check for keywords that should un-indent the current line
if any(keyword in stripped_line for keyword in unindent_keywords):
# Special case for 'else': it should align with its 'if'
if 'else' in stripped_line:
# Decrease indentation for 'else' and its following lines
                indentation = '    ' * max(0, indent_level - 1)  # 4 spaces per level, per the pseudocode spec
formatted_lines.append(indentation + stripped_line)
continue
# For 'end', decrease the level before formatting
indent_level = max(0, indent_level - 1)
        indentation = '    ' * indent_level
formatted_lines.append(indentation + stripped_line)
# Check for keywords that should indent the next line
if any(keyword in stripped_line for keyword in indent_keywords):
# 'else' both un-indents and indents, so the level remains the same for the next block
if 'else' not in stripped_line:
indent_level += 1
return '\n'.join(formatted_lines)
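# Example (illustrative): flat input is re-indented four spaces per level.
#
#     format_scratch_pseudo_code(
#         "when green flag clicked\nforever\nmove (10) steps\nend\nend")
#     # ->
#     # when green flag clicked
#     #     forever
#     #         move (10) steps
#     #     end
#     # end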
# Node 1: Logic updating if any issue here
def pseudo_generator_node(state: GameState):
    logger.info("--- Running pseudo_generator_node ---")
    image = state.get("project_image", [])
    project_json = state["project_json"]
    cnt = state["page_count"]
    print(f"The page number received at the pseudo_generator node: -----> {cnt}")
# MODIFICATION 1: Include 'Stage' in the list of names to plan for.
# It's crucial to ensure 'Stage' is always present for its global role.
target_names = [t["name"] for t in project_json["targets"]]
stage_names = [t["name"] for t in project_json["targets"] if t.get("isStage")]
sprite_names = [t["name"] for t in project_json["targets"] if not t.get("isStage")]
# Get costumes separately for Stage and Sprites
stage_costumes = [
c["name"]
for t in project_json["targets"] if t.get("isStage")
for c in t.get("costumes", [])
]
    # Raw f-string so the literal "\n" formatting rules below reach the model unescaped
    refinement_prompt = rf"""
You are an expert in Scratch 3.0 game development, specializing in understanding block relationships (stacked, nested).
"Analyze the Scratch code-block image and generate Pseudo-Code for what this logic appears to be doing."
From Image, you also have to detect a value of Key given in Text form "Script for: ". Below is the example
Example: "Script for: Bear", "Script for:" is a key and "Bear" is value and check if there is related target name available.
**Special Rule for Stage Costumes:**
If the value of "Script for:" exactly matches ANY costume name in the Stage_costumes list given below,
then the `"name_variable"` in the output JSON must be set to `"Stage"` (not the costume name).
**Targets in Game (Sprites and Stage) available in project_json:**
Sprite_name:{', '.join(sprite_names)}
Stage_name: Stage
Stage_costumes: {', '.join(stage_costumes)}
--- Scratch 3.0 Block Reference ---
### Hat Blocks
Description: {hat_description}
Blocks:
{hat_opcodes_functionalities}
### Boolean Blocks
Description: {boolean_description}
Blocks:
{boolean_opcodes_functionalities}
### C Blocks
Description: {c_description}
Blocks:
{c_opcodes_functionalities}
### Cap Blocks
Description: {cap_description}
Blocks:
{cap_opcodes_functionalities}
### Reporter Blocks
Description: {reporter_description}
Blocks:
{reporter_opcodes_functionalities}
### Stack Blocks
Description: {stack_description}
Blocks:
{stack_opcodes_functionalities}
-----------------------------------
Your task is to:
If you don't find any "Code-Blocks" then,
**Don't generate Pseudo Code; just return the message "No Code-blocks".**
If you find any "Code-Blocks" then,
1. **Refine the 'logic'**: Make it precise, accurate, and fully aligned with the Game Description. Use Scratch‑consistent verbs and phrasing. **Do NOT** use raw double‑quotes inside the logic string.
2. **Structural requirements**:
- **Numeric values** `(e.g., 0, 5, 0.2, -130)` **must** be in parentheses: `(0)`, `(5)`, `(0.2)`, `(-130)`.
- **AlphaNumeric values** `(e.g., hello, say 5, 4, hi!)` **must** be in parentheses: `(hello)`, `(say 5)`, `(4)`, `(hi!)`.
- **Variables** must be in the form `[variable v]` (e.g., `[score v]`), even when used inside expressions two example use `set [score v] to (1)` or `show variable ([speed v])`.
- **Dropdown options** must be in the form `[option v]` (e.g., `[Game Start v]`, `[blue sky v]`). example use `when [space v] key pressed`.
- **Reporter blocks** used as inputs must be double‑wrapped: `((x position))`, `((y position))`. example use `if <((y position)) = (-130)> then` or `(((x position)) * (1))`.
- **Boolean blocks** in conditions must be inside `< >`, including nested ones: `<not <condition>>`, `<<cond1> and <cond2>>`,`<<cond1> or <cond2>>`.
- **Other Boolean blocks** in conditions must be inside `< >`, including nested ones or values or variables: `<(block/value/variable) * (block/value/variable)>`,`<(block/value/variable) < (block/value/variable)>`, and example of another variable`<[apple v] contains [a v]?>`.
- **Operator expressions** must use explicit Scratch operator blocks, e.g.:
```
(([ballSpeed v]) * (1.1))
```
- **Every hat block script must end** with a final `end` on its own line.
- **[critical important]:Every nested control structure (forever, repeat, if, if-else, etc.) must also have its own `end` placed at the correct depth, ensuring proper closure of each block. The placement of `end` is critical for differentiating script meaning (e.g., Case 1 vs Case 2 nesting).**
3. **Pseudo‑code formatting**:
- Represent each block or nested block on its own line.
- Auto-correct invalid OCR phrases to valid Scratch 3.0 block names using the **Scratch 3.0 Block Reference** above.
- **Indent nested blocks by 4 spaces under their parent (`forever`, `if`, etc.).This is a critical requirement.**
- No comments or explanatory text—just the block sequence.
- a natural language breakdown of each step taken after the event, formatted as a multi-line string representing pseudo-code. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence.
- **The pseudocode must be returned as a single string separated by `\n` (not as a list of strings).**
- **Never use string concatenation (`+`) or arrays—only one continuous string.**
- **Every nested control structure (forever, repeat, if, if-else, etc.) must also have its own `end` placed at the correct depth, ensuring proper closure of each block. The placement of `end` is critical for differentiating script meaning (e.g., Case 1 vs Case 2 nesting).**
4. **Logic content**:
- Build clear flow for mechanics (movement, jumping, flying, scoring, collisions).
- Match each action closely to a Scratch block or tight sequence.
- Do **NOT** include any justification or comments—only the raw logic.
5. **Examples for reference**:
**Correct** pattern for a simple start script:
```
when green flag clicked
switch backdrop to [blue sky v]
set [score v] to (0)
show variable [score v]
broadcast [Game Start v]
end
```
**Correct** pattern for updating the high score variable handling:
```
when I receive [Game Over v]
if <((score)) > (([High Score v]))> then
set [High Score v] to ([score v])
end
switch backdrop to [Game Over v]
end
```
**Correct** pattern for level up and increase difficulty use:
```
when I receive [Level Up v]
change [level v] by (1)
set [ballSpeed v] to ((([ballSpeed v]) * (1.1)))
end
```
**Correct** pattern for jumping mechanics use:
```
when [space v] key pressed
if <((y position)) = (-100)> then
repeat (5)
change y by (100)
wait (0.1) seconds
change y by (-100)
wait (0.1) seconds
end
end
end
```
**Correct** pattern for continuously moving objects use:
```
when green flag clicked
go to x: (240) y: (-100)
set [speed v] to (-5)
show variable [speed v]
forever
change x by ([speed v])
if <((x position)) < (-240)> then
go to x: (240) y: (-100)
end
end
end
```
6. **Do not** add any explanation of logic or comments to justify or explain; just put the logic content in the JSON.
7. **Output**:
Return **only** a JSON object, using double quotes everywhere:
```json
{{
    "refined_logic": {{
        "name_variable": "value of 'Script for:' from the image",
        "pseudocode": "…your fully-formatted pseudo-code here…"
    }}
}}
```
"""
image_input = {
"type": "image_url",
"image_url": {
# "url": f"data:image/png;base64,{image}"
"url": clean_base64_for_model(image[cnt])
}
}
content = [
{"type": "text", "text": refinement_prompt},
image_input
]
try:
# Invoke the main agent for logic refinement and relationship identification
response = agent.invoke({"messages": [{"role": "user", "content": content}]})
llm_output_raw = response["messages"][-1].content.strip()
print(f"llm_output_raw: {response}")
parsed_llm_output = extract_json_from_llm_response(llm_output_raw)
result = parsed_llm_output
print(f"result:\n\n {result}")
except json.JSONDecodeError as error_json:
# If JSON parsing fails, use the json resolver agent
# correction_prompt = (
# "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
# "It must be a JSON object with `refined_logic` (string) and `block_relationships` (array of objects).\n"
# f"- **Error Details**: {error_json}\n\n"
# "**Strict Instructions for your response:**\n"
# "1. **ONLY** output the corrected JSON. Do not include any other text or explanations.\n"
# "2. Ensure all keys and string values are enclosed in **double quotes**. Escape internal quotes (`\\`).\n"
# "3. No trailing commas. Correct nesting.\n\n"
# "Here is the problematic JSON string to correct:\n"
# f"```json\n{llm_output_raw}\n```\n"
# "Corrected JSON:\n"
# )
correction_prompt = f"""
Fix this malformed response and return only the corrected JSON:
Input: {llm_output_raw if 'llm_output_raw' in locals() else 'No response available'}
Extract the sprite name and pseudocode, then return in this exact format:
{{
"refined_logic": {{
"name_variable": "sprite_name",
"pseudocode": "pseudocode_here"
}}
}}
"""
try:
correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
corrected_output = extract_json_from_llm_response(correction_response['messages'][-1].content)
# corrected_output = extract_json_from_llm_response(llm_output_raw)
#block_relationships = corrected_output.get("block_relationships", [])
result = corrected_output
print(f"result:\n\n {result}")
except Exception as e_corr:
logger.error(f"Failed to correct JSON output for even after retry: {e_corr}")
result = {
"refined_logic": {
"name_variable": "Error",
"pseudocode": "Processing failed"
}
}
# Update the original action_plan in the state with the refined version
state["pseudo_code"] = result
state["temp_pseudo_code"] += [result]
Data = state["temp_pseudo_code"]
# with open("debug_state.json", "w", encoding="utf-8") as f:
# json.dump(state, f, indent=2, ensure_ascii=False)
print(f"[OVREALL REFINED PSEUDO CODE LOGIC]: {result}")
print(f"[OVREALL LISTS OF LOGICS]: {Data}")
logger.info("Plan refinement and block relation analysis completed for all plans.")
return state
# Node 2: Node Optimizer node
def node_optimizer(state: Dict[str, Any]) -> Dict[str, Any]:
logger.info("--- Running Node Optimizer Node ---")
# safe fetches
project_json = state.get("project_json", {}) if isinstance(state.get("project_json", {}), dict) else {}
# ensure pseudo_code exists so we can write into it safely
raw = state.setdefault("pseudo_code", {})
refined_logic_data = raw.get("refined_logic", {}) or {}
# keep scalar sprite_name (if present) and build a separate map
sprite_name = refined_logic_data.get("name_variable", "<unknown>")
sprite_name_map: Dict[str, str] = {}
project_json_targets = project_json.get("targets", []) if isinstance(project_json, dict) else []
for target in project_json_targets:
if isinstance(target, dict):
name = target.get("name")
if name:
sprite_name_map[name] = name
try:
# ensure we read the pseudocode from the same key consistently
pseudo_text = refined_logic_data.get("pseudocode", "")
refined_logic_data["pseudocode"] = separate_scripts(str(pseudo_text))
# write refined logic back into state safely
state["pseudo_code"]["refined_logic"] = refined_logic_data
logger.debug("[The pseudo_code generated here]: %s", state["pseudo_code"])
# transform to action flow (expecting transform_logic_to_action_flow to accept the pseudo_code dict)
state["action_plan"] = transform_logic_to_action_flow(state["pseudo_code"])
logger.debug("[The action plan generated here]: %s", state["action_plan"])
action_flow = state.get("action_plan", {})
# Normalize plan_data into a list of (sprite, sprite_data) pairs
plan_data = []
if isinstance(action_flow, dict):
ao = action_flow.get("action_overall_flow")
if isinstance(ao, dict) and ao:
plan_data = list(ao.items())
else:
# fall back to the top-level dict items
plan_data = list(action_flow.items())
elif isinstance(action_flow, list):
# possible structure: list of dicts each holding sprite info
for item in action_flow:
if isinstance(item, tuple) and len(item) == 2:
plan_data.append(item)
elif isinstance(item, dict):
# try to detect common shapes
if "name" in item and "plans" in item:
plan_data.append((item["name"], item))
else:
plan_data = []
refined_flow: Dict[str, Any] = {}
for sprite, sprite_data in plan_data:
if sprite is None:
continue
sprite_data = sprite_data or {}
refined_plans = []
for plan in sprite_data.get("plans", []):
logic = plan.get("logic", "")
# analyze_opcode_counts should accept a string
plan["opcode_counts"] = analyze_opcode_counts(str(logic))
refined_plans.append(plan)
refined_flow[str(sprite)] = {
"description": sprite_data.get("description", ""),
"plans": refined_plans
}
if refined_flow:
state["action_plan"] = refined_flow
logger.info("Node Optimization completed.")
return state
except Exception:
# log full stacktrace for debugging but return state so caller won't break
logger.exception("Error in Node Optimizer Node — returning current state for inspection")
return state
# Node 5: block_builder_node
def overall_block_builder_node_2(state: GameState):
logger.info("--- Running OverallBlockBuilderNode ---")
print("--- Running OverallBlockBuilderNode ---")
project_json = state["project_json"]
targets = project_json["targets"]
# --- Sprite and Stage Target Mapping ---
sprite_map = {target["name"]: target for target in targets if not target["isStage"]}
stage_target = next((target for target in targets if target["isStage"]), None)
if stage_target:
sprite_map[stage_target["name"]] = stage_target
action_plan = state.get("action_plan", {})
print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2))
if not action_plan:
logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.")
return state
# Initialize offsets for script placement on the Scratch canvas
# script_y_offset = {}
# script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()}
# --- Set initial fixed position ---
INITIAL_X = 382
INITIAL_Y = -1089
# Initialize offsets for script placement on the Scratch canvas
script_y_offset = {name: INITIAL_Y for name in sprite_map.keys()}
script_x_offset_per_sprite = {name: INITIAL_X for name in sprite_map.keys()}
# This handles potential variations in the action_plan structure.
if action_plan.get("action_overall_flow", {}) == {}:
plan_data = action_plan.items()
else:
plan_data = action_plan.get("action_overall_flow", {}).items()
# --- Extract global project context for LLM ---
all_sprite_names = list(sprite_map.keys())
all_variable_names = {}
all_list_names = {}
all_broadcast_messages = {}
for target in targets:
for var_id, var_info in target.get("variables", {}).items():
all_variable_names[var_info[0]] = var_id # Store name -> ID mapping (e.g., "myVariable": "myVarId123")
for list_id, list_info in target.get("lists", {}).items():
all_list_names[list_info[0]] = list_id # Store name -> ID mapping
for broadcast_id, broadcast_name in target.get("broadcasts", {}).items():
all_broadcast_messages[broadcast_name] = broadcast_id # Store name -> ID mapping
# --- Process each sprite's action plan ---
for sprite_name, sprite_actions_data in plan_data:
if sprite_name in sprite_map:
current_sprite_target = sprite_map[sprite_name]
if "blocks" not in current_sprite_target:
current_sprite_target["blocks"] = {}
# if sprite_name not in script_y_offset:
# script_y_offset[sprite_name] = 0
for plan_entry in sprite_actions_data.get("plans", []):
logic_sequence = str(plan_entry["logic"])
opcode_counts = plan_entry.get("opcode_counts", {})
refined_indent_logic = format_scratch_pseudo_code(logic_sequence)
print(f"\n--------------------------- refined indent logic: {refined_indent_logic}-------------------------------\n")
try:
generated_blocks = block_builder(opcode_counts, refined_indent_logic)
# Ensure generated_blocks is a dictionary
if not isinstance(generated_blocks, dict):
logger.error(f"block_builder for sprite '{sprite_name}' returned non-dict type: {type(generated_blocks)}. Skipping block update.")
continue # Skip to next plan_entry if output is not a dictionary
if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict):
logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.")
generated_blocks = generated_blocks["blocks"]
# Update block positions for top-level script
# for block_id, block_data in generated_blocks.items():
# if block_data.get("topLevel"):
# block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0)
# block_data["y"] = script_y_offset[sprite_name]
# script_y_offset[sprite_name] += 150 # Increment for next script
# Update block positions for top-level script
for block_id, block_data in generated_blocks.items():
if block_data.get("topLevel"):
block_data["x"] = INITIAL_X
block_data["y"] = script_y_offset[sprite_name]
script_y_offset[sprite_name] += 150 # increment only Y
current_sprite_target["blocks"].update(generated_blocks)
print(f"[current_sprite_target block updated]: {current_sprite_target['blocks']}")
state["iteration_count"] = 0
logger.info(f"Action blocks added for sprite '{sprite_name}' by OverallBlockBuilderNode.")
except Exception as e:
logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}")
# Consider adding more specific error handling here if a malformed output
# from block_builder should cause a specific state change, but generally
# avoid nulling the entire project_json.
state["project_json"] = project_json
# with open("debug_state.json", "w", encoding="utf-8") as f:
# json.dump(state, f, indent=2, ensure_ascii=False)
return state
# Node 6: variable adder node
def variable_adder_node(state: GameState):
logger.info("--- Running Variable Adder Node ---")
project_json = state["project_json"]
try:
updated_project_json = variable_adder_main(project_json)
if updated_project_json is not None:
print("Variable added inside the project successfully!")
state["project_json"]=updated_project_json
else:
print("Variable adder unable to add any variable inside the project!")
state["project_json"]=project_json
state["page_count"] +=1
return state
except Exception as e:
logger.error(f"Error in variable adder node while updating project_json': {e}")
raise
# Node 7: layer order correction node
def layer_order_correction(state: GameState):
"""
Ensures that all sprites (isStage: false) have unique layerOrder values >= 1.
If duplicates are found, they are reassigned sequentially.
"""
logger.info("--- Running Layer Order Correction Node ---")
try:
project_json = state.get("project_json", {})
targets = project_json.get("targets", [])
# Collect all sprites (ignore Stage)
sprites = [t for t in targets if not t.get("isStage", False)]
# Reassign layerOrder sequentially (starting from 1)
for idx, sprite in enumerate(sprites, start=1):
old_lo = sprite.get("layerOrder", None)
sprite["layerOrder"] = idx
logger.debug(f"Sprite '{sprite.get('name')}' layerOrder: {old_lo} -> {idx}")
# Stage always remains 0
for target in targets:
if target.get("isStage", False):
target["layerOrder"] = 0
# Update state
state["project_json"]["targets"] = targets
logger.info("Layer Order Correction completed successfully.")
return state
except Exception as e:
logger.error(f"Error in Layer Order Correction Node: {e}")
return state
# Node 8: processed page node
def processed_page_node(state: GameState):
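"""Loop guard: sets state["processing"] to True while rendered PDF pages
remain unprocessed, False once page_count has caught up with them."""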
logger.info("--- Processing the Pages Node ---")
images = state.get("project_image", [])
cnt = state["page_count"]
print(f"The page processed for page:--------------> {cnt}")
if cnt < len(images):
state["processing"] = True
else:
state["processing"] = False
return state
# Prepare manipulated sprite JSON structure
# { changes: "pdf_stream" in place of "pdf_path"
def extract_images_from_pdf(pdf_stream: io.BytesIO):
''' Extract images from PDF and generate structured sprite JSON '''
manipulated_json = {}
img_elements = []
try:
if isinstance(pdf_stream, io.BytesIO):
# use a random ID since there's no filename
pdf_id = uuid.uuid4().hex
else:
pdf_id = os.path.splitext(os.path.basename(pdf_stream))[0]
try:
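# strategy="hi_res" runs a layout-detection model (YOLOX here) over each
# page; extract_image_block_to_payload=True returns every detected image
# as base64 in the element metadata instead of writing it to disk.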
elements = partition_pdf(
# filename=str(pdf_path), # partition_pdf might expect a string
file=pdf_stream, # 'file=', inplace of 'filename'
strategy="hi_res",
extract_image_block_types=["Image"],
hi_res_model_name="yolox",
extract_image_block_to_payload=True,
)
print(f"ELEMENTS")
except Exception as e:
raise RuntimeError(
f"❌ Failed to extract images from PDF: {str(e)}")
file_elements = [element.to_dict() for element in elements]
sprite_count = 1
for el in file_elements:
img_b64 = el["metadata"].get("image_base64")
if not img_b64:
continue
manipulated_json[f"Sprite {sprite_count}"] = {
# "id":auto_id,
# "name": name,
"base64": el["metadata"]["image_base64"],
"file-path": pdf_id,
# "description": description
}
sprite_count += 1
return manipulated_json
except Exception as e:
raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}")
# def similarity_matching(input_json_path: str, project_folder: str) -> str:
def similarity_matching(sprites_data: dict, project_folder: str) -> str:
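"""
Embeds each extracted sprite image with DINOv2 and matches it by cosine
similarity against the bundled sprite/backdrop/code-block reference
images, copies the matched assets into `project_folder`, and writes a
skeleton project.json there. Returns the path to that project.json.
Illustrative call (hypothetical values):
sprites = {"Sprite 1": {"base64": "<b64-png>", "file-path": "ab12"}}
path = similarity_matching(sprites, "outputs/ab12")
"""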
logger.info("🔍 Running similarity matching…")
os.makedirs(project_folder, exist_ok=True)
# ----------------------------------------
# CHANGED: define normalized base-paths so startswith() checks work
backdrop_base_path = os.path.normpath(str(BACKDROP_DIR))
sprite_base_path = os.path.normpath(str(SPRITE_DIR))
code_blocks_path = os.path.normpath(str(CODE_BLOCKS_DIR))
# ----------------------------------------
project_json_path = os.path.join(project_folder, "project.json")
# ==============================
# READ SPRITE METADATA
# ==============================
sprite_ids, sprite_base64 = [], []
for sid, sprite in sprites_data.items():
sprite_ids.append(sid)
# texts.append("This is " + sprite.get("description", sprite.get("name", "")))
sprite_base64.append(sprite["base64"])
sprite_images_bytes = []
for b64 in sprite_base64:
img = Image.open(BytesIO(base64.b64decode(b64.split(",")[-1]))).convert("RGB")
buffer = BytesIO()
img.save(buffer, format="PNG")
buffer.seek(0)
sprite_images_bytes.append(buffer)
# =========================================
# Build the list of all candidate images
# =========================================
folder_image_paths = [
BACKDROP_DIR/"Baseball 2.sb3"/"7be1f5b3e682813dac1f297e52ff7dca.png",
BACKDROP_DIR/"Beach Malibu.sb3"/"050615fe992a00d6af0e664e497ebf53.png",
BACKDROP_DIR/"Bedroom 3.sb3"/"8cc0b88d53345b3e337e8f028a32a4e7.png",
BACKDROP_DIR/"Blue Sky.sb3"/"e7c147730f19d284bcd7b3f00af19bb6.png",
BACKDROP_DIR/"Castle 2.sb3"/"951765ee7f7370f120c9df20b577c22f.png",
BACKDROP_DIR/"Colorful City.sb3"/"04d18ddd1b85f0ea30beb14b8da49f60.png",
BACKDROP_DIR/"Hall.sb3"/"ea86ca30b346f27ca5faf1254f6a31e3.png",
BACKDROP_DIR/"Jungle.sb3"/"f4f908da19e2753f3ed679d7b37650ca.png",
BACKDROP_DIR/"Soccer.sb3"/"04a63154f04b09494354090f7cc2f1b9.png",
BACKDROP_DIR/"Theater.sb3"/"c2b097bc5cdb6a14ef5485202bc5ee76.png",
SPRITE_DIR/"Batter.sprite3"/"592ee9ab2aeefe65cb4fb95fcd046f33.png",
SPRITE_DIR/"Batter.sprite3"/"9d193bef6e3d6d8eba6d1470b8bf9351.png",
SPRITE_DIR/"Batter.sprite3"/"baseball_sprite_motion_1.png",
SPRITE_DIR/"Batter.sprite3"/"bd4fc003528acfa847e45ff82f346eee.png",
SPRITE_DIR/"Batter.sprite3"/"fdfde4bcbaca0f68e83fdf3f4ef0c660.png",
SPRITE_DIR/"Bear.sprite3"/"6f303e972f33fcb7ef36d0d8012d0975.png",
SPRITE_DIR/"Bear.sprite3"/"bear_motion_2.png",
SPRITE_DIR/"Bear.sprite3"/"deef1eaa96d550ae6fc11524a1935024.png",
SPRITE_DIR/"Beetle.sprite3"/"46d0dfd4ae7e9bfe3a6a2e35a4905eae.png",
SPRITE_DIR/"Butterfly 1.sprite3"/"34b76c1835c6a7fc2c47956e49bb0f52.png",
SPRITE_DIR/"Butterfly 1.sprite3"/"49c9f952007d870a046cff93b6e5e098.png",
SPRITE_DIR/"Butterfly 1.sprite3"/"fe98df7367e314d9640bfaa54fc239be.png",
SPRITE_DIR/"Cat.sprite3"/"0fb9be3e8397c983338cb71dc84d0b25.png",
SPRITE_DIR/"Cat.sprite3"/"bcf454acf82e4504149f7ffe07081dbc.png",
SPRITE_DIR/"Centaur.sprite3"/"2373556e776cad3ba4d6ee04fc34550b.png",
SPRITE_DIR/"Centaur.sprite3"/"c00ffa6c5dd0baf9f456b897ff974377.png",
SPRITE_DIR/"Centaur.sprite3"/"d722329bd9373ad80625e5be6d52f3ed.png",
SPRITE_DIR/"Centaur.sprite3"/"d7aa990538915b7ef1f496d7e8486ade.png",
SPRITE_DIR/"City Bus.sprite3"/"7d7e26014a346b894db8ab1819f2167f.png",
SPRITE_DIR/"City Bus.sprite3"/"e9694adbff9422363e2ea03166015393.png",
SPRITE_DIR/"Crab.sprite3"/"49839aa1b0feed02a3c759db5f8dee71.png",
SPRITE_DIR/"Crab.sprite3"/"bear_element.png",
SPRITE_DIR/"Crab.sprite3"/"f7cdd2acbc6d7559d33be8675059c79e.png",
SPRITE_DIR/"Glow-G.sprite3"/"56839bc48957869d980c6f9b6f5a2a91.png",
SPRITE_DIR/"Jordyn.sprite3"/"00c8c464c19460df693f8d5ae69afdab.png",
SPRITE_DIR/"Jordyn.sprite3"/"768c4601174f0dfcb96b3080ccc3a192.png",
SPRITE_DIR/"Jordyn.sprite3"/"a7cc1e5f02b58ecc8095cfc18eef0289.png",
SPRITE_DIR/"Jordyn.sprite3"/"db4d97cbf24e2b8af665bfbf06f67fa0.png",
SPRITE_DIR/"Soccer Ball.sprite3"/"5d973d7a3a8be3f3bd6e1cd0f73c32b5.png",
SPRITE_DIR/"Soccer Ball.sprite3"/"cat_football.png",
SPRITE_DIR/"Star.sprite3"/"551629f2a64c1f3703e57aaa133effa6.png",
SPRITE_DIR/"Wizard.sprite3"/"55ba51188af86ca16ef30267e874c1ed.png",
SPRITE_DIR/"Wizard.sprite3"/"91d495085eb4d02a375c42f6318071e7.png",
SPRITE_DIR/"Wizard.sprite3"/"df943c9894ee4b9df8c5893ce30c2a5f.png",
# CODE_BLOCKS_DIR/"client_code_block_1.jpg",
# CODE_BLOCKS_DIR/"client_code_block_2.jpg",
CODE_BLOCKS_DIR/"script1.JPG",
CODE_BLOCKS_DIR/"script2.JPG",
CODE_BLOCKS_DIR/"script3.JPG",
CODE_BLOCKS_DIR/"script4.JPG",
CODE_BLOCKS_DIR/"script5.JPG",
CODE_BLOCKS_DIR/"script6.JPG",
CODE_BLOCKS_DIR/"script7.JPG",
CODE_BLOCKS_DIR/"script8.JPG",
CODE_BLOCKS_DIR/"script9.JPG",
CODE_BLOCKS_DIR/"static_white.png"]
folder_image_paths = [os.path.normpath(str(p)) for p in folder_image_paths]
# =========================================
# -----------------------------------------
# Load reference embeddings from JSON
# -----------------------------------------
with open(f"{BLOCKS_DIR}/dinov2_embeddings.json", "r") as f:
embedding_json = json.load(f)
# ============================== #
# EMBED SPRITE IMAGES #
# ============================== #
# ensure model is initialized (fast no-op after first call)
init_dinov2()
# embed the incoming sprite BytesIO images (same data structure you already use)
sprite_matrix = embed_bytesio_list(sprite_images_bytes, batch_size=8) # shape (N, D)
# load reference embeddings from JSON (they must be numeric lists)
img_matrix = np.array([img["embeddings"] for img in embedding_json], dtype=np.float32)
# normalize both sides (important — stored embeddings may not be normalized)
sprite_matrix = l2_normalize_rows(sprite_matrix)
img_matrix = l2_normalize_rows(img_matrix)
# =========================================
# Compute similarities & pick best match
# =========================================
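# Both matrices are L2-normalized, so this dot product is cosine
# similarity: similarity[i, j] = cos(sprite_i, reference_j); the argmax
# along axis 1 picks each sprite's closest reference image.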
similarity = np.matmul(sprite_matrix, img_matrix.T)
most_similar_indices = np.argmax(similarity, axis=1)
# =========================================
# Copy matched sprite assets + collect data
# =========================================
project_data = []
copied_folders = set()
for sprite_idx, matched_idx in enumerate(most_similar_indices):
matched_image_path = folder_image_paths[matched_idx]
matched_folder = os.path.dirname(matched_image_path)
# CHANGED: use our new normalized sprite_base_path
if not matched_folder.startswith(sprite_base_path):
continue
if matched_folder in copied_folders:
continue
copied_folders.add(matched_folder)
logger.info(f"Matched sprite: {matched_image_path}")
sprite_json_path = os.path.join(matched_folder, 'sprite.json')
if not os.path.exists(sprite_json_path):
logger.warning(f"No sprite.json in {matched_folder}")
continue
with open(sprite_json_path, 'r') as f:
sprite_info = json.load(f)
# copy all non-matched files
for fname in os.listdir(matched_folder):
if fname in (os.path.basename(matched_image_path), 'sprite.json'):
continue
shutil.copy2(os.path.join(matched_folder, fname),
os.path.join(project_folder, fname))
project_data.append(sprite_info)
# =========================================
# Copy matched backdrop assets + collect
# =========================================
backdrop_data = []
copied_backdrop_folders = set()
for backdrop_idx, matched_idx in enumerate(most_similar_indices):
matched_image_path = folder_image_paths[matched_idx]
matched_folder = os.path.dirname(matched_image_path)
matched_filename = os.path.basename(matched_image_path)
# CHANGED: use our new normalized backdrop_base_path
if not matched_folder.startswith(backdrop_base_path):
continue
# skip if backdrop folder already processed
if matched_folder in copied_backdrop_folders:
continue
copied_backdrop_folders.add(matched_folder)
logger.info(f"Matched backdrop: {matched_image_path}")
# 1) Copy the matched backdrop image itself
try:
shutil.copy2(
matched_image_path,
os.path.join(project_folder, matched_filename)
)
logger.info(f"✅ Copied matched backdrop image {matched_filename} to {project_folder}")
except Exception as e:
logger.error(f"❌ Failed to copy matched backdrop {matched_image_path}: {e}")
# copy non-matched files
for fname in os.listdir(matched_folder):
if fname in {matched_filename, 'project.json'}:
continue
src = os.path.join(matched_folder, fname)
dst = os.path.join(project_folder, fname)
if os.path.isfile(src):
try:
shutil.copy2(src, dst)
logger.info(f"Copied additional backdrop asset {fname} to project folder")
except Exception as e:
logger.error(f"Failed to copy {src}: {e}")
# append the Stage target from its project.json
pj = os.path.join(matched_folder, 'project.json')
if os.path.exists(pj):
with open(pj, 'r') as f:
bd_json = json.load(f)
for tgt in bd_json.get("targets", []):
if tgt.get("isStage"):
backdrop_data.append(tgt)
else:
logger.warning(f"No project.json in {matched_folder}")
# =========================================
# Merge into final Scratch project.json
# =========================================
final_project = {
"targets": [], "monitors": [], "extensions": [],
"meta": {
"semver": "3.0.0",
"vm": "11.3.0",
"agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
}
}
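# Scratch expects exactly one Stage target (isStage: true, layerOrder 0)
# listed first in "targets": sprites are appended here and the merged
# Stage is inserted at index 0 below.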
# sprites first
for spr in project_data:
if not spr.get("isStage", False):
final_project["targets"].append(spr)
# then backdrop as the Stage
if backdrop_data:
all_costumes, sounds = [], []
seen_costumes = set()
for i, bd in enumerate(backdrop_data):
for costume in bd.get("costumes", []):
# Create a unique key for the costume
key = (costume.get("name"), costume.get("assetId"))
if key not in seen_costumes:
seen_costumes.add(key)
all_costumes.append(costume)
if i == 0:
sounds = bd.get("sounds", [])
stage_obj = {
"isStage": True,
"name": "Stage",
"objName": "Stage",
"variables": {},
"lists": {},
"broadcasts": {},
"blocks": {},
"comments": {},
"currentCostume": 1 if len(all_costumes) > 1 else 0,
"costumes": all_costumes,
"sounds": sounds,
"volume": 100,
"layerOrder": 0,
"tempo": 60,
"videoTransparency": 50,
"videoState": "on",
"textToSpeechLanguage": None
}
final_project["targets"].insert(0, stage_obj)
else:
logger.warning("⚠️ No backdrop matched. Using default static backdrop.")
default_backdrop_path = BACKDROP_DIR / "cd21514d0531fdffb22204e0ec5ed84a.svg"
default_backdrop_name = "cd21514d0531fdffb22204e0ec5ed84a.svg"
default_backdrop_sound = BACKDROP_DIR / "83a9787d4cb6f3b7632b4ddfebf74367.wav"
default_backdrop_sound_name = "83a9787d4cb6f3b7632b4ddfebf74367.wav"
try:
shutil.copy2(default_backdrop_path, os.path.join(project_folder, default_backdrop_name))
logger.info(f"✅ Default backdrop copied to project: {default_backdrop_name}")
shutil.copy2(default_backdrop_sound, os.path.join(project_folder, default_backdrop_sound_name))
logger.info(f"✅ Default backdrop sound copied to project: {default_backdrop_sound_name}")
except Exception as e:
logger.error(f"❌ Failed to copy default backdrop: {e}")
stage_obj = {
"isStage": True,
"name": "Stage",
"objName": "Stage",
"variables": {},
"lists": {},
"broadcasts": {},
"blocks": {},
"comments": {},
"currentCostume": 0,
"costumes": [
{
"assetId": default_backdrop_name.split(".")[0],
"name": "defaultBackdrop",
"md5ext": default_backdrop_name,
"dataFormat": "svg",
"rotationCenterX": 240,
"rotationCenterY": 180
}
],
"sounds": [
{
"name": "pop",
"assetId": "83a9787d4cb6f3b7632b4ddfebf74367",
"dataFormat": "wav",
"format": "",
"rate": 48000,
"sampleCount": 1123,
"md5ext": "83a9787d4cb6f3b7632b4ddfebf74367.wav"
}
],
"volume": 100,
"layerOrder": 0,
"tempo": 60,
"videoTransparency": 50,
"videoState": "on",
"textToSpeechLanguage": None
}
final_project["targets"].insert(0, stage_obj)
with open(project_json_path, 'w') as f:
json.dump(final_project, f, indent=2)
return project_json_path
def convert_pdf_stream_to_images(pdf_stream: io.BytesIO, dpi=300):
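"""
Renders an in-memory PDF to a list of PIL page images. pdf2image's
convert_from_path needs a real filesystem path, so the stream is spilled
to a temporary file first.
"""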
# Ensure we are at the start of the stream
pdf_stream.seek(0)
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
tmp_pdf.write(pdf_stream.read())
tmp_pdf_path = tmp_pdf.name
try:
# pdf2image needs a real file path, so render from the temp file
images = convert_from_path(tmp_pdf_path, dpi=dpi)
finally:
os.remove(tmp_pdf_path)  # remove the temp file so it doesn't leak
return images
def delay_for_tpm_node(state: GameState):
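"""Fixed sleep between LLM-heavy nodes to stay under the provider's
tokens-per-minute (TPM) limit; the delay nodes are currently commented
out of the graph wiring below."""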
logger.info("--- Running DelayForTPMNode ---")
time.sleep(10) # Adjust the delay as needed
logger.info("Delay completed.")
return state
# Build the LangGraph workflow
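# Graph shape: page_processed is the loop guard. While pages remain, control
# flows pseudo_generator -> Node_optimizer -> block_builder ->
# variable_initializer and back to page_processed; once every page has been
# consumed, layer_optimizer runs and the graph ends.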
workflow = StateGraph(GameState)
# Add all nodes to the workflow
# workflow.add_node("time_delay_1", delay_for_tpm_node)
# workflow.add_node("time_delay_2", delay_for_tpm_node)
# workflow.add_node("time_delay_3", delay_for_tpm_node)
# workflow.add_node("time_delay_4", delay_for_tpm_node)
workflow.add_node("pseudo_generator", pseudo_generator_node)
#workflow.add_node("plan_generator", overall_planner_node)
workflow.add_node("Node_optimizer", node_optimizer)
workflow.add_node("layer_optimizer", layer_order_correction)
#workflow.add_node("logic_alignment", plan_logic_aligner_node)
#workflow.add_node("plan_verifier", plan_verification_node)
#workflow.add_node("refined_planner", refined_planner_node) # Refines the action plan
#workflow.add_node("opcode_counter", plan_opcode_counter_node)
workflow.add_node("block_builder", overall_block_builder_node_2)
workflow.add_node("variable_initializer", variable_adder_node)
workflow.add_node("page_processed", processed_page_node)
workflow.set_entry_point("page_processed")
# Conditional branching from the start
def decide_next_step(state: GameState):
if state.get("processing", False):
return "pseudo_generator"
else:
return "layer_optimizer"#END
workflow.add_conditional_edges(
"page_processed",
decide_next_step,
{
"pseudo_generator": "pseudo_generator",
"layer_optimizer": "layer_optimizer"
#str(END): END # str(END) is '__end__'
}
)
# Main chain
# workflow.add_edge("pseudo_generator", "time_delay_1")
workflow.add_edge("pseudo_generator", "Node_optimizer")
# workflow.add_edge("time_delay_1", "plan_generator")
# workflow.add_edge("plan_generator", "time_delay_2")
# workflow.add_edge("time_delay_2", "refined_planner")
# workflow.add_edge("refined_planner", "time_delay_3")
# workflow.add_edge("time_delay_3", "opcode_counter")
workflow.add_edge("Node_optimizer", "block_builder")
workflow.add_edge("block_builder", "variable_initializer")
# After last node, check again
workflow.add_edge("variable_initializer", "page_processed")
workflow.add_edge("layer_optimizer", END)
app_graph = workflow.compile()
# ============== Helper function to Upscale an Image ============== #
def upscale_image(image: Image.Image, scale: int = 2) -> Image.Image:
"""
Upscales a PIL image by a given scale factor.
"""
try:
width, height = image.size
new_size = (width * scale, height * scale)
upscaled_image = image.resize(new_size, Image.LANCZOS)
logger.info(f"✅ Upscaled image to {new_size}")
return upscaled_image
except Exception as e:
logger.error(f"❌ Error during image upscaling: {str(e)}")
return image
@log_execution_time
def create_sb3_archive(project_folder, project_id):
"""
Zips the project folder and renames it to an .sb3 file.
Args:
project_folder (str): The path to the directory containing the project.json and assets.
project_id (str): The unique ID for the project, used for naming the .sb3 file.
Returns:
str: The path to the created .sb3 file, or None if an error occurred.
"""
print(" --------------------------------------- create_sb3_archive INITIALIZE ---------------------------------------")
# output_filename = os.path.join("outputs", project_id)
#output_filename = OUTPUT_DIR / project_id
output_filename = GEN_PROJECT_DIR / project_id
print(" --------------------------------------- output_filename ---------------------------------------",output_filename)
zip_path = None
sb3_path = None
try:
zip_path = shutil.make_archive(output_filename, 'zip', root_dir=project_folder)
print(" --------------------------------------- zip_path_str ---------------------------------------", output_filename, project_folder)
logger.info(f"Project folder zipped to: {zip_path}")
# 2. Rename the .zip file to .sb3
sb3_path = f"{output_filename}.sb3"
os.rename(zip_path, sb3_path)
print(" --------------------------------------- rename paths ---------------------------------------", zip_path, sb3_path)
logger.info(f"Renamed {zip_path} to {sb3_path}")
return sb3_path
except Exception as e:
logger.error(f"Error creating SB3 archive for {project_id}: {e}")
# Clean up any partial files if an error occurs
if zip_path and os.path.exists(zip_path):
os.remove(zip_path)
if sb3_path and os.path.exists(sb3_path):
os.remove(sb3_path)
return None
#{ changes -> pdf_stream replacement of pdf_path
# def save_pdf_to_generated_dir(pdf_path: str, project_id: str) -> str:
def save_pdf_to_generated_dir(pdf_stream: io.BytesIO, project_id: str) -> str:
"""
Writes the PDF from `pdf_stream` into GEN_PROJECT_DIR/project_id/,
naming it <project_id>.pdf. A filesystem path is accepted as well.
Args:
pdf_stream (io.BytesIO): In-memory stream of the PDF (or a path string).
project_id (str): Your unique project identifier.
Returns:
str: Path to the saved PDF in the generated directory,
or None if something went wrong.
"""
# }
try:
# 1) Build the destination directory and base filename
output_dir = GEN_PROJECT_DIR / project_id
output_dir.mkdir(parents=True, exist_ok=True)
print(f"\n--------------------------------output_dir {output_dir}")
# 2) Define the target PDF path
target_pdf = output_dir / f"{project_id}.pdf"
print(f"\n--------------------------------target_pdf {target_pdf}")
# 3) Copy the PDF
# {
# shutil.copy2(pdf_path, target_pdf)
if isinstance(pdf_stream, io.BytesIO):
with open(target_pdf, "wb") as f:
f.write(pdf_stream.getbuffer())
else:
shutil.copy2(pdf_stream, target_pdf)
print(f"Copied PDF from {pdf_stream}{target_pdf}")
logger.info(f"Copied PDF from {pdf_stream}{target_pdf}")
# }
return str(target_pdf)
except Exception as e:
logger.error(f"Failed to save PDF to generated dir: {e}", exc_info=True)
return None
@app.route('/')
def index():
return render_template('app_index.html')
@app.route("/download_sb3/<project_id>", methods=["GET"])
def download_sb3(project_id):
sb3_path = GEN_PROJECT_DIR / f"{project_id}.sb3"
if not sb3_path.exists():
return jsonify({"error": "Scratch project file not found"}), 404
return send_file(
sb3_path,
as_attachment=True,
download_name=sb3_path.name
)
@app.route("/download_pdf/<project_id>", methods=["GET"])
def download_pdf(project_id):
pdf_path = GEN_PROJECT_DIR / project_id / f"{project_id}.pdf"
if not pdf_path.exists():
return jsonify({"error": "Scratch project pdf file not found"}), 404
return send_file(
pdf_path,
as_attachment=True,
download_name=pdf_path.name
)
@app.route("/download_sound/<sound_id>", methods=["GET"])
def download_sound(sound_id):
sound_path = SOUND_DIR / f"{sound_id}.wav"
if not sound_path.exists():
return jsonify({"error": "Scratch project sound file not found"}), 404
return send_file(
sound_path,
as_attachment=True,
download_name=sound_path.name
)
# API endpoint
@app.route('/process_pdf', methods=['POST'])
def process_pdf():
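"""
End-to-end pipeline for one uploaded PDF:
1) read the upload into memory and archive a copy under GEN_PROJECT_DIR;
2) extract sprite images from the PDF (extract_images_from_pdf);
3) match them against bundled assets and write a skeleton project.json
(similarity_matching);
4) render each page to an image and run the LangGraph workflow to fill
in the scripts;
5) zip the project folder into an .sb3 and return its download URL.
"""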
try:
logger.info("Received request to process PDF.")
if 'pdf_file' not in request.files:
logger.warning("No PDF file found in request.")
return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400
pdf_file = request.files['pdf_file']
if pdf_file.filename == '':
return jsonify({"error": "Empty filename"}), 400
# ================================================= #
# Generate Random UUID for project folder name #
# ================================================= #
project_id = str(uuid.uuid4()).replace('-', '')
# project_folder = os.path.join("outputs", f"{project_id}")
project_folder = OUTPUT_DIR / project_id
# =========================================================================== #
# Create empty json in project_{random_id} folder #
# =========================================================================== #
#os.makedirs(project_folder, exist_ok=True)
# {
# Save the uploaded PDF temporarily
# filename = secure_filename(pdf_file.filename)
# temp_dir = tempfile.mkdtemp()
# saved_pdf_path = os.path.join(temp_dir, filename)
# pdf_file.save(saved_pdf_path)
# pdf_doc = saved_pdf_path
pdf_bytes = pdf_file.read()
pdf_stream = io.BytesIO(pdf_bytes)
logger.info(f"Saved uploaded PDF to: {pdf_stream}")
# pdf= save_pdf_to_generated_dir(saved_pdf_path, project_id)
start_time = time.time()
pdf = save_pdf_to_generated_dir(pdf_stream, project_id)
logger.info(f"Saved uploaded PDF '{pdf_file.filename}' to: {pdf}")
print("--------------------------------pdf_file_path---------------------",pdf_file,pdf_stream)
total_time = time.time() - start_time
print(f"-----------------------------Execution Time save_pdf_to_generated_dir() : {total_time}-----------------------------\n")
# }
# Extract sprite images from the uploaded PDF
start_time = time.time()
sprites_json = extract_images_from_pdf(pdf_stream)
print(" --------------------------------------- extracted sprites ---------------------------------------", sprites_json)
total_time = time.time() - start_time
print(f"-----------------------------Execution Time extract_images_from_pdf() : {total_time}-----------------------------\n")
start_time = time.time()
project_output = similarity_matching(sprites_json, project_folder)
logger.info("Similarity matching completed; project skeleton written.")
total_time = time.time() - start_time
print(f"-----------------------------Execution Time similarity_matching() : {total_time}-----------------------------\n")
with open(project_output, 'r') as f:
project_skeleton = json.load(f)
if isinstance(pdf_stream, io.BytesIO):
images = convert_pdf_stream_to_images(pdf_stream, dpi=300)
else:
images = convert_from_path(pdf_stream, dpi=300)
#updating logic here [Dev Patel]
initial_state_dict = {
"project_json": project_skeleton,
"description": "The pseudo code for the script",
"project_id": project_id,
# "project_image": img_b64,
"project_image": images,
"action_plan": {},
"pseudo_code": {},
"temporary_node": {},
"processing":True,
"page_count": 0,
"temp_pseudo_code":[],
}
#final_state_dict = app_graph.invoke(initial_state_dict) # Pass dictionary
final_state_dict = app_graph.invoke(initial_state_dict,config={"recursion_limit": 200})
final_project_json = final_state_dict['project_json'] # Access as dict
# final_project_json = project_skeleton
# Save the *final* filled project JSON, overwriting the skeleton
with open(project_output, "w") as f:
json.dump(final_project_json, f, indent=2)
logger.info(f"Final project JSON saved to {project_output}")
# --- Call the new function to create the .sb3 file ---
sb3_file_path = create_sb3_archive(project_folder, project_id)
if sb3_file_path:
logger.info(f"Successfully created SB3 file: {sb3_file_path}")
# Instead of returning the local path, return a URL to the download endpoint
download_url = f"https://prthm11-scratch-vision-game.hf.space/download_sb3/{project_id}"
pdf_url = f"https://prthm11-scratch-vision-game.hf.space/download_pdf/{project_id}"
print(f"DOWNLOAD_URL: {download_url}")
print(f"PDF_URL: {pdf_url}")
# return jsonify({"message": "Procesed PDF and Game sb3 generated successfully", "project_id": project_id, "download_url": download_url})
return jsonify({
"message": "✅ PDF processed successfully",
"isError": False,
"output_json": "output_path",
"sprites": "result",
"project_output_json": "project_output",
"test_url": download_url
})
else:
return jsonify({
"message": "❌ Failed to generate the Scratch project file. Please retry!",
"isError": True,
"output_json": "output_path",
"sprites": "result",
"project_output_json": "project_output",
"test_url": None
}), 500
except Exception as e:
logger.error(f"Error during processing the pdf workflow for project ID {project_id}: {e}", exc_info=True)
return jsonify({
"message": "❌ Scanned images are not clear please retry!",
"isError": True,
"output_json": "output_path",
"sprites": "result",
"project_output_json": "project_output",
"test_url": "download_url"
}), 500
if __name__ == '__main__':
# os.makedirs("outputs", exist_ok=True) #== commented by P
app.run(host='0.0.0.0', port=7860, debug=True)