Spaces:
Runtime error
Runtime error
| import asyncio | |
| import random | |
| import time | |
| class Job: | |
| def __init__(self, id, data): | |
| self.id = id | |
| self.data = data | |
| async def node1(worker_id: int, input_queue, output_queue): | |
| while True: | |
| job:Job = await input_queue.get() | |
| job.data += f' (processed by node 1, worker {worker_id})' | |
| await output_queue.put(job) | |
| async def node2(worker_id: int, input_queue, output_queue): | |
| while True: | |
| job:Job = await input_queue.get() | |
| sleep_duration = 0.8 + 0.4 * random.random() # Generate a random sleep duration between 0.8 and 1.2 seconds | |
| await asyncio.sleep(sleep_duration) | |
| job.data += f' (processed by node 2, worker {worker_id})' | |
| await output_queue.put(job) | |
| async def node3(worker_id: int, input_queue, job_sync): | |
| buffer = {} | |
| next_i = 0 | |
| while True: | |
| job:Job = await input_queue.get() | |
| buffer[job.id] = job # Store the data in the buffer | |
| # While the next expected item is in the buffer, output it and increment the index | |
| while next_i in buffer: | |
| curr_job = buffer.pop(next_i) | |
| curr_job.data += f' (processed by node 3, worker {worker_id})' | |
| print(f'{curr_job.id} - {curr_job.data}') | |
| next_i += 1 | |
| job_sync.append(curr_job) | |
| async def main(): | |
| input_queue = asyncio.Queue() | |
| buffer_queue = asyncio.Queue() | |
| output_queue = asyncio.Queue() | |
| num_jobs = 100 | |
| joe_source = [Job(i, "") for i in range(num_jobs)] | |
| job_sync = [] | |
| task1 = asyncio.create_task(node1(None, input_queue, buffer_queue)) | |
| task3 = asyncio.create_task(node3(None, output_queue, job_sync)) | |
| num_workers = 5 | |
| tasks2 = [] | |
| for i in range(num_workers): | |
| task2 = asyncio.create_task(node2(i + 1, buffer_queue, output_queue)) | |
| tasks2.append(task2) | |
| for job in joe_source: | |
| await input_queue.put(job) | |
| try: | |
| # await asyncio.gather(task1, *tasks2, task3) | |
| while len(job_sync) < num_jobs: | |
| await asyncio.sleep(0.1) | |
| except asyncio.CancelledError: | |
| print("Pipeline cancelled") | |
| task1.cancel() | |
| for task in tasks2: | |
| task.cancel() | |
| task3.cancel() | |
| await asyncio.gather(task1, *tasks2, task3, return_exceptions=True) | |
| start_time = time.time() | |
| try: | |
| asyncio.run(main()) | |
| except KeyboardInterrupt: | |
| print("Pipeline interrupted by user") | |
| end_time = time.time() | |
| print(f"Pipeline processed in {end_time - start_time} seconds.") | |