# Scratch_Vision_Game_dup / live_streaming_flask.py
# Page header carried over from the hosting site: prthm11's picture,
# "Update live_streaming_flask.py", commit 14a8402 (verified).
from flask import Flask, render_template, Response, flash, redirect, url_for
import cv2
import numpy as np
from unstructured.partition.pdf import partition_pdf
import json, base64, io, os
from PIL import Image, ImageEnhance, ImageDraw
from imutils.perspective import four_point_transform
from dotenv import load_dotenv
import pytesseract
from transformers import BlipProcessor, BlipForConditionalGeneration
# Load variables from a local .env file (if present) into the environment.
load_dotenv()

app = Flask(__name__)
# flash() stores its messages in the session, and Flask raises a RuntimeError
# when the session is used without a secret key. Fall back to a random
# per-process key when SECRET_KEY is unset or empty so the /capture route keeps
# working; set SECRET_KEY explicitly if sessions must survive a restart.
app.secret_key = os.getenv("SECRET_KEY") or os.urandom(32)

# Windows install location of the Tesseract binary. Only override pytesseract's
# default ("tesseract" resolved through PATH) when that file really exists, so
# the module also works on machines without this exact install path.
_TESSERACT_EXE = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
if os.path.exists(_TESSERACT_EXE):
    pytesseract.pytesseract.tesseract_cmd = _TESSERACT_EXE

# Windows install location of the poppler binaries.
poppler_path = r"C:\poppler-23.11.0\Library\bin"
# Running index used in the file names of each saved scan.
count = 0

# Output layout: one root folder with a sub-folder per artifact type.
OUTPUT_FOLDER = "OUTPUTS"
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "DETECTED_IMAGE")
PDF_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_PDF")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")

# Create every folder up front; existing folders are left untouched.
for path in (
    OUTPUT_FOLDER,
    IMAGE_FOLDER_PATH,
    DETECTED_IMAGE_FOLDER_PATH,
    PDF_FOLDER_PATH,
    JSON_FOLDER_PATH,
):
    os.makedirs(path, exist_ok=True)
# Video source for the live stream. The alternatives below are disabled
# examples; only the local webcam line is active.
#
# Example RTSP stream:
# camera = cv2.VideoCapture('rtsp://freja.hiof.no:1935/rtplive/_definst_/hessdalen03.stream')
# For a CCTV camera, pass a URL of the form
# rtsp://username:password@ip_address:554/user=username_password='password'_channel=channel_number_stream=0.sdp
# instead of the device index.
#
# Local webcam (device index 0):
camera= cv2.VideoCapture(0)
# Example HTTP MJPEG stream:
# camera = cv2.VideoCapture("http://wmccpinetop.axiscam.net/mjpg/video.mjpg")
#
# Optional connectivity check for network streams:
# ret, frame = camera.read()
# if not ret:
#     raise RuntimeError("❌ Failed to connect to RTSP stream. Check URL or connectivity.")
#
# Optional: increase the resolution if the webcam supports it.
# camera.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
# camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
# camera.set(cv2.CAP_PROP_FPS, 30)
# camera.set(cv2.CAP_PROP_AUTOFOCUS, 1)  # Enable autofocus
# --- FUNCTION: Detect document contour ---
def detect_document_contour(image):
    """Return the largest four-cornered contour in a BGR image, or None.

    The image is converted to grayscale, blurred with a 5x5 Gaussian and
    binarised with Otsu's threshold. Contours are then visited from largest
    to smallest area; the first one whose area exceeds 1000 and whose
    polygonal approximation (tolerance: 2% of its perimeter) has exactly
    four vertices is returned.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    smoothed = cv2.GaussianBlur(grayscale, (5, 5), 0)
    _, binary = cv2.threshold(smoothed, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    found, _ = cv2.findContours(binary, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

    for candidate in sorted(found, key=cv2.contourArea, reverse=True):
        # Ignore specks that are too small to be a document.
        if not cv2.contourArea(candidate) > 1000:
            continue
        perimeter = cv2.arcLength(candidate, True)
        polygon = cv2.approxPolyDP(candidate, 0.02 * perimeter, True)
        if len(polygon) == 4:
            return polygon
    return None
def load_image(image_path):
    """Load an image file from disk with OpenCV.

    Args:
        image_path: Path to a .png, .jpg, .jpeg, .webp or .tiff file
            (the extension check is case-insensitive).

    Returns:
        The image array returned by ``cv2.imread``.

    Raises:
        ValueError: If the extension is not supported, or if OpenCV could
            not read/decode the file.
    """
    supported_extensions = ('.png', '.jpg', '.jpeg', '.webp', '.tiff')
    ext = os.path.splitext(image_path)[1].lower()
    if ext not in supported_extensions:
        raise ValueError(f"Unsupported image format: {ext}")

    image = cv2.imread(image_path)
    # cv2.imread reports failure by returning None instead of raising, so the
    # result must be validated before anything else touches it. (The previous
    # cv2.imshow call ran before this check and crashed on None; it is dropped
    # altogether because a web server has no GUI event loop to show windows.)
    if image is None:
        raise ValueError(f"Failed to load image from {image_path}. The file may be corrupted or unreadable.")
    print(f"Image : {image}")
    return image
# Function for upscaling image using OpenCV's INTER_CUBIC
def upscale_image(image, scale=2):
    """Resize an image by ``scale`` in both dimensions using bicubic interpolation.

    Args:
        image: Image array; its first two dimensions are read as height and width.
        scale: Multiplier applied to width and height. May be fractional.

    Returns:
        The resized image array.
    """
    height, width = image.shape[:2]
    # cv2.resize only accepts integer pixel dimensions, so round the target
    # size; integer scales produce exactly the same size as before.
    target_size = (int(round(width * scale)), int(round(height * scale)))
    upscaled_image = cv2.resize(image, target_size, interpolation=cv2.INTER_CUBIC)
    print(f"UPSCALE IMAGE : {upscaled_image}")
    return upscaled_image
# Function to denoise the image (reduce noise)
def reduce_noise(image):
    """Denoise a colour image with OpenCV's non-local means filter."""
    luminance_strength = 10
    color_strength = 10
    template_window = 7
    search_window = 21
    return cv2.fastNlMeansDenoisingColored(
        image,
        None,
        luminance_strength,
        color_strength,
        template_window,
        search_window,
    )
# Function to sharpen the image
def sharpen_image(image):
    """Sharpen an image by convolving it with a fixed 3x3 kernel.

    The kernel weights the centre pixel by 5 and its four direct neighbours
    by -1; the output keeps the depth of the input (ddepth=-1).
    """
    sharpening_kernel = np.array([
        [0, -1, 0],
        [-1, 5, -1],
        [0, -1, 0],
    ])
    return cv2.filter2D(image, -1, sharpening_kernel)
# Function to increase contrast and enhance details without changing color
def enhance_image(image):
    """Boost the contrast of a BGR image by a factor of 1.5.

    The image is converted to RGB for PIL's contrast enhancer and converted
    back to BGR before it is returned.
    """
    rgb_pixels = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    boosted = ImageEnhance.Contrast(Image.fromarray(rgb_pixels)).enhance(1.5)
    return cv2.cvtColor(np.array(boosted), cv2.COLOR_RGB2BGR)
# Complete function to process image
def process_image(image_path, scale=2):
    """Load an image and run it through the enhancement pipeline.

    Steps, in order: load, upscale by ``scale``, denoise, sharpen, and
    increase contrast.

    Args:
        image_path: Path of the image file to process.
        scale: Upscaling factor passed to ``upscale_image``.

    Returns:
        The enhanced image array.

    Raises:
        ValueError: Propagated from ``load_image`` when the file cannot be used.
    """
    image = load_image(image_path)
    upscaled_image = upscale_image(image, scale)
    denoised_image = reduce_noise(upscaled_image)
    sharpened_image = sharpen_image(denoised_image)
    # Enhance the contrast and details without changing color.
    final_image = enhance_image(sharpened_image)
    print(f"FINAL IMAGE : {final_image}")
    # No cv2.imshow here: this pipeline is called from the /capture request
    # handler, where there is no GUI event loop (cv2.waitKey) to render a
    # window, and the call fails outright on a machine without a display.
    return final_image
# BLIP : Bootstrapped Language-Image Pretraining
_BLIP_CHECKPOINT = "Salesforce/blip-image-captioning-base"

# BlipProcessor: converts an image into tensor format.
blip_processor = BlipProcessor.from_pretrained(_BLIP_CHECKPOINT)

# BlipForConditionalGeneration: generates the image caption (text); the model
# is placed on the CPU.
blip_model = BlipForConditionalGeneration.from_pretrained(_BLIP_CHECKPOINT).to("cpu")
print(f"BLIP Model: {blip_model}")
def get_blip_description(image: Image.Image) -> str:
    """Return a BLIP-generated caption for ``image``.

    The image is turned into PyTorch tensors on the CPU, the model generates
    at most 100 new tokens, and the first generated sequence is decoded with
    special tokens stripped.
    """
    model_inputs = blip_processor(image, return_tensors="pt").to("cpu")
    token_ids = blip_model.generate(**model_inputs, max_new_tokens=100)
    return blip_processor.decode(token_ids[0], skip_special_tokens=True)
# --- FUNCTION: Extract images from saved PDF ---
def extract_images_from_pdf(pdf_path, output_json_path):
    """Extract the images embedded in a PDF and describe them in two JSON files.

    The PDF is partitioned with the "hi_res" strategy and every element is
    dumped to ``output_json_path``. Each element that carries a base64 image
    payload is then captioned with BLIP and collected under a "Sprite N" key
    in a second file whose name is ``output_json_path`` with its extension
    replaced by ``_sprites.json``.

    Args:
        pdf_path: Path of the PDF to analyse.
        output_json_path: Destination of the raw element dump.

    Returns:
        None. Results are written to disk.
    """
    elements = partition_pdf(
        filename=pdf_path,
        strategy="hi_res",
        extract_image_block_types=["Image"],  # or ["Image", "Table"]
        extract_image_block_to_payload=True,  # Set to True to get base64 in output
    )

    # Serialise once and keep the dicts in memory instead of re-reading the
    # file that was just written.
    file_elements = [element.to_dict() for element in elements]
    with open(output_json_path, "w", encoding="utf-8") as f:
        json.dump(file_elements, f, indent=4)

    # Kept for compatibility: this folder has always been created next to the
    # JSON output, although nothing in this function writes into it.
    extracted_images_dir = os.path.join(os.path.dirname(output_json_path), "extracted_images")
    os.makedirs(extracted_images_dir, exist_ok=True)

    # Prepare manipulated sprite JSON structure
    manipulated_json = {}
    pdf_filename = os.path.basename(pdf_path)
    pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\")  # windows-style

    for element in file_elements:
        # Tolerate elements without metadata or with an empty image payload.
        image_b64 = (element.get("metadata") or {}).get("image_base64")
        if not image_b64:
            continue

        image = Image.open(io.BytesIO(base64.b64decode(image_b64))).convert("RGB")
        # The image is deliberately not opened in a viewer (image.show()):
        # that spawns an external program per image on the server machine.
        description = get_blip_description(image)

        sprite_key = f"Sprite {len(manipulated_json) + 1}"
        manipulated_json[sprite_key] = {
            "name": pdf_filename,
            "base64": image_b64,
            "file-path": pdf_dir_path,
            "description": description,
        }

    # Derive the sprite file name from the path's extension only. A plain
    # str.replace(".json", ...) would also rewrite ".json" inside directory
    # names, and would return the same path (overwriting the raw dump) when
    # the extension is missing or spelled differently.
    json_root, _ = os.path.splitext(output_json_path)
    manipulated_json_path = f"{json_root}_sprites.json"
    with open(manipulated_json_path, "w", encoding="utf-8") as sprite_file:
        json.dump(manipulated_json, sprite_file, indent=4)
    print(f"✅ Manipulated sprite JSON saved: {manipulated_json_path}")
# Most recent frame published by gen_frames(); capture_document() reads it and
# treats None as "no frame captured yet".
display = None
# Factor applied to the frame width and height before streaming in gen_frames().
scale = 0.5
# NOTE(review): gen_frames() and capture_document() each assign their own
# local `contour`, so this module-level value appears unused in this file.
contour = None
def gen_frames():  # generate frame by frame from camera
    """Yield camera frames as parts of a multipart JPEG stream.

    Each iteration reads one frame, publishes an unmodified copy in the
    module-level ``display`` for the /capture route, outlines the detected
    document in green on the preview, downsizes the preview by ``scale`` and
    yields it JPEG-encoded between ``--frame`` boundaries. The generator
    stops when the camera fails to deliver a frame.
    """
    global display
    while True:
        # Capture frame-by-frame
        success, frame = camera.read()
        if not success:
            break

        # Publish the clean frame first: capture_document() warps and saves
        # whatever is in `display`, so the green outline below must not be
        # drawn onto it or it ends up inside the scanned document.
        display = frame.copy()

        contour = detect_document_contour(frame)
        if contour is not None:
            cv2.drawContours(frame, [contour], -1, (0, 255, 0), 3)

        resized = cv2.resize(frame, (int(scale * frame.shape[1]), int(scale * frame.shape[0])))
        # No cv2.imshow here: the preview is delivered to the browser, and a
        # server thread has no GUI event loop (and possibly no display at all).
        encoded, buffer = cv2.imencode('.jpg', resized)
        if not encoded:
            # Skip a frame that could not be encoded instead of streaming junk.
            continue
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
# --- Route: Scan Document ---
@app.route("/capture", methods=['POST'])
def capture_document():
    """Scan the document visible in the latest streamed frame.

    Takes a copy of the module-level ``display`` frame, locates the document
    contour, applies a four-point perspective transform, saves the result as
    a JPEG, runs the enhancement pipeline on that file, stores the enhanced
    image as a PDF and extracts its images to JSON. Every outcome is reported
    through a flash message followed by a redirect to the index page.
    """
    global count, display

    if display is None:
        flash("❌ No frame captured!", "error")
        return redirect(url_for("index"))

    snapshot = display.copy()
    corners = detect_document_contour(snapshot)
    if corners is None:
        flash("❌ No document contour found!", "error")
        return redirect(url_for("index"))

    flattened = four_point_transform(snapshot, corners.reshape(4, 2))

    # Image and PDF share a file stem; the JSON dump uses a shorter one.
    colored_stem = f"scanned_colored_{count}"
    image_path = os.path.join(IMAGE_FOLDER_PATH, colored_stem + ".jpg")
    pdf_path = os.path.join(PDF_FOLDER_PATH, colored_stem + ".pdf")
    json_path = os.path.join(JSON_FOLDER_PATH, f"scanned_{count}.json")

    cv2.imwrite(image_path, flattened)

    # The pipeline returns a BGR array; PIL needs RGB to write the PDF.
    enhanced_bgr = process_image(image_path)
    Image.fromarray(cv2.cvtColor(enhanced_bgr, cv2.COLOR_BGR2RGB)).save(pdf_path)

    extract_images_from_pdf(pdf_path, json_path)

    flash("✅ Document scanned and saved!", "success")
    count += 1
    return redirect(url_for("index"))
@app.route('/video_feed')
def video_feed():
    """Video streaming route. Put this in the src attribute of an img tag."""
    frame_stream = gen_frames()
    return Response(frame_stream, mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/')
def index():
    """Render the video streaming home page template."""
    home_page = render_template('live_streaming_index.html')
    return home_page
# Start Flask's built-in server only when this file is executed directly.
# It listens on all network interfaces on port 7860 with debug mode off.
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860, debug=False)