# NOTE: removed stray paste artifacts ("Spaces:" / "Runtime error" lines)
# that preceded the module and made the file syntactically invalid.
# app.py | |
from flask import Flask, render_template, request, jsonify | |
from flask_cors import CORS | |
import base64 | |
import io | |
import re | |
import json | |
import uuid | |
import time | |
import asyncio | |
from geopy.geocoders import Nominatim | |
from datetime import datetime | |
from models.logging_config import logger | |
from models.model_loader import load_model | |
from models.image_analysis import analyze_image | |
from models.pdf_analysis import extract_pdf_text, analyze_pdf_content | |
from models.property_summary import generate_property_summary | |
from models.fraud_classification import classify_fraud | |
from models.trust_score import generate_trust_score | |
from models.suggestions import generate_suggestions | |
from models.text_quality import assess_text_quality | |
from models.address_verification import verify_address | |
from models.cross_validation import perform_cross_validation | |
from models.location_analysis import analyze_location | |
from models.price_analysis import analyze_price | |
from models.legal_analysis import analyze_legal_details | |
from models.property_specs import verify_property_specs | |
from models.market_value import analyze_market_value | |
from models.image_quality import assess_image_quality | |
from models.property_relation import check_if_property_related | |
import torch | |
import numpy as np | |
import concurrent.futures | |
from PIL import Image | |
app = Flask(__name__)
CORS(app)  # Enable CORS for frontend
# Initialize geocoder.
# Nominatim is OpenStreetMap's free geocoding service; a descriptive
# user_agent is required by its usage policy, and the 10s timeout keeps a
# slow upstream from hanging request handlers that call geocoder.reverse().
geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10)
def make_json_serializable(obj):
    """Recursively convert *obj* into plain JSON-serializable Python types.

    Tensors and NumPy values become Python scalars/lists, sets become lists,
    and dict keys are coerced to strings. Anything unrecognized falls back to
    ``str(obj)``. Never raises: serialization errors are logged and the
    offending object is stringified.
    """
    try:
        if isinstance(obj, (bool, int, float, str, type(None))):
            return obj
        elif isinstance(obj, (list, tuple, set, frozenset)):
            # Sets are converted to lists so the result is valid JSON
            # (previously they fell through to str(obj)).
            return [make_json_serializable(item) for item in obj]
        elif isinstance(obj, dict):
            # JSON object keys must be strings.
            return {str(key): make_json_serializable(value) for key, value in obj.items()}
        elif torch.is_tensor(obj):
            return obj.item() if obj.numel() == 1 else obj.tolist()
        elif isinstance(obj, np.generic):
            # NumPy scalar (np.float32, np.int64, ...): .item() always exists.
            # The previous np.isscalar() + float() fallback broke on
            # non-numeric "scalars" such as bytes.
            return obj.item()
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        else:
            return str(obj)
    except Exception as e:
        logger.error(f"Error serializing object: {str(e)}")
        return str(obj)
@app.route('/')
def index():
    """Serve the single-page frontend (templates/index.html).

    NOTE(review): the original file defined this view but never registered it
    with Flask (no @app.route / add_url_rule anywhere), so the app served no
    routes; the decorator fixes that. The root path is the conventional
    choice for the index page -- confirm against the frontend.
    """
    return render_template('index.html')
@app.route('/get-location', methods=['POST'])
def get_location():
    """Reverse-geocode a latitude/longitude pair to an Indian address.

    Expects a JSON body with ``latitude`` and ``longitude``. Validates that
    the coordinates fall inside India's bounding box, then queries Nominatim
    (up to 3 attempts) and returns the parsed address components.

    Returns:
        200 JSON with address/street/area/city/state/postal_code on success,
        400 for missing, malformed, or out-of-India coordinates,
        500 when geocoding fails after retries or an unexpected error occurs.

    NOTE(review): the original file never registered this view with Flask;
    the decorator fixes that. The URL path is assumed -- confirm it matches
    the frontend's fetch call.
    """
    try:
        data = request.json or {}
        latitude = data.get('latitude')
        longitude = data.get('longitude')
        # NOTE(review): the falsy check also rejects literal 0 coordinates,
        # but (0, 0) is outside India's bounding box anyway, so the observable
        # behavior is unchanged.
        if not latitude or not longitude:
            logger.warning("Missing latitude or longitude")
            return jsonify({
                'status': 'error',
                'message': 'Latitude and longitude are required'
            }), 400
        # Validate coordinates are within India's approximate bounding box.
        try:
            lat, lng = float(latitude), float(longitude)
            if not (6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5):
                return jsonify({
                    'status': 'error',
                    'message': 'Coordinates are outside India'
                }), 400
        except ValueError:
            return jsonify({
                'status': 'error',
                'message': 'Invalid coordinates format'
            }), 400
        # Retry geocoding up to 3 times -- Nominatim is rate-limited and
        # occasionally returns empty results.
        for attempt in range(3):
            try:
                location = geocoder.reverse((latitude, longitude), exactly_one=True)
                if location:
                    address_components = location.raw.get('address', {})
                    # Extract Indian-specific address components, falling back
                    # through progressively coarser OSM tags.
                    city = address_components.get('city', '')
                    if not city:
                        city = address_components.get('town', '')
                    if not city:
                        city = address_components.get('village', '')
                    if not city:
                        city = address_components.get('suburb', '')
                    state = address_components.get('state', '')
                    if not state:
                        state = address_components.get('state_district', '')
                    # Indian PIN codes are exactly 6 digits; discard anything else.
                    postal_code = address_components.get('postcode', '')
                    if postal_code and not re.match(r'^\d{6}$', postal_code):
                        postal_code = ''
                    # Get road/street name
                    road = address_components.get('road', '')
                    if not road:
                        road = address_components.get('street', '')
                    # Get area/locality
                    area = address_components.get('suburb', '')
                    if not area:
                        area = address_components.get('neighbourhood', '')
                    return jsonify({
                        'status': 'success',
                        'address': location.address,
                        'street': road,
                        'area': area,
                        'city': city,
                        'state': state,
                        'country': 'India',
                        'postal_code': postal_code,
                        'latitude': latitude,
                        'longitude': longitude,
                        'formatted_address': f"{road}, {area}, {city}, {state}, India - {postal_code}"
                    })
                logger.warning(f"Geocoding failed on attempt {attempt + 1}")
                time.sleep(1)  # Wait before retry
            except Exception as e:
                logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}")
                time.sleep(1)
        return jsonify({
            'status': 'error',
            'message': 'Could not determine location after retries'
        }), 500
    except Exception as e:
        logger.error(f"Error in get_location: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500
def calculate_final_verdict(results):
    """
    Calculate a comprehensive final verdict based on all analysis results.

    Combines trust, fraud, quality, specs, location, price, legal, document
    and image scores into a weighted 0-100 score, then classifies the listing
    as 'legitimate', 'suspicious' or 'fraudulent' with a confidence value,
    collecting critical issues, warnings and recommendations along the way.

    Never raises: any internal error yields an 'error' verdict instead.
    """
    try:
        # Defensive: ensure results is a dict
        if not isinstance(results, dict):
            logger.warning(f"Input to calculate_final_verdict is not a dict: {type(results)}")
            results = {}
        # Extract key components from results, defaulting to safe values.
        # (results.get(key) or {}) also guards against an explicit None value,
        # which .get(key, {}) does not -- trust_score previously lacked this
        # guard and crashed the whole verdict on {'trust_score': None}.
        trust_score = (results.get('trust_score') or {}).get('score', 0) or 0
        fraud_classification = results.get('fraud_classification', {}) or {}
        quality_assessment = results.get('quality_assessment', {}) or {}
        specs_verification = results.get('specs_verification', {}) or {}
        cross_validation = results.get('cross_validation', []) or []
        location_analysis = results.get('location_analysis', {}) or {}
        price_analysis = results.get('price_analysis', {}) or {}
        legal_analysis = results.get('legal_analysis', {}) or {}
        document_analysis = results.get('document_analysis', {}) or {}
        image_analysis = results.get('image_analysis', {}) or {}
        # Calculate component scores (0-100).
        # documents/images are scaled against targets of 3 PDFs / 5 images.
        component_scores = {
            'trust': trust_score,
            'fraud': 100 - (fraud_classification.get('alert_score', 0) * 100) if fraud_classification.get('alert_score') is not None else 100,
            'quality': quality_assessment.get('score', 0) or 0,
            'specs': specs_verification.get('verification_score', 0) or 0,
            'location': location_analysis.get('completeness_score', 0) or 0,
            # (confidence or 0) guards against a present-but-None confidence.
            'price': (price_analysis.get('confidence') or 0) * 100 if price_analysis.get('has_price') else 0,
            'legal': legal_analysis.get('completeness_score', 0) or 0,
            'documents': min(100, (document_analysis.get('pdf_count', 0) / 3) * 100) if document_analysis.get('pdf_count') else 0,
            'images': min(100, (image_analysis.get('image_count', 0) / 5) * 100) if image_analysis.get('image_count') else 0
        }
        # Calculate weighted final score with adjusted weights (sums to 1.0).
        weights = {
            'trust': 0.20,
            'fraud': 0.25,  # Increased weight for fraud detection
            'quality': 0.15,
            'specs': 0.10,
            'location': 0.10,
            'price': 0.05,
            'legal': 0.05,
            'documents': 0.05,
            'images': 0.05
        }
        final_score = sum(score * weights.get(component, 0) for component, score in component_scores.items())
        # Defensive: ensure verdict structure
        verdict = {
            'status': 'unknown',
            'confidence': 0.0,
            'score': final_score,
            'reasons': [],
            'critical_issues': [],
            'warnings': [],
            'recommendations': []
        }
        # Determine verdict status based on multiple factors
        fraud_level = fraud_classification.get('alert_level', 'minimal')
        high_risk_indicators = len(fraud_classification.get('high_risk', [])) if fraud_classification.get('high_risk') else 0
        critical_issues = []
        warnings = []
        # Check for critical issues
        if fraud_level in ['critical', 'high']:
            critical_issues.append(f"High fraud risk detected: {fraud_level} alert level")
        if trust_score < 40:
            critical_issues.append(f"Very low trust score: {trust_score}%")
        if quality_assessment.get('score', 0) < 30:
            critical_issues.append(f"Very low content quality: {quality_assessment.get('score', 0)}%")
        if specs_verification.get('verification_score', 0) < 40:
            critical_issues.append(f"Property specifications verification failed: {specs_verification.get('verification_score', 0)}%")
        # Check for warnings (softer thresholds than the critical checks)
        if fraud_level == 'medium':
            warnings.append(f"Medium fraud risk detected: {fraud_level} alert level")
        if trust_score < 60:
            warnings.append(f"Low trust score: {trust_score}%")
        if quality_assessment.get('score', 0) < 60:
            warnings.append(f"Low content quality: {quality_assessment.get('score', 0)}%")
        if specs_verification.get('verification_score', 0) < 70:
            warnings.append(f"Property specifications have issues: {specs_verification.get('verification_score', 0)}%")
        # Check cross-validation results
        for check in cross_validation:
            if check.get('status') in ['inconsistent', 'invalid', 'suspicious', 'no_match']:
                warnings.append(f"Cross-validation issue: {check.get('message', 'Unknown issue')}")
        # Check for missing critical information
        missing_critical = []
        if not location_analysis.get('completeness_score', 0) > 70:
            missing_critical.append("Location information is incomplete")
        if not price_analysis.get('has_price', False):
            missing_critical.append("Price information is missing")
        if not legal_analysis.get('completeness_score', 0) > 70:
            missing_critical.append("Legal information is incomplete")
        if document_analysis.get('pdf_count', 0) == 0:
            missing_critical.append("No supporting documents provided")
        if image_analysis.get('image_count', 0) == 0:
            missing_critical.append("No property images provided")
        if missing_critical:
            warnings.append(f"Missing critical information: {', '.join(missing_critical)}")
        # Enhanced verdict determination with more strict criteria
        if critical_issues or (fraud_level in ['critical', 'high'] and trust_score < 50) or high_risk_indicators > 0:
            verdict['status'] = 'fraudulent'
            verdict['confidence'] = min(100, max(70, 100 - (trust_score * 0.5)))
        elif warnings or (fraud_level == 'medium' and trust_score < 70) or specs_verification.get('verification_score', 0) < 60:
            verdict['status'] = 'suspicious'
            verdict['confidence'] = min(100, max(50, trust_score * 0.8))
        else:
            verdict['status'] = 'legitimate'
            verdict['confidence'] = min(100, max(70, trust_score * 0.9))
        # Add reasons to verdict
        verdict['critical_issues'] = critical_issues
        verdict['warnings'] = warnings
        # Add recommendations based on issues
        if critical_issues:
            verdict['recommendations'].append("Do not proceed with this property listing")
            verdict['recommendations'].append("Report this listing to the platform")
        elif warnings:
            verdict['recommendations'].append("Proceed with extreme caution")
            verdict['recommendations'].append("Request additional verification documents")
            verdict['recommendations'].append("Verify all information with independent sources")
        else:
            verdict['recommendations'].append("Proceed with standard due diligence")
            verdict['recommendations'].append("Verify final details before transaction")
        # Add specific recommendations based on missing information
        for missing in missing_critical:
            verdict['recommendations'].append(f"Request {missing.lower()}")
        return verdict
    except Exception as e:
        logger.error(f"Error calculating final verdict: {str(e)}")
        return {
            'status': 'error',
            'confidence': 0.0,
            'score': 0.0,
            'reasons': [f"Error calculating verdict: {str(e)}"],
            'critical_issues': [],
            'warnings': [],
            'recommendations': ["Unable to determine property status due to an error"]
        }
@app.route('/verify-property', methods=['POST'])
def verify_property():
    """Run the full property-listing verification pipeline.

    Accepts a multipart form with listing fields plus optional ``images``
    (jpg/jpeg/png) and ``documents`` (pdf) files. Runs all analysis modules
    in parallel and returns a JSON report including a final verdict.

    Returns:
        200 JSON report on success,
        400 when no data or required fields are missing,
        500 on any unexpected processing error.

    NOTE(review): the original file never registered this view with Flask;
    the decorator fixes that. The URL path is assumed -- confirm it matches
    the frontend's fetch call.
    """
    try:
        if not request.form and not request.files:
            logger.warning("No form data or files provided")
            return jsonify({
                'error': 'No data provided',
                'status': 'error'
            }), 400
        # Extract form data (all values normalized to stripped strings)
        data = {
            'property_name': request.form.get('property_name', '').strip(),
            'property_type': request.form.get('property_type', '').strip(),
            'status': request.form.get('status', '').strip(),
            'description': request.form.get('description', '').strip(),
            'address': request.form.get('address', '').strip(),
            'city': request.form.get('city', '').strip(),
            'state': request.form.get('state', '').strip(),
            'country': request.form.get('country', 'India').strip(),
            'zip': request.form.get('zip', '').strip(),
            'latitude': request.form.get('latitude', '').strip(),
            'longitude': request.form.get('longitude', '').strip(),
            'bedrooms': request.form.get('bedrooms', '').strip(),
            'bathrooms': request.form.get('bathrooms', '').strip(),
            'total_rooms': request.form.get('total_rooms', '').strip(),
            'year_built': request.form.get('year_built', '').strip(),
            'parking': request.form.get('parking', '').strip(),
            'sq_ft': request.form.get('sq_ft', '').strip(),
            'market_value': request.form.get('market_value', '').strip(),
            'amenities': request.form.get('amenities', '').strip(),
            'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(),
            'legal_details': request.form.get('legal_details', '').strip()
        }
        # Validate required fields
        required_fields = ['property_name', 'property_type', 'address', 'city', 'state']
        missing_fields = [field for field in required_fields if not data[field]]
        if missing_fields:
            logger.warning(f"Missing required fields: {', '.join(missing_fields)}")
            return jsonify({
                'error': f"Missing required fields: {', '.join(missing_fields)}",
                'status': 'error'
            }), 400
        # Process images
        images = []
        image_analysis = []
        if 'images' in request.files:
            # Get unique image files by filename to prevent duplicates
            image_files = {}
            for img_file in request.files.getlist('images'):
                if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                    image_files[img_file.filename] = img_file
            # Process unique images: base64-encode for the report and run the
            # image-analysis model on each one.
            for img_file in image_files.values():
                try:
                    img = Image.open(img_file)
                    buffered = io.BytesIO()
                    img.save(buffered, format="JPEG")
                    img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
                    images.append(img_str)
                    image_analysis.append(analyze_image(img))
                except Exception as e:
                    logger.error(f"Error processing image {img_file.filename}: {str(e)}")
                    image_analysis.append({'error': str(e), 'is_property_related': False})
        # Process PDFs
        pdf_texts = []
        pdf_analysis = []
        if 'documents' in request.files:
            # Get unique PDF files by filename to prevent duplicates
            pdf_files = {}
            for pdf_file in request.files.getlist('documents'):
                if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'):
                    pdf_files[pdf_file.filename] = pdf_file
            # Process unique PDFs
            for pdf_file in pdf_files.values():
                try:
                    pdf_text = extract_pdf_text(pdf_file)
                    pdf_texts.append({
                        'filename': pdf_file.filename,
                        'text': pdf_text
                    })
                    pdf_analysis.append(analyze_pdf_content(pdf_text, data))
                except Exception as e:
                    logger.error(f"Error processing PDF {pdf_file.filename}: {str(e)}")
                    pdf_analysis.append({'error': str(e)})
        # Create consolidated text for analysis
        consolidated_text = f"""
        Property Name: {data['property_name']}
        Property Type: {data['property_type']}
        Status: {data['status']}
        Description: {data['description']}
        Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']}
        Coordinates: Lat {data['latitude']}, Long {data['longitude']}
        Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms
        Year Built: {data['year_built']}
        Parking: {data['parking']}
        Size: {data['sq_ft']} sq. ft.
        Market Value: ₹{data['market_value']}
        Amenities: {data['amenities']}
        Nearby Landmarks: {data['nearby_landmarks']}
        Legal Details: {data['legal_details']}
        """
        # Process description translation if needed.
        # NOTE(review): `detect` and `GoogleTranslator` are never imported in
        # this file, so this call currently raises NameError, which is caught
        # below and translation is silently skipped. Add
        # `from langdetect import detect` and
        # `from deep_translator import GoogleTranslator` to enable it.
        try:
            description = data['description']
            if description and len(description) > 10:
                text_language = detect(description)
                if text_language != 'en':
                    translated_description = GoogleTranslator(source=text_language, target='en').translate(description)
                    data['description_translated'] = translated_description
                else:
                    data['description_translated'] = description
            else:
                data['description_translated'] = description
        except Exception as e:
            logger.error(f"Error in language detection/translation: {str(e)}")
            data['description_translated'] = data['description']
        # Run all analyses in parallel: each synchronous model call is
        # dispatched to a thread-pool worker and awaited together.
        async def run_analyses():
            with concurrent.futures.ThreadPoolExecutor() as executor:
                loop = asyncio.get_event_loop()
                tasks = [
                    loop.run_in_executor(executor, generate_property_summary, data),
                    loop.run_in_executor(executor, classify_fraud, consolidated_text, data),
                    loop.run_in_executor(executor, generate_trust_score, consolidated_text, image_analysis, pdf_analysis),
                    loop.run_in_executor(executor, generate_suggestions, consolidated_text, data),
                    loop.run_in_executor(executor, assess_text_quality, data['description_translated']),
                    loop.run_in_executor(executor, verify_address, data),
                    loop.run_in_executor(executor, perform_cross_validation, data),
                    loop.run_in_executor(executor, analyze_location, data),
                    loop.run_in_executor(executor, analyze_price, data),
                    loop.run_in_executor(executor, analyze_legal_details, data['legal_details']),
                    loop.run_in_executor(executor, verify_property_specs, data),
                    loop.run_in_executor(executor, analyze_market_value, data)
                ]
                results = await asyncio.gather(*tasks)
                return results
        # Run analyses and get results. A fresh loop is created per request
        # (Flask worker threads have no running loop); try/finally guarantees
        # it is closed even when an analysis raises -- previously a failure
        # leaked the loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            analysis_results = loop.run_until_complete(run_analyses())
        finally:
            loop.close()
        # Unpack results (order must match the task list above)
        summary, fraud_classification, (trust_score, trust_reasoning), suggestions, quality_assessment, \
        address_verification, cross_validation, location_analysis, price_analysis, legal_analysis, \
        specs_verification, market_analysis = analysis_results
        # Prepare response
        document_analysis = {
            'pdf_count': len(pdf_texts),
            'pdf_texts': pdf_texts,
            'pdf_analysis': pdf_analysis
        }
        image_results = {
            'image_count': len(images),
            'image_analysis': image_analysis
        }
        report_id = str(uuid.uuid4())
        # Create results dictionary
        results = {
            'report_id': report_id,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'summary': summary,
            'fraud_classification': fraud_classification,
            'trust_score': {
                'score': trust_score,
                'reasoning': trust_reasoning
            },
            'suggestions': suggestions,
            'quality_assessment': quality_assessment,
            'address_verification': address_verification,
            'cross_validation': cross_validation,
            'location_analysis': location_analysis,
            'price_analysis': price_analysis,
            'legal_analysis': legal_analysis,
            'document_analysis': document_analysis,
            'image_analysis': image_results,
            'specs_verification': specs_verification,
            'market_analysis': market_analysis,
            'images': images
        }
        # Calculate final verdict
        final_verdict = calculate_final_verdict(results)
        results['final_verdict'] = final_verdict
        return jsonify(make_json_serializable(results))
    except Exception as e:
        logger.error(f"Error in verify_property: {str(e)}")
        return jsonify({
            'error': 'Server error occurred. Please try again later.',
            'status': 'error',
            'details': str(e)
        }), 500
if __name__ == '__main__':
    # Run Flask app
    # NOTE(review): debug=True must not ship to production -- combined with
    # host='0.0.0.0' it exposes the Werkzeug interactive debugger (remote
    # code execution) to the network. use_reloader=False avoids double
    # process startup (e.g. double model loading).
    app.run(host='0.0.0.0', port=8000, debug=True, use_reloader=False)