Mbonea's picture
initial commit
9d4bd7c
# routes.py
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Optional
from datetime import date
from App.routers.bonds.models import Bond # Import Bond model
from .service import _calculate_bond_coupon_dates # Import our
from decimal import Decimal # Import Decimal for type hints if necessary
from tortoise.exceptions import DoesNotExist
from App.routers.utt.models import UTTFundData
from App.routers.users.utils import get_current_user
from .models import (
Portfolio,
PortfolioSnapshot,
PortfolioBond,
PortfolioCalendar,
PortfolioTransaction,
PortfolioStock,
PortfolioUTT,
)
from .schemas import (
PortfolioCreate,
PortfolioUpdate,
PortfolioBase,
PortfolioSummary,
StockHoldingCreate,
StockHoldingUpdate,
StockHoldingResponse,
UTTHoldingCreate,
UTTHoldingUpdate,
UTTHoldingResponse,
BondHoldingCreate,
BondHoldingUpdate,
BondHoldingResponse,
CalendarEventCreate,
CalendarEventResponse,
TransactionDetailResponse,
PortfolioListResponse,
PositionResponse,
StockSellSchema,
UTTSellSchema,
BondSellSchema,
)
from .service import PortfolioService
from App.schemas import ResponseModel, AppException
from tortoise.contrib.pydantic import pydantic_model_creator, pydantic_queryset_creator
from fastapi import BackgroundTasks
from .service import PortfolioService # Ensure service is imported
from App.routers.tasks.models import ImportTask
from tortoise.expressions import Q # For querying JSON fields
from datetime import date
from datetime import date, datetime, timedelta
from App.routers.stocks.models import Dividend, Stock, StockPriceData
from decimal import Decimal
from .schemas import CalendarEventResponse # Import our new schema
from .models import Portfolio, PortfolioStock, PortfolioBond
from App.routers.utt.models import UTTFund
from App.routers.bonds.models import Bond
# Pydantic serializers generated from the Tortoise ORM models. The route
# handlers in this module use them to turn ORM instances into response data.
Portfolio_Pydantic = pydantic_model_creator(
    Portfolio,
    name="Portfolio",
)
PortfolioStock_Pydantic = pydantic_model_creator(
    PortfolioStock,
    name="PortfolioStock",
)
PortfolioUTT_Pydantic = pydantic_model_creator(
    PortfolioUTT,
    name="PortfolioUTT",
)
PortfolioBond_Pydantic = pydantic_model_creator(
    PortfolioBond,
    name="PortfolioBond",
)
PortfolioTransaction_Pydantic = pydantic_model_creator(
    PortfolioTransaction,
    name="PortfolioTransaction",
)
PortfolioCalendar_Pydantic = pydantic_model_creator(
    PortfolioCalendar,
    name="PortfolioCalendar",
)

# Queryset-level serializer for portfolios. Equivalent list serializers for
# the holding, transaction and calendar models are intentionally not created:
# they are not strictly needed when list items are converted one at a time.
Portfolio_Pydantic_List = pydantic_queryset_creator(
    Portfolio,
    name="PortfolioList",
)

# Serializer for stored portfolio snapshots (schema name kept distinct from
# the ORM model name for clarity).
PortfolioSnapshotPydantic = pydantic_model_creator(
    PortfolioSnapshot,
    name="PortfolioSnapshotResponse",
)

# All routes declared on this router are served under the /portfolios prefix.
router = APIRouter(prefix="/portfolios", tags=["portfolios"])
# Portfolio Management Routes
@router.get("/", response_model=ResponseModel)
async def get_user_portfolios(
    include_inactive: bool = Query(False), current_user=Depends(get_current_user)
):
    """List the portfolios of the authenticated user.

    ``include_inactive`` is forwarded to the service unchanged. Any failure
    is reported as a 500 ``AppException`` carrying the error text.
    """
    try:
        owned = await PortfolioService.get_user_portfolios(
            user_id=current_user.id, include_inactive=include_inactive
        )
        # Serialize each ORM row in turn; the awaits run sequentially.
        serialized = []
        for item in owned:
            serialized.append(await Portfolio_Pydantic.from_tortoise_orm(item))
        payload = {
            "portfolios": serialized,
            "total_count": len(owned),
        }
        return ResponseModel(
            success=True,
            message="Portfolios retrieved successfully",
            data=payload,
        )
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.post("/", response_model=ResponseModel)
async def create_portfolio(
    portfolio_data: PortfolioCreate, current_user=Depends(get_current_user)
):
    """Create a portfolio for the authenticated user.

    A database error whose message mentions a unique constraint is reported
    as a 400 "name already exists" error; every other failure becomes a 500.
    """
    try:
        created = await PortfolioService.create_portfolio(
            user_id=current_user.id,
            name=portfolio_data.name,
            description=portfolio_data.description,
        )
        return ResponseModel(
            success=True,
            message="Portfolio created successfully",
            data=await Portfolio_Pydantic.from_tortoise_orm(created),
        )
    except Exception as e:
        # One case-insensitive probe covers both the lowercase spelling and
        # the "UNIQUE constraint failed" spelling of the error text.
        is_duplicate_name = "unique constraint" in str(e).lower()
        if is_duplicate_name:
            raise AppException(status_code=400, detail="Portfolio name already exists")
        raise AppException(status_code=500, detail=str(e))
@router.get("/{portfolio_id}", response_model=ResponseModel)
async def get_portfolio_summary_route(
    portfolio_id: int, current_user=Depends(get_current_user)
):
    """Return the service-computed summary of one of the caller's portfolios.

    Raises a 404 ``AppException`` when no portfolio with this id belongs to
    the current user; any other failure becomes a 500.
    """
    # The "_route" suffix keeps this handler's name distinct from
    # PortfolioService.get_portfolio_summary.
    try:
        # Filtering by user_id doubles as the ownership check.
        owned = await Portfolio.get_or_none(id=portfolio_id, user_id=current_user.id)
        if not owned:
            raise AppException(status_code=404, detail="Portfolio not found")
        return ResponseModel(
            success=True,
            message="Portfolio summary retrieved successfully",
            data=await PortfolioService.get_portfolio_summary(portfolio_id),
        )
    except AppException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.put("/{portfolio_id}", response_model=ResponseModel)
async def update_portfolio(
    portfolio_id: int,
    portfolio_data: PortfolioUpdate,
    current_user=Depends(get_current_user),
):
    """Apply a partial update to one of the caller's portfolios.

    Only the fields explicitly present in the request body are written.
    The (possibly unchanged) portfolio is returned serialized.

    Raises:
        AppException: 404 when the portfolio does not exist or is not owned
            by the caller; 500 for any other failure.
    """
    try:
        portfolio = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id
        )
        if not portfolio:
            raise AppException(status_code=404, detail="Portfolio not found")

        # model_dump() is the Pydantic v2 spelling of the deprecated .dict();
        # the rest of this module already uses the v2 API.
        update_data = portfolio_data.model_dump(exclude_unset=True)
        if update_data:
            await portfolio.update_from_dict(update_data).save()

        portfolio_pydantic_data = await Portfolio_Pydantic.from_tortoise_orm(portfolio)
        return ResponseModel(
            success=True,
            message="Portfolio updated successfully",
            data=portfolio_pydantic_data,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.delete("/{portfolio_id}", response_model=ResponseModel)
async def delete_portfolio(portfolio_id: int, current_user=Depends(get_current_user)):
    """Soft-delete one of the caller's portfolios.

    The row is kept; only its ``is_active`` flag is set to ``False``.
    Raises a 404 ``AppException`` when the portfolio is not found for the
    current user and a 500 for any other failure.
    """
    try:
        target = await Portfolio.get_or_none(id=portfolio_id, user_id=current_user.id)
        if not target:
            raise AppException(status_code=404, detail="Portfolio not found")

        # Deactivate instead of removing the record.
        target.is_active = False
        await target.save()

        return ResponseModel(
            success=True,
            message="Portfolio deleted successfully (set to inactive)",
            data=None,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
# Stock Holdings Routes
@router.post(
    "/{portfolio_id}/stocks",
    response_model=ResponseModel,
    summary="Buy/Add Stock to Portfolio",
)
async def add_stock_to_portfolio_route(
    portfolio_id: int,
    stock_data: StockHoldingCreate,
    current_user=Depends(get_current_user),
):
    """Record a stock purchase in one of the caller's active portfolios.

    The purchase is delegated to ``PortfolioService.add_stock_to_portfolio``
    and the resulting holding is returned serialized with
    ``PortfolioStock_Pydantic``.

    Raises:
        AppException: 404 when no active portfolio with this id belongs to
            the caller, or when the service raises ``DoesNotExist``;
            500 for any other failure.
    """
    try:
        portfolio = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id, is_active=True
        )
        if not portfolio:
            raise AppException(
                status_code=404, detail="Active portfolio not found or access denied"
            )

        holding = await PortfolioService.add_stock_to_portfolio(
            portfolio_id=portfolio_id,
            stock_id=stock_data.stock_id,
            quantity_to_add=stock_data.quantity,
            purchase_price_of_lot=stock_data.purchase_price,
            purchase_date=stock_data.purchase_date,
            notes=stock_data.notes,
        )

        # The payload is the ORM-derived model, not StockHoldingResponse, so
        # it may lack nested stock details (symbol, name) unless the
        # generated serializer is configured to include them.
        holding_pydantic_data = await PortfolioStock_Pydantic.from_tortoise_orm(holding)
        return ResponseModel(
            success=True,
            message="Stock bought and added/updated in portfolio successfully",
            data=holding_pydantic_data,
        )
    except DoesNotExist as e:
        # Same mapping as the UTT and bond "add" routes: a missing record is
        # a client error, not a server failure.
        raise AppException(status_code=404, detail=str(e))
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.post(
    "/{portfolio_id}/stocks/{stock_id}/sell",
    response_model=ResponseModel,
    summary="Sell Stock from Portfolio",
)
async def sell_stock_from_portfolio(
    portfolio_id: int,
    stock_id: int,
    sell_data: StockSellSchema,
    current_user=Depends(get_current_user),
):
    """Sell shares of a stock held in one of the caller's active portfolios.

    The asset is identified by the ``stock_id`` path parameter. The sale is
    delegated to ``PortfolioService.sell_stock_holding`` and the resulting
    transaction is returned serialized. ``DoesNotExist`` and a missing
    portfolio map to 404; other failures map to 500.
    """
    try:
        owned = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id, is_active=True
        )
        if not owned:
            raise AppException(
                status_code=404, detail="Active portfolio not found or access denied"
            )

        sale_kwargs = {
            "portfolio_id": portfolio_id,
            "stock_id": stock_id,
            "quantity_to_sell": sell_data.quantity,
            "sell_price": sell_data.sell_price,
            "sell_date": sell_data.sell_date,
            "notes": sell_data.notes,
        }
        sale = await PortfolioService.sell_stock_holding(**sale_kwargs)

        return ResponseModel(
            success=True,
            message="Stock sold successfully",
            data=await PortfolioTransaction_Pydantic.from_tortoise_orm(sale),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e))
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.put("/{portfolio_id}/stocks/{stock_id}", response_model=ResponseModel)
async def update_stock_holding(
    portfolio_id: int,
    stock_id: int,
    stock_data: StockHoldingUpdate,
    current_user=Depends(get_current_user),
):
    """Directly patch the aggregated holding of a stock in a portfolio.

    The holding is looked up by ``(stock_id, portfolio_id)`` and only the
    fields present in the request body are written.

    Warning: quantity, purchase_price and purchase_date are written as given,
    with no recalculation. Changing them on an aggregated holding can leave
    it inconsistent; this endpoint is best suited to ``notes`` or deliberate
    corrections.

    Raises:
        AppException: 404 when the portfolio or the holding is not found;
            500 for any other failure.
    """
    try:
        portfolio = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id
        )
        if not portfolio:
            raise AppException(status_code=404, detail="Portfolio not found")

        holding = await PortfolioStock.get_or_none(
            stock_id=stock_id, portfolio_id=portfolio_id
        )
        if not holding:
            raise AppException(
                status_code=404,
                detail="Stock holding for this stock not found in portfolio.",
            )

        # Pydantic v2 spelling of the deprecated .dict(); only fields the
        # client sent are included.
        update_data = stock_data.model_dump(exclude_unset=True)
        if update_data:
            await holding.update_from_dict(update_data).save()

        holding_pydantic_data = await PortfolioStock_Pydantic.from_tortoise_orm(holding)
        return ResponseModel(
            success=True,
            message="Stock holding updated successfully",
            data=holding_pydantic_data,
        )
    except DoesNotExist as e:
        # Not expected after the get_or_none checks; kept as a safety net.
        raise AppException(status_code=404, detail=str(e))
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.delete(
    "/{portfolio_id}/stocks/{stock_id}",
    response_model=ResponseModel,
    summary="Delete Stock Holding",
)
async def remove_stock_from_portfolio(
    portfolio_id: int,
    stock_id: int,
    current_user=Depends(get_current_user),
):
    """Remove a stock holding from one of the caller's portfolios.

    Delegates to ``PortfolioService.remove_holding`` with asset type
    ``"STOCK"``; a falsy result from the service is reported as a 404.
    """
    try:
        owned = await Portfolio.get_or_none(id=portfolio_id, user_id=current_user.id)
        if not owned:
            raise AppException(status_code=404, detail="Portfolio not found")

        # The stock id from the path is the asset identifier for the service.
        removed = await PortfolioService.remove_holding(
            portfolio_id=portfolio_id,
            asset_type_str="STOCK",
            asset_id_value=stock_id,
        )
        if not removed:
            raise AppException(
                status_code=404,
                detail="Stock holding not found or could not be deleted",
            )

        return ResponseModel(
            success=True,
            message="Stock holding removed from portfolio successfully",
            data=None,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
# UTT Holdings Routes
@router.post(
    "/{portfolio_id}/utts",
    response_model=ResponseModel,
    summary="Buy/Add UTT to Portfolio",
)
async def add_utt_to_portfolio_route(
    portfolio_id: int,
    utt_data: UTTHoldingCreate,
    current_user=Depends(get_current_user),
):
    """Record a UTT fund purchase in one of the caller's active portfolios.

    Delegates to ``PortfolioService.add_utt_to_portfolio`` and returns the
    resulting holding serialized. ``DoesNotExist`` and a missing portfolio
    map to 404; other failures map to 500.
    """
    try:
        owned = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id, is_active=True
        )
        if not owned:
            raise AppException(
                status_code=404, detail="Active portfolio not found or access denied"
            )

        purchase_kwargs = {
            "portfolio_id": portfolio_id,
            "utt_fund_id": utt_data.utt_fund_id,
            "units_to_add": utt_data.units_held,
            "purchase_price_of_lot": utt_data.purchase_price,
            "purchase_date": utt_data.purchase_date,
            "notes": utt_data.notes,
        }
        utt_holding = await PortfolioService.add_utt_to_portfolio(**purchase_kwargs)

        return ResponseModel(
            success=True,
            message="UTT fund bought and added/updated in portfolio successfully",
            data=await PortfolioUTT_Pydantic.from_tortoise_orm(utt_holding),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e))
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.post(
    "/{portfolio_id}/utts/{utt_fund_id}/sell",
    response_model=ResponseModel,
    summary="Sell UTT from Portfolio",
)
async def sell_utt_from_portfolio(
    portfolio_id: int,
    utt_fund_id: int,
    sell_data: UTTSellSchema,
    current_user=Depends(get_current_user),
):
    """Sell units of a UTT fund held in one of the caller's active portfolios.

    The fund is identified by the ``utt_fund_id`` path parameter. The sale
    is delegated to ``PortfolioService.sell_utt_holding`` and the resulting
    transaction is returned serialized. ``DoesNotExist`` and a missing
    portfolio map to 404; other failures map to 500.
    """
    try:
        owned = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id, is_active=True
        )
        if not owned:
            raise AppException(
                status_code=404, detail="Active portfolio not found or access denied"
            )

        sale_kwargs = {
            "portfolio_id": portfolio_id,
            "utt_fund_id": utt_fund_id,
            "units_to_sell": sell_data.units_to_sell,
            "sell_price": sell_data.sell_price,
            "sell_date": sell_data.sell_date,
            "notes": sell_data.notes,
        }
        sale = await PortfolioService.sell_utt_holding(**sale_kwargs)

        return ResponseModel(
            success=True,
            message="UTT units sold successfully",
            data=await PortfolioTransaction_Pydantic.from_tortoise_orm(sale),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e))
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.put("/{portfolio_id}/utts/{utt_fund_id}", response_model=ResponseModel)
async def update_utt_holding(
    portfolio_id: int,
    utt_fund_id: int,
    utt_data: UTTHoldingUpdate,
    current_user=Depends(get_current_user),
):
    """Directly patch the holding of a UTT fund in a portfolio.

    The holding is looked up by ``(utt_fund_id, portfolio_id)`` and only the
    fields present in the request body are written.

    Warning: units_held, purchase_price and purchase_date are written as
    given, with no recalculation, so the same caution applies as for stock
    holdings.

    Raises:
        AppException: 404 when the portfolio or the holding is not found;
            500 for any other failure.
    """
    try:
        portfolio = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id
        )
        if not portfolio:
            raise AppException(status_code=404, detail="Portfolio not found")

        holding = await PortfolioUTT.get_or_none(
            utt_fund_id=utt_fund_id, portfolio_id=portfolio_id
        )
        if not holding:
            raise AppException(
                status_code=404,
                detail="UTT holding for this fund not found in portfolio.",
            )

        # Pydantic v2 spelling of the deprecated .dict(); only fields the
        # client sent are included.
        update_data = utt_data.model_dump(exclude_unset=True)
        if update_data:
            await holding.update_from_dict(update_data).save()

        holding_pydantic_data = await PortfolioUTT_Pydantic.from_tortoise_orm(holding)
        return ResponseModel(
            success=True,
            message="UTT holding updated successfully",
            data=holding_pydantic_data,
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e))
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.delete(
    "/{portfolio_id}/utts/{utt_fund_id}",
    response_model=ResponseModel,
    summary="Delete UTT Holding",
)
async def remove_utt_from_portfolio(
    portfolio_id: int,
    utt_fund_id: int,
    current_user=Depends(get_current_user),
):
    """Remove a UTT fund holding from one of the caller's portfolios.

    Delegates to ``PortfolioService.remove_holding`` with asset type
    ``"UTT"``; a falsy result from the service is reported as a 404.
    """
    try:
        owned = await Portfolio.get_or_none(id=portfolio_id, user_id=current_user.id)
        if not owned:
            raise AppException(status_code=404, detail="Portfolio not found")

        # The fund id from the path is the asset identifier for the service.
        removed = await PortfolioService.remove_holding(
            portfolio_id=portfolio_id,
            asset_type_str="UTT",
            asset_id_value=utt_fund_id,
        )
        if not removed:
            raise AppException(
                status_code=404, detail="UTT holding not found or could not be deleted"
            )

        return ResponseModel(
            success=True,
            message="UTT fund holding removed from portfolio successfully",
            data=None,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
# Bond Holdings Routes
@router.post(
    "/{portfolio_id}/bonds",
    response_model=ResponseModel,
    summary="Buy/Add Bond to Portfolio",
)
async def add_bond_to_portfolio_route(
    portfolio_id: int,
    bond_data: BondHoldingCreate,
    current_user=Depends(get_current_user),
):
    """Record a bond purchase in one of the caller's active portfolios.

    The bond is resolved from ``bond_data.auction_number``. The purchase is
    then delegated to ``PortfolioService.add_bond_to_portfolio``, with
    ``bond_data.purchase_price`` passed as the total purchase price of the
    lot (assumed to be the TOTAL cost, not a per-unit price).

    Raises:
        AppException: 404 when the active portfolio is not found for the
            caller, when no bond matches the auction number, or when the
            service raises ``DoesNotExist``; 500 for any other failure.
    """
    try:
        portfolio = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id, is_active=True
        )
        if not portfolio:
            raise AppException(
                status_code=404, detail="Active portfolio not found or access denied"
            )

        bond = await Bond.get_or_none(auction_number=bond_data.auction_number)
        if bond is None:
            # get_or_none returns None for an unknown auction number. Without
            # this guard, `bond.id` raised AttributeError, which was reported
            # as an opaque 500.
            raise AppException(
                status_code=404,
                detail="Bond not found for the given auction number",
            )

        holding = await PortfolioService.add_bond_to_portfolio(
            portfolio_id=portfolio_id,
            bond_id=bond.id,
            face_value_to_add=bond_data.face_value_held,
            total_purchase_price_of_lot=bond_data.purchase_price,
            purchase_date=bond_data.purchase_date,
            notes=bond_data.notes,
        )

        holding_pydantic_data = await PortfolioBond_Pydantic.from_tortoise_orm(holding)
        return ResponseModel(
            success=True,
            message="Bond bought and added/updated in portfolio successfully",
            data=holding_pydantic_data,
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e))
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.post(
    "/{portfolio_id}/bonds/{bond_id}/sell",
    response_model=ResponseModel,
    summary="Sell Bond from Portfolio",
)
async def sell_bond_from_portfolio(
    portfolio_id: int,
    bond_id: int,
    sell_data: BondSellSchema,
    current_user=Depends(get_current_user),
):
    """Sell part of a bond held in one of the caller's active portfolios.

    The bond is identified by the ``bond_id`` path parameter.
    ``sell_data.sell_price`` is forwarded to the service as the total sale
    proceeds (assumed to be a total, not a per-unit price). ``DoesNotExist``
    and a missing portfolio map to 404; other failures map to 500.
    """
    try:
        owned = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id, is_active=True
        )
        if not owned:
            raise AppException(
                status_code=404, detail="Active portfolio not found or access denied"
            )

        sale_kwargs = {
            "portfolio_id": portfolio_id,
            "bond_id": bond_id,
            "face_value_to_sell": sell_data.face_value_to_sell,
            "sell_price_total": sell_data.sell_price,
            "sell_date": sell_data.sell_date,
            "notes": sell_data.notes,
        }
        sale = await PortfolioService.sell_bond_holding(**sale_kwargs)

        return ResponseModel(
            success=True,
            message="Bond portion sold successfully",
            data=await PortfolioTransaction_Pydantic.from_tortoise_orm(sale),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e))
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.put("/{portfolio_id}/bonds/{bond_id}", response_model=ResponseModel)
async def update_bond_holding(
    portfolio_id: int,
    bond_id: int,
    bond_data: BondHoldingUpdate,
    current_user=Depends(get_current_user),
):
    """Directly patch the holding of a bond in a portfolio.

    The holding is looked up by ``(bond_id, portfolio_id)`` and only the
    fields present in the request body are written.

    Warning: face_value_held and purchase_price (total cost) are written as
    given. When only face_value_held changes, the total cost is NOT adjusted
    proportionally; the client must send a new purchase_price as well if the
    cost basis should follow.

    Raises:
        AppException: 404 when the portfolio or the holding is not found;
            500 for any other failure.
    """
    try:
        portfolio = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id
        )
        if not portfolio:
            raise AppException(status_code=404, detail="Portfolio not found")

        holding = await PortfolioBond.get_or_none(
            bond_id=bond_id, portfolio_id=portfolio_id
        )
        if not holding:
            raise AppException(
                status_code=404,
                detail="Bond holding for this bond not found in portfolio.",
            )

        # Pydantic v2 spelling of the deprecated .dict(); only fields the
        # client sent are included.
        update_data = bond_data.model_dump(exclude_unset=True)
        if update_data:
            await holding.update_from_dict(update_data).save()

        holding_pydantic_data = await PortfolioBond_Pydantic.from_tortoise_orm(holding)
        return ResponseModel(
            success=True,
            message="Bond holding updated successfully",
            data=holding_pydantic_data,
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e))
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.delete(
    "/{portfolio_id}/bonds/{bond_id}",
    response_model=ResponseModel,
    summary="Delete Bond Holding",
)
async def remove_bond_from_portfolio(
    portfolio_id: int,
    bond_id: int,
    current_user=Depends(get_current_user),
):
    """Remove a bond holding from one of the caller's portfolios.

    Delegates to ``PortfolioService.remove_holding`` with asset type
    ``"BOND"``; a falsy result from the service is reported as a 404.
    """
    try:
        owned = await Portfolio.get_or_none(id=portfolio_id, user_id=current_user.id)
        if not owned:
            raise AppException(status_code=404, detail="Portfolio not found")

        # The bond id from the path is the asset identifier for the service.
        removed = await PortfolioService.remove_holding(
            portfolio_id=portfolio_id,
            asset_type_str="BOND",
            asset_id_value=bond_id,
        )
        if not removed:
            raise AppException(
                status_code=404, detail="Bond holding not found or could not be deleted"
            )

        return ResponseModel(
            success=True,
            message="Bond holding removed from portfolio successfully",
            data=None,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
# Calendar and Transaction Routes (No changes related to holding_id vs asset_id here)
@router.post("/{portfolio_id}/calendar", response_model=ResponseModel)
async def add_calendar_event(
    portfolio_id: int,
    event_data: CalendarEventCreate,
    current_user=Depends(get_current_user),
):
    """Create a calendar event attached to one of the caller's portfolios.

    All fields of ``event_data`` are passed to ``PortfolioCalendar.create``
    together with the portfolio id, and the new event is returned serialized.

    Raises:
        AppException: 404 when the portfolio is not found for the caller;
            500 for any other failure.
    """
    try:
        portfolio = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id
        )
        if not portfolio:
            raise AppException(status_code=404, detail="Portfolio not found")

        # model_dump() is the Pydantic v2 spelling of the deprecated .dict().
        event = await PortfolioCalendar.create(
            portfolio_id=portfolio_id, **event_data.model_dump()
        )
        event_pydantic_data = await PortfolioCalendar_Pydantic.from_tortoise_orm(event)
        return ResponseModel(
            success=True,
            message="Calendar event added successfully",
            data=event_pydantic_data,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
@router.get("/{portfolio_id}/transactions", response_model=ResponseModel)
async def get_portfolio_transactions(
    portfolio_id: int,
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    current_user=Depends(get_current_user),
):
    """Return one page of a portfolio's transactions, newest first.

    Each transaction is enriched with ``asset_name`` and ``asset_symbol``,
    resolved with one bulk query per asset type (stock, UTT fund, bond).
    Both stay ``None`` when the referenced asset is not found. The response
    also carries the total (unpaged) transaction count and echoes ``limit``
    and ``offset``.

    Raises:
        AppException: 404 when the portfolio is not found for the caller;
            500 for any other failure.
    """
    try:
        # 1. Ownership check and the paged query.
        portfolio = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id
        )
        if not portfolio:
            raise AppException(status_code=404, detail="Portfolio not found")

        transactions_list = await (
            PortfolioTransaction.filter(portfolio_id=portfolio_id)
            .order_by("-transaction_date", "-created_at")
            .offset(offset)
            .limit(limit)
            .all()
        )

        # 2. Collect the distinct asset ids per asset type on this page.
        stock_ids = set()
        utt_ids = set()
        bond_ids = set()
        for t in transactions_list:
            if t.asset_type == "STOCK":
                stock_ids.add(t.asset_id)
            elif t.asset_type == "UTT":
                utt_ids.add(t.asset_id)
            elif t.asset_type == "BOND":
                bond_ids.add(t.asset_id)

        # 3. Bulk-fetch asset details: one query per type instead of one per
        #    transaction.
        stocks_map: dict[int, Stock] = {
            s.id: s for s in await Stock.filter(id__in=list(stock_ids))
        }
        utts_map: dict[int, UTTFund] = {
            u.id: u for u in await UTTFund.filter(id__in=list(utt_ids))
        }
        bonds_map: dict[int, Bond] = {
            b.id: b for b in await Bond.filter(id__in=list(bond_ids))
        }

        # 4. Build the enriched response models.
        enriched_transactions: List[TransactionDetailResponse] = []
        for t in transactions_list:
            asset_name = None
            asset_symbol = None
            if t.asset_type == "STOCK" and t.asset_id in stocks_map:
                stock = stocks_map[t.asset_id]
                asset_name = stock.name
                asset_symbol = stock.symbol
            elif t.asset_type == "UTT" and t.asset_id in utts_map:
                utt = utts_map[t.asset_id]
                asset_name = utt.name
                asset_symbol = utt.symbol
            elif t.asset_type == "BOND" and t.asset_id in bonds_map:
                bond = bonds_map[t.asset_id]
                asset_name = f"{bond.maturity_years} Yr Treasury Bond"
                asset_symbol = bond.isin

            # NOTE(review): t.__dict__ is the raw instance dict, so it may
            # contain ORM-internal attributes as well; this presumably relies
            # on TransactionDetailResponse ignoring unknown keys - confirm.
            enriched_transactions.append(
                TransactionDetailResponse.model_validate(
                    {
                        **t.__dict__,
                        "asset_name": asset_name,
                        "asset_symbol": asset_symbol,
                    }
                )
            )

        total_count = await PortfolioTransaction.filter(
            portfolio_id=portfolio_id
        ).count()

        return ResponseModel(
            success=True,
            message="Transactions retrieved successfully",
            data={
                "transactions": [et.model_dump() for et in enriched_transactions],
                "total_count": total_count,
                "limit": limit,
                "offset": offset,
            },
        )
    except AppException:
        # Without this clause the 404 raised above was caught by the generic
        # handler below and reported as a 500.
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e))
# Performance and Analytics Routes
@router.get(
    "/{portfolio_id}/positions",
    response_model=ResponseModel,
    summary="Get All Current Portfolio Positions",
)
async def get_portfolio_positions(
    portfolio_id: int, current_user=Depends(get_current_user)
):
    """
    Calculates and retrieves all current positions in a portfolio.

    All BUY/SELL transactions are aggregated per (asset_type, asset_id) to
    derive the quantity still held and the average buy price (total buy cost
    divided by total bought quantity). For each asset still held, the most
    recent price record is looked up and used to compute the current value
    and the profit/loss. Assets whose net quantity is zero or negative are
    omitted.

    Raises a 404 AppException when the portfolio is not found for the caller.
    """
    # 1. Ownership check.
    portfolio = await Portfolio.get_or_none(id=portfolio_id, user_id=current_user.id)
    if not portfolio:
        raise AppException(status_code=404, detail="Portfolio not found")

    # 2. Aggregate transactions per asset.
    transactions = await PortfolioTransaction.filter(
        portfolio_id=portfolio_id
    ).order_by("transaction_date")

    # Key: (asset_type, asset_id); value: running buy_qty / buy_cost / sell_qty.
    aggregated_data: dict[tuple, dict] = {}
    for t in transactions:
        totals = aggregated_data.setdefault(
            (t.asset_type, t.asset_id),
            {
                "buy_qty": Decimal("0.0"),
                "buy_cost": Decimal("0.0"),
                "sell_qty": Decimal("0.0"),
            },
        )
        if t.transaction_type == "BUY":
            totals["buy_qty"] += t.quantity
            totals["buy_cost"] += t.total_amount
        elif t.transaction_type == "SELL":
            totals["sell_qty"] += t.quantity

    # 3. Value every asset that is still held.
    position_responses: List[PositionResponse] = []
    for (asset_type, asset_id), data in aggregated_data.items():
        current_quantity = data["buy_qty"] - data["sell_qty"]

        # Fully sold (or oversold) assets are not positions.
        if current_quantity <= 0:
            continue

        # Cost basis of the remaining units, at the average buy price.
        avg_buy_price = (
            data["buy_cost"] / data["buy_qty"]
            if data["buy_qty"] > 0
            else Decimal("0.0")
        )
        total_invested = current_quantity * avg_buy_price

        # Defaults used when the asset or its price record cannot be found.
        current_price = Decimal("0.0")
        asset_name = "Unknown"
        asset_symbol = "N/A"

        if asset_type == "STOCK":
            stock = await Stock.get_or_none(id=asset_id)
            if stock:
                asset_name = stock.name
                asset_symbol = stock.symbol
            price_data = (
                await StockPriceData.filter(stock_id=asset_id)
                .order_by("-date")
                .first()
            )
            if price_data:
                current_price = price_data.closing_price

        elif asset_type == "UTT":
            utt = await UTTFund.get_or_none(id=asset_id)
            if utt:
                asset_name = utt.name
                asset_symbol = utt.symbol
            price_data = (
                await UTTFundData.filter(fund_id=asset_id).order_by("-date").first()
            )
            if price_data:
                current_price = Decimal(str(price_data.nav_per_unit))

        elif asset_type == "BOND":
            bond = await Bond.get_or_none(id=asset_id)
            if bond:
                asset_name = f"{bond.maturity_years} Yr Treasury Bond"
                asset_symbol = bond.isin
                # Simplified valuation: use the bond's price_per_100, falling
                # back to 100 when it is not set.
                # NOTE(review): this price is per 100 of face value but is
                # multiplied directly by the held quantity below - confirm
                # the unit of bond transaction quantities and whether a
                # division by 100 is needed.
                current_price = (
                    Decimal(str(bond.price_per_100))
                    if bond.price_per_100
                    else Decimal("100.0")
                )

        # Final metrics.
        current_value = current_quantity * current_price
        profit_loss = current_value - total_invested
        profit_loss_percent = (
            (profit_loss / total_invested) * 100 if total_invested > 0 else 0.0
        )

        position_responses.append(
            PositionResponse(
                asset_id=asset_id,
                asset_type=asset_type.capitalize(),  # "STOCK" -> "Stock"
                asset_name=asset_name,
                asset_symbol=asset_symbol,
                quantity=current_quantity,
                avg_buy_price=round(avg_buy_price, 4),
                total_invested=round(total_invested, 2),
                current_price=round(current_price, 4),
                current_value=round(current_value, 2),
                profit_loss=round(profit_loss, 2),
                profit_loss_percent=round(float(profit_loss_percent), 2),
            )
        )

    # 4. Return the final response.
    return ResponseModel(
        success=True,
        message="Positions retrieved successfully.",
        data={"positions": position_responses},
    )
@router.post("/{portfolio_id}/snapshot", response_model=ResponseModel)
async def create_portfolio_snapshot_route(
    portfolio_id: int,
    snapshot_date: Optional[date] = Query(
        None, description="Date for the snapshot. Defaults to today if None."
    ),
    current_user=Depends(get_current_user),
):
    """Create a snapshot of one of the caller's portfolios.

    ``snapshot_date`` is forwarded to
    ``PortfolioService.create_portfolio_snapshot`` as-is and the stored
    snapshot is returned serialized.

    Error mapping: ``NotImplementedError`` -> 501, ``DoesNotExist`` or a
    missing portfolio -> 404, anything else -> 500.
    """
    try:
        owned = await Portfolio.get_or_none(id=portfolio_id, user_id=current_user.id)
        if not owned:
            raise AppException(status_code=404, detail="Portfolio not found")

        stored_snapshot = await PortfolioService.create_portfolio_snapshot(
            portfolio_id=portfolio_id, snapshot_date_input=snapshot_date
        )

        return ResponseModel(
            success=True,
            message="Portfolio snapshot created successfully",
            data=await PortfolioSnapshotPydantic.from_tortoise_orm(stored_snapshot),
        )
    except NotImplementedError as e:
        # Raised by the service for functionality it does not implement.
        raise AppException(status_code=501, detail=str(e))
    except DoesNotExist:
        raise AppException(
            status_code=404, detail="Portfolio not found when creating snapshot."
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(
            status_code=500, detail=f"Failed to create snapshot: {str(e)}"
        )
@router.get(
    "/{portfolio_id}/performance",
    response_model=ResponseModel,
    summary="Get Portfolio Performance Timeseries",
)
async def get_portfolio_performance(
    portfolio_id: int,
    background_tasks: BackgroundTasks,
    period: str = Query(
        "1M",
        enum=["1D", "1W", "1M", "YTD", "1Y", "Max"],
        description="The time period for the performance data.",
    ),
    current_user=Depends(get_current_user),
):
    """
    Retrieves time-series performance data for a portfolio.

    If data is missing, it automatically queues a background task to generate
    all historical data and informs the user to wait.

    Args:
        portfolio_id: Identifier of the portfolio to report on.
        background_tasks: FastAPI background task queue used to schedule
            snapshot regeneration.
        period: Look-back window ("1D", "1W", "1M", "YTD", "1Y" or "Max").
            "Max" is capped at ten years; any other value applies no lower
            date bound.
        current_user: Authenticated user resolved by ``get_current_user``.

    Returns:
        A ``ResponseModel``. On success ``data`` holds ``current_value``,
        ``change_value``, ``change_percentage`` and ``timeseries``. When data
        is still being generated, ``success`` is False and ``data`` holds the
        ``task_id`` and ``status`` of the generation task.

    Raises:
        AppException: 404 when the portfolio does not exist for this user,
            500 for any unexpected failure.
    """
    try:
        # 1. AUTHENTICATION
        portfolio = await Portfolio.get_or_none(
            id=portfolio_id, user_id=current_user.id
        )
        if not portfolio:
            raise AppException(status_code=404, detail="Portfolio not found")

        # 2. CONSOLIDATED TASK CHECK: Check if ANY relevant task is already running.
        active_task = await ImportTask.filter(
            Q(details__contains={"portfolio_id": portfolio_id}),
            Q(task_type__in=["portfolio_regeneration", "portfolio_snapshot_history"]),
            status__in=["pending", "running"],
        ).first()
        if active_task:
            return ResponseModel(
                success=False,
                message="Portfolio performance data is currently being prepared. Please check back in a few moments.",
                data={"task_id": active_task.id, "status": active_task.status},
            )

        # 3. DEFINE TIME PERIOD & QUERY EXISTING DATA
        end_date = date.today()
        start_date = None
        if period == "1D":
            start_date = end_date - timedelta(days=1)
        elif period == "1W":
            start_date = end_date - timedelta(weeks=1)
        elif period == "1M":
            start_date = end_date - timedelta(days=30)
        elif period == "YTD":
            start_date = date(end_date.year, 1, 1)
        elif period == "1Y":
            start_date = end_date - timedelta(days=365)
        elif period == "Max":
            start_date = end_date - timedelta(days=365 * 10)  # A 10-year fallback

        query = PortfolioSnapshot.filter(portfolio_id=portfolio_id)
        if start_date is not None:
            # Compare against midnight so the whole start day is included.
            start_datetime = datetime.combine(start_date, datetime.min.time())
            query = query.filter(snapshot_date__gte=start_datetime)
        snapshots = await query.order_by("snapshot_date").values(
            "snapshot_date", "total_value"
        )

        # 4. DECISION POINT: Serve data OR trigger generation.
        # If we found no snapshots for the requested period, it's time to generate.
        if not snapshots:
            # Since we already checked for active tasks, we know it's safe to start a new one.
            task = await ImportTask.create(
                task_type="portfolio_snapshot_history",
                status="pending",
                details={
                    "portfolio_id": portfolio_id,
                    "reason": "First-time data request.",
                },
            )
            # We call the task without a start_date, so it will find the earliest transaction.
            background_tasks.add_task(
                PortfolioService.regenerate_snapshots_task, task.id, portfolio_id
            )
            return ResponseModel(
                success=False,
                message="We're preparing your performance history for the first time. This may take a moment.",
                data={"task_id": task.id, "status": "pending"},
            )

        # 5. SUCCESS PATH: This code is only reached if snapshots WERE found.
        # Both success responses share the same serialised series, so build it once.
        timeseries = [
            {
                "date": s["snapshot_date"].isoformat(),
                "value": str(s["total_value"]),
            }
            for s in snapshots
        ]

        if len(snapshots) < 2:
            # A single data point cannot express a change over time.
            current_value = snapshots[0]["total_value"]
            return ResponseModel(
                success=True,
                message="Not enough historical data to calculate performance change.",
                data={
                    "current_value": str(current_value),
                    "change_value": "0.00",
                    "change_percentage": 0.0,
                    "timeseries": timeseries,
                },
            )

        first_value = snapshots[0]["total_value"]
        last_value = snapshots[-1]["total_value"]
        change_value = last_value - first_value
        # Guard against division by zero when the period starts at zero value.
        change_percentage = (
            (change_value / first_value) * 100 if first_value > 0 else Decimal("0.0")
        )
        return ResponseModel(
            success=True,
            message=f"Performance data for period '{period}' retrieved successfully.",
            data={
                "current_value": str(last_value),
                "change_value": str(change_value),
                "change_percentage": round(float(change_percentage), 2),
                "timeseries": timeseries,
            },
        )
    except AppException:
        # Deliberate API errors (such as the 404 above) must keep their own
        # status code instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        raise AppException(
            status_code=500, detail=f"An unexpected error occurred: {e}"
        ) from e
@router.get(
    "/{portfolio_id}/calendar",
    response_model=ResponseModel,
    summary="Get Upcoming Portfolio Calendar Events",
)
async def get_portfolio_calendar_events(
    portfolio_id: int,
    start_date: Optional[date] = Query(
        None, description="Start of date range. Defaults to today."
    ),
    end_date: Optional[date] = Query(
        None, description="End of date range. Defaults to 90 days from now."
    ),
    current_user=Depends(get_current_user),
):
    """
    Generates a dynamic calendar of expected income events (dividends and coupons)
    for a user's portfolio within a given date range.

    Args:
        portfolio_id: Identifier of the portfolio to inspect.
        start_date: First day of the range (inclusive). Defaults to today.
        end_date: Last day of the range (inclusive for dividends). Defaults to
            90 days after ``start_date``.
        current_user: Authenticated user resolved by ``get_current_user``.

    Returns:
        A ``ResponseModel`` whose ``data`` holds ``events`` (sorted by
        ``event_date``) and ``total_count``.

    Raises:
        AppException: 404 when the portfolio does not exist for this user.
    """
    # 1. SETUP & AUTHENTICATION
    if start_date is None:
        start_date = date.today()
    if end_date is None:
        end_date = start_date + timedelta(days=90)

    portfolio = await Portfolio.get_or_none(id=portfolio_id, user_id=current_user.id)
    if not portfolio:
        raise AppException(status_code=404, detail="Portfolio not found")

    calendar_events: List[CalendarEventResponse] = []

    # 2. PROCESS STOCK DIVIDENDS
    # Get all stocks currently held in the portfolio
    portfolio_stocks = await PortfolioStock.filter(
        portfolio_id=portfolio_id
    ).select_related("stock")
    if portfolio_stocks:
        stock_ids = [ps.stock.id for ps in portfolio_stocks]
        # Create a map for quick lookup of quantity held for each stock
        stock_quantity_map = {ps.stock.id: ps.quantity for ps in portfolio_stocks}
        # Find all declared dividends for those stocks within the date range
        dividends = await Dividend.filter(
            stock_id__in=stock_ids,
            payment_date__gte=start_date,
            payment_date__lte=end_date,
        ).select_related("stock")
        for div in dividends:
            quantity_held = stock_quantity_map.get(div.stock.id, 0)
            # Only positions that are still open generate income.
            if quantity_held > 0:
                event = CalendarEventResponse(
                    event_date=div.payment_date,
                    event_type="Dividend Payment",
                    asset_symbol=div.stock.symbol,
                    asset_name=div.stock.name,
                    estimated_amount=div.dividend_amount * quantity_held,
                    notes=f"Ex-dividend date: {div.ex_dividend_date.isoformat()}",
                )
                calendar_events.append(event)

    # 3. PROCESS BOND COUPONS
    # Get all bonds currently held in the portfolio
    portfolio_bonds = await PortfolioBond.filter(
        portfolio_id=portfolio_id
    ).select_related("bond")
    if portfolio_bonds:
        for pb in portfolio_bonds:
            # Use our helper function to calculate coupon dates in the range
            coupon_dates = _calculate_bond_coupon_dates(pb.bond, start_date, end_date)
            for coupon_date in coupon_dates:
                # Coupon amount is based on face value and semi-annual rate:
                # the annual percentage rate is halved for each payment.
                estimated_amount = (
                    pb.face_value_held
                    * (Decimal(str(pb.bond.coupon_rate)) / Decimal("100"))
                ) / Decimal("2")
                event = CalendarEventResponse(
                    event_date=coupon_date,
                    event_type="Bond Coupon",
                    asset_symbol=pb.bond.isin,
                    asset_name=f"{pb.bond.maturity_years} Yr T-Bond",
                    estimated_amount=estimated_amount,
                    notes=f"Matures on {pb.bond.maturity_date.isoformat()}",
                )
                calendar_events.append(event)

    # 4. SORT AND RETURN
    # Sort all collected events by date
    sorted_events = sorted(calendar_events, key=lambda x: x.event_date)
    return ResponseModel(
        success=True,
        message="Portfolio calendar events retrieved successfully.",
        data={"events": sorted_events, "total_count": len(sorted_events)},
    )