fine-tuning-service / src /task_manager.py
fashxp's picture
initial commit
7c4332a
raw
history blame
2.34 kB
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor

from fastapi import BackgroundTasks, HTTPException
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Set this logger's own threshold to DEBUG (attached handlers may still
# apply their own levels).
logger.setLevel(logging.DEBUG)
class Worker:
    """Simulated long-running job that reports progress via a task manager.

    The work is synchronous and blocking, so it must be run in a worker
    thread (e.g. through ``loop.run_in_executor``) and never directly on the
    event loop.
    """

    def doing_work(self, task_manager):
        """Run the simulated job, publishing state into ``task_manager.task_status``.

        Sets ``"status"`` to ``"Running"``, then raises ``"progress"`` from 1
        to 100, pausing one second per step. Before every step the status is
        re-read; if it has been set to ``"Stopped"`` the loop ends early and
        the status is left untouched. Otherwise the status becomes
        ``"Completed"`` once all steps are done.

        Args:
            task_manager: Object exposing a mutable ``task_status`` mapping
                with ``"status"`` and ``"progress"`` keys.
        """
        task_manager.task_status["status"] = "Running"
        for percent in range(1, 101):
            # Cooperative stop: the manager flips the status to request an exit.
            if task_manager.task_status["status"] == "Stopped":
                break
            # time.sleep, not asyncio.sleep: this method is synchronous, and
            # calling asyncio.sleep() without ``await`` only creates a
            # coroutine object that never runs, so no pause would happen.
            time.sleep(1)  # Simulate a time-consuming task
            task_manager.task_status["progress"] = percent
            logger.info("process %s%% done", percent)
        if task_manager.task_status["status"] != "Stopped":
            task_manager.task_status["status"] = "Completed"
class TaskManager:
    """Start, monitor and stop a single background job executed by a worker.

    Per-instance state:

    * ``task_status`` -- dict with ``"progress"`` and ``"status"`` keys. The
      worker updates it from its thread; ``get_task_status`` returns it.
    * ``task`` -- the ``asyncio.Task`` wrapping the current or most recent
      run, or ``None`` if nothing has been started yet.
    * ``worker`` -- object whose ``doing_work(task_manager)`` method performs
      the blocking work.
    """

    def __init__(self, worker=None):
        """Create a manager with fresh state.

        Args:
            worker: Optional object providing ``doing_work(task_manager)``.
                Defaults to a new ``Worker``.
        """
        # Created per instance: as class attributes the status dict and the
        # worker would be shared (and mutated) by every TaskManager.
        self.task_status = {"progress": 0, "status": "Not started"}
        self.task = None
        self.worker = Worker() if worker is None else worker

    async def doing_work(self):
        """Run the worker's blocking job in a thread and wait for it to finish."""
        loop = asyncio.get_running_loop()
        # ``None`` selects the loop's default executor. A dedicated
        # ``with ThreadPoolExecutor()`` block would call shutdown(wait=True)
        # on exit, which blocks the event loop if this coroutine is cancelled
        # while the thread is still running.
        await loop.run_in_executor(None, self.worker.doing_work, self)

    async def start_task(self):
        """Reset the status and launch the job as a background task.

        Returns:
            ``{"message": "Task started"}``.

        Raises:
            HTTPException: 409 if the previous run has not finished yet.
        """
        if self.task is not None and not self.task.done():
            raise HTTPException(status_code=409, detail="Task already running")
        self.task_status["progress"] = 0
        self.task_status["status"] = "Not started"
        self.task = asyncio.create_task(self.doing_work())
        return {"message": "Task started"}

    async def get_task_status(self):
        """Return the live status dict (``"progress"`` and ``"status"``)."""
        return self.task_status

    async def stop_task(self):
        """Ask the running job to stop.

        Stopping is cooperative: the status is set to ``"Stopped"`` and the
        worker is expected to notice it and return. The asyncio task is
        deliberately not cancelled, because cancelling cannot interrupt the
        worker thread; it would only mark the task as done while the thread
        is still alive, so a restart could run two jobs against the same
        status dict. ``self.task`` therefore completes only once the thread
        has actually exited.

        Returns:
            ``{"message": "Task stopped"}``.

        Raises:
            HTTPException: 409 if no job is currently running.
        """
        if self.task is None or self.task.done():
            raise HTTPException(status_code=409, detail="No task running")
        self.task_status["status"] = "Stopped"
        return {"message": "Task stopped"}