broadfield-dev commited on
Commit
3f5a983
Β·
verified Β·
1 Parent(s): a2295c2

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +110 -29
app.py CHANGED
@@ -13,10 +13,12 @@ from cryptography.hazmat.primitives.asymmetric import rsa, padding
13
  from cryptography.hazmat.primitives.ciphers.aead import AESGCM
14
  from cryptography.hazmat.primitives import hashes
15
 
 
16
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
17
  logger = logging.getLogger(__name__)
18
 
19
- CREATOR_ENDPOINTS_JSON_URL = "https://huggingface.co/spaces/broadfield-dev/KeyLock-Auth-Creator/raw/main/endpoints.json"
 
20
  BASE_HF_URL = "https://huggingface.co/spaces/"
21
  CREATOR_SPACE_ID = "broadfield-dev/KeyLock-Auth-Creator"
22
  SERVER_SPACE_ID = "broadfield-dev/KeyLock-Auth-Server"
@@ -27,102 +29,180 @@ SERVER_APP_PY_URL = f"{SERVER_URL}/raw/main/app.py"
27
 
28
 
29
  def generate_rsa_keys():
 
30
  private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
31
- private_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode('utf-8')
32
- public_pem = private_key.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode('utf-8')
 
 
 
 
 
 
 
33
  return private_pem, public_pem
34
 
35
  def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
36
- if not secret_data_str.strip(): raise ValueError("Secret data cannot be empty.")
37
- if not public_key_pem.strip(): raise ValueError("Public Key cannot be empty.")
 
 
 
 
38
  data_dict = {}
39
  for line in secret_data_str.splitlines():
40
  line = line.strip()
41
- if not line or line.startswith('#'): continue
 
42
  parts = line.split(':', 1) if ':' in line else line.split('=', 1)
43
- if len(parts) == 2: data_dict[parts[0].strip()] = parts[1].strip().strip("'\"")
44
- if not data_dict: raise ValueError("No valid key-value pairs found.")
 
 
 
 
45
  json_bytes = json.dumps(data_dict).encode('utf-8')
46
  public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
 
 
47
  aes_key, nonce = os.urandom(32), os.urandom(12)
48
  ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
49
- rsa_encrypted_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
 
 
 
 
 
 
 
50
  encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext
 
 
51
  img = Image.new('RGB', (800, 600), color=(45, 52, 54))
52
  draw = ImageDraw.Draw(img)
53
- try: font = ImageFont.truetype("DejaVuSans.ttf", 40)
54
- except IOError: font = ImageFont.load_default(size=30)
 
 
55
  draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")
 
 
56
  pixel_data = np.array(img.convert("RGB")).ravel()
57
  binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(encrypted_payload)) + encrypted_payload)
58
- if len(binary_payload) > pixel_data.size: raise ValueError("Data too large for image.")
 
 
 
59
  pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8)
60
  stego_pixels = pixel_data.reshape((600, 800, 3))
 
61
  return Image.fromarray(stego_pixels, 'RGB')
62
 
63
  def get_server_list():
64
- status = f"Fetching server list from remote config..."
 
65
  yield gr.Dropdown(choices=[], value=None, label="⏳ Fetching..."), status, []
66
  try:
67
  response = requests.get(CREATOR_ENDPOINTS_JSON_URL, timeout=10)
68
  response.raise_for_status()
69
  all_entries = response.json()
70
  valid_endpoints = []
71
- required_keys = ["name", "api_endpoint", "public_key"]
72
  for entry in all_entries:
73
- if all(key in entry for key in required_keys):
74
- valid_endpoints.append(entry)
75
- else:
76
- logger.warning(f"Skipping invalid entry in configuration file: {entry}")
 
 
 
 
 
 
 
 
 
 
 
 
77
  if not valid_endpoints:
78
  raise ValueError("No valid server configurations found in the remote file.")
 
79
  endpoint_names = [e['name'] for e in valid_endpoints]
80
  status = f"βœ… Success! Found {len(endpoint_names)} valid servers."
81
  yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0] if endpoint_names else None, label="Target Server"), status, valid_endpoints
82
  except Exception as e:
83
  status = f"❌ Error fetching or parsing configuration: {e}"
 
84
  yield gr.Dropdown(choices=[], value=None, label="Error fetching servers"), status, []
85
 
86
  def create_keylock_wrapper(service_name: str, secret_data: str, available_endpoints: list):
87
- if not service_name: raise gr.Error("Please select a target server.")
 
 
88
  public_key = next((e['public_key'] for e in available_endpoints if e['name'] == service_name), None)
89
- if not public_key: raise gr.Error(f"Could not find public key for '{service_name}'.")
 
90
  try:
91
  created_image = create_encrypted_image(secret_data, public_key)
92
  return created_image, f"βœ… Success! Image created for '{service_name}'."
93
  except Exception as e:
 
94
  return None, f"❌ Error: {e}"
95
 
96
  def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list):
97
- if not service_name: raise gr.Error("Please select a target server.")
98
- if image is None: raise gr.Error("Please upload an image to send.")
 
 
 
 
99
  api_endpoint = next((e.get('api_endpoint') for e in available_endpoints if e['name'] == service_name), None)
100
- if not api_endpoint: raise gr.Error(f"Configuration Error: Could not find 'api_endpoint' for '{service_name}'.")
 
 
101
  status = f"Connecting to endpoint: {api_endpoint}"
102
  yield None, status
 
103
  try:
104
  with io.BytesIO() as buffer:
105
  image.save(buffer, format="PNG")
106
  b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")
 
 
107
  payload = {"data": [b64_string]}
108
  headers = {"Content-Type": "application/json"}
 
109
  response = requests.post(api_endpoint, headers=headers, json=payload, timeout=45)
 
 
110
  response_json = response.json()
111
- if response.status_code == 200:
112
- if "data" in response_json:
113
- yield response_json["data"][0], "βœ… Success! Data decrypted by remote server."
114
- else:
115
- raise gr.Error(f"API returned an unexpected success format: {response_json}")
 
 
 
 
 
 
116
  else:
117
- raise gr.Error(f"API Error (Status {response.status_code}): {response_json.get('error', 'Unknown error')}")
 
118
  except Exception as e:
 
119
  yield None, f"❌ Error calling server API: {e}"
120
 
121
  def refresh_and_update_all():
 
 
122
  for dropdown_update, status_update, state_update in get_server_list():
123
  pass
124
  return dropdown_update, dropdown_update, status_update, state_update
125
 
 
126
  theme = gr.themes.Base(
127
  primary_hue=gr.themes.colors.blue, secondary_hue=gr.themes.colors.sky, neutral_hue=gr.themes.colors.slate,
128
  font=(gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"),
@@ -186,6 +266,7 @@ with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
186
  output_private_key = gr.Textbox(lines=10, label="Generated Private Key", interactive=False, show_copy_button=True)
187
  gen_keys_button = gr.Button("βš™οΈ Generate New 2048-bit Key Pair", variant="secondary")
188
 
 
189
  gen_keys_button.click(fn=generate_rsa_keys, inputs=None, outputs=[output_private_key, output_public_key])
190
  refresh_button.click(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
191
  demo.load(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
 
13
  from cryptography.hazmat.primitives.ciphers.aead import AESGCM
14
  from cryptography.hazmat.primitives import hashes
15
 
16
+ # --- Basic Configuration ---
17
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
18
  logger = logging.getLogger(__name__)
19
 
20
+ # --- Constants ---
21
+ CREATOR_ENDPOINTS_JSON_URL = "./endpoints.json"
22
  BASE_HF_URL = "https://huggingface.co/spaces/"
23
  CREATOR_SPACE_ID = "broadfield-dev/KeyLock-Auth-Creator"
24
  SERVER_SPACE_ID = "broadfield-dev/KeyLock-Auth-Server"
 
29
 
30
 
31
  def generate_rsa_keys():
32
+ """Generates a new 2048-bit RSA private and public key pair."""
33
  private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
34
+ private_pem = private_key.private_bytes(
35
+ encoding=serialization.Encoding.PEM,
36
+ format=serialization.PrivateFormat.PKCS8,
37
+ encryption_algorithm=serialization.NoEncryption()
38
+ ).decode('utf-8')
39
+ public_pem = private_key.public_key().public_bytes(
40
+ encoding=serialization.Encoding.PEM,
41
+ format=serialization.PublicFormat.SubjectPublicKeyInfo
42
+ ).decode('utf-8')
43
  return private_pem, public_pem
44
 
45
  def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
46
+ """Encrypts data and embeds it into a new image using steganography."""
47
+ if not secret_data_str.strip():
48
+ raise ValueError("Secret data cannot be empty.")
49
+ if not public_key_pem.strip():
50
+ raise ValueError("Public Key cannot be empty.")
51
+
52
  data_dict = {}
53
  for line in secret_data_str.splitlines():
54
  line = line.strip()
55
+ if not line or line.startswith('#'):
56
+ continue
57
  parts = line.split(':', 1) if ':' in line else line.split('=', 1)
58
+ if len(parts) == 2:
59
+ data_dict[parts[0].strip()] = parts[1].strip().strip("'\"")
60
+
61
+ if not data_dict:
62
+ raise ValueError("No valid key-value pairs found in secret data.")
63
+
64
  json_bytes = json.dumps(data_dict).encode('utf-8')
65
  public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
66
+
67
+ # AES-GCM encryption
68
  aes_key, nonce = os.urandom(32), os.urandom(12)
69
  ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
70
+
71
+ # RSA-OAEP encryption for the AES key
72
+ rsa_encrypted_key = public_key.encrypt(
73
+ aes_key,
74
+ padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
75
+ )
76
+
77
+ # Pack the payload: [len(rsa_key)][rsa_key][nonce][ciphertext]
78
  encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext
79
+
80
+ # Create a base image
81
  img = Image.new('RGB', (800, 600), color=(45, 52, 54))
82
  draw = ImageDraw.Draw(img)
83
+ try:
84
+ font = ImageFont.truetype("DejaVuSans.ttf", 40)
85
+ except IOError:
86
+ font = ImageFont.load_default(size=30)
87
  draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")
88
+
89
+ # LSB Steganography
90
  pixel_data = np.array(img.convert("RGB")).ravel()
91
  binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(encrypted_payload)) + encrypted_payload)
92
+
93
+ if len(binary_payload) > pixel_data.size:
94
+ raise ValueError(f"Data is too large for the image. Max size: {pixel_data.size // 8} bytes.")
95
+
96
  pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8)
97
  stego_pixels = pixel_data.reshape((600, 800, 3))
98
+
99
  return Image.fromarray(stego_pixels, 'RGB')
100
 
101
  def get_server_list():
102
+ """Fetches and validates the server list from the remote JSON configuration."""
103
+ status = "Fetching server list from remote config..."
104
  yield gr.Dropdown(choices=[], value=None, label="⏳ Fetching..."), status, []
105
  try:
106
  response = requests.get(CREATOR_ENDPOINTS_JSON_URL, timeout=10)
107
  response.raise_for_status()
108
  all_entries = response.json()
109
  valid_endpoints = []
110
+
111
  for entry in all_entries:
112
+ if not isinstance(entry, dict) or "name" not in entry or "public_key" not in entry:
113
+ logger.warning(f"Skipping invalid entry (missing name or public_key): {entry}")
114
+ continue
115
+
116
+ # **FIX: Auto-generate api_endpoint from link if it's missing**
117
+ if "api_endpoint" not in entry:
118
+ if "link" in entry:
119
+ base_url = entry["link"].strip("/")
120
+ entry["api_endpoint"] = f"{base_url}/api/predict/"
121
+ logger.info(f"Auto-generating API endpoint for '{entry['name']}': {entry['api_endpoint']}")
122
+ else:
123
+ logger.warning(f"Skipping invalid entry '{entry['name']}' (missing 'api_endpoint' and 'link'): {entry}")
124
+ continue
125
+
126
+ valid_endpoints.append(entry)
127
+
128
  if not valid_endpoints:
129
  raise ValueError("No valid server configurations found in the remote file.")
130
+
131
  endpoint_names = [e['name'] for e in valid_endpoints]
132
  status = f"βœ… Success! Found {len(endpoint_names)} valid servers."
133
  yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0] if endpoint_names else None, label="Target Server"), status, valid_endpoints
134
  except Exception as e:
135
  status = f"❌ Error fetching or parsing configuration: {e}"
136
+ logger.error(status)
137
  yield gr.Dropdown(choices=[], value=None, label="Error fetching servers"), status, []
138
 
139
  def create_keylock_wrapper(service_name: str, secret_data: str, available_endpoints: list):
140
+ """Wrapper function to handle UI for creating the encrypted image."""
141
+ if not service_name:
142
+ raise gr.Error("Please select a target server.")
143
  public_key = next((e['public_key'] for e in available_endpoints if e['name'] == service_name), None)
144
+ if not public_key:
145
+ raise gr.Error(f"Could not find public key for '{service_name}'. Please refresh the server list.")
146
  try:
147
  created_image = create_encrypted_image(secret_data, public_key)
148
  return created_image, f"βœ… Success! Image created for '{service_name}'."
149
  except Exception as e:
150
+ logger.error(f"Error creating image: {e}")
151
  return None, f"❌ Error: {e}"
152
 
153
  def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list):
154
+ """Wrapper function to handle UI for sending the image to the remote server."""
155
+ if not service_name:
156
+ raise gr.Error("Please select a target server.")
157
+ if image is None:
158
+ raise gr.Error("Please create or upload an image to send.")
159
+
160
  api_endpoint = next((e.get('api_endpoint') for e in available_endpoints if e['name'] == service_name), None)
161
+ if not api_endpoint:
162
+ raise gr.Error(f"Configuration Error: Could not find 'api_endpoint' for '{service_name}'.")
163
+
164
  status = f"Connecting to endpoint: {api_endpoint}"
165
  yield None, status
166
+
167
  try:
168
  with io.BytesIO() as buffer:
169
  image.save(buffer, format="PNG")
170
  b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")
171
+
172
+ # Gradio API expects data in this format
173
  payload = {"data": [b64_string]}
174
  headers = {"Content-Type": "application/json"}
175
+
176
  response = requests.post(api_endpoint, headers=headers, json=payload, timeout=45)
177
+ response.raise_for_status() # **IMPROVEMENT: Raise exception for HTTP errors**
178
+
179
  response_json = response.json()
180
+ if "data" in response_json:
181
+ decrypted_data = response_json["data"][0]
182
+ # If the server returns a stringified JSON, parse it again
183
+ if isinstance(decrypted_data, str):
184
+ try:
185
+ decrypted_data = json.loads(decrypted_data)
186
+ except json.JSONDecodeError:
187
+ pass # Keep as string if not valid JSON
188
+ yield decrypted_data, "βœ… Success! Data decrypted by remote server."
189
+ elif "error" in response_json:
190
+ raise gr.Error(f"API Error: {response_json['error']}")
191
  else:
192
+ raise gr.Error(f"API returned an unexpected response format: {response_json}")
193
+
194
  except Exception as e:
195
+ logger.error(f"Error calling server API: {e}")
196
  yield None, f"❌ Error calling server API: {e}"
197
 
198
  def refresh_and_update_all():
199
+ """Calls get_server_list and updates all relevant UI components."""
200
+ # The generator yields updates, we need to iterate to get the final one
201
  for dropdown_update, status_update, state_update in get_server_list():
202
  pass
203
  return dropdown_update, dropdown_update, status_update, state_update
204
 
205
+ # --- Gradio UI Definition ---
206
  theme = gr.themes.Base(
207
  primary_hue=gr.themes.colors.blue, secondary_hue=gr.themes.colors.sky, neutral_hue=gr.themes.colors.slate,
208
  font=(gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"),
 
266
  output_private_key = gr.Textbox(lines=10, label="Generated Private Key", interactive=False, show_copy_button=True)
267
  gen_keys_button = gr.Button("βš™οΈ Generate New 2048-bit Key Pair", variant="secondary")
268
 
269
+ # --- Event Handlers ---
270
  gen_keys_button.click(fn=generate_rsa_keys, inputs=None, outputs=[output_private_key, output_public_key])
271
  refresh_button.click(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
272
  demo.load(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])