|
import gradio as gr |
|
from PIL import Image, ImageOps |
|
import requests |
|
import io |
|
import json |
|
import logging |
|
import os |
|
import struct |
|
import tempfile |
|
from cryptography.hazmat.primitives import serialization |
|
from cryptography.hazmat.primitives.asymmetric import rsa, padding |
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM |
|
from cryptography.hazmat.primitives import hashes |
|
import numpy as np |
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
|
|
|
|
|
class AppServerLogic: |
|
"""Contains all backend logic and mock data needed by the application.""" |
|
def __init__(self): |
|
self.private_key_object = None |
|
self.public_key_pem = "" |
|
self.mock_user_db = { |
|
"demo-user": {"api_key": "sk-12345-abcde", "password": "password123"}, |
|
"admin-user": {"api_key": "sk-67890-fghij", "password": "adminpass"} |
|
} |
|
self._initialize_keys() |
|
|
|
def _initialize_keys(self): |
|
key_pem = os.environ.get('KEYLOCK_PRIV_KEY') |
|
if not key_pem: |
|
pk = rsa.generate_private_key(public_exponent=65537, key_size=2048) |
|
key_pem = pk.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode('utf-8') |
|
try: |
|
self.private_key_object = serialization.load_pem_private_key(key_pem.encode(), password=None) |
|
self.public_key_pem = self.private_key_object.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode('utf-8') |
|
except Exception as e: |
|
logging.error(f"Key initialization failed: {e}") |
|
|
|
def decode_payload(self, image_input): |
|
if not self.private_key_object: |
|
return {"status": "Error", "message": "Server key not configured."} |
|
try: |
|
pixel_data = np.array(image_input.convert("RGB")).ravel() |
|
header_binary = "".join(str(p & 1) for p in pixel_data[:32]) |
|
data_length = int(header_binary, 2) |
|
required_pixels = 32 + data_length * 8 |
|
if required_pixels > len(pixel_data): |
|
raise ValueError("Incomplete payload in image.") |
|
data_binary = "".join(str(p & 1) for p in pixel_data[32:required_pixels]) |
|
crypto_payload = int(data_binary, 2).to_bytes(data_length, byteorder='big') |
|
offset = 4 |
|
encrypted_aes_key_len = struct.unpack('>I', crypto_payload[:offset])[0] |
|
encrypted_aes_key = crypto_payload[offset:offset + encrypted_aes_key_len]; offset += encrypted_aes_key_len |
|
nonce = crypto_payload[offset:offset + 12]; offset += 12 |
|
ciphertext = crypto_payload[offset:] |
|
recovered_aes_key = self.private_key_object.decrypt(encrypted_aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)) |
|
payload = json.loads(AESGCM(recovered_aes_key).decrypt(nonce, ciphertext, None).decode()) |
|
return {"status": "Success", "payload": payload} |
|
except Exception as e: |
|
return {"status": "Error", "message": f"Decryption Failed: {e}"} |
|
|
|
def generate_encrypted_image(self, payload_dict): |
|
""" |
|
Creates a KeyLock image and returns both a Pillow object for preview |
|
and a filepath to a saved, uncorrupted PNG for download. |
|
""" |
|
try: |
|
response = requests.get("https://images.unsplash.com/photo-1506318137071-a8e063b4bec0?q=80&w=1200", timeout=10) |
|
img = ImageOps.fit(Image.open(io.BytesIO(response.content)).convert("RGB"), (600, 600)) |
|
except: |
|
img = Image.new('RGB', (600, 600), color=(15, 23, 42)) |
|
|
|
json_bytes = json.dumps(payload_dict).encode('utf-8') |
|
public_key = serialization.load_pem_public_key(self.public_key_pem.encode('utf-8')) |
|
aes_key, nonce = os.urandom(32), os.urandom(12) |
|
ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None) |
|
rsa_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)) |
|
payload = struct.pack('>I', len(rsa_key)) + rsa_key + nonce + ciphertext |
|
|
|
pixel_data = np.array(img).ravel() |
|
binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(payload)) + payload) |
|
pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8) |
|
final_image = Image.fromarray(pixel_data.reshape(img.size[1], img.size[0], 3), 'RGB') |
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f: |
|
final_image.save(f.name, "PNG") |
|
|
|
return final_image, f.name |
|
|
|
@staticmethod |
|
def generate_pem_keys(): |
|
pk = rsa.generate_private_key(public_exponent=65537, key_size=2048) |
|
priv = pk.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode() |
|
pub = pk.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode() |
|
return priv, pub |
|
|
|
|
|
class KeylockDecoderComponent(gr.components.Component): |
|
"""A Gradio Component that decodes a KeyLock image and provides advanced tools.""" |
|
EVENTS = ["change"] |
|
|
|
def __init__(self, server_logic, **kwargs): |
|
self.server_logic = server_logic |
|
self.value = None |
|
super().__init__(**kwargs) |
|
|
|
def _format_message(self, result: dict | None) -> str: |
|
if not result or not result.get("status"): return "Upload a KeyLock image to auto-fill credentials." |
|
status = result["status"] |
|
if status == "Success": |
|
user = result.get("payload", {}).get("USER", "N/A") |
|
return f"<p style='color:green; font-weight:bold;'>β
Success! Decoded credentials for '{user}'.</p>" |
|
else: |
|
message = result.get("message", "An unknown error occurred.") |
|
return f"<p style='color:red; font-weight:bold;'>β Error: {message}</p>" |
|
|
|
def preprocess(self, payload): return payload |
|
def postprocess(self, value): return value |
|
def api_info(self): return {"type": "object", "example": {"status": "Success", "payload": {"USER": "demo-user"}}} |
|
|
|
def render(self): |
|
value_state = gr.State(value=self.value) |
|
|
|
with gr.Column(): |
|
image_input = gr.Image(label="KeyLock Image", type="pil", show_label=False) |
|
status_display = gr.Markdown(self._format_message(self.value)) |
|
|
|
with gr.Accordion("Generate Encrypted Image", open=False): |
|
gr.Markdown("Create a test image using the site's public key.") |
|
payload_input = gr.JSON(label="Payload to Encrypt", value={"API_KEY": "sk-12345-abcde", "USER": "demo-user"}) |
|
generate_img_button = gr.Button("Generate Image", variant="secondary") |
|
|
|
generated_image_preview = gr.Image(label="Generated Image Preview", interactive=False) |
|
generated_file_download = gr.File(label="Download Uncorrupted PNG", interactive=False) |
|
|
|
with gr.Accordion("Create New Standalone Key Pair", open=False): |
|
gr.Markdown("Generate a new, random key pair for testing or other uses.") |
|
generate_keys_button = gr.Button("Generate Keys", variant="secondary") |
|
with gr.Row(): |
|
output_private_key = gr.Code(label="Generated Private Key", language="pem") |
|
output_public_key = gr.Code(label="Generated Public Key", language="pem") |
|
|
|
def on_image_upload(image): |
|
if image is None: return None, "Upload a KeyLock image to auto-fill credentials." |
|
result_dict = self.server_logic.decode_payload(image) |
|
formatted_message = self._format_message(result_dict) |
|
return result_dict, formatted_message |
|
|
|
|
|
image_input.upload(fn=on_image_upload, inputs=[image_input], outputs=[value_state, status_display]) |
|
|
|
generate_img_button.click( |
|
fn=self.server_logic.generate_encrypted_image, |
|
inputs=[payload_input], |
|
outputs=[generated_image_preview, generated_file_download] |
|
) |
|
generate_keys_button.click(fn=self.server_logic.generate_pem_keys, inputs=None, outputs=[output_private_key, output_public_key]) |
|
|
|
return {"value": value_state} |