broadfield-dev commited on
Commit
1406fd9
·
verified ·
1 Parent(s): 9778488

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +207 -214
app.py CHANGED
@@ -15,29 +15,20 @@ from cryptography.hazmat.primitives.ciphers.aead import AESGCM
15
  from cryptography.hazmat.primitives import hashes
16
  from cryptography.exceptions import InvalidTag
17
  from gradio_client import Client
 
18
 
19
- # --- Basic Setup ---
20
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
21
  logger = logging.getLogger(__name__)
 
22
 
23
- # #############################################################################
24
- # --- (Part 1) CORE KEYLOCK SERVER LOGIC ---
25
- # This section contains all the backend functions for the KeyLock server.
26
- # #############################################################################
27
-
28
- # --- Constants and Key Loading ---
29
  HEADER_BITS = 32
30
  AES_GCM_NONCE_SIZE = 12
31
  KEYLOCK_PRIV_KEY_PEM = os.environ.get('KEYLOCK_PRIV_KEY')
32
  PRIVATE_KEY_OBJECT = None
33
  PUBLIC_KEY_PEM_STRING = ""
34
  KEYLOCK_STATUS_MESSAGE = ""
35
- MOCK_USER_DATABASE = {
36
- "sk-12345-abcde": {"user": "demo-user", "permissions": "read"},
37
- "sk-67890-fghij": {"user": "admin-user", "permissions": "read,write,delete"}
38
- }
39
 
40
- # --- Initialize Keys ---
41
  if not KEYLOCK_PRIV_KEY_PEM:
42
  logger.warning("No KEYLOCK_PRIV_KEY secret found. Generating a temporary key pair for this session.")
43
  temp_priv_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
@@ -45,267 +36,269 @@ if not KEYLOCK_PRIV_KEY_PEM:
45
  KEYLOCK_STATUS_MESSAGE = "⚠️ No secret found. Using a temporary key for this session. Keys will be lost on restart."
46
  else:
47
  logger.info("Successfully loaded private key from environment variable 'KEYLOCK_PRIV_KEY'.")
48
- KEYLOCK_STATUS_MESSAGE = "✅ Loaded successfully from secrets/environment variable. Recommended secure configuration."
49
 
50
  try:
51
  PRIVATE_KEY_OBJECT = serialization.load_pem_private_key(KEYLOCK_PRIV_KEY_PEM.encode(), password=None)
52
- public_key = PRIVATE_KEY_OBJECT.public_key()
53
- PUBLIC_KEY_PEM_STRING = public_key.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode('utf-8')
54
  KEYLOCK_STATUS_MESSAGE += "\n✅ Public key derived successfully."
55
  except Exception as e:
56
  PRIVATE_KEY_OBJECT = None
57
- PUBLIC_KEY_PEM_STRING = "Error: Could not process the private key."
58
  KEYLOCK_STATUS_MESSAGE += f"\n❌ Failed to parse key: {e}"
59
 
60
- # --- Cryptographic Helper ---
61
- def _create_encrypted_image_helper(payload_dict: dict, public_key_pem: str) -> Image.Image:
62
- """A shared helper to create an encrypted image from a dict."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63
  json_bytes = json.dumps(payload_dict).encode('utf-8')
64
  public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
65
  aes_key, nonce = os.urandom(32), os.urandom(12)
66
  ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
67
  rsa_encrypted_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
68
  encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext
69
-
70
- # Create a simple base image
71
- img = Image.new('RGB', (600, 600), color = 'black')
72
- draw = ImageDraw.Draw(img)
73
- draw.text((10,10), "KeyLock Auth Image", fill=(200,200,200))
74
- draw.text((10,40), f"Payload Keys: {', '.join(payload_dict.keys())}", fill=(150,150,150))
75
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
  pixel_data = np.array(img).ravel()
77
- header = struct.pack('>I', len(encrypted_payload))
78
- binary_payload = ''.join(format(b, '08b') for b in header + encrypted_payload)
79
- if len(binary_payload) > pixel_data.size:
80
- raise ValueError("Payload is too large for the image.")
81
  pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8)
82
- return Image.fromarray(pixel_data.reshape((600, 600, 3)), 'RGB')
83
 
84
- # --- API Functions ---
85
  def api_get_info():
86
- """API: Returns server metadata, including required keys for auth."""
87
- return {
88
- "name": "Live KeyLock Demo Server",
89
- "version": "2.0",
90
- "documentation": "This server can generate and authenticate KeyLock images.",
91
- "required_payload_keys": [
92
- {"key_name": "API_KEY", "description": "Your unique API Key.", "example": "sk-12345-abcde"},
93
- {"key_name": "USER", "description": "The user ID for the key.", "example": "demo-user"}
94
- ]
95
- }
96
 
97
  def api_get_public_key():
98
- """API: Returns the server's public key."""
99
- if not PUBLIC_KEY_PEM_STRING or not PRIVATE_KEY_OBJECT:
100
- raise gr.Error("Server key is not configured.")
101
  return PUBLIC_KEY_PEM_STRING
102
 
103
- def api_generate_image(payload_dict: dict):
104
- """API: Generates a new KeyLock image based on the provided payload."""
105
- if not isinstance(payload_dict, dict):
106
- raise gr.Error("Input must be a valid JSON object.")
107
-
108
- img = _create_encrypted_image_helper(payload_dict, PUBLIC_KEY_PEM_STRING)
109
-
110
- with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_f:
111
- png_path = temp_f.name
112
- img.save(png_path, "PNG", compress_level=1)
113
- return png_path
114
-
115
  def api_decode_and_auth(image_base64_string: str) -> dict:
116
- """API: Decodes an image and uses the payload to authenticate."""
117
- if not PRIVATE_KEY_OBJECT:
118
- raise gr.Error("Server is not configured with a private key.")
119
  try:
120
- image_buffer = base64.b64decode(image_base64_string)
121
- img = Image.open(io.BytesIO(image_buffer)).convert("RGB")
122
- pixel_data = np.array(img).ravel()
123
- header_binary_string = "".join(str(pixel & 1) for pixel in pixel_data[:HEADER_BITS])
124
  data_length = int(header_binary_string, 2)
125
- if data_length == 0: raise ValueError("No data found in image.")
126
- data_bits_count = data_length * 8
127
- end_offset = HEADER_BITS + data_bits_count
128
- data_binary_string = "".join(str(pixel & 1) for pixel in pixel_data[HEADER_BITS:end_offset])
129
  crypto_payload = int(data_binary_string, 2).to_bytes(data_length, byteorder='big')
130
- offset = 4
131
- encrypted_aes_key_len = struct.unpack('>I', crypto_payload[:offset])[0]
132
- encrypted_aes_key = crypto_payload[offset : offset + encrypted_aes_key_len]
133
- offset += encrypted_aes_key_len
134
- nonce = crypto_payload[offset : offset + AES_GCM_NONCE_SIZE]
135
- offset += AES_GCM_NONCE_SIZE
136
  ciphertext_with_tag = crypto_payload[offset:]
137
  recovered_aes_key = PRIVATE_KEY_OBJECT.decrypt(encrypted_aes_key, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
138
- decrypted_bytes = AESGCM(recovered_aes_key).decrypt(nonce, ciphertext_with_tag, None)
139
- decrypted_payload = json.loads(decrypted_bytes.decode('utf-8'))
140
-
141
- api_key = decrypted_payload.get('API_KEY')
142
- user_id = decrypted_payload.get('USER')
143
-
144
- db_entry = MOCK_USER_DATABASE.get(api_key)
145
- if db_entry and db_entry.get("user") == user_id:
146
- return {"status": "Success", "message": f"User '{user_id}' authenticated.", "details": decrypted_payload}
147
  else:
148
  return {"status": "Failed", "message": "Invalid credentials.", "details": decrypted_payload}
149
  except Exception as e:
150
- logger.error(f"Auth endpoint error: {e}", exc_info=True)
151
  return {"status": "Error", "message": f"Decryption/Processing Failed: {e}", "details": {}}
152
 
 
 
153
 
154
- # #############################################################################
155
- # --- (Part 2) GRADIO DEMO UI ---
156
- # This section builds the interactive showcase for the server.
157
- # #############################################################################
158
-
159
- theme = gr.themes.Soft(primary_hue="emerald", secondary_hue="green", neutral_hue="slate")
160
 
161
- with gr.Blocks(theme=theme, title="KeyLock Server Demo") as demo:
162
- server_state = gr.State({})
163
-
164
- gr.Markdown("# 🔑 KeyLock Server Showcase")
165
- gr.Markdown("An interactive demonstration of a KeyLock authentication server. Connect to the server, generate a tokenized image, and test its authentication path.")
166
-
167
- with gr.Tabs() as tabs:
168
- with gr.TabItem("🚀 Live Demo"):
169
  with gr.Row():
170
- with gr.Column(scale=1):
171
- # --- Step 1: Connect ---
172
- gr.Markdown("### 1. Connect to the Server")
173
- server_url_input = gr.Textbox(label="Server URL", placeholder="Enter the URL of the running server space")
174
- connect_button = gr.Button("Connect", variant="primary")
175
- with gr.Column(scale=2):
176
- status_display = gr.Markdown("**Status:** Not Connected", visible=False)
177
 
178
- with gr.Accordion("2. Generate an Authentication Image (Server-Side)", open=False, visible=False) as generate_accordion:
179
- gr.Markdown("Use the server's public key to generate an image with an embedded payload. The form below is created based on the server's requirements.")
180
- payload_input = gr.JSON(label="Payload to Encrypt")
181
- generate_button = gr.Button("Generate Image", variant="secondary")
182
  with gr.Row():
183
- generated_image_preview = gr.Image(label="Generated Image Preview", interactive=False, height=300)
184
- generated_file_output = gr.File(label="Download Uncorrupted PNG", interactive=False, file_count="single")
 
 
 
 
 
 
185
 
186
- with gr.Accordion("3. Test an Existing Image", open=False, visible=False) as test_accordion:
187
- gr.Markdown("Upload an image containing KeyLock auth data to test the server's decryption and authentication endpoint.")
188
- test_image_input = gr.Image(type="filepath", label="Upload Encrypted Image")
189
- auth_button = gr.Button("Authenticate Image", variant="secondary")
190
- auth_status_display = gr.Markdown(visible=False)
191
- auth_result_output = gr.JSON(label="Server Authentication Response")
192
-
193
- with gr.TabItem("⚙️ Server Admin & Status"):
194
- gr.Markdown("## Server Status")
195
- gr.Textbox(label="Private Key Status", value=KEYLOCK_STATUS_MESSAGE, interactive=False, lines=3)
196
- gr.Markdown("## This server's address: https://broadfield-dev-keylock-demo.hf.space")
197
- gr.Markdown("---")
198
- gr.Markdown("## Client-Side Key Generator")
199
- gr.Markdown("Use this utility to generate a new RSA key pair for other projects.")
200
- gr.Markdown("Store the Private Key as KEYLOCK_PRIV_KEY in the Secret Environment on the server")
201
-
202
- gen_keys_button = gr.Button("⚙️ Generate New 2048-bit Key Pair")
203
- with gr.Row():
204
- output_private_key = gr.Textbox(lines=8, label="Generated Private Key", interactive=False, show_copy_button=True)
205
- output_public_key = gr.Textbox(lines=8, label="Generated Public Key", interactive=False, show_copy_button=True)
206
-
207
- # --- FIX: Helper function for reliable, pure-Python key generation ---
208
- def generate_pem_keys():
209
- """Generates a new RSA key pair and formats them as PEM strings."""
210
- private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
211
- private_pem = private_key.private_bytes(
212
- encoding=serialization.Encoding.PEM,
213
- format=serialization.PrivateFormat.PKCS8,
214
- encryption_algorithm=serialization.NoEncryption()
215
- ).decode('utf-8')
216
- public_pem = private_key.public_key().public_bytes(
217
- encoding=serialization.Encoding.PEM,
218
- format=serialization.PublicFormat.SubjectPublicKeyInfo
219
- ).decode('utf-8')
220
- return private_pem, public_pem
221
 
222
- # --- FIX: Correctly call the Python function ---
223
- gen_keys_button.click(
224
- fn=generate_pem_keys,
225
- inputs=None,
226
- outputs=[output_private_key, output_public_key],
227
- api_name=False
228
- )
 
 
 
 
 
 
229
 
230
- # --- UI Logic ---
231
- def connect_to_server(url):
 
 
 
 
 
 
 
 
 
 
 
232
  if not url: raise gr.Error("Please provide a server URL.")
 
 
 
 
 
 
 
 
 
233
  try:
234
- connect_button.interactive = False
235
  client = Client(url, verbose=False)
236
  info = client.predict(api_name="/keylock-info")
237
  pubkey = client.predict(api_name="/keylock-pub")
238
-
239
- state_data = {"url": url, "info": info, "pubkey": pubkey}
240
-
241
- # Prepare the example payload for the form
242
- example_payload = {}
243
- if info.get("required_payload_keys"):
244
- for key_info in info["required_payload_keys"]:
245
- example_payload[key_info["key_name"]] = key_info["example"]
246
-
247
- status_md = f"**Status:** ✅ Connected to **{info.get('name', 'Unknown Server')}** (v{info.get('version', 'N/A')})"
248
-
249
- return {
250
- server_state: state_data,
251
- status_display: gr.update(value=status_md, visible=True),
252
- generate_accordion: gr.update(visible=True),
253
- test_accordion: gr.update(visible=True),
254
- payload_input: gr.update(value=example_payload),
255
- connect_button: gr.update(interactive=True)
256
- }
257
  except Exception as e:
258
  gr.Error(f"Connection Failed: {e}")
259
- return {
260
- server_state: {},
261
- status_display: gr.update(value=f"**Status:** ❌ Connection Failed: {e}", visible=True),
262
- connect_button: gr.update(interactive=True)
263
- }
264
 
265
- def generate_image_from_server(current_state, payload):
266
- if not current_state: raise gr.Error("Not connected to a server.")
267
- try:
268
- client = Client(current_state["url"], verbose=False)
269
- # The server API expects a dictionary
270
- generated_path = client.predict(payload, api_name="/keylock-generate")
271
- return gr.update(value=generated_path), gr.update(value=generated_path)
272
- except Exception as e:
273
- gr.Error(f"Image generation failed: {e}")
274
- return None, None
275
-
276
- def authenticate_dropped_image(current_state, image_path):
277
- if not current_state: raise gr.Error("Not connected to a server.")
278
- if not image_path: raise gr.Error("Please upload an image to test.")
279
  try:
280
- with open(image_path, "rb") as f:
281
- b64_img = base64.b64encode(f.read()).decode('utf-8')
282
-
283
- client = Client(current_state["url"], verbose=False)
284
  response = client.predict(b64_img, api_name="/keylock-auth")
285
-
286
- if response.get("status") == "Success":
287
- status_md = "### ✅ Authentication Successful"
288
- elif response.get("status") == "Failed":
289
- status_md = "### ❌ Authentication Failed"
290
- else:
291
- status_md = f"### ⚠️ Server Error: {response.get('message')}"
292
-
293
  return gr.update(value=status_md, visible=True), response
294
  except Exception as e:
295
  gr.Error(f"Authentication request failed: {e}")
296
  return gr.update(value=f"### ⚠️ Request Error: {e}", visible=True), None
297
 
298
- # --- Event Handlers ---
299
- connect_button.click(connect_to_server, inputs=[server_url_input], outputs=[server_state, status_display, generate_accordion, test_accordion, payload_input, connect_button])
300
- generate_button.click(generate_image_from_server, inputs=[server_state, payload_input], outputs=[generated_image_preview, generated_file_output])
301
- auth_button.click(authenticate_dropped_image, inputs=[server_state, test_image_input], outputs=[auth_status_display, auth_result_output])
302
-
303
- # --- Hidden API Endpoints for the Server ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
304
  with gr.Row(visible=False):
305
  gr.Interface(fn=api_get_info, inputs=None, outputs=gr.JSON(), api_name="keylock-info")
306
  gr.Interface(fn=api_get_public_key, inputs=None, outputs=gr.Textbox(), api_name="keylock-pub")
307
  gr.Interface(fn=api_decode_and_auth, inputs=gr.Textbox(), outputs=gr.JSON(), api_name="keylock-auth")
308
- gr.Interface(fn=api_generate_image, inputs=gr.JSON(), outputs=gr.File(), api_name="keylock-generate")
309
 
310
  if __name__ == "__main__":
311
- demo.launch(share=True)
 
15
  from cryptography.hazmat.primitives import hashes
16
  from cryptography.exceptions import InvalidTag
17
  from gradio_client import Client
18
+ from huggingface_hub import InferenceClient
19
 
 
20
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
21
  logger = logging.getLogger(__name__)
22
+ ENDPOINTS_FILE = "endpoints.json"
23
 
 
 
 
 
 
 
24
  HEADER_BITS = 32
25
  AES_GCM_NONCE_SIZE = 12
26
  KEYLOCK_PRIV_KEY_PEM = os.environ.get('KEYLOCK_PRIV_KEY')
27
  PRIVATE_KEY_OBJECT = None
28
  PUBLIC_KEY_PEM_STRING = ""
29
  KEYLOCK_STATUS_MESSAGE = ""
30
+ MOCK_USER_DATABASE = {"sk-12345-abcde": {"user": "demo-user", "permissions": "read"}, "sk-67890-fghij": {"user": "admin-user", "permissions": "read,write,delete"}}
 
 
 
31
 
 
32
  if not KEYLOCK_PRIV_KEY_PEM:
33
  logger.warning("No KEYLOCK_PRIV_KEY secret found. Generating a temporary key pair for this session.")
34
  temp_priv_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
 
36
  KEYLOCK_STATUS_MESSAGE = "⚠️ No secret found. Using a temporary key for this session. Keys will be lost on restart."
37
  else:
38
  logger.info("Successfully loaded private key from environment variable 'KEYLOCK_PRIV_KEY'.")
39
+ KEYLOCK_STATUS_MESSAGE = "✅ Loaded successfully from secrets/environment variable."
40
 
41
  try:
42
  PRIVATE_KEY_OBJECT = serialization.load_pem_private_key(KEYLOCK_PRIV_KEY_PEM.encode(), password=None)
43
+ PUBLIC_KEY_PEM_STRING = PRIVATE_KEY_OBJECT.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode('utf-8')
 
44
  KEYLOCK_STATUS_MESSAGE += "\n✅ Public key derived successfully."
45
  except Exception as e:
46
  PRIVATE_KEY_OBJECT = None
47
+ PUBLIC_KEY_PEM_STRING = "Error: Key could not be processed."
48
  KEYLOCK_STATUS_MESSAGE += f"\n❌ Failed to parse key: {e}"
49
 
50
+ def _parse_secret_data(secret_data_str: str) -> dict:
51
+ stripped_input = secret_data_str.strip()
52
+ try:
53
+ data_dict = json.loads(stripped_input)
54
+ if isinstance(data_dict, dict): return data_dict
55
+ except json.JSONDecodeError:
56
+ pass
57
+ data_dict = {}
58
+ for line in stripped_input.splitlines():
59
+ line = line.strip()
60
+ if not line or line.startswith('#'): continue
61
+ separator = ':' if ':' in line else '='
62
+ if separator not in line: continue
63
+ parts = line.split(separator, 1)
64
+ if len(parts) == 2:
65
+ key = parts[0].strip().strip("'\"")
66
+ value = parts[1].strip().strip(",").strip().strip("'\"")
67
+ if key: data_dict[key] = value
68
+ return data_dict
69
+
70
+ def prepare_base_image(uploaded_image: Image.Image | None, progress) -> Image.Image:
71
+ size = 600
72
+ if uploaded_image:
73
+ progress(0, desc="✅ Using uploaded image...")
74
+ return ImageOps.fit(uploaded_image, (size, size), Image.Resampling.LANCZOS)
75
+ try:
76
+ progress(0, desc="⏳ Fetching default background...")
77
+ response = requests.get("https://images.unsplash.com/photo-1506318137071-a8e063b4bec0?q=80&w=1200&auto=format=fit=crop", timeout=10)
78
+ response.raise_for_status()
79
+ img = Image.open(io.BytesIO(response.content)).convert("RGB")
80
+ return ImageOps.fit(img, (size, size), Image.Resampling.LANCZOS)
81
+ except Exception as e:
82
+ logger.warning(f"Default image fetch failed: {e}. Falling back to AI.")
83
+ try:
84
+ progress(0, desc="⏳ Generating image with SDXL-Lightning...")
85
+ client = InferenceClient()
86
+ image_bytes = client.text_to_image("A stunning view of a distant galaxy, nebulae, and constellations, digital art, vibrant colors", model="sd-community/sdxl-lightning")
87
+ return ImageOps.fit(Image.open(io.BytesIO(image_bytes)).convert("RGB"), (size, size), Image.Resampling.LANCZOS)
88
+ except Exception as e:
89
+ raise gr.Error(f"All image sources failed. AI error: {e}")
90
+
91
+ def create_encrypted_image(payload_dict: dict, public_key_pem: str, base_image: Image.Image, overlay_option: str, server_url: str) -> Image.Image:
92
  json_bytes = json.dumps(payload_dict).encode('utf-8')
93
  public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
94
  aes_key, nonce = os.urandom(32), os.urandom(12)
95
  ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
96
  rsa_encrypted_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
97
  encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext
98
+ img = base_image.copy().convert("RGB")
99
+ width, height = img.size
100
+ draw = ImageDraw.Draw(img, "RGBA")
101
+ try:
102
+ font_bold = ImageFont.truetype("DejaVuSans-Bold.ttf", 30); font_regular = ImageFont.truetype("DejaVuSans.ttf", 15); font_small = ImageFont.truetype("DejaVuSans.ttf", 12)
103
+ except IOError:
104
+ font_bold = ImageFont.load_default(size=28); font_regular = ImageFont.load_default(size=14); font_small = ImageFont.load_default(size=12)
105
+ overlay_color, title_color, key_color, value_color = (15, 23, 42, 190), (226, 232, 240), (148, 163, 184), (241, 245, 249)
106
+ draw.rectangle([0, 20, width, 80], fill=overlay_color)
107
+ draw.text((width / 2, 50), "KeyLock Secure Data", fill=title_color, font=font_bold, anchor="ms")
108
+ draw.text((width - 15, 25), server_url.replace("https://", ""), fill=key_color, font=font_small, anchor="ra")
109
+ if overlay_option != "None":
110
+ lines = list(payload_dict.keys()) if overlay_option == "Keys Only" else [f"{k}: {v}" for k, v in payload_dict.items()]
111
+ line_heights = [draw.textbbox((0, 0), line, font=font_regular)[3] for line in lines]
112
+ box_y0 = height - (sum(line_heights) + (len(lines) - 1) * 6 + 30) - 20
113
+ draw.rectangle([20, box_y0, width - 20, height - 20], fill=overlay_color)
114
+ current_y = box_y0 + 15
115
+ for i, (key, value) in enumerate(payload_dict.items()):
116
+ if overlay_option == "Keys Only":
117
+ draw.text((35, current_y), key, fill=key_color, font=font_regular)
118
+ else:
119
+ key_text = f"{key}:"; draw.text((35, current_y), key_text, fill=key_color, font=font_regular)
120
+ key_bbox = draw.textbbox((35, current_y), key_text, font=font_regular)
121
+ draw.text((key_bbox[2] + 8, current_y), str(value), fill=value_color, font=font_regular)
122
+ current_y += line_heights[i] + 6
123
  pixel_data = np.array(img).ravel()
124
+ binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(encrypted_payload)) + encrypted_payload)
125
+ if len(binary_payload) > pixel_data.size: raise ValueError("Payload is too large for the image.")
 
 
126
  pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8)
127
+ return Image.fromarray(pixel_data.reshape((height, width, 3)), 'RGB')
128
 
 
129
  def api_get_info():
130
+ return {"name": "Embedded KeyLock Server", "version": "2.1", "documentation": "This server can generate and authenticate KeyLock images.", "required_payload_keys": [{"key_name": "API_KEY", "description": "Your unique API Key.", "example": "sk-12345-abcde"}, {"key_name": "USER", "description": "The user ID for the key.", "example": "demo-user"}]}
 
 
 
 
 
 
 
 
 
131
 
132
  def api_get_public_key():
 
 
 
133
  return PUBLIC_KEY_PEM_STRING
134
 
 
 
 
 
 
 
 
 
 
 
 
 
135
  def api_decode_and_auth(image_base64_string: str) -> dict:
136
+ if not PRIVATE_KEY_OBJECT: raise gr.Error("Server is not configured with a private key.")
 
 
137
  try:
138
+ pixel_data = np.array(Image.open(io.BytesIO(base64.b64decode(image_base64_string))).convert("RGB")).ravel()
139
+ header_binary_string = "".join(str(p & 1) for p in pixel_data[:HEADER_BITS])
 
 
140
  data_length = int(header_binary_string, 2)
141
+ data_binary_string = "".join(str(p & 1) for p in pixel_data[HEADER_BITS:HEADER_BITS + data_length * 8])
 
 
 
142
  crypto_payload = int(data_binary_string, 2).to_bytes(data_length, byteorder='big')
143
+ offset = 4; encrypted_aes_key_len = struct.unpack('>I', crypto_payload[:offset])[0]
144
+ encrypted_aes_key = crypto_payload[offset:offset + encrypted_aes_key_len]; offset += encrypted_aes_key_len
145
+ nonce = crypto_payload[offset:offset + AES_GCM_NONCE_SIZE]; offset += AES_GCM_NONCE_SIZE
 
 
 
146
  ciphertext_with_tag = crypto_payload[offset:]
147
  recovered_aes_key = PRIVATE_KEY_OBJECT.decrypt(encrypted_aes_key, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
148
+ decrypted_payload = json.loads(AESGCM(recovered_aes_key).decrypt(nonce, ciphertext_with_tag, None).decode('utf-8'))
149
+ db_entry = MOCK_USER_DATABASE.get(decrypted_payload.get('API_KEY'))
150
+ if db_entry and db_entry.get("user") == decrypted_payload.get('USER'):
151
+ return {"status": "Success", "message": f"User '{decrypted_payload.get('USER')}' authenticated.", "details": decrypted_payload}
 
 
 
 
 
152
  else:
153
  return {"status": "Failed", "message": "Invalid credentials.", "details": decrypted_payload}
154
  except Exception as e:
 
155
  return {"status": "Error", "message": f"Decryption/Processing Failed: {e}", "details": {}}
156
 
157
+ def load_endpoints(): return json.load(open(ENDPOINTS_FILE)) if os.path.exists(ENDPOINTS_FILE) else []
158
+ def save_endpoints(endpoints_list): json.dump(endpoints_list, open(ENDPOINTS_FILE, 'w'), indent=2)
159
 
160
+ theme = gr.themes.Soft(primary_hue="sky", secondary_hue="blue", neutral_hue="slate")
161
+ with gr.Blocks(theme=theme, title="KeyLock Showcase") as demo:
162
+ all_servers_state = gr.State(load_endpoints)
163
+ active_server_state = gr.State({})
 
 
164
 
165
+ gr.Markdown("# 🔑 KeyLock Showcase")
166
+ gr.Markdown("A comprehensive toolkit for generating and testing KeyLock authentication images against live servers.")
167
+
168
+ with gr.Tabs():
169
+ with gr.TabItem("Client Operations"):
170
+ gr.Markdown("### 1. Connect to a Target Server")
 
 
171
  with gr.Row():
172
+ saved_servers_dropdown = gr.Dropdown(label="Load Saved Server", interactive=True)
173
+ with gr.Column():
174
+ server_url_input = gr.Textbox(label="Or Add New Server by URL", placeholder="https://your-server.hf.space")
175
+ connect_button = gr.Button("Connect New Server", variant="primary")
176
+ client_status_display = gr.Markdown("**Status:** Not Connected")
 
 
177
 
178
+ with gr.Accordion("2. Create an Encrypted Image for the Connected Server", open=False) as client_generate_accordion:
 
 
 
179
  with gr.Row():
180
+ with gr.Column(scale=2):
181
+ client_payload_input = gr.Textbox(label="Secret Data (key:value or JSON)", lines=5)
182
+ client_overlay_radio = gr.Radio(label="Show Labels on Image", choices=["Keys and Values", "Keys Only", "None"], value="Keys and Values")
183
+ client_base_image_input = gr.Image(label="Optional Base Image", type="pil", height=200)
184
+ client_generate_button = gr.Button("Create Image", variant="secondary")
185
+ with gr.Column(scale=3):
186
+ client_generated_image_preview = gr.Image(label="Generated Image Preview", interactive=False)
187
+ client_generated_file_output = gr.File(label="Download Uncorrupted PNG", interactive=False, file_count="single")
188
 
189
+ with gr.Accordion("3. Test an Existing Image with the Connected Server", open=False) as client_test_accordion:
190
+ client_test_image_input = gr.Image(type="filepath", label="Upload Encrypted Image")
191
+ client_auth_status_display = gr.Markdown(visible=False)
192
+ client_auth_result_output = gr.JSON(label="Server Authentication Response")
193
+ client_test_image_input.change(lambda: (gr.update(visible=False), None), outputs=[client_auth_status_display, client_auth_result_output])
194
+
195
+ with gr.TabItem("Server Showcase & Admin"):
196
+ gr.Markdown("## Embedded Server Details")
197
+ gr.Markdown("This Gradio app is also running its own KeyLock server. You can use its details to test the client.")
198
+ gr.Textbox(label="Embedded Server Status", value=KEYLOCK_STATUS_MESSAGE, interactive=False, lines=3)
199
+ gr.Code(label="Embedded Server Public Key", value=PUBLIC_KEY_PEM_STRING, language="python")
200
+ gr.JSON(label="Embedded Server Required Payload", value={k['key_name']: k['example'] for k in api_get_info()["required_payload_keys"]})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
201
 
202
+ with gr.Accordion("Generate Image with Embedded Server", open=False):
203
+ server_payload_input = gr.JSON(label="Payload to Encrypt", value={k['key_name']: k['example'] for k in api_get_info()["required_payload_keys"]})
204
+ server_generate_button = gr.Button("Generate Image", variant="secondary")
205
+ server_generated_file_output = gr.File(label="Download Uncorrupted PNG", interactive=False, file_count="single")
206
+
207
+ with gr.Accordion("Admin: Generate New Key Pair", open=False):
208
+ gen_keys_button = gr.Button("⚙️ Generate New 2048-bit Key Pair")
209
+ with gr.Row():
210
+ output_private_key = gr.Textbox(lines=8, label="Generated Private Key", interactive=False, show_copy_button=True)
211
+ output_public_key = gr.Textbox(lines=8, label="Generated Public Key", interactive=False, show_copy_button=True)
212
+
213
+ def initialize_ui(all_servers):
214
+ return gr.update(choices=[s['name'] for s in all_servers] if all_servers else [])
215
 
216
+ def process_server_connection(server_data, all_servers):
217
+ placeholder = "\n".join([f"{k['key_name']}: {k['example']}" for k in server_data['info'].get('required_payload_keys', [])])
218
+ status_md = f"**Status:** ✅ Connected to **{server_data['name']}**"
219
+ return {active_server_state: server_data, client_status_display: status_md, client_payload_input: gr.update(placeholder=placeholder), all_servers_state: all_servers}
220
+
221
+ def load_server_from_dropdown(server_name, all_servers):
222
+ server_from_list = next((s for s in all_servers if s['name'] == server_name), None)
223
+ if server_from_list:
224
+ active_server_data = {'name': server_from_list['name'], 'url': server_from_list['link'], 'pubkey': server_from_list['public_key'], 'info': server_from_list.get('info', {})}
225
+ return process_server_connection(active_server_data, all_servers)
226
+ return {}
227
+
228
+ def add_new_server_from_url(url, all_servers):
229
  if not url: raise gr.Error("Please provide a server URL.")
230
+ url = url.strip().rstrip('/')
231
+
232
+ existing_server = next((s for s in all_servers if s['link'] == url), None)
233
+ if existing_server:
234
+ gr.Info(f"Server already exists. Loading '{existing_server['name']}'.")
235
+ updates = load_server_from_dropdown(existing_server['name'], all_servers)
236
+ updates[saved_servers_dropdown] = gr.update(value=existing_server['name'])
237
+ return updates
238
+
239
  try:
 
240
  client = Client(url, verbose=False)
241
  info = client.predict(api_name="/keylock-info")
242
  pubkey = client.predict(api_name="/keylock-pub")
243
+ server_for_list = {'name': info.get('name', url), 'link': url, 'public_key': pubkey, 'info': info}
244
+ all_servers.append(server_for_list)
245
+ save_endpoints(all_servers)
246
+ gr.Info(f"Successfully added and saved '{server_for_list['name']}'!")
247
+ server_for_state = {'name': info.get('name', url), 'url': url, 'pubkey': pubkey, 'info': info}
248
+ updates = process_server_connection(server_for_state, all_servers)
249
+ updates[saved_servers_dropdown] = gr.update(choices=[s['name'] for s in all_servers], value=server_for_state['name'])
250
+ return updates
 
 
 
 
 
 
 
 
 
 
 
251
  except Exception as e:
252
  gr.Error(f"Connection Failed: {e}")
253
+ return {}
 
 
 
 
254
 
255
+ def client_generate_image_wrapper(active_server, payload_str, overlay, base_img, progress=gr.Progress(track_tqdm=True)):
256
+ if not active_server: raise gr.Error("Not connected to a server.")
257
+ payload_dict = _parse_secret_data(payload_str)
258
+ if not payload_dict: raise gr.Error("Invalid payload format. Please provide key:value pairs or a valid JSON object.")
259
+ base_image = prepare_base_image(base_img, progress)
260
+ img = create_encrypted_image(payload_dict, active_server['pubkey'], base_image, overlay, active_server['url'])
261
+ with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
262
+ img.save(f.name, "PNG", compress_level=1)
263
+ return f.name, f.name
264
+
265
+ def client_authenticate_wrapper(active_server, image_path):
266
+ if not active_server: raise gr.Error("Not connected to a server.")
267
+ if not image_path: raise gr.Error("Please upload an image.")
 
268
  try:
269
+ with open(image_path, "rb") as f: b64_img = base64.b64encode(f.read()).decode('utf-8')
270
+ client = Client(active_server['url'])
 
 
271
  response = client.predict(b64_img, api_name="/keylock-auth")
272
+ status_md = "### ✅ Authentication Successful" if response.get("status") == "Success" else "### ❌ Authentication Failed" if response.get("status") == "Failed" else f"### ⚠️ Server Error: {response.get('message')}"
 
 
 
 
 
 
 
273
  return gr.update(value=status_md, visible=True), response
274
  except Exception as e:
275
  gr.Error(f"Authentication request failed: {e}")
276
  return gr.update(value=f"### ⚠️ Request Error: {e}", visible=True), None
277
 
278
+ def server_generate_image_wrapper(payload):
279
+ img = create_encrypted_image(payload, PUBLIC_KEY_PEM_STRING, prepare_base_image(None, gr.Progress(track_tqdm=True)), "Keys and Values", "Embedded Server")
280
+ with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
281
+ img.save(f.name, "PNG", compress_level=1)
282
+ return f.name
283
+
284
+ def generate_pem_keys():
285
+ pk = rsa.generate_private_key(public_exponent=65537, key_size=2048)
286
+ priv = pk.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode()
287
+ pub = pk.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode()
288
+ return priv, pub
289
+
290
+ demo.load(initialize_ui, inputs=all_servers_state, outputs=saved_servers_dropdown)
291
+ saved_servers_dropdown.change(load_server_from_dropdown, inputs=[saved_servers_dropdown, all_servers_state], outputs=[active_server_state, client_status_display, client_payload_input, all_servers_state])
292
+ connect_button.click(add_new_server_from_url, inputs=[server_url_input, all_servers_state], outputs=[active_server_state, client_status_display, client_payload_input, all_servers_state, saved_servers_dropdown])
293
+ client_generate_button.click(client_generate_image_wrapper, inputs=[active_server_state, client_payload_input, client_overlay_radio, client_base_image_input], outputs=[client_generated_image_preview, client_generated_file_output])
294
+ client_test_image_input.upload(client_authenticate_wrapper, inputs=[active_server_state, client_test_image_input], outputs=[client_auth_status_display, client_auth_result_output])
295
+ server_generate_button.click(server_generate_image_wrapper, inputs=[server_payload_input], outputs=[server_generated_file_output])
296
+ gen_keys_button.click(generate_pem_keys, outputs=[output_private_key, output_public_key])
297
+
298
  with gr.Row(visible=False):
299
  gr.Interface(fn=api_get_info, inputs=None, outputs=gr.JSON(), api_name="keylock-info")
300
  gr.Interface(fn=api_get_public_key, inputs=None, outputs=gr.Textbox(), api_name="keylock-pub")
301
  gr.Interface(fn=api_decode_and_auth, inputs=gr.Textbox(), outputs=gr.JSON(), api_name="keylock-auth")
 
302
 
303
  if __name__ == "__main__":
304
+ demo.launch()