ChintanSatva's picture
Update app.py
3666246 verified
raw
history blame
9.05 kB
import hashlib
import json
import logging
import os
from typing import Optional

import cachetools
import psutil
import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
# Point the Hugging Face cache at /app/cache.
# NOTE(review): this assignment runs after `transformers` has already been
# imported above; if the library reads HF_HOME at import time the value is set
# too late to take effect. The explicit `cache_dir` arguments used when loading
# the model do not depend on it. TODO confirm and, if needed, move this above
# the imports.
os.environ["HF_HOME"] = "/app/cache"
# ASGI application object; route handlers below register themselves on it.
app = FastAPI()
# Configure root logging once (INFO level, timestamped records) and create the
# module-level logger used throughout this file.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load the BitNet model and its tokenizer once, at import time, so that every
# request reuses the same in-memory instances.
try:
    model_name = "1bitLLM/bitnet_b1_58-3B"
    # The tokenizer is loaded from a separate repository than the model weights.
    tokenizer = AutoTokenizer.from_pretrained("hf-internal-testing/llama-tokenizer", cache_dir="/app/cache")
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.bfloat16,
        device_map="cpu",
        low_cpu_mem_usage=True,
        cache_dir="/app/cache",
        # NOTE(review): trust_remote_code=True allows Python code shipped in the
        # model repository to run in this process; consider pinning a revision.
        trust_remote_code=True,
    )
except Exception as e:
    logger.error(f"Failed to load BitNet model: {str(e)}")
    # This runs while the module is being imported, not while serving a
    # request, so an HTTPException would never be turned into an HTTP response.
    # Raise a plain startup error instead and keep the original cause chained.
    raise RuntimeError(f"BitNet model initialization failed: {str(e)}") from e
# In-memory cache of categorization results: at most 100 entries, each of
# which expires one hour after it was stored.
structured_data_cache = cachetools.TTLCache(ttl=60 * 60, maxsize=100)
def log_memory_usage():
    """Describe the resident memory of the current process.

    Despite its name this function does not log anything itself; it returns a
    string that callers embed in their own log messages.

    Returns:
        A string of the form ``"Memory usage: <rss in MB, 2 decimals> MB"``.
    """
    rss_bytes = psutil.Process().memory_info().rss
    rss_megabytes = rss_bytes / 1024 / 1024
    return f"Memory usage: {rss_megabytes:.2f} MB"
def get_text_hash(text: str):
    """Return the hexadecimal MD5 digest of ``text`` encoded as UTF-8.

    The digest is used as a cache key, not for any security purpose.
    """
    digest = hashlib.md5()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
# Category taxonomy. It is not embedded in the prompt; the categorization code
# matches the model's answer against it to pick a known category/subcategory.
# Each row is (category name, subcategories in their canonical order).
_CATEGORY_SUBCATEGORIES = (
    ("income", ["dividends", "interest earned", "retirement pension", "tax refund", "unemployment", "wages", "other income"]),
    ("transfer in", ["cash advances and loans", "deposit", "investment and retirement funds", "savings", "account transfer", "other transfer in"]),
    ("transfer out", ["investment and retirement funds", "savings", "withdrawal", "account transfer", "other transfer out"]),
    ("loan payments", ["car payment", "credit card payment", "personal loan payment", "mortgage payment", "student loan payment", "other payment"]),
    ("bank fees", ["atm fees", "foreign transaction fees", "insufficient funds", "interest charge", "overdraft fees", "other bank fees"]),
    ("entertainment", ["casinos and gambling", "music and audio", "sporting events amusement parks and museums", "tv and movies", "video games", "other entertainment"]),
    ("food and drink", ["beer wine and liquor", "coffee", "fast food", "groceries", "restaurant", "vending machines", "other food and drink"]),
    ("general merchandise", ["bookstores and newsstands", "clothing and accessories", "convenience stores", "department stores", "discount stores", "electronics", "gifts and novelties", "office supplies", "online marketplaces", "pet supplies", "sporting goods", "superstores", "tobacco and vape", "other general merchandise"]),
    ("home improvement", ["furniture", "hardware", "repair and maintenance", "security", "other home improvement"]),
    ("medical", ["dental care", "eye care", "nursing care", "pharmacies and supplements", "primary care", "veterinary services", "other medical"]),
    ("personal care", ["gyms and fitness centers", "hair and beauty", "laundry and dry cleaning", "other personal care"]),
    ("general services", ["accounting and financial planning", "automotive", "childcare", "consulting and legal", "education", "insurance", "postage and shipping", "storage", "other general services"]),
    ("government and nonprofit", ["donations", "government departments and agencies", "tax payment", "other government and nonprofit"]),
    ("transportation", ["bikes and scooters", "gas", "parking", "public transit", "taxis and ride shares", "tolls", "other transportation"]),
    ("travel", ["flights", "lodging", "rental cars", "other travel"]),
    ("rent and utilities", ["gas and electricity", "internet and cable", "rent", "sewage and waste management", "telephone", "water", "other utilities"]),
    ("software and technology", ["software subscriptions", "cloud services", "hardware purchases", "online tools", "it support"]),
)
ALLOWED_CATEGORIES = [
    {"name": category_name, "subcategories": subcategory_names}
    for category_name, subcategory_names in _CATEGORY_SUBCATEGORIES
]
class TransactionRequest(BaseModel):
    """Request body accepted by the ``/categorize`` endpoint."""

    # Free-text description of the transaction to categorize.
    description: str
    # Transaction amount; the categorization code treats values > 0 as income.
    amount: float
    # Name of the model to use; the endpoint only accepts "BITNET".
    model: str = "BITNET"
    # Optional API key. Declared Optional so that both an omitted field and an
    # explicit JSON null validate; a bare `str` annotation with a None default
    # is rejected by strict validators when null is sent explicitly.
    apiKey: Optional[str] = None
# Category used when the model's answer matches nothing in ALLOWED_CATEGORIES.
_FALLBACK_CATEGORY = {"name": "miscellaneous", "subcategories": ["other"]}


def _normalize_label(value):
    """Lower-case ``value`` and collapse every whitespace run to one space."""
    if not value:
        return ""
    # split()/join collapses repeated and leading/trailing whitespace; the
    # previous str.replace(" +", " ") only replaced the literal text " +".
    return " ".join(str(value).lower().split())


def _find_category(normalized_name):
    """Return the ALLOWED_CATEGORIES entry named ``normalized_name``, or None."""
    return next(
        (cat for cat in ALLOWED_CATEGORIES if _normalize_label(cat["name"]) == normalized_name),
        None,
    )


def _pick_subcategory(category, normalized_name):
    """Return the subcategory of ``category`` equal to ``normalized_name``.

    Falls back to the first subcategory containing "other", then to the first
    subcategory of the category.
    """
    subcategories = category["subcategories"]
    for sub in subcategories:
        if _normalize_label(sub) == normalized_name:
            return sub
    return next((sub for sub in subcategories if "other" in _normalize_label(sub)), subcategories[0])


def _coerce_confidence(value, default=0.7):
    """Convert a model-supplied confidence to a float in [0, 1], else ``default``."""
    try:
        score = float(value)
    except (TypeError, ValueError):
        return default
    if score != score:  # NaN never equals itself
        return default
    return min(1.0, max(0.0, score))


async def categorize_with_bitnet(description: str, amount: float):
    """Categorize a transaction with the BitNet model.

    Results are cached in ``structured_data_cache`` under a hash of the
    description and amount. The model's answer is matched against
    ``ALLOWED_CATEGORIES``; unknown categories become "miscellaneous", unknown
    subcategories become the category's "other ..." entry, and any positive
    amount is forced into the "income" category.

    Args:
        description: Free-text transaction description.
        amount: Transaction amount; values greater than zero are income.

    Returns:
        A dict with ``category``, ``subcategory``, ``category_confidence`` and
        ``subcategory_confidence``. If anything fails, the dict is
        miscellaneous/other with zero confidences plus an ``error`` message;
        failures are not cached.
    """
    logger.info("Processing transaction: %s, amount: %s, %s", description, amount, log_memory_usage())

    # Cache key derived from both inputs.
    text_hash = get_text_hash(f"{description}|{amount}")
    # A single .get() avoids the window between "in" and "[]" in which a TTL
    # entry can expire and raise KeyError outside the error handling below.
    cached = structured_data_cache.get(text_hash)
    if cached is not None:
        logger.info("Cache hit for transaction: %s, %s", description, log_memory_usage())
        return cached

    try:
        # Simplified prompt
        prompt = f"""Categorize this transaction into a category and subcategory with confidence scores (0 to 1). Use 'income' for positive amounts. If unsure, use confidence 0.7 and 'miscellaneous'/'other' if no match. Output only JSON.
Description: {description}
Amount: {amount}
{{
"category": "",
"subcategory": "",
"category_confidence": 0.0,
"subcategory_confidence": 0.0
}}"""
        inputs = tokenizer(prompt, return_tensors="pt").to("cpu")
        # NOTE(review): this call is synchronous, so this coroutine does not
        # yield to other tasks while the model is generating.
        outputs = model.generate(
            **inputs,
            max_new_tokens=50,  # kept small for speed
            do_sample=False,
            num_beams=1,
        )
        decoded = tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Take the text between the last "{" and the last "}".
        # NOTE(review): the decoded text presumably still contains the prompt,
        # including its blank JSON template, so when the model emits no JSON of
        # its own this parses the template - confirm and consider decoding only
        # the newly generated tokens.
        json_start = decoded.rfind("{")
        json_end = decoded.rfind("}") + 1
        if json_start == -1 or json_end <= json_start:
            raise ValueError("model output does not contain a complete JSON object")
        result = json.loads(decoded[json_start:json_end])

        subcategory_name = _normalize_label(result.get("subcategory", ""))
        if amount > 0:
            # Positive amounts are always reported as income.
            matched_category = _find_category("income") or _FALLBACK_CATEGORY
        else:
            category_name = _normalize_label(result.get("category", ""))
            matched_category = (
                _find_category(category_name)
                or _find_category("miscellaneous")
                or _FALLBACK_CATEGORY
            )

        category_result = {
            "category": matched_category["name"],
            "subcategory": _pick_subcategory(matched_category, subcategory_name),
            "category_confidence": _coerce_confidence(result.get("category_confidence", 0.7)),
            "subcategory_confidence": _coerce_confidence(result.get("subcategory_confidence", 0.7)),
        }
        structured_data_cache[text_hash] = category_result
        logger.info("BitNet categorization completed for %s, %s", description, log_memory_usage())
        return category_result
    except Exception as e:
        # Best-effort endpoint: report the failure in the payload rather than
        # raising.
        logger.exception("BitNet categorization failed for %s: %s, %s", description, e, log_memory_usage())
        return {
            "category": "miscellaneous",
            "subcategory": "other",
            "category_confidence": 0.0,
            "subcategory_confidence": 0.0,
            "error": f"BitNet categorization failed: {str(e)}"
        }
@app.post("/categorize")
async def categorize_transaction(request: TransactionRequest):
    """Handle POST /categorize: categorize one financial transaction.

    Requests naming any model other than "BITNET" receive a
    miscellaneous/other result with an ``error`` field instead of being
    categorized.
    """
    logger.info(f"Received request: description={request.description}, amount={request.amount}, model={request.model}, {log_memory_usage()}")

    if request.model == "BITNET":
        return await categorize_with_bitnet(request.description, request.amount)

    # Unsupported model: answer with the same shape as a failed categorization.
    unsupported_response = {
        "category": "miscellaneous",
        "subcategory": "other",
        "category_confidence": 0.0,
        "subcategory_confidence": 0.0,
        "error": "Only BITNET model is supported"
    }
    return unsupported_response