Spaces:
Sleeping
Sleeping
File size: 9,554 Bytes
6ae852e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
from numbers import Number
from typing import Literal, Union, Sequence
import pandas as pd
from sklearn.base import TransformerMixin
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted
from torch.utils.data import Dataset
from deepscreen.data.utils import label_transform, FlexibleIterable
class BaseEntityDataset(Dataset):
    """Base class for tabular entity datasets stored as CSV.

    Keeps only columns whose names start with one of ``use_col_prefixes``:
    ``X*`` entities, ``Y*`` labels, ``ID*`` entity IDs, and ``U*`` label units.
    ``Y*`` columns are parsed as float32; every other kept column as string.
    """

    def __init__(
            self,
            dataset_path: str,
            use_col_prefixes=('X', 'Y', 'ID', 'U')
    ):
        # First pass: header row only, to discover which columns survive the
        # prefix filter (cheap — no data rows are parsed).
        header_only = pd.read_csv(
            dataset_path,
            header=0, nrows=0,
            usecols=lambda name: name.startswith(use_col_prefixes)
        )
        # Second pass: the full table, with per-column dtypes derived from the
        # filtered header.
        col_dtypes = {name: 'float32' if name.startswith('Y') else 'string'
                      for name in header_only.columns}
        self.df = pd.read_csv(
            dataset_path,
            header=0,
            usecols=header_only.columns,
            dtype=col_dtypes
        )

        def cols_with_prefix(prefix):
            # Helper: all retained column names starting with `prefix`.
            return [name for name in self.df.columns if name.startswith(prefix)]

        self.label_cols = cols_with_prefix('Y')
        self.label_unit_cols = cols_with_prefix('U')
        self.entity_id_cols = cols_with_prefix('ID')
        self.entity_cols = cols_with_prefix('X')

    def __len__(self):
        # Number of rows in the backing table.
        return self.df.shape[0]

    def __getitem__(self, idx):
        # Subclasses define how a row becomes a sample.
        raise NotImplementedError
# TODO test transform
class SingleEntitySingleTargetDataset(BaseEntityDataset):
def __init__(
self,
dataset_path: str,
task: Literal['regression', 'binary', 'multiclass'],
n_classes: int,
featurizer: callable,
transformer: TransformerMixin = None,
thresholds: Union[Number, Sequence[Number]] = None,
discard_intermediate: bool = None,
forward_fill: bool = True
):
super().__init__(dataset_path)
assert len(self.entity_cols) == 1, 'The dataset contains more than 1 entity column (starting with `X`).'
if len(self.label_cols) >= 0:
assert len(self.label_cols) == 1, 'The dataset contains more than 1 label column (starting with `Y`).'
# Remove trailing `1`s in column names for flexibility
self.df.columns = self.df.columns.str.rstrip('1')
# Forward-fill non-label columns
nonlabel_cols = self.label_unit_cols + self.entity_id_cols + self.entity_cols
if forward_fill:
self.df[nonlabel_cols] = self.df[nonlabel_cols].ffill(axis=0)
# Process target labels for training/testing if exist
if self.label_cols:
# Transform target labels
self.df[self.label_cols] = self.df[self.label_cols].apply(
label_transform,
units=self.df.get('U', None),
thresholds=thresholds,
discard_intermediate=discard_intermediate).astype('float32')
# Filter out rows with a NaN in Y (missing values); use inplace to save memory
self.df.dropna(subset=self.label_cols, inplace=True)
# Validate target labels
# TODO: check sklearn.utils.multiclass.check_classification_targets
match task:
case 'regression':
assert all(self.df['Y'].apply(lambda x: isinstance(x, Number))), \
f"Y for task `regression` must be numeric; got {set(self.df['Y'].apply(type))}."
case 'binary':
assert all(self.df['Y'].isin([0, 1])), \
f"Y for task `binary` (classification) must be 0 or 1, but Y got {pd.unique(self.df['Y'])}." \
"\nYou may set `thresholds` to discretize continuous labels."
case 'multiclass':
assert n_classes >= 3, f'n_classes for task `multiclass` (classification) must be at least 3.'
assert all(self.df['Y'].apply(lambda x: x.is_integer() and x >= 0)), \
f"``Y` for task `multiclass` (classification) must be non-negative integers, " \
f"but `Y` got {pd.unique(self.df['Y'])}." \
"\nYou may set `thresholds` to discretize continuous labels."
target_n_unique = self.df['Y'].nunique()
assert target_n_unique == n_classes, \
f"You have set n_classes for task `multiclass` (classification) task to {n_classes}, " \
f"but `Y` has {target_n_unique} unique labels."
if transformer:
self.df['X'] = self.df['X'].apply(featurizer)
try:
check_is_fitted(transformer)
self.df['X'] = list(transformer.transform(self.df['X']))
except NotFittedError:
self.df['X'] = list(transformer.fit_transform(self.df['X']))
# Skip sample-wise feature extraction because it has already been done dataset-wise
self.featurizer = lambda x: x
self.featurizer = featurizer
self.n_classes = n_classes
self.df['ID'] = self.df.get('ID', self.df['X'])
def __getitem__(self, idx):
sample = self.df.loc[idx]
return {
'X': self.featurizer(sample['X']),
'ID': sample['ID'],
'Y': sample.get('Y')
}
# TODO WIP
class MultiEntityMultiTargetDataset(BaseEntityDataset):
    """WIP: dataset for multiple entity columns (`X*`) and multiple label
    columns (`Y*`), with one featurizer per entity and one task/class count
    per target.

    NOTE(review): marked WIP in the original. The `match task` validation
    below compares `task` against single strings and indexes `df['Y']`, which
    only works when `task` is a plain string and a lone `Y` column exists —
    confirm the intended multi-target semantics before relying on it.
    """

    def __init__(
            self,
            dataset_path: str,
            task: FlexibleIterable[Literal['regression', 'binary', 'multiclass']],
            n_class: FlexibleIterable[int],
            featurizers: FlexibleIterable[callable],
            thresholds: FlexibleIterable[Union[Number, Sequence[Number]]] = None,
            discard_intermediate: FlexibleIterable[bool] = None,
    ):
        # NOTE(review): super().__init__ already reads the CSV with the same
        # prefix set ('X', 'Y', 'ID', 'U'); the two reads below repeat that
        # work — consider reusing self.df instead.
        super().__init__(dataset_path)
        label_col_prefix = tuple('Y')  # == ('Y',)
        nonlabel_col_prefixes = tuple(('X', 'ID', 'U'))
        allowed_col_prefixes = label_col_prefix + nonlabel_col_prefixes
        # Read the headers first to filter columns and create column dtype dict
        df = pd.read_csv(
            dataset_path,
            header=0, nrows=0,
            usecols=lambda col: col.startswith(allowed_col_prefixes)
        )
        # Read the whole table: labels as float32, everything else as string
        df = pd.read_csv(
            dataset_path,
            header=0,
            usecols=df.columns,
            dtype={col: 'float32' if col.startswith('Y') else 'string' for col in df.columns}
        )
        label_cols = [col for col in df.columns if col.startswith(label_col_prefix)]
        nonlabel_cols = [col for col in df.columns if col.startswith(nonlabel_col_prefixes)]
        self.entity_cols = [col for col in nonlabel_cols if col.startswith('X')]
        # Forward-fill all non-label columns
        df[nonlabel_cols] = df[nonlabel_cols].ffill(axis=0)
        # Process target labels for training/testing
        if label_cols:
            # Transform target labels.
            # NOTE(review): thresholds/discard_intermediate are passed as-is
            # even when they are per-target iterables — verify that
            # label_transform accepts that shape.
            df[label_cols] = df[label_cols].apply(label_transform, units=df.get('U', None), thresholds=thresholds,
                                                  discard_intermediate=discard_intermediate).astype('float32')
            # Filter out rows with a NaN in Y (missing values)
            df.dropna(subset=label_cols, inplace=True)
            # Validate target labels
            # TODO: check sklearn.utils.multiclass.check_classification_targets
            # WIP: single-target validation not yet adapted to multiple Y columns
            match task:
                case 'regression':
                    assert all(df['Y'].apply(lambda x: isinstance(x, Number))), \
                        f"Y for task `regression` must be numeric; got {set(df['Y'].apply(type))}."
                case 'binary':
                    assert all(df['Y'].isin([0, 1])), \
                        f"Y for task `binary` must be 0 or 1, but Y got {pd.unique(df['Y'])}." \
                        "\nYou may set `thresholds` to discretize continuous labels."
                case 'multiclass':
                    # One n_class entry is expected per label column.
                    assert len(label_cols) == len(n_class), \
                        (f'Data table has {len(label_cols)} label columns (`Y*`) but you have specified '
                         f'n_class of length {len(n_class)} for task `multiclass`.')
                    # NOTE(review): `df[label_cols]` iterates column NAMES, so
                    # `label` below is a string, not a Series — `label.apply`
                    # would fail; presumably `df[col]` per column was intended.
                    for label, n in zip(df[label_cols], n_class):
                        assert n >= 3, f'n_class for task `multiclass` must be at least 3.'
                        assert all(label.apply(lambda x: x.is_integer() and x >= 0)), \
                            f"Y for task `multiclass` must be non-negative integers, " \
                            f"but Y got {pd.unique(label)}." \
                            "\nYou may set `thresholds` to discretize continuous labels."
                        target_n_unique = label.nunique()
                        assert target_n_unique == n, \
                            f"You have set n_classes for task `multiclass` task to {n}, " \
                            f"but Y has {target_n_unique} unique labels."
        self.df = df
        self.featurizers = featurizers
        self.n_class = n_class

    # NOTE(review): identical to BaseEntityDataset.__len__; redundant override.
    def __len__(self):
        return len(self.df.index)

    # WIP
    def __getitem__(self, idx):
        # Return one sample: per-entity featurized X list, ID, and Y.
        # NOTE(review): the 'ID' fallback reads sample['X'], but with multiple
        # entity columns the columns are named X1, X2, ... — confirm a plain
        # 'X' key exists in this code path.
        sample = self.df.loc[idx]
        return {
            'X': [featurizer(x) for featurizer, x in zip(self.featurizers, sample[self.entity_cols])],
            'ID': sample.get('ID', sample['X']),
            'Y': sample.get('Y')
        }
|