from flask import Flask, render_template, Response, flash, redirect, url_for import cv2 import numpy as np from unstructured.partition.pdf import partition_pdf import json, base64, io, os from PIL import Image, ImageEnhance, ImageDraw from imutils.perspective import four_point_transform from dotenv import load_dotenv import pytesseract from transformers import BlipProcessor, BlipForConditionalGeneration load_dotenv() app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY") pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe" poppler_path=r"C:\poppler-23.11.0\Library\bin" count = 0 OUTPUT_FOLDER = "OUTPUTS" IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE") DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER,"DETECTED_IMAGE") PDF_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_PDF") JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON") for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, PDF_FOLDER_PATH, JSON_FOLDER_PATH]: os.makedirs(path, exist_ok=True) # camera = cv2.VideoCapture('rtsp://freja.hiof.no:1935/rtplive/_definst_/hessdalen03.stream') # use 0 for web camera # for cctv camera use rtsp://username:password@ip_address:554/user=username_password='password'_channel=channel_number_stream=0.sdp' instead of camera # for local webcam use camera= cv2.VideoCapture(0) # camera = cv2.VideoCapture("http://wmccpinetop.axiscam.net/mjpg/video.mjpg") # ret, frame = camera.read() # if not ret: # raise RuntimeError("❌ Failed to connect to RTSP stream. Check URL or connectivity.") # Increase resolution if supported by the webcam # camera.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) # camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) # camera.set(cv2.CAP_PROP_FPS, 30) # camera.set(cv2.CAP_PROP_AUTOFOCUS, 1) # Enable autofocus # --- FUNCTION: Detect document contour --- def detect_document_contour(image): gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) blur = cv2.GaussianBlur(gray, (5, 5), 0) _, thresh = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) contours, _ = cv2.findContours(thresh, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE) contours = sorted(contours, key=cv2.contourArea, reverse=True) for contour in contours: area = cv2.contourArea(contour) if area > 1000: peri = cv2.arcLength(contour, True) approx = cv2.approxPolyDP(contour, 0.02 * peri, True) if len(approx) == 4: return approx return None def load_image(image_path): ext = os.path.splitext(image_path)[1].lower() if ext in ['.png', '.jpg', '.jpeg', '.webp', '.tiff']: image = cv2.imread(image_path) cv2.imshow("Original Image",image) print(f"Image : {image}") if image is None: raise ValueError(f"Failed to load image from {image_path}. The file may be corrupted or unreadable.") return image else: raise ValueError(f"Unsupported image format: {ext}") # Function for upscaling image using OpenCV's INTER_CUBIC def upscale_image(image, scale=2): height, width = image.shape[:2] upscaled_image = cv2.resize(image, (width * scale, height * scale), interpolation=cv2.INTER_CUBIC) print(f"UPSCALE IMAGE : {upscaled_image}") return upscaled_image # Function to denoise the image (reduce noise) def reduce_noise(image): return cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21) # Function to sharpen the image def sharpen_image(image): kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]]) sharpened_image = cv2.filter2D(image, -1, kernel) return sharpened_image # Function to increase contrast and enhance details without changing color def enhance_image(image): pil_img = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) enhancer = ImageEnhance.Contrast(pil_img) enhanced_image = enhancer.enhance(1.5) enhanced_image_bgr = cv2.cvtColor(np.array(enhanced_image), cv2.COLOR_RGB2BGR) return enhanced_image_bgr # Complete function to process image def process_image(image_path, scale=2): # Load the image image = load_image(image_path) # Upscale the image upscaled_image = upscale_image(image, scale) # Reduce noise denoised_image = reduce_noise(upscaled_image) # Sharpen the image sharpened_image = sharpen_image(denoised_image) # Enhance the image contrast and details without changing color final_image = enhance_image(sharpened_image) print(f"FINAL IMAGE : {final_image}") cv2.imshow("Final Image",final_image) return final_image # BLIP : Bootstrapped Language-Image Pretraining """ BlipProcessor: converts Image into tensor format""" blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base") # print(f"BLIP Processor: {blip_processor}") """ BlipForConditionalGeneration: Generates the Image Caption(text)""" blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to("cpu") print(f"BLIP Model: {blip_model}") def get_blip_description(image: Image.Image) -> str: inputs = blip_processor(image, return_tensors="pt").to("cpu") output = blip_model.generate(**inputs, max_new_tokens=100) caption = blip_processor.decode(output[0], skip_special_tokens=True) return caption # --- FUNCTION: Extract images from saved PDF --- def extract_images_from_pdf(pdf_path, output_json_path): elements = partition_pdf( filename=pdf_path, strategy="hi_res", extract_image_block_types=["Image"], # or ["Image", "Table"] extract_image_block_to_payload=True, # Set to True to get base64 in output ) with open(output_json_path, "w") as f: json.dump([element.to_dict() for element in elements], f, indent=4) # Display extracted images with open(output_json_path, 'r') as file: file_elements = json.load(file) extracted_images_dir = os.path.join(os.path.dirname(output_json_path), "extracted_images") os.makedirs(extracted_images_dir, exist_ok=True) # Prepare manipulated sprite JSON structure manipulated_json = {} pdf_filename = os.path.basename(pdf_path) pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\") # windows-style sprite_count = 1 for i, element in enumerate(file_elements): if "image_base64" in element["metadata"]: image_data = base64.b64decode(element["metadata"]["image_base64"]) # image = Image.open(io.BytesIO(image_data)) image = Image.open(io.BytesIO(image_data)).convert("RGB") image.show(title=f"Extracted Image {i+1}") # image.save(DETECTED_IMAGE_FOLDER_PATH, f"Extracted Image {i+1}.png") description = get_blip_description(image) manipulated_json[f"Sprite {sprite_count}"] = { "name": pdf_filename, "base64": element["metadata"]["image_base64"], "file-path": pdf_dir_path, "description":description } sprite_count += 1 # Save manipulated JSON manipulated_json_path = output_json_path.replace(".json", "_sprites.json") with open(manipulated_json_path, "w") as sprite_file: json.dump(manipulated_json, sprite_file, indent=4) print(f"✅ Manipulated sprite JSON saved: {manipulated_json_path}") display = None scale = 0.5 contour = None def gen_frames(): # generate frame by frame from camera global display while True: # Capture frame-by-frame success, frame = camera.read() # read the camera frame if not success: break else: display = frame.copy() contour = detect_document_contour(display) if contour is not None: cv2.drawContours(display, [contour], -1, (0, 255, 0), 3) resized = cv2.resize(display, (int(scale * display.shape[1]), int(scale * display.shape[0]))) cv2.imshow("📷 Scan Document - Press 's' to Save, ESC to Exit", resized) ret, buffer = cv2.imencode('.jpg', resized) frame = buffer.tobytes() yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') # concat frame one by one and show result # --- Route: Scan Document --- @app.route("/capture", methods=['POST']) def capture_document(): global count, display if display is None: flash("❌ No frame captured!", "error") return redirect(url_for("index")) frame = display.copy() contour = detect_document_contour(frame) if contour is None: flash("❌ No document contour found!", "error") return redirect(url_for("index")) warped = four_point_transform(frame, contour.reshape(4, 2)) image_path = os.path.join(IMAGE_FOLDER_PATH, f"scanned_colored_{count}.jpg") pdf_path = os.path.join(PDF_FOLDER_PATH, f"scanned_colored_{count}.pdf") json_path = os.path.join(JSON_FOLDER_PATH, f"scanned_{count}.json") # json_path = os.path.join(DETECTED_IMAGE_FOLDER_PATH, f"scanned_{count}.json") cv2.imwrite(image_path, warped) # img = process_image(image_path) # # img = Image.open(image_path).convert("RGB") # img.save(pdf_path) img = process_image(image_path) pil_img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) pil_img.save(pdf_path) extract_images_from_pdf(pdf_path, json_path) flash("✅ Document scanned and saved!", "success") count += 1 return redirect(url_for("index")) @app.route('/video_feed') def video_feed(): #Video streaming route. Put this in the src attribute of an img tag return Response(gen_frames(), mimetype='multipart/x-mixed-replace; boundary=frame') @app.route('/') def index(): """Video streaming home page.""" return render_template('live_streaming_index.html') if __name__ == '__main__': app.run(host="0.0.0.0", port=7860, debug=False)