"""Flask app that turns an uploaded PDF into a Scratch-style ``project.json``.

Pipeline (see ``process_pdf``):

1. ``extract_images_from_pdf`` pulls image blocks out of the PDF with
   ``unstructured`` and asks the Groq-hosted LLM for a name and a short
   description of each one.
2. ``similarity_matching`` embeds the extracted images with OpenCLIP, matches
   them against precomputed reference embeddings and merges the matched
   reference assets into a single project file.
"""
import base64
import io
import json
import logging
import os
import shutil
import tempfile
import uuid

from flask import Flask, render_template, Response, flash, redirect, url_for, request, jsonify
import cv2
import numpy as np
from unstructured.partition.pdf import partition_pdf
from PIL import Image, ImageEnhance, ImageDraw
from imutils.perspective import four_point_transform
from dotenv import load_dotenv
import pytesseract
from transformers import AutoProcessor, AutoModelForImageTextToText, AutoModelForVision2Seq
from langchain_community.document_loaders.image_captions import ImageCaptionLoader
from werkzeug.utils import secure_filename
import torch
from langchain_groq import ChatGroq
from langgraph.prebuilt import create_react_agent

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,  # Use INFO or ERROR in production
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

load_dotenv()
# os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY")
# NOTE(review): the key is read here but not passed to ChatGroq explicitly;
# presumably ChatGroq picks GROQ_API_KEY up from the environment - confirm.
groq_api_key = os.getenv("GROQ_API_KEY")

llm = ChatGroq(
    model="meta-llama/llama-4-maverick-17b-128e-instruct",
    temperature=0,
    max_tokens=None,
)

app = Flask(__name__)

pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
poppler_path = r"C:\poppler-23.11.0\Library\bin"

count = 0

PDF_GET = r"E:\Pratham\2025\Harsh Sir\Scratch Vision\images\scratch_crab.pdf"

OUTPUT_FOLDER = "OUTPUTS"
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "DETECTED_IMAGE")
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")

for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, JSON_FOLDER_PATH]:
    os.makedirs(path, exist_ok=True)

# Placeholder that marks where the image goes in a SmolVLM text prompt.
IMAGE_TOKEN = "<image>"

# System prompt for the captioning agent. This text is sent to the LLM as-is.
CAPTION_SYSTEM_PROMPT = """
    You are an expert in visual scene understanding.
    Your Job is to analyze an image and respond acoording if asked for name give simple name by analyzing it and if ask for descrption generate a short description covering its elements.

    Guidelines:
    - Focus only the images given in Square Shape.
    - Don't Consider Blank areas in Image as.
    - Don't include generic summary or explanation outside the fields.
    Return only string.
    """

# Model Initialization
try:
    smolvlm256m_processor = AutoProcessor.from_pretrained(
        "HuggingFaceTB/SmolVLM-256M-Instruct")
    # smolvlm256m_model = AutoModelForImageTextToText.from_pretrained("HuggingFaceTB/SmolVLM-256M-Instruct").to("cpu")
    smolvlm256m_model = AutoModelForVision2Seq.from_pretrained(
        "HuggingFaceTB/SmolVLM-256M-Instruct",
        torch_dtype=torch.bfloat16 if hasattr(
            torch, "bfloat16") else torch.float32,
        _attn_implementation="eager"
    ).to("cpu")
except Exception as e:
    raise RuntimeError(f"❌ Failed to load SmolVLM model: {str(e)}") from e


# SmolVLM Image Captioning functioning
def get_smolvlm_caption(image: Image.Image, prompt: str = "") -> str:
    """Caption ``image`` with the locally loaded SmolVLM model.

    Args:
        image: PIL image to caption.
        prompt: Instruction text. If it does not already contain the
            ``<image>`` placeholder, one is prepended.

    Returns:
        The decoded model output, or a string starting with
        ``"❌ Error during caption generation"`` if anything fails (errors are
        reported in-band rather than raised).
    """
    try:
        # Ensure exactly one <image> token
        if IMAGE_TOKEN not in prompt:
            prompt = f"{IMAGE_TOKEN} {prompt.strip()}"

        num_image_tokens = prompt.count(IMAGE_TOKEN)
        if num_image_tokens != 1:
            raise ValueError(
                f"Prompt must contain exactly 1 {IMAGE_TOKEN} token. Found {num_image_tokens}")

        inputs = smolvlm256m_processor(
            images=[image], text=[prompt], return_tensors="pt").to("cpu")
        output_ids = smolvlm256m_model.generate(**inputs, max_new_tokens=100)
        return smolvlm256m_processor.decode(output_ids[0], skip_special_tokens=True)
    except Exception as e:
        logger.exception("SmolVLM caption generation failed")
        return f"❌ Error during caption generation: {str(e)}"


def _ask_agent_about_image(agent, prompt: str, png_base64: str) -> str:
    """Send ``prompt`` plus a base64 PNG to ``agent`` and return its last message."""
    content = [
        {"type": "text", "text": prompt},
        {
            "type": "image_url",
            # The payload is the PNG written to disk, so label it as PNG.
            "image_url": {"url": f"data:image/png;base64,{png_base64}"},
        },
    ]
    response = agent.invoke({"messages": [{"role": "user", "content": content}]})
    answer = response["messages"][-1].content
    logger.debug("Agent answer for %r: %s", prompt, answer)
    return answer


# --- FUNCTION: Extract images from saved PDF ---
def extract_images_from_pdf(pdf_path, output_json_path):
    """Extract images from a PDF and generate structured sprite JSON.

    Every image block found by ``partition_pdf`` is saved as
    ``Sprite_<n>.png`` under ``DETECTED_IMAGE_FOLDER_PATH/<pdf name>`` and sent
    to the LLM twice (description, then name).

    Args:
        pdf_path: Path of the PDF to process.
        output_json_path: Ignored. The raw element dump is always written to
            ``JSON_FOLDER_PATH/<pdf name>/extracted.json``; the parameter is
            kept so existing callers keep working.

    Returns:
        ``(final_json_path, sprites)`` where ``final_json_path`` is the path of
        ``extracted_sprites.json`` and ``sprites`` maps ``"Sprite <n>"`` to a
        dict with ``name``, ``base64``, ``file-path`` and ``description``.

    Raises:
        RuntimeError: If partitioning, writing the JSON files, or any other
            step outside the per-image processing fails. A failure on a single
            image is logged and that image is skipped.
    """
    try:
        pdf_filename = os.path.splitext(os.path.basename(pdf_path))[0]  # e.g., "scratch_crab"
        pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\")

        # Create subfolders
        extracted_image_subdir = os.path.join(DETECTED_IMAGE_FOLDER_PATH, pdf_filename)
        json_subdir = os.path.join(JSON_FOLDER_PATH, pdf_filename)
        os.makedirs(extracted_image_subdir, exist_ok=True)
        os.makedirs(json_subdir, exist_ok=True)

        # Output paths
        output_json_path = os.path.join(json_subdir, "extracted.json")
        final_json_path = os.path.join(json_subdir, "extracted_sprites.json")

        try:
            elements = partition_pdf(
                filename=pdf_path,
                strategy="hi_res",
                extract_image_block_types=["Image"],
                extract_image_block_to_payload=True,  # Set to True to get base64 in output
            )
        except Exception as e:
            raise RuntimeError(
                f"❌ Failed to extract images from PDF: {str(e)}") from e

        # Keep the raw dump on disk as an artifact; the in-memory copy is used
        # below, so there is no need to read the file back.
        file_elements = [element.to_dict() for element in elements]
        try:
            with open(output_json_path, "w") as f:
                json.dump(file_elements, f, indent=4)
        except Exception as e:
            raise RuntimeError(f"❌ Failed to write extracted.json: {str(e)}") from e

        # Prepare manipulated sprite JSON structure
        manipulated_json = {}

        agent = create_react_agent(
            model=llm,
            tools=[],
            prompt=CAPTION_SYSTEM_PROMPT
        )

        # If JSON already exists, load it and find the next available Sprite number
        # (e.g., Sprite 4 if 1-3 already exist).
        # NOTE(review): only the numbering is carried over. The file is rewritten
        # below with the sprites from this run alone, so earlier entries are
        # dropped. Confirm whether they should be merged instead.
        start_count = 1
        if os.path.exists(final_json_path):
            with open(final_json_path, "r") as existing_file:
                manipulated = json.load(existing_file)
            existing_keys = [int(k.replace("Sprite ", "")) for k in manipulated]
            start_count = max(existing_keys, default=0) + 1

        prompt_description = "Give a brief Captioning."
        prompt_name = "give a short name caption of this Image."

        sprite_count = start_count
        for i, element in enumerate(file_elements):
            if "image_base64" not in element["metadata"]:
                continue
            try:
                image_data = base64.b64decode(element["metadata"]["image_base64"])
                image = Image.open(io.BytesIO(image_data)).convert("RGB")

                image_path = os.path.join(extracted_image_subdir, f"Sprite_{i+1}.png")
                image.save(image_path)
                with open(image_path, "rb") as image_file:
                    img_base64 = base64.b64encode(image_file.read()).decode("utf-8")

                description = _ask_agent_about_image(agent, prompt_description, img_base64)
                name = _ask_agent_about_image(agent, prompt_name, img_base64)

                manipulated_json[f"Sprite {sprite_count}"] = {
                    "name": name,
                    "base64": element["metadata"]["image_base64"],
                    "file-path": pdf_dir_path,
                    "description": description
                }
                sprite_count += 1
            except Exception as e:
                # Best effort: one bad image must not abort the whole PDF.
                logger.warning("⚠️ Error processing Sprite %d: %s", i + 1, e, exc_info=True)

        # Save manipulated JSON
        with open(final_json_path, "w") as sprite_file:
            json.dump(manipulated_json, sprite_file, indent=4)

        logger.info("✅ Manipulated sprite JSON saved: %s", final_json_path)
        return final_json_path, manipulated_json
    except Exception as e:
        raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}") from e


def _match_reference_entries(sprite_base64: list) -> list:
    """Return, for each base64 sprite, its best-matching reference entry.

    Reference entries come from ``OUTPUT_FOLDER/embeddings.json``, which must
    already exist. Each entry is a dict with ``name``, ``file-path`` and
    ``embeddings`` keys, built offline by embedding the images under the
    backdrop and sprite reference folders.
    """
    # Imported lazily: only needed when there is something to match.
    from langchain_experimental.open_clip.open_clip import OpenCLIPEmbeddings

    clip_embd = OpenCLIPEmbeddings()

    # embed_image takes file paths, so the sprites are written to a scratch
    # directory that is removed as soon as the embeddings are computed.
    with tempfile.TemporaryDirectory() as temp_dir:
        sprite_image_paths = []
        for idx, b64 in enumerate(sprite_base64):
            # Tolerate "data:...;base64,<payload>" as well as a bare payload.
            image_data = base64.b64decode(b64.split(",")[-1])
            img = Image.open(io.BytesIO(image_data)).convert("RGB")
            temp_path = os.path.join(temp_dir, f"sprite_{idx}.png")
            img.save(temp_path)
            sprite_image_paths.append(temp_path)
        sprite_features = clip_embd.embed_image(sprite_image_paths)

    with open(f"{OUTPUT_FOLDER}/embeddings.json", "r") as f:
        embedding_json = json.load(f)

    img_matrix = np.array([img["embeddings"] for img in embedding_json])
    sprite_matrix = np.array(sprite_features)
    # Dot-product score of every sprite against every reference image;
    # the highest-scoring reference wins.
    similarity = np.matmul(sprite_matrix, img_matrix.T)
    most_similar_indices = np.argmax(similarity, axis=1)
    return [embedding_json[idx] for idx in most_similar_indices]


def _copy_folder_files(src_folder: str, dst_folder: str, skip: set) -> None:
    """Copy every regular file in ``src_folder`` to ``dst_folder`` except names in ``skip``."""
    for fname in os.listdir(src_folder):
        src = os.path.join(src_folder, fname)
        if fname in skip or not os.path.isfile(src):
            continue
        shutil.copy2(src, dst_folder)


def _collect_matched_assets(matched_entries: list, backdrop_root: str, project_folder: str) -> tuple:
    """Copy assets of the matched reference folders and load their JSON.

    Returns ``(project_data, backdrop_data)``: the contents of each matched
    folder's ``sprite.json`` and the ``isStage`` targets found in matched
    backdrop folders' ``project.json``. Each folder is handled once.
    """
    project_data, backdrop_data = [], []
    copied_folders = set()
    backdrop_root = os.path.normpath(backdrop_root)

    for matched_entry in matched_entries:
        matched_image_path = os.path.normpath(matched_entry["file-path"])
        matched_folder = os.path.dirname(matched_image_path)
        if matched_folder in copied_folders:
            continue
        copied_folders.add(matched_folder)
        matched_image_name = os.path.basename(matched_image_path)

        # Sprite
        sprite_json_path = os.path.join(matched_folder, 'sprite.json')
        if os.path.exists(sprite_json_path):
            with open(sprite_json_path, 'r') as f:
                project_data.append(json.load(f))
            _copy_folder_files(matched_folder, project_folder,
                               skip={matched_image_name, 'sprite.json'})

        # Backdrop: require a path separator after the root so that a sibling
        # folder such as "backdrops2" is not treated as a backdrop.
        if matched_image_path.startswith(backdrop_root + os.sep):
            backdrop_json_path = os.path.join(matched_folder, 'project.json')
            if os.path.exists(backdrop_json_path):
                with open(backdrop_json_path, 'r') as f:
                    backdrop_json_data = json.load(f)
                backdrop_data.extend(
                    target for target in backdrop_json_data.get("targets", [])
                    if target.get("isStage"))
                _copy_folder_files(matched_folder, project_folder,
                                   skip={matched_image_name, 'project.json'})

    return project_data, backdrop_data


def _build_project(project_data: list, backdrop_data: list) -> dict:
    """Merge sprite targets and stage backdrops into one project dict."""
    final_project = {
        "targets": [],
        "monitors": [],
        "extensions": [],
        "meta": {
            "semver": "3.0.0",
            "vm": "11.3.0",
            "agent": "OpenAI ScratchVision Agent"
        }
    }

    final_project["targets"].extend(
        sprite for sprite in project_data if not sprite.get("isStage", False))

    if backdrop_data:
        # One merged Stage: costumes from every matched backdrop, sounds from
        # the first one only.
        all_costumes = []
        for bd in backdrop_data:
            all_costumes.extend(bd.get("costumes", []))
        sounds = backdrop_data[0].get("sounds", [])

        final_project["targets"].append({
            "isStage": True,
            "name": "Stage",
            "variables": {},
            "lists": {},
            "broadcasts": {},
            "blocks": {},
            "comments": {},
            "currentCostume": 1 if len(all_costumes) > 1 else 0,
            "costumes": all_costumes,
            "sounds": sounds,
            "volume": 100,
            "layerOrder": 0,
            "tempo": 60,
            "videoTransparency": 50,
            "videoState": "on",
            "textToSpeechLanguage": None
        })

    return final_project


def similarity_matching(input_json_path: str) -> str:
    """Match extracted sprites to reference assets and write a project file.

    Args:
        input_json_path: Path of the sprites JSON produced by
            ``extract_images_from_pdf`` (a mapping whose values carry a
            ``base64`` image).

    Returns:
        Path of the ``project.json`` written to a fresh
        ``outputs/project_<uuid>`` folder. If the input contains no sprites,
        the project is written with an empty ``targets`` list.
    """
    logger.info("🔍 Running similarity matching...")

    backdrop_images_path = os.getenv("BACKDROP_FOLDER_PATH", "/app/reference/backdrops")

    # Generate a random UUID for the project folder name.
    # NOTE(review): this uses lower-case "outputs", not OUTPUT_FOLDER
    # ("OUTPUTS"); on a case-sensitive filesystem these are different
    # directories. Left unchanged because callers receive this path.
    random_id = str(uuid.uuid4()).replace('-', '')
    project_folder = os.path.join("outputs", f"project_{random_id}")
    os.makedirs(project_folder, exist_ok=True)
    project_json_path = os.path.join(project_folder, "project.json")

    # Read sprite metadata
    with open(input_json_path, 'r') as f:
        sprites_data = json.load(f)
    sprite_base64 = [sprite["base64"] for sprite in sprites_data.values()]

    if sprite_base64:
        matched_entries = _match_reference_entries(sprite_base64)
    else:
        # Nothing to embed: skip CLIP entirely instead of failing on an empty matrix.
        logger.warning("No sprites found in %s; writing an empty project.", input_json_path)
        matched_entries = []

    project_data, backdrop_data = _collect_matched_assets(
        matched_entries, backdrop_images_path, project_folder)
    final_project = _build_project(project_data, backdrop_data)

    with open(project_json_path, 'w') as f:
        json.dump(final_project, f, indent=2)

    logger.info("🎉 Final project saved: %s", project_json_path)
    return project_json_path


@app.route('/')
def index():
    """Serve the upload page."""
    return render_template('app_index.html')


# API endpoint
@app.route('/process_pdf', methods=['POST'])
def process_pdf():
    """Handle a PDF upload (form field ``pdf_file``) and run the full pipeline.

    Returns a JSON body with the sprites JSON path, the extracted sprites and
    the generated project path; 400 for a missing/invalid upload and 500 if
    processing fails.
    """
    temp_dir = None
    try:
        logger.info("Received request to process PDF.")

        if 'pdf_file' not in request.files:
            logger.warning("No PDF file found in request.")
            return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400

        pdf_file = request.files['pdf_file']
        if not pdf_file.filename:
            return jsonify({"error": "Empty filename"}), 400

        # secure_filename can strip a name down to nothing (e.g. "../"), which
        # would make the save target the directory itself.
        filename = secure_filename(pdf_file.filename)
        if not filename:
            return jsonify({"error": "Invalid filename"}), 400

        # Save the uploaded PDF temporarily
        temp_dir = tempfile.mkdtemp()
        saved_pdf_path = os.path.join(temp_dir, filename)
        pdf_file.save(saved_pdf_path)
        logger.info("Saved uploaded PDF to: %s", saved_pdf_path)

        # Extract & process
        output_path, result = extract_images_from_pdf(saved_pdf_path, None)
        project_output = similarity_matching(output_path)

        logger.info("PDF processed successfully.")

        return jsonify({
            "message": "✅ PDF processed successfully",
            "output_json": output_path,
            "sprites": result,
            "project_output_json": project_output
        })
    except Exception as e:
        logger.exception("❌ Failed to process PDF")
        return jsonify({"error": f"❌ Failed to process PDF: {str(e)}"}), 500
    finally:
        # The upload is only needed while the request is being processed.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)


if __name__ == '__main__':
    # The Werkzeug debugger allows arbitrary code execution, so it must not be
    # enabled by default on a server bound to all interfaces. Opt in with
    # FLASK_DEBUG=1 for local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in {"1", "true"}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)