Commit ea14c431 authored by Theodore Papamarkou

Added pytorch version of bottleneck NNGP marginal log-lik

parent 4de24eda
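For reference, both implementations added below estimate the same quantity, a Monte Carlo estimate of the bottleneck NNGP marginal log-likelihood (summary mine, read off the code in both files). With S Monte Carlo draws of the bottleneck layer, each inducing a kernel matrix K^{(s)} over the n inputs, and Y the n-by-d target matrix,

    \log p(Y \mid X) \approx \operatorname{logsumexp}_{s=1}^{S}
        \Big[ -\tfrac{1}{2}\,\mathrm{tr}\big(Y^\top (K^{(s)})^{-1} Y\big)
              - \tfrac{d}{2}\log\lvert K^{(s)}\rvert
              - \tfrac{nd}{2}\log 2\pi \Big] - \log S.

Both files evaluate the Gaussian terms through Cholesky factors K^{(s)} = L^{(s)} (L^{(s)})^\top rather than explicit inverses.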
boston_dataset.py  +29 −0
from sklearn import datasets

import torch
from torch.utils.data import Dataset

class Boston(Dataset):
    def __init__(self, dtype=torch.float64, device='cpu', standardize=False):
        self.dtype = dtype
        self.device = device
        self.load_data(standardize=standardize)

    def __repr__(self):
        return 'Boston dataset'

    def __len__(self):
        return len(self.data)

    def load_data(self, standardize):
        # Note: load_boston was deprecated in scikit-learn 1.0 and removed in 1.2
        self.data, self.labels = datasets.load_boston(return_X_y=True)
        self.data = torch.from_numpy(self.data).to(dtype=self.dtype, device=self.device)
        self.labels = torch.from_numpy(self.labels).to(dtype=self.dtype, device=self.device).view(len(self.labels), 1)

        if standardize:
            # unbiased=False gives the population standard deviation, matching
            # the default (ddof=0) of np.std used in the NumPy comparison below
            self.data = (self.data - torch.mean(self.data, dim=0, keepdim=True)) \
                / torch.std(self.data, dim=0, keepdim=True, unbiased=False)
            self.labels = (self.labels - torch.mean(self.labels, dim=0, keepdim=True)) \
                / torch.std(self.labels, dim=0, keepdim=True, unbiased=False)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]
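
A minimal usage sketch (not part of the commit; the shapes follow from the class above):

boston = Boston(standardize=True)
print(len(boston))         # 506 samples
x0, y0 = boston[0]
print(x0.shape, y0.shape)  # torch.Size([13]) torch.Size([1])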
PyTorch vs NumPy comparison script  +65 −0
# %%

import torch
from torch.utils.data import DataLoader

from eeyore.api import indexify

from bottleneck_nngp_pytorch import BottleneckNNGP, Hyperparameters
from boston_dataset import Boston

# %%

boston = indexify(Boston)(standardize=True)  # indexify makes __getitem__ also return the sample index
dataloader = DataLoader(boston, batch_size=506)  # single batch holding the full dataset

# %%

# Two stacks of 3 kernel layers around a single bottleneck of width 64
hparams = Hyperparameters(depths=[3, 3], widths=[64], activation=True, noise=1e-10, num_mc=100)

# %%

v_b = 1.2   # bias variance (alternative: 1.0)
v_w = 1.5   # weight variance (alternative: 1.0)
v_n = 0.25  # observation noise variance (alternative: 1e-4)

# %%

model = BottleneckNNGP(hparams=hparams)

# %%

model.set_params(torch.tensor([v_b, v_w, v_n]))

# %%

x, y, _ = next(iter(dataloader))  # the third element is the sample index added by indexify

# %%

log_lik_val = model.log_lik(x, y)
print("Log-likelihood using pytorch:", log_lik_val.item())

# %%

import numpy as np
from sklearn import datasets

import bottleneck_nngp_numpy

# %%

X_np, Y_np = datasets.load_boston(return_X_y=True)
Y_np = Y_np.reshape((-1, 1))

X_np = (X_np - np.mean(X_np, axis=0, keepdims=True))/np.std(X_np, axis=0, keepdims=True)
Y_np = (Y_np - np.mean(Y_np, axis=0, keepdims=True))/np.std(Y_np, axis=0, keepdims=True)

# %%

model_np = bottleneck_nngp_numpy.bottleneck_nngp_prior(
    X_np,
    v_b=model.v_b.data.item(),
    v_w=model.v_w.data.item(),
    v_n=model.v_n.data.item(),
    depths=model.hp.depths,
    widths=model.hp.widths,
    bottleneck_activation=True,
)

# %%

# Reuse the PyTorch MC samples so that both estimates share the same randomness
log_lik_val_np = model_np.log_likelihood(
    Y_np, samples=model.samples.clone().detach().numpy(), n_samples=model.hp.num_mc
)
print("Log-likelihood using numpy:", log_lik_val_np)
bottleneck_nngp_numpy.py  +48 −0
import numpy as np
from scipy import linalg as sp_linalg
from scipy import special as sp_special

class bottleneck_nngp_prior(object):
    def __init__(self, X, v_b=1.0, v_w=1.0, v_n=1e-4, depths=[0], widths=[], bottleneck_activation=False, bottleneck_noise=1e-10, samples=None):
        self.n_inputs = X.shape[0]
        self.X = X
        self.v_b = v_b
        self.v_w = v_w
        self.v_n = v_n
        self.depths = depths
        self.widths = widths
        self.bottleneck_activation = bottleneck_activation
        self.bottleneck_noise = bottleneck_noise
        self.samples = samples

    def K(self, Gram, depth, noise=0.0):
        Gram = np.atleast_2d(Gram)
        if Gram.ndim == 2:
            Gram = np.expand_dims(Gram, 0)
        K_XX = self.v_b + self.v_w*Gram
        for _ in range(depth):
            # Arc-cosine (ReLU) kernel recursion; R is the correlation, clipped
            # to [-1, 1] for numerical safety before arccos
            A = np.sqrt(np.einsum("ijj,ikk->ijk", K_XX, K_XX))
            R = np.maximum(np.minimum(K_XX/A, 1.0), -1.0)
            K_XX = self.v_b + self.v_w*A/np.pi*(np.sqrt(1-R**2) + (np.pi-np.arccos(R))*R)
        return K_XX + noise*np.expand_dims(np.eye(self.n_inputs), 0)

    def log_likelihood(self, Y, samples=None, n_samples=1):
        grams = np.expand_dims(self.X.dot(self.X.T), 0)
        for (depth, width) in zip(self.depths[:-1], self.widths):
            Ks = self.K(grams, depth, noise=self.bottleneck_noise)
            Ls = np.stack([np.linalg.cholesky(K) for K in Ks], 0)
            if samples is None:
                # Draw standard normal MC samples and correlate them with the
                # Cholesky factors, so each sample has covariance K
                self.samples = np.random.normal(0, 1, size=(n_samples, self.n_inputs, width))
                self.samples = np.einsum("sik,skj->sij", Ls, self.samples)
                if self.bottleneck_activation:
                    # sqrt(2) * ReLU, matching the normalization of the kernel recursion
                    self.samples = np.sqrt(2)*np.maximum(0, self.samples)
            else:
                self.samples = samples
            # Per-sample Gram matrices of the bottleneck features
            grams = np.einsum("sik,sjk->sij", self.samples, self.samples)/width
        Ks = self.K(grams, self.depths[-1], noise=self.v_n)
        Ls = np.stack([np.linalg.cholesky(K) for K in Ks], 0)
        LYs = np.stack([sp_linalg.solve_triangular(L, Y, lower=True) for L in Ls], 0)
        logdets = 2*np.sum(np.log(np.diagonal(Ls, axis1=1, axis2=2)), 1)
        # Per-sample Gaussian log-density, then a log-mean-exp over the MC samples
        logliks = -1/2*np.sum(np.sum(LYs**2, 1), 1) - Y.shape[1]/2*logdets - Y.shape[1]*self.n_inputs/2*np.log(2*np.pi)
        loglik = sp_special.logsumexp(logliks) - np.log(n_samples)
        return loglik
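
For reference (notation mine), the K method above is the standard arc-cosine (ReLU) kernel recursion. Starting from K_0 = v_b + v_w G, where G is the relevant Gram matrix (X X^\top for the first block, the width-scaled sample Gram for later blocks), each of the `depth` iterations computes

    \rho_l(x, x') = \frac{K_l(x, x')}{\sqrt{K_l(x, x)\, K_l(x', x')}}
        \quad \text{(clipped to } [-1, 1]\text{)},

    K_{l+1}(x, x') = v_b + \frac{v_w}{\pi} \sqrt{K_l(x, x)\, K_l(x', x')}
        \left( \sqrt{1 - \rho_l^2} + (\pi - \arccos \rho_l)\, \rho_l \right).

The sqrt(2)-scaled ReLU applied to the bottleneck samples matches this normalization, so the empirical Gram of the sampled features is consistent with the kernel.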
bottleneck_nngp_pytorch.py  +91 −0
import math

import torch
from torch.distributions import Normal
import torch.nn as nn

from eeyore.api import BayesianModel

class Hyperparameters:
    def __init__(self, depths=[0], widths=[], activation=True, noise=1e-10, num_mc=1000):
        self.depths = depths
        self.widths = widths
        self.activation = activation
        self.noise = noise  # bottleneck noise
        self.num_mc = num_mc  # number of Monte Carlo samples

        if len(self.depths) != len(self.widths) + 1:
            raise ValueError('len(depths) must equal len(widths) + 1')

class BottleneckNNGP(BayesianModel):
    def __init__(self, constraint=None, bounds=[None, None], temperature=None, prior=None, hparams=Hyperparameters(),
                 samples=None, savefile=None, dtype=torch.float64, device='cpu'):
        super().__init__(
            loss=None, constraint=constraint, bounds=bounds, temperature=temperature, dtype=dtype, device=device
        )
        self.hp = hparams

        self.v_b = nn.Parameter(data=torch.empty([1], dtype=self.dtype, device=self.device), requires_grad=True)
        self.v_w = nn.Parameter(data=torch.empty([1], dtype=self.dtype, device=self.device), requires_grad=True)
        self.v_n = nn.Parameter(data=torch.empty([1], dtype=self.dtype, device=self.device), requires_grad=True)

        self.prior = prior or self.default_prior()

        self.samples = samples

    def default_prior(self):
        return Normal(
            torch.zeros(self.num_params(), dtype=self.dtype, device=self.device),
            torch.ones(self.num_params(), dtype=self.dtype, device=self.device)
        )

    def K(self, gram, depth, n, noise):
        K_XX = self.v_b + self.v_w * gram
        for _ in range(depth):
            # Arc-cosine (ReLU) kernel recursion; R is the correlation, clipped
            # to [-1, 1] for numerical safety before acos
            A = torch.sqrt(torch.einsum("ijj,ikk->ijk", K_XX, K_XX))
            R = torch.max(torch.min(K_XX / A, torch.ones_like(A)), -torch.ones_like(A))
            K_XX = self.v_b + self.v_w * A / math.pi * (torch.sqrt(1 - R ** 2) + (math.pi - torch.acos(R)) * R)
        return K_XX + noise * torch.eye(n, dtype=self.dtype, device=self.device).unsqueeze(0)

    def log_lik(self, x, y, samples=None):
        """ Log-likelihood """
        grams = (x @ x.t()).unsqueeze(0)

        for (depth, width) in zip(self.hp.depths[:-1], self.hp.widths):
            Ks = self.K(grams, depth, x.shape[0], self.hp.noise)
            Ls = torch.stack([torch.cholesky(K) for K in Ks], 0)

            if samples is None:
                # Draw standard normal MC samples and correlate them with the
                # Cholesky factor, so each sample has covariance K
                self.samples = torch.randn(self.hp.num_mc, x.shape[0], width, dtype=self.dtype, device=self.device)
                self.samples = torch.cat(
                    [
                        torch.einsum("sik,skj->sij", Ls, self.samples[i, :, :].unsqueeze(0))
                        for i in range(self.hp.num_mc)
                    ]
                )
                if self.hp.activation:
                    # sqrt(2) * ReLU, matching the normalization of the kernel recursion
                    self.samples = torch.tensor([2.], dtype=self.dtype, device=self.device).sqrt() \
                        * torch.max(self.samples, torch.zeros_like(self.samples))
            else:
                self.samples = samples

            grams = torch.einsum("sik, sjk->sij", self.samples, self.samples) / width

        Ks = self.K(grams, self.hp.depths[-1], x.shape[0], noise=self.v_n)
        Ls = torch.stack([torch.cholesky(K) for K in Ks], 0)
        LYs = torch.stack([torch.triangular_solve(y, L, upper=False).solution for L in Ls], 0)
        logdets = 2 * torch.sum(torch.log(torch.diagonal(Ls, dim1=1, dim2=2)), 1)
        # Per-sample Gaussian log-density, then a log-mean-exp over the MC samples
        log_lik_val = torch.logsumexp(
            -0.5 * (
                torch.sum(torch.sum(LYs ** 2, 1), 1)
                + y.shape[1] * (
                    logdets + x.shape[0] * torch.tensor([2 * math.pi], dtype=self.dtype, device=self.device).log()
                )
            ),
            0
        ) - torch.tensor([self.hp.num_mc], dtype=self.dtype, device=self.device).log()

        if self.temperature is not None:
            log_lik_val = self.temperature * log_lik_val

        return log_lik_val
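
Since v_b, v_w and v_n are nn.Parameters with requires_grad=True, the Monte Carlo marginal log-likelihood is differentiable in them. A minimal sketch (mine, not part of the commit; it assumes set_params fills the parameters in the order used in the comparison script and reuses x, y from there):

model = BottleneckNNGP(hparams=Hyperparameters(depths=[3, 3], widths=[64], num_mc=100))
model.set_params(torch.tensor([1.0, 1.0, 1e-4]))  # v_b, v_w, v_n
log_lik_val = model.log_lik(x, y)
log_lik_val.backward()  # gradients w.r.t. v_b, v_w, v_n via autograd
print(model.v_b.grad, model.v_w.grad, model.v_n.grad)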