Spaces:
Running
Running
import asyncio | |
import aiohttp | |
from datetime import datetime | |
from collections import deque | |
import time | |
import gradio as gr | |
class EphemeralStreamReader:
    """Read an HTTP stream and keep its recent chunks in memory.

    The reader repeatedly issues ``GET {piping_server_url}/{path}``, stores
    every received chunk together with its arrival time, and drops chunks
    older than ``retention_hours``.

    Args:
        piping_server_url: Base URL of the server to read from.
        path: Path appended to the base URL; also shown in the UI.
        retention_hours: How long a stored chunk is kept, in hours.
        show_output: When true, connection progress and errors are printed.
    """

    def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_hours=1, show_output=False):
        # Function-scope import: the file's import header does not list
        # threading, and this keeps the class self-contained.
        import threading

        self.url = f"{piping_server_url}/{path}"
        self.path = path
        self.reconnect_delay = 1  # seconds to wait between connection attempts
        self.show_output = show_output
        self.retention_hours = retention_hours
        self.stored_data = deque()  # oldest entry on the left
        self.last_cleanup = time.time()
        # Re-entrant because store_chunk() calls cleanup_old_data() while
        # already holding the lock.
        self._lock = threading.RLock()

    def cleanup_old_data(self):
        """Drop stored chunks whose age exceeds the retention window."""
        current_time = time.time()
        retention_seconds = self.retention_hours * 3600
        # The check-then-pop sequence must not interleave with another
        # thread's cleanup, or an entry that is still fresh could be popped.
        with self._lock:
            while self.stored_data and (current_time - self.stored_data[0]['timestamp']) > retention_seconds:
                self.stored_data.popleft()

    def get_stored_data(self):
        """Return the retained chunks, oldest first, as a new list.

        Each element is a dict with the keys ``timestamp``, ``data`` and
        ``formatted_time``.
        """
        with self._lock:
            self.cleanup_old_data()
            return list(self.stored_data)

    def store_chunk(self, data, timestamp):
        """Append one chunk and run a cleanup at most once per minute.

        Args:
            data: Text of the chunk.
            timestamp: Arrival time in seconds since the epoch.
        """
        with self._lock:
            self.stored_data.append({
                'timestamp': timestamp,
                'data': data,
                'formatted_time': datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
            })
            if time.time() - self.last_cleanup > 60:
                self.cleanup_old_data()
                self.last_cleanup = time.time()

    @staticmethod
    def _decode_chunk(decoder, chunk):
        """Decode ``chunk`` with an incremental decoder.

        A multi-byte character cut off at the end of ``chunk`` is held back
        by the decoder and completed by the next call. Bytes that are not
        valid UTF-8 fall back to ``str(chunk)`` and reset the decoder.
        """
        try:
            return decoder.decode(chunk)
        except UnicodeDecodeError:
            decoder.reset()
            return str(chunk)

    async def _read_stream(self, content, decoder):
        """Store every chunk read from ``content`` until the stream ends."""
        while True:
            chunk = await content.read(1024)
            if not chunk:
                break
            current_time = time.time()
            text = self._decode_chunk(decoder, chunk)
            # A chunk holding only the start of a character decodes to "".
            if text:
                self.store_chunk(text, current_time)
        # Bytes still pending at end of stream are an incomplete character.
        pending = decoder.getstate()[0]
        if pending:
            self.store_chunk(str(pending), time.time())

    async def start_reading(self):
        """Read the stream forever, reconnecting after every disconnect.

        Connection errors and unexpected exceptions are reported (when
        ``show_output`` is set) and followed by a reconnect after
        ``reconnect_delay`` seconds. This coroutine never returns.
        """
        # Function-scope import: codecs is not in the file's import header.
        import codecs

        # aiohttp's default total timeout would abort a long-lived stream,
        # so the total limit is disabled for this session.
        timeout = aiohttp.ClientTimeout(total=None)
        while True:
            try:
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    if self.show_output:
                        print(f"Connecting to {self.url}...")
                    async with session.get(self.url) as response:
                        if response.status == 200:
                            if self.show_output:
                                print("Connected! Reading stream...")
                            # Fresh decoder per connection so leftover bytes
                            # never leak from one stream into the next.
                            decoder = codecs.getincrementaldecoder('utf-8')()
                            await self._read_stream(response.content, decoder)
                        elif self.show_output:
                            print(f"Unexpected HTTP status: {response.status}")
            except aiohttp.ClientError as e:
                if self.show_output:
                    print(f"Connection error: {e}")
            except Exception as e:
                # Deliberately broad: the reader must keep retrying.
                if self.show_output:
                    print(f"Error: {e}")
            await asyncio.sleep(self.reconnect_delay)
def format_stored_data(reader):
    """Render the reader's stored chunks as text, newest first.

    Args:
        reader: Object whose ``get_stored_data()`` returns a sequence of
            dicts with ``formatted_time`` and ``data`` keys.

    Returns:
        One ``[time]\\ndata\\n\\n`` section per chunk, concatenated; an empty
        string when nothing is stored.
    """
    # join() builds the result in one pass instead of re-copying the
    # accumulated string on every iteration.
    return "".join(
        f"[{item['formatted_time']}]\n{item['data']}\n\n"
        for item in reversed(reader.get_stored_data())
    )
def create_ui(reader):
    """Build the Gradio page that displays the reader's stored chunks.

    The page shows the stream path, the retention time and a command for
    writing to the stream, followed by a read-only text area. The text area
    is filled on page load, on a click of the "Refresh" button, and by a
    load event registered with ``every=5``.

    Args:
        reader: Stream reader providing ``path``, ``retention_hours``,
            ``url`` and the data consumed by ``format_stored_data``.

    Returns:
        The assembled ``gr.Blocks`` instance (not yet launched).
    """
    heading = f"# Ephemeral Stream Reader\nStream Path: {reader.path}\nRetention Time: {reader.retention_hours} hours"
    write_hint = f"To write to this stream: `seq inf | curl -T- {reader.url}`"

    def update_output():
        return format_stored_data(reader)

    with gr.Blocks() as demo:
        gr.Markdown(heading)
        gr.Markdown(write_hint)
        stream_box = gr.TextArea(label="Stream Output", interactive=False)
        refresh_button = gr.Button("Refresh")

        # Manual refresh, then the initial fill when the page loads.
        refresh_button.click(fn=update_output, outputs=[stream_box])
        demo.load(fn=update_output, outputs=[stream_box])
        # Auto-refresh every 5 seconds
        demo.load(fn=update_output, outputs=[stream_box], every=5)

    return demo
async def start_application():
    """Start the background stream reader and build the Gradio UI.

    Returns:
        The Gradio interface created by ``create_ui``; the caller is
        responsible for launching it.
    """
    # Function-scope import: threading is not in the file's import header.
    import threading

    reader = EphemeralStreamReader(retention_hours=1, show_output=True)

    # Start the stream reader in the background. A task created on the
    # current loop would be cancelled as soon as the asyncio.run() driving
    # this coroutine returns, before it ever ran. The reader therefore gets
    # its own event loop on a daemon thread, which outlives this coroutine
    # and does not block interpreter exit.
    threading.Thread(
        target=lambda: asyncio.run(reader.start_reading()),
        name="stream-reader",
        daemon=True,
    ).start()

    # Create the Gradio interface; launching is left to the caller.
    return create_ui(reader)
if __name__ == "__main__":
    # Listen on all interfaces on port 7860.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    # Build the application first, then serve it.
    demo = asyncio.run(start_application())
    demo.launch(**launch_options)