property_verify / models /fraud_classification.py
sksameermujahid's picture
Upload 26 files
4796377 verified
# models/fraud_classification.py
import re
from .model_loader import load_model
from .logging_config import logger
def classify_fraud(property_details, description):
"""
Classify the risk of fraud in a property listing using zero-shot classification.
This function analyzes property details and description to identify potential fraud indicators.
"""
try:
# Initialize fraud classification result
fraud_classification = {
'alert_level': 'minimal',
'alert_score': 0.0,
'high_risk': [],
'medium_risk': [],
'low_risk': [],
'confidence_scores': {}
}
# Accept property_details as dict or str
if isinstance(property_details, dict):
details_str = '\n'.join(f"{k}: {v}" for k, v in property_details.items())
else:
details_str = str(property_details)
text_to_analyze = f"{details_str}\n{description if description else ''}"
# Define risk categories for zero-shot classification
risk_categories = [
"fraudulent listing",
"misleading information",
"fake property",
"scam attempt",
"legitimate listing"
]
# Perform zero-shot classification
try:
classifier = load_model("zero-shot-classification", "typeform/mobilebert-uncased-mnli")
result = classifier(text_to_analyze, risk_categories, multi_label=True)
except Exception as e:
logger.error(f"Model error in fraud classification: {str(e)}")
fraud_classification['alert_level'] = 'error'
fraud_classification['high_risk'].append(f"Model error: {str(e)}")
fraud_classification['alert_score'] = 1.0
return fraud_classification
# Process classification results
fraud_score = 0.0
for label, score in zip(result.get('labels', []), result.get('scores', [])):
if label != "legitimate listing":
try:
score_val = float(score)
except Exception:
score_val = 0.0
fraud_score += score_val
fraud_classification['confidence_scores'][label] = score_val
# Normalize fraud score to 0-1 range
try:
fraud_score = min(1.0, fraud_score / (len(risk_categories) - 1))
except Exception:
fraud_score = 0.0
fraud_classification['alert_score'] = fraud_score
# Define fraud indicators to check
fraud_indicators = {
'high_risk': [
r'urgent|immediate|hurry|limited time|special offer',
r'bank|transfer|wire|payment|money',
r'fake|scam|fraud|illegal|unauthorized',
r'guaranteed|promised|assured|certain',
r'contact.*whatsapp|whatsapp.*contact',
r'price.*negotiable|negotiable.*price',
r'no.*documents|documents.*not.*required',
r'cash.*only|only.*cash',
r'off.*market|market.*off',
r'under.*table|table.*under'
],
'medium_risk': [
r'unverified|unconfirmed|unchecked',
r'partial|incomplete|missing',
r'different.*location|location.*different',
r'price.*increased|increased.*price',
r'no.*photos|photos.*not.*available',
r'contact.*email|email.*contact',
r'agent.*not.*available|not.*available.*agent',
r'property.*not.*viewable|not.*viewable.*property',
r'price.*changed|changed.*price',
r'details.*updated|updated.*details'
],
'low_risk': [
r'new.*listing|listing.*new',
r'recent.*update|update.*recent',
r'price.*reduced|reduced.*price',
r'contact.*phone|phone.*contact',
r'agent.*available|available.*agent',
r'property.*viewable|viewable.*property',
r'photos.*available|available.*photos',
r'documents.*available|available.*documents',
r'price.*fixed|fixed.*price',
r'details.*complete|complete.*details'
]
}
# Check for fraud indicators in text
for risk_level, patterns in fraud_indicators.items():
for pattern in patterns:
try:
matches = re.finditer(pattern, text_to_analyze, re.IGNORECASE)
for match in matches:
indicator = match.group(0)
if indicator not in fraud_classification[risk_level]:
fraud_classification[risk_level].append(indicator)
except Exception as e:
logger.warning(f"Regex error in fraud indicator pattern '{pattern}': {str(e)}")
# Determine alert level based on fraud score and indicators
try:
if fraud_score > 0.7 or len(fraud_classification['high_risk']) > 0:
fraud_classification['alert_level'] = 'critical'
elif fraud_score > 0.5 or len(fraud_classification['medium_risk']) > 2:
fraud_classification['alert_level'] = 'high'
elif fraud_score > 0.3 or len(fraud_classification['medium_risk']) > 0:
fraud_classification['alert_level'] = 'medium'
elif fraud_score > 0.1 or len(fraud_classification['low_risk']) > 0:
fraud_classification['alert_level'] = 'low'
else:
fraud_classification['alert_level'] = 'minimal'
except Exception as e:
logger.warning(f"Error determining alert level: {str(e)}")
fraud_classification['alert_level'] = 'minimal'
# Additional checks for common fraud patterns
try:
if re.search(r'price.*too.*good|too.*good.*price', text_to_analyze, re.IGNORECASE):
fraud_classification['high_risk'].append("Unrealistically low price")
if re.search(r'no.*inspection|inspection.*not.*allowed', text_to_analyze, re.IGNORECASE):
fraud_classification['high_risk'].append("No property inspection allowed")
if re.search(r'owner.*abroad|abroad.*owner', text_to_analyze, re.IGNORECASE):
fraud_classification['medium_risk'].append("Owner claims to be abroad")
if re.search(r'agent.*unavailable|unavailable.*agent', text_to_analyze, re.IGNORECASE):
fraud_classification['medium_risk'].append("Agent unavailable for verification")
except Exception as e:
logger.warning(f"Error in additional fraud pattern checks: {str(e)}")
# Check for inconsistencies in property details
try:
if isinstance(property_details, dict) and 'price' in property_details and 'market_value' in property_details:
price_val = float(str(property_details['price']).replace(',', '').replace('₹', '').strip())
market_value_val = float(str(property_details['market_value']).replace(',', '').replace('₹', '').strip())
if price_val < market_value_val * 0.5:
fraud_classification['high_risk'].append("Price significantly below market value")
except Exception as e:
logger.warning(f"Error checking price/market_value: {str(e)}")
return fraud_classification
except Exception as e:
logger.error(f"Error in fraud classification: {str(e)}")
return {
'alert_level': 'error',
'alert_score': 1.0,
'high_risk': [f"Error in fraud classification: {str(e)}"],
'medium_risk': [],
'low_risk': [],
'confidence_scores': {}
}