|
project/
├── main.py
├── routers/
│   ├── stocks/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── schemas.py
│   │   ├── service.py
│   │   └── routes.py
│   ├── utt/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── schemas.py
│   │   ├── service.py
│   │   └── routes.py
│   └── tasks/
│       ├── __init__.py
│       ├── models.py
│       ├── schemas.py
│       └── routes.py
|
|
|
# main.py |
|
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

# NOTE(review): the package root is spelled `App` in these imports while the
# layout sketch names the project directory `project/` — confirm the real
# importable package name.
from App.routers.stocks.routes import router as stocks_router
from App.routers.utt.routes import router as utt_router
from App.routers.tasks.routes import router as tasks_router

# The single application object; exception handlers and feature routers are
# attached to it further down in this module.
app = FastAPI()
|
|
|
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an HTTPException in the API's ``{"success", "message"}`` envelope.

    Args:
        request: The request being processed (unused).
        exc: The raised HTTPException; its status code and detail are echoed.

    Returns:
        A JSONResponse with the exception's status code, carrying any headers
        attached to the exception.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"success": False, "message": exc.detail},
        # Forward headers such as WWW-Authenticate instead of dropping them.
        headers=getattr(exc, "headers", None),
    )
|
|
|
@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
    """Turn any unhandled exception into a 500 response in the API envelope.

    The exception is logged with its traceback; the client only receives a
    generic message so internal details (paths, SQL, secrets embedded in error
    text) are not exposed.

    Args:
        request: The request being processed; method and URL are logged.
        exc: The unhandled exception.

    Returns:
        A JSONResponse with status 500 and ``success`` set to False.
    """
    import logging

    logging.getLogger(__name__).error(
        "Unhandled error while processing %s %s",
        request.method,
        request.url,
        exc_info=exc,
    )
    return JSONResponse(
        status_code=500,
        content={"success": False, "message": "Internal server error"},
    )
|
|
|
# Attach the feature routers in a fixed order: stocks, then UTT, then tasks.
for feature_router in (stocks_router, utt_router, tasks_router):
    app.include_router(feature_router)
|
|
|
# routers/tasks/models.py |
|
from tortoise import fields, models |
|
from enum import Enum |
|
|
|
class TaskStatus(str, Enum):
    """States an import task can be in; each member is also a plain ``str``."""

    # Created but not started yet.
    PENDING = "pending"
    # Work is in progress.
    RUNNING = "running"
    # Finished without error.
    COMPLETED = "completed"
    # Stopped because of an error or missing data.
    FAILED = "failed"
|
|
|
class ImportTask(models.Model):
    """Database record describing one import job and its current status."""

    # Integer primary key.
    id = fields.IntField(pk=True)
    # Short label for the kind of import (at most 50 characters).
    task_type = fields.CharField(max_length=50)
    # Current state; new rows default to PENDING.
    status = fields.CharEnumField(TaskStatus, default=TaskStatus.PENDING)
    # Optional free-form JSON payload attached to the task.
    details = fields.JSONField(null=True)
    # Timestamps maintained through the field options auto_now_add / auto_now.
    created_at = fields.DatetimeField(auto_now_add=True)
    updated_at = fields.DatetimeField(auto_now=True)

    class Meta:
        # Explicit table name for this model.
        table = "import_tasks"
|
|
|
# routers/tasks/schemas.py |
|
from pydantic import BaseModel |
|
from datetime import datetime |
|
from enum import Enum |
|
|
|
class TaskStatus(str, Enum):
    """Task states exposed by the API; values match the enum in models.py."""

    # Created but not started yet.
    PENDING = "pending"
    # Work is in progress.
    RUNNING = "running"
    # Finished without error.
    COMPLETED = "completed"
    # Stopped because of an error or missing data.
    FAILED = "failed"
|
|
|
class ImportTaskResponse(BaseModel):
    """API representation of a single import task."""

    # Identifier of the task.
    id: int
    # Kind of import the task performs.
    task_type: str
    # Current state of the task.
    status: TaskStatus
    # Extra JSON details; must be supplied but may be None.
    details: dict | None
    # Creation and last-modification timestamps.
    created_at: datetime
    updated_at: datetime
|
|
|
class ResponseModel(BaseModel):
    """Common envelope used for endpoint responses."""

    # True when the request succeeded.
    success: bool
    # Human-readable summary of the outcome.
    message: str
    # Optional payload: a dict, a list, or nothing at all.
    data: dict | list | None = None
|
|
|
# routers/tasks/routes.py |
|
from fastapi import APIRouter, HTTPException |
|
from .models import ImportTask |
|
from .schemas import ImportTaskResponse, ResponseModel |
|
|
|
router = APIRouter(prefix="/tasks", tags=["Tasks"])

# Attributes copied from an ImportTask row into the response payload; the names
# match the fields declared on ImportTaskResponse.
_TASK_FIELDS = ("id", "task_type", "status", "details", "created_at", "updated_at")


def _task_to_dict(task: ImportTask) -> dict:
    """Copy the public attributes of an ImportTask into a plain dict."""
    return {name: getattr(task, name) for name in _TASK_FIELDS}


@router.get("/", response_model=ResponseModel)
async def list_tasks():
    """Return every import task, newest first.

    ORM instances are converted to plain dicts because ``ResponseModel.data``
    only accepts dict/list/None and model objects are not JSON-serialisable.
    """
    tasks = await ImportTask.all().order_by("-created_at")
    return ResponseModel(
        success=True,
        message="List of tasks",
        data=[_task_to_dict(task) for task in tasks],
    )


@router.get("/{task_id}", response_model=ResponseModel)
async def get_task(task_id: int):
    """Return one import task by id.

    Raises:
        HTTPException: 404 when no task has the given id.
    """
    task = await ImportTask.get_or_none(id=task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")
    # Passing the ORM object itself would not satisfy `dict | list | None`.
    return ResponseModel(success=True, message="Task found", data=_task_to_dict(task))
|
|
|
# You would follow the same modular structure for stocks and utt |
|
# Including their own models.py, schemas.py, routes.py, service.py |
|
# The routes would queue background tasks and use task_id for status tracking. |
|
|
|
|
|
|
|
project/
├── main.py
├── routers/
│   ├── stocks/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── schemas.py
│   │   ├── service.py
│   │   └── routes.py
│   ├── utt/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── schemas.py
│   │   ├── service.py
│   │   └── routes.py
│   └── tasks/
│       ├── __init__.py
│       ├── models.py
│       ├── schemas.py
│       └── routes.py
|
|
|
# routers/stocks/models.py |
|
from tortoise import fields, models |
|
|
|
class Stock(models.Model):
    """A listed security identified by a unique symbol."""

    # Integer primary key.
    id = fields.IntField(pk=True)
    # Ticker symbol, at most 10 characters, unique across rows.
    symbol = fields.CharField(max_length=10, unique=True)
    # Display name of the security.
    name = fields.CharField(max_length=100)
    # Optional sector label.
    sector = fields.CharField(max_length=100, null=True)
|
|
|
class StockPriceData(models.Model):
    """One price record for a stock on a given date."""

    # Integer primary key.
    id = fields.IntField(pk=True)
    # Owning stock; reachable from Stock through the "prices" relation.
    stock = fields.ForeignKeyField("models.Stock", related_name="prices")
    # Date the record applies to.
    date = fields.DateField()

    # Price figures for that date.
    opening_price = fields.FloatField()
    closing_price = fields.FloatField()
    high = fields.FloatField()
    low = fields.FloatField()

    # Volume is a regular integer column; the remaining totals are big integers.
    volume = fields.IntField()
    turnover = fields.BigIntField()
    shares_in_issue = fields.BigIntField()
    market_cap = fields.BigIntField()

    class Meta:
        # At most one record per (stock, date) pair.
        unique_together = ("stock", "date")
|
|
|
# routers/stocks/schemas.py |
|
from pydantic import BaseModel
from datetime import date
from typing import List, Optional

# Imported here so the stocks routes can pull ResponseModel from `.schemas`.
from App.routers.tasks.schemas import ResponseModel
|
|
|
class StockBase(BaseModel):
    """Fields shared by stock schemas."""

    # Ticker symbol.
    symbol: str
    # Display name.
    name: str
    # Sector label; omitted or None when unknown.
    sector: Optional[str] = None
|
|
|
class StockResponse(StockBase):
    """Stock as returned by the API: the base fields plus an identifier."""

    # Identifier of the stock.
    id: int
|
|
|
class StockPriceResponse(BaseModel):
    """Price figures of a stock for one date."""

    # Date of the record.
    date: date

    # Price figures.
    opening_price: float
    closing_price: float
    high: float
    low: float

    # Integer totals.
    volume: int
    turnover: int
    shares_in_issue: int
    market_cap: int
|
|
|
class StockPriceListResponse(BaseModel):
    """A stock together with its list of price records."""

    # The stock the prices belong to.
    stock: StockResponse
    # Price records for that stock.
    prices: list[StockPriceResponse]
|
|
|
# routers/stocks/service.py |
|
from datetime import datetime |
|
|
|
async def fetch_stock_data(symbol: str):
    """Fetch price rows for ``symbol`` from the DSE market-prices endpoint.

    The query is passed through ``params`` so the symbol is URL-encoded; a
    symbol containing ``&`` or ``=`` can no longer inject extra parameters.

    Args:
        symbol: Security code to request.

    Returns:
        The ``"data"`` entry of the JSON payload (``[]`` when absent), or an
        empty list when the response status is not 200.
    """
    import httpx

    url = "https://dse.co.tz/api/get/market/prices/for/range/duration"
    query = {"security_code": symbol, "days": 5000, "class": "EQUITY"}
    async with httpx.AsyncClient() as client:
        response = await client.get(url, params=query)
        if response.status_code == 200:
            return response.json().get("data", [])
    return []
|
|
|
def _stock_int(value) -> int:
    """Coerce an API value to int, accepting numeric strings such as "12.0"."""
    # Real ints pass through untouched to avoid float rounding on large values.
    return value if isinstance(value, int) else int(float(value))


def parse_stock_api_row(row: dict) -> dict:
    """Convert one raw price row from the stock API into model keyword args.

    Numeric values are coerced (prices to float, totals to int) in the same way
    ``parse_utt_api_row`` does, so numeric strings are accepted.

    Args:
        row: Raw row; must contain ``trade_date`` (ISO 8601) and the price and
            total keys listed below.

    Returns:
        Dict with ``date`` plus the price and total fields.

    Raises:
        KeyError: A required key is missing.
        ValueError: A value cannot be parsed.
    """
    raw_date = row["trade_date"]
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on.
    if isinstance(raw_date, str) and raw_date.endswith("Z"):
        raw_date = raw_date[:-1] + "+00:00"

    parsed = {"date": datetime.fromisoformat(raw_date).date()}
    for key in ("opening_price", "closing_price", "high", "low"):
        parsed[key] = float(row[key])
    for key in ("volume", "turnover", "shares_in_issue", "market_cap"):
        parsed[key] = _stock_int(row[key])
    return parsed
|
|
|
# routers/stocks/routes.py |
|
from fastapi import APIRouter, HTTPException, BackgroundTasks
from tortoise.transactions import in_transaction
from .models import Stock, StockPriceData
from .schemas import StockResponse, StockPriceListResponse, ResponseModel
from .service import fetch_stock_data, parse_stock_api_row
from App.routers.tasks.models import ImportTask
from typing import List

# All stock endpoints are served under the /stocks prefix.
router = APIRouter(prefix="/stocks", tags=["Stocks"])
|
|
|
# Attribute names copied into response payloads; they match the fields declared
# on StockResponse and StockPriceResponse respectively.
_STOCK_FIELDS = ("id", "symbol", "name", "sector")
_STOCK_PRICE_FIELDS = (
    "date",
    "opening_price",
    "closing_price",
    "high",
    "low",
    "volume",
    "turnover",
    "shares_in_issue",
    "market_cap",
)


def _stock_row_to_dict(row, field_names) -> dict:
    """Copy the named attributes of an ORM row into a plain dict."""
    return {name: getattr(row, name) for name in field_names}


@router.get("/", response_model=ResponseModel)
async def list_stocks():
    """Return all stocks.

    Rows are converted to plain dicts because ORM instances cannot be
    JSON-serialised inside ``ResponseModel.data``.
    """
    stocks = await Stock.all()
    return ResponseModel(
        success=True,
        message="List of stocks",
        data=[_stock_row_to_dict(stock, _STOCK_FIELDS) for stock in stocks],
    )


@router.get("/{symbol}", response_model=ResponseModel)
async def get_stock_prices(symbol: str):
    """Return a stock and its price records, most recent date first.

    Raises:
        HTTPException: 404 when no stock has the given symbol.
    """
    stock = await Stock.get_or_none(symbol=symbol)
    if stock is None:
        raise HTTPException(status_code=404, detail="Stock not found")

    prices = await StockPriceData.filter(stock=stock).order_by("-date")
    payload = {
        "stock": _stock_row_to_dict(stock, _STOCK_FIELDS),
        "prices": [_stock_row_to_dict(price, _STOCK_PRICE_FIELDS) for price in prices],
    }
    return ResponseModel(success=True, message="Stock price data", data=payload)
|
|
|
@router.post("/import/{symbol}", response_model=ResponseModel)
async def queue_import_stock(symbol: str, background_tasks: BackgroundTasks):
    """Create a pending import task for ``symbol`` and schedule it to run.

    Returns:
        A ResponseModel whose data holds the new task's id.
    """
    import_task = await ImportTask.create(
        task_type="stocks",
        status="pending",
        details={"symbol": symbol},
    )
    # The import itself runs as a background task once it has been queued here.
    background_tasks.add_task(run_stock_import_task, import_task.id, symbol)
    payload = {"task_id": import_task.id}
    return ResponseModel(success=True, message="Stock import task queued", data=payload)
|
|
|
async def run_stock_import_task(task_id: int, symbol: str):
    """Download price history for ``symbol`` and store the rows not yet saved.

    Progress is recorded on the ImportTask row: ``running`` at start, then
    ``completed`` or ``failed``. ``details`` always keeps the symbol, plus the
    number of rows inserted on success or the error text on failure.

    Args:
        task_id: Id of the ImportTask row tracking this run.
        symbol: Security code to import.
    """
    try:
        await ImportTask.filter(id=task_id).update(status="running")

        raw_data = await fetch_stock_data(symbol)
        if not raw_data:
            await ImportTask.filter(id=task_id).update(
                status="failed",
                details={"symbol": symbol, "error": "No data"},
            )
            return

        # Stock identity comes from the first API row.
        first = raw_data[0]
        stock, _ = await Stock.get_or_create(
            symbol=first["company"],
            defaults={"name": first["fullName"]},
        )

        # Dates already stored, extended as we go so repeated dates within the
        # payload are queued only once.
        known_dates = set(
            await StockPriceData.filter(stock=stock).values_list("date", flat=True)
        )
        records = []
        for row in raw_data:
            parsed = parse_stock_api_row(row)
            if parsed["date"] in known_dates:
                continue
            known_dates.add(parsed["date"])
            records.append(StockPriceData(stock=stock, **parsed))

        # Nothing new to store: skip the insert entirely.
        if records:
            async with in_transaction():
                await StockPriceData.bulk_create(records, ignore_conflicts=True)

        await ImportTask.filter(id=task_id).update(
            status="completed",
            details={"symbol": symbol, "imported": len(records)},
        )
    except Exception as e:
        # Any failure is recorded on the task row rather than raised from here.
        await ImportTask.filter(id=task_id).update(
            status="failed",
            details={"symbol": symbol, "error": str(e)},
        )
|
|
|
|
|
# routers/utt/models.py |
|
from tortoise import fields, models |
|
|
|
class UTTFund(models.Model):
    """A UTT fund identified by a unique symbol."""

    # Integer primary key.
    id = fields.IntField(pk=True)
    # Fund symbol, at most 20 characters, unique across rows.
    symbol = fields.CharField(max_length=20, unique=True)
    # Display name of the fund.
    name = fields.CharField(max_length=100)
|
|
|
class UTTFundData(models.Model):
    """One dated record of figures for a UTT fund."""

    # Integer primary key.
    id = fields.IntField(pk=True)
    # Owning fund; reachable from UTTFund through the "data" relation.
    fund = fields.ForeignKeyField("models.UTTFund", related_name="data")
    # Date the record applies to.
    date = fields.DateField()

    # Per-unit figures.
    nav_per_unit = fields.FloatField()
    sale_price_per_unit = fields.FloatField()
    repurchase_price_per_unit = fields.FloatField()

    # Totals stored as big integers.
    outstanding_number_of_units = fields.BigIntField()
    net_asset_value = fields.BigIntField()

    class Meta:
        # At most one record per (fund, date) pair.
        unique_together = ("fund", "date")
|
|
|
# routers/utt/schemas.py |
|
from pydantic import BaseModel
from datetime import date
from typing import List

# Imported here so the UTT routes can pull ResponseModel from `.schemas`.
from App.routers.tasks.schemas import ResponseModel
|
|
|
class UTTFundResponse(BaseModel):
    """UTT fund as returned by the API."""

    # Identifier of the fund.
    id: int
    # Fund symbol.
    symbol: str
    # Display name.
    name: str
|
|
|
class UTTFundDataResponse(BaseModel):
    """Figures of a UTT fund for one date."""

    # Date of the record.
    date: date

    # Per-unit figures.
    nav_per_unit: float
    sale_price_per_unit: float
    repurchase_price_per_unit: float

    # Integer totals.
    outstanding_number_of_units: int
    net_asset_value: int
|
|
|
class UTTFundListResponse(BaseModel):
    """A UTT fund together with its dated records."""

    # The fund the records belong to.
    fund: UTTFundResponse
    # Dated records for that fund.
    data: list[UTTFundDataResponse]
|
|
|
# routers/utt/service.py |
|
from datetime import datetime |
|
import httpx |
|
|
|
async def fetch_all_utt_data():
    """Request the UTT dataset and return the payload's ``"data"`` entry.

    Returns:
        The ``"data"`` value of the JSON body (``[]`` when the key is absent),
        or an empty list for any response whose status is not 200.
    """
    endpoint = "https://example.com/utt/api"  # Placeholder
    async with httpx.AsyncClient() as http:
        reply = await http.get(endpoint)
        # Guard clause: anything other than 200 is treated as "no data".
        if reply.status_code != 200:
            return []
        return reply.json().get("data", [])
|
|
|
def parse_utt_api_row(row: dict) -> dict:
    """Convert one raw UTT API row into typed model keyword arguments.

    ``date_valued`` is read as day-month-year; per-unit figures become floats
    and the two totals are truncated to ints after a float conversion.
    """
    valued_on = datetime.strptime(row["date_valued"], "%d-%m-%Y").date()
    parsed = {"date": valued_on}

    for key in ("nav_per_unit", "sale_price_per_unit", "repurchase_price_per_unit"):
        parsed[key] = float(row[key])

    # Going through float first lets values such as "1000.9" be accepted.
    for key in ("outstanding_number_of_units", "net_asset_value"):
        parsed[key] = int(float(row[key]))

    return parsed
|
|
|
# routers/utt/routes.py |
|
from fastapi import APIRouter, BackgroundTasks, HTTPException
from .models import UTTFund, UTTFundData
from .schemas import UTTFundResponse, UTTFundListResponse, ResponseModel
from .service import fetch_all_utt_data, parse_utt_api_row
from App.routers.tasks.models import ImportTask

# All UTT endpoints are served under the /utt prefix.
router = APIRouter(prefix="/utt", tags=["UTT"])
|
|
|
# Attribute names copied into response payloads; they match the fields declared
# on UTTFundResponse and UTTFundDataResponse respectively.
_UTT_FUND_FIELDS = ("id", "symbol", "name")
_UTT_FUND_DATA_FIELDS = (
    "date",
    "nav_per_unit",
    "sale_price_per_unit",
    "repurchase_price_per_unit",
    "outstanding_number_of_units",
    "net_asset_value",
)


def _utt_row_to_dict(row, field_names) -> dict:
    """Copy the named attributes of an ORM row into a plain dict."""
    return {name: getattr(row, name) for name in field_names}


@router.get("/", response_model=ResponseModel)
async def list_funds():
    """Return all UTT funds.

    Rows are converted to plain dicts because ORM instances cannot be
    JSON-serialised inside ``ResponseModel.data``.
    """
    funds = await UTTFund.all()
    return ResponseModel(
        success=True,
        message="List of UTT funds",
        data=[_utt_row_to_dict(fund, _UTT_FUND_FIELDS) for fund in funds],
    )


@router.get("/{symbol}", response_model=ResponseModel)
async def get_fund_data(symbol: str):
    """Return a fund and its dated records, most recent date first.

    Raises:
        HTTPException: 404 when no fund has the given symbol.
    """
    fund = await UTTFund.get_or_none(symbol=symbol)
    if fund is None:
        raise HTTPException(status_code=404, detail="Fund not found")

    data = await UTTFundData.filter(fund=fund).order_by("-date")
    payload = {
        "fund": _utt_row_to_dict(fund, _UTT_FUND_FIELDS),
        "data": [_utt_row_to_dict(entry, _UTT_FUND_DATA_FIELDS) for entry in data],
    }
    return ResponseModel(success=True, message="Fund data", data=payload)
|
|
|
@router.post("/import-all", response_model=ResponseModel)
async def queue_import_utt(background_tasks: BackgroundTasks):
    """Create a pending UTT import task and schedule it to run.

    Returns:
        A ResponseModel whose data holds the new task's id.
    """
    import_task = await ImportTask.create(
        task_type="utt",
        status="pending",
        details={},
    )
    # The import itself runs as a background task once it has been queued here.
    background_tasks.add_task(run_utt_import_task, import_task.id)
    payload = {"task_id": import_task.id}
    return ResponseModel(success=True, message="UTT import task queued", data=payload)
|
|
|
async def run_utt_import_task(task_id: int):
    """Download all UTT rows and store those not yet present.

    Progress is recorded on the ImportTask row: ``running`` at start, then
    ``completed`` or ``failed`` (with the error text in ``details``). Rows are
    written one by one, so rows stored before a failure remain in place.

    Args:
        task_id: Id of the ImportTask row tracking this run.
    """
    try:
        await ImportTask.filter(id=task_id).update(status="running")

        raw_data = await fetch_all_utt_data()
        if not raw_data:
            await ImportTask.filter(id=task_id).update(
                status="failed",
                details={"error": "No data"},
            )
            return

        # Funds already resolved in this run, keyed by symbol, so each fund is
        # looked up or created once rather than once per row.
        funds_by_symbol = {}
        for row in raw_data:
            symbol = row["internal_name"]
            fund = funds_by_symbol.get(symbol)
            if fund is None:
                fund, _ = await UTTFund.get_or_create(
                    symbol=symbol,
                    defaults={"name": row["scheme_name"]},
                )
                funds_by_symbol[symbol] = fund

            parsed = parse_utt_api_row(row)
            # Skip dates already stored for this fund.
            if not await UTTFundData.exists(fund=fund, date=parsed["date"]):
                await UTTFundData.create(fund=fund, **parsed)

        await ImportTask.filter(id=task_id).update(status="completed")
    except Exception as e:
        # Any failure is recorded on the task row rather than raised from here.
        await ImportTask.filter(id=task_id).update(
            status="failed",
            details={"error": str(e)},
        )
|
|
|
|
|
|
|
|