# XYHLF / App / routers / bonds / routes.py
# Mbonea's picture
# initial commit
# 9d4bd7c
from fastapi import APIRouter, BackgroundTasks, HTTPException
from tortoise.contrib.pydantic.creator import pydantic_queryset_creator
from tortoise.transactions import in_transaction
from App.routers.bonds.models import Bond # Adjust import path
from App.routers.tasks.models import ImportTask # Adjust import path
from App.routers.bonds.schemas import BondCreate, BondResponse # Adjust import path
from App.routers.bonds.utils import BondDataScraper # Adjust import path
from App.schemas import ResponseModel # Assuming you have this general response model
from typing import List
# Router for the endpoints declared in this module; every route is mounted
# under the "/bonds" prefix and tagged "Bonds".
router = APIRouter(prefix="/bonds", tags=["Bonds"])
# --- CRUD for Bond (example, you might have these elsewhere) ---
@router.post("/", response_model=ResponseModel)
async def create_bond_entry(payload: BondCreate):
    """Create a bond, or update the stored bond that the payload matches.

    A stored bond is looked up by ISIN when the payload carries one; if that
    finds nothing, the lookup falls back to the combination of
    ``auction_number``, ``auction_date`` and ``holding_number``.

    Args:
        payload: Bond fields submitted by the client.

    Returns:
        A ``ResponseModel`` with ``success=True``, a message saying whether
        the bond was created or updated, and the bond rendered through
        ``BondResponse``.
    """
    # Primary key for matching: the ISIN, when one was supplied.
    match = await Bond.get_or_none(isin=payload.isin) if payload.isin else None
    if not match:
        # Fallback key: auction number + auction date + holding number.
        match = await Bond.get_or_none(
            auction_number=payload.auction_number,
            auction_date=payload.auction_date,
            holding_number=payload.holding_number,
        )

    if not match:
        bond = await Bond.create(**payload.dict())
        message = "Bond created successfully"
    else:
        # Only fields the client actually sent are written to the row.
        changes = payload.dict(exclude_unset=True)
        await Bond.filter(id=match.id).update(**changes)
        # Re-read the row so the response reflects the stored values.
        bond = await Bond.get(id=match.id)
        message = "Bond updated successfully"

    serialized = await BondResponse.from_tortoise_orm(bond)
    return ResponseModel(success=True, message=message, data=serialized)
@router.get("/", response_model=ResponseModel)
async def list_bonds_entries():
    """Return every stored bond.

    Returns:
        A ``ResponseModel`` with ``success=True`` whose ``data`` is
        ``{"bonds": ...}``, where the value is whatever ``Bond.get_list``
        produces for the fetched rows.
    """
    rows = await Bond.all()
    # NOTE(review): Bond.get_list is defined on the model outside this file;
    # presumably it converts the ORM rows into serializable items - confirm.
    bonds = await Bond.get_list(rows)
    return ResponseModel(
        success=True,
        message="Bonds retrieved successfully",
        data={"bonds": bonds},
    )
# --- Import Task ---
async def _upsert_scraped_bond(bond_data) -> bool:
    """Create or update one scraped bond inside its own transaction.

    The stored bond is matched by ISIN when the scraped record has one,
    otherwise (or when the ISIN finds nothing) by ``auction_number``,
    ``auction_date`` and ``holding_number``.

    Exceptions are deliberately NOT caught here: they must propagate out of
    the ``async with`` block so the transaction is rolled back instead of
    being committed.

    Returns:
        ``True`` when a new row was created, ``False`` when an existing row
        was updated.
    """
    async with in_transaction():
        existing_bond = None
        if bond_data.isin:
            existing_bond = await Bond.get_or_none(isin=bond_data.isin)
        if not existing_bond:
            # Fallback check when there is no ISIN or no ISIN match.
            existing_bond = await Bond.get_or_none(
                auction_number=bond_data.auction_number,
                auction_date=bond_data.auction_date,
                holding_number=bond_data.holding_number,
            )
        if existing_bond:
            await Bond.filter(id=existing_bond.id).update(
                **bond_data.dict(exclude_unset=True)
            )
            return False
        await Bond.create(**bond_data.dict())
        return True


async def run_bond_import_task(task_id: int):
    """Scrape all bond data and upsert it, recording progress on the ImportTask.

    The task row is marked ``running`` at the start, then ``completed`` with
    a summary of created/updated/failed counts, or ``failed`` with the error
    text if the scrape itself (or the final status update) raises.

    A failure while storing a single bond is counted and reported but does
    not abort the import.

    Args:
        task_id: Primary key of the ``ImportTask`` row tracking this run.
    """
    await ImportTask.filter(id=task_id).update(status="running")
    scraper = BondDataScraper()
    created_count = 0
    updated_count = 0
    failed_count = 0
    # ISINs already stored during this run, used to skip repeats in the scrape.
    processed_isins = set()
    try:
        async for bond_data in scraper.scrape_all_bond_data():
            if not bond_data:
                failed_count += 1
                continue
            if bond_data.isin and bond_data.isin in processed_isins:
                print(f"Skipping duplicate ISIN in current scrape: {bond_data.isin}")
                continue

            # The try wraps the whole transaction (rather than living inside
            # it) so a database error reaches the transaction context manager
            # and triggers a rollback before being handled here.
            try:
                created = await _upsert_scraped_bond(bond_data)
            except Exception as db_e:
                failed_count += 1
                print(f"Database error for bond au_no {bond_data.auction_number}: {db_e}")
                continue

            # Counters are only bumped once the transaction has committed.
            if created:
                created_count += 1
                print(f"Created bond: ISIN {bond_data.isin}, AuNo {bond_data.auction_number}")
            else:
                updated_count += 1
                print(f"Updated bond: ISIN {bond_data.isin}, AuNo {bond_data.auction_number}")
            if bond_data.isin:
                processed_isins.add(bond_data.isin)

        summary = {
            "created": created_count,
            "updated": updated_count,
            "failed_during_processing": failed_count,
            "message": "Bond import process finished.",
        }
        await ImportTask.filter(id=task_id).update(status="completed", details=summary)
        print(f"Bond import task {task_id} completed. Summary: {summary}")
    except Exception as e:
        # Top-level boundary of the background task: record the failure on
        # the task row so the status endpoint can report it.
        print(f"Fatal error in bond import task {task_id}: {e}")
        await ImportTask.filter(id=task_id).update(status="failed", details={"error": str(e)})
@router.post("/import-bonds", response_model=ResponseModel)
async def trigger_bond_import(background_tasks: BackgroundTasks):
    """Register a bond import task and schedule it as a background task.

    An ``ImportTask`` row is created with ``task_type="bond_import"`` and
    ``status="pending"``; ``run_bond_import_task`` is then queued with that
    row's id.

    Args:
        background_tasks: FastAPI background-task queue for this request.

    Returns:
        A ``ResponseModel`` with ``success=True`` and ``data`` holding the new
        task's id under ``"task_id"``.
    """
    import_task = await ImportTask.create(task_type="bond_import", status="pending")
    background_tasks.add_task(run_bond_import_task, import_task.id)
    return ResponseModel(
        success=True,
        message="Bond import task started.",
        data={"task_id": import_task.id},
    )
@router.get("/import-status/{task_id}", response_model=ResponseModel)
async def get_import_status(task_id: int):
    """Look up an import task by id and return it.

    Args:
        task_id: Primary key of the ``ImportTask`` row to fetch.

    Returns:
        A ``ResponseModel`` with ``success=True`` and the task as ``data``.

    Raises:
        HTTPException: 404 when no ``ImportTask`` row has the given id.
    """
    found = await ImportTask.get_or_none(id=task_id)
    if found:
        # NOTE(review): the ORM instance is passed to ResponseModel unchanged;
        # confirm ResponseModel.data can serialize it.
        return ResponseModel(success=True, message="Task status retrieved", data=found)
    raise HTTPException(status_code=404, detail="Import task not found")