XYHLF / structure.txt
Mbonea's picture
initial commit
9d4bd7c
project/
β”œβ”€β”€ main.py
β”œβ”€β”€ routers/
β”‚ β”œβ”€β”€ stocks/
β”‚ β”‚ β”œβ”€β”€ __init__.py
β”‚ β”‚ β”œβ”€β”€ models.py
β”‚ β”‚ β”œβ”€β”€ schemas.py
β”‚ β”‚ β”œβ”€β”€ service.py
β”‚ β”‚ └── routes.py
β”‚ β”œβ”€β”€ utt/
β”‚ β”‚ β”œβ”€β”€ __init__.py
β”‚ β”‚ β”œβ”€β”€ models.py
β”‚ β”‚ β”œβ”€β”€ schemas.py
β”‚ β”‚ β”œβ”€β”€ service.py
β”‚ β”‚ └── routes.py
β”‚ └── tasks/
β”‚ β”œβ”€β”€ __init__.py
β”‚ β”œβ”€β”€ models.py
β”‚ β”œβ”€β”€ schemas.py
β”‚ └── routes.py
# main.py
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
fromApp.routers.stocks.routes import router as stocks_router
fromApp.routers.utt.routes import router as utt_router
fromApp.routers.tasks.routes import router as tasks_router
app = FastAPI()
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(status_code=exc.status_code, content={"success": False, "message": exc.detail})
@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
return JSONResponse(status_code=500, content={"success": False, "message": str(exc)})
app.include_router(stocks_router)
app.include_router(utt_router)
app.include_router(tasks_router)
# routers/tasks/models.py
from tortoise import fields, models
from enum import Enum
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class ImportTask(models.Model):
id = fields.IntField(pk=True)
task_type = fields.CharField(max_length=50)
status = fields.CharEnumField(TaskStatus, default=TaskStatus.PENDING)
details = fields.JSONField(null=True)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
table = "import_tasks"
# routers/tasks/schemas.py
from pydantic import BaseModel
from datetime import datetime
from enum import Enum
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class ImportTaskResponse(BaseModel):
id: int
task_type: str
status: TaskStatus
details: dict | None
created_at: datetime
updated_at: datetime
class ResponseModel(BaseModel):
success: bool
message: str
data: dict | list | None = None
# routers/tasks/routes.py
from fastapi import APIRouter, HTTPException
from .models import ImportTask
from .schemas import ImportTaskResponse, ResponseModel
router = APIRouter(prefix="/tasks", tags=["Tasks"])
@router.get("/", response_model=ResponseModel)
async def list_tasks():
tasks = await ImportTask.all().order_by("-created_at")
return ResponseModel(success=True, message="List of tasks", data=[task for task in tasks])
@router.get("/{task_id}", response_model=ResponseModel)
async def get_task(task_id: int):
task = await ImportTask.get_or_none(id=task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return ResponseModel(success=True, message="Task found", data=task)
# You would follow the same modular structure for stocks and utt
# Including their own models.py, schemas.py, routes.py, service.py
# The routes would queue background tasks and use task_id for status tracking.
project/
β”œβ”€β”€ main.py
β”œβ”€β”€ routers/
β”‚ β”œβ”€β”€ stocks/
β”‚ β”‚ β”œβ”€β”€ __init__.py
β”‚ β”‚ β”œβ”€β”€ models.py
β”‚ β”‚ β”œβ”€β”€ schemas.py
β”‚ β”‚ β”œβ”€β”€ service.py
β”‚ β”‚ └── routes.py
β”‚ β”œβ”€β”€ utt/
β”‚ β”‚ β”œβ”€β”€ __init__.py
β”‚ β”‚ β”œβ”€β”€ models.py
β”‚ β”‚ β”œβ”€β”€ schemas.py
β”‚ β”‚ β”œβ”€β”€ service.py
β”‚ β”‚ └── routes.py
β”‚ └── tasks/
β”‚ β”œβ”€β”€ __init__.py
β”‚ β”œβ”€β”€ models.py
β”‚ β”œβ”€β”€ schemas.py
β”‚ └── routes.py
# routers/stocks/models.py
from tortoise import fields, models
class Stock(models.Model):
id = fields.IntField(pk=True)
symbol = fields.CharField(max_length=10, unique=True)
name = fields.CharField(max_length=100)
sector = fields.CharField(max_length=100, null=True)
class StockPriceData(models.Model):
id = fields.IntField(pk=True)
stock = fields.ForeignKeyField("models.Stock", related_name="prices")
date = fields.DateField()
opening_price = fields.FloatField()
closing_price = fields.FloatField()
high = fields.FloatField()
low = fields.FloatField()
volume = fields.IntField()
turnover = fields.BigIntField()
shares_in_issue = fields.BigIntField()
market_cap = fields.BigIntField()
class Meta:
unique_together = ("stock", "date")
# routers/stocks/schemas.py
from pydantic import BaseModel
from datetime import date
from typing import List, Optional
fromApp.routers.tasks.schemas import ResponseModel
class StockBase(BaseModel):
symbol: str
name: str
sector: Optional[str] = None
class StockResponse(StockBase):
id: int
class StockPriceResponse(BaseModel):
date: date
opening_price: float
closing_price: float
high: float
low: float
volume: int
turnover: int
shares_in_issue: int
market_cap: int
class StockPriceListResponse(BaseModel):
stock: StockResponse
prices: List[StockPriceResponse]
# routers/stocks/service.py
from datetime import datetime
async def fetch_stock_data(symbol: str):
import httpx
url = f"https://dse.co.tz/api/get/market/prices/for/range/duration?security_code={symbol}&days=5000&class=EQUITY"
async with httpx.AsyncClient() as client:
response = await client.get(url)
if response.status_code == 200:
return response.json().get("data", [])
return []
def parse_stock_api_row(row: dict) -> dict:
return {
"date": datetime.fromisoformat(row["trade_date"]).date(),
"opening_price": row["opening_price"],
"closing_price": row["closing_price"],
"high": row["high"],
"low": row["low"],
"volume": row["volume"],
"turnover": row["turnover"],
"shares_in_issue": row["shares_in_issue"],
"market_cap": row["market_cap"],
}
# routers/stocks/routes.py
from fastapi import APIRouter, HTTPException, BackgroundTasks
from tortoise.transactions import in_transaction
from .models import Stock, StockPriceData
from .schemas import StockResponse, StockPriceListResponse, ResponseModel
from .service import fetch_stock_data, parse_stock_api_row
fromApp.routers.tasks.models import ImportTask
from typing import List
router = APIRouter(prefix="/stocks", tags=["Stocks"])
@router.get("/", response_model=ResponseModel)
async def list_stocks():
stocks = await Stock.all()
return ResponseModel(success=True, message="List of stocks", data=stocks)
@router.get("/{symbol}", response_model=ResponseModel)
async def get_stock_prices(symbol: str):
stock = await Stock.get_or_none(symbol=symbol)
if not stock:
raise HTTPException(status_code=404, detail="Stock not found")
prices = await StockPriceData.filter(stock=stock).order_by("-date")
return ResponseModel(success=True, message="Stock price data", data={"stock": stock, "prices": prices})
@router.post("/import/{symbol}", response_model=ResponseModel)
async def queue_import_stock(symbol: str, background_tasks: BackgroundTasks):
task = await ImportTask.create(task_type="stocks", status="pending", details={"symbol": symbol})
background_tasks.add_task(run_stock_import_task, task.id, symbol)
return ResponseModel(success=True, message="Stock import task queued", data={"task_id": task.id})
async def run_stock_import_task(task_id: int, symbol: str):
try:
await ImportTask.filter(id=task_id).update(status="running")
raw_data = await fetch_stock_data(symbol)
if not raw_data:
await ImportTask.filter(id=task_id).update(status="failed", details={"error": "No data"})
return
first = raw_data[0]
stock, _ = await Stock.get_or_create(symbol=first["company"], defaults={"name": first["fullName"]})
existing_dates = set(await StockPriceData.filter(stock=stock).values_list("date", flat=True))
records = []
for row in raw_data:
parsed = parse_stock_api_row(row)
if parsed["date"] not in existing_dates:
records.append(StockPriceData(stock=stock, **parsed))
async with in_transaction():
await StockPriceData.bulk_create(records, ignore_conflicts=True)
await ImportTask.filter(id=task_id).update(status="completed")
except Exception as e:
await ImportTask.filter(id=task_id).update(status="failed", details={"error": str(e)})
# routers/utt/models.py
from tortoise import fields, models
class UTTFund(models.Model):
id = fields.IntField(pk=True)
symbol = fields.CharField(max_length=20, unique=True)
name = fields.CharField(max_length=100)
class UTTFundData(models.Model):
id = fields.IntField(pk=True)
fund = fields.ForeignKeyField("models.UTTFund", related_name="data")
date = fields.DateField()
nav_per_unit = fields.FloatField()
sale_price_per_unit = fields.FloatField()
repurchase_price_per_unit = fields.FloatField()
outstanding_number_of_units = fields.BigIntField()
net_asset_value = fields.BigIntField()
class Meta:
unique_together = ("fund", "date")
# routers/utt/schemas.py
from pydantic import BaseModel
from datetime import date
from typing import List
fromApp.routers.tasks.schemas import ResponseModel
class UTTFundResponse(BaseModel):
id: int
symbol: str
name: str
class UTTFundDataResponse(BaseModel):
date: date
nav_per_unit: float
sale_price_per_unit: float
repurchase_price_per_unit: float
outstanding_number_of_units: int
net_asset_value: int
class UTTFundListResponse(BaseModel):
fund: UTTFundResponse
data: List[UTTFundDataResponse]
# routers/utt/service.py
from datetime import datetime
import httpx
async def fetch_all_utt_data():
url = "https://example.com/utt/api" # Placeholder
async with httpx.AsyncClient() as client:
response = await client.get(url)
if response.status_code == 200:
return response.json().get("data", [])
return []
def parse_utt_api_row(row: dict) -> dict:
return {
"date": datetime.strptime(row["date_valued"], "%d-%m-%Y").date(),
"nav_per_unit": float(row["nav_per_unit"]),
"sale_price_per_unit": float(row["sale_price_per_unit"]),
"repurchase_price_per_unit": float(row["repurchase_price_per_unit"]),
"outstanding_number_of_units": int(float(row["outstanding_number_of_units"])),
"net_asset_value": int(float(row["net_asset_value"]))
}
# routers/utt/routes.py
from fastapi import APIRouter, BackgroundTasks, HTTPException
from .models import UTTFund, UTTFundData
from .schemas import UTTFundResponse, UTTFundListResponse, ResponseModel
from .service import fetch_all_utt_data, parse_utt_api_row
fromApp.routers.tasks.models import ImportTask
router = APIRouter(prefix="/utt", tags=["UTT"])
@router.get("/", response_model=ResponseModel)
async def list_funds():
funds = await UTTFund.all()
return ResponseModel(success=True, message="List of UTT funds", data=funds)
@router.get("/{symbol}", response_model=ResponseModel)
async def get_fund_data(symbol: str):
fund = await UTTFund.get_or_none(symbol=symbol)
if not fund:
raise HTTPException(status_code=404, detail="Fund not found")
data = await UTTFundData.filter(fund=fund).order_by("-date")
return ResponseModel(success=True, message="Fund data", data={"fund": fund, "data": data})
@router.post("/import-all", response_model=ResponseModel)
async def queue_import_utt(background_tasks: BackgroundTasks):
task = await ImportTask.create(task_type="utt", status="pending", details={})
background_tasks.add_task(run_utt_import_task, task.id)
return ResponseModel(success=True, message="UTT import task queued", data={"task_id": task.id})
async def run_utt_import_task(task_id: int):
from tortoise.transactions import in_transaction
try:
await ImportTask.filter(id=task_id).update(status="running")
raw_data = await fetch_all_utt_data()
if not raw_data:
await ImportTask.filter(id=task_id).update(status="failed", details={"error": "No data"})
return
for row in raw_data:
symbol = row["internal_name"]
name = row["scheme_name"]
fund, _ = await UTTFund.get_or_create(symbol=symbol, defaults={"name": name})
parsed = parse_utt_api_row(row)
exists = await UTTFundData.exists(fund=fund, date=parsed["date"])
if not exists:
await UTTFundData.create(fund=fund, **parsed)
await ImportTask.filter(id=task_id).update(status="completed")
except Exception as e:
await ImportTask.filter(id=task_id).update(status="failed", details={"error": str(e)})