|
from flask import Flask, request, jsonify, send_file |
|
from flask_cors import CORS |
|
import joblib |
|
import xgboost as xgb |
|
import numpy as np |
|
import json |
|
import math |
|
import os |
|
import logging |
|
|
|
app = Flask(__name__) |
|
CORS(app) |
|
logging.basicConfig(level=logging.INFO) |
|
|
|
# Load models |
|
try: |
|
rf = joblib.load("rf_model.pkl") |
|
xgb_model = xgb.Booster() |
|
xgb_model.load_model("xgb_model.json") |
|
app.logger.info("β
Models loaded successfully.") |
|
except Exception as e: |
|
app.logger.error(f"β Error loading models: {e}") |
|
raise e |
|
|
|
# Load tile metadata |
|
with open("tile_catalog.json", "r", encoding="utf-8") as f: |
|
tile_catalog = json.load(f) |
|
|
|
with open("tile_sizes.json", "r", encoding="utf-8") as f: |
|
tile_sizes = json.load(f) |
|
|
|
@app.route("/") |
|
def home(): |
|
return send_file("index.html") |
|
|
|
@app.route('/recommend', methods=['POST']) |
|
def recommend(): |
|
try: |
|
data = request.get_json() |
|
|
|
required = ['tile_type', 'coverage', 'area', 'price_range'] |
|
if not all(k in data for k in required): |
|
return jsonify({"error": "Missing required fields"}), 400 |
|
|
|
tile_type = data['tile_type'].lower() |
|
if tile_type not in ['floor', 'wall']: |
|
return jsonify({"error": "Invalid tile type"}), 400 |
|
|
|
validate_positive_number(data['coverage'], "coverage") |
|
validate_positive_number(data['area'], "area") |
|
|
|
pr = data['price_range'] |
|
if (not isinstance(pr, list) or len(pr) != 2 or pr[0] < 0 or pr[1] <= 0 or pr[0] >= pr[1]): |
|
return jsonify({"error": "Invalid price range"}), 400 |
|
|
|
features = prepare_features(data) |
|
xgb_pred = float(xgb_model.predict(xgb.DMatrix(features))[0]) |
|
rf_pred = float(rf.predict_proba(features)[0][1]) |
|
combined_score = (xgb_pred + rf_pred) / 2 |
|
|
|
recommended = filter_products( |
|
tile_type=tile_type, |
|
min_price=pr[0], |
|
max_price=pr[1], |
|
preferred_sizes=data.get("preferred_sizes", []), |
|
min_score=0.5 |
|
) |
|
|
|
return jsonify({ |
|
"recommendation_score": round(combined_score, 3), |
|
"total_matches": len(recommended), |
|
"recommended_products": recommended[:5], |
|
"calculation": calculate_requirements(data['area'], data['coverage']) |
|
}) |
|
|
|
except Exception as e: |
|
app.logger.error(f"Error in /recommend: {e}") |
|
return jsonify({"error": "Internal server error"}), 500 |
|
|
|
@app.route('/calculate', methods=['POST']) |
|
def calculate(): |
|
try: |
|
data = request.get_json() |
|
|
|
for k in ['tile_type', 'area', 'tile_size']: |
|
if k not in data: |
|
return jsonify({"error": f"Missing field: {k}"}), 400 |
|
|
|
tile_type = data['tile_type'].lower() |
|
if tile_type not in ['floor', 'wall']: |
|
return jsonify({"error": "Invalid tile type"}), 400 |
|
|
|
tile_size_key = data['tile_size'] |
|
if tile_size_key not in tile_sizes: |
|
return jsonify({"error": "Invalid tile size"}), 400 |
|
|
|
validate_positive_number(data['area'], "area") |
|
|
|
tile_info = tile_sizes[tile_size_key] |
|
area_per_tile = tile_info['length'] * tile_info['width'] |
|
tiles_needed = math.ceil((data['area'] / area_per_tile) * 1.1) |
|
tiles_per_box = tile_info.get('tiles_per_box', 10) |
|
boxes_needed = math.ceil(tiles_needed / tiles_per_box) |
|
|
|
matching = [ |
|
p for p in tile_catalog |
|
if p['type'].lower() == tile_type and p['size'] == tile_size_key |
|
] |
|
|
|
return jsonify({ |
|
"tile_type": tile_type, |
|
"area": data['area'], |
|
"tile_size": tile_size_key, |
|
"tiles_needed": tiles_needed, |
|
"boxes_needed": boxes_needed, |
|
"matching_products": matching[:3], |
|
"total_matches": len(matching) |
|
}) |
|
|
|
except Exception as e: |
|
app.logger.error(f"Error in /calculate: {e}") |
|
return jsonify({"error": "Internal server error"}), 500 |
|
|
|
def prepare_features(data): |
|
tile_type_num = 0 if data['tile_type'] == 'floor' else 1 |
|
price_per_sqft = data['price_range'][1] / data['coverage'] |
|
budget_efficiency = data['coverage'] / data['price_range'][1] |
|
|
|
return np.array([[ |
|
tile_type_num, |
|
data['area'], |
|
data['coverage'], |
|
data['price_range'][0], |
|
data['price_range'][1], |
|
price_per_sqft, |
|
budget_efficiency |
|
]]) |
|
|
|
def filter_products(tile_type, min_price, max_price, preferred_sizes, min_score=0.5): |
|
results = [] |
|
for p in tile_catalog: |
|
if ( |
|
p['type'].lower() == tile_type and |
|
min_price <= p['price'] <= max_price and |
|
(not preferred_sizes or p['size'] in preferred_sizes) |
|
): |
|
price_score = 1 - ((p['price'] - min_price) / (max_price - min_price + 1e-6)) |
|
size_score = 1 if not preferred_sizes or p['size'] in preferred_sizes else 0.5 |
|
score = (price_score + size_score) / 2 |
|
if score >= min_score: |
|
results.append({**p, "recommendation_score": round(score, 2)}) |
|
return sorted(results, key=lambda x: x['recommendation_score'], reverse=True) |
|
|
|
def calculate_requirements(area, coverage): |
|
min_tiles = math.ceil(area / coverage) |
|
suggested = math.ceil(min_tiles * 1.1) |
|
return { |
|
"minimum_tiles": min_tiles, |
|
"suggested_tiles": suggested, |
|
"estimated_cost_range": [round(area * 3, 2), round(area * 10, 2)] |
|
} |
|
|
|
def validate_positive_number(value, field): |
|
if not isinstance(value, (int, float)) or value <= 0: |
|
raise ValueError(f"{field} must be a positive number") |
|
|
|
if __name__ == '__main__': |
|
app.run(host="0.0.0.0", port=5000, debug=True) |
|
|