Spaces:
Runtime error
Runtime error
File size: 7,914 Bytes
4796377 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# models/fraud_classification.py
import re
from .model_loader import load_model
from .logging_config import logger
def classify_fraud(property_details, description):
"""
Classify the risk of fraud in a property listing using zero-shot classification.
This function analyzes property details and description to identify potential fraud indicators.
"""
try:
# Initialize fraud classification result
fraud_classification = {
'alert_level': 'minimal',
'alert_score': 0.0,
'high_risk': [],
'medium_risk': [],
'low_risk': [],
'confidence_scores': {}
}
# Accept property_details as dict or str
if isinstance(property_details, dict):
details_str = '\n'.join(f"{k}: {v}" for k, v in property_details.items())
else:
details_str = str(property_details)
text_to_analyze = f"{details_str}\n{description if description else ''}"
# Define risk categories for zero-shot classification
risk_categories = [
"fraudulent listing",
"misleading information",
"fake property",
"scam attempt",
"legitimate listing"
]
# Perform zero-shot classification
try:
classifier = load_model("zero-shot-classification", "typeform/mobilebert-uncased-mnli")
result = classifier(text_to_analyze, risk_categories, multi_label=True)
except Exception as e:
logger.error(f"Model error in fraud classification: {str(e)}")
fraud_classification['alert_level'] = 'error'
fraud_classification['high_risk'].append(f"Model error: {str(e)}")
fraud_classification['alert_score'] = 1.0
return fraud_classification
# Process classification results
fraud_score = 0.0
for label, score in zip(result.get('labels', []), result.get('scores', [])):
if label != "legitimate listing":
try:
score_val = float(score)
except Exception:
score_val = 0.0
fraud_score += score_val
fraud_classification['confidence_scores'][label] = score_val
# Normalize fraud score to 0-1 range
try:
fraud_score = min(1.0, fraud_score / (len(risk_categories) - 1))
except Exception:
fraud_score = 0.0
fraud_classification['alert_score'] = fraud_score
# Define fraud indicators to check
fraud_indicators = {
'high_risk': [
r'urgent|immediate|hurry|limited time|special offer',
r'bank|transfer|wire|payment|money',
r'fake|scam|fraud|illegal|unauthorized',
r'guaranteed|promised|assured|certain',
r'contact.*whatsapp|whatsapp.*contact',
r'price.*negotiable|negotiable.*price',
r'no.*documents|documents.*not.*required',
r'cash.*only|only.*cash',
r'off.*market|market.*off',
r'under.*table|table.*under'
],
'medium_risk': [
r'unverified|unconfirmed|unchecked',
r'partial|incomplete|missing',
r'different.*location|location.*different',
r'price.*increased|increased.*price',
r'no.*photos|photos.*not.*available',
r'contact.*email|email.*contact',
r'agent.*not.*available|not.*available.*agent',
r'property.*not.*viewable|not.*viewable.*property',
r'price.*changed|changed.*price',
r'details.*updated|updated.*details'
],
'low_risk': [
r'new.*listing|listing.*new',
r'recent.*update|update.*recent',
r'price.*reduced|reduced.*price',
r'contact.*phone|phone.*contact',
r'agent.*available|available.*agent',
r'property.*viewable|viewable.*property',
r'photos.*available|available.*photos',
r'documents.*available|available.*documents',
r'price.*fixed|fixed.*price',
r'details.*complete|complete.*details'
]
}
# Check for fraud indicators in text
for risk_level, patterns in fraud_indicators.items():
for pattern in patterns:
try:
matches = re.finditer(pattern, text_to_analyze, re.IGNORECASE)
for match in matches:
indicator = match.group(0)
if indicator not in fraud_classification[risk_level]:
fraud_classification[risk_level].append(indicator)
except Exception as e:
logger.warning(f"Regex error in fraud indicator pattern '{pattern}': {str(e)}")
# Determine alert level based on fraud score and indicators
try:
if fraud_score > 0.7 or len(fraud_classification['high_risk']) > 0:
fraud_classification['alert_level'] = 'critical'
elif fraud_score > 0.5 or len(fraud_classification['medium_risk']) > 2:
fraud_classification['alert_level'] = 'high'
elif fraud_score > 0.3 or len(fraud_classification['medium_risk']) > 0:
fraud_classification['alert_level'] = 'medium'
elif fraud_score > 0.1 or len(fraud_classification['low_risk']) > 0:
fraud_classification['alert_level'] = 'low'
else:
fraud_classification['alert_level'] = 'minimal'
except Exception as e:
logger.warning(f"Error determining alert level: {str(e)}")
fraud_classification['alert_level'] = 'minimal'
# Additional checks for common fraud patterns
try:
if re.search(r'price.*too.*good|too.*good.*price', text_to_analyze, re.IGNORECASE):
fraud_classification['high_risk'].append("Unrealistically low price")
if re.search(r'no.*inspection|inspection.*not.*allowed', text_to_analyze, re.IGNORECASE):
fraud_classification['high_risk'].append("No property inspection allowed")
if re.search(r'owner.*abroad|abroad.*owner', text_to_analyze, re.IGNORECASE):
fraud_classification['medium_risk'].append("Owner claims to be abroad")
if re.search(r'agent.*unavailable|unavailable.*agent', text_to_analyze, re.IGNORECASE):
fraud_classification['medium_risk'].append("Agent unavailable for verification")
except Exception as e:
logger.warning(f"Error in additional fraud pattern checks: {str(e)}")
# Check for inconsistencies in property details
try:
if isinstance(property_details, dict) and 'price' in property_details and 'market_value' in property_details:
price_val = float(str(property_details['price']).replace(',', '').replace('₹', '').strip())
market_value_val = float(str(property_details['market_value']).replace(',', '').replace('₹', '').strip())
if price_val < market_value_val * 0.5:
fraud_classification['high_risk'].append("Price significantly below market value")
except Exception as e:
logger.warning(f"Error checking price/market_value: {str(e)}")
return fraud_classification
except Exception as e:
logger.error(f"Error in fraud classification: {str(e)}")
return {
'alert_level': 'error',
'alert_score': 1.0,
'high_risk': [f"Error in fraud classification: {str(e)}"],
'medium_risk': [],
'low_risk': [],
'confidence_scores': {}
}
|