Simple-piping / app.py
Greg-House's picture
Update app.py
3532d53 verified
import asyncio
import codecs
import html
import json
import secrets
import time
from collections import deque, defaultdict
from datetime import datetime

import aiohttp
from aiohttp import web
class StreamManager:
    """Registry of live streams, their access tokens and their reader tasks.

    The three dictionaries are all keyed by stream ID and are kept in step:
    ``create_stream`` adds an entry to each, and the cleanup helpers remove
    the entry from each.
    """

    def __init__(self):
        self.streams = {}  # Dictionary to store all stream instances
        self.tokens = {}  # Dictionary to store stream tokens
        self.tasks = {}  # Dictionary to store stream tasks

    async def create_stream(self, retention_seconds):
        """Create a stream, start its reader task and register both.

        Args:
            retention_seconds: How long stored chunks are kept; passed
                through to ``EphemeralStreamReader``.

        Returns:
            A ``(stream_id, access_token)`` tuple of URL-safe random strings.
        """
        # Generate unique stream ID and token
        stream_id = secrets.token_urlsafe(8)
        access_token = secrets.token_urlsafe(16)
        # Create new stream instance
        stream = EphemeralStreamReader(
            path=stream_id,
            retention_seconds=retention_seconds,
            show_output=True,
        )
        self.streams[stream_id] = stream
        self.tokens[stream_id] = access_token
        # Start the stream reader task
        self.tasks[stream_id] = asyncio.create_task(stream.start_reading())
        return stream_id, access_token

    def get_stream(self, stream_id, token):
        """Return the stream for ``stream_id`` if ``token`` matches, else ``None``.

        A missing stream, a missing token (``None``) and a wrong token all
        yield ``None``.
        """
        expected = self.tokens.get(stream_id)
        if expected is None or not isinstance(token, str):
            return None
        # Compare as bytes in constant time: a plain ``==`` can leak how many
        # leading characters matched, and compare_digest() rejects non-ASCII
        # ``str`` arguments. Unencodable characters become "?", which never
        # occurs in a token_urlsafe() value, so they cannot cause a match.
        supplied = token.encode("utf-8", "replace")
        if not secrets.compare_digest(expected.encode("utf-8"), supplied):
            return None
        return self.streams.get(stream_id)

    def _remove_stream(self, stream_id):
        """Cancel the stream's reader task (if any) and drop all its entries."""
        task = self.tasks.pop(stream_id, None)
        if task is not None:
            task.cancel()
        # pop() rather than del: a missing entry must not raise, because an
        # exception here would end the background cleanup loop for good.
        self.streams.pop(stream_id, None)
        self.tokens.pop(stream_id, None)

    def _expire_inactive(self, now, max_idle_seconds):
        """Remove streams idle for more than ``max_idle_seconds``; return their IDs."""
        expired = [
            stream_id
            for stream_id, stream in self.streams.items()
            if now - stream.last_access > max_idle_seconds
        ]
        for stream_id in expired:
            self._remove_stream(stream_id)
        return expired

    async def cleanup_inactive_streams(self, max_idle_seconds=3600, check_interval=300):
        """Periodically drop streams that have not been accessed recently.

        Runs until cancelled.

        Args:
            max_idle_seconds: Idle time after which a stream is removed
                (default: 1 hour).
            check_interval: Seconds to sleep between sweeps
                (default: 5 minutes).
        """
        while True:
            self._expire_inactive(time.time(), max_idle_seconds)
            await asyncio.sleep(check_interval)
class EphemeralStreamReader:
    """Read one piping-server path and keep its recent chunks in memory.

    Chunks are stored in arrival order in ``stored_data`` (a deque of dicts
    with ``timestamp``, ``data`` and ``formatted_time`` keys) and discarded
    once they are older than ``retention_seconds``.
    """

    def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_seconds=3600, show_output=False):
        """Set up the reader; no network activity happens until ``start_reading``.

        Args:
            piping_server_url: Base URL of the piping server.
            path: Path on that server to read from.
            retention_seconds: Requested retention, clamped to 0..3600.
            show_output: When true, connection progress and errors are printed.
        """
        self.url = f"{piping_server_url}/{path}"
        self.path = path
        self.reconnect_delay = 1
        self.show_output = show_output
        self.retention_seconds = min(3600, max(0, retention_seconds))  # Limit between 0 and 3600 seconds
        self.stored_data = deque()
        self.last_cleanup = time.time()
        self.last_access = time.time()

    def cleanup_old_data(self):
        """Drop chunks from the front of the deque that exceed the retention time."""
        current_time = time.time()
        while self.stored_data and (current_time - self.stored_data[0]['timestamp']) > self.retention_seconds:
            self.stored_data.popleft()

    def get_stored_data(self):
        """Return the retained chunks (oldest first) and record the access time."""
        self.cleanup_old_data()
        self.last_access = time.time()
        return list(self.stored_data)

    def store_chunk(self, data, timestamp):
        """Append one chunk; prune expired chunks at most once a minute."""
        self.stored_data.append({
            'timestamp': timestamp,
            'data': data,
            'formatted_time': datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
        })
        if time.time() - self.last_cleanup > 60:
            self.cleanup_old_data()
            self.last_cleanup = time.time()

    @staticmethod
    def _new_decoder():
        """Return a fresh incremental UTF-8 decoder for one connection.

        Invalid bytes are rendered as ``\\xNN`` escapes instead of raising,
        so binary payloads stay visible without failing the read loop.
        """
        return codecs.getincrementaldecoder('utf-8')(errors='backslashreplace')

    def _store_bytes(self, decoder, chunk, final=False):
        """Decode ``chunk`` with ``decoder`` and store any completed text.

        A multi-byte character split across two reads is held back by the
        decoder and emitted with the next chunk, so nothing is stored when a
        chunk only contains the start of a character.
        """
        text = decoder.decode(chunk, final)
        if text:
            self.store_chunk(text, time.time())

    async def start_reading(self):
        """Connect to the stream URL and store incoming data until cancelled.

        After the connection ends or fails, waits ``reconnect_delay`` seconds
        and connects again.
        """
        # The stream is open-ended: aiohttp's default total timeout (5 minutes)
        # would abort a healthy long-running transfer, so disable it.
        timeout = aiohttp.ClientTimeout(total=None)
        while True:
            try:
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    if self.show_output:
                        print(f"Connecting to {self.url}...")
                    async with session.get(self.url) as response:
                        if response.status == 200:
                            if self.show_output:
                                print("Connected! Reading stream...")
                            # One decoder per connection: pending bytes from a
                            # previous connection must not leak into this one.
                            decoder = self._new_decoder()
                            while True:
                                chunk = await response.content.read(1024)
                                if not chunk:
                                    break
                                self._store_bytes(decoder, chunk)
                            # Flush a truncated trailing character, if any.
                            self._store_bytes(decoder, b'', final=True)
            except aiohttp.ClientError as e:
                if self.show_output:
                    print(f"Connection error: {e}")
            except Exception as e:
                # Best effort: any other failure is reported and retried.
                if self.show_output:
                    print(f"Error: {e}")
            # Always wait a bit before reconnecting
            await asyncio.sleep(self.reconnect_delay)
class WebServer:
    """aiohttp request handlers and HTML page builders for the stream UI."""

    def __init__(self, stream_manager):
        """Keep a reference to the manager used to create and look up streams."""
        self.stream_manager = stream_manager

    async def handle_create_stream(self, request):
        """POST handler: create a stream from the form and show its details.

        Reads ``retention_minutes`` (default 60) from the form body, converts
        it to seconds clamped to 0..3600, and responds with the
        "stream created" page. Any failure is reported as a 400 response.
        """
        try:
            data = await request.post()
            retention_minutes = float(data.get('retention_minutes', '60'))
            retention_seconds = min(3600, max(0, retention_minutes * 60))
            stream_id, token = await self.stream_manager.create_stream(retention_seconds)
            page = self.create_stream_page(stream_id, token)
            return web.Response(text=page, content_type='text/html')
        except Exception as e:
            return web.Response(text=f"Error creating stream: {str(e)}", status=400)

    async def handle_view_stream(self, request):
        """GET handler: show a stream's stored data if the token is valid.

        Responds with 403 when the stream ID / token pair is not accepted by
        the stream manager.
        """
        stream_id = request.match_info.get('stream_id')
        token = request.query.get('token')
        stream = self.stream_manager.get_stream(stream_id, token)
        if not stream:
            return web.Response(text="Invalid stream ID or token", status=403)
        page = self.create_viewer_page(stream, token)
        return web.Response(text=page, content_type='text/html')

    async def handle_home(self, request):
        """GET handler: serve the stream-creation form."""
        return web.Response(text=self.create_home_page(), content_type='text/html')

    def create_home_page(self):
        """Return the static HTML for the stream-creation form."""
        return """
<!DOCTYPE html>
<html>
<head>
<title>Create Ephemeral Stream</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
.form-container {
margin: 20px 0;
padding: 20px;
border: 1px solid #ddd;
border-radius: 4px;
}
.input-group {
margin: 10px 0;
}
input, button {
padding: 8px;
margin: 5px 0;
}
button {
background: #4CAF50;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.info {
margin: 20px 0;
padding: 15px;
background: #f5f5f5;
border-radius: 4px;
}
</style>
</head>
<body>
<h1>Create New Ephemeral Stream</h1>
<div class="form-container">
<form method="POST" action="/create">
<div class="input-group">
<label for="retention_minutes">Retention Time (minutes, max 60):</label><br>
<input type="number" id="retention_minutes" name="retention_minutes"
min="0" max="60" value="60" step="0.5" required>
</div>
<button type="submit">Create Stream</button>
</form>
</div>
<div class="info">
<h3>About Ephemeral Streams</h3>
<p>Create your private stream with custom retention time. Each stream:</p>
<ul>
<li>Has a unique ID and access token</li>
<li>Can only be viewed with the correct token</li>
<li>Automatically deletes data older than the specified retention time</li>
<li>Supports retention times from 0 to 60 minutes</li>
</ul>
</div>
</body>
</html>
"""

    def create_stream_page(self, stream_id, token):
        """Return the HTML shown after creation: ID, token, view and write URLs.

        Both arguments are interpolated as-is; the visible caller passes
        values generated by ``secrets.token_urlsafe``.
        """
        view_url = f"/view/{stream_id}?token={token}"
        write_url = f"https://ppng.io/{stream_id}"
        return f"""
<!DOCTYPE html>
<html>
<head>
<title>Stream Created</title>
<style>
body {{
font-family: Arial, sans-serif;
margin: 20px;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}}
.info-box {{
background: #f5f5f5;
padding: 20px;
border-radius: 4px;
margin: 20px 0;
}}
.code-box {{
background: #2b2b2b;
color: #ffffff;
padding: 15px;
border-radius: 4px;
font-family: monospace;
white-space: pre-wrap;
word-break: break-all;
}}
.button {{
display: inline-block;
padding: 10px 20px;
background: #4CAF50;
color: white;
text-decoration: none;
border-radius: 4px;
margin: 10px 0;
}}
</style>
</head>
<body>
<h1>Stream Created Successfully</h1>
<div class="info-box">
<h3>Your Stream Information</h3>
<p><strong>Stream ID:</strong> {stream_id}</p>
<p><strong>Access Token:</strong> {token}</p>
<p><strong>View URL:</strong> <a href="{view_url}">{view_url}</a></p>
<h3>How to Write to Your Stream</h3>
<p>Use curl to write to your stream:</p>
<div class="code-box">
seq inf | curl -T- {write_url}
</div>
<h3>Important Notes</h3>
<ul>
<li>Keep your access token secret</li>
<li>Bookmark the view URL for easy access</li>
<li>Data older than your specified retention time will be automatically deleted</li>
</ul>
</div>
<a href="{view_url}" class="button">View Your Stream</a>
</body>
</html>
"""

    def create_viewer_page(self, stream, token):
        """Return the viewer HTML listing the stream's chunks, newest first.

        Args:
            stream: Object providing ``path``, ``url``, ``retention_seconds``
                and ``get_stored_data()``.
            token: Accepted for interface compatibility; not used in the page.
        """
        header = f"""
<!DOCTYPE html>
<html>
<head>
<title>Stream Viewer</title>
<style>
body {{
font-family: Arial, sans-serif;
margin: 20px;
}}
.data-container {{
margin: 20px 0;
}}
.timestamp {{
color: #666;
font-size: 0.9em;
}}
.data-item {{
margin: 10px 0;
padding: 10px;
background: #f5f5f5;
border-radius: 4px;
}}
.refresh-btn {{
padding: 10px 20px;
background: #4CAF50;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}}
.info {{
margin-bottom: 20px;
color: #666;
}}
.code-box {{
background: #2b2b2b;
color: #ffffff;
padding: 15px;
border-radius: 4px;
font-family: monospace;
margin: 10px 0;
white-space: pre-wrap;
word-break: break-all;
}}
</style>
<script>
function refreshData() {{
window.location.reload();
}}
// Auto refresh every 5 seconds
setInterval(refreshData, 5000);
</script>
</head>
<body>
<h1>Stream Viewer</h1>
<div class="info">
<p>Stream ID: {stream.path}</p>
<p>Retention Time: {stream.retention_seconds / 60:.1f} minutes</p>
<div class="code-box">To write to this stream: seq inf | curl -T- {stream.url}</div>
</div>
<button class="refresh-btn" onclick="refreshData()">Refresh Data</button>
<div class="data-container">
"""
        # Stream content is whatever a remote writer sent, so it must be
        # HTML-escaped before being placed in the page; otherwise a writer
        # could inject markup or script into every viewer's browser.
        items = [
            f"""
<div class="data-item">
<div class="timestamp">{item['formatted_time']}</div>
<pre>{html.escape(str(item['data']))}</pre>
</div>
"""
            for item in reversed(stream.get_stored_data())
        ]
        footer = """
</div>
</body>
</html>
"""
        # join() instead of repeated += keeps page assembly linear in size.
        return "".join([header, *items, footer])
async def main():
    """Start the web server and the cleanup task, then run until cancelled.

    Serves the home, create and view routes on 0.0.0.0:7860. On the way out
    (including cancellation from Ctrl+C via ``asyncio.run``) the cleanup task
    is cancelled and the aiohttp runner is shut down so the listening socket
    is released.
    """
    stream_manager = StreamManager()
    web_server = WebServer(stream_manager)
    app = web.Application()
    app.router.add_get('/', web_server.handle_home)
    app.router.add_post('/create', web_server.handle_create_stream)
    app.router.add_get('/view/{stream_id}', web_server.handle_view_stream)
    runner = web.AppRunner(app)
    await runner.setup()
    cleanup_task = None
    try:
        site = web.TCPSite(runner, '0.0.0.0', 7860)
        await site.start()
        print("\nWeb interface available at http://0.0.0.0:7860")
        # Start the cleanup task
        cleanup_task = asyncio.create_task(stream_manager.cleanup_inactive_streams())
        await asyncio.Event().wait()  # Keep the server running indefinitely
    finally:
        # cleanup_task is still None if starting the site failed.
        if cleanup_task is not None:
            cleanup_task.cancel()
        # Without this the runner's sites and sockets are never closed.
        await runner.cleanup()
# Script entry point: run the server until the user interrupts it.
if __name__ == "__main__":
    print("Starting Ephemeral Stream Server...")
    try:
        # asyncio.run() owns the event loop for the lifetime of main().
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop; exit quietly without a traceback.
        print("\nStopping server...")