import torch
import os, pickle
import numpy as np
import pandas as pd

from polymerlearn.utils.graph_prep import get_AG_info, list_mask

from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score as R2, mean_absolute_error as MAE
from sklearn.utils import shuffle

base_rep_dir = os.path.join('../../..', 'Representations')
def load_pickle(rep_dir, mol_name):
    '''Load the pickled representation for one molecule (files are stored under lowercase names).'''
    f = os.path.join(rep_dir, mol_name.lower() + '.pickle')
    with open(f, 'rb') as fp:
        rep = pickle.load(fp)
    return rep
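
# Hypothetical usage sketch (the molecule name 'TPA' is only an illustration;
# actual names come from get_AG_info over the dataset columns):
#   rep = load_pickle(os.path.join(base_rep_dir, 'SOAP', 'AG'), 'TPA')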
class RepDataset:
    '''
    Pooled fixed-size representation dataset for polymer property prediction.
    Loads per-molecule representations (CM = Coulomb Matrix, MBTR = Many-Body
    Tensor Representation, SOAP = Smooth Overlap of Atomic Positions, or PI)
    for the acids and glycols in each sample.
    '''
    def __init__(self,
            data,
            Y_target,
            rep_dir = base_rep_dir,
            add_features = None,
            ac = (20,33),
            gc = (34,46),
            rep = 'CM',
            standard_scale = False,
            device = None
        ):
        self.add_features = add_features
        self.standard_scale = standard_scale
        self.device = device

        rep = rep.upper()
        assert rep in ['CM', 'MBTR', 'SOAP', 'PI'], \
            "Representation must be in ['CM', 'MBTR', 'SOAP', 'PI']"
        Y = data.loc[:, Y_target]
        non_nan_mask = Y.notna()
        if isinstance(Y_target, list):
            # Joint prediction: keep only rows where every target is non-NaN.
            assert Y_target.index('IV') < Y_target.index('Tg'), 'IV must come before Tg'
            non_nan_mask = non_nan_mask.all(axis = 1).values

        self.Y = Y[non_nan_mask].values  # Get Y values
        self.data = data.loc[non_nan_mask, :]

        self.acid_included, self.glycol_included, self.acid_pcts, self.glycol_pcts = \
            get_AG_info(self.data, ac, gc)

        if self.add_features is not None:
            self.add_features = list_mask(self.add_features, list(non_nan_mask))
        # Get entire dataset:
        # Structure: [([A, A], [G]), ..., ([A, A, A], [G, G, G, G])]
        # len == length of dataset for this value
        dirlook = os.path.join(rep_dir, rep, 'AG')
        self.dataset = []
        max_size = 0
        for A, G in zip(self.acid_included, self.glycol_included):
            Asamples = [load_pickle(dirlook, a).flatten() for a in A]
            Gsamples = [load_pickle(dirlook, g).flatten() for g in G]

            Asizes = [a.shape[0] for a in Asamples]
            Gsizes = [g.shape[0] for g in Gsamples]
            max_size = max(max(Asizes), max(Gsizes), max_size)

            self.dataset.append((Asamples, Gsamples))

        # Pad all representations to the size of the largest in either grouping
        for i in range(len(self.dataset)):
            for j in range(len(self.dataset[i])):
                for k in range(len(self.dataset[i][j])):
                    cshape = self.dataset[i][j][k].shape[0]
                    self.dataset[i][j][k] = \
                        np.concatenate([self.dataset[i][j][k], np.zeros(max_size - cshape)])

        self.size_AG = max_size * 2
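
    # Illustrative example of the padding above (toy numbers, not taken from
    # the real data): if a sample has acids with flattened representation
    # lengths [100, 120] and one glycol of length 90, every vector is
    # zero-padded to 120 and size_AG == 240 (pooled A block + pooled G block).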
    def pool_dataset(self, pool_method = np.max, as_torch = False):
        '''Pool each sample's padded A and G representations into one feature vector.'''
        pooled = []
        for i, (A, G) in enumerate(self.dataset):
            # Pool across the molecules within each grouping:
            poolA = pool_method(np.stack(A), axis = 0)
            poolG = pool_method(np.stack(G), axis = 0)

            if self.add_features is not None:
                toconcat_list = [poolA.flatten(), poolG.flatten(), self.add_features[i]]
            else:
                toconcat_list = [poolA.flatten(), poolG.flatten()]
            pooled.append(np.concatenate(toconcat_list))

        P = np.asarray(pooled)
        if as_torch:
            return torch.from_numpy(P).to(self.device)
        return P  # If not returning torch
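
    # A minimal usage sketch (assumes a RepDataset built as in test_dataset()
    # below); pool_method can be any reduction with numpy's (arr, axis) signature:
    #   X = dataset.pool_dataset(pool_method = np.mean)   # (n_samples, n_features)
    #   Xt = dataset.pool_dataset(as_torch = True)        # torch tensor on self.device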
    def cross_val_predict(self,
            model,
            pool_method = np.sum,
            verbose = 0,
            shuf = True,
            folds = 5):
        '''Fit and evaluate an sklearn-style model with K-fold CV;
        returns per-fold lists of R2 and MAE scores.'''
        X = self.pool_dataset(pool_method)
        if shuf:
            X, y = shuffle(X, self.Y)
        else:
            y = self.Y

        kf = KFold(n_splits = folds)
        r2_scores = []
        mae_scores = []
        for train_idx, test_idx in kf.split(X):
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]

            if self.standard_scale:
                endlen = np.asarray(self.add_features).shape[1]
                # Scale only the additional features (the trailing columns),
                # fitting on the training fold only:
                ss = StandardScaler().fit(X_train[:,-endlen:])
                X_train[:,-endlen:] = ss.transform(X_train[:,-endlen:])
                X_test[:,-endlen:] = ss.transform(X_test[:,-endlen:])

            model.fit(X_train, y_train)
            yhat = model.predict(X_test)
            r2_scores.append(R2(y_test, yhat))
            mae_scores.append(MAE(y_test, yhat))

        return r2_scores, mae_scores
    def __len__(self):
        # Feature dimension: pooled A + G size plus any additional features.
        add_f = len(self.add_features[0]) if self.add_features is not None else 0
        return add_f + self.size_AG
    def Kfold_CV(self, folds, pool_method = np.max):
        '''Generator over shuffled K-fold splits, yielding float torch tensors
        (X_train, X_test, y_train, y_test, train_idx, test_idx).'''
        X = self.pool_dataset(pool_method)
        y = self.Y

        kfold = KFold(n_splits = folds, shuffle = True)
        for train_idx, test_idx in kfold.split(X):
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]

            if self.standard_scale:
                endlen = np.asarray(self.add_features).shape[1]
                # Scale only additional values:
                ss = StandardScaler().fit(X_train[:,-endlen:])
                X_train[:,-endlen:] = ss.transform(X_train[:,-endlen:])
                X_test[:,-endlen:] = ss.transform(X_test[:,-endlen:])

            yield torch.from_numpy(X_train).float(), \
                torch.from_numpy(X_test).float(), \
                torch.from_numpy(y_train).float(), \
                torch.from_numpy(y_test).float(), \
                torch.from_numpy(train_idx).float(), \
                torch.from_numpy(test_idx).float()
def test_dataset():
    data = pd.read_csv('../../../dataset/pub_data.csv')
    dataset = RepDataset(data, Y_target = 'IV', rep = 'SOAP')
    X = dataset.pool_dataset()
    print(X[0])
    print(X[0].shape)
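
# A minimal sketch comparing pooling strategies; it reuses the CSV path from
# the tests above and assumes that file exists on disk.
def example_pool_methods():
    data = pd.read_csv('../../../dataset/pub_data.csv')
    dataset = RepDataset(data, Y_target = 'IV', rep = 'SOAP')
    for method in (np.max, np.mean, np.sum):
        X = dataset.pool_dataset(pool_method = method)
        print(method.__name__, X.shape)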
def test_cv():
    from polymerlearn.utils.train_graphs import get_IV_add
    data = pd.read_csv('../../../dataset/pub_data.csv')
    to_add = get_IV_add(data)
    dataset = RepDataset(data, Y_target = 'IV', rep = 'SOAP', add_features = to_add)

    from sklearn.ensemble import RandomForestRegressor
    # Alternative tried previously:
    # from sklearn.neural_network import MLPRegressor
    # model = MLPRegressor(hidden_layer_sizes = (128, 128, 64), solver = 'adam',
    #     learning_rate_init = 0.001, batch_size = 32, max_iter = 500)
    model = RandomForestRegressor()

    r2_scores, mae_scores = dataset.cross_val_predict(model, verbose = 2)
    print('Mean R2: ', np.mean(r2_scores))
    print('Mean MAE:', np.mean(mae_scores))
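
# A minimal sketch of consuming the Kfold_CV generator with a torch model; the
# linear model and training loop are illustrative only, not the project's method.
def example_kfold_torch():
    data = pd.read_csv('../../../dataset/pub_data.csv')
    dataset = RepDataset(data, Y_target = 'IV', rep = 'SOAP')
    for X_train, X_test, y_train, y_test, _, _ in dataset.Kfold_CV(folds = 5):
        model = torch.nn.Linear(X_train.shape[1], 1)
        opt = torch.optim.Adam(model.parameters(), lr = 1e-3)
        for _ in range(100):
            opt.zero_grad()
            loss = torch.nn.functional.mse_loss(model(X_train).squeeze(-1), y_train)
            loss.backward()
            opt.step()
        mse = torch.nn.functional.mse_loss(model(X_test).squeeze(-1), y_test)
        print('Fold test MSE:', mse.item())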
def test_joint():
    from polymerlearn.utils.train_graphs import get_IV_add
    data = pd.read_csv('../../../dataset/pub_data.csv')
    to_add = get_IV_add(data)
    dataset = RepDataset(data, Y_target = ['IV', 'Tg'], rep = 'SOAP', add_features = to_add)
    print(dataset.Y)

if __name__ == '__main__':
    #test_dataset()
    test_joint()