File size: 3,968 Bytes
6b21b32
bf70aa2
 
 
9bd35d7
bf70aa2
11184ec
cea309f
 
 
6b21b32
11184ec
 
cea309f
 
11184ec
bf70aa2
 
9bd35d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11184ec
bf70aa2
11184ec
bf70aa2
9bd35d7
bf70aa2
11184ec
9bd35d7
 
 
 
 
 
 
 
 
 
 
11184ec
 
9bd35d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf70aa2
9bd35d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf70aa2
11184ec
9bd35d7
11184ec
 
9bd35d7
 
bf70aa2
11184ec
bf70aa2
 
11184ec
 
 
 
bf70aa2
11184ec
 
 
bf70aa2
11184ec
 
 
 
bf70aa2
6b21b32
11184ec
6b21b32
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import re
from transformers import pipeline

# Initialize FastAPI app
app = FastAPI(
    title="Email Classification API",
    version="1.0.0",
    description="Classifies support emails into categories and masks personal information.",
    docs_url="/docs",
    redoc_url="/redoc"
)

# Load pre-trained model
model = joblib.load("model.joblib")

# Initialize NER pipeline
ner = pipeline('ner', model='Davlan/xlm-roberta-base-ner-hrl', grouped_entities=True)

# Map NER entity labels to token names
NER_TO_TOKEN = {
    'PER': 'full_name',
    'EMAIL': 'email',
    'DATE': 'dob'
}

# Regex patterns for PII detection
EMAIL_REGEX = r'\b[\w\.-]+@[\w\.-]+\.\w{2,}\b'
AADHAAR_REGEX = r'\b\d{4}\s?\d{4}\s?\d{4}\b'
CARD_REGEX = r'\b(?:\d[ -]*?){13,19}\b'
CVV_REGEX = r'(?i)\b(?:cvv[:\s\-]*)?(\d{3,4})\b'
EXPIRY_REGEX = r'\b(0[1-9]|1[0-2])[\/\-]\d{2,4}\b'
PHONE_REGEX = r'\+?\d[\d\s\-]{7,14}\d'
DOB_REGEX = r'\b\d{1,2}[\/\-\.\s]\d{1,2}[\/\-\.\s]\d{2,4}\b'

# Input schema
class EmailInput(BaseModel):
    input_email_body: str

# Updated PII Masking Function with NER and regex
def mask_and_store_all_pii(text):
    text = str(text)
    mapping = {}
    counter = {
        'full_name': 0,
        'email': 0,
        'phone_number': 0,
        'dob': 0,
        'aadhar_num': 0,
        'credit_debit_no': 0,
        'cvv_no': 0,
        'expiry_no': 0
    }
    entity_list = []

    # NER masking
    entities = ner(text)
    for ent in entities:
        label = ent['entity_group']
        if label in NER_TO_TOKEN:
            token_name = NER_TO_TOKEN[label]
            original = ent['word'].replace('##', '')
            token = f"[{token_name}_{counter[token_name]:03d}]"
            if original in text:
                start = text.index(original)
                end = start + len(original)
                text = text.replace(original, token, 1)
                mapping[token] = original
                counter[token_name] += 1
                entity_list.append({
                    "position": [start, start + len(token)],
                    "classification": token_name,
                    "entity": original
                })

    # Regex masking
    regex_map = [
        (CARD_REGEX, 'credit_debit_no'),
        (AADHAAR_REGEX, 'aadhar_num'),
        (PHONE_REGEX, 'phone_number'),
        (CVV_REGEX, 'cvv_no'),
        (EXPIRY_REGEX, 'expiry_no'),
        (EMAIL_REGEX, 'email'),
        (DOB_REGEX, 'dob')
    ]

    for regex, token_name in regex_map:
        for match in re.finditer(regex, text):
            original = match.group(0)
            token = f"[{token_name}_{counter[token_name]:03d}]"
            start = match.start()
            end = match.end()
            if original in text:
                text = text.replace(original, token, 1)
                mapping[token] = original
                counter[token_name] += 1
                entity_list.append({
                    "position": [start, start + len(token)],
                    "classification": token_name,
                    "entity": original
                })

    return text, mapping, entity_list

# Restore PII

def restore_pii(masked_text, pii_map):
    for placeholder, original in pii_map.items():
        masked_text = masked_text.replace(placeholder, original)
    return masked_text

# Classification Endpoint
@app.post("/classify")
def classify_email(data: EmailInput):
    raw_text = data.input_email_body

    # Masking
    masked_text, pii_map, entity_list = mask_and_store_all_pii(raw_text)

    # Prediction
    predicted_category = model.predict([masked_text])[0]

    return {
        "input_email_body": raw_text,
        "list_of_masked_entities": entity_list,
        "masked_email": masked_text,
        "category_of_the_email": predicted_category
    }

# Health check endpoint
@app.get("/")
def root():
    return {"message": "Email Classification API is running."}