# Flask live document scanner: streams a webcam feed, captures a document via
# contour detection + perspective transform, enhances the scan, saves it as a
# PDF, and extracts embedded images (with BLIP captions) into JSON.
from flask import Flask, render_template, Response, flash, redirect, url_for
import cv2
import numpy as np
from unstructured.partition.pdf import partition_pdf
import json, base64, io, os
from PIL import Image, ImageEnhance, ImageDraw
from imutils.perspective import four_point_transform
from dotenv import load_dotenv
import pytesseract
from transformers import BlipProcessor, BlipForConditionalGeneration
# Load environment variables from .env before reading SECRET_KEY below.
load_dotenv()
app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY")
# Windows-specific binary locations for Tesseract OCR and Poppler.
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
poppler_path=r"C:\poppler-23.11.0\Library\bin"
# Running counter used to number scanned image/PDF/JSON output files.
count = 0
# Output directory layout for scans and extraction results.
OUTPUT_FOLDER = "OUTPUTS"
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER,"DETECTED_IMAGE")
PDF_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_PDF")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")
for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, PDF_FOLDER_PATH, JSON_FOLDER_PATH]:
    os.makedirs(path, exist_ok=True)
# camera = cv2.VideoCapture('rtsp://freja.hiof.no:1935/rtplive/_definst_/hessdalen03.stream') # use 0 for web camera
# for cctv camera use rtsp://username:password@ip_address:554/user=username_password='password'_channel=channel_number_stream=0.sdp' instead of camera
# for local webcam use
camera= cv2.VideoCapture(0)
# camera = cv2.VideoCapture("http://wmccpinetop.axiscam.net/mjpg/video.mjpg")
# ret, frame = camera.read()
# if not ret:
# raise RuntimeError("❌ Failed to connect to RTSP stream. Check URL or connectivity.")
# Increase resolution if supported by the webcam
# camera.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
# camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
# camera.set(cv2.CAP_PROP_FPS, 30)
# camera.set(cv2.CAP_PROP_AUTOFOCUS, 1) # Enable autofocus
# --- FUNCTION: Detect document contour ---
def detect_document_contour(image):
    """Locate the largest 4-corner contour (document boundary) in a BGR frame.

    Returns the approximated quadrilateral from cv2.approxPolyDP, or None
    when no contour with area > 1000 reduces to exactly four points.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(grayscale, (5, 5), 0)
    # Otsu thresholding picks the binarization level automatically.
    _, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    found, _ = cv2.findContours(binary, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
    # Inspect candidates from largest area to smallest.
    for candidate in sorted(found, key=cv2.contourArea, reverse=True):
        if cv2.contourArea(candidate) <= 1000:
            continue
        perimeter = cv2.arcLength(candidate, True)
        quad = cv2.approxPolyDP(candidate, 0.02 * perimeter, True)
        if len(quad) == 4:
            return quad
    return None
def load_image(image_path):
    """Load an image from disk after validating its extension.

    Args:
        image_path: Path to a .png/.jpg/.jpeg/.webp/.tiff file.

    Returns:
        The image as a BGR numpy array (cv2.imread result).

    Raises:
        ValueError: If the extension is unsupported or the file cannot be
            decoded.
    """
    ext = os.path.splitext(image_path)[1].lower()
    if ext not in ['.png', '.jpg', '.jpeg', '.webp', '.tiff']:
        raise ValueError(f"Unsupported image format: {ext}")
    image = cv2.imread(image_path)
    # BUGFIX: validate BEFORE using the image — cv2.imread returns None on
    # failure (it does not raise), and the original passed that None to
    # cv2.imshow/print first, crashing with an opaque cv2 error.
    if image is None:
        raise ValueError(f"Failed to load image from {image_path}. The file may be corrupted or unreadable.")
    cv2.imshow("Original Image",image)
    print(f"Image : {image}")
    return image
# Function for upscaling image using OpenCV's INTER_CUBIC
def upscale_image(image, scale=2):
    """Enlarge an image by an integer factor using bicubic interpolation."""
    h, w = image.shape[:2]
    target_size = (w * scale, h * scale)
    upscaled = cv2.resize(image, target_size, interpolation=cv2.INTER_CUBIC)
    print(f"UPSCALE IMAGE : {upscaled}")
    return upscaled
# Function to denoise the image (reduce noise)
def reduce_noise(image):
    """Denoise a color image with OpenCV's non-local-means filter."""
    denoised = cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21)
    return denoised
# Function to sharpen the image
def sharpen_image(image):
    """Sharpen an image with a 3x3 Laplacian-style convolution kernel."""
    sharpen_kernel = np.array(
        [[0, -1, 0], [-1, 5, -1], [0, -1, 0]]
    )
    # -1 keeps the output depth identical to the input's.
    return cv2.filter2D(image, -1, sharpen_kernel)
# Function to increase contrast and enhance details without changing color
def enhance_image(image):
    """Boost contrast by 1.5x via PIL without changing the color balance."""
    rgb_array = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    boosted = ImageEnhance.Contrast(Image.fromarray(rgb_array)).enhance(1.5)
    # Convert back from PIL/RGB to OpenCV's BGR ordering.
    return cv2.cvtColor(np.array(boosted), cv2.COLOR_RGB2BGR)
# Complete function to process image
def process_image(image_path, scale=2):
    """Run the full enhancement pipeline on an image file.

    Steps: load -> upscale -> denoise -> sharpen -> contrast boost.
    Returns the enhanced BGR image.
    """
    loaded = load_image(image_path)
    enlarged = upscale_image(loaded, scale)
    cleaned = reduce_noise(enlarged)
    crisp = sharpen_image(cleaned)
    final_image = enhance_image(crisp)
    print(f"FINAL IMAGE : {final_image}")
    cv2.imshow("Final Image",final_image)
    return final_image
# BLIP : Bootstrapped Language-Image Pretraining
# Loaded once at import time; the first run downloads weights from the
# Hugging Face hub. Both processor and model are pinned to CPU.
""" BlipProcessor: converts Image into tensor format"""
blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
# print(f"BLIP Processor: {blip_processor}")
""" BlipForConditionalGeneration: Generates the Image Caption(text)"""
blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to("cpu")
print(f"BLIP Model: {blip_model}")
def get_blip_description(image: Image.Image) -> str:
    """Caption a PIL image with the BLIP model (CPU, up to 100 new tokens)."""
    tensors = blip_processor(image, return_tensors="pt").to("cpu")
    generated = blip_model.generate(**tensors, max_new_tokens=100)
    return blip_processor.decode(generated[0], skip_special_tokens=True)
# --- FUNCTION: Extract images from saved PDF ---
def extract_images_from_pdf(pdf_path, output_json_path):
    """Partition a PDF with `unstructured`, dump all elements to JSON, and
    build a companion "<name>_sprites.json" describing each embedded image.

    For every element carrying an `image_base64` payload, the image is
    decoded, displayed for inspection, captioned with BLIP, and recorded
    under a "Sprite N" key.
    """
    # hi_res strategy + payload extraction embeds each image's base64 data
    # directly into the element metadata.
    elements = partition_pdf(
        filename=pdf_path,
        strategy="hi_res",
        extract_image_block_types=["Image"], # or ["Image", "Table"]
        extract_image_block_to_payload=True, # Set to True to get base64 in output
    )
    with open(output_json_path, "w") as f:
        json.dump([element.to_dict() for element in elements], f, indent=4)
    # Display extracted images: re-read the JSON just written and walk it.
    with open(output_json_path, 'r') as file:
        file_elements = json.load(file)
    # NOTE(review): this directory is created but nothing is saved into it —
    # the image.save call below is commented out; confirm intent.
    extracted_images_dir = os.path.join(os.path.dirname(output_json_path), "extracted_images")
    os.makedirs(extracted_images_dir, exist_ok=True)
    # Prepare manipulated sprite JSON structure
    manipulated_json = {}
    pdf_filename = os.path.basename(pdf_path)
    pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\") # windows-style
    sprite_count = 1
    for i, element in enumerate(file_elements):
        if "image_base64" in element["metadata"]:
            image_data = base64.b64decode(element["metadata"]["image_base64"])
            # image = Image.open(io.BytesIO(image_data))
            image = Image.open(io.BytesIO(image_data)).convert("RGB")
            # show() hands the decoded image to the platform image viewer.
            image.show(title=f"Extracted Image {i+1}")
            # image.save(DETECTED_IMAGE_FOLDER_PATH, f"Extracted Image {i+1}.png")
            description = get_blip_description(image)
            manipulated_json[f"Sprite {sprite_count}"] = {
                "name": pdf_filename,
                "base64": element["metadata"]["image_base64"],
                "file-path": pdf_dir_path,
                "description":description
            }
            sprite_count += 1
    # Save manipulated JSON
    manipulated_json_path = output_json_path.replace(".json", "_sprites.json")
    with open(manipulated_json_path, "w") as sprite_file:
        json.dump(manipulated_json, sprite_file, indent=4)
    print(f"✅ Manipulated sprite JSON saved: {manipulated_json_path}")
# Shared streaming state: `display` holds the most recent camera frame
# (written by gen_frames, read by the /capture route); `scale` shrinks the
# streamed preview; `contour` is unused at module level (gen_frames binds
# its own local `contour`).
display = None
scale = 0.5
contour = None
def gen_frames():
    """Yield camera frames as multipart MJPEG chunks for /video_feed.

    Each iteration grabs a frame, stores a CLEAN copy in the module-global
    ``display`` (so /capture warps an unannotated frame), draws the detected
    document contour on a separate preview copy, downscales it by ``scale``,
    and yields it as one `--frame` JPEG part.
    """
    global display
    while True:
        success, frame = camera.read()
        if not success:
            break
        # BUGFIX: the original drew the green contour directly onto the
        # shared `display` frame, so /capture baked the outline into the
        # saved scan. Keep `display` clean and annotate a preview copy.
        display = frame.copy()
        preview = frame.copy()
        outline = detect_document_contour(preview)
        if outline is not None:
            cv2.drawContours(preview, [outline], -1, (0, 255, 0), 3)
        resized = cv2.resize(preview, (int(scale * preview.shape[1]), int(scale * preview.shape[0])))
        # BUGFIX: removed the per-frame cv2.imshow debug window — with no
        # cv2.waitKey pump it never refreshes, and it crashes headless
        # deployments where this Flask app would normally run.
        ok, buffer = cv2.imencode('.jpg', resized)
        if not ok:
            continue  # skip frames that fail to encode instead of yielding junk
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
# --- Route: Scan Document ---
@app.route("/capture", methods=['POST'])
def capture_document():
    """Snapshot the latest streamed frame, warp the detected document to a
    top-down view, enhance it, save it as JPEG + PDF, and extract embedded
    images/captions to JSON. Always redirects back to the index page with a
    flash message describing the outcome.
    """
    global count, display
    # `display` is populated by gen_frames; None means no frame streamed yet.
    if display is None:
        flash("❌ No frame captured!", "error")
        return redirect(url_for("index"))
    frame = display.copy()
    contour = detect_document_contour(frame)
    if contour is None:
        flash("❌ No document contour found!", "error")
        return redirect(url_for("index"))
    # Perspective-correct the page using the four detected corners.
    warped = four_point_transform(frame, contour.reshape(4, 2))
    image_path = os.path.join(IMAGE_FOLDER_PATH, f"scanned_colored_{count}.jpg")
    pdf_path = os.path.join(PDF_FOLDER_PATH, f"scanned_colored_{count}.pdf")
    json_path = os.path.join(JSON_FOLDER_PATH, f"scanned_{count}.json")
    # json_path = os.path.join(DETECTED_IMAGE_FOLDER_PATH, f"scanned_{count}.json")
    cv2.imwrite(image_path, warped)
    # img = process_image(image_path)
    # # img = Image.open(image_path).convert("RGB")
    # img.save(pdf_path)
    # Enhance the saved scan, then write it as a single-page PDF via PIL
    # (BGR -> RGB conversion needed because PIL expects RGB ordering).
    img = process_image(image_path)
    pil_img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    pil_img.save(pdf_path)
    extract_images_from_pdf(pdf_path, json_path)
    flash("✅ Document scanned and saved!", "success")
    # NOTE(review): `count` is a module global with no locking — not safe
    # under concurrent requests; verify single-worker deployment.
    count += 1
    return redirect(url_for("index"))
@app.route('/video_feed')
def video_feed():
    """Stream the camera preview as multipart MJPEG.

    Intended as the `src` of an <img> tag on the home page.
    """
    stream_mimetype = 'multipart/x-mixed-replace; boundary=frame'
    return Response(gen_frames(), mimetype=stream_mimetype)
@app.route('/')
def index():
    """Serve the video-streaming home page."""
    template_name = 'live_streaming_index.html'
    return render_template(template_name)
if __name__ == '__main__':
    # Bind on all interfaces on port 7860; debug disabled.
    # BUGFIX: removed the trailing " |" scrape artifact after app.run(...),
    # which was a syntax error.
    app.run(host="0.0.0.0", port=7860, debug=False)