Simple-piping / app.py
Greg-House's picture
Update app.py
1d78341 verified
raw
history blame
4.17 kB
import asyncio
import aiohttp
from datetime import datetime
from collections import deque
import time
import gradio as gr
class EphemeralStreamReader:
def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_hours=1, show_output=False):
self.url = f"{piping_server_url}/{path}"
self.path = path
self.reconnect_delay = 1
self.show_output = show_output
self.retention_hours = retention_hours
self.stored_data = deque()
self.last_cleanup = time.time()
def cleanup_old_data(self):
current_time = time.time()
retention_seconds = self.retention_hours * 3600
while self.stored_data and (current_time - self.stored_data[0]['timestamp']) > retention_seconds:
self.stored_data.popleft()
def get_stored_data(self):
self.cleanup_old_data()
return list(self.stored_data)
def store_chunk(self, data, timestamp):
self.stored_data.append({
'timestamp': timestamp,
'data': data,
'formatted_time': datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
})
if time.time() - self.last_cleanup > 60:
self.cleanup_old_data()
self.last_cleanup = time.time()
async def start_reading(self):
while True:
try:
async with aiohttp.ClientSession() as session:
if self.show_output:
print(f"Connecting to {self.url}...")
async with session.get(self.url) as response:
if response.status == 200:
if self.show_output:
print("Connected! Reading stream...")
while True:
chunk = await response.content.read(1024)
if not chunk:
break
current_time = time.time()
try:
text = chunk.decode('utf-8')
self.store_chunk(text, current_time)
except UnicodeDecodeError:
self.store_chunk(str(chunk), current_time)
except aiohttp.ClientError as e:
if self.show_output:
print(f"Connection error: {e}")
except Exception as e:
if self.show_output:
print(f"Error: {e}")
await asyncio.sleep(self.reconnect_delay)
def format_stored_data(reader):
data = reader.get_stored_data()
formatted_text = ""
for item in reversed(data):
formatted_text += f"[{item['formatted_time']}]\n{item['data']}\n\n"
return formatted_text
def create_ui(reader):
with gr.Blocks() as demo:
gr.Markdown(f"# Ephemeral Stream Reader\nStream Path: {reader.path}\nRetention Time: {reader.retention_hours} hours")
gr.Markdown(f"To write to this stream: `seq inf | curl -T- {reader.url}`")
output_text = gr.TextArea(label="Stream Output", interactive=False)
refresh_btn = gr.Button("Refresh")
def update_output():
return format_stored_data(reader)
refresh_btn.click(fn=update_output, outputs=[output_text])
demo.load(fn=update_output, outputs=[output_text])
# Auto-refresh every 5 seconds
demo.load(fn=update_output, outputs=[output_text], every=5)
return demo
async def start_application():
reader = EphemeralStreamReader(retention_hours=1, show_output=True)
# Start the stream reader in the background
asyncio.create_task(reader.start_reading())
# Create and launch the Gradio interface
demo = create_ui(reader)
return demo
if __name__ == "__main__":
demo = asyncio.run(start_application())
demo.launch(server_name="0.0.0.0", server_port=7860)