File size: 2,106 Bytes
492deb9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

import argparse
import json
import os
import warnings

from pyspark.sql import SparkSession

from anonyspark.masking import (
    mask_email_udf, mask_name_udf, mask_date_udf,
    mask_ssn_udf, mask_itin_udf, mask_phone_udf
)

def apply_masking(df, schema, *, drop_original=False):
    """
    Apply masking UDFs based on schema definitions.

    For every ``column -> type`` entry in ``schema`` whose type is one of the
    supported masking types (``email``, ``name``, ``dob``, ``ssn``, ``itin``,
    ``phone``), a new column ``masked_<column>`` is added holding the output
    of the matching masking UDF applied to ``df[column]``.

    Args:
        df: Spark DataFrame to mask.
        schema: Mapping of column name to masking type (as loaded from the
            schema JSON file).
        drop_original: If True, drop each source column after its masked
            copy has been added, so the returned DataFrame no longer carries
            the unmasked values. Defaults to False, which keeps the source
            columns alongside the masked ones (the historical behavior).

    Returns:
        The DataFrame with the ``masked_<column>`` columns added (and the
        masked source columns removed when ``drop_original`` is True).

    Warns:
        UserWarning: For each schema entry whose type is not supported; that
            column is left unmasked.
    """
    # Dispatch table: masking type -> UDF implementing it.
    maskers = {
        "email": mask_email_udf,
        "name": mask_name_udf,
        "dob": mask_date_udf,
        "ssn": mask_ssn_udf,
        "itin": mask_itin_udf,
        "phone": mask_phone_udf,
    }

    masked_columns = []
    for column, dtype in schema.items():
        # Non-string types (e.g. a JSON list) can never match; guard the
        # lookup so unhashable values don't raise TypeError.
        masker = maskers.get(dtype) if isinstance(dtype, str) else None
        if masker is None:
            # An unrecognized type (e.g. a typo like "Email") would otherwise
            # leave this column's raw values unmasked without any signal.
            warnings.warn(
                f"Unsupported masking type {dtype!r} for column {column!r}; "
                "column left unmasked.",
                stacklevel=2,
            )
            continue
        df = df.withColumn(f"masked_{column}", masker(df[column]))
        masked_columns.append(column)

    if drop_original and masked_columns:
        df = df.drop(*masked_columns)
    return df

def main():
    """
    Run the AnonySpark command-line interface.

    Parses ``--input`` (CSV file), ``--output`` (directory) and ``--schema``
    (JSON file mapping column names to masking types). It applies the masking
    UDFs to the input and writes the result as CSV with a header row into the
    output directory, overwriting any previous contents.

    Exits via ``argparse`` error handling (status 2) if the schema file cannot
    be read, is not valid JSON, or is not a JSON object.
    """
    parser = argparse.ArgumentParser(description="AnonySpark CLI for masking sensitive data.")
    parser.add_argument('--input', type=str, required=True, help='Path to input CSV file')
    parser.add_argument('--output', type=str, required=True, help='Directory to save masked output')
    parser.add_argument('--schema', type=str, required=True, help='Path to masking schema JSON file')
    args = parser.parse_args()

    # Load and validate the schema before starting Spark, so a bad schema
    # fails fast without paying for JVM startup.
    try:
        with open(args.schema, 'r', encoding='utf-8') as f:
            schema = json.load(f)
    except (OSError, json.JSONDecodeError) as err:
        parser.error(f"cannot load schema file {args.schema!r}: {err}")
    if not isinstance(schema, dict):
        parser.error(
            f"schema file {args.schema!r} must contain a JSON object "
            "mapping column names to masking types"
        )

    # Create output directory if it doesn't exist
    os.makedirs(args.output, exist_ok=True)

    spark = SparkSession.builder.master("local[*]").appName("AnonysparkCLI").getOrCreate()
    try:
        df = spark.read.csv(args.input, header=True)
        masked_df = apply_masking(df, schema)
        # Spark writes a directory of part files; "overwrite" replaces any
        # output from a previous run.
        masked_df.write.mode("overwrite").csv(args.output, header=True)
    finally:
        # Release the Spark session even if reading, masking or writing fails.
        spark.stop()

    print(f"Masked file written to: {args.output}")

if __name__ == "__main__":
    # Script entry point; main() returns None, which maps to exit status 0.
    raise SystemExit(main())