Greg-House commited on
Commit
d34c396
·
verified ·
1 Parent(s): 5027087

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +281 -98
app.py CHANGED
@@ -2,28 +2,56 @@ import asyncio
2
  import aiohttp
3
  import time
4
  from datetime import datetime
5
- from collections import deque
6
  import json
 
 
7
  from aiohttp import web
8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  class EphemeralStreamReader:
10
- def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_hours=1, show_output=False):
11
  self.url = f"{piping_server_url}/{path}"
12
  self.path = path
13
  self.reconnect_delay = 1
14
  self.show_output = show_output
15
- self.retention_hours = retention_hours
16
  self.stored_data = deque()
17
  self.last_cleanup = time.time()
 
18
 
19
  def cleanup_old_data(self):
20
  current_time = time.time()
21
- retention_seconds = self.retention_hours * 3600
22
- while self.stored_data and (current_time - self.stored_data[0]['timestamp']) > retention_seconds:
23
  self.stored_data.popleft()
24
 
25
  def get_stored_data(self):
26
  self.cleanup_old_data()
 
27
  return list(self.stored_data)
28
 
29
  def store_chunk(self, data, timestamp):
@@ -57,14 +85,8 @@ class EphemeralStreamReader:
57
  try:
58
  text = chunk.decode('utf-8')
59
  self.store_chunk(text, current_time)
60
-
61
- if self.show_output:
62
- timestamp = datetime.fromtimestamp(current_time).strftime('%Y-%m-%d %H:%M:%S')
63
- print(f"[{timestamp}] Received: {text}", end='', flush=True)
64
  except UnicodeDecodeError:
65
  self.store_chunk(str(chunk), current_time)
66
- if self.show_output:
67
- print(f"Received raw bytes: {chunk}")
68
 
69
  except aiohttp.ClientError as e:
70
  if self.show_output:
@@ -73,105 +95,266 @@ class EphemeralStreamReader:
73
  if self.show_output:
74
  print(f"Error: {e}")
75
 
76
- if self.show_output:
77
- print(f"Connection closed. Reconnecting in {self.reconnect_delay} seconds...")
78
  await asyncio.sleep(self.reconnect_delay)
79
 
80
- async def start_http_server(self, host='0.0.0.0', port=7860):
81
- async def handle_get(request):
82
- data = self.get_stored_data()
83
- html = f"""
84
- <!DOCTYPE html>
85
- <html>
86
- <head>
87
- <title>Ephemeral Stream Data</title>
88
- <style>
89
- body {{
90
- font-family: Arial, sans-serif;
91
- margin: 20px;
92
- }}
93
- .data-container {{
94
- margin: 20px 0;
95
- }}
96
- .timestamp {{
97
- color: #666;
98
- font-size: 0.9em;
99
- }}
100
- .data-item {{
101
- margin: 10px 0;
102
- padding: 10px;
103
- background: #f5f5f5;
104
- border-radius: 4px;
105
- }}
106
- .refresh-btn {{
107
- padding: 10px 20px;
108
- background: #4CAF50;
109
- color: white;
110
- border: none;
111
- border-radius: 4px;
112
- cursor: pointer;
113
- }}
114
- .info {{
115
- margin-bottom: 20px;
116
- color: #666;
117
- }}
118
- </style>
119
- <script>
120
- function refreshData() {{
121
- window.location.reload();
122
- }}
123
-
124
- // Auto refresh every 5 seconds
125
- setInterval(refreshData, 5000);
126
- </script>
127
- </head>
128
- <body>
129
- <h1>Ephemeral Stream Data</h1>
130
- <div class="info">
131
- <p>Stream Path: {self.path}</p>
132
- <p>Retention Time: {self.retention_hours} hours</p>
133
- <p>To write to this stream: <code>seq inf | curl -T- {self.url}</code></p>
134
- </div>
135
- <button class="refresh-btn" onclick="refreshData()">Refresh Data</button>
136
- <div class="data-container">
137
- """
138
 
139
- for item in reversed(data):
140
- html += f"""
141
- <div class="data-item">
142
- <div class="timestamp">{item['formatted_time']}</div>
143
- <pre>{item['data']}</pre>
144
- </div>
145
- """
146
 
147
- html += """
148
- </div>
149
- </body>
150
- </html>
151
- """
152
- return web.Response(text=html, content_type='text/html')
153
 
154
- app = web.Application()
155
- app.router.add_get('/', handle_get)
 
156
 
157
- runner = web.AppRunner(app)
158
- await runner.setup()
159
- site = web.TCPSite(runner, host, port)
160
- await site.start()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
161
 
162
- print(f"\nWeb interface available at http://{host}:{port}")
163
- print(f"To write to this stream: seq inf | curl -T- {self.url}")
 
 
 
 
164
 
165
  async def main():
166
- reader = EphemeralStreamReader(retention_hours=1, show_output=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
167
 
168
- await asyncio.gather(
169
- reader.start_reading(),
170
- reader.start_http_server(host='0.0.0.0', port=7860)
171
- )
172
 
173
  if __name__ == "__main__":
174
- print("Starting Ephemeral Stream Reader...")
175
  try:
176
  asyncio.run(main())
177
  except KeyboardInterrupt:
 
2
  import aiohttp
3
  import time
4
  from datetime import datetime
5
+ from collections import deque, defaultdict
6
  import json
7
+ import secrets
8
+ import re
9
  from aiohttp import web
10
 
11
+ class StreamManager:
12
+ def __init__(self):
13
+ self.streams = {} # Dictionary to store all stream instances
14
+ self.tokens = {} # Dictionary to store stream tokens
15
+
16
+ def create_stream(self, retention_seconds):
17
+ # Generate unique stream ID and token
18
+ stream_id = secrets.token_urlsafe(8)
19
+ access_token = secrets.token_urlsafe(16)
20
+
21
+ # Create new stream instance
22
+ self.streams[stream_id] = EphemeralStreamReader(
23
+ path=stream_id,
24
+ retention_seconds=retention_seconds,
25
+ show_output=True
26
+ )
27
+ self.tokens[stream_id] = access_token
28
+
29
+ return stream_id, access_token
30
+
31
+ def get_stream(self, stream_id, token):
32
+ if stream_id in self.streams and self.tokens[stream_id] == token:
33
+ return self.streams[stream_id]
34
+ return None
35
+
36
  class EphemeralStreamReader:
37
+ def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_seconds=3600, show_output=False):
38
  self.url = f"{piping_server_url}/{path}"
39
  self.path = path
40
  self.reconnect_delay = 1
41
  self.show_output = show_output
42
+ self.retention_seconds = min(3600, max(0, retention_seconds)) # Limit between 0 and 3600 seconds
43
  self.stored_data = deque()
44
  self.last_cleanup = time.time()
45
+ self.last_access = time.time()
46
 
47
  def cleanup_old_data(self):
48
  current_time = time.time()
49
+ while self.stored_data and (current_time - self.stored_data[0]['timestamp']) > self.retention_seconds:
 
50
  self.stored_data.popleft()
51
 
52
  def get_stored_data(self):
53
  self.cleanup_old_data()
54
+ self.last_access = time.time()
55
  return list(self.stored_data)
56
 
57
  def store_chunk(self, data, timestamp):
 
85
  try:
86
  text = chunk.decode('utf-8')
87
  self.store_chunk(text, current_time)
 
 
 
 
88
  except UnicodeDecodeError:
89
  self.store_chunk(str(chunk), current_time)
 
 
90
 
91
  except aiohttp.ClientError as e:
92
  if self.show_output:
 
95
  if self.show_output:
96
  print(f"Error: {e}")
97
 
 
 
98
  await asyncio.sleep(self.reconnect_delay)
99
 
100
+ class WebServer:
101
+ def __init__(self, stream_manager):
102
+ self.stream_manager = stream_manager
103
+
104
+ async def handle_create_stream(self, request):
105
+ try:
106
+ data = await request.post()
107
+ retention_minutes = float(data.get('retention_minutes', '60'))
108
+ retention_seconds = min(3600, max(0, retention_minutes * 60))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
109
 
110
+ stream_id, token = self.stream_manager.create_stream(retention_seconds)
 
 
 
 
 
 
111
 
112
+ return web.Response(text=self.create_stream_page(stream_id, token), content_type='text/html')
113
+
114
+ except Exception as e:
115
+ return web.Response(text=f"Error creating stream: {str(e)}", status=400)
 
 
116
 
117
+ async def handle_view_stream(self, request):
118
+ stream_id = request.match_info.get('stream_id')
119
+ token = request.query.get('token')
120
 
121
+ stream = self.stream_manager.get_stream(stream_id, token)
122
+ if not stream:
123
+ return web.Response(text="Invalid stream ID or token", status=403)
124
+
125
+ return web.Response(text=self.create_viewer_page(stream, token), content_type='text/html')
126
+
127
+ async def handle_home(self, request):
128
+ return web.Response(text=self.create_home_page(), content_type='text/html')
129
+
130
+ def create_home_page(self):
131
+ return """
132
+ <!DOCTYPE html>
133
+ <html>
134
+ <head>
135
+ <title>Create Ephemeral Stream</title>
136
+ <style>
137
+ body {
138
+ font-family: Arial, sans-serif;
139
+ margin: 20px;
140
+ max-width: 800px;
141
+ margin: 0 auto;
142
+ padding: 20px;
143
+ }
144
+ .form-container {
145
+ margin: 20px 0;
146
+ padding: 20px;
147
+ border: 1px solid #ddd;
148
+ border-radius: 4px;
149
+ }
150
+ .input-group {
151
+ margin: 10px 0;
152
+ }
153
+ input, button {
154
+ padding: 8px;
155
+ margin: 5px 0;
156
+ }
157
+ button {
158
+ background: #4CAF50;
159
+ color: white;
160
+ border: none;
161
+ border-radius: 4px;
162
+ cursor: pointer;
163
+ }
164
+ .info {
165
+ margin: 20px 0;
166
+ padding: 15px;
167
+ background: #f5f5f5;
168
+ border-radius: 4px;
169
+ }
170
+ </style>
171
+ </head>
172
+ <body>
173
+ <h1>Create New Ephemeral Stream</h1>
174
+ <div class="form-container">
175
+ <form method="POST" action="/create">
176
+ <div class="input-group">
177
+ <label for="retention_minutes">Retention Time (minutes, max 60):</label><br>
178
+ <input type="number" id="retention_minutes" name="retention_minutes"
179
+ min="0" max="60" value="60" step="0.5" required>
180
+ </div>
181
+ <button type="submit">Create Stream</button>
182
+ </form>
183
+ </div>
184
+ <div class="info">
185
+ <h3>About Ephemeral Streams</h3>
186
+ <p>Create your private stream with custom retention time. Each stream:</p>
187
+ <ul>
188
+ <li>Has a unique ID and access token</li>
189
+ <li>Can only be viewed with the correct token</li>
190
+ <li>Automatically deletes data older than the specified retention time</li>
191
+ <li>Supports retention times from 0 to 60 minutes</li>
192
+ </ul>
193
+ </div>
194
+ </body>
195
+ </html>
196
+ """
197
+
198
+ def create_stream_page(self, stream_id, token):
199
+ view_url = f"/view/{stream_id}?token={token}"
200
+ write_url = f"https://ppng.io/{stream_id}"
201
+
202
+ return f"""
203
+ <!DOCTYPE html>
204
+ <html>
205
+ <head>
206
+ <title>Stream Created</title>
207
+ <style>
208
+ body {{
209
+ font-family: Arial, sans-serif;
210
+ margin: 20px;
211
+ max-width: 800px;
212
+ margin: 0 auto;
213
+ padding: 20px;
214
+ }}
215
+ .info-box {{
216
+ background: #f5f5f5;
217
+ padding: 20px;
218
+ border-radius: 4px;
219
+ margin: 20px 0;
220
+ }}
221
+ .code-box {{
222
+ background: #2b2b2b;
223
+ color: #ffffff;
224
+ padding: 15px;
225
+ border-radius: 4px;
226
+ font-family: monospace;
227
+ }}
228
+ .button {{
229
+ display: inline-block;
230
+ padding: 10px 20px;
231
+ background: #4CAF50;
232
+ color: white;
233
+ text-decoration: none;
234
+ border-radius: 4px;
235
+ margin: 10px 0;
236
+ }}
237
+ </style>
238
+ </head>
239
+ <body>
240
+ <h1>Stream Created Successfully</h1>
241
+ <div class="info-box">
242
+ <h3>Your Stream Information</h3>
243
+ <p><strong>Stream ID:</strong> {stream_id}</p>
244
+ <p><strong>Access Token:</strong> {token}</p>
245
+ <p><strong>View URL:</strong> <a href="{view_url}">{view_url}</a></p>
246
+
247
+ <h3>How to Write to Your Stream</h3>
248
+ <p>Use curl to write to your stream:</p>
249
+ <div class="code-box">
250
+ seq inf | curl -T- {write_url}
251
+ </div>
252
+
253
+ <h3>Important Notes</h3>
254
+ <ul>
255
+ <li>Keep your access token secret</li>
256
+ <li>Bookmark the view URL for easy access</li>
257
+ <li>Data older than your specified retention time will be automatically deleted</li>
258
+ </ul>
259
+ </div>
260
+ <a href="{view_url}" class="button">View Your Stream</a>
261
+ </body>
262
+ </html>
263
+ """
264
+
265
+ def create_viewer_page(self, stream, token):
266
+ return f"""
267
+ <!DOCTYPE html>
268
+ <html>
269
+ <head>
270
+ <title>Stream Viewer</title>
271
+ <style>
272
+ body {{
273
+ font-family: Arial, sans-serif;
274
+ margin: 20px;
275
+ }}
276
+ .data-container {{
277
+ margin: 20px 0;
278
+ }}
279
+ .timestamp {{
280
+ color: #666;
281
+ font-size: 0.9em;
282
+ }}
283
+ .data-item {{
284
+ margin: 10px 0;
285
+ padding: 10px;
286
+ background: #f5f5f5;
287
+ border-radius: 4px;
288
+ }}
289
+ .refresh-btn {{
290
+ padding: 10px 20px;
291
+ background: #4CAF50;
292
+ color: white;
293
+ border: none;
294
+ border-radius: 4px;
295
+ cursor: pointer;
296
+ }}
297
+ .info {{
298
+ margin-bottom: 20px;
299
+ color: #666;
300
+ }}
301
+ </style>
302
+ <script>
303
+ function refreshData() {{
304
+ window.location.reload();
305
+ }}
306
+
307
+ // Auto refresh every 5 seconds
308
+ setInterval(refreshData, 5000);
309
+ </script>
310
+ </head>
311
+ <body>
312
+ <h1>Stream Viewer</h1>
313
+ <div class="info">
314
+ <p>Stream ID: {stream.path}</p>
315
+ <p>Retention Time: {stream.retention_seconds / 60:.1f} minutes</p>
316
+ <p>To write to this stream: <code>seq inf | curl -T- {stream.url}</code></p>
317
+ </div>
318
+ <button class="refresh-btn" onclick="refreshData()">Refresh Data</button>
319
+ <div class="data-container">
320
+ """
321
+ data = stream.get_stored_data()
322
+ for item in reversed(data):
323
+ html += f"""
324
+ <div class="data-item">
325
+ <div class="timestamp">{item['formatted_time']}</div>
326
+ <pre>{item['data']}</pre>
327
+ </div>
328
+ """
329
 
330
+ html += """
331
+ </div>
332
+ </body>
333
+ </html>
334
+ """
335
+ return html
336
 
337
  async def main():
338
+ stream_manager = StreamManager()
339
+ web_server = WebServer(stream_manager)
340
+
341
+ app = web.Application()
342
+ app.router.add_get('/', web_server.handle_home)
343
+ app.router.add_post('/create', web_server.handle_create_stream)
344
+ app.router.add_get('/view/{stream_id}', web_server.handle_view_stream)
345
+
346
+ runner = web.AppRunner(app)
347
+ await runner.setup()
348
+ site = web.TCPSite(runner, '0.0.0.0', 7860)
349
+ await site.start()
350
+
351
+ print("\nWeb interface available at http://0.0.0.0:7860")
352
 
353
+ while True:
354
+ await asyncio.sleep(3600) # Keep the server running
 
 
355
 
356
  if __name__ == "__main__":
357
+ print("Starting Ephemeral Stream Server...")
358
  try:
359
  asyncio.run(main())
360
  except KeyboardInterrupt: