Spaces:
Sleeping
Sleeping
import cv2 | |
import numpy as np | |
from PIL import Image, PngImagePlugin | |
import os | |
import uuid | |
def to_bin(data):
    """Convert data to a binary (``0``/``1``) string representation.

    Args:
        data: One of

            * ``str`` -- every character is rendered from its code point;
            * ``bytes`` / ``bytearray`` -- every byte is rendered;
            * ``np.ndarray`` -- every element is rendered separately, in
              flattened (row-major) order;
            * ``int`` or any NumPy integer scalar.

    Returns:
        A single string for ``str``, bytes-like and scalar input, or a list
        of strings (one per element) for array input.

    Raises:
        TypeError: If ``data`` is of any other type.

    Note:
        Values are zero-padded to a *minimum* of eight digits. A character
        above U+00FF or an integer above 255 therefore produces more than
        eight digits.
    """
    if isinstance(data, str):
        return "".join(format(ord(char), "08b") for char in data)
    if isinstance(data, (bytes, bytearray)):
        return "".join(format(byte, "08b") for byte in data)
    if isinstance(data, np.ndarray):
        # ravel() lets multi-dimensional arrays be handled element by element;
        # tolist() yields plain Python scalars so formatting never depends on
        # the NumPy scalar type.
        return [format(value, "08b") for value in data.ravel().tolist()]
    if isinstance(data, (int, np.integer)):
        # np.integer covers every NumPy integer width, not only uint8.
        return format(int(data), "08b")
    raise TypeError("Type not supported.")
def encode(image_path, secret_data):
    """Embed a text watermark into the least significant bits of an image.

    The payload ``START<secret_data>END`` is written one bit per channel
    value, walking the image row by row, pixel by pixel, over the first three
    channels of each pixel. Each character occupies exactly eight bits, most
    significant bit first.

    Args:
        image_path: Path of the image, read with ``cv2.imread``. An already
            loaded ``np.ndarray`` is also accepted and used as is (assumed to
            be an integer-typed ``height x width x channels`` array with at
            least three channels, like the ones ``cv2.imread`` returns).
        secret_data: Text to hide. Every character of the payload must have a
            code point of at most 255 so that it fits in one byte.

    Returns:
        ``(watermarked_image, None)`` on success, where the image is a
        modified copy of the input pixels, or ``(None, error_message)`` on
        failure. No exception is propagated to the caller.
    """
    try:
        if isinstance(image_path, np.ndarray):
            image = image_path
        else:
            image = cv2.imread(image_path)
        if image is None:
            return None, "Failed to read image"

        # One payload bit is stored per channel value, three per pixel.
        capacity_bits = image.shape[0] * image.shape[1] * 3

        # Delimiters let the decoder locate the message in the bit stream.
        payload = f"START{secret_data}END"
        try:
            # latin-1 maps code points 0-255 to single bytes one-to-one;
            # anything wider would break the fixed 8-bits-per-character layout.
            payload_bytes = payload.encode("latin-1")
        except UnicodeEncodeError:
            return None, "Watermark contains characters that do not fit in 8 bits"

        bits = np.unpackbits(np.frombuffer(payload_bytes, dtype=np.uint8))

        # Compare bits with bits (the capacity is not a byte count).
        if bits.size > capacity_bits:
            return None, "Watermark is too large for image size"

        watermarked_image = image.copy()
        channels = watermarked_image[:, :, :3]
        flat = channels.reshape(-1)
        # Clear each target LSB, then set it to the payload bit.
        flat[:bits.size] = (flat[:bits.size] & 0xFE) | bits
        # reshape() may have returned a copy (e.g. 4-channel input), so write
        # the result back explicitly.
        watermarked_image[:, :, :3] = flat.reshape(channels.shape)
        return watermarked_image, None
    except Exception as e:
        return None, f"Error during encoding: {str(e)}"
def decode(image_path):
    """Extract a ``START...END`` delimited watermark from an image's LSBs.

    The least significant bit of every channel value (first three channels,
    row by row, pixel by pixel) is collected, grouped into bytes (most
    significant bit first) and interpreted as one character per byte.

    Args:
        image_path: One of

            * ``str`` -- a path read with ``cv2.imread``;
            * ``np.ndarray`` -- pixel data used as is (assumed to be laid out
              like the array ``cv2.imread`` returns);
            * a PIL image, or anything ``PIL.Image.open`` accepts -- converted
              to RGB and then to BGR channel order.

    Returns:
        The text between the first ``START`` marker and the first ``END``
        marker that follows it. Otherwise one of the strings
        ``"No watermark found"``, ``"Failed to read image"`` or
        ``"Error during decoding: ..."``. No exception is propagated.
    """
    try:
        if isinstance(image_path, str):
            image = cv2.imread(image_path)
        elif isinstance(image_path, np.ndarray):
            image = image_path
        else:
            if isinstance(image_path, Image.Image):
                pil_image = image_path
            else:
                pil_image = Image.open(image_path)
            # Force three channels so palette, grayscale and RGBA inputs
            # survive the RGB -> BGR conversion.
            rgb = np.array(pil_image.convert("RGB"))
            image = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
        if image is None:
            return "Failed to read image"

        # Gather every LSB in the same order the encoder wrote them.
        lsb = (image[:, :, :3].reshape(-1) & 1).astype(np.uint8)
        # Drop a trailing partial byte, if any.
        usable = lsb.size - (lsb.size % 8)
        # latin-1 maps each byte value to the character with that code point.
        text_data = np.packbits(lsb[:usable]).tobytes().decode("latin-1")

        start_idx = text_data.find("START")
        if start_idx == -1:
            return "No watermark found"
        message_start = start_idx + len("START")
        # Only an END marker located after START can close the message.
        end_idx = text_data.find("END", message_start)
        if end_idx == -1:
            return "No watermark found"
        return text_data[message_start:end_idx]
    except Exception as e:
        return f"Error during decoding: {str(e)}"
def png_encode(im_name, extra):
    """Save a PNG copy of an image with ``extra`` stored as PNG text metadata.

    The copy is written next to the source file as
    ``<original name without extension>_<8 hex chars>.png`` and carries
    ``extra`` in a text chunk under the key ``"TXT"``.

    Side effect: the module-level global ``png_encode_coords`` is set to
    ``(x, y, width, height)`` of a 200x50 rectangle placed 10 pixels in from
    the bottom-right corner of the image.

    Args:
        im_name: Path of the source image.
        extra: Text to store in the PNG metadata.

    Returns:
        ``(new_filename, None)`` on success or ``(None, error_message)`` on
        failure. No exception is propagated to the caller.
    """
    global png_encode_coords
    try:
        # The context manager closes the underlying file once saving is done.
        with Image.open(im_name) as im:
            info = PngImagePlugin.PngInfo()
            info.add_text("TXT", extra)

            # A short random suffix keeps repeated calls from overwriting
            # each other's output.
            unique_id = str(uuid.uuid4())[:8]
            filename, _ = os.path.splitext(im_name)
            new_filename = f"{filename}_{unique_id}.png"
            im.save(new_filename, "PNG", pnginfo=info)

            width, height = im.size

        rect_width, rect_height = 200, 50
        x = width - rect_width - 10
        y = height - rect_height - 10
        png_encode_coords = (x, y, rect_width, rect_height)
        return new_filename, None
    except Exception as e:
        return None, str(e)