from flask import Flask, request, jsonify from flask_cors import CORS import joblib import numpy as np import json import math import xgboost as xgb import logging app = Flask(__name__) CORS(app) # Allow cross-origin requests (important for frontend integration) # Setup logging logging.basicConfig(level=logging.INFO) # Load models try: rf = joblib.load("rf_model.pkl") xgb_model = xgb.Booster() xgb_model.load_model("xgb_model.json") app.logger.info("✅ Models loaded successfully.") except Exception as e: app.logger.error(f"❌ Error loading models: {e}") raise e # Load tile data with open("tile_catalog.json", "r", encoding="utf-8") as f: tile_catalog = json.load(f) with open("tile_sizes.json", "r", encoding="utf-8") as f: tile_sizes = json.load(f) @app.route('/recommend', methods=['POST']) def recommend(): """ Endpoint for product recommendations Expected JSON payload: { "tile_type": "floor"|"wall", "coverage": float, "area": float, "price_range": [min, max], "preferred_sizes": [size1, size2] (optional) } """ try: data = request.get_json() required_fields = ['tile_type', 'coverage', 'area', 'price_range'] if not all(field in data for field in required_fields): return jsonify({"error": "Missing required fields"}), 400 tile_type = data['tile_type'].lower() if tile_type not in ['floor', 'wall']: return jsonify({"error": "Invalid tile type. Use 'floor' or 'wall'"}), 400 # Validate numeric inputs validate_positive_number(data['coverage'], "coverage") validate_positive_number(data['area'], "area") if (not isinstance(data['price_range'], list) or len(data['price_range']) != 2 or data['price_range'][0] < 0 or data['price_range'][1] <= 0 or data['price_range'][0] >= data['price_range'][1]): return jsonify({"error": "Invalid price range"}), 400 features = prepare_features(data) xgb_pred = xgb_model.predict(xgb.DMatrix(features))[0] rf_pred = rf.predict_proba(features)[0][1] combined_score = (xgb_pred + rf_pred) / 2 recommended_products = filter_products( tile_type=tile_type, min_price=data['price_range'][0], max_price=data['price_range'][1], preferred_sizes=data.get('preferred_sizes', []), min_score=0.5 ) response = { "recommendation_score": round(float(combined_score), 3), "total_matches": len(recommended_products), "recommended_products": recommended_products[:5], "calculation": calculate_requirements(data['area'], data['coverage']) } return jsonify(response) except Exception as e: app.logger.error(f"Error in /recommend: {str(e)}") return jsonify({"error": "Internal server error"}), 500 @app.route('/calculate', methods=['POST']) def calculate(): """ Endpoint for tile calculation Expected JSON payload: { "tile_type": "floor"|"wall", "area": float, "tile_size": "12x12"|etc } """ try: data = request.get_json() if 'tile_type' not in data or 'area' not in data or 'tile_size' not in data: return jsonify({"error": "Missing required fields"}), 400 tile_type = data['tile_type'].lower() if tile_type not in ['floor', 'wall']: return jsonify({"error": "Invalid tile type"}), 400 if data['tile_size'] not in tile_sizes: return jsonify({"error": "Invalid tile size"}), 400 validate_positive_number(data['area'], "area") tile_info = tile_sizes[data['tile_size']] area_per_tile = tile_info['length'] * tile_info['width'] tiles_needed = math.ceil((data['area'] / area_per_tile) * 1.1) tiles_per_box = tile_info.get('tiles_per_box', 10) boxes_needed = math.ceil(tiles_needed / tiles_per_box) matching_products = [ p for p in tile_catalog if p['type'].lower() == tile_type and p['size'] == data['tile_size'] ] return jsonify({ "tile_type": tile_type, "area": data['area'], "tile_size": data['tile_size'], "tiles_needed": tiles_needed, "boxes_needed": boxes_needed, "matching_products": matching_products[:3], "total_matches": len(matching_products) }) except Exception as e: app.logger.error(f"Error in /calculate: {str(e)}") return jsonify({"error": "Internal server error"}), 500 def prepare_features(data): """Prepare feature vector for ML prediction""" tile_type_num = 0 if data['tile_type'] == 'floor' else 1 price_per_sqft = data['price_range'][1] / data['coverage'] budget_efficiency = data['coverage'] / data['price_range'][1] return np.array([[ tile_type_num, data['area'], data['coverage'], data['price_range'][0], data['price_range'][1], price_per_sqft, budget_efficiency ]]) def filter_products(tile_type, min_price, max_price, preferred_sizes, min_score=0.5): """Filter and score products""" filtered = [] for product in tile_catalog: if (product['type'].lower() == tile_type and min_price <= product['price'] <= max_price and (not preferred_sizes or product['size'] in preferred_sizes)): price_score = 1 - ((product['price'] - min_price) / (max_price - min_price + 1e-6)) size_score = 1 if not preferred_sizes or product['size'] in preferred_sizes else 0.5 product_score = (price_score + size_score) / 2 if product_score >= min_score: filtered.append({ **product, "recommendation_score": round(product_score, 2) }) return sorted(filtered, key=lambda x: x['recommendation_score'], reverse=True) def calculate_requirements(area, coverage): """Calculate tile quantities and estimated costs""" min_tiles = math.ceil(area / coverage) suggested_tiles = math.ceil(min_tiles * 1.1) return { "minimum_tiles": min_tiles, "suggested_tiles": suggested_tiles, "estimated_cost_range": [ round(area * 3, 2), # example: ₹3 per sqft round(area * 10, 2) # example: ₹10 per sqft ] } def validate_positive_number(value, field): """Raise ValueError if value is not a positive number""" if not isinstance(value, (int, float)) or value <= 0: raise ValueError(f"{field} must be a positive number") if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, debug=True)