Duplicate_Records_Prediction / duplicate training and prediction.py
saritha5's picture
Upload 21 files
c5f0a6a
#!/usr/bin/env python
# coding: utf-8
# In[20]:
import os
import sys
import random
import statistics
import numpy as np
import matplotlib.pyplot as plt
import threading
import time
import queue
sys.path.append(os.path.abspath("../lib"))
sys.path.append(os.path.abspath("../supv"))
sys.path.append(os.path.abspath("../text"))
from util import *
from sampler import *
from tnn import *
from txproc import *
emailDoms = ["yahoo.com", "gmail.com", "hotmail.com", "aol.com"]
# In[21]:
def mutStr(st):
"""mutate a char in string"""
l = len(st)
ci = randomInt(0, l - 1)
cv = st[ci]
if cv.isdigit():
r = selectRandomFromList(dig)
elif cv.isupper():
r = selectRandomFromList(ucc)
else:
r = selectRandomFromList(lcc)
nst = st[:ci] + r + st[ci+1:] if l > 1 else r
return nst
# In[23]:
def createPosMatch(rec, fi):
"""
create positive match by mutating a field
"""
mrec = rec.copy()
fv = mrec[fi]
nc = fv.split()
le = len(nc)
if fi == 0:
#name
if isEventSampled(50):
nfv = nc[0] + " " + selectRandomFromList(ucc) + " " + nc[1]
else:
nc[1] = mutStr(nc[1])
nfv = nc[0] + " " + nc[1]
elif fi == 1:
#address
mutated = False
if isEventSampled(50):
mutated = True
s = nc[-1]
if s == "Rd":
nc[-1] = "Road"
elif s == "Ave":
nc[-1] = "Avenue"
elif s == "St":
nc[-1] = "Street"
elif s == "Dr":
nc[-1] = "Drive"
else:
mutated = False
if not mutated:
si = randomInt(0, 1)
nc[si] = mutStr(nc[si])
nfv = " ".join(nc)
elif fi == 2:
#city
si = randomInt(0, le - 1) if le > 1 else 0
nc[si] = mutStr(nc[si])
nfv = " ".join(nc) if le > 1 else nc[0]
elif fi == 3:
#state
nc[0] = mutStr(nc[0])
nfv = nc[0]
elif fi == 4:
#zip
nc[0] = mutStr(nc[0])
nfv = nc[0]
elif fi == 5:
#email
if isEventSampled(60):
nc[0] = mutStr(nc[0])
nfv = nc[0]
else:
nfv = genLowCaseID(randomInt(4, 10)) + "@" + selectRandomFromList(emailDoms)
mrec[fi] = nfv
return mrec
# In[24]:
def printNgramVec(ngv):
"""
print ngram vector
"""
print("ngram vector")
for i in range(len(ngv)):
if ngv[i] > 0:
print("{} {}".format(i, ngv[i]))
# In[25]:
def createNegMatch(tdata, ri):
"""
create negative match by randomly selecting another record
"""
nri = randomInt(0, len(tdata)-1)
while nri == ri:
nri = randomInt(0, len(tdata)-1)
return tdata[nri]
# In[26]:
def createNgramCreator():
""" create ngram creator """
cng = CharNGram(["lcc", "ucc", "dig"], 3, True)
spc = ["@", "#", "_", "-", "."]
cng.addSpChar(spc)
cng.setWsRepl("$")
cng.finalize()
return cng
# In[27]:
def getSim(rec, incOutput=True):
""" get rec pair similarity """
#print(rec)
sim = list()
for i in range(6):
#print("field " + str(i))
if i == 3:
s = levenshteinSimilarity(rec[i],rec[i+6])
else:
ngv1 = cng.toMgramCount(rec[i])
ngv2 = cng.toMgramCount(rec[i+6])
#printNgramVec(ngv1)
#printNgramVec(ngv2)
s = cosineSimilarity(ngv1, ngv2)
sim.append(s)
ss = toStrFromList(sim, 6)
srec = ss + "," + rec[-1] if incOutput else ss
return srec
# In[28]:
class SimThread (threading.Thread):
""" multi threaded similarity calculation """
def __init__(self, tName, cng, qu, incOutput, outQu, outQuSize):
""" initialize """
threading.Thread.__init__(self)
self.tName = tName
self.cng = cng
self.qu = qu
self.incOutput = incOutput
self.outQu = outQu
self.outQuSize = outQuSize
def run(self):
""" exeution """
while not exitFlag:
rec = dequeue(self.qu, workQuLock)
if rec is not None:
srec = getSim(rec, self.incOutput)
if outQu is None:
print(srec)
else:
enqueue(srec, self.outQu, outQuLock, self.outQuSize)
def createThreads(nworker, cng, workQu, incOutput, outQu, outQuSize):
"""create worker threads """
threadList = list(map(lambda i : "Thread-" + str(i+1), range(nworker)))
threads = list()
if(outQu is None):
outQu = queue.Queue(outQuSize)
for tName in threadList:
thread = SimThread(tName, cng, workQu, incOutput, outQu, outQuSize)
thread.start()
threads.append(thread)
return threads
def enqueue(rec, qu, quLock, qSize):
""" enqueue record """
queued = False
while not queued:
quLock.acquire()
if qu.qsize() < qSize - 1:
qu.put(rec)
queued = True
quLock.release()
time.sleep(1)
def dequeue(qu, quLock):
""" dequeue record """
rec = None
quLock.acquire()
if not qu.empty():
rec = qu.get()
quLock.release()
return rec
# In[30]:
if __name__ == "__main__":
op = sys.argv[1]
#multi threading related
workQuLock = threading.Lock()
outQuLock = threading.Lock()
exitFlag = False
if op == "gen":
fp= open(r'pers.txt', 'w', encoding='utf-8')
""" generate data from from source file"""
srcFilePath = sys.argv[2]
i = 0
x=""
for rec in fileRecGen(srcFilePath, ","):
if i > 0:
nrec = list()
fname = rec[0]
lname = rec[1]
nrec.append(fname + " " + lname)
'''
nrec.append(rec[-9][1:-1])
nrec.append(rec[-8][1:-1])
nrec.append(rec[-6][1:-1])
'''
nrec.append(rec[-9])
nrec.append(rec[-8])
nrec.append(rec[-6])
z = rec[-5]
nrec.append(z)
nrec.append(rec[-2])
print(",".join(nrec))
x= ",".join(nrec)
print(x)
fp.write(x)
fp.write("\n")
i += 1
fp.close()
if op == "genad":
""" generate additional data by swapping name and address with another random record"""
srcFilePath = sys.argv[2]
nrec = int(sys.argv[3])
tdata = getFileLines(srcFilePath)
x=""
fp= open(r'pers_new.txt', 'w', encoding='utf-8')
for _ in range(nrec):
r1 = selectRandomFromList(tdata)
#print(",".join(r1))
r2 = selectRandomFromList(tdata)
while r1[0] == r2[0]:
r1 = selectRandomFromList(tdata)
r2 = selectRandomFromList(tdata)
nm = r2[0]
r1[0] = nm
r1[1] = r2[1]
email = nm.split()[0].lower() + "@" + r1[5].split("@")[1]
r1[5] = email
print(",".join(r1))
x= ",".join(r1)
print(x)
fp.write(x)
fp.write("\n")
fp.close()
if op == "gendup":
fp= open(r'pers_new_dup.txt', 'w', encoding='utf-8')
""" replace some records in first file with records from another file"""
srcFilePath = sys.argv[2]
dupFilePath = sys.argv[3]
ndup = int(sys.argv[4])
tdata = getFileLines(srcFilePath, None)
percen = 10
tdataSec = list()
while len(tdataSec) < ndup:
tdataSec = getFileSampleLines(dupFilePath, percen)
print("----------", tdataSec)
percen = int(percen * ndup / len(tdataSec) + 2)
tdataSec = selectRandomSubListFromList(tdataSec, ndup)
drecs = list()
for rec in tdataSec:
fi = randomInt(0, 5)
mrec = createPosMatch(rec, fi)
if isEventSampled(30):
fi = randomInt(0, 5)
mrec = createPosMatch(mrec, fi)
drecs.append(",".join(mrec))
setListRandomFromList(tdata, drecs)
for r in tdata:
print(r)
#x= ",".join(nrec)
fp.write(r)
fp.write("\n")
fp.close()
elif op == "genpn":
fp= open(r'ppers1.txt', 'w', encoding='utf-8')
""" generate pos pos and pos neg paire """
srcFilePath = sys.argv[2]
tdata = getFileLines(srcFilePath, None) if len(sys.argv) == 3 else getFileLines(sys.argv[3], None)
ri = 0
for rec in fileRecGen(srcFilePath, "\n"):
for _ in range(2):
fi = randomInt(0, 5)
rec_copy = rec[0].split(",")
mrec = createPosMatch(rec_copy, fi)
if isEventSampled(30):
fi = randomInt(0, 5)
mrec = createPosMatch(mrec, fi)
#print(",".join(rec) + "," + ",".join(mrec) + "," + "1")
x= ",".join(rec)+ "," + ",".join(mrec) + "," + "1"
print(x)
fp.write(x)
fp.write("\n")
for _ in range(2):
mrec = createNegMatch(tdata, ri)
#print(",".join(rec) + "," + mrec + "," + "0")
x= ",".join(rec)+ "," + mrec + "," + "0"
fp.write(x)
fp.write("\n")
ri += 1
fp.close()
elif op == "sim":
fp= open(r'spers_tr.txt', 'w', encoding='utf-8')
""" create field pair similarity """
srcFilePath = sys.argv[2]
cng = CharNGram(["lcc", "ucc", "dig"], 3, True)
spc = ["@", "#", "_", "-", "."]
cng.addSpChar(spc)
cng.setWsRepl("$")
cng.finalize()
c = 0
x = ""
for rec in fileRecGen(srcFilePath, ","):
#print(",".join(rec))
srec = getSim(rec)
#print(srec)
c += 1
x= ",".join(srec)
print(x)
fp.write(srec)
fp.write("\n")
fp.close()
elif op == "msim":
""" create field pair similarity in parallel"""
srcFilePath = sys.argv[2]
nworker = int(sys.argv[3])
cng = createNgramCreator()
c = 0
#create threads
qSize = 100
outQu = queue.Queue(qSize)
workQu = queue.Queue(qSize)
#threads = createThreads(nworker, cng, workQu, True, None, None)
threads = createThreads(nworker, cng, workQu, True, outQu, qSize)
# print(rec, workQu, qSize)
for rec in fileRecGen(srcFilePath, ","):
enqueue(rec, workQu, workQuLock, qSize)
#wrqp up
while not workQu.empty():
pass
exitFlag = True
for t in threads:
t.join()
elif op == "nnTrain":
""" train neural network model """
prFile = sys.argv[2]
regr = FeedForwardNetwork(prFile)
regr.buildModel()
FeedForwardNetwork.batchTrain(regr)
elif op == "nnPred":
""" predict with neural network model """
newFilePath = sys.argv[2]
existFilePath = sys.argv[3]
nworker = int(sys.argv[4])
prFile = sys.argv[5]
regr = FeedForwardNetwork(prFile)
regr.buildModel()
cng = createNgramCreator()
#create threads
qSize = 100
workQu = queue.Queue(qSize)
outQu = queue.Queue(qSize)
threads = createThreads(nworker, cng, workQu, False, outQu, qSize)
for nrec in fileRecGen(newFilePath):
srecs = list()
ecount = 0
y_pred = []
# print("processing ", nrec)
for erec in fileRecGen(existFilePath):
rec = nrec.copy()
rec.extend(erec)
# print(rec)
enqueue(rec, workQu, workQuLock, qSize)
srec = dequeue(outQu, outQuLock)
if srec is not None:
srecs.append(strToFloatArray(srec))
ecount += 1
# print("srecs1", srecs)
#wait til workq queue is drained
while not workQu.empty():
pass
#drain out queue
while len(srecs) < ecount:
srec = dequeue(outQu, outQuLock)
if srec is not None:
srecs.append(strToFloatArray(srec))
time.sleep(1)
# print("srecs--------------", srecs)
#predict
simMax = 0
sims = FeedForwardNetwork.predict(regr, srecs)
#sims = FeedForwardNetwork.predict(nrec, srecs)
sims = sims.reshape(sims.shape[0])
y_pred.append(max(sims))
#print("{} {:.3f}".format(nrec, y_pred))
print(nrec, max(y_pred))
#exitFlag = True
#for t in threads:
# t.join()
elif op == "nnPred_withthread":
""" predict with neural network model """
newFilePath = sys.argv[2]
existFilePath = sys.argv[3]
nworker = int(sys.argv[4])
prFile = sys.argv[5]
regr = FeedForwardNetwork(prFile)
regr.buildModel()
cng = createNgramCreator()
#create threads
qSize = 100
workQu = queue.Queue(qSize)
outQu = queue.Queue(qSize)
threads = createThreads(nworker, cng, workQu, False, outQu, qSize)
for nrec in fileRecGen(newFilePath):
srecs = list()
ecount = 0
print("processing ", nrec)
for erec in fileRecGen(existFilePath):
rec = nrec.copy()
rec.extend(erec)
print(rec)
enqueue(rec, workQu, workQuLock, qSize)
srec = dequeue(outQu, outQuLock)
if srec is not None:
srecs.append(strToFloatArray(srec))
ecount += 1
#wait til workq queue is drained
while not workQu.empty():
pass
#drain out queue
while len(srecs) < ecount:
srec = dequeue(outQu, outQuLock)
if srec is not None:
srecs.append(strToFloatArray(srec))
time.sleep(1)
#predict
simMax = 0
sims = FeedForwardNetwork.predict(regr, srecs)
sims = sims.reshape(sims.shape[0])
print("{} {:.3f}".format(nrec, max(sims)))
exitFlag = True
for t in threads:
t.join()
# In[ ]: