Classes and functions for managing data
 

Split

  • Splitting is already done in the raw data before vocab creation.
  • The following class loads and manages the preprocessed splits together.

class EHRDataSplits[source]

EHRDataSplits(path, age_start=0, age_stop=20, age_in_months=False)

Class to hold the PatientList splits; defaults to loading 0 to 20 years age span

EHRDataSplits._load_splits[source]

EHRDataSplits._load_splits(path, age_start, age_stop, age_in_months)

Load splits of preprocessed PatientLists from persistent store using path

EHRDataSplits.get_splits[source]

EHRDataSplits.get_splits()

Return splits

EHRDataSplits.get_lengths[source]

EHRDataSplits.get_lengths()

Return a dataframe with lengths (# of patients) of the splits (train, valid, test) and total

EHRDataSplits.get_label_counts[source]

EHRDataSplits.get_label_counts(labels)

Get prevalence counts of labels in each split - returns a dataframe with counts for each split and total count

EHRDataSplits.get_pos_wts[source]

EHRDataSplits.get_pos_wts(labels)

Get positive weights to be used in nn.BCEWithLogitsLoss

Tests

PATH_1K, CONDITIONS
('/home/vinod/.lemonpie/datasets/synthea/1K',
 {'diabetes': '44054006',
  'stroke': '230690007',
  'alzheimers': '26929004',
  'coronary_heart': '53741008',
  'lung_cancer': '254637007',
  'breast_cancer': '254837009',
  'rheumatoid_arthritis': '69896004',
  'epilepsy': '84757009'})
labels = list(CONDITIONS.keys())
labels
['diabetes',
 'stroke',
 'alzheimers',
 'coronary_heart',
 'lung_cancer',
 'breast_cancer',
 'rheumatoid_arthritis',
 'epilepsy']
splits = EHRDataSplits(PATH_1K)
splits.get_lengths()
       lengths
train      702
valid      234
test       235
total     1171
prevalence = splits.get_label_counts(labels)
prevalence
                      train  valid   test  total
diabetes                 43     14     19     76
stroke                   30      7     11     48
alzheimers               12      7      6     25
coronary_heart           39     11     11     61
lung_cancer              12      0      2     14
breast_cancer            11      8      2     21
rheumatoid_arthritis      2      0      0      2
epilepsy                 15      5      2     22
splits.get_pos_wts(labels)
                      train  valid   test  total
diabetes               15.0   16.0   11.0   14.0
stroke                 22.0   32.0   20.0   23.0
alzheimers             58.0   32.0   38.0   46.0
coronary_heart         17.0   20.0   20.0   18.0
lung_cancer            58.0    inf  116.0   83.0
breast_cancer          63.0   28.0  116.0   55.0
rheumatoid_arthritis  350.0    inf    inf  584.0
epilepsy               46.0   46.0  116.0   52.0
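
The weights printed above look consistent with the ratio of negatives to positives per label, rounded to a whole number (for example, diabetes in train: (702 - 43) / 43 ≈ 15.3). This is an inference from the printed tables, not a statement about how `get_pos_wts` is implemented. A label with zero positives in a split gives `inf`, which is probably not something to pass to a loss function unchanged. The sketch below shows one way to compute such weights and hand them to `nn.BCEWithLogitsLoss`; `pos_weights` and its `cap` argument are hypothetical names, and capping is only one assumed way to handle zero-positive labels.

```python
import torch
import torch.nn as nn

def pos_weights(pos_counts, n_total, cap=1000.0):
    """Hypothetical helper: negatives / positives per label, capped so that
    labels with zero positives do not end up with an infinite weight."""
    pos = torch.tensor(pos_counts, dtype=torch.float32)
    neg = n_total - pos
    wts = neg / pos                      # float division by zero yields inf
    return torch.clamp(wts, max=cap)     # assumption: cap instead of keeping inf

# Train-split counts for the first three labels, taken from the table above
train_counts = [43, 30, 12]              # diabetes, stroke, alzheimers
wts = pos_weights(train_counts, n_total=702)

# pos_weight needs one entry per label (the last dimension of the targets)
loss_fn = nn.BCEWithLogitsLoss(pos_weight=wts)
logits = torch.zeros(4, 3)               # dummy batch: 4 patients, 3 labels
targets = torch.zeros(4, 3)
loss = loss_fn(logits, targets)
```

Since `pos_weight` only scales the positive term of the loss, it is the train-split weights that matter during training; the valid and test columns are mainly useful for inspection.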

Cross check with raw

  • Check total counts against raw_csv
  • Check split counts against split/raw_csv
raw_cnds = pd.read_csv(f'{PATH_1K}/raw_original/conditions.csv', low_memory=False)
cnd_codes = list(CONDITIONS.values())
cnd_codes
['44054006',
 '230690007',
 '26929004',
 '53741008',
 '254637007',
 '254837009',
 '69896004',
 '84757009']
int(CONDITIONS['diabetes'])
44054006
for label in labels:
    print(label,':: ', raw_cnds[raw_cnds.CODE == int(CONDITIONS[label])].CODE.count())
diabetes ::  76
stroke ::  48
alzheimers ::  25
coronary_heart ::  61
lung_cancer ::  14
breast_cancer ::  21
rheumatoid_arthritis ::  2
epilepsy ::  22
raw_cnds_train = pd.read_csv(f'{PATH_1K}/raw_split/train/conditions.csv', low_memory=False)
raw_cnds_valid = pd.read_csv(f'{PATH_1K}/raw_split/valid/conditions.csv', low_memory=False)
raw_cnds_test  = pd.read_csv(f'{PATH_1K}/raw_split/test/conditions.csv', low_memory=False)
for label in labels:
    assert prevalence.loc[label].total == raw_cnds[raw_cnds.CODE == int(CONDITIONS[label])].CODE.count()
    assert prevalence.loc[label].train == raw_cnds_train[raw_cnds_train.CODE == int(CONDITIONS[label])].CODE.count()
    assert prevalence.loc[label].valid == raw_cnds_valid[raw_cnds_valid.CODE == int(CONDITIONS[label])].CODE.count()
    assert prevalence.loc[label].test  == raw_cnds_test[raw_cnds_test.CODE == int(CONDITIONS[label])].CODE.count()

Label

In fastai's definition of labeling, some processes need to be run on train and then applied to valid.

In our case this is already completed in preprocessing (vocab & transform), as follows:

  1. Vocabs created from train data
    • Tokenizing unique values for different record codes & demographic values
    • Calculating mean and std for age
  2. Vocabs applied to train, valid and test data
    • With numericalize for record codes & demographic values
    • With normalizing of age with the mean / std from train

Hence labeling in our case will be creating X and y

  • X is the patient object
  • y (for a single patient) needs to be a tensor built from the patient's values for each label in labels ('diabetes', 'stroke', 'alzheimers', 'coronary_heart', 'lung_cancer', and so on)

So creating the y tensor is simply a matter of:

  1. extracting the values of each of the labels from each Patient object
  2. turning it into a torch.FloatTensor
  3. and stacking them up using torch.stack
tst_y = np.array((True, False, False, True), dtype='float')
torch.from_numpy(tst_y), torch.FloatTensor(tst_y)
(tensor([1., 0., 0., 1.], dtype=torch.float64), tensor([1., 0., 0., 1.]))

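There are two ways of creating a torch tensor from a NumPy array: torch.from_numpy keeps the array's float64 dtype, while torch.FloatTensor converts it to float32. We will stick with the latter.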

y = []
for pt in splits.train:
    y.append(torch.FloatTensor(np.array([pt.conditions[label] for label in labels], dtype='float')) )
 
y = torch.stack(y)
y.shape
torch.Size([702, 8])
y
tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])

Putting it into a function

def label_data(patient_ds, labels) -> 'x,y':
    '''Extracts y from patient object, returns x=Patient object, y=tensor of conditions'''
    def _get_y(ds, labels):
        y = []
        for pt in ds:
            y.append( torch.FloatTensor(np.array([pt.conditions[label] for label in labels], dtype='float')) )
        return torch.stack(y)
    
    x, y = patient_ds, _get_y(patient_ds, labels)
    return x,y
x_train,y_train = label_data(splits.train, labels)
x_valid,y_valid = label_data(splits.valid, labels)
x_test ,y_test  = label_data(splits.test , labels)
y_train.shape, y_valid.shape, y_test.shape
(torch.Size([702, 8]), torch.Size([234, 8]), torch.Size([235, 8]))

class LabelEHRData[source]

LabelEHRData(train, valid, test, labels)

Class to hold labeled EHR data splits

LabelEHRData.__init__[source]

LabelEHRData.__init__(train, valid, test, labels)

Extracts y from patient object, each labelset a tuple of x,y: x=Patient object, y=tensor of conditions

LabelEHRData._get_y[source]

LabelEHRData._get_y(ds, labels)

Extract y from each patient object in ds and stack them - ds is dataset containing patient objects

labeled = LabelEHRData(*splits.get_splits(), labels)

Dataset

Subclasses torch.utils.data.Dataset

  • that is, it implements __len__() and __getitem__()

class EHRDataset[source]

EHRDataset(*args, **kwds) :: Dataset

Class to hold a single EHR dataset (holds a tuple of x and y) -- handles lazy vs full loading of dataset on GPU

EHRDataset.__init__[source]

EHRDataset.__init__(x_labeled, y_labeled, lazy_load_gpu=True)

If lazy_load_gpu is False, load entire dataset on GPU

EHRDataset.__getitem__[source]

EHRDataset.__getitem__(i)

If lazy loading, return deep copy of patient object i, else entire dataset already on GPU - just return i

Since Patient is a custom object rather than a tensor, we need to define its behavior so that Dataset, DataLoader, etc. function correctly.

  • Memory pinning is a good idea for better performance if lazy loading to GPU
  • When a DataLoader pins memory on a tensor, a copy of the tensor is made in page-locked memory in RAM (as opposed to swappable memory), which speeds up transfers to GPU
  • But for a custom data type like our Patient object, we need to define the behavior
  • Making a deep copy of the Patient object to mimic tensor behavior
    • Otherwise, given that the Patient holds its changed tensors, all tensors are CUDA tensors after the first epoch; the DataLoader then tries to pin memory again, and this causes an error (TODO: Need to elaborate)
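
The deep-copy idea can be illustrated without a GPU. In the sketch below, `ToyPatient` and `ToyEHRDataset` are hypothetical stand-ins (not the library's `Patient` or `EHRDataset`), and an in-place addition plays the role of any change made to the returned object, such as moving its tensors to another device.

```python
import copy
import torch
from torch.utils.data import Dataset

class ToyPatient:
    """Hypothetical stand-in for Patient: a plain object holding a tensor."""
    def __init__(self, codes):
        self.codes = torch.tensor(codes)

class ToyEHRDataset(Dataset):
    """Returns a deep copy per item, so callers never mutate the stored objects."""
    def __init__(self, xs, ys):
        self.xs, self.ys = xs, ys

    def __len__(self):
        return len(self.xs)

    def __getitem__(self, i):
        return copy.deepcopy(self.xs[i]), self.ys[i]

xs = [ToyPatient([1, 2]), ToyPatient([3])]
ys = torch.tensor([[1.0], [0.0]])
ds = ToyEHRDataset(xs, ys)

x0, y0 = ds[0]
x0.codes += 10   # stands in for an in-place change such as a device move
```

After the change, `ds.xs[0].codes` is still `[1, 2]`: the stored object stays as it was, so the next epoch starts again from untouched data.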
def get_ds(x_train,y_train, x_valid,y_valid) -> 'train_ds, valid_ds':
    train_ds,valid_ds = EHRDataset(x_train, y_train), EHRDataset(x_valid, y_valid)
    return train_ds, valid_ds

Testing Lazy Load

train_ds, valid_ds = get_ds(*labeled.train, *labeled.valid)
len(train_ds), len(valid_ds)
(702, 234)
len(labeled.train), len(labeled.x_train)
(2, 702)
assert len(train_ds)==len(labeled.x_train)==len(labeled.y_train)
assert len(valid_ds)==len(labeled.y_valid)==len(labeled.x_valid)
xb,yb = train_ds[0:7]
xb,yb
([ptid:0ace3e15-8aa4-41c5-8b90-2408285ebcfe, birthdate:1986-04-02, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:af1495be-5077-4087-98b1-9ff624c7582c, birthdate:2008-07-17, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:f23e12d9-2ec6-4006-b041-ea78d374e9c9, birthdate:2014-09-06, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:1968aa31-5fce-461a-9486-6e385a7b75e7, birthdate:1986-04-11, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:1211c8ff-ab73-49f3-b2ab-87b7a03f6167, birthdate:1972-03-24, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:27a8b7b6-007d-4036-82a7-80a9ab670dcb, birthdate:2005-04-13, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:532696f2-0b76-4eb0-9aea-a74e2fb1bed2, birthdate:1967-05-18, [('diabetes', False), ('stroke', False)].., device:cpu],
 tensor([[0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.]]))
yb.shape
torch.Size([7, 8])
xb[0].obs_nums.is_pinned()
False
train_ds._test_getitem(0)
(ptid:0ace3e15-8aa4-41c5-8b90-2408285ebcfe, birthdate:1986-04-02, [('diabetes', False), ('stroke', False)].., device:cpu,
 tensor([0., 0., 0., 0., 0., 0., 0., 0.]))

DataLoader - Using PyTorch DataLoader

A custom collate function is needed because the default collate cannot handle a list of Patient objects in x and raises the following error:

TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class '__main__.Patient'>
valid_ds[0:4]
([ptid:8d1ba4bb-7250-4295-be1c-5d0d423e55f7, birthdate:1957-02-13, [('diabetes', True), ('stroke', False)].., device:cpu,
  ptid:f1921fc3-fdfc-441d-a928-27c18002fedf, birthdate:1909-12-22, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:fc4aa89c-e441-4c0b-841f-3d16ffe1b235, birthdate:1981-04-24, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:4e0be087-7a33-4655-a9c0-f00f23178ac1, birthdate:1977-02-03, [('diabetes', False), ('stroke', False)].., device:cpu],
 tensor([[1., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.]]))
x_tmps,y_tmps = valid_ds[0:4]
x_tmps
[ptid:8d1ba4bb-7250-4295-be1c-5d0d423e55f7, birthdate:1957-02-13, [('diabetes', True), ('stroke', False)].., device:cpu,
 ptid:f1921fc3-fdfc-441d-a928-27c18002fedf, birthdate:1909-12-22, [('diabetes', False), ('stroke', False)].., device:cpu,
 ptid:fc4aa89c-e441-4c0b-841f-3d16ffe1b235, birthdate:1981-04-24, [('diabetes', False), ('stroke', False)].., device:cpu,
 ptid:4e0be087-7a33-4655-a9c0-f00f23178ac1, birthdate:1977-02-03, [('diabetes', False), ('stroke', False)].., device:cpu]
y_tmps
tensor([[1., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.]])

Old collate fns

1. First version, with cuda calls (since removed)

def collate(b):
    xs,ys = zip(*b)
    return [x.to_gpu() for x in xs], torch.unsqueeze(torch.tensor(ys), 1).cuda()

2. Second version, with unsqueeze (since removed)

def collate(b):
    xs,ys = zip(*b)
    return xs, torch.unsqueeze(torch.tensor(ys), 1)
def collate_ehr(b):
    '''Custom collate function for use in `DataLoader`'''
    xs,ys = zip(*b)
    return xs, torch.stack(ys)
bs = 2
def get_dls(train_ds, valid_ds, bs, collate_fn=collate_ehr, lazy=True) -> 'train_dl, valid_dl':
    return(DataLoader(train_ds, batch_size=bs, shuffle=True, collate_fn=collate_fn, pin_memory=lazy),
           DataLoader(valid_ds, batch_size=bs*2, collate_fn=collate_fn, pin_memory=lazy))
train_dl, valid_dl = get_dls(train_ds, valid_ds, bs)
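
For readers without the Patient class at hand, the same collate pattern can be tried on a toy dataset. `Record`, `RecordDataset` and `collate_records` are hypothetical names used only for this sketch; the collate function mirrors the shape of `collate_ehr` above (objects passed through as a tuple, labels stacked).

```python
import torch
from torch.utils.data import DataLoader, Dataset

class Record:
    """Hypothetical custom object that default collation does not know how to batch."""
    def __init__(self, idx):
        self.idx = idx

class RecordDataset(Dataset):
    def __init__(self, n):
        self.xs = [Record(i) for i in range(n)]
        self.ys = torch.zeros(n, 3)      # 3 dummy labels per record

    def __len__(self):
        return len(self.xs)

    def __getitem__(self, i):
        return self.xs[i], self.ys[i]

def collate_records(batch):
    """Keep x as a tuple of objects; stack the per-item y tensors into one batch tensor."""
    xs, ys = zip(*batch)
    return xs, torch.stack(ys)

dl = DataLoader(RecordDataset(5), batch_size=2, collate_fn=collate_records)
batches = list(dl)   # 5 items with batch size 2 give batches of 2, 2 and 1
```

Each batch is a pair: a tuple of `Record` objects and a float tensor of shape (batch size, 3).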

Tests - iter(), next() - Next Batch

it = iter(valid_dl)
first_x, first_y = next(it)
second_x, second_y = next(it)
first_x, first_y
([ptid:8d1ba4bb-7250-4295-be1c-5d0d423e55f7, birthdate:1957-02-13, [('diabetes', True), ('stroke', False)].., device:cpu,
  ptid:f1921fc3-fdfc-441d-a928-27c18002fedf, birthdate:1909-12-22, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:fc4aa89c-e441-4c0b-841f-3d16ffe1b235, birthdate:1981-04-24, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:4e0be087-7a33-4655-a9c0-f00f23178ac1, birthdate:1977-02-03, [('diabetes', False), ('stroke', False)].., device:cpu],
 tensor([[1., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.]]))
first_x[3].med_offsts.is_pinned(), first_y.is_pinned()
(True, True)
second_x, second_y
([ptid:6d048a56-edb8-4f29-891d-7a84d75a8e78, birthdate:1914-09-05, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:4fc76a3b-e39e-4091-a6af-3595e0cb607e, birthdate:1948-06-01, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:26ca976d-0b5b-4662-af41-535ff670dd5a, birthdate:2014-09-22, [('diabetes', False), ('stroke', False)].., device:cpu,
  ptid:59486a8b-389b-4355-9df4-edc62bbd1a11, birthdate:1951-10-11, [('diabetes', False), ('stroke', False)].., device:cpu],
 tensor([[0., 0., 1., 0., 0., 1., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.]]))
second_x[0].alg_nums
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
second_x[0].alg_nums.is_pinned()
True

Testing full GPU loading (non-Lazy)

train_ds,valid_ds = EHRDataset(*labeled.train, lazy_load_gpu=False), EHRDataset(*labeled.valid, lazy_load_gpu=False)
xb,yb = train_ds[0:5]
xb,yb
([ptid:0ace3e15-8aa4-41c5-8b90-2408285ebcfe, birthdate:1986-04-02, [('diabetes', False), ('stroke', False)].., device:cuda:0,
  ptid:af1495be-5077-4087-98b1-9ff624c7582c, birthdate:2008-07-17, [('diabetes', False), ('stroke', False)].., device:cuda:0,
  ptid:f23e12d9-2ec6-4006-b041-ea78d374e9c9, birthdate:2014-09-06, [('diabetes', False), ('stroke', False)].., device:cuda:0,
  ptid:1968aa31-5fce-461a-9486-6e385a7b75e7, birthdate:1986-04-11, [('diabetes', False), ('stroke', False)].., device:cuda:0,
  ptid:1211c8ff-ab73-49f3-b2ab-87b7a03f6167, birthdate:1972-03-24, [('diabetes', False), ('stroke', False)].., device:cuda:0],
 tensor([[0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0.]], device='cuda:0'))
xb[0].demographics.is_pinned()
False
train_dl, valid_dl = get_dls(train_ds, valid_ds, bs, lazy=False)
x_tmp, y_tmp = next(iter(valid_dl))
x_tmp[0].demographics.is_pinned()
False
x_tmp[0]
ptid:8d1ba4bb-7250-4295-be1c-5d0d423e55f7, birthdate:1957-02-13, [('diabetes', True), ('stroke', False)].., device:cuda:0

class EHRData[source]

EHRData(path, labels, age_start=0, age_stop=20, age_in_months=False, lazy_load_gpu=True)

All encompassing class for EHR data - holds Splits, Labels, Datasets, DataLoaders and provides convenience fns for training and prediction

EHRData.load_splits[source]

EHRData.load_splits()

Load data splits given dataset path

EHRData.label[source]

EHRData.label()

Run labeler - i.e. extract y from patient objects

EHRData.create_datasets[source]

EHRData.create_datasets()

Create EHRDatasets

EHRData.ehr_collate[source]

EHRData.ehr_collate(b)

Custom collate function for use in DataLoader

EHRData.create_dls[source]

EHRData.create_dls(bs, lazy, c_fn=ehr_collate, **kwargs)

Create DataLoaders

EHRData.get_data[source]

EHRData.get_data(bs=64, num_workers=0)

Convenience function - returns everything needed for training

EHRData.get_test_data[source]

EHRData.get_test_data(bs=64, num_workers=0)

Convenience function - returns everything needed for prediction using test data