da03 committed on
Commit
1173f9e
·
1 Parent(s): 92199b3
Files changed (2) hide show
  1. dispatcher.py +30 -7
  2. worker.py +58 -9
dispatcher.py CHANGED
@@ -866,12 +866,22 @@ async def get():
866
  @app.post("/register_worker")
867
  async def register_worker(worker_info: dict):
868
  """Endpoint for workers to register themselves"""
869
- await session_manager.register_worker(
870
- worker_info["worker_id"],
871
- worker_info["worker_address"],
872
- worker_info["endpoint"]
873
- )
874
- return {"status": "registered"}
 
 
 
 
 
 
 
 
 
 
875
 
876
  @app.post("/worker_ping")
877
  async def worker_ping(worker_info: dict):
@@ -1084,6 +1094,8 @@ async def periodic_worker_health_check():
1084
 
1085
  @app.on_event("startup")
1086
  async def startup_event():
 
 
1087
  # Start background tasks
1088
  asyncio.create_task(periodic_queue_update())
1089
  asyncio.create_task(periodic_system_validation())
@@ -1093,6 +1105,8 @@ async def startup_event():
1093
  # Log initial system status
1094
  analytics._write_log("πŸš€ System initialized and ready to accept connections")
1095
  analytics._write_log(" Waiting for GPU workers to register...")
 
 
1096
 
1097
  @app.on_event("shutdown")
1098
  async def shutdown_event():
@@ -1110,4 +1124,13 @@ if __name__ == "__main__":
1110
  parser.add_argument("--port", type=int, default=8000, help="Port to run the dispatcher on")
1111
  args = parser.parse_args()
1112
 
1113
- uvicorn.run(app, host="0.0.0.0", port=args.port)
 
 
 
 
 
 
 
 
 
 
866
  @app.post("/register_worker")
867
  async def register_worker(worker_info: dict):
868
  """Endpoint for workers to register themselves"""
869
+ logger.info(f"πŸ“₯ Received worker registration request")
870
+ logger.info(f"πŸ“Š Worker info: {worker_info}")
871
+
872
+ try:
873
+ await session_manager.register_worker(
874
+ worker_info["worker_id"],
875
+ worker_info["worker_address"],
876
+ worker_info["endpoint"]
877
+ )
878
+ logger.info(f"βœ… Successfully processed worker registration")
879
+ return {"status": "registered"}
880
+ except Exception as e:
881
+ logger.error(f"❌ Failed to register worker: {e}")
882
+ import traceback
883
+ logger.error(f"πŸ” Full traceback: {traceback.format_exc()}")
884
+ raise
885
 
886
  @app.post("/worker_ping")
887
  async def worker_ping(worker_info: dict):
 
1094
 
1095
  @app.on_event("startup")
1096
  async def startup_event():
1097
+ logger.info("πŸš€ Dispatcher startup event triggered")
1098
+
1099
  # Start background tasks
1100
  asyncio.create_task(periodic_queue_update())
1101
  asyncio.create_task(periodic_system_validation())
 
1105
  # Log initial system status
1106
  analytics._write_log("πŸš€ System initialized and ready to accept connections")
1107
  analytics._write_log(" Waiting for GPU workers to register...")
1108
+
1109
+ logger.info("βœ… Dispatcher startup complete - ready to accept worker registrations")
1110
 
1111
  @app.on_event("shutdown")
1112
  async def shutdown_event():
 
1124
  parser.add_argument("--port", type=int, default=8000, help="Port to run the dispatcher on")
1125
  args = parser.parse_args()
1126
 
1127
+ logger.info(f"🌐 Starting dispatcher on 0.0.0.0:{args.port}")
1128
+ logger.info(f"πŸ”— Dispatcher will be available at http://localhost:{args.port}")
1129
+
1130
+ try:
1131
+ uvicorn.run(app, host="0.0.0.0", port=args.port)
1132
+ except Exception as e:
1133
+ logger.error(f"❌ Failed to start dispatcher: {e}")
1134
+ import traceback
1135
+ logger.error(f"πŸ” Full traceback: {traceback.format_exc()}")
1136
+ raise
worker.py CHANGED
@@ -154,16 +154,47 @@ class GPUWorker:
154
 
155
  async def register_with_dispatcher(self):
156
  """Register this worker with the dispatcher"""
 
 
 
 
 
157
  try:
158
  async with aiohttp.ClientSession() as session:
159
- await session.post(f"{self.dispatcher_url}/register_worker", json={
160
- "worker_id": self.worker_id,
161
- "worker_address": self.worker_address,
162
- "endpoint": f"http://{self.worker_address}"
163
- })
164
- logger.info(f"Successfully registered worker {self.worker_id} ({self.worker_address}) with dispatcher")
165
  except Exception as e:
166
- logger.error(f"Failed to register with dispatcher: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
167
 
168
  async def ping_dispatcher(self):
169
  """Periodically ping the dispatcher to maintain connection"""
@@ -718,14 +749,21 @@ async def health_check():
718
 
719
  async def startup_worker(worker_address: str, dispatcher_url: str):
720
  """Initialize the worker"""
 
 
721
  global worker
722
  worker = GPUWorker(worker_address, dispatcher_url)
 
723
 
724
  # Register with dispatcher
 
725
  await worker.register_with_dispatcher()
 
726
 
727
  # Start ping task
 
728
  asyncio.create_task(worker.ping_dispatcher())
 
729
 
730
  if __name__ == "__main__":
731
  import uvicorn
@@ -751,7 +789,18 @@ if __name__ == "__main__":
751
 
752
  @app.on_event("startup")
753
  async def startup_event():
 
754
  await startup_worker(args.worker_address, args.dispatcher_url)
 
 
 
 
 
755
 
756
- logger.info(f"Starting worker {args.worker_address}")
757
- uvicorn.run(app, host="0.0.0.0", port=port)
 
 
 
 
 
 
154
 
155
  async def register_with_dispatcher(self):
156
  """Register this worker with the dispatcher"""
157
+ logger.info(f"πŸ”— Attempting to register with dispatcher at {self.dispatcher_url}")
158
+ logger.info(f"πŸ“Š Worker details: ID={self.worker_id}, Address={self.worker_address}")
159
+
160
+ # Test basic connectivity first
161
+ logger.info(f"πŸ§ͺ Testing basic connectivity to dispatcher...")
162
  try:
163
  async with aiohttp.ClientSession() as session:
164
+ async with session.get(f"{self.dispatcher_url}/") as response:
165
+ logger.info(f"🌐 Connectivity test successful - dispatcher responded with status {response.status}")
 
 
 
 
166
  except Exception as e:
167
+ logger.error(f"❌ Connectivity test FAILED: {e}")
168
+ logger.error(f"πŸ” This means the dispatcher is not reachable at {self.dispatcher_url}")
169
+ raise
170
+
171
+ try:
172
+ registration_data = {
173
+ "worker_id": self.worker_id,
174
+ "worker_address": self.worker_address,
175
+ "endpoint": f"http://{self.worker_address}"
176
+ }
177
+ logger.info(f"πŸ“€ Sending registration data: {registration_data}")
178
+
179
+ async with aiohttp.ClientSession() as session:
180
+ logger.info(f"🌐 Making POST request to {self.dispatcher_url}/register_worker")
181
+
182
+ async with session.post(f"{self.dispatcher_url}/register_worker", json=registration_data) as response:
183
+ logger.info(f"πŸ“₯ Dispatcher response status: {response.status}")
184
+ response_text = await response.text()
185
+ logger.info(f"πŸ“₯ Dispatcher response body: {response_text}")
186
+
187
+ if response.status == 200:
188
+ logger.info(f"βœ… Successfully registered worker {self.worker_id} ({self.worker_address}) with dispatcher")
189
+ else:
190
+ logger.error(f"❌ Dispatcher returned error status {response.status}: {response_text}")
191
+
192
+ except Exception as e:
193
+ logger.error(f"❌ Failed to register with dispatcher: {e}")
194
+ logger.error(f"πŸ” Exception type: {type(e)}")
195
+ logger.error(f"πŸ” Dispatcher URL: {self.dispatcher_url}")
196
+ import traceback
197
+ logger.error(f"πŸ” Full traceback: {traceback.format_exc()}")
198
 
199
  async def ping_dispatcher(self):
200
  """Periodically ping the dispatcher to maintain connection"""
 
749
 
750
  async def startup_worker(worker_address: str, dispatcher_url: str):
751
  """Initialize the worker"""
752
+ logger.info(f"πŸ”§ Initializing worker with address {worker_address}")
753
+
754
  global worker
755
  worker = GPUWorker(worker_address, dispatcher_url)
756
+ logger.info(f"πŸ—οΈ Worker object created: {worker.worker_id}")
757
 
758
  # Register with dispatcher
759
+ logger.info(f"πŸ“ž About to register with dispatcher")
760
  await worker.register_with_dispatcher()
761
+ logger.info(f"πŸ“ Registration attempt completed")
762
 
763
  # Start ping task
764
+ logger.info(f"πŸ’“ Starting ping task")
765
  asyncio.create_task(worker.ping_dispatcher())
766
+ logger.info(f"βœ… Worker initialization completed")
767
 
768
  if __name__ == "__main__":
769
  import uvicorn
 
789
 
790
  @app.on_event("startup")
791
  async def startup_event():
792
+ logger.info(f"πŸš€ Worker startup event triggered for {args.worker_address}")
793
  await startup_worker(args.worker_address, args.dispatcher_url)
794
+ logger.info(f"βœ… Worker startup complete for {args.worker_address}")
795
+
796
+ logger.info(f"🌐 Starting worker {args.worker_address} on 0.0.0.0:{port}")
797
+ logger.info(f"πŸ”— Worker will be available at http://{args.worker_address}")
798
+ logger.info(f"πŸ“‘ Will register with dispatcher at {args.dispatcher_url}")
799
 
800
+ try:
801
+ uvicorn.run(app, host="0.0.0.0", port=port)
802
+ except Exception as e:
803
+ logger.error(f"❌ Failed to start worker: {e}")
804
+ import traceback
805
+ logger.error(f"πŸ” Full traceback: {traceback.format_exc()}")
806
+ raise