Core PyTorch functions

Device Settings

These functions control what device is used for tensors.

to_device will move a tensor or PyTorch model to the passed device. If no device is passed, the default device is used.

If CUDA is available, the default device is the one returned by torch.cuda.current_device(). The current device can be set through PyTorch:

torch.cuda.set_device(1)

To disable GPU usage, set the use_cuda environment variable to cpu: os.environ['use_cuda'] = 'cpu'

get_device[source]

get_device()

get_device - returns the current default device.

If CUDA is available and os.environ['use_cuda']=='cuda', torch.cuda.current_device() is returned.

Otherwise, cpu is returned.

Returns torch.device
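
A minimal usage sketch (this assumes get_device is imported from this library; the result depends on your hardware and the use_cuda environment variable):

import os
import torch

os.environ['use_cuda'] = 'cpu'  # force CPU
device = get_device()           # torch.device('cpu')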

to_device[source]

to_device(tensor, device=None)

to_device - moves tensor to device if possible. If device=None, tensor is moved to the default device returned by get_device (see the example below)

Inputs:

  • tensor torch.Tensor: input tensor

  • device [str, torch.device]: target device
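
For example (assumes to_device is imported from this library):

import torch

x = torch.randn(2, 3)
x = to_device(x)         # move to the default device from get_device
x = to_device(x, 'cpu')  # or to an explicit device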

set_device[source]

set_device(device)

wrapper for torch.cuda.set_device

get_model_device[source]

get_model_device(model)

gets the device of model from its first parameter
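
A minimal sketch of the behavior described above, reading the device off the first parameter (get_model_device_sketch is a hypothetical stand-in):

import torch.nn as nn

def get_model_device_sketch(model: nn.Module):
    # assumed implementation: report the device of the first parameter
    return next(model.parameters()).device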

freeze[source]

freeze(module)

freeze - freezes all parameters in module (requires_grad=False)

Inputs:

  • module nn.Module: Pytorch module

unfreeze[source]

unfreeze(module)

unfreeze - unfreezes all parameters in module (requires_grad=True)

Inputs:

  • module nn.Module: Pytorch module
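
For example, toggling requires_grad on all parameters of a module (assumes freeze and unfreeze are imported from this library):

import torch.nn as nn

model = nn.Linear(8, 4)
freeze(model)
assert not any(p.requires_grad for p in model.parameters())
unfreeze(model)
assert all(p.requires_grad for p in model.parameters())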

x_to_preds[source]

x_to_preds(x, multinomial=True)

x_to_preds - helper function for converting x to log probabilities and taking a hard sample

Inputs:

  • x torch.Tensor: input tensor

  • multinomial bool: if True, use multinomial sampling. If False, use argmax sampling

Returns:

  • idxs torch.LongTensor: index values of hard sample

  • lps torch.FloatTensor: log probabilities for each hard value
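
A minimal sketch of the behavior described above, assuming x is a 2D tensor of raw logits [bs, vocab] (x_to_preds_sketch is a hypothetical stand-in):

import torch
import torch.nn.functional as F

def x_to_preds_sketch(x, multinomial=True):
    # convert logits to log probabilities
    log_probs = F.log_softmax(x, dim=-1)
    if multinomial:
        # draw one hard sample per row from the categorical distribution
        idxs = torch.multinomial(log_probs.exp(), 1)
    else:
        # greedy: take the most likely index
        idxs = log_probs.argmax(-1, keepdim=True)
    # log probability of each sampled value
    lps = log_probs.gather(-1, idxs)
    return idxs, lps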

gather_lps[source]

gather_lps(lps, y)
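
The source gives no description; the name suggests gathering the log probability assigned to each target token. A sketch under that assumption (gather_lps_sketch is a hypothetical stand-in):

import torch

def gather_lps_sketch(lps, y):
    # lps: [bs, sl, vocab] log probabilities, y: [bs, sl] target indices
    # returns the log probability of each target token: [bs, sl]
    return lps.gather(-1, y.unsqueeze(-1)).squeeze(-1)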

subset_tensor[source]

subset_tensor(x, mask)

indexes x with mask. If x is a list or tuple, the function indexes every item in x
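
For example (assumes subset_tensor is imported from this library):

import torch

x = torch.arange(4)
mask = torch.tensor([True, False, True, False])
subset_tensor(x, mask)           # tensor([0, 2])
subset_tensor([x, x + 10], mask) # indexes every item in the list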

merge_weights[source]

merge_weights(sd1, sd2, alpha=0.5)

merges state dicts following new_weight = alpha*weight_model1 + (1-alpha)*weight_model2

New weights are returned as a new state dict
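
A minimal sketch of the interpolation described above (merge_weights_sketch is a hypothetical stand-in):

def merge_weights_sketch(sd1, sd2, alpha=0.5):
    # linearly interpolate two state dicts with matching keys
    return {k: alpha * sd1[k] + (1 - alpha) * sd2[k] for k in sd1.keys()}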

merge_models[source]

merge_models(model1, model2, alpha=0.5)

merges weights following new_weight = alpha*weight_model1 + (1-alpha)*weight_model2

New weights are loaded into model1 in place

smooth_batches[source]

smooth_batches(batches, beta=0.98)
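
The source gives no description; the name and beta argument suggest an exponential moving average over a sequence of batch values, as used for smoothed loss curves. A sketch under that assumption (smooth_batches_sketch is a hypothetical stand-in):

def smooth_batches_sketch(batches, beta=0.98):
    # exponentially weighted moving average with debiasing
    avg, smoothed = 0.0, []
    for i, b in enumerate(batches):
        avg = beta * avg + (1 - beta) * b
        smoothed.append(avg / (1 - beta ** (i + 1)))
    return smoothed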

discount_rewards[source]

discount_rewards(rewards, gamma)

discount_rewards - discounts rewards by gamma

Inputs:

  • rewards torch.Tensor[bs,sl]: tensor of undiscounted rewards

  • gamma float: discount factor

Returns:

  • discounted torch.Tensor[bs, sl]: tensor of discounted rewards

Rewards are discounted following

discounted[i] = rewards[i] + gamma*discounted[i+1]

rewards = torch.tensor([[0., 0., 0., 4.],
                        [0., 0., 0., 3.],
                        [0., 0., 0., 2.]])
gamma = 0.97
discounted = discount_rewards(rewards, gamma)
discounted
tensor([[3.6507, 3.7636, 3.8800, 4.0000],
        [2.7380, 2.8227, 2.9100, 3.0000],
        [1.8253, 1.8818, 1.9400, 2.0000]])

whiten[source]

whiten(values, shift_mean=True, mask=None)

whiten - whitens values

Inputs:

  • values torch.FloatTensor: values to be whitened

  • shift_mean bool: if True, outputs will have zero mean.

  • mask [torch.BoolTensor, torch.LongTensor, None]: if a mask is given, masked values will not contribute to calculating the mean and variance for whitening. Masking is done following masked_values = values*mask. For bool tensors, values where mask=True are kept. For binary float/int tensors, values where mask=1 are kept.

Returns:

  • whitened torch.FloatTensor: whitened values
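
A minimal sketch of the unmasked case; the masked case follows the same pattern with the mean and variance computed only over unmasked entries (whiten_sketch is a hypothetical stand-in):

import torch

def whiten_sketch(values, shift_mean=True):
    mean, var = values.mean(), values.var()
    # scale to unit variance; eps guards against division by zero
    whitened = (values - mean) * torch.rsqrt(var + 1e-8)
    if not shift_mean:
        # restore the original mean
        whitened = whitened + mean
    return whitened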

scatter_rewards[source]

scatter_rewards(rewards, mask)

scatter_rewards - scatters a vector of rewards into a matrix based on mask

Inputs:

  • rewards torch.FloatTensor[bs]: vector of rewards

  • mask torch.Tensor[bs, sl]: mask tensor

Returns:

  • template torch.FloatTensor: scattered values

In molecular RL, we typically have a single reward per molecule evaluating the entire structure. Before our update, we need to discount the final reward back to all timesteps. However, we want to ignore padding when we do this.

scatter_rewards takes a vector of rewards and a mask where non-padding tokens are True and padding tokens are False. Each reward is placed at the last True index of its row, as shown below.

rewards = torch.tensor([4., 5., 6.]).float()
mask = torch.tensor([[True, True, True, False],
                     [True, True, False, False],
                     [True, True, True, True]])
scattered = scatter_rewards(rewards, mask)
scattered
tensor([[0., 0., 4., 0.],
        [0., 5., 0., 0.],
        [0., 0., 0., 6.]])

compute_advantages[source]

compute_advantages(rewards, values, gamma, lam)

Calculate advantages according to Generalized Advantage Estimation (GAE)

Inputs:

  • rewards torch.Tensor: reward tensor

  • values torch.Tensor: value function predictions

  • gamma float: GAE gamma factor

  • lam float: GAE lambda factor

Returns:

  • advantages torch.Tensor: computed advantages

Advantages are computed according to GAE

delta[i] = rewards[i] + gamma*values[i+1] - values[i]

advantages[i] = delta[i] + gamma*lam*advantages[i+1]
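
A minimal sketch of the recursion above, iterating backwards over timesteps and assuming rewards and values are [bs, sl] tensors (compute_advantages_sketch is a hypothetical stand-in):

import torch

def compute_advantages_sketch(rewards, values, gamma, lam):
    advantages = torch.zeros_like(rewards)
    running = 0.0
    sl = rewards.shape[-1]
    for i in reversed(range(sl)):
        # the value of the next timestep is 0 at the end of the sequence
        next_value = values[:, i + 1] if i < sl - 1 else 0.0
        delta = rewards[:, i] + gamma * next_value - values[:, i]
        running = delta + gamma * lam * running
        advantages[:, i] = running
    return advantages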

Loss Functions

Pytorch losses

class CrossEntropy[source]

CrossEntropy()

CrossEntropy - cross entropy loss for sequence predictions. Flattens predictions and targets before computing loss
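
A minimal sketch of the flattening described above (cross_entropy_flat_sketch is a hypothetical stand-in):

import torch.nn.functional as F

def cross_entropy_flat_sketch(preds, targets):
    # preds: [bs, sl, vocab] logits, targets: [bs, sl] indices
    # flatten batch and sequence dimensions before computing the loss
    return F.cross_entropy(preds.reshape(-1, preds.shape[-1]), targets.reshape(-1))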

class BinaryCrossEntropy[source]

BinaryCrossEntropy()

class RegressionLoss[source]

RegressionLoss(loss)

class HuberLoss[source]

HuberLoss(beta=1.0) :: RegressionLoss

class MSELoss[source]

MSELoss() :: RegressionLoss

class MAELoss[source]

MAELoss() :: RegressionLoss

pca[source]

pca(x, k=2)

pca - principal component analysis of x, projected to k dimensions
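
A minimal sketch using torch.pca_lowrank (pca_sketch is a hypothetical stand-in):

import torch

def pca_sketch(x, k=2):
    # torch.pca_lowrank returns (U, S, V) with V holding the principal directions
    u, s, v = torch.pca_lowrank(x, q=k)
    # project the centered data onto the top k principal components
    return (x - x.mean(0)) @ v[:, :k]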