# XYHLF/App/routers/tasks/routes.py
# Mbonea's picture
# initial commit
# 9d4bd7c
from fastapi import APIRouter, HTTPException
from .models import ImportTask
from .schemas import ImportTaskResponse
from App.schemas import ResponseModel
from tortoise.contrib.pydantic import pydantic_queryset_creator, pydantic_model_creator
# Routes registered on this router get the "/tasks" prefix and the "Tasks" tag.
router = APIRouter(prefix="/tasks", tags=["Tasks"])

# Pydantic serializers generated from the ImportTask ORM model:
# one for a whole queryset, one for a single model instance.
TaskData_Pydantic_List = pydantic_queryset_creator(ImportTask)
TaskData_Pydantic = pydantic_model_creator(ImportTask)
@router.get("/", response_model=ResponseModel)
async def list_tasks():
    """Return all import tasks, ordered by ``created_at`` descending.

    The queryset is serialized through ``TaskData_Pydantic_List`` and the
    dumped result is placed in the ``data`` field of a ``ResponseModel``.
    """
    newest_first = ImportTask.all().order_by("-created_at")
    serialized = await TaskData_Pydantic_List.from_queryset(newest_first)
    payload = serialized.model_dump()
    return ResponseModel(success=True, message="List of tasks", data=payload)
@router.get("/{task_id}", response_model=ResponseModel)
async def get_task(task_id: int):
    """Return a single import task looked up by its id.

    Args:
        task_id: Value matched against the ``id`` field of ``ImportTask``.

    Returns:
        A ``ResponseModel`` whose ``data`` field holds the dumped task.

    Raises:
        HTTPException: With status 404 when no task has the given id.
    """
    task = await ImportTask.get_or_none(id=task_id)
    # get_or_none signals "missing" with None; compare against None explicitly
    # so the 404 branch does not depend on the instance's truthiness.
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    pydantic_task = await TaskData_Pydantic.from_tortoise_orm(task)
    return ResponseModel(
        success=True,
        message="Task found",
        data=pydantic_task.model_dump(),
    )