|
import sqlite3 |
|
import json |
|
import uuid |
|
import datetime |
|
import logging |
|
import re |
|
|
|
|
|
# Root logger: INFO and above, formatted as "timestamp - LEVEL - message".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# The `transformers` package is optional. When it cannot be imported the two
# class names are bound to None and the flag below records its absence, so the
# rest of the module can rely on the keyword-based fallback parser.
try:
    from transformers import AutoModelForCausalLM, AutoTokenizer
except ImportError:
    logging.warning("Transformers library not found. Using fallback parser.")
    AutoModelForCausalLM = None
    AutoTokenizer = None
    TRANSFORMERS_AVAILABLE = False
else:
    TRANSFORMERS_AVAILABLE = True

# Checkpoint used for prompt parsing.
# NOTE(review): DistilBERT is an encoder-only model, so loading it through
# AutoModelForCausalLM presumably always fails and leaves `model` as None --
# confirm, and pick a causal checkpoint if model-based parsing is wanted.
model_name = "distilbert-base-uncased"

# Start from "no model"; both names are only populated when loading succeeds.
tokenizer = None
model = None
if TRANSFORMERS_AVAILABLE:
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name)
        logging.info(f"Loaded model: {model_name}")
    except Exception as e:
        # Any load failure is non-fatal: reset both so neither is half-initialised.
        logging.error(f"Failed to load model {model_name}: {e}")
        tokenizer = None
        model = None
|
|
|
|
|
# Open (or create) the SQLite database file "erp.db"; the relative path is
# resolved against the current working directory. `conn` and `cursor` are
# module-level and shared by the functions below.
conn = sqlite3.connect("erp.db")
cursor = conn.cursor()

# Schema. IF NOT EXISTS makes these statements a no-op on later runs.
# NOTE(review): SQLite only enforces FOREIGN KEY clauses when
# `PRAGMA foreign_keys = ON` has been issued on the connection, which this
# module never does -- confirm whether enforcement is intended.
_SCHEMA_STATEMENTS = (
    """
    CREATE TABLE IF NOT EXISTS chart_of_accounts (
        account_id TEXT PRIMARY KEY,
        account_name TEXT NOT NULL,
        account_type TEXT NOT NULL,
        parent_id TEXT,
        allow_budgeting BOOLEAN,
        allow_posting BOOLEAN,
        FOREIGN KEY (parent_id) REFERENCES chart_of_accounts(account_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS journal_entries (
        entry_id TEXT PRIMARY KEY,
        date TEXT NOT NULL,
        debit_account_id TEXT NOT NULL,
        credit_account_id TEXT NOT NULL,
        amount REAL NOT NULL,
        description TEXT,
        FOREIGN KEY (debit_account_id) REFERENCES chart_of_accounts(account_id),
        FOREIGN KEY (credit_account_id) REFERENCES chart_of_accounts(account_id)
    )
    """,
)
for _statement in _SCHEMA_STATEMENTS:
    cursor.execute(_statement)
conn.commit()
|
|
|
|
|
# For each account type, which side of a journal entry increases the balance
# and which side decreases it.
ACCOUNT_RULES = {
    account_type: {"increase": increase_side, "decrease": decrease_side}
    for account_type, increase_side, decrease_side in (
        ("Asset", "Debit", "Credit"),
        ("Liability", "Credit", "Debit"),
        ("Equity", "Credit", "Debit"),
        ("Revenue", "Credit", "Debit"),
        ("Expense", "Debit", "Credit"),
    )
}
|
|
|
|
|
def initialize_chart_of_accounts():
    """Seed the chart_of_accounts table with the built-in account hierarchy.

    Rows are written through the module-level ``cursor`` with
    ``INSERT OR REPLACE``, so a row whose ``account_id`` already exists is
    overwritten. The change is committed on ``conn`` and an INFO message is
    logged afterwards.
    """

    def header(account_id, name, kind, parent=None):
        # Grouping account: budgeting allowed, direct posting not allowed.
        return (account_id, name, kind, parent, True, False)

    def postable(account_id, name, kind, parent):
        # Leaf account: budgeting and posting both allowed.
        return (account_id, name, kind, parent, True, True)

    seed_rows = [
        header("1", "Assets", "Asset"),
        header("1.1", "Fixed Assets", "Asset", "1"),
        postable("1.1.1", "Plant", "Asset", "1.1"),
        postable("1.1.2", "Machinery", "Asset", "1.1"),
        postable("1.1.3", "Building", "Asset", "1.1"),
        header("1.2", "Current Assets", "Asset", "1"),
        postable("1.2.1", "Cash", "Asset", "1.2"),
        postable("1.2.2", "Laptop", "Asset", "1.2"),
        postable("1.2.3", "Inventory", "Asset", "1.2"),
        postable("1.2.4", "Accounts Receivable", "Asset", "1.2"),
        header("2", "Liabilities", "Liability"),
        postable("2.1", "Accounts Payable", "Liability", "2"),
        postable("2.2", "Loan Payable", "Liability", "2"),
        header("3", "Equity", "Equity"),
        postable("3.1", "Owner's Equity", "Equity", "3"),
        postable("3.2", "Drawings", "Equity", "3"),
        header("4", "Revenue", "Revenue"),
        postable("4.1", "Sales Revenue", "Revenue", "4"),
        header("5", "Expenses", "Expense"),
        postable("5.1", "Rent Expense", "Expense", "5"),
        postable("5.2", "Salary Expense", "Expense", "5"),
        postable("5.3", "Office Supplies", "Expense", "5"),
    ]

    upsert_sql = """
        INSERT OR REPLACE INTO chart_of_accounts
            (account_id, account_name, account_type, parent_id, allow_budgeting, allow_posting)
        VALUES (?, ?, ?, ?, ?, ?)
    """
    cursor.executemany(upsert_sql, seed_rows)
    conn.commit()
    logging.info("Chart of accounts initialized.")
|
|
|
|
|
# A dollar amount token: "$" immediately followed by digits, commas and dots.
_AMOUNT_PATTERN = re.compile(r'\$[\d,.]+')

# Words that mark the prompt as a purchase or as a sale.
_PURCHASE_KEYWORDS = ("bought", "purchased")
_SALE_KEYWORDS = ("sold goods", "sales")

# Things that can be purchased, in match-priority order (first hit wins).
_PURCHASE_ITEMS = (
    ("laptop", ("Laptop", "Asset")),
    ("inventory", ("Inventory", "Asset")),
    ("machinery", ("Machinery", "Asset")),
    ("building", ("Building", "Asset")),
    ("plant", ("Plant", "Asset")),
    ("office supplies", ("Office Supplies", "Expense")),
)

# Expense keywords; when several are mentioned the one listed last wins.
_EXPENSE_KEYWORDS = (
    ("office supplies", ("Office Supplies", "Expense")),
    ("rent", ("Rent Expense", "Expense")),
    ("salary", ("Salary Expense", "Expense")),
)

# Account debited when a purchase names none of the known items.
_DEFAULT_PURCHASE_ACCOUNT = ("Laptop", "Asset")

# Credit-side account for each payment method on a purchase/expense.
_PAYMENT_ACCOUNTS = {
    "cash": ("Cash", "Asset"),
    "credit": ("Accounts Payable", "Liability"),
}

_REQUIRED_SIDE_KEYS = {"account", "type", "amount"}


def _mentions(text, keyword):
    """Return True if *keyword* starts at a word boundary somewhere in *text*.

    Anchoring the start of the keyword keeps "rent" from matching inside
    words such as "parent" or "current" while still accepting plurals.
    """
    return re.search(r'\b' + re.escape(keyword), text) is not None


def _extract_amount(text):
    """Return the first dollar amount in *text* as a float, or None.

    An error is logged when no amount is present, when it is zero, or when
    the token cannot be converted to a number.
    """
    amount = None
    match = _AMOUNT_PATTERN.search(text)
    if match:
        # Drop the "$", thousands separators, and any sentence punctuation
        # glued to the end of the number (e.g. "$1,200.50.").
        digits = match.group().lstrip('$').rstrip('.,').replace(',', '')
        try:
            amount = float(digits)
        except ValueError:
            logging.error("Invalid amount format.")
            return None
    if not amount:
        logging.error("No amount found in prompt.")
        return None
    return amount


def _is_valid_parse(candidate):
    """Return True if *candidate* has the dict shape parse_prompt promises."""
    if not isinstance(candidate, dict):
        return False
    debit = candidate.get("debit")
    if not isinstance(debit, dict) or not _REQUIRED_SIDE_KEYS <= debit.keys():
        return False
    credit = candidate.get("credit")
    if credit is None:
        # A payment method without a credit side cannot be posted.
        return candidate.get("payment_method") is None
    return isinstance(credit, dict) and _REQUIRED_SIDE_KEYS <= credit.keys()


def _parse_with_model(prompt):
    """Ask the language model to parse *prompt*; return None if unusable.

    Returns None when no model/tokenizer is loaded, when generation or JSON
    decoding raises, or when the decoded JSON lacks the expected structure.
    """
    if not (model and tokenizer):
        return None
    try:
        input_text = (
            "\n"
            "Parse the following accounting prompt into a JSON object with:\n"
            "- debit: {account, type, amount}\n"
            "- credit: {account, type, amount}\n"
            "- payment_method: 'cash', 'credit', or null\n"
            f"Prompt: {prompt}\n"
        )
        inputs = tokenizer(input_text, return_tensors="pt")
        outputs = model.generate(**inputs, max_length=300)
        # NOTE(review): for a causal LM the decoded text presumably includes
        # the instruction prompt itself, in which case json.loads fails and
        # the rule-based parser is used -- confirm against the chosen model.
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        parsed = json.loads(response)
    except Exception as e:
        # Deliberately broad: any model failure falls back to the rules.
        logging.warning(f"Model parsing failed: {e}. Using fallback parser.")
        return None
    if not _is_valid_parse(parsed):
        logging.warning("Model output has an unexpected structure. Using fallback parser.")
        return None
    return parsed


def _classify_debit(text):
    """Return (account, type) to debit for a non-sale prompt, or None."""
    is_purchase = any(_mentions(text, word) for word in _PURCHASE_KEYWORDS)
    if is_purchase:
        # A named item takes precedence over any expense keyword.
        for keyword, target in _PURCHASE_ITEMS:
            if _mentions(text, keyword):
                return target
    expense = None
    for keyword, target in _EXPENSE_KEYWORDS:
        if _mentions(text, keyword):
            expense = target
    if expense is not None:
        return expense
    if is_purchase:
        return _DEFAULT_PURCHASE_ACCOUNT
    return None


def _parse_with_rules(prompt):
    """Keyword-based parser used when the model is absent or unusable."""
    text = prompt.lower().strip()

    amount = _extract_amount(text)
    if amount is None:
        return None

    if _mentions(text, "cash"):
        payment_method = "cash"
    elif _mentions(text, "credit"):
        payment_method = "credit"
    else:
        payment_method = None

    if any(_mentions(text, word) for word in _SALE_KEYWORDS):
        # A sale always credits revenue; the payment method only decides
        # whether cash or a receivable is debited.
        debit = ("Cash", "Asset") if payment_method == "cash" else ("Accounts Receivable", "Asset")
        credit = ("Sales Revenue", "Revenue")
    else:
        debit = _classify_debit(text)
        if debit is None:
            logging.error("Prompt not recognized.")
            return None
        # None when the payment method is unknown; the caller asks for it.
        credit = _PAYMENT_ACCOUNTS.get(payment_method)

    result = {
        "debit": {"account": debit[0], "type": debit[1], "amount": amount},
        "credit": None,
        "payment_method": payment_method,
    }
    if credit is not None:
        result["credit"] = {"account": credit[0], "type": credit[1], "amount": amount}
    return result


def parse_prompt(prompt):
    """Turn a free-text accounting prompt into debit/credit instructions.

    The language model is tried first when one is loaded; if it is missing,
    raises, or returns JSON of the wrong shape, a keyword-based parser is
    used instead.

    Args:
        prompt: Text such as "Bought a laptop for $1,000 cash".

    Returns:
        A dict with keys ``debit`` (``{"account", "type", "amount"}``),
        ``credit`` (same shape, or None when the payment method is not yet
        known) and ``payment_method`` ("cash", "credit" or None). Returns
        None when no positive dollar amount or no recognisable transaction
        is found; the reason is logged.
    """
    model_result = _parse_with_model(prompt)
    if model_result is not None:
        return model_result
    return _parse_with_rules(prompt)
|
|
|
|
|
def generate_journal_entry(prompt, follow_up_response=None):
    """Parse *prompt* and post the resulting journal entry to the database.

    Args:
        prompt: Free-text description of the transaction; also stored as the
            entry's description.
        follow_up_response: Optional answer ("cash" or "credit") to an
            earlier clarification request for the same prompt. When given it
            takes precedence over any payment method found in the prompt.

    Returns:
        * a ``dict`` ``{"status": "clarify", "message", "original_prompt"}``
          when the payment method is unknown and no follow-up was supplied;
        * otherwise a ``str``: a confirmation on success, or a short
          explanation of why nothing was posted.
    """
    parsed = parse_prompt(prompt)
    if not parsed:
        return "Unable to parse prompt. Please provide more details."

    debit_account = parsed["debit"]["account"]
    debit_type = parsed["debit"]["type"]
    amount = parsed["debit"]["amount"]
    parsed_credit = parsed.get("credit")
    payment_method = parsed.get("payment_method")

    # A revenue credit marks the transaction as a sale: the credit side must
    # stay on the revenue account and the payment method selects the debit.
    is_sale = isinstance(parsed_credit, dict) and parsed_credit.get("type") == "Revenue"

    answer = follow_up_response.strip().lower() if follow_up_response else ""

    if not payment_method and not answer:
        if is_sale:
            question = "Was this sold for cash or on credit? (cash/credit)"
        else:
            question = "Was this bought on cash or credit? (cash/credit)"
        return {"status": "clarify", "message": question, "original_prompt": prompt}

    if answer:
        if answer not in ("cash", "credit"):
            return "Invalid response. Please specify 'cash' or 'credit'."
        method = answer
    elif payment_method in ("cash", "credit"):
        method = payment_method
    else:
        return "Invalid payment method specified."

    purchase_credit = {"cash": ("Cash", "Asset"), "credit": ("Accounts Payable", "Liability")}
    sale_debit = {"cash": ("Cash", "Asset"), "credit": ("Accounts Receivable", "Asset")}

    if is_sale:
        credit_account, credit_type = parsed_credit["account"], parsed_credit["type"]
        if answer:
            # The parser could not know the method; fix the debit side now.
            debit_account, debit_type = sale_debit[method]
    elif method == "cash" and not answer and isinstance(parsed_credit, dict):
        credit_account, credit_type = parsed_credit["account"], parsed_credit["type"]
    else:
        credit_account, credit_type = purchase_credit[method]

    lookup_sql = (
        "SELECT account_id, account_type, allow_posting "
        "FROM chart_of_accounts WHERE account_name = ?"
    )
    try:
        debit_result = cursor.execute(lookup_sql, (debit_account,)).fetchone()
        credit_result = cursor.execute(lookup_sql, (credit_account,)).fetchone()
    except sqlite3.Error as e:
        logging.error(f"SQL error in generate_journal_entry: {e}")
        return "Error looking up accounts."

    if not debit_result or not credit_result:
        return "One or both accounts not found in chart of accounts."
    if not debit_result[2] or not credit_result[2]:
        return "Posting not allowed for one or both accounts."
    # The stored account type must agree with what the parser assumed.
    if debit_result[1] != debit_type or credit_result[1] != credit_type:
        return "Account type mismatch."

    entry_id = str(uuid.uuid4())
    date = datetime.datetime.now().isoformat()
    try:
        cursor.execute(
            """
            INSERT INTO journal_entries
                (entry_id, date, debit_account_id, credit_account_id, amount, description)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (entry_id, date, debit_result[0], credit_result[0], amount, prompt),
        )
        conn.commit()
    except sqlite3.Error as e:
        # Leave no half-written transaction open on the shared connection.
        conn.rollback()
        logging.error(f"SQL error in generate_journal_entry: {e}")
        return "Error saving journal entry."

    logging.info(f"Journal entry created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}")
    return f"Journal Entry Created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}"
|
|
|
|
|
def generate_t_account(account_name):
    """Render a plain-text T-account for the named account.

    Args:
        account_name: Value matched exactly against
            ``chart_of_accounts.account_name``.

    Returns:
        A multi-line string listing every journal entry that debits or
        credits the account (ordered by date) followed by both totals, or
        "Account not found." / "Error retrieving T-account data." when the
        account does not exist or a SQLite error occurs.
    """
    try:
        cursor.execute(
            "SELECT account_id FROM chart_of_accounts WHERE account_name = ?",
            (account_name,),
        )
        row = cursor.fetchone()
        if not row:
            logging.error(f"Account {account_name} not found.")
            return "Account not found."
        account_id = row[0]

        # UNION ALL, not UNION: plain UNION removes duplicate rows, which
        # would silently drop one of two otherwise identical postings.
        cursor.execute(
            """
            SELECT date, amount, description, 'Debit' as type
            FROM journal_entries WHERE debit_account_id = ?
            UNION ALL
            SELECT date, amount, description, 'Credit' as type
            FROM journal_entries WHERE credit_account_id = ?
            ORDER BY date
            """,
            (account_id, account_id),
        )
        entries = cursor.fetchall()
        logging.info(f"Retrieved {len(entries)} entries for T-account: {account_name}")
    except sqlite3.Error as e:
        logging.error(f"SQL error in generate_t_account: {e}")
        return "Error retrieving T-account data."

    lines = [
        f"T-Account for {account_name}",
        '=' * 50,
        f"{'Debit':<20} | {'Credit':<20} | Description",
        '-' * 50,
    ]
    debit_total = 0
    credit_total = 0
    for _date, amount, desc, entry_type in entries:
        if entry_type == "Debit":
            lines.append(f"${amount:<19} | {'':<20} | {desc}")
            debit_total += amount
        else:
            lines.append(f"{'':<20} | ${amount:<19} | {desc}")
            credit_total += amount
    lines.append('-' * 50)
    lines.append(f"Total Debit: ${debit_total:<10} | Total Credit: ${credit_total}")

    # Every line, including the last, is newline-terminated.
    return "\n".join(lines) + "\n"
|
|
|
|
|
def main():
    """Run the interactive command loop.

    Seeds the chart of accounts, then reads lines from standard input until
    "exit", Ctrl-C or end of input. A line of the form
    ``t-account <account_name>`` prints that account's T-account; any other
    line is treated as an accounting prompt. When a prompt needs
    clarification, the next non-command line is used as the answer. The
    shared database connection is closed on the way out.
    """
    initialize_chart_of_accounts()
    print("AI ERP System: Enter accounting prompts or 't-account <account_name>' to view T-accounts. Type 'exit' to quit.")

    pending_prompt = None
    try:
        while True:
            try:
                prompt = input("> ").strip()
                if prompt.lower() == "exit":
                    break

                # Split off the first word so that a bare "t-account" is
                # recognised as the command with a missing argument.
                command, _, argument = prompt.partition(" ")
                if command.lower() == "t-account":
                    account_name = argument.strip()
                    if account_name:
                        print(generate_t_account(account_name))
                    else:
                        print("Please specify an account name.")
                    continue

                if pending_prompt:
                    # This line answers the previous clarification request.
                    result = generate_journal_entry(pending_prompt, prompt)
                    pending_prompt = None
                else:
                    result = generate_journal_entry(prompt)

                if isinstance(result, dict) and result.get("status") == "clarify":
                    print(result["message"])
                    pending_prompt = result["original_prompt"]
                else:
                    print(result)

            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin was closed or exhausted. Without this it
                # would hit the generic handler below and loop forever.
                print("\nExiting...")
                break
            except Exception as e:
                logging.error(f"Error processing prompt: {e}")
                print("An error occurred. Please try again.")
    finally:
        conn.close()


if __name__ == "__main__":
    main()