# Ephemeral stream server (Hugging Face Space; status when captured: Running).
import asyncio
import codecs
import html
import json
import secrets
import time
from collections import deque, defaultdict
from datetime import datetime

import aiohttp
from aiohttp import web
class StreamManager:
    """Registry of live streams, their access tokens and their reader tasks.

    All three dictionaries are keyed by stream ID and are kept in step by
    ``create_stream`` and ``cleanup_inactive_streams``.
    """

    # A stream whose ``last_access`` is older than this is evicted.
    INACTIVE_TIMEOUT_SECONDS = 3600
    # Pause between two eviction passes.
    CLEANUP_INTERVAL_SECONDS = 300

    def __init__(self):
        self.streams = {}  # stream_id -> EphemeralStreamReader
        self.tokens = {}  # stream_id -> access token (str)
        self.tasks = {}  # stream_id -> asyncio.Task running start_reading()

    async def create_stream(self, retention_seconds):
        """Create a stream, start its reader task and return its credentials.

        Args:
            retention_seconds: How long stored chunks are kept; passed
                through to ``EphemeralStreamReader``.

        Returns:
            A ``(stream_id, access_token)`` tuple of URL-safe random strings.
        """
        stream_id = secrets.token_urlsafe(8)
        access_token = secrets.token_urlsafe(16)

        stream = EphemeralStreamReader(
            path=stream_id,
            retention_seconds=retention_seconds,
            show_output=True,
        )
        self.streams[stream_id] = stream
        self.tokens[stream_id] = access_token

        # Keep a reference to the task so it can be cancelled on eviction.
        self.tasks[stream_id] = asyncio.create_task(stream.start_reading())
        return stream_id, access_token

    def get_stream(self, stream_id, token):
        """Return the stream for ``stream_id`` if ``token`` matches, else None.

        The token is compared in constant time so response timing does not
        leak how much of a guessed token was correct. A missing or
        non-string token never matches.
        """
        expected = self.tokens.get(stream_id)
        if not isinstance(expected, str) or not isinstance(token, str):
            return None
        # compare_digest only accepts ASCII str, so compare encoded bytes;
        # "surrogatepass" keeps odd request input from raising on encode.
        supplied = token.encode("utf-8", "surrogatepass")
        if not secrets.compare_digest(expected.encode("utf-8"), supplied):
            return None
        return self.streams.get(stream_id)

    async def cleanup_inactive_streams(self):
        """Run forever, periodically evicting streams nobody has read lately."""
        while True:
            current_time = time.time()
            inactive_streams = [
                stream_id
                for stream_id, stream in self.streams.items()
                if current_time - stream.last_access > self.INACTIVE_TIMEOUT_SECONDS
            ]
            for stream_id in inactive_streams:
                task = self.tasks.pop(stream_id, None)
                if task is not None:
                    task.cancel()
                # pop() rather than del: a partially registered stream must
                # not abort the whole cleanup loop with a KeyError.
                self.streams.pop(stream_id, None)
                self.tokens.pop(stream_id, None)
            await asyncio.sleep(self.CLEANUP_INTERVAL_SECONDS)
class EphemeralStreamReader:
    """Read a piping-server stream and keep its recent chunks in memory.

    Chunks are stored in arrival order in a deque and dropped once they are
    older than ``retention_seconds``.
    """

    def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_seconds=3600, show_output=False):
        """Set up the reader; no network activity happens here.

        Args:
            piping_server_url: Base URL of the piping server.
            path: Stream path appended to the base URL.
            retention_seconds: Requested retention, clamped to 0..3600.
            show_output: When true, connection progress and errors are printed.
        """
        self.url = f"{piping_server_url}/{path}"
        self.path = path
        self.reconnect_delay = 1
        self.show_output = show_output
        self.retention_seconds = min(3600, max(0, retention_seconds))  # Limit between 0 and 3600 seconds
        self.stored_data = deque()
        self.last_cleanup = time.time()
        self.last_access = time.time()
        # Stateful decoder: holds back an incomplete multi-byte character
        # until the next chunk supplies the remaining bytes.
        self._decoder = codecs.getincrementaldecoder("utf-8")()

    def cleanup_old_data(self):
        """Drop chunks from the front of the deque that exceed the retention."""
        current_time = time.time()
        while self.stored_data and (current_time - self.stored_data[0]['timestamp']) > self.retention_seconds:
            self.stored_data.popleft()

    def get_stored_data(self):
        """Return the retained chunks, oldest first, and mark the stream as accessed."""
        self.cleanup_old_data()
        self.last_access = time.time()
        return list(self.stored_data)

    def store_chunk(self, data, timestamp):
        """Append one chunk; at most once a minute also prune expired chunks."""
        self.stored_data.append({
            'timestamp': timestamp,
            'data': data,
            'formatted_time': datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
        })
        if time.time() - self.last_cleanup > 60:
            self.cleanup_old_data()
            self.last_cleanup = time.time()

    def _ingest_chunk(self, chunk, timestamp):
        """Decode one raw chunk and store it.

        A character split across two reads is reassembled instead of being
        treated as invalid. Bytes that really are not UTF-8 are stored as
        their ``str(bytes)`` representation.
        """
        try:
            text = self._decoder.decode(chunk)
        except UnicodeDecodeError:
            # Discard any partial character so the next chunk starts clean.
            self._decoder.reset()
            self.store_chunk(str(chunk), timestamp)
            return
        # An empty result means the chunk only held part of a character.
        if text:
            self.store_chunk(text, timestamp)

    async def start_reading(self):
        """Connect, read the stream in 1024-byte chunks, and reconnect forever.

        Runs until the task is cancelled. Errors are printed when
        ``show_output`` is set and are otherwise ignored; either way the
        reader waits ``reconnect_delay`` seconds and tries again.
        """
        # NOTE(review): aiohttp applies a default total request timeout
        # (5 minutes per its client docs), which would cut off a long-lived
        # stream; disabling it here is intended - confirm.
        timeout = aiohttp.ClientTimeout(total=None)
        while True:
            try:
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    if self.show_output:
                        print(f"Connecting to {self.url}...")
                    async with session.get(self.url) as response:
                        if response.status == 200:
                            if self.show_output:
                                print("Connected! Reading stream...")
                            # Bytes left over from a previous connection
                            # must not leak into this one.
                            self._decoder.reset()
                            while True:
                                chunk = await response.content.read(1024)
                                if not chunk:
                                    break
                                self._ingest_chunk(chunk, time.time())
            except aiohttp.ClientError as e:
                if self.show_output:
                    print(f"Connection error: {e}")
            except Exception as e:
                if self.show_output:
                    print(f"Error: {e}")
            # Always wait a bit before reconnecting
            await asyncio.sleep(self.reconnect_delay)
class WebServer:
    """aiohttp request handlers and HTML page builders for the stream UI."""

    def __init__(self, stream_manager):
        # Object providing create_stream() and get_stream().
        self.stream_manager = stream_manager

    async def handle_create_stream(self, request):
        """Create a stream from the posted form and return its info page.

        Reads ``retention_minutes`` from the form (default ``'60'``) and
        clamps the resulting retention to 0..3600 seconds. Any failure is
        reported as a 400 response carrying the error text.
        """
        try:
            data = await request.post()
            retention_minutes = float(data.get('retention_minutes', '60'))
            retention_seconds = min(3600, max(0, retention_minutes * 60))
            stream_id, token = await self.stream_manager.create_stream(retention_seconds)
            page = self.create_stream_page(stream_id, token)
            return web.Response(text=page, content_type='text/html')
        except Exception as e:
            return web.Response(text=f"Error creating stream: {str(e)}", status=400)

    async def handle_view_stream(self, request):
        """Return the viewer page, or 403 when the ID/token pair is rejected."""
        stream_id = request.match_info.get('stream_id')
        token = request.query.get('token')
        stream = self.stream_manager.get_stream(stream_id, token)
        if not stream:
            return web.Response(text="Invalid stream ID or token", status=403)
        page = self.create_viewer_page(stream, token)
        return web.Response(text=page, content_type='text/html')

    async def handle_home(self, request):
        """Return the stream-creation form."""
        return web.Response(text=self.create_home_page(), content_type='text/html')

    def create_home_page(self):
        """Return the static HTML of the stream-creation form."""
        return """
<!DOCTYPE html>
<html>
<head>
<title>Create Ephemeral Stream</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
.form-container {
margin: 20px 0;
padding: 20px;
border: 1px solid #ddd;
border-radius: 4px;
}
.input-group {
margin: 10px 0;
}
input, button {
padding: 8px;
margin: 5px 0;
}
button {
background: #4CAF50;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.info {
margin: 20px 0;
padding: 15px;
background: #f5f5f5;
border-radius: 4px;
}
</style>
</head>
<body>
<h1>Create New Ephemeral Stream</h1>
<div class="form-container">
<form method="POST" action="/create">
<div class="input-group">
<label for="retention_minutes">Retention Time (minutes, max 60):</label><br>
<input type="number" id="retention_minutes" name="retention_minutes"
min="0" max="60" value="60" step="0.5" required>
</div>
<button type="submit">Create Stream</button>
</form>
</div>
<div class="info">
<h3>About Ephemeral Streams</h3>
<p>Create your private stream with custom retention time. Each stream:</p>
<ul>
<li>Has a unique ID and access token</li>
<li>Can only be viewed with the correct token</li>
<li>Automatically deletes data older than the specified retention time</li>
<li>Supports retention times from 0 to 60 minutes</li>
</ul>
</div>
</body>
</html>
"""

    def create_stream_page(self, stream_id, token):
        """Return the "stream created" page with the view URL and write command."""
        view_url = f"/view/{stream_id}?token={token}"
        write_url = f"https://ppng.io/{stream_id}"
        return f"""
<!DOCTYPE html>
<html>
<head>
<title>Stream Created</title>
<style>
body {{
font-family: Arial, sans-serif;
margin: 20px;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}}
.info-box {{
background: #f5f5f5;
padding: 20px;
border-radius: 4px;
margin: 20px 0;
}}
.code-box {{
background: #2b2b2b;
color: #ffffff;
padding: 15px;
border-radius: 4px;
font-family: monospace;
white-space: pre-wrap;
word-break: break-all;
}}
.button {{
display: inline-block;
padding: 10px 20px;
background: #4CAF50;
color: white;
text-decoration: none;
border-radius: 4px;
margin: 10px 0;
}}
</style>
</head>
<body>
<h1>Stream Created Successfully</h1>
<div class="info-box">
<h3>Your Stream Information</h3>
<p><strong>Stream ID:</strong> {stream_id}</p>
<p><strong>Access Token:</strong> {token}</p>
<p><strong>View URL:</strong> <a href="{view_url}">{view_url}</a></p>
<h3>How to Write to Your Stream</h3>
<p>Use curl to write to your stream:</p>
<div class="code-box">
seq inf | curl -T- {write_url}
</div>
<h3>Important Notes</h3>
<ul>
<li>Keep your access token secret</li>
<li>Bookmark the view URL for easy access</li>
<li>Data older than your specified retention time will be automatically deleted</li>
</ul>
</div>
<a href="{view_url}" class="button">View Your Stream</a>
</body>
</html>
"""

    def create_viewer_page(self, stream, token):
        """Return the viewer page listing the stream's chunks, newest first.

        Chunk text is whatever was written to the stream and is therefore
        untrusted; it is HTML-escaped before being placed in the page.
        ``token`` is accepted for interface compatibility and is not used.
        """
        header = f"""
<!DOCTYPE html>
<html>
<head>
<title>Stream Viewer</title>
<style>
body {{
font-family: Arial, sans-serif;
margin: 20px;
}}
.data-container {{
margin: 20px 0;
}}
.timestamp {{
color: #666;
font-size: 0.9em;
}}
.data-item {{
margin: 10px 0;
padding: 10px;
background: #f5f5f5;
border-radius: 4px;
}}
.refresh-btn {{
padding: 10px 20px;
background: #4CAF50;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}}
.info {{
margin-bottom: 20px;
color: #666;
}}
.code-box {{
background: #2b2b2b;
color: #ffffff;
padding: 15px;
border-radius: 4px;
font-family: monospace;
margin: 10px 0;
white-space: pre-wrap;
word-break: break-all;
}}
</style>
<script>
function refreshData() {{
window.location.reload();
}}
// Auto refresh every 5 seconds
setInterval(refreshData, 5000);
</script>
</head>
<body>
<h1>Stream Viewer</h1>
<div class="info">
<p>Stream ID: {html.escape(str(stream.path))}</p>
<p>Retention Time: {stream.retention_seconds / 60:.1f} minutes</p>
<div class="code-box">To write to this stream: seq inf | curl -T- {html.escape(str(stream.url))}</div>
</div>
<button class="refresh-btn" onclick="refreshData()">Refresh Data</button>
<div class="data-container">
"""
        parts = [header]
        for item in reversed(stream.get_stored_data()):
            # Escape the payload so stream content cannot inject markup.
            safe_data = html.escape(str(item['data']))
            parts.append(f"""
<div class="data-item">
<div class="timestamp">{item['formatted_time']}</div>
<pre>{safe_data}</pre>
</div>
""")
        parts.append("""
</div>
</body>
</html>
""")
        return "".join(parts)
async def main():
    """Start the web UI on 0.0.0.0:7860 and run until cancelled.

    Registers the three routes, starts the periodic stream-eviction task and
    then waits forever. On exit (including cancellation by ``asyncio.run``
    after Ctrl-C) the background tasks are cancelled and the aiohttp runner
    is shut down so the listening socket is released.
    """
    stream_manager = StreamManager()
    web_server = WebServer(stream_manager)

    app = web.Application()
    app.router.add_get('/', web_server.handle_home)
    app.router.add_post('/create', web_server.handle_create_stream)
    app.router.add_get('/view/{stream_id}', web_server.handle_view_stream)

    runner = web.AppRunner(app)
    await runner.setup()
    cleanup_task = None
    try:
        site = web.TCPSite(runner, '0.0.0.0', 7860)
        await site.start()
        print("\nWeb interface available at http://0.0.0.0:7860")

        # Start the cleanup task
        cleanup_task = asyncio.create_task(stream_manager.cleanup_inactive_streams())

        await asyncio.Event().wait()  # Keep the server running indefinitely
    finally:
        if cleanup_task is not None:
            cleanup_task.cancel()
        # Stop the per-stream reader tasks as well.
        for task in list(stream_manager.tasks.values()):
            task.cancel()
        # Also runs when site.start() failed, so setup() is always undone.
        await runner.cleanup()
def _serve_until_interrupted():
    """Run the server; treat Ctrl-C as a normal shutdown request."""
    print("Starting Ephemeral Stream Server...")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopping server...")


if __name__ == "__main__":
    _serve_until_interrupted()