Spaces:
Sleeping
Sleeping
from flask import Flask, render_template, Response, flash, redirect, url_for, request, jsonify | |
import cv2 | |
import numpy as np | |
from unstructured.partition.pdf import partition_pdf | |
import json, base64, io, os | |
from PIL import Image, ImageEnhance, ImageDraw | |
from imutils.perspective import four_point_transform | |
from dotenv import load_dotenv | |
import pytesseract | |
from transformers import AutoProcessor, AutoModelForImageTextToText, AutoModelForVision2Seq | |
from langchain_community.document_loaders.image_captions import ImageCaptionLoader | |
from werkzeug.utils import secure_filename | |
import tempfile | |
import torch | |
from langchain_groq import ChatGroq | |
from langgraph.prebuilt import create_react_agent | |
load_dotenv() | |
# os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY") | |
groq_api_key = os.getenv("GROQ_API_KEY") | |
llm = ChatGroq( | |
model="meta-llama/llama-4-maverick-17b-128e-instruct", | |
temperature=0, | |
max_tokens=None, | |
) | |
app = Flask(__name__) | |
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe" | |
poppler_path=r"C:\poppler-23.11.0\Library\bin" | |
count = 0 | |
PDF_GET = r"E:\Pratham\2025\Harsh Sir\Scratch Vision\images\scratch_crab.pdf" | |
OUTPUT_FOLDER = "OUTPUTS" | |
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER,"DETECTED_IMAGE") | |
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE") | |
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON") | |
for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, JSON_FOLDER_PATH]: | |
os.makedirs(path, exist_ok=True) | |
# Model Initialization | |
try: | |
smolvlm256m_processor = AutoProcessor.from_pretrained("HuggingFaceTB/SmolVLM-256M-Instruct") | |
# smolvlm256m_model = AutoModelForImageTextToText.from_pretrained("HuggingFaceTB/SmolVLM-256M-Instruct").to("cpu") | |
smolvlm256m_model = AutoModelForVision2Seq.from_pretrained( | |
"HuggingFaceTB/SmolVLM-256M-Instruct", | |
torch_dtype=torch.bfloat16 if hasattr(torch, "bfloat16") else torch.float32, | |
_attn_implementation="eager" | |
).to("cpu") | |
except Exception as e: | |
raise RuntimeError(f"❌ Failed to load SmolVLM model: {str(e)}") | |
# SmolVLM Image Captioning functioning | |
def get_smolvlm_caption(image: Image.Image, prompt: str = "") -> str: | |
try: | |
# Ensure exactly one <image> token | |
if "<image>" not in prompt: | |
prompt = f"<image> {prompt.strip()}" | |
num_image_tokens = prompt.count("<image>") | |
if num_image_tokens != 1: | |
raise ValueError(f"Prompt must contain exactly 1 <image> token. Found {num_image_tokens}") | |
inputs = smolvlm256m_processor(images=[image], text=[prompt], return_tensors="pt").to("cpu") | |
output_ids = smolvlm256m_model.generate(**inputs, max_new_tokens=100) | |
return smolvlm256m_processor.decode(output_ids[0], skip_special_tokens=True) | |
except Exception as e: | |
return f"❌ Error during caption generation: {str(e)}" | |
# --- FUNCTION: Extract images from saved PDF --- | |
def extract_images_from_pdf(pdf_path, output_json_path): | |
''' Extract images from PDF and generate structured sprite JSON ''' | |
try: | |
pdf_filename = os.path.splitext(os.path.basename(pdf_path))[0] # e.g., "scratch_crab" | |
pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\") | |
# Create subfolders | |
extracted_image_subdir = os.path.join(DETECTED_IMAGE_FOLDER_PATH, pdf_filename) | |
json_subdir = os.path.join(JSON_FOLDER_PATH, pdf_filename) | |
os.makedirs(extracted_image_subdir, exist_ok=True) | |
os.makedirs(json_subdir, exist_ok=True) | |
# Output paths | |
output_json_path = os.path.join(json_subdir, "extracted.json") | |
final_json_path = os.path.join(json_subdir, "extracted_sprites.json") | |
try: | |
elements = partition_pdf( | |
filename=pdf_path, | |
strategy="hi_res", | |
extract_image_block_types=["Image"], | |
extract_image_block_to_payload=True, # Set to True to get base64 in output | |
) | |
except Exception as e: | |
raise RuntimeError(f"❌ Failed to extract images from PDF: {str(e)}") | |
try: | |
with open(output_json_path, "w") as f: | |
json.dump([element.to_dict() for element in elements], f, indent=4) | |
except Exception as e: | |
raise RuntimeError(f"❌ Failed to write extracted.json: {str(e)}") | |
try: | |
# Display extracted images | |
with open(output_json_path, 'r') as file: | |
file_elements = json.load(file) | |
except Exception as e: | |
raise RuntimeError(f"❌ Failed to read extracted.json: {str(e)}") | |
# Prepare manipulated sprite JSON structure | |
manipulated_json = {} | |
# SET A SYSTEM PROMPT | |
system_prompt = """ | |
You are an expert in visual scene understanding. | |
Your Job is to analyze an image and respond acoording if asked for name give simple name by analyzing it and if ask for descrption generate a short description covering its elements. | |
Guidelines: | |
- Focus only the images given in Square Shape. | |
- Don't Consider Blank areas in Image as. | |
- Don't include generic summary or explanation outside the fields. | |
Return only string. | |
""" | |
agent = create_react_agent( | |
model = llm, | |
tools = [], | |
prompt = system_prompt | |
) | |
# If JSON already exists, load it and find the next available Sprite number | |
if os.path.exists(final_json_path): | |
with open(final_json_path, "r") as existing_file: | |
manipulated = json.load(existing_file) | |
# Determine the next available index (e.g., Sprite 4 if 1–3 already exist) | |
existing_keys = [int(k.replace("Sprite ", "")) for k in manipulated.keys()] | |
start_count = max(existing_keys, default=0) + 1 | |
else: | |
start_count = 1 | |
sprite_count = start_count | |
for i,element in enumerate(file_elements): | |
if "image_base64" in element["metadata"]: | |
try: | |
image_data = base64.b64decode(element["metadata"]["image_base64"]) | |
image = Image.open(io.BytesIO(image_data)).convert("RGB") | |
image.show(title=f"Extracted Image {i+1}") | |
image_path = os.path.join(extracted_image_subdir, f"Sprite_{i+1}.png") | |
image.save(image_path) | |
with open(image_path, "rb") as image_file: | |
image_bytes = image_file.read() | |
img_base64 = base64.b64encode(image_bytes).decode("utf-8") | |
# description = get_smolvlm_caption(image, prompt="Give a brief Description") | |
# name = get_smolvlm_caption(image, prompt="give a short name/title of this Image.") | |
def clean_caption_output(raw_output: str, prompt: str) -> str: | |
answer = raw_output.replace(prompt, '').replace("<image>", '').strip(" :-\n") | |
return answer | |
prompt_description = "Give a brief Captioning." | |
prompt_name = "give a short name caption of this Image." | |
content1 = [ | |
{ | |
"type": "text", | |
"text": f"{prompt_description}" | |
}, | |
{ | |
"type": "image_url", | |
"image_url": { | |
"url": f"data:image/jpeg;base64,{img_base64}" | |
} | |
} | |
] | |
response1 = agent.invoke({"messages": [{"role": "user", "content":content1}]}) | |
print(response1) | |
description = response1["messages"][-1].content | |
content2 = [ | |
{ | |
"type": "text", | |
"text": f"{prompt_name}" | |
}, | |
{ | |
"type": "image_url", | |
"image_url": { | |
"url": f"data:image/jpeg;base64,{img_base64}" | |
} | |
} | |
] | |
response2 = agent.invoke({"messages": [{"role": "user", "content":content2}]}) | |
print(response2) | |
name = response2["messages"][-1].content | |
#raw_description = get_smolvlm_caption(image, prompt=prompt_description) | |
#raw_name = get_smolvlm_caption(image, prompt=prompt_name) | |
#description = clean_caption_output(raw_description, prompt_description) | |
#name = clean_caption_output(raw_name, prompt_name) | |
manipulated_json[f"Sprite {sprite_count}"] = { | |
"name": name, | |
"base64": element["metadata"]["image_base64"], | |
"file-path": pdf_dir_path, | |
"description":description | |
} | |
sprite_count += 1 | |
except Exception as e: | |
print(f"⚠️ Error processing Sprite {i+1}: {str(e)}") | |
# Save manipulated JSON | |
with open(final_json_path, "w") as sprite_file: | |
json.dump(manipulated_json, sprite_file, indent=4) | |
print(f"✅ Manipulated sprite JSON saved: {final_json_path}") | |
return final_json_path, manipulated_json | |
except Exception as e: | |
raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}") | |
def index(): | |
return render_template('app_index.html') | |
# API endpoint | |
def process_pdf(): | |
try: | |
logger.info("Received request to process PDF.") | |
if 'pdf_file' not in request.files: | |
logger.warning("No PDF file found in request.") | |
return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400 | |
pdf_file = request.files['pdf_file'] | |
if pdf_file.filename == '': | |
return jsonify({"error": "Empty filename"}), 400 | |
# Save the uploaded PDF temporarily | |
filename = secure_filename(pdf_file.filename) | |
temp_dir = tempfile.mkdtemp() | |
saved_pdf_path = os.path.join(temp_dir, filename) | |
pdf_file.save(saved_pdf_path) | |
logger.info(f"Saved uploaded PDF to: {saved_pdf_path}") | |
# Extract & process | |
json_path = None | |
output_path, result = extract_images_from_pdf(saved_pdf_path, json_path) | |
logger.info("Received request to process PDF.") | |
return jsonify({ | |
"message": "✅ PDF processed successfully", | |
"output_json": output_path, | |
"sprites": result | |
}) | |
except Exception as e: | |
logger.exception("❌ Failed to process PDF") | |
return jsonify({"error": f"❌ Failed to process PDF: {str(e)}"}), 500 | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0', port=7860, debug=True) |