import asyncio import aiohttp import time from datetime import datetime from collections import deque, defaultdict import json import secrets from aiohttp import web class StreamManager: def __init__(self): self.streams = {} # Dictionary to store all stream instances self.tokens = {} # Dictionary to store stream tokens self.tasks = {} # Dictionary to store stream tasks async def create_stream(self, retention_seconds): # Generate unique stream ID and token stream_id = secrets.token_urlsafe(8) access_token = secrets.token_urlsafe(16) # Create new stream instance stream = EphemeralStreamReader( path=stream_id, retention_seconds=retention_seconds, show_output=True ) self.streams[stream_id] = stream self.tokens[stream_id] = access_token # Start the stream reader task self.tasks[stream_id] = asyncio.create_task(stream.start_reading()) return stream_id, access_token def get_stream(self, stream_id, token): if stream_id in self.streams and self.tokens.get(stream_id) == token: return self.streams[stream_id] return None async def cleanup_inactive_streams(self): while True: current_time = time.time() inactive_streams = [] for stream_id, stream in self.streams.items(): # Clean up streams that haven't been accessed in 1 hour if current_time - stream.last_access > 3600: inactive_streams.append(stream_id) for stream_id in inactive_streams: if stream_id in self.tasks: self.tasks[stream_id].cancel() del self.tasks[stream_id] del self.streams[stream_id] del self.tokens[stream_id] await asyncio.sleep(300) # Check every 5 minutes class EphemeralStreamReader: def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_seconds=3600, show_output=False): self.url = f"{piping_server_url}/{path}" self.path = path self.reconnect_delay = 1 self.show_output = show_output self.retention_seconds = min(3600, max(0, retention_seconds)) # Limit between 0 and 3600 seconds self.stored_data = deque() self.last_cleanup = time.time() self.last_access = time.time() def cleanup_old_data(self): current_time = time.time() while self.stored_data and (current_time - self.stored_data[0]['timestamp']) > self.retention_seconds: self.stored_data.popleft() def get_stored_data(self): self.cleanup_old_data() self.last_access = time.time() return list(self.stored_data) def store_chunk(self, data, timestamp): self.stored_data.append({ 'timestamp': timestamp, 'data': data, 'formatted_time': datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') }) if time.time() - self.last_cleanup > 60: self.cleanup_old_data() self.last_cleanup = time.time() async def start_reading(self): while True: try: async with aiohttp.ClientSession() as session: if self.show_output: print(f"Connecting to {self.url}...") async with session.get(self.url) as response: if response.status == 200: if self.show_output: print("Connected! Reading stream...") while True: chunk = await response.content.read(1024) if not chunk: break current_time = time.time() try: text = chunk.decode('utf-8') self.store_chunk(text, current_time) except UnicodeDecodeError: self.store_chunk(str(chunk), current_time) except aiohttp.ClientError as e: if self.show_output: print(f"Connection error: {e}") except Exception as e: if self.show_output: print(f"Error: {e}") # Always wait a bit before reconnecting await asyncio.sleep(self.reconnect_delay) class WebServer: def __init__(self, stream_manager): self.stream_manager = stream_manager async def handle_create_stream(self, request): try: data = await request.post() retention_minutes = float(data.get('retention_minutes', '60')) retention_seconds = min(3600, max(0, retention_minutes * 60)) stream_id, token = await self.stream_manager.create_stream(retention_seconds) html = self.create_stream_page(stream_id, token) return web.Response(text=html, content_type='text/html') except Exception as e: return web.Response(text=f"Error creating stream: {str(e)}", status=400) async def handle_view_stream(self, request): stream_id = request.match_info.get('stream_id') token = request.query.get('token') stream = self.stream_manager.get_stream(stream_id, token) if not stream: return web.Response(text="Invalid stream ID or token", status=403) html = self.create_viewer_page(stream, token) return web.Response(text=html, content_type='text/html') async def handle_home(self, request): return web.Response(text=self.create_home_page(), content_type='text/html') def create_home_page(self): return """
Create your private stream with custom retention time. Each stream:
Stream ID: {stream_id}
Access Token: {token}
View URL: {view_url}
Use curl to write to your stream:
Stream ID: {stream.path}
Retention Time: {stream.retention_seconds / 60:.1f} minutes
{item['data']}