from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import Optional, List import pandas as pd import pickle import os from datetime import datetime import uvicorn os.environ["MPLCONFIGDIR"] = "/tmp" import pandas as pd # Import models from models.market_analysis_model import MarketAnalysisModel from models.recommender_model import PropertyRecommender import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Import your valuation model from pickle file try: model_path = os.path.join(os.path.dirname(__file__), "trained_models/valuation_model.pkl") if not os.path.exists(model_path): model_path = "trained_models/valuation_model.pkl" logger.info(f"Loading valuation model from: {model_path}") with open(model_path, 'rb') as f: model = pickle.load(f) logger.info("Successfully loaded valuation model") except Exception as e: logger.error(f"Error loading valuation model: {str(e)}") class FallbackModel: def predict(self, X): return [X['Square Footage'].values[0] * 10000] model = FallbackModel() logger.info("Using fallback valuation model") # Initialize other models try: data_path = os.path.join(os.path.dirname(__file__), "data/delhi_market_analysis_data.csv") if not os.path.exists(data_path): data_path = "data/delhi_market_analysis_data.csv" logger.info(f"Loading market analysis data from: {data_path}") market_analysis_model = MarketAnalysisModel(data_path=data_path) logger.info("Successfully loaded market analysis model") except Exception as e: logger.error(f"Error loading market analysis model: {str(e)}") market_analysis_model = None try: data_path = os.path.join(os.path.dirname(__file__), "data/delhi_real_estate_recommender.csv") if not os.path.exists(data_path): data_path = "data/delhi_real_estate_recommender.csv" logger.info(f"Loading recommender data from: {data_path}") recommender_model = PropertyRecommender(data_path=data_path) logger.info("Successfully loaded recommender model") except Exception as e: logger.error(f"Error loading recommender model: {str(e)}") recommender_model = None app = FastAPI() logger.info(f"Current working directory: {os.getcwd()}") logger.info(f"Files in backend directory: {os.listdir('.')}") logger.info(f"Files in data directory: {os.listdir('./data') if os.path.exists('./data') else 'No data directory found'}") logger.info(f"Files in trained_models directory: {os.listdir('./trained_models') if os.path.exists('./trained_models') else 'No trained_models directory found'}") # Configure CORS app.add_middleware( CORSMiddleware, allow_origins=[ "https://prototype-2-hardikkandpals-projects.vercel.app", "https://hardik8588-real-estate.hf.space" ], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Modify the file paths to use relative paths that will work in deployment # Instead of hardcoded absolute paths # Add a root endpoint @app.get("/") async def root(): # Check if models are loaded models_status = { "valuation_model": str(type(model)), "market_analysis_model": str(type(market_analysis_model)), "recommender_model": str(type(recommender_model)) } return { "message": "Welcome to AI RealEstate API", "models_status": models_status, "endpoints": { "Property Valuation": "/api/valuation", "Market Analysis": "/api/market-analysis", "Property Recommendations": "/api/property-recommendations", "Available Locations": "/api/locations", "Test Endpoint": "/api/test" } } class PropertyValuationRequest(BaseModel): property_type: str neighborhood: str bedrooms: int bathrooms: int square_footage: int lot_size: int year_built: int renovation_status: str renovation_year: Optional[int] = None property_features: str # New request models for other endpoints class MarketAnalysisRequest(BaseModel): location: Optional[str] = None months: int = 12 class Config: # This will validate and convert the input types validate_assignment = True # Allow extra fields to be ignored extra = "ignore" class PropertyRecommendationRequest(BaseModel): budget: float location_preference: Optional[str] = None property_type: Optional[str] = None min_bedrooms: Optional[int] = None min_bathrooms: Optional[int] = None desired_amenities: Optional[List[str]] = None @app.post("/api/valuation") async def predict_property_value(request: PropertyValuationRequest): import time start_time = time.time() try: # logger.info received data for debugging logger.info(f"Received data: {request}") logger.info(f"Time after receiving data: {time.time() - start_time:.2f} seconds") # Clean up string values by removing any extra quotes neighborhood = request.neighborhood.strip('"') property_type = request.property_type.strip('"') renovation_status = request.renovation_status.strip('"') property_features = request.property_features.strip('"') logger.info(f"Time after cleaning strings: {time.time() - start_time:.2f} seconds") # Prepare the input data for the model input_data = { 'Property Type': property_type, 'Neighborhood': neighborhood, 'Bedrooms': request.bedrooms, 'Bathrooms': request.bathrooms, 'Square Footage': request.square_footage, 'Lot Size': request.lot_size, 'Year Built': request.year_built, 'Renovation Status': renovation_status, 'Renovation Year': request.renovation_year if request.renovation_year else request.year_built, 'Property Features': property_features } # logger.info formatted data for debugging logger.info(f"Formatted data for model: {input_data}") logger.info(f"Time after formatting data: {time.time() - start_time:.2f} seconds") # Convert to DataFrame (assuming your model expects a DataFrame) input_df = pd.DataFrame([input_data]) logger.info(f"Time after creating DataFrame: {time.time() - start_time:.2f} seconds") # Check if model is loaded correctly logger.info(f"Model type: {type(model)}") # Make prediction with timeout logger.info(f"Starting prediction...") prediction_start = time.time() # Try a simple prediction to see if it works try: predicted_price = model.predict(input_df)[0] logger.info(f"Prediction completed in {time.time() - prediction_start:.2f} seconds") logger.info(f"Predicted price: {predicted_price}") except Exception as pred_error: logger.info(f"Error during model prediction: {str(pred_error)}") # Try a fallback prediction method logger.info(f"Trying fallback prediction...") # Simple fallback calculation based on square footage predicted_price = request.square_footage * 10000 # Simple estimate logger.info(f"Used fallback prediction: {predicted_price}") logger.info(f"Total processing time: {time.time() - start_time:.2f} seconds") # Return a more detailed response return { "predicted_price": float(predicted_price), "status": "success", "processing_time": f"{time.time() - start_time:.2f} seconds" } except Exception as e: end_time = time.time() logger.error(f"Error during prediction: {str(e)}") logger.info(f"Error occurred after {end_time - start_time:.2f} seconds") # Return a fallback response with an error return { "predicted_price": request.square_footage * 10000, # Simple fallback estimate "status": "error", "error": str(e), "note": "Using fallback estimation due to error" } @app.post("/api/market-analysis") async def get_market_analysis(request: MarketAnalysisRequest): try: logger.info(f"Starting market analysis for location: {request.location}, months: {request.months}") # Validate months parameter months = request.months if not isinstance(months, int) or months <= 0: months = 12 logger.info(f"Invalid months value: {request.months}, using default: 12") if market_analysis_model is None: raise ValueError("Market analysis model not initialized") # Add debug information about the location logger.info(f"Location type: {type(request.location)}, value: {request.location}") # Normalize location name to handle case sensitivity and extra spaces normalized_location = request.location.strip() if request.location else None # Try to get market trends with error handling try: trends_data = market_analysis_model.get_market_trends( location=normalized_location, months=months ) logger.info(f"Successfully retrieved trends data for {normalized_location}") except Exception as trend_error: logger.info(f"Error getting trends data: {str(trend_error)}") # Try to get fallback data from the model try: trends_data = market_analysis_model._get_fallback_data(normalized_location) logger.info(f"Using fallback data for {normalized_location}") except Exception as fallback_error: logger.info(f"Error getting fallback data: {str(fallback_error)}") # Use hardcoded fallback as last resort trends_data = get_hardcoded_fallback_data(normalized_location) # Debug the returned data structure logger.info(f"Trends data structure: {type(trends_data)}") if isinstance(trends_data, dict): logger.info(f"Trends data keys: {list(trends_data.keys())}") # Check if the expected keys exist for key in ["marketTrends", "hotNeighborhoods", "insights", "charts"]: logger.info(f"Has {key}: {key in trends_data}") # If the data structure doesn't match what's expected, transform it if "marketTrends" not in trends_data and "marketMetrics" in trends_data: logger.info(f"Transforming marketMetrics to marketTrends") trends_data["marketTrends"] = trends_data.pop("marketMetrics") # Ensure all growth values in hotNeighborhoods are strings with % sign if "hotNeighborhoods" in trends_data and isinstance(trends_data["hotNeighborhoods"], list): for neighborhood in trends_data["hotNeighborhoods"]: if "growth" in neighborhood and not isinstance(neighborhood["growth"], str): neighborhood["growth"] = f"{neighborhood['growth']}%" # Ensure all change values in marketTrends are strings with % sign if "marketTrends" in trends_data and isinstance(trends_data["marketTrends"], list): for trend in trends_data["marketTrends"]: if "change" in trend: # Convert to string with % sign if it's not already a string ending with % if not (isinstance(trend["change"], str) and trend["change"].endswith('%')): # Format as percentage string trend["change"] = f"{float(trend['change']):.1f}%" # Add location to the response for verification trends_data["analyzedLocation"] = normalized_location # Customize insights based on location if they're generic if "insights" in trends_data and isinstance(trends_data["insights"], list): location_specific_insights = [] for insight in trends_data["insights"]: # Check if the insight already mentions the location if normalized_location and normalized_location.lower() not in insight.lower(): # Replace generic location references with the specific location modified_insight = insight for generic_loc in ["Delhi", "the area", "this neighborhood"]: modified_insight = modified_insight.replace(generic_loc, normalized_location) location_specific_insights.append(modified_insight) else: location_specific_insights.append(insight) trends_data["insights"] = location_specific_insights # Limit the size of chart data to prevent frontend rendering issues # Commenting out this block to allow full base64 image data to be sent to frontend """ if "charts" in trends_data and isinstance(trends_data["charts"], dict): for chart_key in trends_data["charts"]: if isinstance(trends_data["charts"][chart_key], str) and len(trends_data["charts"][chart_key]) > 10000: # Truncate or replace with a placeholder trends_data["charts"][chart_key] = "Chart data too large for frontend rendering" """ # Return the data with a success status return { "status": "success", "data": trends_data } except Exception as e: import traceback logger.error(f"Error in market analysis: {str(e)}") logger.info(traceback.format_exc()) # Use hardcoded fallback data fallback_data = get_hardcoded_fallback_data(request.location) return { "status": "error", "error": str(e), "data": fallback_data } def get_hardcoded_fallback_data(location=None): """Return hardcoded fallback data with location-specific customization""" location_text = location if location else "Delhi" # Base fallback data fallback_data = { "marketTrends": [ {"metric": "Median Home Price", "value": 12500000, "change": "5.2%", "isPositive": True}, {"metric": "Number of Sales", "value": 245, "change": "-2.8%", "isPositive": False}, {"metric": "Days on Market", "value": 32, "change": "-15.8%", "isPositive": True}, {"metric": "Price per Square Foot", "value": 9800, "change": "3.5%", "isPositive": True}, {"metric": "Inventory Levels", "value": 320, "change": "8.2%", "isPositive": False}, {"metric": "Year-over-Year Price Change", "value": 5.2, "change": "5.2%", "isPositive": True} ], "hotNeighborhoods": [ {"name": "Vasant Kunj", "growth": "8.5%", "medianPrice": 15800000, "pricePerSqFt": 12500}, {"name": "Greater Kailash", "growth": "7.2%", "medianPrice": 18500000, "pricePerSqFt": 14200}, {"name": "Dwarka", "growth": "6.8%", "medianPrice": 9800000, "pricePerSqFt": 8500} ], "insights": [ f"The {location_text} real estate market has shown strong resilience with a 5.2% increase in median home prices.", f"Luxury properties in {location_text} continue to appreciate faster than other segments.", f"Inventory levels in {location_text} have increased by 8.2%, indicating a potential shift towards a buyer's market.", f"Properties in {location_text} are selling 15% faster than the market average." ], "charts": {} } # Customize data based on location if needed if location == "Nehru Place": fallback_data["marketTrends"][0]["value"] = 13200000 fallback_data["marketTrends"][0]["change"] = "6.1%" fallback_data["insights"][0] = f"{location} real estate has shown 6.1% growth in the past year." elif location == "Vasant Kunj": fallback_data["marketTrends"][0]["value"] = 15800000 fallback_data["marketTrends"][0]["change"] = "8.5%" fallback_data["insights"][0] = f"{location} real estate has shown 8.5% growth in the past year." elif location == "Greater Kailash": fallback_data["marketTrends"][0]["value"] = 18500000 fallback_data["marketTrends"][0]["change"] = "7.2%" fallback_data["insights"][0] = f"{location} real estate has shown 7.2% growth in the past year." return fallback_data class PropertyRecommendationRequest(BaseModel): propertyType: Optional[str] = None budget: Optional[str] = None location: Optional[str] = None bedrooms: Optional[str] = None bathrooms: Optional[str] = None amenities: Optional[List[str]] = None @app.post("/api/property-recommendations") async def get_property_recommendations(request: PropertyRecommendationRequest): try: preferences = request.dict() logger.info(f"Received preferences: {preferences}") recommendations = recommender_model.get_recommendations(preferences) logger.info(f"Number of recommendations: {len(recommendations)}") return {"recommendations": recommendations} except Exception as e: logger.error(f"Error in get_property_recommendations: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/locations") async def get_locations(): try: # Try different paths to find the data file logger.info(f"Current working directory: {os.getcwd()}") data_paths = [ os.path.join(os.path.dirname(__file__), "data/delhi_market_analysis_data.csv"), "data/delhi_market_analysis_data.csv" ] for path in data_paths: logger.info(f"Checking path: {path} Exists: {os.path.exists(path)}") df = None for path in data_paths: if os.path.exists(path): logger.info(f"Loading locations from: {path}") df = pd.read_csv(path) break if df is not None: locations = df["Neighborhood"].dropna().unique().tolist() return {"status": "success", "locations": locations} else: raise FileNotFoundError("Could not find the data file") except Exception as e: logger.error(f"Error getting locations: {str(e)}") raise HTTPException(status_code=500, detail=f"Could not load locations: {str(e)}") @app.get("/api/test") async def test_endpoint(): return {"message": "Test endpoint works!"} # Add these new models class LocationCoordinates(BaseModel): lat: float lng: float # Add these new endpoints @app.get("/api/home-stats") async def get_home_stats(): try: if market_analysis_model is None or not hasattr(market_analysis_model, 'df') or market_analysis_model.df is None: raise ValueError("Market analysis model not initialized or data not loaded") # Get the latest data for each neighborhood latest_data = market_analysis_model.df.loc[market_analysis_model.df.groupby('Neighborhood')['Time Period'].idxmax()] # Calculate average price avg_price = int(latest_data['Median Home Price'].mean()) # Calculate average price growth price_growth = round(latest_data['Year-over-Year Price Change'].mean(), 1) # Get popular locations based on price growth top_locations = latest_data.sort_values('Year-over-Year Price Change', ascending=False).head(5) popular_locations = [ {"name": row['Neighborhood'], "growth": f"{row['Year-over-Year Price Change']:.1f}%"} for _, row in top_locations.iterrows() ] # Generate a simple price trend chart import matplotlib.pyplot as plt import io import base64 import numpy as np from datetime import datetime, timedelta months = 12 base_price = avg_price / 1e6 # Convert to millions for chart end_date = datetime.now() dates = [(end_date - timedelta(days=30*i)).strftime('%Y-%m-%d') for i in range(months)] dates.reverse() np.random.seed(42) growth_rate = 0.008 random_fluctuation = 0.02 prices = [] current_price = base_price for i in range(months): random_factor = 1 + np.random.uniform(-random_fluctuation, random_fluctuation) current_price *= (1 + growth_rate) * random_factor prices.append(current_price) plt.figure(figsize=(10, 6)) plt.plot(dates, prices, marker='o', color='#3b82f6') plt.title('Average Home Price Trend (Last 12 Months)', fontsize=14) plt.ylabel('Price (Million ₹)', fontsize=12) plt.xticks(rotation=45) plt.grid(True, alpha=0.3) plt.tight_layout() buffer = io.BytesIO() plt.savefig(buffer, format='png', dpi=100) buffer.seek(0) price_trend_chart = base64.b64encode(buffer.getvalue()).decode('utf-8') plt.close() buffer.close() return { "total_properties": int(market_analysis_model.df['Property ID'].nunique()) if 'Property ID' in market_analysis_model.df.columns else 1000, "avg_price": avg_price, "price_growth": price_growth, "popular_locations": popular_locations, "price_trend_chart": price_trend_chart } except Exception as e: logger.error(f"Error getting home stats: {str(e)}") raise HTTPException(status_code=500, detail=f"Could not load home stats: {str(e)}") @app.get("/api/featured-properties") async def get_featured_properties(): try: # Try different paths to find the data file data_paths = [ "data/delhi_real_estate_recommender.csv" ] properties_df = None for path in data_paths: try: if os.path.exists(path): logger.info(f"Loading featured properties from: {path}") properties_df = pd.read_csv(path) break except: continue if properties_df is None: raise FileNotFoundError("Could not find the property data file") # Select premium properties (high price, good amenities) premium_properties = properties_df[ (properties_df['Price (INR)'] > 100000000) & # Price > 1 Crore (properties_df['Property Type'].isin(['Villa', 'Apartment', 'Condo'])) & (properties_df['Square Footage'] > 2000) ].sort_values('Price (INR)', ascending=False).head(6) # Format the properties for the frontend featured = [] # Use these specific property images instead of random Unsplash URLs property_images = { 'Apartment': [ "https://images.unsplash.com/photo-1522708323590-d24dbb6b0267?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "https://images.unsplash.com/photo-1560185127-6ed189bf02f4?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "https://images.unsplash.com/photo-1560448204-e02f11c3d0e2?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "https://images.unsplash.com/photo-1560185893-a55cbc8c57e8?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80" ], 'Villa': [ "https://images.unsplash.com/photo-1564013799919-ab600027ffc6?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "https://images.unsplash.com/photo-1613490493576-7fde63acd811?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "https://images.unsplash.com/photo-1600585154340-be6161a56a0c?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80" ], 'Condo': [ "https://images.unsplash.com/photo-1600607687939-ce8a6c25118c?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "https://images.unsplash.com/photo-1600566753376-12c8ab7fb75b?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80" ] } import random for i, (_, prop) in enumerate(premium_properties.iterrows()): # Convert price to Crore format price_cr = round(prop['Price (INR)'] / 10000000, 1) # Get a list of amenities amenities = prop['Amenities'].split(', ') if isinstance(prop['Amenities'], str) else [] # Get property type and select an image prop_type = prop['Property Type'] if prop_type in property_images: image_url = random.choice(property_images[prop_type]) else: image_url = "https://images.unsplash.com/photo-1600585154340-be6161a56a0c?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80" # Create a property object featured.append({ "id": prop['Property ID'], "title": f"{prop['Property Type']} in {prop['Location']}", "price": f"{price_cr} Cr", "image": image_url, "beds": prop['Bedrooms'], "baths": prop['Bathrooms'], "area": prop['Square Footage'], "tag": "Premium" if price_cr > 15 else "Featured", "amenities": amenities[:3] # Show only top 3 amenities }) return featured except Exception as e: logger.error(f"Error getting featured properties: {str(e)}") # Return fallback data return [ { "id": "PROP-0001", "title": "Luxury Villa in Vasant Kunj", "price": "2.5 Cr", "image": "https://images.unsplash.com/photo-1613490493576-7fde63acd811?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "beds": 4, "baths": 3, "area": 3500, "tag": "Premium", "amenities": ["Swimming Pool", "Gym", "Security"] }, { "id": "PROP-0002", "title": "Modern Apartment in Saket", "price": "1.8 Cr", "image": "https://images.unsplash.com/photo-1600585154340-be6161a56a0c?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "beds": 3, "baths": 2, "area": 2200, "tag": "New Launch", "amenities": ["Power Backup", "Lift", "Parking"] }, { "id": "PROP-0003", "title": "Penthouse in Greater Kailash", "price": "3.2 Cr", "image": "https://images.unsplash.com/photo-1600607687939-ce8a6c25118c?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "beds": 5, "baths": 4, "area": 4100, "tag": "Featured", "amenities": ["Clubhouse", "Garden", "24/7 Water Supply"] } ] @app.post("/api/nearby-properties") async def get_nearby_properties(request: LocationCoordinates): try: # Load the property data property_data_path = os.path.join(os.path.dirname(__file__), "data/delhi_real_estate_recommender.csv") properties_df = pd.read_csv(property_data_path) # For demo purposes, we'll just return random properties # In a real app, you would use the coordinates to find nearby properties nearby_props = properties_df.sample(min(6, len(properties_df))) # Format the properties for the frontend nearby = [] for _, prop in nearby_props.iterrows(): # Convert price to Lac/Cr format price_val = prop['Price (INR)'] / 100000 # Convert to Lacs price_str = f"{round(price_val / 100, 1)} Cr" if price_val >= 10000 else f"{int(price_val)} Lac" # Generate a random distance (1-5 km) import random distance = round(random.uniform(0.5, 5.0), 1) # Create a property object nearby.append({ "id": prop['Property ID'], "title": f"{prop['Bedrooms']} BHK {prop['Property Type']}", "location": prop['Location'], "distance": f"{distance} km away", "price": price_str, "image": f"https://source.unsplash.com/random/800x600/?house,{prop['Property Type'].lower()}", "type": prop['Property Type'], "possession": "Ready to Move" if random.random() > 0.3 else "Under Construction" }) return nearby except Exception as e: logger.error(f"Error getting nearby properties: {str(e)}") # Return fallback data return [ { "id": 1, "title": "3 BHK Apartment", "location": "Sector 45, Noida", "distance": "1.2 km away", "price": "85 Lac", "image": "https://images.unsplash.com/photo-1560448204-e02f11c3d0e2?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "type": "Apartment", "possession": "Ready to Move" }, { "id": 2, "title": "4 BHK Villa", "location": "Vasant Kunj, Delhi", "distance": "2.5 km away", "price": "1.9 Cr", "image": "https://images.unsplash.com/photo-1564013799919-ab600027ffc6?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "type": "Villa", "possession": "Ready to Move" }, { "id": 3, "title": "2 BHK Apartment", "location": "Sector 62, Noida", "distance": "3.1 km away", "price": "65 Lac", "image": "https://images.unsplash.com/photo-1522708323590-d24dbb6b0267?ixlib=rb-1.2.1&auto=format&fit=crop&w=800&q=80", "type": "Apartment", "possession": "Under Construction" } ] @app.get("/ping") async def ping(): """Simple endpoint to test API connectivity""" from datetime import datetime return { "status": "success", "message": "API is running", "timestamp": datetime.now().isoformat(), "server": "FastAPI Backend" } # At the end of the file, update the run command for production if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) uvicorn.run(app, host="0.0.0.0", port=port)