# routes.py
from datetime import date, datetime, timedelta
from decimal import Decimal  # Import Decimal for type hints if necessary
from typing import Dict, List, Optional, Tuple

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query
from tortoise.contrib.pydantic import pydantic_model_creator, pydantic_queryset_creator
from tortoise.exceptions import DoesNotExist
from tortoise.expressions import Q  # For querying JSON fields

from App.routers.bonds.models import Bond  # Import Bond model
from App.routers.stocks.models import Dividend, Stock, StockPriceData
from App.routers.tasks.models import ImportTask
from App.routers.users.utils import get_current_user
from App.routers.utt.models import UTTFund, UTTFundData
from App.schemas import AppException, ResponseModel

from .models import (
    Portfolio,
    PortfolioBond,
    PortfolioCalendar,
    PortfolioSnapshot,
    PortfolioStock,
    PortfolioTransaction,
    PortfolioUTT,
)
from .schemas import (
    BondHoldingCreate,
    BondHoldingResponse,
    BondHoldingUpdate,
    BondSellSchema,
    CalendarEventCreate,
    CalendarEventResponse,
    PortfolioBase,
    PortfolioCreate,
    PortfolioListResponse,
    PortfolioSummary,
    PortfolioUpdate,
    PositionResponse,
    StockHoldingCreate,
    StockHoldingResponse,
    StockHoldingUpdate,
    StockSellSchema,
    TransactionDetailResponse,
    UTTHoldingCreate,
    UTTHoldingResponse,
    UTTHoldingUpdate,
    UTTSellSchema,
)
from .service import PortfolioService, _calculate_bond_coupon_dates

Portfolio_Pydantic = pydantic_model_creator(Portfolio, name="Portfolio")
PortfolioStock_Pydantic = pydantic_model_creator(PortfolioStock, name="PortfolioStock")
PortfolioUTT_Pydantic = pydantic_model_creator(PortfolioUTT, name="PortfolioUTT")
PortfolioBond_Pydantic = pydantic_model_creator(PortfolioBond, name="PortfolioBond")
PortfolioTransaction_Pydantic = pydantic_model_creator(
    PortfolioTransaction, name="PortfolioTransaction"
)
PortfolioCalendar_Pydantic = pydantic_model_creator(
    PortfolioCalendar, name="PortfolioCalendar"
)
# Only the Portfolio queryset creator is defined; list items of the other
# models are converted one by one in the routes below.
Portfolio_Pydantic_List = pydantic_queryset_creator(Portfolio, name="PortfolioList")
PortfolioSnapshotPydantic = pydantic_model_creator(
    PortfolioSnapshot, name="PortfolioSnapshotResponse"
)  # Renamed for clarity

router = APIRouter(prefix="/portfolios", tags=["portfolios"])

# Look-back window per performance period. "YTD" is computed separately in
# _period_start; "Max" is a 10-year fallback.
_PERIOD_WINDOWS = {
    "1D": timedelta(days=1),
    "1W": timedelta(weeks=1),
    "1M": timedelta(days=30),
    "1Y": timedelta(days=365),
    "Max": timedelta(days=365 * 10),
}


async def _get_owned_portfolio(
    portfolio_id: int, user_id: int, *, active_only: bool = False
) -> Portfolio:
    """Return the portfolio with this id owned by ``user_id`` or raise a 404.

    When ``active_only`` is true the lookup is additionally restricted to
    ``is_active=True`` and the 404 message reflects that.
    """
    filters = {"id": portfolio_id, "user_id": user_id}
    if active_only:
        filters["is_active"] = True
    portfolio = await Portfolio.get_or_none(**filters)
    if portfolio is None:
        detail = (
            "Active portfolio not found or access denied"
            if active_only
            else "Portfolio not found"
        )
        raise AppException(status_code=404, detail=detail)
    return portfolio


def _period_start(period: str, end_date: date) -> Optional[date]:
    """Translate a period code into a start date; ``None`` for unknown codes."""
    if period == "YTD":
        return date(end_date.year, 1, 1)
    window = _PERIOD_WINDOWS.get(period)
    return end_date - window if window is not None else None


async def _resolve_asset_quote(
    asset_type: str, asset_id: int
) -> Tuple[str, str, Decimal]:
    """Return ``(name, symbol, current_price)`` for one asset.

    Falls back to ``("Unknown", "N/A", Decimal("0.0"))`` for anything that
    cannot be found.
    """
    name, symbol, price = "Unknown", "N/A", Decimal("0.0")

    if asset_type == "STOCK":
        stock = await Stock.get_or_none(id=asset_id)
        if stock:
            name, symbol = stock.name, stock.symbol
        latest = (
            await StockPriceData.filter(stock_id=asset_id).order_by("-date").first()
        )
        if latest:
            price = latest.closing_price
    elif asset_type == "UTT":
        fund = await UTTFund.get_or_none(id=asset_id)
        if fund:
            name, symbol = fund.name, fund.symbol
        latest = await UTTFundData.filter(fund_id=asset_id).order_by("-date").first()
        if latest:
            price = Decimal(str(latest.nav_per_unit))
    elif asset_type == "BOND":
        bond = await Bond.get_or_none(id=asset_id)
        if bond:
            name = f"{bond.maturity_years} Yr Treasury Bond"
            symbol = bond.isin
            # Simplified valuation: use the bond's price per 100 of face
            # value, defaulting to par (100) when no price is recorded.
            # NOTE(review): the caller multiplies this by the held quantity
            # without dividing by 100 - confirm the unit of bond quantities.
            price = (
                Decimal(str(bond.price_per_100))
                if bond.price_per_100
                else Decimal("100.0")
            )

    return name, symbol, price


# Portfolio Management Routes
@router.get("/", response_model=ResponseModel)
async def get_user_portfolios(
    include_inactive: bool = Query(False), current_user=Depends(get_current_user)
):
    """List the current user's portfolios, optionally including inactive ones."""
    try:
        portfolios = await PortfolioService.get_user_portfolios(
            user_id=current_user.id, include_inactive=include_inactive
        )
        serialized = [
            await Portfolio_Pydantic.from_tortoise_orm(p) for p in portfolios
        ]
        return ResponseModel(
            success=True,
            message="Portfolios retrieved successfully",
            data={"portfolios": serialized, "total_count": len(portfolios)},
        )
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.post("/", response_model=ResponseModel)
async def create_portfolio(
    portfolio_data: PortfolioCreate, current_user=Depends(get_current_user)
):
    """Create a portfolio for the current user.

    A unique-constraint failure is reported as 400 (duplicate name); any
    other failure as 500.
    """
    try:
        portfolio = await PortfolioService.create_portfolio(
            user_id=current_user.id,
            name=portfolio_data.name,
            description=portfolio_data.description,
        )
        return ResponseModel(
            success=True,
            message="Portfolio created successfully",
            data=await Portfolio_Pydantic.from_tortoise_orm(portfolio),
        )
    except Exception as e:
        # The lower-cased check also matches SQLite's "UNIQUE constraint failed".
        if "unique constraint" in str(e).lower():
            raise AppException(
                status_code=400, detail="Portfolio name already exists"
            ) from e
        raise AppException(status_code=500, detail=str(e)) from e


@router.get("/{portfolio_id}", response_model=ResponseModel)
async def get_portfolio_summary_route(  # Renamed to avoid conflict with service method
    portfolio_id: int, current_user=Depends(get_current_user)
):
    """Return the service-computed summary of a portfolio owned by the user."""
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id)
        summary = await PortfolioService.get_portfolio_summary(portfolio_id)
        return ResponseModel(
            success=True,
            message="Portfolio summary retrieved successfully",
            data=summary,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.put("/{portfolio_id}", response_model=ResponseModel)
async def update_portfolio(
    portfolio_id: int,
    portfolio_data: PortfolioUpdate,
    current_user=Depends(get_current_user),
):
    """Apply the explicitly provided fields of ``portfolio_data`` to a portfolio."""
    try:
        portfolio = await _get_owned_portfolio(portfolio_id, current_user.id)
        update_data = portfolio_data.model_dump(exclude_unset=True)
        if update_data:
            await portfolio.update_from_dict(update_data).save()
        return ResponseModel(
            success=True,
            message="Portfolio updated successfully",
            data=await Portfolio_Pydantic.from_tortoise_orm(portfolio),
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.delete("/{portfolio_id}", response_model=ResponseModel)
async def delete_portfolio(portfolio_id: int, current_user=Depends(get_current_user)):
    """Soft-delete a portfolio by setting ``is_active`` to False."""
    try:
        portfolio = await _get_owned_portfolio(portfolio_id, current_user.id)
        portfolio.is_active = False
        await portfolio.save()
        return ResponseModel(
            success=True,
            message="Portfolio deleted successfully (set to inactive)",
            data=None,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


# Stock Holdings Routes
@router.post(
    "/{portfolio_id}/stocks",
    response_model=ResponseModel,
    summary="Buy/Add Stock to Portfolio",
)
async def add_stock_to_portfolio_route(  # Renamed
    portfolio_id: int,
    stock_data: StockHoldingCreate,
    current_user=Depends(get_current_user),
):
    """Buy a stock lot into an active portfolio.

    The response carries the ORM-derived ``PortfolioStock`` model, not
    ``StockHoldingResponse``.
    """
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id, active_only=True)
        holding = await PortfolioService.add_stock_to_portfolio(
            portfolio_id=portfolio_id,
            stock_id=stock_data.stock_id,
            quantity_to_add=stock_data.quantity,
            purchase_price_of_lot=stock_data.purchase_price,
            purchase_date=stock_data.purchase_date,
            notes=stock_data.notes,
        )
        return ResponseModel(
            success=True,
            message="Stock bought and added/updated in portfolio successfully",
            data=await PortfolioStock_Pydantic.from_tortoise_orm(holding),
        )
    except DoesNotExist as e:
        # Same 404 mapping as the UTT/bond add routes and the sell routes.
        raise AppException(status_code=404, detail=str(e)) from e
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.post(
    "/{portfolio_id}/stocks/{stock_id}/sell",
    response_model=ResponseModel,
    summary="Sell Stock from Portfolio",
)
async def sell_stock_from_portfolio(
    portfolio_id: int,
    stock_id: int,  # stock_id identifies the asset
    sell_data: StockSellSchema,
    current_user=Depends(get_current_user),
):
    """Sell shares of a stock from an active portfolio; returns the transaction."""
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id, active_only=True)
        transaction = await PortfolioService.sell_stock_holding(
            portfolio_id=portfolio_id,
            stock_id=stock_id,
            quantity_to_sell=sell_data.quantity,
            sell_price=sell_data.sell_price,
            sell_date=sell_data.sell_date,
            notes=sell_data.notes,
        )
        return ResponseModel(
            success=True,
            message="Stock sold successfully",
            data=await PortfolioTransaction_Pydantic.from_tortoise_orm(transaction),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e)) from e
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.put("/{portfolio_id}/stocks/{stock_id}", response_model=ResponseModel)
async def update_stock_holding(
    portfolio_id: int,
    stock_id: int,  # Changed from holding_id to stock_id
    stock_data: StockHoldingUpdate,
    current_user=Depends(get_current_user),
):
    """Directly update fields of the aggregated stock holding.

    Warning: quantity/purchase_price/purchase_date are written as given,
    with no recalculation, so this endpoint is best kept to notes or
    deliberate corrections.
    """
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id)
        holding = await PortfolioStock.get_or_none(
            stock_id=stock_id, portfolio_id=portfolio_id
        )
        if holding is None:
            raise AppException(
                status_code=404,
                detail="Stock holding for this stock not found in portfolio.",
            )
        update_data = stock_data.model_dump(exclude_unset=True)
        if update_data:
            await holding.update_from_dict(update_data).save()
        return ResponseModel(
            success=True,
            message="Stock holding updated successfully",
            data=await PortfolioStock_Pydantic.from_tortoise_orm(holding),
        )
    except DoesNotExist as e:  # Should be caught by the get_or_none checks
        raise AppException(status_code=404, detail=str(e)) from e
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.delete(
    "/{portfolio_id}/stocks/{stock_id}",
    response_model=ResponseModel,
    summary="Delete Stock Holding",
)
async def remove_stock_from_portfolio(
    portfolio_id: int,
    stock_id: int,  # Changed from holding_id to stock_id
    current_user=Depends(get_current_user),
):
    """Remove a stock holding from a portfolio owned by the user."""
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id)
        removed = await PortfolioService.remove_holding(
            portfolio_id=portfolio_id,
            asset_type_str="STOCK",
            asset_id_value=stock_id,
        )
        if not removed:
            raise AppException(
                status_code=404,
                detail="Stock holding not found or could not be deleted",
            )
        return ResponseModel(
            success=True,
            message="Stock holding removed from portfolio successfully",
            data=None,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


# UTT Holdings Routes
@router.post(
    "/{portfolio_id}/utts",
    response_model=ResponseModel,
    summary="Buy/Add UTT to Portfolio",
)
async def add_utt_to_portfolio_route(  # Renamed
    portfolio_id: int,
    utt_data: UTTHoldingCreate,
    current_user=Depends(get_current_user),
):
    """Buy UTT fund units into an active portfolio."""
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id, active_only=True)
        holding = await PortfolioService.add_utt_to_portfolio(
            portfolio_id=portfolio_id,
            utt_fund_id=utt_data.utt_fund_id,
            units_to_add=utt_data.units_held,
            purchase_price_of_lot=utt_data.purchase_price,
            purchase_date=utt_data.purchase_date,
            notes=utt_data.notes,
        )
        return ResponseModel(
            success=True,
            message="UTT fund bought and added/updated in portfolio successfully",
            data=await PortfolioUTT_Pydantic.from_tortoise_orm(holding),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e)) from e
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.post(
    "/{portfolio_id}/utts/{utt_fund_id}/sell",
    response_model=ResponseModel,
    summary="Sell UTT from Portfolio",
)
async def sell_utt_from_portfolio(
    portfolio_id: int,
    utt_fund_id: int,  # Changed from holding_id to utt_fund_id
    sell_data: UTTSellSchema,
    current_user=Depends(get_current_user),
):
    """Sell UTT fund units from an active portfolio; returns the transaction."""
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id, active_only=True)
        transaction = await PortfolioService.sell_utt_holding(
            portfolio_id=portfolio_id,
            utt_fund_id=utt_fund_id,
            units_to_sell=sell_data.units_to_sell,
            sell_price=sell_data.sell_price,
            sell_date=sell_data.sell_date,
            notes=sell_data.notes,
        )
        return ResponseModel(
            success=True,
            message="UTT units sold successfully",
            data=await PortfolioTransaction_Pydantic.from_tortoise_orm(transaction),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e)) from e
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.put("/{portfolio_id}/utts/{utt_fund_id}", response_model=ResponseModel)
async def update_utt_holding(
    portfolio_id: int,
    utt_fund_id: int,  # Changed from holding_id to utt_fund_id
    utt_data: UTTHoldingUpdate,
    current_user=Depends(get_current_user),
):
    """Directly update fields of the aggregated UTT holding.

    As with the stock update, units_held/purchase_price/purchase_date are
    written as given, with no recalculation.
    """
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id)
        holding = await PortfolioUTT.get_or_none(
            utt_fund_id=utt_fund_id, portfolio_id=portfolio_id
        )
        if holding is None:
            raise AppException(
                status_code=404,
                detail="UTT holding for this fund not found in portfolio.",
            )
        update_data = utt_data.model_dump(exclude_unset=True)
        if update_data:
            await holding.update_from_dict(update_data).save()
        return ResponseModel(
            success=True,
            message="UTT holding updated successfully",
            data=await PortfolioUTT_Pydantic.from_tortoise_orm(holding),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e)) from e
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.delete(
    "/{portfolio_id}/utts/{utt_fund_id}",
    response_model=ResponseModel,
    summary="Delete UTT Holding",
)
async def remove_utt_from_portfolio(
    portfolio_id: int,
    utt_fund_id: int,  # Changed from holding_id to utt_fund_id
    current_user=Depends(get_current_user),
):
    """Remove a UTT fund holding from a portfolio owned by the user."""
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id)
        removed = await PortfolioService.remove_holding(
            portfolio_id=portfolio_id,
            asset_type_str="UTT",
            asset_id_value=utt_fund_id,
        )
        if not removed:
            raise AppException(
                status_code=404, detail="UTT holding not found or could not be deleted"
            )
        return ResponseModel(
            success=True,
            message="UTT fund holding removed from portfolio successfully",
            data=None,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


# Bond Holdings Routes
@router.post(
    "/{portfolio_id}/bonds",
    response_model=ResponseModel,
    summary="Buy/Add Bond to Portfolio",
)
async def add_bond_to_portfolio_route(  # Renamed
    portfolio_id: int,
    bond_data: BondHoldingCreate,  # Assumes bond_data.purchase_price is TOTAL cost
    current_user=Depends(get_current_user),
):
    """Buy a bond (looked up by auction number) into an active portfolio.

    Raises a 404 when the portfolio or the bond cannot be found.
    """
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id, active_only=True)
        bond = await Bond.get_or_none(auction_number=bond_data.auction_number)
        if bond is None:
            # Without this guard the `bond.id` access below would fail with
            # AttributeError and be reported as a 500.
            raise AppException(
                status_code=404,
                detail=(
                    f"Bond with auction number {bond_data.auction_number} not found"
                ),
            )
        holding = await PortfolioService.add_bond_to_portfolio(
            portfolio_id=portfolio_id,
            bond_id=bond.id,
            face_value_to_add=bond_data.face_value_held,
            total_purchase_price_of_lot=bond_data.purchase_price,  # Assumed total cost
            purchase_date=bond_data.purchase_date,
            notes=bond_data.notes,
        )
        return ResponseModel(
            success=True,
            message="Bond bought and added/updated in portfolio successfully",
            data=await PortfolioBond_Pydantic.from_tortoise_orm(holding),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e)) from e
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.post(
    "/{portfolio_id}/bonds/{bond_id}/sell",
    response_model=ResponseModel,
    summary="Sell Bond from Portfolio",
)
async def sell_bond_from_portfolio(
    portfolio_id: int,
    bond_id: int,  # Changed from holding_id to bond_id
    sell_data: BondSellSchema,  # Assumes sell_data.sell_price is TOTAL proceeds
    current_user=Depends(get_current_user),
):
    """Sell part of a bond holding from an active portfolio; returns the transaction."""
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id, active_only=True)
        transaction = await PortfolioService.sell_bond_holding(
            portfolio_id=portfolio_id,
            bond_id=bond_id,
            face_value_to_sell=sell_data.face_value_to_sell,
            sell_price_total=sell_data.sell_price,  # Assumed total proceeds
            sell_date=sell_data.sell_date,
            notes=sell_data.notes,
        )
        return ResponseModel(
            success=True,
            message="Bond portion sold successfully",
            data=await PortfolioTransaction_Pydantic.from_tortoise_orm(transaction),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e)) from e
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.put("/{portfolio_id}/bonds/{bond_id}", response_model=ResponseModel)
async def update_bond_holding(
    portfolio_id: int,
    bond_id: int,  # Changed from holding_id to bond_id
    bond_data: BondHoldingUpdate,
    current_user=Depends(get_current_user),
):
    """Directly update fields of the aggregated bond holding.

    Caution: changing ``face_value_held`` alone does NOT proportionally
    adjust ``purchase_price`` (total cost). Callers that need the cost
    basis adjusted must send the new total ``purchase_price`` as well.
    """
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id)
        holding = await PortfolioBond.get_or_none(
            bond_id=bond_id, portfolio_id=portfolio_id
        )
        if holding is None:
            raise AppException(
                status_code=404,
                detail="Bond holding for this bond not found in portfolio.",
            )
        update_data = bond_data.model_dump(exclude_unset=True)
        if update_data:
            await holding.update_from_dict(update_data).save()
        return ResponseModel(
            success=True,
            message="Bond holding updated successfully",
            data=await PortfolioBond_Pydantic.from_tortoise_orm(holding),
        )
    except DoesNotExist as e:
        raise AppException(status_code=404, detail=str(e)) from e
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.delete(
    "/{portfolio_id}/bonds/{bond_id}",
    response_model=ResponseModel,
    summary="Delete Bond Holding",
)
async def remove_bond_from_portfolio(
    portfolio_id: int,
    bond_id: int,  # Changed from holding_id to bond_id
    current_user=Depends(get_current_user),
):
    """Remove a bond holding from a portfolio owned by the user."""
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id)
        removed = await PortfolioService.remove_holding(
            portfolio_id=portfolio_id,
            asset_type_str="BOND",
            asset_id_value=bond_id,
        )
        if not removed:
            raise AppException(
                status_code=404, detail="Bond holding not found or could not be deleted"
            )
        return ResponseModel(
            success=True,
            message="Bond holding removed from portfolio successfully",
            data=None,
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


# Calendar and Transaction Routes (No changes related to holding_id vs asset_id here)
@router.post("/{portfolio_id}/calendar", response_model=ResponseModel)
async def add_calendar_event(
    portfolio_id: int,
    event_data: CalendarEventCreate,
    current_user=Depends(get_current_user),
):
    """Create a calendar event on a portfolio owned by the user."""
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id)
        event = await PortfolioCalendar.create(
            portfolio_id=portfolio_id, **event_data.model_dump()
        )
        return ResponseModel(
            success=True,
            message="Calendar event added successfully",
            data=await PortfolioCalendar_Pydantic.from_tortoise_orm(event),
        )
    except AppException:
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


@router.get("/{portfolio_id}/transactions", response_model=ResponseModel)
async def get_portfolio_transactions(
    portfolio_id: int,
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    current_user=Depends(get_current_user),
):
    """Return one page of transactions, newest first, with asset name/symbol.

    Asset details are bulk-fetched per asset type, so the number of queries
    does not grow with the page size.
    """
    try:
        # 1. Validation and page query
        await _get_owned_portfolio(portfolio_id, current_user.id)
        transactions_list = await (
            PortfolioTransaction.filter(portfolio_id=portfolio_id)
            .order_by("-transaction_date", "-created_at")
            .offset(offset)
            .limit(limit)
            .all()
        )

        # 2. Collect the unique asset ids per asset type
        ids_by_type: Dict[str, set] = {"STOCK": set(), "UTT": set(), "BOND": set()}
        for t in transactions_list:
            if t.asset_type in ids_by_type:
                ids_by_type[t.asset_type].add(t.asset_id)

        # 3. Bulk fetch asset details
        stocks_map: Dict[int, Stock] = {
            s.id: s for s in await Stock.filter(id__in=list(ids_by_type["STOCK"]))
        }
        utts_map: Dict[int, UTTFund] = {
            u.id: u for u in await UTTFund.filter(id__in=list(ids_by_type["UTT"]))
        }
        bonds_map: Dict[int, Bond] = {
            b.id: b for b in await Bond.filter(id__in=list(ids_by_type["BOND"]))
        }

        # 4. Build the enriched response
        enriched_transactions: List[TransactionDetailResponse] = []
        for t in transactions_list:
            asset_name = None
            asset_symbol = None
            if t.asset_type == "STOCK" and t.asset_id in stocks_map:
                stock = stocks_map[t.asset_id]
                asset_name, asset_symbol = stock.name, stock.symbol
            elif t.asset_type == "UTT" and t.asset_id in utts_map:
                fund = utts_map[t.asset_id]
                asset_name, asset_symbol = fund.name, fund.symbol
            elif t.asset_type == "BOND" and t.asset_id in bonds_map:
                bond = bonds_map[t.asset_id]
                asset_name = f"{bond.maturity_years} Yr Treasury Bond"
                asset_symbol = bond.isin

            enriched_transactions.append(
                TransactionDetailResponse.model_validate(
                    {
                        **t.__dict__,  # Unpack the transaction's own fields
                        "asset_name": asset_name,
                        "asset_symbol": asset_symbol,
                    }
                )
            )

        total_count = await PortfolioTransaction.filter(
            portfolio_id=portfolio_id
        ).count()
        return ResponseModel(
            success=True,
            message="Transactions retrieved successfully",
            data={
                "transactions": [et.model_dump() for et in enriched_transactions],
                "total_count": total_count,
                "limit": limit,
                "offset": offset,
            },
        )
    except AppException:
        # Keep the 404 from the ownership check from being rewrapped as a 500.
        raise
    except Exception as e:
        raise AppException(status_code=500, detail=str(e)) from e


# Performance and Analytics Routes
@router.get(
    "/{portfolio_id}/positions",
    response_model=ResponseModel,
    summary="Get All Current Portfolio Positions",
)
async def get_portfolio_positions(
    portfolio_id: int, current_user=Depends(get_current_user)
):
    """
    Calculates and retrieves all current positions in a portfolio.

    It processes all buy/sell transactions to determine average cost,
    fetches the latest market price, and calculates current value and
    profit/loss.
    """
    # 1. AUTHENTICATION & VALIDATION
    await _get_owned_portfolio(portfolio_id, current_user.id)

    # 2. FETCH AND AGGREGATE TRANSACTIONS
    transactions = await PortfolioTransaction.filter(
        portfolio_id=portfolio_id
    ).order_by("transaction_date")

    # Key: (asset_type, asset_id), Value: {buy_qty, buy_cost, sell_qty}
    aggregated_data: Dict[tuple, Dict] = {}
    for t in transactions:
        totals = aggregated_data.setdefault(
            (t.asset_type, t.asset_id),
            {
                "buy_qty": Decimal("0.0"),
                "buy_cost": Decimal("0.0"),
                "sell_qty": Decimal("0.0"),
            },
        )
        if t.transaction_type == "BUY":
            totals["buy_qty"] += t.quantity
            totals["buy_cost"] += t.total_amount
        elif t.transaction_type == "SELL":
            totals["sell_qty"] += t.quantity

    # 3. PROCESS AGGREGATES AND FETCH LIVE DATA
    position_responses: List[PositionResponse] = []
    for (asset_type, asset_id), totals in aggregated_data.items():
        current_quantity = totals["buy_qty"] - totals["sell_qty"]
        # If the asset has been completely sold, skip it.
        if current_quantity <= 0:
            continue

        # Cost basis of the units still held, at the average buy price.
        avg_buy_price = (
            totals["buy_cost"] / totals["buy_qty"]
            if totals["buy_qty"] > 0
            else Decimal("0.0")
        )
        total_invested = current_quantity * avg_buy_price

        asset_name, asset_symbol, current_price = await _resolve_asset_quote(
            asset_type, asset_id
        )

        current_value = current_quantity * current_price
        profit_loss = current_value - total_invested
        profit_loss_percent = (
            (profit_loss / total_invested) * 100 if total_invested > 0 else 0.0
        )

        position_responses.append(
            PositionResponse(
                asset_id=asset_id,
                asset_type=asset_type.capitalize(),  # "STOCK" -> "Stock"
                asset_name=asset_name,
                asset_symbol=asset_symbol,
                quantity=current_quantity,
                avg_buy_price=round(avg_buy_price, 4),
                total_invested=round(total_invested, 2),
                current_price=round(current_price, 4),
                current_value=round(current_value, 2),
                profit_loss=round(profit_loss, 2),
                profit_loss_percent=round(float(profit_loss_percent), 2),
            )
        )

    # 4. RETURN THE FINAL RESPONSE
    return ResponseModel(
        success=True,
        message="Positions retrieved successfully.",
        data={"positions": position_responses},
    )


@router.post("/{portfolio_id}/snapshot", response_model=ResponseModel)
async def create_portfolio_snapshot_route(  # Renamed
    portfolio_id: int,
    snapshot_date: Optional[date] = Query(
        None, description="Date for the snapshot. Defaults to today if None."
    ),
    current_user=Depends(get_current_user),
):
    """Create a snapshot of the portfolio for ``snapshot_date``.

    ``NotImplementedError`` from the service maps to 501 and
    ``DoesNotExist`` to 404.
    """
    try:
        await _get_owned_portfolio(portfolio_id, current_user.id)
        snapshot_orm = await PortfolioService.create_portfolio_snapshot(
            portfolio_id=portfolio_id, snapshot_date_input=snapshot_date
        )
        return ResponseModel(
            success=True,
            message="Portfolio snapshot created successfully",
            data=await PortfolioSnapshotPydantic.from_tortoise_orm(snapshot_orm),
        )
    except NotImplementedError as e:  # Catch specific error from service
        raise AppException(status_code=501, detail=str(e)) from e
    except DoesNotExist as e:
        raise AppException(
            status_code=404, detail="Portfolio not found when creating snapshot."
        ) from e
    except AppException:
        raise
    except Exception as e:
        raise AppException(
            status_code=500, detail=f"Failed to create snapshot: {str(e)}"
        ) from e


@router.get(
    "/{portfolio_id}/performance",
    response_model=ResponseModel,
    summary="Get Portfolio Performance Timeseries",
)
async def get_portfolio_performance(
    portfolio_id: int,
    background_tasks: BackgroundTasks,
    period: str = Query(
        "1M",
        enum=["1D", "1W", "1M", "YTD", "1Y", "Max"],
        description="The time period for the performance data.",
    ),
    current_user=Depends(get_current_user),
):
    """
    Retrieves time-series performance data for a portfolio.

    If data is missing, it automatically queues a background task to
    generate all historical data and informs the user to wait.
    """
    try:
        # 1. AUTHENTICATION
        await _get_owned_portfolio(portfolio_id, current_user.id)

        # 2. If a relevant task is already pending/running, report it
        #    and do not start another one.
        active_task = await ImportTask.filter(
            Q(details__contains={"portfolio_id": portfolio_id}),
            Q(task_type__in=["portfolio_regeneration", "portfolio_snapshot_history"]),
            status__in=["pending", "running"],
        ).first()
        if active_task:
            return ResponseModel(
                success=False,
                message="Portfolio performance data is currently being prepared. Please check back in a few moments.",
                data={"task_id": active_task.id, "status": active_task.status},
            )

        # 3. DEFINE TIME PERIOD & QUERY EXISTING DATA
        start_date = _period_start(period, date.today())
        query = PortfolioSnapshot.filter(portfolio_id=portfolio_id)
        if start_date is not None:
            start_datetime = datetime.combine(start_date, datetime.min.time())
            query = query.filter(snapshot_date__gte=start_datetime)
        snapshots = await query.order_by("snapshot_date").values(
            "snapshot_date", "total_value"
        )

        # 4. No snapshots for the period: queue generation. The active-task
        #    check above already ruled out a task in progress.
        if not snapshots:
            task = await ImportTask.create(
                task_type="portfolio_snapshot_history",
                status="pending",
                details={
                    "portfolio_id": portfolio_id,
                    "reason": "First-time data request.",
                },
            )
            # Called without a start date - per the original note, the task
            # then starts from the earliest transaction.
            background_tasks.add_task(
                PortfolioService.regenerate_snapshots_task, task.id, portfolio_id
            )
            return ResponseModel(
                success=False,
                message="We're preparing your performance history for the first time. This may take a moment.",
                data={"task_id": task.id, "status": "pending"},
            )

        # 5. SUCCESS PATH: at least one snapshot exists.
        timeseries = [
            {
                "date": s["snapshot_date"].isoformat(),
                "value": str(s["total_value"]),
            }
            for s in snapshots
        ]

        if len(snapshots) < 2:
            return ResponseModel(
                success=True,
                message="Not enough historical data to calculate performance change.",
                data={
                    "current_value": str(snapshots[0]["total_value"]),
                    "change_value": "0.00",
                    "change_percentage": 0.0,
                    "timeseries": timeseries,
                },
            )

        first_value = snapshots[0]["total_value"]
        last_value = snapshots[-1]["total_value"]
        change_value = last_value - first_value
        change_percentage = (
            (change_value / first_value) * 100 if first_value > 0 else Decimal("0.0")
        )
        return ResponseModel(
            success=True,
            message=f"Performance data for period '{period}' retrieved successfully.",
            data={
                "current_value": str(last_value),
                "change_value": str(change_value),
                "change_percentage": round(float(change_percentage), 2),
                "timeseries": timeseries,
            },
        )
    except AppException:
        # Keep the 404 from the ownership check from being rewrapped as a 500.
        raise
    except Exception as e:
        raise AppException(
            status_code=500, detail=f"An unexpected error occurred: {e}"
        ) from e


@router.get(
    "/{portfolio_id}/calendar",
    response_model=ResponseModel,
    summary="Get Upcoming Portfolio Calendar Events",
)
async def get_portfolio_calendar_events(
    portfolio_id: int,
    start_date: Optional[date] = Query(
        None, description="Start of date range. Defaults to today."
    ),
    end_date: Optional[date] = Query(
        None, description="End of date range. Defaults to 90 days from now."
    ),
    current_user=Depends(get_current_user),
):
    """
    Generates a dynamic calendar of expected income events (dividends and coupons)
    for a user's portfolio within a given date range.
    """
    # NOTE(review): this function continues past the reviewed section, so its
    # code is left unchanged here. The print() calls below look like leftover
    # debug output and should be removed or replaced with logging.
    # 1. SETUP & AUTHENTICATION
    if start_date is None:
        start_date = date.today()
    if end_date is None:
        end_date = start_date + timedelta(days=90)
    portfolio = await Portfolio.get_or_none(id=portfolio_id, user_id=current_user.id)
    if not portfolio:
        raise AppException(status_code=404, detail="Portfolio not found")
    calendar_events: List[CalendarEventResponse] = []
    print("Hello there buddy!!")
    # 2. PROCESS STOCK DIVIDENDS
    # Get all stocks currently held in the portfolio
    portfolio_stocks = await PortfolioStock.filter(
        portfolio_id=portfolio_id
    ).select_related("stock")
    print("Hello there buddy!!")
    if portfolio_stocks:
        stock_ids = [ps.stock.id for ps in portfolio_stocks]
        # Create a map for quick lookup of quantity held for each stock
        stock_quantity_map = {ps.stock.id: ps.quantity for ps in portfolio_stocks}
        # Find all declared dividends for those stocks within the date range
        dividends = await Dividend.filter(
            stock_id__in=stock_ids,
            payment_date__gte=start_date,
            payment_date__lte=end_date,
        ).select_related("stock")
        print(dividends)
        for div in dividends:
            quantity_held = stock_quantity_map.get(div.stock.id, 0)
            if quantity_held > 0:
                event = CalendarEventResponse(
                    event_date=div.payment_date,
                    event_type="Dividend Payment",
                    asset_symbol=div.stock.symbol,
                    asset_name=div.stock.name,
                    estimated_amount=div.dividend_amount * quantity_held,
                    notes=f"Ex-dividend date: {div.ex_dividend_date.isoformat()}",
                )
                calendar_events.append(event)
    # 3.
PROCESS BOND COUPONS # Get all bonds currently held in the portfolio portfolio_bonds = await PortfolioBond.filter( portfolio_id=portfolio_id ).select_related("bond") if portfolio_bonds: for pb in portfolio_bonds: # Use our helper function to calculate coupon dates in the range coupon_dates = _calculate_bond_coupon_dates(pb.bond, start_date, end_date) for coupon_date in coupon_dates: # Coupon amount is based on face value and semi-annual rate estimated_amount = ( pb.face_value_held * (Decimal(str(pb.bond.coupon_rate)) / Decimal("100")) ) / Decimal("2") event = CalendarEventResponse( event_date=coupon_date, event_type="Bond Coupon", asset_symbol=pb.bond.isin, asset_name=f"{pb.bond.maturity_years} Yr T-Bond", estimated_amount=estimated_amount, notes=f"Matures on {pb.bond.maturity_date.isoformat()}", ) calendar_events.append(event) # 4. SORT AND RETURN # Sort all collected events by date sorted_events = sorted(calendar_events, key=lambda x: x.event_date) return ResponseModel( success=True, message="Portfolio calendar events retrieved successfully.", data={"events": sorted_events, "total_count": len(sorted_events)}, )