Spaces:
Running
Running
from flask import Flask, render_template, Response, flash, redirect, url_for, request, jsonify, send_from_directory | |
import cv2 | |
import numpy as np | |
from unstructured.partition.pdf import partition_pdf | |
import json | |
import base64 | |
import io | |
import os | |
from PIL import Image, ImageEnhance, ImageDraw | |
from imutils.perspective import four_point_transform | |
from dotenv import load_dotenv | |
import pytesseract | |
from transformers import AutoProcessor, AutoModelForImageTextToText, AutoModelForVision2Seq | |
from langchain_community.document_loaders.image_captions import ImageCaptionLoader | |
from werkzeug.utils import secure_filename | |
import tempfile | |
import torch | |
from langchain_groq import ChatGroq | |
from langgraph.prebuilt import create_react_agent | |
import logging, time | |
# Configure logging: every record goes both to app.log and to the console.
logging.basicConfig(
    level=logging.DEBUG,  # Use INFO or ERROR in production
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Load variables from a local .env file (if any) into the process environment.
load_dotenv()
# os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY")
# NOTE(review): groq_api_key is read here but never passed to ChatGroq below;
# presumably the client picks the key up from the environment -- confirm.
groq_api_key = os.getenv("GROQ_API_KEY")
llm = ChatGroq(
    model="meta-llama/llama-4-maverick-17b-128e-instruct",
    temperature=0,
    max_tokens=None,
)

app = Flask(__name__)

# External tool locations.
# The Windows install paths are only meaningful on Windows, so the Tesseract
# default is applied there only; on other systems pytesseract keeps its own
# default unless TESSERACT_CMD is set explicitly.
_tesseract_cmd = os.getenv("TESSERACT_CMD")
if not _tesseract_cmd and os.name == "nt":
    _tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
if _tesseract_cmd:
    pytesseract.pytesseract.tesseract_cmd = _tesseract_cmd
# POPPLER_PATH overrides the historical Windows default (kept for compatibility).
poppler_path = os.getenv("POPPLER_PATH", r"C:\poppler-23.11.0\Library\bin")

count = 0

# Output directory layout; all folders are created eagerly at import time.
OUTPUT_FOLDER = "OUTPUTS"
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "DETECTED_IMAGE")
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")
for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, JSON_FOLDER_PATH]:
    os.makedirs(path, exist_ok=True)
# Model Initialization
# The SmolVLM processor and model are loaded once at import time and kept on
# the CPU; any failure while loading aborts start-up with a RuntimeError.
try:
    smolvlm256m_processor = AutoProcessor.from_pretrained(
        "HuggingFaceTB/SmolVLM-256M-Instruct"
    )
    # smolvlm256m_model = AutoModelForImageTextToText.from_pretrained("HuggingFaceTB/SmolVLM-256M-Instruct").to("cpu")
    smolvlm256m_model = AutoModelForVision2Seq.from_pretrained(
        "HuggingFaceTB/SmolVLM-256M-Instruct",
        # Use bfloat16 when this torch build exposes it, float32 otherwise.
        torch_dtype=getattr(torch, "bfloat16", torch.float32),
        _attn_implementation="eager",
    )
    smolvlm256m_model = smolvlm256m_model.to("cpu")
except Exception as load_error:
    raise RuntimeError(f"β Failed to load SmolVLM model: {str(load_error)}")
# SmolVLM image captioning
def get_smolvlm_caption(image: Image.Image, prompt: str = "") -> str:
    """Generate text for ``image`` with the module-level SmolVLM model.

    Args:
        image: Image handed to the SmolVLM processor.
        prompt: Instruction text. An ``<image>`` placeholder is prepended
            when the prompt does not already contain one.

    Returns:
        The decoded model output, or an error message string if anything
        fails (this function never raises).
    """
    try:
        # The processor input must carry exactly one <image> placeholder.
        if "<image>" not in prompt:
            prompt = f"<image> {prompt.strip()}"
        placeholder_count = prompt.count("<image>")
        if placeholder_count != 1:
            raise ValueError(
                f"Prompt must contain exactly 1 <image> token. Found {placeholder_count}")

        model_inputs = smolvlm256m_processor(
            images=[image], text=[prompt], return_tensors="pt")
        model_inputs = model_inputs.to("cpu")
        generated_ids = smolvlm256m_model.generate(
            **model_inputs, max_new_tokens=100)
        first_sequence = generated_ids[0]
        return smolvlm256m_processor.decode(
            first_sequence, skip_special_tokens=True)
    except Exception as caption_error:
        # Failures are reported in-band as a string rather than raised.
        return f"β Error during caption generation: {str(caption_error)}"
# --- FUNCTION: Extract images from saved PDF ---
# System prompt given to the captioning agent (runtime text, kept verbatim).
_SPRITE_CAPTION_SYSTEM_PROMPT = """
You are an expert in visual scene understanding.
Your Job is to analyze an image and respond acoording if asked for name give simple name by analyzing it and if ask for descrption generate a short description covering its elements.
Guidelines:
- Focus only the images given in Square Shape.
- Don't Consider Blank areas in Image as.
- Don't include generic summary or explanation outside the fields.
Return only string.
"""


def _ask_agent_about_image(agent, instruction, image_data_url):
    """Send one text instruction plus one image to ``agent``; return the reply text."""
    content = [
        {"type": "text", "text": instruction},
        {"type": "image_url", "image_url": {"url": image_data_url}},
    ]
    response = agent.invoke({"messages": [{"role": "user", "content": content}]})
    logger.debug("Agent response: %s", response)
    return response["messages"][-1].content


def extract_images_from_pdf(pdf_path, output_json_path):
    """Extract images from a PDF and generate a structured sprite JSON.

    The PDF is partitioned with ``partition_pdf`` (``hi_res`` strategy, image
    blocks returned as base64 payloads). All elements are dumped to
    ``extracted.json``; every element carrying an ``image_base64`` payload is
    saved as a PNG and sent to the LLM agent twice (description and name),
    and the results are written to ``extracted_sprites.json``.

    Args:
        pdf_path: Path of the PDF to process.
        output_json_path: Ignored; kept for interface compatibility. The
            output location is always derived from the PDF file name under
            ``JSON_FOLDER_PATH``.

    Returns:
        Tuple ``(final_json_path, manipulated_json)``: the path of
        ``extracted_sprites.json`` and the dict written to it.

    Raises:
        RuntimeError: If partitioning, writing the JSON files, or any other
            step outside the per-image processing fails. Failures on a single
            image are logged and that image is skipped.
    """
    try:
        pdf_filename = os.path.splitext(os.path.basename(pdf_path))[0]  # e.g., "scratch_crab"
        # Stored verbatim in each sprite's "file-path" field (backslash-separated).
        pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\")

        # Create per-PDF subfolders
        extracted_image_subdir = os.path.join(DETECTED_IMAGE_FOLDER_PATH, pdf_filename)
        json_subdir = os.path.join(JSON_FOLDER_PATH, pdf_filename)
        os.makedirs(extracted_image_subdir, exist_ok=True)
        os.makedirs(json_subdir, exist_ok=True)

        # Output paths
        output_json_path = os.path.join(json_subdir, "extracted.json")
        final_json_path = os.path.join(json_subdir, "extracted_sprites.json")

        try:
            elements = partition_pdf(
                filename=pdf_path,
                strategy="hi_res",
                extract_image_block_types=["Image"],
                extract_image_block_to_payload=True,  # base64 image data in the output
            )
        except Exception as e:
            raise RuntimeError(
                f"β Failed to extract images from PDF: {str(e)}") from e

        try:
            start_time = time.perf_counter()
            # Keep the dicts in memory so the file does not have to be re-read.
            file_elements = [element.to_dict() for element in elements]
            with open(output_json_path, "w", encoding="utf-8") as f:
                json.dump(file_elements, f, indent=4)
            elapsed = time.perf_counter() - start_time
            logger.info(f"β extracted.json write in {elapsed:.2f} seconds")
        except Exception as e:
            raise RuntimeError(f"β Failed to write extracted.json: {str(e)}") from e

        manipulated_json = {}
        agent = create_react_agent(
            model=llm,
            tools=[],
            prompt=_SPRITE_CAPTION_SYSTEM_PROMPT,
        )

        # If a sprite JSON already exists, continue numbering after its highest
        # "Sprite N" key.
        # NOTE(review): the existing entries are only used for numbering; the
        # file is overwritten below with this run's sprites only -- confirm
        # whether they were meant to be merged.
        start_count = 1
        if os.path.exists(final_json_path):
            with open(final_json_path, "r", encoding="utf-8") as existing_file:
                existing = json.load(existing_file)
            suffixes = (key.replace("Sprite ", "") for key in existing)
            existing_numbers = [int(s) for s in suffixes if s.isdigit()]
            start_count = max(existing_numbers, default=0) + 1

        sprite_count = start_count
        start_time = time.perf_counter()
        for i, element in enumerate(file_elements):
            image_b64 = (element.get("metadata") or {}).get("image_base64")
            if not image_b64:
                continue
            try:
                image = Image.open(io.BytesIO(base64.b64decode(image_b64))).convert("RGB")
                image_path = os.path.join(extracted_image_subdir, f"Sprite_{i+1}.png")
                image.save(image_path)

                # Send the saved PNG bytes to the model; the MIME type must
                # match the actual encoding.
                with open(image_path, "rb") as image_file:
                    png_b64 = base64.b64encode(image_file.read()).decode("utf-8")
                image_data_url = f"data:image/png;base64,{png_b64}"

                description = _ask_agent_about_image(
                    agent, "Give a brief Captioning.", image_data_url)
                name = _ask_agent_about_image(
                    agent, "give a short name caption of this Image.", image_data_url)

                manipulated_json[f"Sprite {sprite_count}"] = {
                    "name": name,
                    "base64": image_b64,
                    "file-path": pdf_dir_path,
                    "description": description,
                }
                sprite_count += 1
            except Exception as e:
                # Best effort: one bad image must not abort the whole PDF.
                logger.exception(f"β οΈ Error processing Sprite {i+1}: {str(e)}")
        elapsed = time.perf_counter() - start_time
        logger.info(f"β extracted_sprites.json write in {elapsed:.2f} seconds")

        # Save manipulated JSON
        with open(final_json_path, "w", encoding="utf-8") as sprite_file:
            json.dump(manipulated_json, sprite_file, indent=4)
        logger.info(f"β Manipulated sprite JSON saved: {final_json_path}")
        return final_json_path, manipulated_json
    except Exception as e:
        raise RuntimeError(f"β Error in extract_images_from_pdf: {str(e)}") from e
def _path_is_inside(path, directory):
    """Return True when ``path`` lies under ``directory`` (component-wise, not by string prefix)."""
    path = os.path.abspath(path)
    directory = os.path.abspath(directory)
    try:
        return os.path.commonpath([path, directory]) == directory
    except ValueError:
        # Raised e.g. for paths on different drives: not inside.
        return False


def _copy_sibling_assets(source_folder, destination_folder, excluded_names, label):
    """Copy every regular file of ``source_folder`` except ``excluded_names`` into ``destination_folder``."""
    import shutil

    for fname in os.listdir(source_folder):
        fpath = os.path.join(source_folder, fname)
        if os.path.isfile(fpath) and fname not in excluded_names:
            shutil.copy2(fpath, os.path.join(destination_folder, fname))
            logger.info(f"Copied {label} asset: {fname}")


def _resolve_reference_image(matched_idx, embedding_json, folder_image_paths):
    """Map a similarity-matrix column index back to a reference image path.

    The columns come from ``embeddings.json``, so the path recorded in the
    matching entry is preferred when it exists on disk. Otherwise fall back
    to the positional lookup in the freshly walked folder listing. Returns
    ``None`` when neither yields a path.
    """
    entry = embedding_json[matched_idx]
    recorded = entry.get("file-path") if isinstance(entry, dict) else None
    if recorded and os.path.isfile(recorded):
        return os.path.normpath(recorded)
    if matched_idx < len(folder_image_paths):
        return os.path.normpath(folder_image_paths[matched_idx])
    return None


def similarity_matching(input_json_path: str) -> str:
    """Match extracted sprites against reference images and build a project.json.

    Each sprite's base64 image from ``input_json_path`` is embedded with
    OpenCLIP and compared (dot product) against the precomputed embeddings in
    ``{OUTPUT_FOLDER}/embeddings.json``. For the best match of every sprite,
    the assets of the matched reference folder are copied into a new
    ``outputs/project_<uuid>`` folder and the folder's ``sprite.json`` (or the
    stage target of its ``project.json`` for backdrops) is merged into the
    final project file.

    Args:
        input_json_path: Path to a JSON object whose values each contain a
            ``"base64"`` image string.

    Returns:
        Path of the written ``project.json``.

    Raises:
        RuntimeError: If no sprite or reference embeddings are available, or
            if the embedding matrices have incompatible shapes.
    """
    import tempfile
    import uuid
    from io import BytesIO
    from langchain_experimental.open_clip.open_clip import OpenCLIPEmbeddings

    logger.info("π Running similarity matching...")

    # Reference asset locations
    backdrop_images_path = os.getenv("BACKDROP_FOLDER_PATH", "/app/reference/backdrops")
    sprite_images_path = os.getenv("SPRITE_FOLDER_PATH", "/app/reference/sprites")
    image_dirs = [backdrop_images_path, sprite_images_path]

    # Fresh, uniquely named project folder
    project_folder = os.path.join("outputs", f"project_{uuid.uuid4().hex}")
    os.makedirs(project_folder, exist_ok=True)
    project_json_path = os.path.join(project_folder, "project.json")

    # Read sprite metadata
    with open(input_json_path, "r", encoding="utf-8") as f:
        sprites_data = json.load(f)
    sprite_base64 = [sprite["base64"] for sprite in sprites_data.values()]

    clip_embd = OpenCLIPEmbeddings()

    # Walk the reference folders to collect all image paths (positional
    # fallback for resolving a match).
    folder_image_paths = []
    for image_dir in image_dirs:
        for root, _, files in os.walk(image_dir):
            for fname in files:
                if fname.lower().endswith((".png", ".jpg", ".jpeg")):
                    folder_image_paths.append(os.path.join(root, fname))

    # Decode the sprites to temporary PNG files and embed them; the directory
    # is removed as soon as the embeddings are computed.
    with tempfile.TemporaryDirectory() as temp_dir:
        sprite_image_paths = []
        start_time = time.perf_counter()
        for idx, b64 in enumerate(sprite_base64):
            # Tolerate "data:...;base64,<payload>" URLs as well as bare payloads.
            image_data = base64.b64decode(b64.split(",")[-1])
            img = Image.open(BytesIO(image_data)).convert("RGB")
            temp_path = os.path.join(temp_dir, f"sprite_{idx}.png")
            img.save(temp_path)
            sprite_image_paths.append(temp_path)
        elapsed = time.perf_counter() - start_time
        logger.info(f"β Decoded Sprite Base64 in {elapsed:.2f} seconds")
        sprite_features = clip_embd.embed_image(sprite_image_paths)

    # Compute similarities against the precomputed reference embeddings
    with open(f"{OUTPUT_FOLDER}/embeddings.json", "r", encoding="utf-8") as f:
        embedding_json = json.load(f)
    img_matrix = np.array([img["embeddings"] for img in embedding_json])
    sprite_matrix = np.array(sprite_features)
    if sprite_matrix.size == 0 or img_matrix.size == 0:
        raise RuntimeError("β No valid embeddings found for sprites or reference images.")
    try:
        similarity = np.matmul(sprite_matrix, img_matrix.T)
    except ValueError as ve:
        if "matmul" in str(ve) and "size" in str(ve):
            logger.error("β Matrix multiplication failed due to shape mismatch. Likely due to empty or invalid embeddings.")
            raise RuntimeError("Matrix shape mismatch: CLIP embedding input is invalid or empty.") from ve
        raise
    most_similar_indices = np.argmax(similarity, axis=1)

    # Resolve every best match to a reference image path once.
    matched_image_paths = []
    for matched_idx in most_similar_indices:
        resolved = _resolve_reference_image(int(matched_idx), embedding_json, folder_image_paths)
        if resolved is None:
            logger.warning(f"No reference image found for embedding index {int(matched_idx)}")
            continue
        matched_image_paths.append(resolved)

    # Sprites: copy assets (excluding matched image + sprite.json) and collect
    # sprite.json data, once per matched folder.
    project_data = []
    copied_folders = set()
    for matched_image_path in matched_image_paths:
        matched_folder = os.path.dirname(matched_image_path)
        if matched_folder in copied_folders:
            continue
        copied_folders.add(matched_folder)
        logger.info(f"Matched image path: {matched_image_path}")

        sprite_json_path = os.path.join(matched_folder, "sprite.json")
        if not os.path.exists(sprite_json_path):
            logger.warning(f"sprite.json not found in: {matched_folder}")
            continue
        with open(sprite_json_path, "r", encoding="utf-8") as f:
            sprite_data = json.load(f)
        logger.debug("Sprite data: %s", sprite_data)

        _copy_sibling_assets(
            matched_folder, project_folder,
            {os.path.basename(matched_image_path), "sprite.json"}, "Sprite")
        project_data.append(sprite_data)

    # Backdrops: copy assets (excluding matched image + project.json) and
    # collect the stage targets, once per matched folder so the stage does not
    # receive duplicate costumes.
    backdrop_data = []
    seen_backdrop_folders = set()
    for matched_image_path in matched_image_paths:
        if not _path_is_inside(matched_image_path, backdrop_images_path):
            continue
        matched_folder = os.path.dirname(matched_image_path)
        if matched_folder in seen_backdrop_folders:
            continue
        seen_backdrop_folders.add(matched_folder)
        logger.info(f"Backdrop matched image: {matched_image_path}")

        _copy_sibling_assets(
            matched_folder, project_folder,
            {os.path.basename(matched_image_path), "project.json"}, "Backdrop")

        backdrop_json_path = os.path.join(matched_folder, "project.json")
        if not os.path.exists(backdrop_json_path):
            logger.warning(f"project.json not found in: {matched_folder}")
            continue
        with open(backdrop_json_path, "r", encoding="utf-8") as f:
            backdrop_json_data = json.load(f)
        logger.debug("Backdrop project data: %s", backdrop_json_data)
        for target in backdrop_json_data.get("targets", []):
            if target.get("isStage") is True:
                backdrop_data.append(target)

    # Merge JSON structure
    final_project = {
        "targets": [],
        "monitors": [],
        "extensions": [],
        "meta": {
            "semver": "3.0.0",
            "vm": "11.3.0",
            "agent": "OpenAI ScratchVision Agent",
        },
    }
    final_project["targets"].extend(
        sprite for sprite in project_data if not sprite.get("isStage", False))

    if backdrop_data:
        # One combined stage: all matched costumes, sounds of the first backdrop.
        all_costumes = []
        for bd in backdrop_data:
            all_costumes.extend(bd.get("costumes", []))
        sounds = backdrop_data[0].get("sounds", [])
        final_project["targets"].append({
            "isStage": True,
            "name": "Stage",
            "variables": {},
            "lists": {},
            "broadcasts": {},
            "blocks": {},
            "comments": {},
            "currentCostume": 1 if len(all_costumes) > 1 else 0,
            "costumes": all_costumes,
            "sounds": sounds,
            "volume": 100,
            "layerOrder": 0,
            "tempo": 60,
            "videoTransparency": 50,
            "videoState": "on",
            "textToSpeechLanguage": None,
        })

    with open(project_json_path, "w", encoding="utf-8") as f:
        json.dump(final_project, f, indent=2)
    logger.info(f"π Final project saved: {project_json_path}")
    return project_json_path
# Without a route decorator the view is never registered with the Flask app.
@app.route('/')
def index():
    """Render the landing page template ``app_index.html``."""
    return render_template('app_index.html')
# API endpoint
# NOTE(review): the URL below is an assumption -- the view had no route
# decorator and was therefore unreachable; confirm the path clients use.
@app.route('/process_pdf', methods=['POST'])
def process_pdf():
    """Accept an uploaded PDF (form-data key ``pdf_file``) and return a JSON result.

    The extraction / matching pipeline is currently disabled, so the response
    contains placeholder values. The upload is stored in a temporary directory
    that is removed before the request finishes.

    Returns:
        A JSON response; with status 400 when the file is missing or has no
        usable name, and 500 on any unexpected error.
    """
    import shutil

    temp_dir = None
    try:
        logger.info("Received request to process PDF.")
        if 'pdf_file' not in request.files:
            logger.warning("No PDF file found in request.")
            return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400
        pdf_file = request.files['pdf_file']
        if pdf_file.filename == '':
            return jsonify({"error": "Empty filename"}), 400

        # secure_filename can strip a name down to '' (e.g. "../"); saving to
        # the bare directory path would then fail with a 500.
        filename = secure_filename(pdf_file.filename)
        if not filename:
            return jsonify({"error": "Empty filename"}), 400

        # Save the uploaded PDF temporarily
        temp_dir = tempfile.mkdtemp()
        saved_pdf_path = os.path.join(temp_dir, filename)
        pdf_file.save(saved_pdf_path)
        logger.info(f"Saved uploaded PDF to: {saved_pdf_path}")

        # Extract & process (pipeline currently disabled)
        # json_path = None
        # output_path, result = extract_images_from_pdf(
        #     saved_pdf_path, json_path)
        # project_output = similarity_matching(output_path)
        logger.info("Finished processing PDF.")
        return jsonify({
            "message": "β PDF processed successfully",
            "output_json": "output_path",
            "sprites": "result",
            "project_output_json": "project_output",
            "test_url": r"https://prthm11-scratch-vision-game.hf.space/download_sb3/Event_test"
        })
    except Exception as e:
        logger.exception("β Failed to process PDF")
        return jsonify({"error": f"β Failed to process PDF: {str(e)}"}), 500
    finally:
        # Never leave uploaded files behind, whatever the outcome.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
# --- New endpoint to download the .sb3 file ---
# URL shape follows the "test_url" returned by process_pdf (/download_sb3/<id>).
@app.route('/download_sb3/<project_id>')
def download_sb3(project_id):
    """
    Allows users to download the generated .sb3 Scratch project file.

    Args:
        project_id: Base name (without extension) of the file inside the
            application's ``game_samples`` directory.

    Returns:
        The file as an attachment, a 404 JSON error when it does not exist,
        or a 500 JSON error when serving fails.
    """
    sb3_filename = f"{project_id}.sb3"
    # Resolve against the application root so the existence check and
    # send_from_directory look at the same directory regardless of the
    # process working directory.
    samples_dir = os.path.join(app.root_path, "game_samples")
    sb3_filepath = os.path.join(samples_dir, sb3_filename)
    try:
        if os.path.isfile(sb3_filepath):
            logger.info(f"Serving SB3 file for project ID: {project_id}")
            # send_from_directory serves the file and handles content-disposition for download
            return send_from_directory(
                directory=samples_dir,
                path=sb3_filename,
                as_attachment=True,  # This makes the browser download the file
                download_name=sb3_filename,  # This sets the filename for the download
            )
        logger.warning(f"SB3 file not found for ID: {project_id}")
        return jsonify({"error": "Scratch project file not found"}), 404
    except Exception as e:
        logger.error(f"Error serving SB3 file for ID {project_id}: {e}")
        return jsonify({"error": "Failed to retrieve Scratch project file"}), 500
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default; opt in explicitly with FLASK_DEBUG=1.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)