Scratch_Vision_Game / app_main.py
prthm11's picture
Update app_main.py
584895d verified
raw
history blame
25.9 kB
from flask import Flask, render_template, Response, flash, redirect, url_for, request, jsonify, send_from_directory
import cv2
import numpy as np
from unstructured.partition.pdf import partition_pdf
import json
import base64
import io
import os
from PIL import Image, ImageEnhance, ImageDraw
from imutils.perspective import four_point_transform
from dotenv import load_dotenv
import pytesseract
from transformers import AutoProcessor, AutoModelForImageTextToText, AutoModelForVision2Seq
from langchain_community.document_loaders.image_captions import ImageCaptionLoader
from werkzeug.utils import secure_filename
import tempfile
import torch
from langchain_groq import ChatGroq
from langgraph.prebuilt import create_react_agent
import logging, time
# Configure logging
logging.basicConfig(
level=logging.DEBUG, # Use INFO or ERROR in production
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("app.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
load_dotenv()
# os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY")
groq_api_key = os.getenv("GROQ_API_KEY")
llm = ChatGroq(
model="meta-llama/llama-4-maverick-17b-128e-instruct",
temperature=0,
max_tokens=None,
)
app = Flask(__name__)
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
poppler_path = r"C:\poppler-23.11.0\Library\bin"
count = 0
OUTPUT_FOLDER = "OUTPUTS"
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "DETECTED_IMAGE")
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")
for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, JSON_FOLDER_PATH]:
os.makedirs(path, exist_ok=True)
# Model Initialization
try:
smolvlm256m_processor = AutoProcessor.from_pretrained(
"HuggingFaceTB/SmolVLM-256M-Instruct")
# smolvlm256m_model = AutoModelForImageTextToText.from_pretrained("HuggingFaceTB/SmolVLM-256M-Instruct").to("cpu")
smolvlm256m_model = AutoModelForVision2Seq.from_pretrained(
"HuggingFaceTB/SmolVLM-256M-Instruct",
torch_dtype=torch.bfloat16 if hasattr(
torch, "bfloat16") else torch.float32,
_attn_implementation="eager"
).to("cpu")
except Exception as e:
raise RuntimeError(f"❌ Failed to load SmolVLM model: {str(e)}")
# SmolVLM Image Captioning functioning
def get_smolvlm_caption(image: Image.Image, prompt: str = "") -> str:
try:
# Ensure exactly one <image> token
if "<image>" not in prompt:
prompt = f"<image> {prompt.strip()}"
num_image_tokens = prompt.count("<image>")
if num_image_tokens != 1:
raise ValueError(
f"Prompt must contain exactly 1 <image> token. Found {num_image_tokens}")
inputs = smolvlm256m_processor(
images=[image], text=[prompt], return_tensors="pt").to("cpu")
output_ids = smolvlm256m_model.generate(**inputs, max_new_tokens=100)
return smolvlm256m_processor.decode(output_ids[0], skip_special_tokens=True)
except Exception as e:
return f"❌ Error during caption generation: {str(e)}"
# --- FUNCTION: Extract images from saved PDF ---
def extract_images_from_pdf(pdf_path, output_json_path):
''' Extract images from PDF and generate structured sprite JSON '''
try:
pdf_filename = os.path.splitext(os.path.basename(pdf_path))[
0] # e.g., "scratch_crab"
pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\")
# Create subfolders
extracted_image_subdir = os.path.join(
DETECTED_IMAGE_FOLDER_PATH, pdf_filename)
json_subdir = os.path.join(JSON_FOLDER_PATH, pdf_filename)
os.makedirs(extracted_image_subdir, exist_ok=True)
os.makedirs(json_subdir, exist_ok=True)
# Output paths
output_json_path = os.path.join(json_subdir, "extracted.json")
final_json_path = os.path.join(json_subdir, "extracted_sprites.json")
try:
elements = partition_pdf(
filename=pdf_path,
strategy="hi_res",
extract_image_block_types=["Image"],
extract_image_block_to_payload=True, # Set to True to get base64 in output
)
except Exception as e:
raise RuntimeError(
f"❌ Failed to extract images from PDF: {str(e)}")
try:
start_time = time.perf_counter()
with open(output_json_path, "w") as f:
json.dump([element.to_dict()
for element in elements], f, indent=4)
elapsed = time.perf_counter() - start_time
logger.info(f"βœ… extracted.json write in {elapsed:.2f} seconds")
except Exception as e:
raise RuntimeError(f"❌ Failed to write extracted.json: {str(e)}")
try:
# Display extracted images
with open(output_json_path, 'r') as file:
file_elements = json.load(file)
except Exception as e:
raise RuntimeError(f"❌ Failed to read extracted.json: {str(e)}")
# Prepare manipulated sprite JSON structure
manipulated_json = {}
# SET A SYSTEM PROMPT
system_prompt = """
You are an expert in visual scene understanding.
Your Job is to analyze an image and respond acoording if asked for name give simple name by analyzing it and if ask for descrption generate a short description covering its elements.
Guidelines:
- Focus only the images given in Square Shape.
- Don't Consider Blank areas in Image as.
- Don't include generic summary or explanation outside the fields.
Return only string.
"""
agent = create_react_agent(
model=llm,
tools=[],
prompt=system_prompt
)
# If JSON already exists, load it and find the next available Sprite number
if os.path.exists(final_json_path):
with open(final_json_path, "r") as existing_file:
manipulated = json.load(existing_file)
# Determine the next available index (e.g., Sprite 4 if 1–3 already exist)
existing_keys = [int(k.replace("Sprite ", ""))
for k in manipulated.keys()]
start_count = max(existing_keys, default=0) + 1
else:
start_count = 1
sprite_count = start_count
start_time = time.perf_counter()
for i, element in enumerate(file_elements):
if "image_base64" in element["metadata"]:
try:
image_data = base64.b64decode(
element["metadata"]["image_base64"])
image = Image.open(io.BytesIO(image_data)).convert("RGB")
image.show(title=f"Extracted Image {i+1}")
image_path = os.path.join(
extracted_image_subdir, f"Sprite_{i+1}.png")
image.save(image_path)
with open(image_path, "rb") as image_file:
image_bytes = image_file.read()
img_base64 = base64.b64encode(image_bytes).decode("utf-8")
# description = get_smolvlm_caption(image, prompt="Give a brief Description")
# name = get_smolvlm_caption(image, prompt="give a short name/title of this Image.")
def clean_caption_output(raw_output: str, prompt: str) -> str:
answer = raw_output.replace(prompt, '').replace(
"<image>", '').strip(" :-\n")
return answer
prompt_description = "Give a brief Captioning."
prompt_name = "give a short name caption of this Image."
content1 = [
{
"type": "text",
"text": f"{prompt_description}"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{img_base64}"
}
}
]
response1 = agent.invoke(
{"messages": [{"role": "user", "content": content1}]})
print(response1)
description = response1["messages"][-1].content
content2 = [
{
"type": "text",
"text": f"{prompt_name}"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{img_base64}"
}
}
]
response2 = agent.invoke(
{"messages": [{"role": "user", "content": content2}]})
print(response2)
name = response2["messages"][-1].content
# raw_description = get_smolvlm_caption(image, prompt=prompt_description)
# raw_name = get_smolvlm_caption(image, prompt=prompt_name)
# description = clean_caption_output(raw_description, prompt_description)
# name = clean_caption_output(raw_name, prompt_name)
manipulated_json[f"Sprite {sprite_count}"] = {
"name": name,
"base64": element["metadata"]["image_base64"],
"file-path": pdf_dir_path,
"description": description
}
sprite_count += 1
except Exception as e:
print(f"⚠️ Error processing Sprite {i+1}: {str(e)}")
elapsed = time.perf_counter() - start_time
logger.info(f"βœ… extracted_sprites.json write in {elapsed:.2f} seconds")
# Save manipulated JSON
with open(final_json_path, "w") as sprite_file:
json.dump(manipulated_json, sprite_file, indent=4)
print(f"βœ… Manipulated sprite JSON saved: {final_json_path}")
return final_json_path, manipulated_json
except Exception as e:
raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}")
def similarity_matching(input_json_path: str) -> str:
import uuid
import shutil
import tempfile
from langchain_experimental.open_clip.open_clip import OpenCLIPEmbeddings
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
from io import BytesIO
logger.info("πŸ” Running similarity matching...")
# ============================== #
# DEFINE PATHS #
# ============================== #
backdrop_images_path = os.getenv("BACKDROP_FOLDER_PATH", "/app/reference/backdrops")
sprite_images_path = os.getenv("SPRITE_FOLDER_PATH", "/app/reference/sprites")
image_dirs = [backdrop_images_path, sprite_images_path]
# ================================================= #
# Generate Random UUID for project folder name #
# ================================================= #
random_id = str(uuid.uuid4()).replace('-', '')
project_folder = os.path.join("outputs", f"project_{random_id}")
# =========================================================================== #
# Create empty json in project_{random_id} folder #
# =========================================================================== #
os.makedirs(project_folder, exist_ok=True)
project_json_path = os.path.join(project_folder, "project.json")
# ============================== #
# READ SPRITE METADATA #
# ============================== #
with open(input_json_path, 'r') as f:
sprites_data = json.load(f)
sprite_ids, texts, sprite_base64 = [], [], []
start_time = time.perf_counter()
for sid, sprite in sprites_data.items():
sprite_ids.append(sid)
texts.append(
"This is " + sprite.get("description", sprite.get("name", "")))
sprite_base64.append(sprite["base64"])
elapsed = time.perf_counter() - start_time
logger.info(f"βœ… Append Sprite's Name and Description in {elapsed:.2f} seconds")
# ============================== #
# INITIALIZE CLIP EMBEDDER #
# ============================== #
clip_embd = OpenCLIPEmbeddings()
# # ========================================= #
# # Walk folders to collect all image paths #
# # ========================================= #
folder_image_paths = []
for image_dir in image_dirs:
for root, _, files in os.walk(image_dir):
for fname in files:
if fname.lower().endswith((".png", ".jpg", ".jpeg")):
folder_image_paths.append(os.path.join(root, fname))
# # ============================== #
# # EMBED FOLDER IMAGES (REF) #
# # ============================== #
# img_features = clip_embd.embed_image(folder_image_paths)
# # ============================== #
# # Store image embeddings #
# # ============================== #
# embedding_json = []
# for i, path in enumerate(folder_image_paths):
# embedding_json.append({
# "name":os.path.basename(path),
# "file-path": path,
# "embeddings": list(img_features[i])
# })
# # Save to embeddings.json
# with open(f"{OUTPUT_FOLDER}/embeddings.json", "w") as f:
# json.dump(embedding_json, f, indent=2)
# ============================== #
# DECODE SPRITE IMAGES #
# ============================== #
temp_dir = tempfile.mkdtemp()
sprite_image_paths = []
start_time = time.perf_counter()
for idx, b64 in enumerate(sprite_base64):
image_data = base64.b64decode(b64.split(",")[-1])
img = Image.open(BytesIO(image_data)).convert("RGB")
temp_path = os.path.join(temp_dir, f"sprite_{idx}.png")
img.save(temp_path)
sprite_image_paths.append(temp_path)
elapsed = time.perf_counter() - start_time
logger.info(f"βœ… Decoded Sprite Base64 in {elapsed:.2f} seconds")
# ============================== #
# EMBED SPRITE IMAGES #
# ============================== #
sprite_features = clip_embd.embed_image(sprite_image_paths)
# ============================== #
# COMPUTE SIMILARITIES #
# ============================== #
with open(f"{OUTPUT_FOLDER}/embeddings.json", "r") as f:
embedding_json = json.load(f)
img_matrix = np.array([img["embeddings"] for img in embedding_json])
sprite_matrix = np.array(sprite_features)
if sprite_matrix.size == 0 or img_matrix.size == 0:
raise RuntimeError("❌ No valid embeddings found for sprites or reference images.")
try:
similarity = np.matmul(sprite_matrix, img_matrix.T)
except ValueError as ve:
if "matmul" in str(ve) and "size" in str(ve):
logger.error("❌ Matrix multiplication failed due to shape mismatch. Likely due to empty or invalid embeddings.")
raise RuntimeError("Matrix shape mismatch: CLIP embedding input is invalid or empty.")
else:
raise
most_similar_indices = np.argmax(similarity, axis=1)
# ============= Match and copy ================
project_data = []
copied_folders = set()
# =============================================================== #
# Loop through most similar images from Sprites folder #
# β†’ Copy sprite assets (excluding matched image + sprite.json) #
# β†’ Load sprite.json and append its data to project_data #
# =============================================================== #
for sprite_idx, matched_idx in enumerate(most_similar_indices):
matched_image_path = folder_image_paths[matched_idx]
matched_image_path = os.path.normpath(matched_image_path)
matched_folder = os.path.dirname(matched_image_path)
folder_name = os.path.basename(matched_folder)
if matched_folder in copied_folders:
continue
copied_folders.add(matched_folder)
logger.info(f"Matched image path: {matched_image_path}")
sprite_json_path = os.path.join(matched_folder, 'sprite.json')
if not os.path.exists(sprite_json_path):
logger.warning(f"sprite.json not found in: {matched_folder}")
continue
with open(sprite_json_path, 'r') as f:
sprite_data = json.load(f)
print(f"SPRITE DATA: \n{sprite_data}")
# Copy only non-matched files
for fname in os.listdir(matched_folder):
fpath = os.path.join(matched_folder, fname)
if os.path.isfile(fpath) and fname not in {os.path.basename(matched_image_path), 'sprite.json'}:
shutil.copy2(fpath, os.path.join(project_folder, fname))
logger.info(f"Copied Sprite asset: {fname}")
project_data.append(sprite_data)
# ================================================================== #
# Loop through most similar images from Backdrops folder #
# β†’ Copy Backdrop assets (excluding matched image + project.json) #
# β†’ Load project.json and append its data to project_data #
# ================================================================== #
backdrop_data = [] # for backdrop-related entries
for backdrop_idx, matched_idx in enumerate(most_similar_indices):
matched_image_path = os.path.normpath(folder_image_paths[matched_idx])
# Check if the match is from the Backdrops folder
if matched_image_path.startswith(os.path.normpath(backdrop_images_path)):
matched_folder = os.path.dirname(matched_image_path)
folder_name = os.path.basename(matched_folder)
logger.info(f"Backdrop matched image: {matched_image_path}")
# Copy only non-matched files
for fname in os.listdir(matched_folder):
fpath = os.path.join(matched_folder, fname)
if os.path.isfile(fpath) and fname not in {os.path.basename(matched_image_path), 'project.json'}:
shutil.copy2(fpath, os.path.join(project_folder, fname))
logger.info(f"Copied Backdrop asset: {fname}")
# Append backdrop's project.json
backdrop_json_path = os.path.join(matched_folder, 'project.json')
if os.path.exists(backdrop_json_path):
with open(backdrop_json_path, 'r') as f:
backdrop_json_data = json.load(f)
print(f"SPRITE DATA: \n{backdrop_json_data}")
if "targets" in backdrop_json_data:
for target in backdrop_json_data["targets"]:
if target.get("isStage") == True:
backdrop_data.append(target)
else:
logger.warning(f"project.json not found in: {matched_folder}")
# project_data, backdrop_data = [], []
# copied_folders = set()
# start_time = time.perf_counter()
# for sprite_idx, matched_idx in enumerate(most_similar_indices):
# matched_entry = embedding_json[matched_idx]
# # matched_image_path = os.path.normpath(folder_image_paths[matched_idx])
# matched_image_path = os.path.normpath(matched_entry["file-path"])
# matched_folder = os.path.dirname(matched_image_path)
# if matched_folder in copied_folders:
# continue
# copied_folders.add(matched_folder)
# # Sprite
# sprite_json_path = os.path.join(matched_folder, 'sprite.json')
# if os.path.exists(sprite_json_path):
# with open(sprite_json_path, 'r') as f:
# sprite_data = json.load(f)
# project_data.append(sprite_data)
# for fname in os.listdir(matched_folder):
# if fname not in {os.path.basename(matched_image_path), 'sprite.json'}:
# shutil.copy2(os.path.join(
# matched_folder, fname), project_folder)
# # Backdrop
# if matched_image_path.startswith(os.path.normpath(backdrop_images_path)):
# backdrop_json_path = os.path.join(matched_folder, 'project.json')
# if os.path.exists(backdrop_json_path):
# with open(backdrop_json_path, 'r') as f:
# backdrop_json_data = json.load(f)
# for target in backdrop_json_data.get("targets", []):
# if target.get("isStage"):
# backdrop_data.append(target)
# for fname in os.listdir(matched_folder):
# if fname not in {os.path.basename(matched_image_path), 'project.json'}:
# shutil.copy2(os.path.join(
# matched_folder, fname), project_folder)
# Merge JSON structure
final_project = {
"targets": [],
"monitors": [],
"extensions": [],
"meta": {
"semver": "3.0.0",
"vm": "11.3.0",
"agent": "OpenAI ScratchVision Agent"
}
}
start_time = time.perf_counter()
for sprite in project_data:
if not sprite.get("isStage", False):
final_project["targets"].append(sprite)
elapsed = time.perf_counter() - start_time
logger.info(f"βœ… Append sprite 'targets' in {elapsed:.2f} seconds")
if backdrop_data:
all_costumes, sounds = [], []
for idx, bd in enumerate(backdrop_data):
all_costumes.extend(bd.get("costumes", []))
if idx == 0 and "sounds" in bd:
sounds = bd["sounds"]
final_project["targets"].append({
"isStage": True,
"name": "Stage",
"variables": {},
"lists": {},
"broadcasts": {},
"blocks": {},
"comments": {},
"currentCostume": 1 if len(all_costumes) > 1 else 0,
"costumes": all_costumes,
"sounds": sounds,
"volume": 100,
"layerOrder": 0,
"tempo": 60,
"videoTransparency": 50,
"videoState": "on",
"textToSpeechLanguage": None
})
with open(project_json_path, 'w') as f:
json.dump(final_project, f, indent=2)
logger.info(f"πŸŽ‰ Final project saved: {project_json_path}")
return project_json_path
@app.route('/')
def index():
return render_template('app_index.html')
# API endpoint
@app.route('/process_pdf', methods=['POST'])
def process_pdf():
try:
logger.info("Received request to process PDF.")
if 'pdf_file' not in request.files:
logger.warning("No PDF file found in request.")
return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400
pdf_file = request.files['pdf_file']
if pdf_file.filename == '':
return jsonify({"error": "Empty filename"}), 400
# Save the uploaded PDF temporarily
filename = secure_filename(pdf_file.filename)
temp_dir = tempfile.mkdtemp()
saved_pdf_path = os.path.join(temp_dir, filename)
pdf_file.save(saved_pdf_path)
logger.info(f"Saved uploaded PDF to: {saved_pdf_path}")
# Extract & process
json_path = None
#0output_path, result = extract_images_from_pdf(
# saved_pdf_path, json_path)
#project_output = similarity_matching(output_path)
logger.info("Received request to process PDF.")
return jsonify({
"message": "βœ… PDF processed successfully",
"output_json": "output_path",
"sprites": "result",
"project_output_json": "project_output",
"test_url":r"https://prthm11-scratch-vision-game.hf.space/download_sb3/Event_test"
})
except Exception as e:
logger.exception("❌ Failed to process PDF")
return jsonify({"error": f"❌ Failed to process PDF: {str(e)}"}), 500
# --- New endpoint to download the .sb3 file ---
@app.route("/download_sb3/<project_id>", methods=["GET"])
def download_sb3(project_id):
"""
Allows users to download the generated .sb3 Scratch project file.
"""
sb3_filename = f"{project_id}.sb3"
sb3_filepath = os.path.join("game_samples", sb3_filename)
try:
if os.path.exists(sb3_filepath):
logger.info(f"Serving SB3 file for project ID: {project_id}")
# send_from_directory serves the file and handles content-disposition for download
return send_from_directory(
directory="game_samples",
path=sb3_filename,
as_attachment=True, # This makes the browser download the file
download_name=sb3_filename # This sets the filename for the download
)
else:
logger.warning(f"SB3 file not found for ID: {project_id}")
return jsonify({"error": "Scratch project file not found"}), 404
except Exception as e:
logger.error(f"Error serving SB3 file for ID {project_id}: {e}")
return jsonify({"error": "Failed to retrieve Scratch project file"}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=True)