- Splitting is already done in the raw data before vocab creation.
- The following class loads and manages the pre-processed splits together.
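At a high level it is just a thin wrapper that holds the three pre-split datasets together. A minimal sketch of the idea (illustrative only, not the actual `EHRDataSplits` implementation, which also provides the label-count and positive-weight helpers used below):

class SplitsSketch:
    '''Illustrative stand-in for `EHRDataSplits` -- holds the three splits together'''
    def __init__(self, train, valid, test):
        self.train, self.valid, self.test = train, valid, test
    def get_splits(self):
        return self.train, self.valid, self.test
    def get_lengths(self):
        return {'train': len(self.train), 'valid': len(self.valid), 'test': len(self.test)}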
Tests
PATH_1K, CONDITIONS
labels = list(CONDITIONS.keys())
labels
splits = EHRDataSplits(PATH_1K)
splits.get_lengths()
prevalence = splits.get_label_counts(labels)
prevalence
splits.get_pos_wts(labels)
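`get_pos_wts` returns per-label positive weights for use with `BCEWithLogitsLoss` -- since these conditions are rare, positive examples need to be up-weighted in the loss. A common formulation is negatives over positives (assumed here for illustration; check the class for the exact calculation):

# Hypothetical: pos weight for one label = (# negatives) / (# positives)
n_pos = prevalence.loc['diabetes'].train   # positives in train
n_neg = len(splits.train) - n_pos          # the rest are negatives
n_neg / n_pos                              # >1 when the condition is rare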
Cross-check with raw
- Check total counts against the raw csv (`raw_original`)
- Check split counts against the per-split raw csvs (`raw_split`)
raw_cnds = pd.read_csv(f'{PATH_1K}/raw_original/conditions.csv', low_memory=False)
cnd_codes = list(CONDITIONS.values())
cnd_codes
int(CONDITIONS['diabetes'])
for label in labels:
    print(label, ':: ', raw_cnds[raw_cnds.CODE == int(CONDITIONS[label])].CODE.count())
raw_cnds_train = pd.read_csv(f'{PATH_1K}/raw_split/train/conditions.csv', low_memory=False)
raw_cnds_valid = pd.read_csv(f'{PATH_1K}/raw_split/valid/conditions.csv', low_memory=False)
raw_cnds_test = pd.read_csv(f'{PATH_1K}/raw_split/test/conditions.csv', low_memory=False)
for label in labels:
    assert prevalence.loc[label].total == raw_cnds[raw_cnds.CODE == int(CONDITIONS[label])].CODE.count()
    assert prevalence.loc[label].train == raw_cnds_train[raw_cnds_train.CODE == int(CONDITIONS[label])].CODE.count()
    assert prevalence.loc[label].valid == raw_cnds_valid[raw_cnds_valid.CODE == int(CONDITIONS[label])].CODE.count()
    assert prevalence.loc[label].test  == raw_cnds_test[raw_cnds_test.CODE == int(CONDITIONS[label])].CODE.count()
Labeling definition in fastai: some processes need to be run on train and then applied to valid. In our case this is completed in preprocessing (vocab & transform) as follows (see the sketch after this list)
- Vocabs are created from train data
    - Tokenizing unique values for the different record codes & demographic values
    - Calculating the mean and std for age
- Vocabs are applied to train, valid and test data
    - With `numericalize` for record codes & demographic values
    - With normalization of age using the mean / std from train
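The same fit-on-train / apply-everywhere pattern in miniature (illustrative only; the ages and helper below are made up, not the library's preprocessing code):

import numpy as np

train_ages = np.array([52., 67., 34.])            # made-up train ages
mean, std = train_ages.mean(), train_ages.std()   # stats are fitted on train only
normalize = lambda ages: (np.asarray(ages) - mean) / std
normalize(train_ages), normalize([45., 70.])      # valid/test reuse the train stats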
Hence labeling in our case means creating X and y
- X is the `Patient` object
- y (for a single patient) needs to be a tensor made out of the patient's values for the labels ('diabetes', 'stroke', 'alzheimers', 'coronary_heart', 'lung_cancer')

So creating the y tensor is simply a matter of
- extracting the values of each of the labels from each `Patient` object
- turning them into a `torch.FloatTensor`
- and stacking them up using `torch.stack`
tst_y = np.array((True, False, False, True), dtype='float')
torch.from_numpy(tst_y), torch.FloatTensor(tst_y)
Two ways of creating a torch tensor from a numpy array; we will stick with the latter.
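The practical difference between the two: `torch.from_numpy` shares memory with the numpy array and keeps its dtype (`float64` here), while `torch.FloatTensor` makes a `float32` copy, which is the dtype the model will expect.

a, b = torch.from_numpy(tst_y), torch.FloatTensor(tst_y)
a.dtype, b.dtype    # torch.float64 vs torch.float32
tst_y[0] = 0.       # mutating the numpy array...
a[0], b[0]          # ...is reflected in `a` (shared memory) but not in `b` (copy)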
y = []
for pt in splits.train:
    y.append(torch.FloatTensor(np.array([pt.conditions[label] for label in labels], dtype='float')))
y = torch.stack(y)
y.shape
y
Putting it into a function
def label_data(patient_ds, labels) -> 'x,y':
    '''Extracts y from patient object, returns x=Patient object, y=tensor of conditions'''
    def _get_y(ds, labels):
        y = []
        for pt in ds:
            y.append(torch.FloatTensor(np.array([pt.conditions[label] for label in labels], dtype='float')))
        return torch.stack(y)
    x, y = patient_ds, _get_y(patient_ds, labels)
    return x, y
x_train,y_train = label_data(splits.train, labels)
x_valid,y_valid = label_data(splits.valid, labels)
x_test ,y_test = label_data(splits.test , labels)
y_train.shape, y_valid.shape, y_test.shape
labeled = LabelEHRData(*splits.get_splits(), labels)
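`LabelEHRData` is essentially `label_data` applied to all three splits and kept on one object. A sketch of its shape (illustrative; consistent with the `labeled.train` / `labeled.x_train` attributes used below, but not the library code):

class LabelEHRDataSketch:
    '''Illustrative: `labeled.train == (x_train, y_train)` etc.'''
    def __init__(self, train, valid, test, labels):
        self.x_train, self.y_train = label_data(train, labels)
        self.x_valid, self.y_valid = label_data(valid, labels)
        self.x_test,  self.y_test  = label_data(test,  labels)
        self.train = (self.x_train, self.y_train)
        self.valid = (self.x_valid, self.y_valid)
        self.test  = (self.x_test,  self.y_test)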
Subclasses `torch.utils.data.Dataset`
- that is, implements `__len__()` and `__getitem__()`

Since `Patient` is a custom object and not a typical tensor, we need to handle the behavior ourselves for `Dataset`, `DataLoader`, etc. to function correctly.
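A minimal `Dataset` along these lines (illustrative; the real `EHRDataset` also supports slicing and the lazy / full GPU-loading modes tested below):

from torch.utils.data import Dataset

class EHRDatasetSketch(Dataset):
    '''Illustrative: pairs `Patient` objects with their stacked label tensor'''
    def __init__(self, x, y):
        self.x, self.y = x, y
    def __len__(self):
        return len(self.x)
    def __getitem__(self, i):
        return self.x[i], self.y[i]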
- Memory pinning is a good idea for better performance when lazy loading to the GPU
    - When a `DataLoader` pins memory on a tensor, a copy of the tensor is made in page-locked memory in RAM, as opposed to swappable memory, which speeds up transfers to the GPU
- But for a custom data type like our `Patient` object, we need to define the pinning behavior ourselves
    - Making a deep copy of the `Patient` object to mimic tensor behavior
    - Otherwise, since the `Patient` holds its changed tensors, all tensors are CUDA tensors after the first epoch; the `DataLoader` then tries to pin memory again, which causes an error (TODO: Need to elaborate)
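PyTorch lets a custom type opt in to pinning by defining a `pin_memory()` method, which the `DataLoader` calls on the elements of a batch. A sketch of the idea for `Patient` (attribute names are assumptions; this is not the library's actual code):

import copy

class PatientSketch:
    '''Illustrative: how a custom type opts in to `DataLoader` memory pinning'''
    def pin_memory(self):
        pt = copy.deepcopy(self)                 # deep copy so the original tensors are untouched
        pt.obs_nums = pt.obs_nums.pin_memory()   # pin each tensor attribute (others omitted)
        return pt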
def get_ds(x_train,y_train, x_valid,y_valid) -> 'train_ds, valid_ds':
    train_ds,valid_ds = EHRDataset(x_train, y_train), EHRDataset(x_valid, y_valid)
    return train_ds, valid_ds
Testing Lazy Load
train_ds, valid_ds = get_ds(*labeled.train, *labeled.valid)
len(train_ds), len(valid_ds)
len(labeled.train), len(labeled.x_train)
assert len(train_ds)==len(labeled.x_train)==len(labeled.y_train)
assert len(valid_ds)==len(labeled.y_valid)==len(labeled.x_valid)
xb,yb = train_ds[0:7]
xb,yb
yb.shape
xb[0].obs_nums.is_pinned()
train_ds._test_getitem(0)
We need to define a custom collate function, because the default collate cannot handle a list of `Patient` objects in x; it gives the following error:
TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class '__main__.Patient'>
valid_ds[0:4]
x_tmps,y_tmps = valid_ds[0:4]
x_tmps
y_tmps
Old collate fns
1. Removed cuda calls
def collate(b):
    xs,ys = zip(*b)
    return [x.to_gpu() for x in xs], torch.unsqueeze(torch.tensor(ys), 1).cuda()
2. Removed unsqueeze
def collate(b):
    xs,ys = zip(*b)
    return xs, torch.unsqueeze(torch.tensor(ys), 1)
def collate_ehr(b):
    '''Custom collate function for use in `DataLoader`'''
    xs,ys = zip(*b)
    return xs, torch.stack(ys)
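A quick direct check on the collate function, reusing the four pairs pulled from `valid_ds` above (the `DataLoader` hands batches over in exactly this list-of-pairs form):

b = list(zip(x_tmps, y_tmps))   # list of (Patient, label-tensor) pairs
xs, ys = collate_ehr(b)
len(xs), ys.shape               # 4 Patients; labels stacked into a (4, 5) tensor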
bs = 2
def get_dls(train_ds, valid_ds, bs, collate_fn=collate_ehr, lazy=True) -> 'train_dl, valid_dl':
    return (DataLoader(train_ds, batch_size=bs, shuffle=True, collate_fn=collate_fn, pin_memory=lazy),
            DataLoader(valid_ds, batch_size=bs*2, collate_fn=collate_fn, pin_memory=lazy))
train_dl, valid_dl = get_dls(train_ds, valid_ds, bs)
Tests
- `iter()`, `next()`
- Next batch
it = iter(valid_dl)
first_x, first_y = next(it)
second_x, second_y = next(it)
first_x, first_y
first_x[3].med_offsts.is_pinned(), first_y.is_pinned()
second_x, second_y
second_x[0].alg_nums
second_x[0].alg_nums.is_pinned()
Testing full GPU loading (non-Lazy)
train_ds,valid_ds = EHRDataset(*labeled.train, lazy_load_gpu=False), EHRDataset(*labeled.valid, lazy_load_gpu=False)
xb,yb = train_ds[0:5]
xb,yb
xb[0].demographics.is_pinned()
train_dl, valid_dl = get_dls(train_ds, valid_ds, bs, lazy=False)
x_tmp, y_tmp = next(iter(valid_dl))
x_tmp[0].demographics.is_pinned()
x_tmp[0]