Greg-House commited on
Commit
3532d53
·
verified ·
1 Parent(s): d34c396

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +57 -12
app.py CHANGED
@@ -5,34 +5,58 @@ from datetime import datetime
5
  from collections import deque, defaultdict
6
  import json
7
  import secrets
8
- import re
9
  from aiohttp import web
10
 
11
  class StreamManager:
12
  def __init__(self):
13
  self.streams = {} # Dictionary to store all stream instances
14
  self.tokens = {} # Dictionary to store stream tokens
 
15
 
16
- def create_stream(self, retention_seconds):
17
  # Generate unique stream ID and token
18
  stream_id = secrets.token_urlsafe(8)
19
  access_token = secrets.token_urlsafe(16)
20
 
21
  # Create new stream instance
22
- self.streams[stream_id] = EphemeralStreamReader(
23
  path=stream_id,
24
  retention_seconds=retention_seconds,
25
  show_output=True
26
  )
 
 
27
  self.tokens[stream_id] = access_token
28
 
 
 
 
29
  return stream_id, access_token
30
 
31
  def get_stream(self, stream_id, token):
32
- if stream_id in self.streams and self.tokens[stream_id] == token:
33
  return self.streams[stream_id]
34
  return None
35
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36
  class EphemeralStreamReader:
37
  def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_seconds=3600, show_output=False):
38
  self.url = f"{piping_server_url}/{path}"
@@ -94,7 +118,8 @@ class EphemeralStreamReader:
94
  except Exception as e:
95
  if self.show_output:
96
  print(f"Error: {e}")
97
-
 
98
  await asyncio.sleep(self.reconnect_delay)
99
 
100
  class WebServer:
@@ -107,9 +132,10 @@ class WebServer:
107
  retention_minutes = float(data.get('retention_minutes', '60'))
108
  retention_seconds = min(3600, max(0, retention_minutes * 60))
109
 
110
- stream_id, token = self.stream_manager.create_stream(retention_seconds)
111
 
112
- return web.Response(text=self.create_stream_page(stream_id, token), content_type='text/html')
 
113
 
114
  except Exception as e:
115
  return web.Response(text=f"Error creating stream: {str(e)}", status=400)
@@ -122,7 +148,8 @@ class WebServer:
122
  if not stream:
123
  return web.Response(text="Invalid stream ID or token", status=403)
124
 
125
- return web.Response(text=self.create_viewer_page(stream, token), content_type='text/html')
 
126
 
127
  async def handle_home(self, request):
128
  return web.Response(text=self.create_home_page(), content_type='text/html')
@@ -224,6 +251,8 @@ class WebServer:
224
  padding: 15px;
225
  border-radius: 4px;
226
  font-family: monospace;
 
 
227
  }}
228
  .button {{
229
  display: inline-block;
@@ -263,7 +292,7 @@ class WebServer:
263
  """
264
 
265
  def create_viewer_page(self, stream, token):
266
- return f"""
267
  <!DOCTYPE html>
268
  <html>
269
  <head>
@@ -298,6 +327,16 @@ class WebServer:
298
  margin-bottom: 20px;
299
  color: #666;
300
  }}
 
 
 
 
 
 
 
 
 
 
301
  </style>
302
  <script>
303
  function refreshData() {{
@@ -313,11 +352,12 @@ class WebServer:
313
  <div class="info">
314
  <p>Stream ID: {stream.path}</p>
315
  <p>Retention Time: {stream.retention_seconds / 60:.1f} minutes</p>
316
- <p>To write to this stream: <code>seq inf | curl -T- {stream.url}</code></p>
317
  </div>
318
  <button class="refresh-btn" onclick="refreshData()">Refresh Data</button>
319
  <div class="data-container">
320
  """
 
321
  data = stream.get_stored_data()
322
  for item in reversed(data):
323
  html += f"""
@@ -350,8 +390,13 @@ async def main():
350
 
351
  print("\nWeb interface available at http://0.0.0.0:7860")
352
 
353
- while True:
354
- await asyncio.sleep(3600) # Keep the server running
 
 
 
 
 
355
 
356
  if __name__ == "__main__":
357
  print("Starting Ephemeral Stream Server...")
 
5
  from collections import deque, defaultdict
6
  import json
7
  import secrets
 
8
  from aiohttp import web
9
 
10
  class StreamManager:
11
  def __init__(self):
12
  self.streams = {} # Dictionary to store all stream instances
13
  self.tokens = {} # Dictionary to store stream tokens
14
+ self.tasks = {} # Dictionary to store stream tasks
15
 
16
+ async def create_stream(self, retention_seconds):
17
  # Generate unique stream ID and token
18
  stream_id = secrets.token_urlsafe(8)
19
  access_token = secrets.token_urlsafe(16)
20
 
21
  # Create new stream instance
22
+ stream = EphemeralStreamReader(
23
  path=stream_id,
24
  retention_seconds=retention_seconds,
25
  show_output=True
26
  )
27
+
28
+ self.streams[stream_id] = stream
29
  self.tokens[stream_id] = access_token
30
 
31
+ # Start the stream reader task
32
+ self.tasks[stream_id] = asyncio.create_task(stream.start_reading())
33
+
34
  return stream_id, access_token
35
 
36
  def get_stream(self, stream_id, token):
37
+ if stream_id in self.streams and self.tokens.get(stream_id) == token:
38
  return self.streams[stream_id]
39
  return None
40
 
41
+ async def cleanup_inactive_streams(self):
42
+ while True:
43
+ current_time = time.time()
44
+ inactive_streams = []
45
+
46
+ for stream_id, stream in self.streams.items():
47
+ # Clean up streams that haven't been accessed in 1 hour
48
+ if current_time - stream.last_access > 3600:
49
+ inactive_streams.append(stream_id)
50
+
51
+ for stream_id in inactive_streams:
52
+ if stream_id in self.tasks:
53
+ self.tasks[stream_id].cancel()
54
+ del self.tasks[stream_id]
55
+ del self.streams[stream_id]
56
+ del self.tokens[stream_id]
57
+
58
+ await asyncio.sleep(300) # Check every 5 minutes
59
+
60
  class EphemeralStreamReader:
61
  def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_seconds=3600, show_output=False):
62
  self.url = f"{piping_server_url}/{path}"
 
118
  except Exception as e:
119
  if self.show_output:
120
  print(f"Error: {e}")
121
+
122
+ # Always wait a bit before reconnecting
123
  await asyncio.sleep(self.reconnect_delay)
124
 
125
  class WebServer:
 
132
  retention_minutes = float(data.get('retention_minutes', '60'))
133
  retention_seconds = min(3600, max(0, retention_minutes * 60))
134
 
135
+ stream_id, token = await self.stream_manager.create_stream(retention_seconds)
136
 
137
+ html = self.create_stream_page(stream_id, token)
138
+ return web.Response(text=html, content_type='text/html')
139
 
140
  except Exception as e:
141
  return web.Response(text=f"Error creating stream: {str(e)}", status=400)
 
148
  if not stream:
149
  return web.Response(text="Invalid stream ID or token", status=403)
150
 
151
+ html = self.create_viewer_page(stream, token)
152
+ return web.Response(text=html, content_type='text/html')
153
 
154
  async def handle_home(self, request):
155
  return web.Response(text=self.create_home_page(), content_type='text/html')
 
251
  padding: 15px;
252
  border-radius: 4px;
253
  font-family: monospace;
254
+ white-space: pre-wrap;
255
+ word-break: break-all;
256
  }}
257
  .button {{
258
  display: inline-block;
 
292
  """
293
 
294
  def create_viewer_page(self, stream, token):
295
+ html = f"""
296
  <!DOCTYPE html>
297
  <html>
298
  <head>
 
327
  margin-bottom: 20px;
328
  color: #666;
329
  }}
330
+ .code-box {{
331
+ background: #2b2b2b;
332
+ color: #ffffff;
333
+ padding: 15px;
334
+ border-radius: 4px;
335
+ font-family: monospace;
336
+ margin: 10px 0;
337
+ white-space: pre-wrap;
338
+ word-break: break-all;
339
+ }}
340
  </style>
341
  <script>
342
  function refreshData() {{
 
352
  <div class="info">
353
  <p>Stream ID: {stream.path}</p>
354
  <p>Retention Time: {stream.retention_seconds / 60:.1f} minutes</p>
355
+ <div class="code-box">To write to this stream: seq inf | curl -T- {stream.url}</div>
356
  </div>
357
  <button class="refresh-btn" onclick="refreshData()">Refresh Data</button>
358
  <div class="data-container">
359
  """
360
+
361
  data = stream.get_stored_data()
362
  for item in reversed(data):
363
  html += f"""
 
390
 
391
  print("\nWeb interface available at http://0.0.0.0:7860")
392
 
393
+ # Start the cleanup task
394
+ cleanup_task = asyncio.create_task(stream_manager.cleanup_inactive_streams())
395
+
396
+ try:
397
+ await asyncio.Event().wait() # Keep the server running indefinitely
398
+ finally:
399
+ cleanup_task.cancel()
400
 
401
  if __name__ == "__main__":
402
  print("Starting Ephemeral Stream Server...")