Spaces:
Runtime error
Runtime error
# models/fraud_classification.py | |
import re | |
from .model_loader import load_model | |
from .logging_config import logger | |
def classify_fraud(property_details, description): | |
""" | |
Classify the risk of fraud in a property listing using zero-shot classification. | |
This function analyzes property details and description to identify potential fraud indicators. | |
""" | |
try: | |
# Initialize fraud classification result | |
fraud_classification = { | |
'alert_level': 'minimal', | |
'alert_score': 0.0, | |
'high_risk': [], | |
'medium_risk': [], | |
'low_risk': [], | |
'confidence_scores': {} | |
} | |
# Accept property_details as dict or str | |
if isinstance(property_details, dict): | |
details_str = '\n'.join(f"{k}: {v}" for k, v in property_details.items()) | |
else: | |
details_str = str(property_details) | |
text_to_analyze = f"{details_str}\n{description if description else ''}" | |
# Define risk categories for zero-shot classification | |
risk_categories = [ | |
"fraudulent listing", | |
"misleading information", | |
"fake property", | |
"scam attempt", | |
"legitimate listing" | |
] | |
# Perform zero-shot classification | |
try: | |
classifier = load_model("zero-shot-classification", "typeform/mobilebert-uncased-mnli") | |
result = classifier(text_to_analyze, risk_categories, multi_label=True) | |
except Exception as e: | |
logger.error(f"Model error in fraud classification: {str(e)}") | |
fraud_classification['alert_level'] = 'error' | |
fraud_classification['high_risk'].append(f"Model error: {str(e)}") | |
fraud_classification['alert_score'] = 1.0 | |
return fraud_classification | |
# Process classification results | |
fraud_score = 0.0 | |
for label, score in zip(result.get('labels', []), result.get('scores', [])): | |
if label != "legitimate listing": | |
try: | |
score_val = float(score) | |
except Exception: | |
score_val = 0.0 | |
fraud_score += score_val | |
fraud_classification['confidence_scores'][label] = score_val | |
# Normalize fraud score to 0-1 range | |
try: | |
fraud_score = min(1.0, fraud_score / (len(risk_categories) - 1)) | |
except Exception: | |
fraud_score = 0.0 | |
fraud_classification['alert_score'] = fraud_score | |
# Define fraud indicators to check | |
fraud_indicators = { | |
'high_risk': [ | |
r'urgent|immediate|hurry|limited time|special offer', | |
r'bank|transfer|wire|payment|money', | |
r'fake|scam|fraud|illegal|unauthorized', | |
r'guaranteed|promised|assured|certain', | |
r'contact.*whatsapp|whatsapp.*contact', | |
r'price.*negotiable|negotiable.*price', | |
r'no.*documents|documents.*not.*required', | |
r'cash.*only|only.*cash', | |
r'off.*market|market.*off', | |
r'under.*table|table.*under' | |
], | |
'medium_risk': [ | |
r'unverified|unconfirmed|unchecked', | |
r'partial|incomplete|missing', | |
r'different.*location|location.*different', | |
r'price.*increased|increased.*price', | |
r'no.*photos|photos.*not.*available', | |
r'contact.*email|email.*contact', | |
r'agent.*not.*available|not.*available.*agent', | |
r'property.*not.*viewable|not.*viewable.*property', | |
r'price.*changed|changed.*price', | |
r'details.*updated|updated.*details' | |
], | |
'low_risk': [ | |
r'new.*listing|listing.*new', | |
r'recent.*update|update.*recent', | |
r'price.*reduced|reduced.*price', | |
r'contact.*phone|phone.*contact', | |
r'agent.*available|available.*agent', | |
r'property.*viewable|viewable.*property', | |
r'photos.*available|available.*photos', | |
r'documents.*available|available.*documents', | |
r'price.*fixed|fixed.*price', | |
r'details.*complete|complete.*details' | |
] | |
} | |
# Check for fraud indicators in text | |
for risk_level, patterns in fraud_indicators.items(): | |
for pattern in patterns: | |
try: | |
matches = re.finditer(pattern, text_to_analyze, re.IGNORECASE) | |
for match in matches: | |
indicator = match.group(0) | |
if indicator not in fraud_classification[risk_level]: | |
fraud_classification[risk_level].append(indicator) | |
except Exception as e: | |
logger.warning(f"Regex error in fraud indicator pattern '{pattern}': {str(e)}") | |
# Determine alert level based on fraud score and indicators | |
try: | |
if fraud_score > 0.7 or len(fraud_classification['high_risk']) > 0: | |
fraud_classification['alert_level'] = 'critical' | |
elif fraud_score > 0.5 or len(fraud_classification['medium_risk']) > 2: | |
fraud_classification['alert_level'] = 'high' | |
elif fraud_score > 0.3 or len(fraud_classification['medium_risk']) > 0: | |
fraud_classification['alert_level'] = 'medium' | |
elif fraud_score > 0.1 or len(fraud_classification['low_risk']) > 0: | |
fraud_classification['alert_level'] = 'low' | |
else: | |
fraud_classification['alert_level'] = 'minimal' | |
except Exception as e: | |
logger.warning(f"Error determining alert level: {str(e)}") | |
fraud_classification['alert_level'] = 'minimal' | |
# Additional checks for common fraud patterns | |
try: | |
if re.search(r'price.*too.*good|too.*good.*price', text_to_analyze, re.IGNORECASE): | |
fraud_classification['high_risk'].append("Unrealistically low price") | |
if re.search(r'no.*inspection|inspection.*not.*allowed', text_to_analyze, re.IGNORECASE): | |
fraud_classification['high_risk'].append("No property inspection allowed") | |
if re.search(r'owner.*abroad|abroad.*owner', text_to_analyze, re.IGNORECASE): | |
fraud_classification['medium_risk'].append("Owner claims to be abroad") | |
if re.search(r'agent.*unavailable|unavailable.*agent', text_to_analyze, re.IGNORECASE): | |
fraud_classification['medium_risk'].append("Agent unavailable for verification") | |
except Exception as e: | |
logger.warning(f"Error in additional fraud pattern checks: {str(e)}") | |
# Check for inconsistencies in property details | |
try: | |
if isinstance(property_details, dict) and 'price' in property_details and 'market_value' in property_details: | |
price_val = float(str(property_details['price']).replace(',', '').replace('₹', '').strip()) | |
market_value_val = float(str(property_details['market_value']).replace(',', '').replace('₹', '').strip()) | |
if price_val < market_value_val * 0.5: | |
fraud_classification['high_risk'].append("Price significantly below market value") | |
except Exception as e: | |
logger.warning(f"Error checking price/market_value: {str(e)}") | |
return fraud_classification | |
except Exception as e: | |
logger.error(f"Error in fraud classification: {str(e)}") | |
return { | |
'alert_level': 'error', | |
'alert_score': 1.0, | |
'high_risk': [f"Error in fraud classification: {str(e)}"], | |
'medium_risk': [], | |
'low_risk': [], | |
'confidence_scores': {} | |
} | |