import os
import pickle

import numpy as np
import pandas as pd
import torch

from polymerlearn.utils.graph_prep import get_AG_info, list_mask
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score as R2, mean_absolute_error as MAE
from sklearn.utils import shuffle

# Directory containing the precomputed molecular representation pickles.
base_rep_dir = os.path.join('../../..', 'Representations')


def load_pickle(rep_dir, mol_name):
    # Representations are stored one pickle per monomer, keyed by the
    # lowercased molecule name.
    f = os.path.join(rep_dir, mol_name.lower() + '.pickle')
    with open(f, 'rb') as fp:
        rep = pickle.load(fp)

    return rep


class RepDataset:
    def __init__(self,
                 data,
                 Y_target,
                 rep_dir = base_rep_dir,
                 add_features = None,
                 ac = (20, 33),
                 gc = (34, 46),
                 rep = 'CM',
                 standard_scale = False,
                 device = None):
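        '''
        Dataset of fixed-size molecular representations for polymer property
        prediction with scikit-learn style models.

        Args:
            data (pd.DataFrame): polymer data table.
            Y_target (str or list of str): target column(s) to predict.
            rep_dir (str): root directory of the precomputed representations.
            add_features: optional per-sample additional feature vectors.
            ac, gc: column index bounds of the acid / glycol blocks, in the
                convention expected by get_AG_info.
            rep (str): representation type; one of 'CM', 'MBTR', 'SOAP', 'PI'.
            standard_scale (bool): if True, standardize the add_features
                columns within each cross-validation fold.
            device: torch device used when pooled features are requested
                as tensors.
        '''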
        self.add_features = add_features
        self.standard_scale = standard_scale
        self.device = device

        rep = rep.upper()
        assert rep in ['CM', 'MBTR', 'SOAP', 'PI'], \
            "Representation must be in ['CM', 'MBTR', 'SOAP', 'PI']"

        # Keep only the samples for which every requested target is present.
        Y = data.loc[:, Y_target]
        non_nan_mask = Y.notna()
        if isinstance(Y_target, list):
            assert Y_target.index('IV') < Y_target.index('Tg'), 'IV must come before Tg'
            non_nan_mask = non_nan_mask[Y_target].all(axis = 1).values

        self.Y = Y[non_nan_mask].values
        self.data = data.loc[non_nan_mask, :]

        # Recover which acids/glycols each polymer contains, with percentages.
        self.acid_included, self.glycol_included, self.acid_pcts, self.glycol_pcts = \
            get_AG_info(self.data, ac, gc)

        if self.add_features is not None:
            self.add_features = list_mask(self.add_features, list(non_nan_mask))

        dirlook = os.path.join(rep_dir, rep, 'AG')

        # Load the flattened representation of every acid and glycol in each
        # sample, tracking the largest vector so all can be zero-padded to a
        # common length.
        self.dataset = []
        max_size = 0
        for A, G in zip(self.acid_included, self.glycol_included):
            Asamples = [load_pickle(dirlook, a).flatten() for a in A]
            Gsamples = [load_pickle(dirlook, g).flatten() for g in G]

            Asizes = [a.shape[0] for a in Asamples]
            Gsizes = [g.shape[0] for g in Gsamples]

            max_size = max(max(Asizes), max(Gsizes), max_size)

            self.dataset.append((Asamples, Gsamples))

        # Zero-pad every representation vector on the right to max_size.
        for i in range(len(self.dataset)):
            for j in range(len(self.dataset[i])):
                for k in range(len(self.dataset[i][j])):
                    cshape = self.dataset[i][j][k].shape[0]
                    self.dataset[i][j][k] = \
                        np.concatenate([self.dataset[i][j][k], np.zeros(max_size - cshape)])

        # Length of one pooled (acid + glycol) feature block.
        self.size_AG = max_size * 2

    def pool_dataset(self, pool_method = np.max, as_torch = False):
        # Pool each sample's padded acid and glycol representations into one
        # fixed-length feature vector, appending add_features if present.
        pooled = []
        for i, (A, G) in enumerate(self.dataset):
            poolA = pool_method(np.stack(A), axis = 0)
            poolG = pool_method(np.stack(G), axis = 0)
            if self.add_features is not None:
                toconcat_list = [poolA.flatten(), poolG.flatten(), self.add_features[i]]
            else:
                toconcat_list = [poolA.flatten(), poolG.flatten()]
            pooled.append(np.concatenate(toconcat_list))

        P = np.asarray(pooled)

        if as_torch:
            return torch.from_numpy(P).to(self.device)

        return P
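
    # Usage sketch (hypothetical DataFrame `df`; assumes the representation
    # pickles exist under base_rep_dir):
    #   ds = RepDataset(df, Y_target = 'IV', rep = 'SOAP')
    #   X = ds.pool_dataset(pool_method = np.mean)   # shape: (n_samples, len(ds))
    #   Xt = ds.pool_dataset(as_torch = True)        # torch tensor on ds.device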

    def cross_val_predict(self,
                          model,
                          pool_method = np.sum,
                          verbose = 0,
                          shuf = True,
                          folds = 5):
        # k-fold cross-validation of a scikit-learn style regressor on the
        # pooled features. Returns per-fold R2 and MAE lists.
        X = self.pool_dataset(pool_method)

        if shuf:
            X, y = shuffle(X, self.Y)
        else:
            y = self.Y

        kf = KFold(n_splits = folds)

        r2_scores = []
        mae_scores = []

        for train_idx, test_idx in kf.split(X):

            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]

            if self.standard_scale:
                # Standardize only the trailing add_features columns, fitting
                # the scaler on the training fold alone to avoid leakage.
                endlen = np.asarray(self.add_features).shape[1]

                ss = StandardScaler().fit(X_train[:, -endlen:])
                X_train[:, -endlen:] = ss.transform(X_train[:, -endlen:])
                X_test[:, -endlen:] = ss.transform(X_test[:, -endlen:])

            model.fit(X_train, y_train)

            yhat = model.predict(X_test)
            r2_scores.append(R2(y_test, yhat))
            mae_scores.append(MAE(y_test, yhat))

            if verbose > 1:
                print('Fold R2 = {:.4f}, MAE = {:.4f}'.format(r2_scores[-1], mae_scores[-1]))

        return r2_scores, mae_scores
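
    # Sketch: any estimator exposing fit/predict works here, e.g.
    #   from sklearn.ensemble import RandomForestRegressor
    #   r2s, maes = ds.cross_val_predict(RandomForestRegressor(), folds = 5)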

    def __len__(self):
        # Total feature-vector length: any added features plus the pooled
        # acid + glycol block.
        add_f = len(self.add_features[0]) if self.add_features is not None else 0
        return add_f + self.size_AG

    def Kfold_CV(self, folds, pool_method = np.max):
        # Generator over shuffled k-fold splits, yielding torch tensors:
        # (X_train, X_test, y_train, y_test, train_idx, test_idx).
        X = self.pool_dataset(pool_method)
        y = self.Y

        kfold = KFold(n_splits = folds, shuffle = True)

        for train_idx, test_idx in kfold.split(X):
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]

            if self.standard_scale:
                # As in cross_val_predict: scale only the add_features columns,
                # fitting on the training fold.
                endlen = np.asarray(self.add_features).shape[1]

                ss = StandardScaler().fit(X_train[:, -endlen:])
                X_train[:, -endlen:] = ss.transform(X_train[:, -endlen:])
                X_test[:, -endlen:] = ss.transform(X_test[:, -endlen:])

            yield torch.from_numpy(X_train).float(), \
                torch.from_numpy(X_test).float(), \
                torch.from_numpy(y_train).float(), \
                torch.from_numpy(y_test).float(), \
                torch.from_numpy(train_idx).float(), \
                torch.from_numpy(test_idx).float()
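
    # Sketch: iterate folds to train a torch model on the pooled features.
    #   for X_tr, X_te, y_tr, y_te, tr_idx, te_idx in ds.Kfold_CV(folds = 5):
    #       ...  # fit on (X_tr, y_tr), evaluate on (X_te, y_te)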


def test_dataset():
    data = pd.read_csv('../../../dataset/pub_data.csv')
    dataset = RepDataset(data, Y_target = 'IV', rep = 'SOAP')

    X = dataset.pool_dataset()

    print(X[0])
    print(X[0].shape)


def test_cv():
    from polymerlearn.utils.train_graphs import get_IV_add
    data = pd.read_csv('../../../dataset/pub_data.csv')
    to_add = get_IV_add(data)
    dataset = RepDataset(data, Y_target = 'IV', rep = 'SOAP', add_features = to_add)

    from sklearn.ensemble import RandomForestRegressor

    model = RandomForestRegressor()
    r2_scores, mae_scores = dataset.cross_val_predict(model, verbose = 2)

    print('Mean R2 :', np.mean(r2_scores))
    print('Mean MAE:', np.mean(mae_scores))


def test_joint():
    from polymerlearn.utils.train_graphs import get_IV_add
    data = pd.read_csv('../../../dataset/pub_data.csv')
    to_add = get_IV_add(data)
    dataset = RepDataset(data, Y_target = ['IV', 'Tg'], rep = 'SOAP', add_features = to_add)

    print(dataset.Y)


if __name__ == '__main__':
    test_joint()