Greg-House commited on
Commit
5027087
·
verified ·
1 Parent(s): e882b34
Files changed (1) hide show
  1. app.py +102 -33
app.py CHANGED
@@ -1,9 +1,10 @@
1
  import asyncio
2
  import aiohttp
 
3
  from datetime import datetime
4
  from collections import deque
5
- import time
6
- import gradio as gr
7
 
8
  class EphemeralStreamReader:
9
  def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_hours=1, show_output=False):
@@ -56,8 +57,14 @@ class EphemeralStreamReader:
56
  try:
57
  text = chunk.decode('utf-8')
58
  self.store_chunk(text, current_time)
 
 
 
 
59
  except UnicodeDecodeError:
60
  self.store_chunk(str(chunk), current_time)
 
 
61
 
62
  except aiohttp.ClientError as e:
63
  if self.show_output:
@@ -66,44 +73,106 @@ class EphemeralStreamReader:
66
  if self.show_output:
67
  print(f"Error: {e}")
68
 
 
 
69
  await asyncio.sleep(self.reconnect_delay)
70
 
71
- def format_stored_data(reader):
72
- data = reader.get_stored_data()
73
- formatted_text = ""
74
- for item in reversed(data):
75
- formatted_text += f"[{item['formatted_time']}]\n{item['data']}\n\n"
76
- return formatted_text
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
 
78
- def create_ui(reader):
79
- with gr.Blocks() as demo:
80
- gr.Markdown(f"# Ephemeral Stream Reader\nStream Path: {reader.path}\nRetention Time: {reader.retention_hours} hours")
81
- gr.Markdown(f"To write to this stream: `seq inf | curl -T- {reader.url}`")
82
-
83
- output_text = gr.TextArea(label="Stream Output", interactive=False)
84
- refresh_btn = gr.Button("Refresh")
85
 
86
- def update_output():
87
- return format_stored_data(reader)
 
 
88
 
89
- refresh_btn.click(fn=update_output, outputs=[output_text])
90
- demo.load(fn=update_output, outputs=[output_text])
91
-
92
- # Auto-refresh every 5 seconds
93
- demo.load(fn=update_output, outputs=[output_text], every=5)
94
-
95
- return demo
96
 
97
- async def start_application():
98
  reader = EphemeralStreamReader(retention_hours=1, show_output=True)
99
 
100
- # Start the stream reader in the background
101
- asyncio.create_task(reader.start_reading())
102
-
103
- # Create and launch the Gradio interface
104
- demo = create_ui(reader)
105
- return demo
106
 
107
  if __name__ == "__main__":
108
- demo = asyncio.run(start_application())
109
- demo.launch(server_name="0.0.0.0", server_port=7860)
 
 
 
 
1
  import asyncio
2
  import aiohttp
3
+ import time
4
  from datetime import datetime
5
  from collections import deque
6
+ import json
7
+ from aiohttp import web
8
 
9
  class EphemeralStreamReader:
10
  def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_hours=1, show_output=False):
 
57
  try:
58
  text = chunk.decode('utf-8')
59
  self.store_chunk(text, current_time)
60
+
61
+ if self.show_output:
62
+ timestamp = datetime.fromtimestamp(current_time).strftime('%Y-%m-%d %H:%M:%S')
63
+ print(f"[{timestamp}] Received: {text}", end='', flush=True)
64
  except UnicodeDecodeError:
65
  self.store_chunk(str(chunk), current_time)
66
+ if self.show_output:
67
+ print(f"Received raw bytes: {chunk}")
68
 
69
  except aiohttp.ClientError as e:
70
  if self.show_output:
 
73
  if self.show_output:
74
  print(f"Error: {e}")
75
 
76
+ if self.show_output:
77
+ print(f"Connection closed. Reconnecting in {self.reconnect_delay} seconds...")
78
  await asyncio.sleep(self.reconnect_delay)
79
 
80
+ async def start_http_server(self, host='0.0.0.0', port=7860):
81
+ async def handle_get(request):
82
+ data = self.get_stored_data()
83
+ html = f"""
84
+ <!DOCTYPE html>
85
+ <html>
86
+ <head>
87
+ <title>Ephemeral Stream Data</title>
88
+ <style>
89
+ body {{
90
+ font-family: Arial, sans-serif;
91
+ margin: 20px;
92
+ }}
93
+ .data-container {{
94
+ margin: 20px 0;
95
+ }}
96
+ .timestamp {{
97
+ color: #666;
98
+ font-size: 0.9em;
99
+ }}
100
+ .data-item {{
101
+ margin: 10px 0;
102
+ padding: 10px;
103
+ background: #f5f5f5;
104
+ border-radius: 4px;
105
+ }}
106
+ .refresh-btn {{
107
+ padding: 10px 20px;
108
+ background: #4CAF50;
109
+ color: white;
110
+ border: none;
111
+ border-radius: 4px;
112
+ cursor: pointer;
113
+ }}
114
+ .info {{
115
+ margin-bottom: 20px;
116
+ color: #666;
117
+ }}
118
+ </style>
119
+ <script>
120
+ function refreshData() {{
121
+ window.location.reload();
122
+ }}
123
+
124
+ // Auto refresh every 5 seconds
125
+ setInterval(refreshData, 5000);
126
+ </script>
127
+ </head>
128
+ <body>
129
+ <h1>Ephemeral Stream Data</h1>
130
+ <div class="info">
131
+ <p>Stream Path: {self.path}</p>
132
+ <p>Retention Time: {self.retention_hours} hours</p>
133
+ <p>To write to this stream: <code>seq inf | curl -T- {self.url}</code></p>
134
+ </div>
135
+ <button class="refresh-btn" onclick="refreshData()">Refresh Data</button>
136
+ <div class="data-container">
137
+ """
138
+
139
+ for item in reversed(data):
140
+ html += f"""
141
+ <div class="data-item">
142
+ <div class="timestamp">{item['formatted_time']}</div>
143
+ <pre>{item['data']}</pre>
144
+ </div>
145
+ """
146
+
147
+ html += """
148
+ </div>
149
+ </body>
150
+ </html>
151
+ """
152
+ return web.Response(text=html, content_type='text/html')
153
 
154
+ app = web.Application()
155
+ app.router.add_get('/', handle_get)
 
 
 
 
 
156
 
157
+ runner = web.AppRunner(app)
158
+ await runner.setup()
159
+ site = web.TCPSite(runner, host, port)
160
+ await site.start()
161
 
162
+ print(f"\nWeb interface available at http://{host}:{port}")
163
+ print(f"To write to this stream: seq inf | curl -T- {self.url}")
 
 
 
 
 
164
 
165
+ async def main():
166
  reader = EphemeralStreamReader(retention_hours=1, show_output=True)
167
 
168
+ await asyncio.gather(
169
+ reader.start_reading(),
170
+ reader.start_http_server(host='0.0.0.0', port=7860)
171
+ )
 
 
172
 
173
  if __name__ == "__main__":
174
+ print("Starting Ephemeral Stream Reader...")
175
+ try:
176
+ asyncio.run(main())
177
+ except KeyboardInterrupt:
178
+ print("\nStopping server...")