Commit 6129e739 authored by Ortner J L

ssl model

parent ef66976e
import os
from termcolor import colored
import numpy as np
import math
from argparse import ArgumentParser
from enum import Enum
import torch
from torch.nn import functional as F
from torch import nn
from torchvision.datasets import ImageFolder
from torch.optim import SGD
import pytorch_lightning as pl
from pl_bolts.models.self_supervised.ssl_finetuner import SSLFineTuner
from pl_bolts.models.self_supervised.evaluator import SSLEvaluator
from pytorch_lightning.metrics import Accuracy
#Internal Imports
from dali_utils.dali_transforms import SimCLRTransform #same transform as SimCLR, but only 1 copy
from dali_utils.lightning_compat import ClassifierWrapper
class CLASSIFIER(pl.LightningModule): #SSLFineTuner
def __init__(self, encoder, DATA_PATH, VAL_PATH, hidden_dim, image_size, seed, cpus, transform = SimCLRTransform, **classifier_hparams):
super().__init__()
self.DATA_PATH = DATA_PATH
self.VAL_PATH = VAL_PATH
self.transform = transform
self.image_size = image_size
self.cpus = cpus
self.seed = seed
self.batch_size = classifier_hparams['batch_size']
self.classifier_hparams = classifier_hparams
self.linear_layer = SSLEvaluator(
n_input=encoder.embedding_size,
n_classes=self.classifier_hparams['num_classes'],
p=self.classifier_hparams['dropout'],
n_hidden=hidden_dim
)
self.train_acc = Accuracy()
self.val_acc = Accuracy(compute_on_step=False)
self.encoder = encoder
self.weights = None
print(classifier_hparams)
if classifier_hparams['weights'] is not None:
self.weights = torch.tensor([float(item) for item in classifier_hparams['weights'].split(',')])
self.weights = self.weights.cuda()
self.save_hyperparameters()
#override optimizer to allow modification of encoder learning rate
def configure_optimizers(self):
optimizer = SGD([
{'params': self.encoder.parameters()},
{'params': self.linear_layer.parameters(), 'lr': self.classifier_hparams['linear_lr']}
], lr=self.classifier_hparams['learning_rate'], momentum=self.classifier_hparams['momentum'])
if self.classifier_hparams['scheduler_type'] == "step":
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, self.classifier_hparams['decay_epochs'], gamma=self.classifier_hparams['gamma'])
elif self.classifier_hparams['scheduler_type'] == "cosine":
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer,
self.classifier_hparams['epochs'], # total epochs to run
eta_min=self.classifier_hparams['final_lr']
)
return [optimizer], [scheduler]
def forward(self, x):
feats = self.encoder(x)[-1]
feats = feats.view(feats.size(0), -1)
logits = self.linear_layer(feats)
return logits
def shared_step(self, batch):
x, y = batch
logits = self.forward(x)
loss = self.loss_fn(logits, y)
return loss, logits, y
def training_step(self, batch, batch_idx):
loss, logits, y = self.shared_step(batch)
acc = self.train_acc(logits, y)
self.log('tloss', loss, prog_bar=True)
self.log('tastep', acc, prog_bar=True)
self.log('ta_epoch', self.train_acc)
return loss
def validation_step(self, batch, batch_idx):
with torch.no_grad():
loss, logits, y = self.shared_step(batch)
acc = self.val_acc(logits, y)
self.log('val_loss', loss, prog_bar=True, sync_dist=True)
self.log('val_acc_epoch', self.val_acc, prog_bar=True)
return loss
def loss_fn(self, logits, labels):
return F.cross_entropy(logits, labels, weight = self.weights)
def setup(self, stage = 'inference'):
Options = Enum('Loader', 'fit test inference')
if stage == Options.fit.name:
train = self.transform(self.DATA_PATH, batch_size = self.batch_size, input_height = self.image_size, copies = 1, stage = 'train', num_threads = self.cpus, device_id = self.local_rank, seed = self.seed)
val = self.transform(self.VAL_PATH, batch_size = self.batch_size, input_height = self.image_size, copies = 1, stage = 'validation', num_threads = self.cpus, device_id = self.local_rank, seed = self.seed)
self.train_loader = ClassifierWrapper(transform = train)
self.val_loader = ClassifierWrapper(transform = val)
elif stage == Options.inference.name:
self.test_dataloader = ClassifierWrapper(transform = self.transform(self.DATA_PATH, batch_size = self.batch_size, input_height = self.image_size, copies = 1, stage = 'inference', num_threads = 2*self.cpus, device_id = self.local_rank, seed = self.seed))
self.inference_dataloader = self.test_dataloader
def train_dataloader(self):
return self.train_loader
def val_dataloader(self):
return self.val_loader
#allow the user to add extra arguments specific to the CLASSIFIER model. These cannot share names with any parameters in train.py
def add_model_specific_args(parent_parser):
parser = ArgumentParser(parents=[parent_parser], add_help=False)
# training params
parser.add_argument("--linear_lr", default=1e-1, type=float, help="learning rate for classification head.")
parser.add_argument("--dropout", default=0.1, type=float, help="dropout of neurons during training [0-1].")
parser.add_argument("--nesterov", default=False, type=bool, help="Use nesterov during training.")
parser.add_argument("--scheduler_type", default='cosine', type=str, help="learning rate scheduler: ['cosine' or 'step']")
parser.add_argument("--gamma", default=0.1, type=float, help="gamma param for learning rate.")
parser.add_argument("--decay_epochs", default=[60, 80], type=list, help="epochs to do optimizer decay")
parser.add_argument("--weight_decay", default=1e-6, type=float, help="weight decay")
parser.add_argument("--final_lr", type=float, default=1e-6, help="final learning rate")
parser.add_argument("--momentum", type=float, default=0.9, help="momentum for learning rate")
parser.add_argument('--weights', type=str, help='delimited list of weights for penalty during classification')
return parser
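# Illustrative sketch (not part of the original commit): the two-parameter-group SGD pattern
# used in configure_optimizers above, shown on a tiny stand-in model. The layer sizes and
# learning rates below are arbitrary placeholders.
if __name__ == "__main__":
    import torch
    from torch import nn
    from torch.optim import SGD

    backbone = nn.Linear(8, 8)  # stands in for the encoder
    head = nn.Linear(8, 2)      # stands in for the linear classification head

    # the encoder trains at the base lr; the head gets its own (typically larger) lr
    opt = SGD([
        {'params': backbone.parameters()},
        {'params': head.parameters(), 'lr': 1e-1},
    ], lr=1e-3, momentum=0.9)
    sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=10, eta_min=1e-6)
    print([group['lr'] for group in opt.param_groups])  # -> [0.001, 0.1]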
import os
from termcolor import colored
import numpy as np
import math
from argparse import ArgumentParser
from enum import Enum
import torch
from torch.nn import functional as F
from torch import nn
from torchvision.datasets import ImageFolder
import pytorch_lightning as pl
from pl_bolts.models.self_supervised import SimCLR
from pl_bolts.models.self_supervised.simclr.simclr_module import Projection
#Internal Imports
from dali_utils.dali_transforms import SimCLRTransform
from dali_utils.lightning_compat import SimCLRWrapper
class SIMCLR(SimCLR):
def __init__(self, encoder, DATA_PATH, VAL_PATH, hidden_dim, image_size, seed, cpus, transform = SimCLRTransform, **simclr_hparams):
data_temp = ImageFolder(DATA_PATH)
#derived values (not passed in) need to be added to model hparams
simclr_hparams['num_samples'] = len(data_temp)
simclr_hparams['dataset'] = None
simclr_hparams['max_epochs'] = simclr_hparams['epochs']
self.DATA_PATH = DATA_PATH
self.VAL_PATH = VAL_PATH
self.hidden_dim = hidden_dim
self.transform = transform
self.image_size = image_size
self.cpus = cpus
self.seed = seed
super().__init__(**simclr_hparams)
self.encoder = encoder
self.projection = Projection(input_dim = self.encoder.embedding_size, hidden_dim = self.hidden_dim)
self.save_hyperparameters()
#we supply our own encoder, so override the method pl_bolts calls to initialize the default SimCLR encoder
def init_model(self):
return None
def setup(self, stage = 'inference'):
Options = Enum('Loader', 'fit test inference')
if stage == Options.fit.name:
train = self.transform(self.DATA_PATH, batch_size = self.batch_size, input_height = self.image_size, copies = 3, stage = 'train', num_threads = self.cpus, device_id = self.local_rank, seed = self.seed)
val = self.transform(self.VAL_PATH, batch_size = self.batch_size, input_height = self.image_size, copies = 3, stage = 'validation', num_threads = self.cpus, device_id = self.local_rank, seed = self.seed)
self.train_loader = SimCLRWrapper(transform = train)
self.val_loader = SimCLRWrapper(transform = val)
elif stage == Options.inference.name:
self.test_dataloader = SimCLRWrapper(transform = self.transform(self.DATA_PATH, batch_size = self.batch_size, input_height = self.image_size, copies = 1, stage = 'inference', num_threads = 2*self.cpus, device_id = self.local_rank, seed = self.seed))
self.inference_dataloader = self.test_dataloader
def train_dataloader(self):
return self.train_loader
def val_dataloader(self):
return self.val_loader
#allow the user to add extra arguments specific to the SIMCLR model
def add_model_specific_args(parent_parser):
parser = ArgumentParser(parents=[parent_parser], add_help=False)
# things we need to pass into pytorch lightning simclr model
parser.add_argument("--num_workers", default=8, type=int, help="num of workers per GPU")
parser.add_argument("--optimizer", default="adam", type=str, help="choose between adam/sgd")
parser.add_argument("--lars_wrapper", action='store_true', help="apple lars wrapper over optimizer used")
parser.add_argument('--exclude_bn_bias', action='store_true', help="exclude bn/bias from weight decay")
parser.add_argument("--warmup_epochs", default=1, type=int, help="number of warmup epochs")
parser.add_argument("--temperature", default=0.1, type=float, help="temperature parameter in training loss")
parser.add_argument("--weight_decay", default=1e-6, type=float, help="weight decay")
parser.add_argument("--start_lr", default=0, type=float, help="initial warmup learning rate")
parser.add_argument("--final_lr", type=float, default=1e-6, help="final learning rate")
return parser
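# Illustrative sketch (not part of the original commit): the temperature-scaled similarity that
# the --temperature flag above controls. This is a simplified view of the NT-Xent idea
# (cross-view positives on the diagonal), not the exact pl_bolts loss implementation.
if __name__ == "__main__":
    import torch
    from torch.nn import functional as F

    temperature = 0.1
    z1 = F.normalize(torch.randn(4, 128), dim=1)  # projections of augmented view 1
    z2 = F.normalize(torch.randn(4, 128), dim=1)  # projections of augmented view 2

    # pairwise cosine similarities, sharpened by the temperature
    logits = z1 @ z2.t() / temperature
    # each sample's positive is its own second view (the diagonal)
    labels = torch.arange(z1.size(0))
    loss = F.cross_entropy(logits, labels)
    print(loss.item())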
import os
from termcolor import colored
import numpy as np
import math
from argparse import ArgumentParser
from enum import Enum
from typing import Optional, Tuple
import torch
from torch.nn import functional as F
from torch import nn
from torchvision.datasets import ImageFolder
import pytorch_lightning as pl
from pl_bolts.models.self_supervised import SimSiam
from pl_bolts.utils.self_supervised import torchvision_ssl_encoder
#Internal Imports
from dali_utils.dali_transforms import SimCLRTransform #same transform as SimCLR
from dali_utils.lightning_compat import SimCLRWrapper
class MLP(nn.Module):
def __init__(self, input_dim: int = 2048, hidden_size: int = 4096, output_dim: int = 256) -> None:
super().__init__()
self.output_dim = output_dim
self.input_dim = input_dim
self.model = nn.Sequential(
nn.Linear(input_dim, hidden_size, bias=False),
nn.BatchNorm1d(hidden_size),
nn.ReLU(inplace=True),
nn.Linear(hidden_size, output_dim, bias=True),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.model(x)
return x
class SiameseArm(nn.Module):
def __init__(
self,
encoder: Optional[nn.Module] = None,
input_dim: int = 2048,
hidden_size: int = 4096,
output_dim: int = 256,
) -> None:
super().__init__()
# Encoder
self.encoder = encoder
self.embedding_size = self.encoder.embedding_size
# Projector
self.projector = MLP(input_dim, hidden_size, output_dim)
# Predictor
self.predictor = MLP(output_dim, hidden_size, output_dim)
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
y = self.encoder(x)[0]
z = self.projector(y)
h = self.predictor(z)
return y, z, h
class SIMSIAM(SimSiam):
def __init__(self, encoder, DATA_PATH, VAL_PATH, hidden_dim, image_size, seed, cpus, transform = SimCLRTransform, **simsiam_hparams):
data_temp = ImageFolder(DATA_PATH)
#derived values (not passed in) need to be added to model hparams
simsiam_hparams['num_samples'] = len(data_temp)
simsiam_hparams['dataset'] = None
simsiam_hparams['max_epochs'] = simsiam_hparams['epochs']
self.DATA_PATH = DATA_PATH
self.VAL_PATH = VAL_PATH
self.hidden_dim = hidden_dim
self.transform = transform
self.image_size = image_size
self.cpus = cpus
self.seed = seed
super().__init__(**simsiam_hparams)
#override the pl_bolts online network with our own encoder after the original SimSiam init
self.online_network = SiameseArm(
encoder, input_dim=encoder.embedding_size, hidden_size=self.hidden_dim, output_dim=self.feat_dim
)
self.encoder = self.online_network.encoder
self.save_hyperparameters()
#we supply our own encoder, so override the method pl_bolts calls to initialize the default SimSiam encoder
def init_model(self):
return None
def setup(self, stage = 'inference'):
Options = Enum('Loader', 'fit test inference')
if stage == Options.fit.name:
train = self.transform(self.DATA_PATH, batch_size = self.batch_size, input_height = self.image_size, copies = 3, stage = 'train', num_threads = self.cpus, device_id = self.local_rank, seed = self.seed)
val = self.transform(self.VAL_PATH, batch_size = self.batch_size, input_height = self.image_size, copies = 3, stage = 'validation', num_threads = self.cpus, device_id = self.local_rank, seed = self.seed)
self.train_loader = SimCLRWrapper(transform = train)
self.val_loader = SimCLRWrapper(transform = val)
elif stage == Options.inference.name:
self.test_dataloader = SimCLRWrapper(transform = self.transform(self.DATA_PATH, batch_size = self.batch_size, input_height = self.image_size, copies = 1, stage = 'inference', num_threads = 2*self.cpus, device_id = self.local_rank, seed = self.seed))
self.inference_dataloader = self.test_dataloader
def train_dataloader(self):
return self.train_loader
def val_dataloader(self):
return self.val_loader
#allow the user to add extra arguments specific to the SIMSIAM model. These cannot share names with any parameters in train.py
def add_model_specific_args(parent_parser):
parser = ArgumentParser(parents=[parent_parser], add_help=False)
# things we need to pass into pytorch lightning simsiam model
parser.add_argument("--feat_dim", default=128, type=int, help="feature dimension")
# training params
parser.add_argument("--num_workers", default=8, type=int, help="num of workers per GPU")
parser.add_argument("--optimizer", default="adam", type=str, help="choose between adam/sgd")
parser.add_argument("--lars_wrapper", action="store_true", help="apple lars wrapper over optimizer used")
parser.add_argument("--exclude_bn_bias", action="store_true", help="exclude bn/bias from weight decay")
parser.add_argument("--warmup_epochs", default=1, type=int, help="number of warmup epochs")
parser.add_argument("--temperature", default=0.1, type=float, help="temperature parameter in training loss")
parser.add_argument("--weight_decay", default=1e-6, type=float, help="weight decay")
parser.add_argument("--start_lr", default=0, type=float, help="initial warmup learning rate")
parser.add_argument("--final_lr", type=float, default=1e-6, help="final learning rate")
return parser
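# Illustrative sketch (not part of the original commit): the stop-gradient, negative-cosine
# objective that the projector/predictor outputs of SiameseArm above are typically trained with.
# Simplified view, not the exact pl_bolts SimSiam loss.
if __name__ == "__main__":
    import torch
    from torch.nn import functional as F

    def neg_cosine(p, z):
        # gradients flow through the predictor output p only; z is detached (stop-gradient)
        return -F.cosine_similarity(p, z.detach(), dim=-1).mean()

    z1, h1 = torch.randn(4, 256), torch.randn(4, 256)  # projector / predictor outputs, view 1
    z2, h2 = torch.randn(4, 256), torch.randn(4, 256)  # projector / predictor outputs, view 2
    loss = 0.5 * (neg_cosine(h1, z2) + neg_cosine(h2, z1))
    print(loss.item())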
from torch.nn import functional as F
from torch import nn
from pl_bolts.models.self_supervised.resnets import resnet18, resnet50
class miniCNN(nn.Module):
def __init__(self, output_dim):
super().__init__()
self.output_dim = output_dim
self.conv1 = nn.Conv2d(in_channels = 3, out_channels = 16, kernel_size = 3, stride = 1, padding = 1)
self.conv2 = nn.Conv2d(in_channels = 16, out_channels = 32, kernel_size = 3, stride = 2, padding = 1)
self.conv3 = nn.Conv2d(in_channels = 32, out_channels = 48, kernel_size = 3, stride = 2, padding = 1)
self.adaptive_pool = nn.AdaptiveAvgPool2d(output_size = (16, 16))
self.conv4 = nn.Conv2d(in_channels = 48, out_channels = 64, kernel_size = 5, stride = 2, padding = 2)
self.pool = nn.MaxPool2d(2, 2)
self.flatten = nn.Flatten()
self.fc1 = nn.Linear(64*4*4, self.output_dim)
def forward(self, x):
x = F.relu(self.conv1(x))
x = self.pool(x)
x = F.relu(self.conv2(x))
x = self.pool(x)
x = F.relu(self.conv3(x))
x = F.relu(self.adaptive_pool(x))
x = F.relu(self.conv4(x))
x = self.pool(x)
x = self.flatten(x)
x = F.relu(self.fc1(x))
return [x]
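# Illustrative sketch (not part of the original commit): shape check for miniCNN. The adaptive
# pool fixes the spatial size at 16x16, so conv4 (stride 2) followed by the final max pool leaves
# a 64x4x4 map for the flatten, and the single-element list return matches the encoder(x)[-1]
# indexing used by the models above.
if __name__ == "__main__":
    import torch

    enc = miniCNN(output_dim=128)
    feats = enc(torch.randn(2, 3, 224, 224))[-1]
    print(feats.shape)  # torch.Size([2, 128])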