# models/fraud_classification.py import re from .model_loader import load_model from .logging_config import logger def classify_fraud(property_details, description): """ Classify the risk of fraud in a property listing using zero-shot classification. This function analyzes property details and description to identify potential fraud indicators. """ try: # Initialize fraud classification result fraud_classification = { 'alert_level': 'minimal', 'alert_score': 0.0, 'high_risk': [], 'medium_risk': [], 'low_risk': [], 'confidence_scores': {} } # Accept property_details as dict or str if isinstance(property_details, dict): details_str = '\n'.join(f"{k}: {v}" for k, v in property_details.items()) else: details_str = str(property_details) text_to_analyze = f"{details_str}\n{description if description else ''}" # Define risk categories for zero-shot classification risk_categories = [ "fraudulent listing", "misleading information", "fake property", "scam attempt", "legitimate listing" ] # Perform zero-shot classification try: classifier = load_model("zero-shot-classification", "typeform/mobilebert-uncased-mnli") result = classifier(text_to_analyze, risk_categories, multi_label=True) except Exception as e: logger.error(f"Model error in fraud classification: {str(e)}") fraud_classification['alert_level'] = 'error' fraud_classification['high_risk'].append(f"Model error: {str(e)}") fraud_classification['alert_score'] = 1.0 return fraud_classification # Process classification results fraud_score = 0.0 for label, score in zip(result.get('labels', []), result.get('scores', [])): if label != "legitimate listing": try: score_val = float(score) except Exception: score_val = 0.0 fraud_score += score_val fraud_classification['confidence_scores'][label] = score_val # Normalize fraud score to 0-1 range try: fraud_score = min(1.0, fraud_score / (len(risk_categories) - 1)) except Exception: fraud_score = 0.0 fraud_classification['alert_score'] = fraud_score # Define fraud indicators to check fraud_indicators = { 'high_risk': [ r'urgent|immediate|hurry|limited time|special offer', r'bank|transfer|wire|payment|money', r'fake|scam|fraud|illegal|unauthorized', r'guaranteed|promised|assured|certain', r'contact.*whatsapp|whatsapp.*contact', r'price.*negotiable|negotiable.*price', r'no.*documents|documents.*not.*required', r'cash.*only|only.*cash', r'off.*market|market.*off', r'under.*table|table.*under' ], 'medium_risk': [ r'unverified|unconfirmed|unchecked', r'partial|incomplete|missing', r'different.*location|location.*different', r'price.*increased|increased.*price', r'no.*photos|photos.*not.*available', r'contact.*email|email.*contact', r'agent.*not.*available|not.*available.*agent', r'property.*not.*viewable|not.*viewable.*property', r'price.*changed|changed.*price', r'details.*updated|updated.*details' ], 'low_risk': [ r'new.*listing|listing.*new', r'recent.*update|update.*recent', r'price.*reduced|reduced.*price', r'contact.*phone|phone.*contact', r'agent.*available|available.*agent', r'property.*viewable|viewable.*property', r'photos.*available|available.*photos', r'documents.*available|available.*documents', r'price.*fixed|fixed.*price', r'details.*complete|complete.*details' ] } # Check for fraud indicators in text for risk_level, patterns in fraud_indicators.items(): for pattern in patterns: try: matches = re.finditer(pattern, text_to_analyze, re.IGNORECASE) for match in matches: indicator = match.group(0) if indicator not in fraud_classification[risk_level]: fraud_classification[risk_level].append(indicator) except Exception as e: logger.warning(f"Regex error in fraud indicator pattern '{pattern}': {str(e)}") # Determine alert level based on fraud score and indicators try: if fraud_score > 0.7 or len(fraud_classification['high_risk']) > 0: fraud_classification['alert_level'] = 'critical' elif fraud_score > 0.5 or len(fraud_classification['medium_risk']) > 2: fraud_classification['alert_level'] = 'high' elif fraud_score > 0.3 or len(fraud_classification['medium_risk']) > 0: fraud_classification['alert_level'] = 'medium' elif fraud_score > 0.1 or len(fraud_classification['low_risk']) > 0: fraud_classification['alert_level'] = 'low' else: fraud_classification['alert_level'] = 'minimal' except Exception as e: logger.warning(f"Error determining alert level: {str(e)}") fraud_classification['alert_level'] = 'minimal' # Additional checks for common fraud patterns try: if re.search(r'price.*too.*good|too.*good.*price', text_to_analyze, re.IGNORECASE): fraud_classification['high_risk'].append("Unrealistically low price") if re.search(r'no.*inspection|inspection.*not.*allowed', text_to_analyze, re.IGNORECASE): fraud_classification['high_risk'].append("No property inspection allowed") if re.search(r'owner.*abroad|abroad.*owner', text_to_analyze, re.IGNORECASE): fraud_classification['medium_risk'].append("Owner claims to be abroad") if re.search(r'agent.*unavailable|unavailable.*agent', text_to_analyze, re.IGNORECASE): fraud_classification['medium_risk'].append("Agent unavailable for verification") except Exception as e: logger.warning(f"Error in additional fraud pattern checks: {str(e)}") # Check for inconsistencies in property details try: if isinstance(property_details, dict) and 'price' in property_details and 'market_value' in property_details: price_val = float(str(property_details['price']).replace(',', '').replace('₹', '').strip()) market_value_val = float(str(property_details['market_value']).replace(',', '').replace('₹', '').strip()) if price_val < market_value_val * 0.5: fraud_classification['high_risk'].append("Price significantly below market value") except Exception as e: logger.warning(f"Error checking price/market_value: {str(e)}") return fraud_classification except Exception as e: logger.error(f"Error in fraud classification: {str(e)}") return { 'alert_level': 'error', 'alert_score': 1.0, 'high_risk': [f"Error in fraud classification: {str(e)}"], 'medium_risk': [], 'low_risk': [], 'confidence_scores': {} }