"""Flask service that turns an uploaded PDF into Scratch project assets.

Pipeline pieces defined here:

* ``extract_images_from_pdf`` pulls embedded images out of a PDF and asks an
  LLM for a short name and description of each one.
* ``similarity_matching`` matches those images against precomputed CLIP
  embeddings of reference sprites/backdrops and assembles a ``project.json``.
* HTTP endpoints expose the upload and the ``.sb3`` download.
"""
import base64
import io
import json
import logging
import os
import tempfile
import time

import cv2
import numpy as np
import pytesseract
import torch
from dotenv import load_dotenv
from flask import Flask, render_template, Response, flash, redirect, url_for, request, jsonify, send_from_directory
from imutils.perspective import four_point_transform
from langchain_community.document_loaders.image_captions import ImageCaptionLoader
from langchain_groq import ChatGroq
from langgraph.prebuilt import create_react_agent
from PIL import Image, ImageEnhance, ImageDraw
from transformers import AutoProcessor, AutoModelForImageTextToText, AutoModelForVision2Seq
from unstructured.partition.pdf import partition_pdf
from werkzeug.utils import secure_filename

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,  # Use INFO or ERROR in production
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

load_dotenv()
# os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY")
groq_api_key = os.getenv("GROQ_API_KEY")

llm = ChatGroq(
    model="meta-llama/llama-4-maverick-17b-128e-instruct",
    temperature=0,
    max_tokens=None,
)

app = Flask(__name__)

# The Windows install locations stay the defaults; set TESSERACT_CMD /
# POPPLER_PATH to run on another platform without editing the source.
pytesseract.pytesseract.tesseract_cmd = os.getenv(
    "TESSERACT_CMD", r"C:\Program Files\Tesseract-OCR\tesseract.exe")
poppler_path = os.getenv("POPPLER_PATH", r"C:\poppler-23.11.0\Library\bin")

count = 0

OUTPUT_FOLDER = "OUTPUTS"
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "DETECTED_IMAGE")
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")

for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, JSON_FOLDER_PATH]:
    os.makedirs(path, exist_ok=True)

# Placeholder the SmolVLM processor replaces with the image features.
_IMAGE_TOKEN = "<image>"

# Model Initialization
try:
    smolvlm256m_processor = AutoProcessor.from_pretrained(
        "HuggingFaceTB/SmolVLM-256M-Instruct")
    # smolvlm256m_model = AutoModelForImageTextToText.from_pretrained("HuggingFaceTB/SmolVLM-256M-Instruct").to("cpu")
    smolvlm256m_model = AutoModelForVision2Seq.from_pretrained(
        "HuggingFaceTB/SmolVLM-256M-Instruct",
        torch_dtype=torch.bfloat16 if hasattr(torch, "bfloat16") else torch.float32,
        _attn_implementation="eager",
    ).to("cpu")
except Exception as e:
    raise RuntimeError(f"❌ Failed to load SmolVLM model: {str(e)}") from e


# SmolVLM Image Captioning functioning
def get_smolvlm_caption(image: Image.Image, prompt: str = "") -> str:
    """Caption *image* with the locally loaded SmolVLM model.

    Args:
        image: Image to describe.
        prompt: Instruction text. The image placeholder token is prepended
            when the prompt does not already contain it.

    Returns:
        The decoded model output, or a string starting with
        "❌ Error during caption generation" if anything fails (errors are
        reported in-band rather than raised).
    """
    try:
        # Ensure exactly one image token
        if _IMAGE_TOKEN not in prompt:
            prompt = f"{_IMAGE_TOKEN} {prompt.strip()}"

        num_image_tokens = prompt.count(_IMAGE_TOKEN)
        if num_image_tokens != 1:
            raise ValueError(
                f"Prompt must contain exactly 1 {_IMAGE_TOKEN} token. Found {num_image_tokens}")

        inputs = smolvlm256m_processor(
            images=[image], text=[prompt], return_tensors="pt").to("cpu")
        output_ids = smolvlm256m_model.generate(**inputs, max_new_tokens=100)
        return smolvlm256m_processor.decode(output_ids[0], skip_special_tokens=True)
    except Exception as e:
        return f"❌ Error during caption generation: {str(e)}"


def _clean_caption_output(raw_output: str, prompt: str) -> str:
    """Strip the echoed prompt and image token from a SmolVLM caption."""
    return raw_output.replace(prompt, '').replace(_IMAGE_TOKEN, '').strip(" :-\n")


_SPRITE_SYSTEM_PROMPT = """
        You are an expert in visual scene understanding.
        Your Job is to analyze an image and respond acoording if asked for name give simple name by analyzing it and if ask for descrption generate a short description covering its elements.

        Guidelines:
        - Focus only the images given in Square Shape.
        - Don't Consider Blank areas in Image as.
        - Don't include generic summary or explanation outside the fields.
        Return only string.
        """


def _ask_agent_about_image(agent, question: str, image_png_b64: str) -> str:
    """Send *question* plus a base64-encoded PNG to *agent* and return its reply text."""
    content = [
        {"type": "text", "text": f"{question}"},
        {
            "type": "image_url",
            # The payload is the PNG saved to disk, so label it as PNG.
            "image_url": {"url": f"data:image/png;base64,{image_png_b64}"},
        },
    ]
    response = agent.invoke({"messages": [{"role": "user", "content": content}]})
    logger.debug("Agent response: %s", response)
    return response["messages"][-1].content


# --- FUNCTION: Extract images from saved PDF ---
def extract_images_from_pdf(pdf_path, output_json_path):
    """Extract images from a PDF and generate structured sprite JSON.

    Each embedded image is saved as a PNG under ``DETECTED_IMAGE_FOLDER_PATH``
    and sent to the LLM agent twice, once for a description and once for a
    name.

    Args:
        pdf_path: Path of the PDF to process.
        output_json_path: Ignored; kept for interface compatibility. The
            output location is always derived from the PDF file name.

    Returns:
        ``(final_json_path, sprites)`` where *sprites* maps ``"Sprite N"`` to
        a dict with ``name``, ``base64``, ``file-path`` and ``description``.

    Raises:
        RuntimeError: If any stage of extraction or JSON I/O fails.
    """
    try:
        pdf_filename = os.path.splitext(os.path.basename(pdf_path))[0]  # e.g., "scratch_crab"
        pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\")

        # Create subfolders
        extracted_image_subdir = os.path.join(DETECTED_IMAGE_FOLDER_PATH, pdf_filename)
        json_subdir = os.path.join(JSON_FOLDER_PATH, pdf_filename)
        os.makedirs(extracted_image_subdir, exist_ok=True)
        os.makedirs(json_subdir, exist_ok=True)

        # Output paths
        output_json_path = os.path.join(json_subdir, "extracted.json")
        final_json_path = os.path.join(json_subdir, "extracted_sprites.json")

        try:
            elements = partition_pdf(
                filename=pdf_path,
                strategy="hi_res",
                extract_image_block_types=["Image"],
                extract_image_block_to_payload=True,  # Set to True to get base64 in output
            )
        except Exception as e:
            raise RuntimeError(
                f"❌ Failed to extract images from PDF: {str(e)}") from e

        try:
            start_time = time.perf_counter()
            with open(output_json_path, "w", encoding="utf-8") as f:
                json.dump([element.to_dict() for element in elements], f, indent=4)
            elapsed = time.perf_counter() - start_time
            logger.info(f"✅ extracted.json write in {elapsed:.2f} seconds")
        except Exception as e:
            raise RuntimeError(f"❌ Failed to write extracted.json: {str(e)}") from e

        try:
            with open(output_json_path, 'r', encoding="utf-8") as file:
                file_elements = json.load(file)
        except Exception as e:
            raise RuntimeError(f"❌ Failed to read extracted.json: {str(e)}") from e

        # Prepare manipulated sprite JSON structure
        manipulated_json = {}

        agent = create_react_agent(
            model=llm,
            tools=[],
            prompt=_SPRITE_SYSTEM_PROMPT,
        )

        # If JSON already exists, continue numbering after its highest
        # "Sprite N" key (e.g., Sprite 4 if 1–3 already exist).
        # NOTE(review): the existing entries are used for numbering only; the
        # file is rewritten below with just the new sprites. Confirm whether
        # earlier entries were meant to be preserved.
        if os.path.exists(final_json_path):
            with open(final_json_path, "r", encoding="utf-8") as existing_file:
                existing_sprites = json.load(existing_file)
            existing_keys = [int(k.replace("Sprite ", "")) for k in existing_sprites]
            sprite_count = max(existing_keys, default=0) + 1
        else:
            sprite_count = 1

        prompt_description = "Give a brief Captioning."
        prompt_name = "give a short name caption of this Image."

        start_time = time.perf_counter()
        for i, element in enumerate(file_elements):
            if "image_base64" not in element["metadata"]:
                continue
            try:
                image_data = base64.b64decode(element["metadata"]["image_base64"])
                image = Image.open(io.BytesIO(image_data)).convert("RGB")

                image_path = os.path.join(extracted_image_subdir, f"Sprite_{i+1}.png")
                image.save(image_path)
                with open(image_path, "rb") as image_file:
                    img_base64 = base64.b64encode(image_file.read()).decode("utf-8")

                description = _ask_agent_about_image(agent, prompt_description, img_base64)
                name = _ask_agent_about_image(agent, prompt_name, img_base64)

                # Local SmolVLM alternative to the hosted LLM:
                # raw_description = get_smolvlm_caption(image, prompt=prompt_description)
                # description = _clean_caption_output(raw_description, prompt_description)

                manipulated_json[f"Sprite {sprite_count}"] = {
                    "name": name,
                    "base64": element["metadata"]["image_base64"],
                    "file-path": pdf_dir_path,
                    "description": description,
                }
                sprite_count += 1
            except Exception:
                # Best effort: one bad image must not abort the whole PDF.
                logger.exception("⚠️ Error processing Sprite %d", i + 1)
        elapsed = time.perf_counter() - start_time
        logger.info(f"✅ extracted_sprites.json write in {elapsed:.2f} seconds")

        # Save manipulated JSON
        with open(final_json_path, "w", encoding="utf-8") as sprite_file:
            json.dump(manipulated_json, sprite_file, indent=4)
        logger.info(f"✅ Manipulated sprite JSON saved: {final_json_path}")

        return final_json_path, manipulated_json
    except Exception as e:
        raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}") from e


def similarity_matching(input_json_path: str) -> str:
    """Match extracted sprites to reference assets and build a project.json.

    Sprite images from *input_json_path* are embedded with OpenCLIP and
    compared (dot product) against the precomputed reference embeddings in
    ``OUTPUTS/embeddings.json``. That file is a list of objects with
    ``name``, ``file-path`` and ``embeddings`` keys, expected to be in the
    same order as the images found by walking the backdrop and sprite
    reference folders. Assets of each best match are copied into a fresh
    ``outputs/project_<id>`` folder and merged into one project JSON.

    Args:
        input_json_path: Path to the sprite JSON written by
            ``extract_images_from_pdf``.

    Returns:
        Path of the generated ``project.json``.

    Raises:
        RuntimeError: If no embeddings are available or their shapes do not
            line up.
    """
    import shutil
    import uuid

    from langchain_experimental.open_clip.open_clip import OpenCLIPEmbeddings

    logger.info("🔍 Running similarity matching...")

    # ------------------------------ paths ------------------------------ #
    backdrop_images_path = os.getenv("BACKDROP_FOLDER_PATH", "/app/reference/backdrops")
    sprite_images_path = os.getenv("SPRITE_FOLDER_PATH", "/app/reference/sprites")
    image_dirs = [backdrop_images_path, sprite_images_path]

    # Random project folder name (uuid4 without dashes).
    random_id = uuid.uuid4().hex
    project_folder = os.path.join("outputs", f"project_{random_id}")
    os.makedirs(project_folder, exist_ok=True)
    project_json_path = os.path.join(project_folder, "project.json")

    # -------------------------- sprite metadata ------------------------- #
    with open(input_json_path, 'r', encoding="utf-8") as f:
        sprites_data = json.load(f)
    sprite_base64 = [sprite["base64"] for sprite in sprites_data.values()]

    clip_embd = OpenCLIPEmbeddings()

    # Walk folders to collect all reference image paths.
    folder_image_paths = []
    for image_dir in image_dirs:
        for root, _, files in os.walk(image_dir):
            for fname in files:
                if fname.lower().endswith((".png", ".jpg", ".jpeg")):
                    folder_image_paths.append(os.path.join(root, fname))

    # ---------------------- decode + embed sprites ---------------------- #
    # The decoded PNGs are only needed while embedding, so keep them in a
    # temporary directory that is removed afterwards.
    with tempfile.TemporaryDirectory() as temp_dir:
        sprite_image_paths = []
        start_time = time.perf_counter()
        for idx, b64 in enumerate(sprite_base64):
            # Accept both raw base64 and "data:...;base64,<payload>" strings.
            image_data = base64.b64decode(b64.split(",")[-1])
            img = Image.open(io.BytesIO(image_data)).convert("RGB")
            temp_path = os.path.join(temp_dir, f"sprite_{idx}.png")
            img.save(temp_path)
            sprite_image_paths.append(temp_path)
        elapsed = time.perf_counter() - start_time
        logger.info(f"✅ Decoded Sprite Base64 in {elapsed:.2f} seconds")

        sprite_features = clip_embd.embed_image(sprite_image_paths)

    # ------------------------ compute similarities ---------------------- #
    with open(os.path.join(OUTPUT_FOLDER, "embeddings.json"), "r", encoding="utf-8") as f:
        embedding_json = json.load(f)

    img_matrix = np.array([img["embeddings"] for img in embedding_json])
    sprite_matrix = np.array(sprite_features)

    if sprite_matrix.size == 0 or img_matrix.size == 0:
        raise RuntimeError("❌ No valid embeddings found for sprites or reference images.")

    try:
        similarity = np.matmul(sprite_matrix, img_matrix.T)
    except ValueError as ve:
        if "matmul" in str(ve) and "size" in str(ve):
            logger.error("❌ Matrix multiplication failed due to shape mismatch. Likely due to empty or invalid embeddings.")
            raise RuntimeError("Matrix shape mismatch: CLIP embedding input is invalid or empty.") from ve
        raise

    most_similar_indices = np.argmax(similarity, axis=1)

    # Match indices refer to rows of embeddings.json but are resolved against
    # the images found on disk, so the two must be aligned.
    if len(embedding_json) != len(folder_image_paths):
        logger.warning(
            "embeddings.json has %d entries but %d reference images were found on disk; "
            "matches may point at the wrong asset.",
            len(embedding_json), len(folder_image_paths))

    matched_paths = []
    for matched_idx in most_similar_indices:
        if matched_idx >= len(folder_image_paths):
            logger.warning("Match index %d has no reference image on disk; skipping.", matched_idx)
            continue
        matched_paths.append(os.path.normpath(folder_image_paths[matched_idx]))

    # ------------------------- sprites: copy + load --------------------- #
    # For every distinct matched folder that has a sprite.json: copy its
    # assets (excluding the matched image and sprite.json) and collect the
    # sprite definition.
    project_data = []
    copied_folders = set()
    for matched_image_path in matched_paths:
        matched_folder = os.path.dirname(matched_image_path)
        if matched_folder in copied_folders:
            continue
        copied_folders.add(matched_folder)
        logger.info(f"Matched image path: {matched_image_path}")

        sprite_json_path = os.path.join(matched_folder, 'sprite.json')
        if not os.path.exists(sprite_json_path):
            logger.warning(f"sprite.json not found in: {matched_folder}")
            continue

        with open(sprite_json_path, 'r', encoding="utf-8") as f:
            sprite_data = json.load(f)
        logger.debug("SPRITE DATA: \n%s", sprite_data)

        # Copy only non-matched files
        skip_names = {os.path.basename(matched_image_path), 'sprite.json'}
        for fname in os.listdir(matched_folder):
            fpath = os.path.join(matched_folder, fname)
            if os.path.isfile(fpath) and fname not in skip_names:
                shutil.copy2(fpath, os.path.join(project_folder, fname))
                logger.info(f"Copied Sprite asset: {fname}")
        project_data.append(sprite_data)

    # ------------------------ backdrops: copy + load -------------------- #
    # For every match located under the backdrops folder: copy its assets
    # (excluding the matched image and project.json) and collect the stage
    # targets from its project.json.
    backdrop_data = []
    # Trailing separator so a sibling such as ".../backdrops2" does not match.
    backdrop_root = os.path.normpath(backdrop_images_path) + os.sep
    loaded_backdrop_folders = set()
    for matched_image_path in matched_paths:
        if not matched_image_path.startswith(backdrop_root):
            continue
        matched_folder = os.path.dirname(matched_image_path)
        logger.info(f"Backdrop matched image: {matched_image_path}")

        # Copy only non-matched files
        skip_names = {os.path.basename(matched_image_path), 'project.json'}
        for fname in os.listdir(matched_folder):
            fpath = os.path.join(matched_folder, fname)
            if os.path.isfile(fpath) and fname not in skip_names:
                shutil.copy2(fpath, os.path.join(project_folder, fname))
                logger.info(f"Copied Backdrop asset: {fname}")

        backdrop_json_path = os.path.join(matched_folder, 'project.json')
        if not os.path.exists(backdrop_json_path):
            logger.warning(f"project.json not found in: {matched_folder}")
            continue
        # Several sprites can match the same backdrop; load its stage once so
        # the costumes are not duplicated in the final project.
        if matched_folder in loaded_backdrop_folders:
            continue
        loaded_backdrop_folders.add(matched_folder)

        with open(backdrop_json_path, 'r', encoding="utf-8") as f:
            backdrop_json_data = json.load(f)
        logger.debug("BACKDROP DATA: \n%s", backdrop_json_data)
        backdrop_data.extend(
            target for target in backdrop_json_data.get("targets", [])
            if target.get("isStage")
        )

    # -------------------------- merge JSON structure -------------------- #
    final_project = {
        "targets": [],
        "monitors": [],
        "extensions": [],
        "meta": {
            "semver": "3.0.0",
            "vm": "11.3.0",
            "agent": "OpenAI ScratchVision Agent",
        },
    }

    start_time = time.perf_counter()
    final_project["targets"].extend(
        sprite for sprite in project_data if not sprite.get("isStage", False))
    elapsed = time.perf_counter() - start_time
    logger.info(f"✅ Append sprite 'targets' in {elapsed:.2f} seconds")

    if backdrop_data:
        # One Stage target holding the costumes of every matched backdrop;
        # sounds are taken from the first backdrop only.
        all_costumes, sounds = [], []
        for idx, bd in enumerate(backdrop_data):
            all_costumes.extend(bd.get("costumes", []))
            if idx == 0 and "sounds" in bd:
                sounds = bd["sounds"]
        final_project["targets"].append({
            "isStage": True,
            "name": "Stage",
            "variables": {},
            "lists": {},
            "broadcasts": {},
            "blocks": {},
            "comments": {},
            "currentCostume": 1 if len(all_costumes) > 1 else 0,
            "costumes": all_costumes,
            "sounds": sounds,
            "volume": 100,
            "layerOrder": 0,
            "tempo": 60,
            "videoTransparency": 50,
            "videoState": "on",
            "textToSpeechLanguage": None,
        })

    with open(project_json_path, 'w', encoding="utf-8") as f:
        json.dump(final_project, f, indent=2)
    logger.info(f"🎉 Final project saved: {project_json_path}")

    return project_json_path


@app.route('/')
def index():
    """Serve the upload page."""
    return render_template('app_index.html')


# API endpoint
@app.route('/process_pdf', methods=['POST'])
def process_pdf():
    """Accept a PDF upload under form key ``pdf_file`` and report the result.

    The extraction/matching pipeline is currently disabled, so the response
    carries placeholder values. Returns 400 for a missing or unusable upload
    and 500 on unexpected errors.
    """
    try:
        logger.info("Received request to process PDF.")

        if 'pdf_file' not in request.files:
            logger.warning("No PDF file found in request.")
            return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400

        pdf_file = request.files['pdf_file']
        if pdf_file.filename == '':
            return jsonify({"error": "Empty filename"}), 400

        # secure_filename can reduce a hostile name (e.g. "../") to "".
        filename = secure_filename(pdf_file.filename)
        if not filename:
            return jsonify({"error": "Invalid filename"}), 400

        # Save the uploaded PDF temporarily; the directory is removed when
        # the block exits so uploads do not accumulate on disk.
        with tempfile.TemporaryDirectory() as temp_dir:
            saved_pdf_path = os.path.join(temp_dir, filename)
            pdf_file.save(saved_pdf_path)
            logger.info(f"Saved uploaded PDF to: {saved_pdf_path}")

            # Extract & process (disabled; re-enable inside this block so the
            # PDF still exists while it is being read):
            # output_path, result = extract_images_from_pdf(saved_pdf_path, None)
            # project_output = similarity_matching(output_path)

        logger.info("Finished handling PDF request.")

        return jsonify({
            "message": "✅ PDF processed successfully",
            "output_json": "output_path",
            "sprites": "result",
            "project_output_json": "project_output",
            "test_url": r"https://prthm11-scratch-vision-game.hf.space/download_sb3/Event_test"
        })
    except Exception as e:
        logger.exception("❌ Failed to process PDF")
        return jsonify({"error": f"❌ Failed to process PDF: {str(e)}"}), 500


# --- New endpoint to download the .sb3 file ---
@app.route("/download_sb3/<project_id>", methods=["GET"])
def download_sb3(project_id):
    """Allow users to download the generated .sb3 Scratch project file.

    Looks for ``game_samples/<project_id>.sb3`` under the application root.
    Returns 404 if the file does not exist and 500 on unexpected errors.
    """
    sb3_filename = f"{project_id}.sb3"
    # send_from_directory resolves relative directories against the app root,
    # so run the existence check against that same directory.
    sb3_directory = os.path.join(app.root_path, "game_samples")
    sb3_filepath = os.path.join(sb3_directory, sb3_filename)

    try:
        if os.path.exists(sb3_filepath):
            logger.info(f"Serving SB3 file for project ID: {project_id}")
            # send_from_directory serves the file and handles content-disposition for download
            return send_from_directory(
                directory=sb3_directory,
                path=sb3_filename,
                as_attachment=True,  # This makes the browser download the file
                download_name=sb3_filename,  # This sets the filename for the download
            )
        logger.warning(f"SB3 file not found for ID: {project_id}")
        return jsonify({"error": "Scratch project file not found"}), 404
    except Exception as e:
        logger.error(f"Error serving SB3 file for ID {project_id}: {e}")
        return jsonify({"error": "Failed to retrieve Scratch project file"}), 500


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must be
    # opted into (FLASK_DEBUG=1) rather than always on while bound to 0.0.0.0.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in {"1", "true", "yes"}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)