|
from fastapi import APIRouter, BackgroundTasks, HTTPException |
|
from tortoise.contrib.pydantic.creator import pydantic_queryset_creator |
|
from tortoise.transactions import in_transaction |
|
from App.routers.bonds.models import Bond |
|
from App.routers.tasks.models import ImportTask |
|
from App.routers.bonds.schemas import BondCreate, BondResponse |
|
from App.routers.bonds.utils import BondDataScraper |
|
from App.schemas import ResponseModel |
|
from typing import List |
|
|
|
|
|
# Router that groups every endpoint in this module under the /bonds prefix.
router = APIRouter(
    prefix="/bonds",
    tags=["Bonds"],
)
|
|
|
|
|
@router.post("/", response_model=ResponseModel)
async def create_bond_entry(payload: BondCreate):
    """Create a bond, or update the existing bond that matches the payload.

    A match is looked up by ISIN when the payload carries one; if that finds
    nothing (or no ISIN was given), the lookup falls back to the combination
    of auction number, auction date and holding number.

    Args:
        payload: Bond fields submitted by the client.

    Returns:
        A ResponseModel whose message says whether the bond was created or
        updated, and whose data is the serialised bond.
    """
    # Primary lookup: by ISIN, only when one was supplied.
    match = await Bond.get_or_none(isin=payload.isin) if payload.isin else None

    # Fallback lookup: by the auction/holding triple.
    if not match:
        match = await Bond.get_or_none(
            auction_number=payload.auction_number,
            auction_date=payload.auction_date,
            holding_number=payload.holding_number,
        )

    if not match:
        bond = await Bond.create(**payload.dict())
        message = "Bond created successfully"
    else:
        # Only fields the client explicitly sent are written on update.
        changes = payload.dict(exclude_unset=True)
        await Bond.filter(id=match.id).update(**changes)
        # Re-read the row so the response reflects the stored values.
        bond = await Bond.get(id=match.id)
        message = "Bond updated successfully"

    serialized = await BondResponse.from_tortoise_orm(bond)
    return ResponseModel(success=True, message=message, data=serialized)
|
|
|
@router.get("/", response_model=ResponseModel)
async def list_bonds_entries():
    """Return every stored bond.

    Returns:
        A ResponseModel whose data is ``{"bonds": ...}``, where the value is
        whatever ``Bond.get_list`` produces for the full set of Bond rows.
    """
    # The leftover debug print() calls were removed: they wrote the entire
    # bond table to stdout on every request.
    all_bonds = await Bond.all()
    bonds = await Bond.get_list(all_bonds)

    return ResponseModel(
        success=True,
        message="Bonds retrieved successfully",
        data={"bonds": bonds},
    )
|
|
|
|
|
async def run_bond_import_task(task_id: int):
    """Scrape bond data and create or update a Bond row for each record.

    The ImportTask row identified by ``task_id`` is set to ``"running"``, and
    then either to ``"completed"`` with a summary of created / updated /
    failed counts, or to ``"failed"`` with the error text when the scrape
    itself raises.

    Each scraped record is written in its own transaction. A record is
    matched to an existing bond by ISIN when it has one; if that finds
    nothing (or there is no ISIN), by the combination of auction number,
    auction date and holding number. A failure on one record is counted and
    reported, and the import carries on with the next record.

    Args:
        task_id: Primary key of the ImportTask row tracking this run.
    """
    await ImportTask.filter(id=task_id).update(status="running")

    created_count = 0
    updated_count = 0
    failed_count = 0
    # ISINs already written during this run, used to skip repeats in the feed.
    processed_isins = set()

    try:
        # Built inside the try so that a constructor failure marks the task
        # as failed instead of leaving it stuck in "running".
        scraper = BondDataScraper()

        async for bond_data in scraper.scrape_all_bond_data():
            if not bond_data:
                failed_count += 1
                continue

            if bond_data.isin and bond_data.isin in processed_isins:
                print(f"Skipping duplicate ISIN in current scrape: {bond_data.isin}")
                continue

            # The try wraps the transaction (not the other way round) so an
            # error propagates through the context manager and the
            # transaction is rolled back rather than committed.
            try:
                async with in_transaction():
                    existing_bond = None
                    if bond_data.isin:
                        existing_bond = await Bond.get_or_none(isin=bond_data.isin)

                    if not existing_bond:
                        existing_bond = await Bond.get_or_none(
                            auction_number=bond_data.auction_number,
                            auction_date=bond_data.auction_date,
                            holding_number=bond_data.holding_number,
                        )

                    if existing_bond:
                        await Bond.filter(id=existing_bond.id).update(
                            **bond_data.dict(exclude_unset=True)
                        )
                        was_update = True
                    else:
                        await Bond.create(**bond_data.dict())
                        was_update = False
            except Exception as db_e:
                failed_count += 1
                print(f"Database error for bond au_no {bond_data.auction_number}: {db_e}")
                continue

            # Reached only after the transaction exited cleanly, so the
            # counters reflect writes that were actually committed.
            if was_update:
                updated_count += 1
                print(f"Updated bond: ISIN {bond_data.isin}, AuNo {bond_data.auction_number}")
            else:
                created_count += 1
                print(f"Created bond: ISIN {bond_data.isin}, AuNo {bond_data.auction_number}")

            if bond_data.isin:
                processed_isins.add(bond_data.isin)

        summary = {
            "created": created_count,
            "updated": updated_count,
            "failed_during_processing": failed_count,
            "message": "Bond import process finished.",
        }
        await ImportTask.filter(id=task_id).update(status="completed", details=summary)
        print(f"Bond import task {task_id} completed. Summary: {summary}")

    except Exception as e:
        # Top-level boundary of the background task: record the failure on
        # the task row so that it is visible to whoever reads the row.
        print(f"Fatal error in bond import task {task_id}: {e}")
        await ImportTask.filter(id=task_id).update(status="failed", details={"error": str(e)})
|
|
|
|
|
@router.post("/import-bonds", response_model=ResponseModel)
async def trigger_bond_import(background_tasks: BackgroundTasks):
    """Register a bond import task and schedule it as a background task.

    Args:
        background_tasks: FastAPI background-task collector for this request.

    Returns:
        A ResponseModel whose data holds the id of the new ImportTask row.
    """
    # Create the tracking row first so its id can be handed to the worker.
    pending_task = await ImportTask.create(task_type="bond_import", status="pending")
    new_task_id = pending_task.id

    background_tasks.add_task(run_bond_import_task, new_task_id)

    return ResponseModel(
        success=True,
        message="Bond import task started.",
        data={"task_id": new_task_id},
    )
|
|
|
@router.get("/import-status/{task_id}", response_model=ResponseModel)
async def get_import_status(task_id: int):
    """Return the ImportTask row for the given id.

    Args:
        task_id: Primary key of the ImportTask to look up.

    Returns:
        A ResponseModel whose data is the ImportTask found.

    Raises:
        HTTPException: With status 404 when no task has that id.
    """
    record = await ImportTask.get_or_none(id=task_id)
    if record:
        return ResponseModel(
            success=True,
            message="Task status retrieved",
            data=record,
        )
    raise HTTPException(status_code=404, detail="Import task not found")