|
from datetime import datetime, timezone |
|
from typing import Any, Dict, List, Optional |
|
|
|
from fastapi import HTTPException, status |
|
from bson import ObjectId |
|
|
|
from ..database.connection import get_collection |
|
from ..models.monthly_record import ( |
|
MonthlyRecord, MonthlyRecordCreate, MonthlyRecordUpdate, |
|
ExpenseCategory, month_key_from, normalize_month_name |
|
) |
|
|
|
class RecordService: |
|
def __init__(self): |
|
self.col = get_collection() |
|
|
|
async def ensure_indexes(self) -> None: |
|
|
|
await self.col.create_index("month_key", unique=True) |
|
|
|
def _calc_totals(self, salary: float, expenses: List[ExpenseCategory]) -> Dict[str, float]: |
|
total_expenses = round(sum(e.amount for e in expenses), 2) |
|
remaining = round(salary - total_expenses, 2) |
|
return {"total_expenses": total_expenses, "remaining": remaining} |
|
|
|
def _serialize(self, doc: Dict[str, Any]) -> Dict[str, Any]: |
|
if not doc: |
|
return doc |
|
doc["_id"] = str(doc["_id"]) |
|
return doc |
|
|
|
async def get_by_month_key(self, month_key: str) -> Dict[str, Any]: |
|
doc = await self.col.find_one({"month_key": month_key}) |
|
if not doc: |
|
raise HTTPException(status_code=404, detail="No record found for this month") |
|
return self._serialize(doc) |
|
|
|
async def list(self, limit: int = 12, skip: int = 0) -> List[Dict[str, Any]]: |
|
cursor = self.col.find({}).sort("month_key", 1).skip(skip).limit(limit) |
|
return [self._serialize(d) async for d in cursor] |
|
|
|
async def create(self, payload: MonthlyRecordCreate) -> Dict[str, Any]: |
|
month = normalize_month_name(payload.month) |
|
month_key = month_key_from(month, payload.year) |
|
now = datetime.now(timezone.utc) |
|
|
|
totals = self._calc_totals(payload.salary, payload.expenses) |
|
|
|
doc = { |
|
"month": month, |
|
"year": payload.year, |
|
"month_key": month_key, |
|
"salary": float(payload.salary), |
|
"expenses": [e.model_dump() for e in payload.expenses], |
|
"total_expenses": totals["total_expenses"], |
|
"remaining": totals["remaining"], |
|
"created_at": now, |
|
"updated_at": now, |
|
} |
|
|
|
try: |
|
result = await self.col.insert_one(doc) |
|
except Exception as e: |
|
|
|
raise HTTPException(status_code=400, detail=str(e)) |
|
|
|
created = await self.col.find_one({"_id": result.inserted_id}) |
|
return self._serialize(created) |
|
|
|
async def update(self, month_key: str, payload: MonthlyRecordUpdate) -> Dict[str, Any]: |
|
existing = await self.col.find_one({"month_key": month_key}) |
|
if not existing: |
|
raise HTTPException(status_code=404, detail="No record found for this month") |
|
|
|
salary = payload.salary if payload.salary is not None else existing["salary"] |
|
expenses = payload.expenses if payload.expenses is not None else existing["expenses"] |
|
|
|
expenses_list = [e.model_dump() if hasattr(e, "model_dump") else e for e in expenses] |
|
|
|
totals = self._calc_totals(salary, [ExpenseCategory(**e) for e in expenses_list]) |
|
|
|
update_doc = { |
|
"$set": { |
|
"salary": float(salary), |
|
"expenses": expenses_list, |
|
"total_expenses": totals["total_expenses"], |
|
"remaining": totals["remaining"], |
|
"updated_at": datetime.now(timezone.utc), |
|
} |
|
} |
|
|
|
await self.col.update_one({"month_key": month_key}, update_doc) |
|
updated = await self.col.find_one({"month_key": month_key}) |
|
return self._serialize(updated) |
|
|
|
async def delete(self, month_key: str) -> None: |
|
result = await self.col.delete_one({"month_key": month_key}) |
|
if result.deleted_count == 0: |
|
raise HTTPException(status_code=404, detail="No record found for this month") |
|
|