|
from flask import Flask, request, jsonify, send_from_directory |
|
from flask_cors import CORS |
|
import joblib |
|
import numpy as np |
|
import json |
|
import math |
|
import os |
|
import xgboost as xgb |
|
import logging |
|
|
|
|
|
# Flask application object. Static files are served from '.' at the URL root
# (static_url_path is empty).
app = Flask(__name__, static_url_path='', static_folder='.')

# Enable cross-origin requests using the flask-cors defaults.
CORS(app)

# Emit INFO-level (and above) records through the root logger.
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
# Load both trained models once at import time. Any failure is logged and
# re-raised so the process does not start without its models.
try:
    # NOTE(review): joblib.load unpickles the file, which can execute
    # arbitrary code -- rf_model.pkl must come from a trusted source.
    rf = joblib.load("rf_model.pkl")
    xgb_model = xgb.Booster()
    xgb_model.load_model("xgb_model.json")
    app.logger.info("✅ Models loaded successfully.")
except Exception as e:
    app.logger.error(f"❌ Error loading models: {e}")
    # Bare raise keeps the original traceback intact.
    raise
|
|
|
|
|
# Product catalog: iterated by the routes below to find matching tiles.
with open("tile_catalog.json", mode="r", encoding="utf-8") as f:
    tile_catalog = json.load(f)

# Tile size table: looked up by the request's ``tile_size`` value.
with open("tile_sizes.json", mode="r", encoding="utf-8") as f:
    tile_sizes = json.load(f)
|
|
|
|
|
@app.route('/')
def serve_index():
    """Serve ``index.html`` from the app's static folder for the root URL."""
    index_page = 'index.html'
    return app.send_static_file(index_page)
|
|
|
|
|
@app.route('/<path:path>')
def serve_static_files(path):
    """Serve the requested path as a file from the '.' directory."""
    # NOTE(review): '.' appears to be the same directory that holds the model
    # and JSON data files -- confirm that exposing them is intended.
    served_root = '.'
    return send_from_directory(served_root, path)
|
|
|
|
|
@app.route('/calculate', methods=['POST'])
def calculate():
    """Estimate the tiles and boxes needed to cover a ``length`` x ``width`` area.

    Expects a JSON object with ``tile_type`` ('floor' or 'wall', any case),
    ``length``, ``width`` and ``tile_size`` (a key of ``tile_sizes``).

    Returns:
        200 with the area, tile/box counts, up to five matching catalog
        products and the total number of matches;
        400 when the body is not a JSON object, a field is missing, or a
        value fails validation;
        500 on any unexpected error.
    """
    try:
        # silent=True returns None for a missing/malformed JSON body instead
        # of aborting, so the client always gets a JSON error from us.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        for field in ('tile_type', 'length', 'width', 'tile_size'):
            if field not in data:
                return jsonify({"error": f"Missing field: {field}"}), 400

        # Bad client input is a 400, not an internal error.
        try:
            tile_type = str(data['tile_type']).lower()
            length = float(data['length'])
            width = float(data['width'])
            validate_positive_number(length, "length")
            validate_positive_number(width, "width")
            area = length * width
            # NaN or inf in either dimension propagates into the product.
            if not math.isfinite(area):
                raise ValueError("length and width must be finite numbers")
        except (TypeError, ValueError) as e:
            return jsonify({"error": str(e)}), 400

        if tile_type not in ('floor', 'wall'):
            return jsonify({"error": "Invalid tile type"}), 400

        tile_size = data['tile_size']
        # The isinstance guard keeps unhashable JSON values (lists, objects)
        # from raising TypeError in the membership test.
        if not isinstance(tile_size, str) or tile_size not in tile_sizes:
            return jsonify({"error": "Invalid tile size"}), 400

        tile_info = tile_sizes[tile_size]
        area_per_tile = tile_info['length'] * tile_info['width']
        # 10% extra tiles. Round before ceil so float noise such as
        # 10 * 1.1 == 11.000000000000002 does not add a spurious tile.
        tiles_needed = math.ceil(round((area / area_per_tile) * 1.1, 9))
        tiles_per_box = tile_info.get('tiles_per_box', 10)
        boxes_needed = math.ceil(tiles_needed / tiles_per_box)

        matching_products = [
            {**p, "link": p.get("url", "#")}
            for p in tile_catalog
            if p['type'].lower() == tile_type and p['size'] == tile_size
        ]

        return jsonify({
            "tile_type": tile_type,
            "tile_size": tile_size,
            "length": round(length, 2),
            "width": round(width, 2),
            "area": round(area, 2),
            "tiles_needed": tiles_needed,
            "boxes_needed": boxes_needed,
            "matching_products": matching_products[:5],
            "total_matches": len(matching_products),
        })

    except Exception as e:
        # logger.exception records the traceback alongside the message.
        app.logger.exception(f"Error in /calculate: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500
|
|
|
|
|
@app.route('/recommend', methods=['POST'])
def recommend():
    """Score a tiling request with both models and list recommended products.

    Expects a JSON object with ``tile_type`` ('floor' or 'wall', any case),
    ``coverage``, ``length``, ``width`` and ``price_range`` (a two-element
    list ``[min_price, max_price]``); ``preferred_sizes`` is optional.

    The returned ``recommendation_score`` is the mean of the XGBoost
    prediction and the random-forest class-1 probability. It is reported
    only; product ranking comes from ``filter_products``.

    Returns:
        200 with the score, up to five recommended products and the tile
        requirement calculation;
        400 when the body is not a JSON object, a field is missing, or a
        value fails validation;
        500 on any unexpected error.
    """
    try:
        # silent=True returns None for a missing/malformed JSON body instead
        # of aborting, so the client always gets a JSON error from us.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        required_fields = ('tile_type', 'coverage', 'length', 'width', 'price_range')
        for field in required_fields:
            if field not in data:
                return jsonify({"error": f"Missing field: {field}"}), 400

        # Bad client input is a 400, not an internal error.
        try:
            tile_type = str(data['tile_type']).lower()
            length = float(data['length'])
            width = float(data['width'])
            validate_positive_number(length, "length")
            validate_positive_number(width, "width")
            area = length * width
            validate_positive_number(area, "area")
            coverage = float(data['coverage'])
            validate_positive_number(coverage, "coverage")
            # NaN or inf would otherwise flow into the models and math.ceil.
            if not (math.isfinite(area) and math.isfinite(coverage)):
                raise ValueError("length, width and coverage must be finite numbers")
        except (TypeError, ValueError) as e:
            return jsonify({"error": str(e)}), 400

        # Same tile-type rule as /calculate.
        if tile_type not in ('floor', 'wall'):
            return jsonify({"error": "Invalid tile type"}), 400

        price_range = data['price_range']
        if not isinstance(price_range, list) or len(price_range) != 2:
            return jsonify({"error": "Invalid price range"}), 400
        try:
            min_price = float(price_range[0])
            max_price = float(price_range[1])
        except (TypeError, ValueError):
            return jsonify({"error": "Invalid price range"}), 400
        # max_price is a divisor in prepare_features, so it must be positive.
        # The chained comparisons also reject NaN and infinite bounds.
        if not (0 <= min_price <= max_price and 0 < max_price < math.inf):
            return jsonify({"error": "Invalid price range"}), 400

        # Pass the normalised values, not the raw payload, to the feature
        # builder so casing and string-typed numbers cannot skew the features.
        features = prepare_features({
            **data,
            "tile_type": tile_type,
            "coverage": coverage,
            "price_range": [min_price, max_price],
            "area": area,
        })

        xgb_pred = xgb_model.predict(xgb.DMatrix(features))[0]
        rf_pred = rf.predict_proba(features)[0][1]
        combined_score = (xgb_pred + rf_pred) / 2

        recommended_products = filter_products(
            tile_type=tile_type,
            min_price=min_price,
            max_price=max_price,
            # An explicit JSON null is treated like an absent preference.
            preferred_sizes=data.get('preferred_sizes') or [],
            min_score=0.5,
        )

        return jsonify({
            "recommendation_score": round(float(combined_score), 3),
            "recommended_products": recommended_products[:5],
            "calculation": calculate_requirements(area, coverage),
        })

    except Exception as e:
        # logger.exception records the traceback alongside the message.
        app.logger.exception(f"Error in /recommend: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500
|
|
|
|
|
def prepare_features(data):
    """Build the single-row feature matrix passed to both models.

    Args:
        data: Mapping with ``tile_type``, ``area``, ``coverage`` and
            ``price_range`` (indexable; items 0 and 1 are the minimum and
            maximum price). Numeric values may be numbers or numeric strings.

    Returns:
        A ``(1, 7)`` float array with, in order: tile-type flag (0 for
        'floor' in any case, 1 for anything else), area, coverage, minimum
        price, maximum price, maximum price / coverage, and
        coverage / maximum price.

    Raises:
        ValueError: If a value is not numeric, or coverage / maximum price
            is not strictly positive (both are used as divisors).
        TypeError: If a value cannot be converted to ``float`` at all.
    """
    # Compare case-insensitively so "Floor" is not encoded as a wall.
    tile_type_num = 0 if str(data['tile_type']).lower() == 'floor' else 1

    area = float(data['area'])
    coverage = float(data['coverage'])
    min_price = float(data['price_range'][0])
    max_price = float(data['price_range'][1])

    # "not (x > 0)" also rejects NaN, which "x <= 0" would let through.
    if not (coverage > 0 and max_price > 0):
        raise ValueError("coverage and maximum price must be positive")

    price_per_sqft = max_price / coverage
    budget_efficiency = coverage / max_price

    # NOTE(review): column order presumably matches what the models were
    # trained on -- confirm before reordering.
    return np.array(
        [[tile_type_num, area, coverage, min_price, max_price,
          price_per_sqft, budget_efficiency]],
        dtype=float,
    )
|
|
|
def filter_products(tile_type, min_price, max_price, preferred_sizes, min_score=0.5, catalog=None):
    """Select and rank catalog products for a tile type and price window.

    Args:
        tile_type: Lower-case tile type; compared with each product's
            lower-cased ``type``.
        min_price: Inclusive lower price bound (number or numeric string).
        max_price: Inclusive upper price bound (number or numeric string).
        preferred_sizes: Collection of acceptable ``size`` values; empty or
            ``None`` means any size is accepted.
        min_score: Minimum product score required for inclusion.
        catalog: Iterable of product dicts to search. Defaults to the
            module-level ``tile_catalog``.

    Returns:
        A list of product dicts (copies extended with ``recommendation_score``
        and ``link``), sorted by ``recommendation_score`` descending. Ties
        keep catalog order.
    """
    products = tile_catalog if catalog is None else catalog
    # JSON clients may send the bounds as strings; coerce once up front.
    low = float(min_price)
    high = float(max_price)
    # The epsilon keeps the divisor non-zero when low == high.
    price_span = high - low + 1e-6
    wanted_sizes = preferred_sizes or ()

    ranked = []
    for product in products:
        if product['type'].lower() != tile_type:
            continue
        if not low <= product['price'] <= high:
            continue
        if wanted_sizes and product['size'] not in wanted_sizes:
            continue

        # Cheaper products within the window score closer to 1.
        price_score = 1 - (product['price'] - low) / price_span
        # Every product reaching this point already satisfies the size
        # preference (or none was given), so the size component is always 1.
        size_score = 1
        product_score = (price_score + size_score) / 2

        if product_score >= min_score:
            ranked.append({
                **product,
                "recommendation_score": round(product_score, 2),
                "link": product.get("url", "#"),
            })

    return sorted(ranked, key=lambda item: item['recommendation_score'], reverse=True)
|
|
|
def calculate_requirements(area, coverage, *, waste_factor=1.1, cost_per_unit_area=(3, 10)):
    """Compute tile counts and a rough cost range for an area.

    Args:
        area: Area to cover.
        coverage: Area covered by one tile, in the same unit as ``area``
            (the minimum count is ``ceil(area / coverage)``).
        waste_factor: Multiplier applied to the minimum tile count to get
            the suggested count. Defaults to 1.1 (10% extra).
        cost_per_unit_area: ``(low, high)`` cost per unit of area used for
            the estimate. Defaults to ``(3, 10)``.
            NOTE(review): currency and unit of these defaults are not
            stated anywhere in this file -- confirm.

    Returns:
        Dict with ``minimum_tiles``, ``suggested_tiles`` and
        ``estimated_cost_range`` (``[low, high]``, rounded to 2 decimals).

    Raises:
        ValueError: If ``coverage`` is not strictly positive.
    """
    # "not (x > 0)" also rejects NaN; zero would otherwise divide by zero.
    if not coverage > 0:
        raise ValueError("coverage must be a positive number")

    min_tiles = math.ceil(area / coverage)
    # Round before ceil so float noise (10 * 1.1 == 11.000000000000002)
    # does not push the suggestion up by a spurious tile.
    suggested_tiles = math.ceil(round(min_tiles * waste_factor, 9))
    low_rate, high_rate = cost_per_unit_area

    return {
        "minimum_tiles": min_tiles,
        "suggested_tiles": suggested_tiles,
        "estimated_cost_range": [
            round(area * low_rate, 2),
            round(area * high_rate, 2),
        ],
    }
|
|
|
def validate_positive_number(value, field):
    """Ensure ``value`` is a finite, strictly positive ``int`` or ``float``.

    Args:
        value: The value to check.
        field: Field name used in the error message.

    Raises:
        ValueError: If ``value`` is not an int/float, is a bool, is NaN or
            infinite, or is less than or equal to zero.
    """
    # bool is a subclass of int, so it must be excluded explicitly.
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    # NaN compares False against everything, so "value <= 0" alone lets it
    # through. isfinite is applied to floats only, because huge ints would
    # overflow on conversion.
    is_finite = is_number and not (isinstance(value, float) and not math.isfinite(value))
    if not is_finite or value <= 0:
        raise ValueError(f"{field} must be a positive number")
|
|
|
|
|
if __name__ == '__main__':
    # Listen on all interfaces, port 7860, with the Flask debugger disabled.
    app.run(debug=False, port=7860, host='0.0.0.0')
|
|