Commit 1b63b1cc authored by IamTao

update the code in the public repo.

parent 07f989cd
@@ -108,4 +108,4 @@ EXPOSE 22
# expose port for notebook.
EXPOSE 8888
# expose port for tensorboard.
EXPOSE 6666
EXPOSE 6666
\ No newline at end of file
@@ -29,6 +29,8 @@ ENV PATH $HOME/conda/envs/pytorch-py$PYTHON_VERSION/bin:$PATH
# install pytorch, torchvision, torchtext.
RUN git clone --recursive https://github.com/pytorch/pytorch
RUN cd pytorch && \
git checkout tags/v1.3.0 && \
git submodule sync && \
git submodule update --init && \
TORCH_CUDA_ARCH_LIST="3.5 3.7 5.2 6.0 6.1 7.0+PTX" TORCH_NVCC_FLAGS="-Xfatbin -compress-all" \
CMAKE_PREFIX_PATH="$(dirname $(which $HOME/conda/bin/conda))/../" \
@@ -52,4 +54,4 @@ RUN $HOME/conda/bin/conda install --name pytorch-py$PYTHON_VERSION -y -c anacond
RUN $HOME/conda/bin/conda install --name pytorch-py$PYTHON_VERSION -y -c conda-forge tabulate
RUN $HOME/conda/envs/pytorch-py$PYTHON_VERSION/bin/pip install lmdb tensorboard_logger pyarrow msgpack msgpack_numpy mpi4py
RUN $HOME/conda/bin/conda install --name pytorch-py$PYTHON_VERSION -c conda-forge python-blosc
RUN $HOME/conda/bin/conda clean -ya
RUN $HOME/conda/bin/conda clean -ya
\ No newline at end of file
@@ -73,7 +73,7 @@ def main(conf):
optimizer = create_optimizer.define_optimizer(conf, model)
# define the lr scheduler.
scheduler = create_scheduler.Scheduler(conf, optimizer)
scheduler = create_scheduler.Scheduler(conf)
# add model with data-parallel wrapper.
if conf.graph.on_cuda:
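With this change the scheduler is constructed from conf alone and only sees the optimizer when it is stepped (note the scheduler.step(optimizer) call added to the training loop further down in this diff). A minimal editor's sketch of that interface; num_batches_per_epoch and num_epochs are assumed conf fields, not necessarily the repo's exact names, while epoch_ and is_stop() are taken from this diff:

# Editor's sketch of a conf-only scheduler that pushes the lr into the
# optimizer at step time.
class Scheduler(object):
    def __init__(self, conf):
        self.conf = conf
        self.local_index = 0
        self.epoch_ = 0.0

    def step(self, optimizer):
        # advance the iteration/epoch counters, then write the new lr into the optimizer.
        self.local_index += 1
        self.epoch_ = self.local_index / self.conf.num_batches_per_epoch
        lr = self.get_lr(self.epoch_)
        for group in optimizer.param_groups:
            group["lr"] = lr

    def get_lr(self, epoch):
        # placeholder: constant lr; the real scheduler applies the warmup,
        # scale-up and decay flags defined in the argument parser.
        return self.conf.lr

    def is_stop(self):
        return self.epoch_ >= self.conf.num_epochs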
@@ -156,7 +156,8 @@ def init_config(conf):
graph_topology=conf.graph_topology,
world=conf.world,
n_mpi_process=conf.n_mpi_process, # the # of total main processes.
n_sub_process=conf.n_sub_process, # the # of subprocess for each main process.
# the # of subprocess for each main process.
n_sub_process=conf.n_sub_process,
comm_device=conf.comm_device,
on_cuda=conf.on_cuda,
rank=cur_rank,
@@ -188,14 +189,11 @@ def init_config(conf):
if __name__ == "__main__":
# parse the arguments.
conf = get_args()
# configure for multi-process training.
if conf.optimizer == "parallel_choco":
mp.set_start_method("forkserver", force=True)
# mp.set_start_method("spawn", force=True)
mp.set_sharing_strategy("file_system")
# enter the training procedure.
main(conf)
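For context, the forkserver start method and the file_system sharing strategy only matter for the multiprocessing-based parallel_choco optimizer. A brief recap of the pattern with the (editor's, assumed) rationale spelled out as comments:

import torch.multiprocessing as mp

if __name__ == "__main__":
    # forkserver launches workers from a clean helper process instead of forking
    # the parent (which may already hold CUDA/MPI state that does not survive a
    # fork); file_system sharing passes tensors through files rather than file
    # descriptors, avoiding fd exhaustion when many worker processes exchange tensors.
    mp.set_start_method("forkserver", force=True)
    mp.set_sharing_strategy("file_system")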
@@ -69,21 +69,32 @@ def get_args():
# learning rate scheme
parser.add_argument("--lr", type=float, default=0.01)
parser.add_argument("--lr_schedule_scheme", type=str, default=None)
parser.add_argument("--lr_change_epochs", type=str, default=None)
parser.add_argument("--lr_fields", type=str, default=None)
parser.add_argument("--lr_scale_indicators", type=str, default=None)
parser.add_argument("--lr_scaleup", type=str2bool, default=False)
parser.add_argument("--lr_scaleup_type", type=str, default="linear")
parser.add_argument(
"--lr_scheduler",
"--lr_scaleup_factor",
type=str,
default="MultiStepLR",
choices=["MultiStepLR", "ExponentialLR", "ReduceLROnPlateau"],
default="graph",
help="scale by the graph connection, or the world size",
)
parser.add_argument("--lr_milestones", type=str, default=None)
parser.add_argument("--lr_decay", type=float, default=0.1)
parser.add_argument("--lr_patience", type=int, default=10)
parser.add_argument("--lr_scaleup", type=str2bool, default=False)
parser.add_argument("--lr_scaleup_init_lr", type=float, default=None)
parser.add_argument("--lr_scaleup_factor", type=str, default=None)
parser.add_argument("--lr_warmup", type=str2bool, default=False)
parser.add_argument("--lr_warmup_epochs", type=int, default=None)
parser.add_argument("--lr_warmup_epochs_upper_bound", type=int, default=150)
parser.add_argument("--lr_warmup_epochs", type=int, default=5)
parser.add_argument("--lr_decay", type=float, default=10)
parser.add_argument("--lr_onecycle_low", type=float, default=0.15)
parser.add_argument("--lr_onecycle_high", type=float, default=3)
parser.add_argument("--lr_onecycle_extra_low", type=float, default=0.0015)
parser.add_argument("--lr_onecycle_num_epoch", type=int, default=46)
parser.add_argument("--lr_gamma", type=float, default=None)
parser.add_argument("--lr_mu", type=float, default=None)
parser.add_argument("--lr_alpha", type=float, default=None)
# optimizer
parser.add_argument("--optimizer", type=str, default="sgd")
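The new flags replace the fixed --lr_scheduler choices (MultiStepLR / ExponentialLR / ReduceLROnPlateau) with a scheme-driven schedule plus scale-up and warmup. A hedged sketch of how --lr_scaleup_factor ("graph" vs. world size) and --lr_warmup could combine; the helper below is the editor's illustration, not the repo's code:

def warmup_and_scaled_lr(conf, epoch, n_graph_neighbors, world_size):
    # Illustration only: "graph" scales the base lr by the node's connectivity,
    # anything else by the total number of workers; warmup ramps linearly from
    # the base lr to the scaled target over --lr_warmup_epochs epochs.
    base_lr = conf.lr
    factor = n_graph_neighbors if conf.lr_scaleup_factor == "graph" else world_size
    target_lr = base_lr * factor if conf.lr_scaleup else base_lr
    if conf.lr_warmup and epoch < conf.lr_warmup_epochs:
        return base_lr + (target_lr - base_lr) * epoch / conf.lr_warmup_epochs
    return target_lr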
@@ -113,12 +124,7 @@ def get_args():
parser.add_argument("--majority_vote", default=False, type=str2bool)
parser.add_argument("--consensus_stepsize", default=0.9, type=float)
parser.add_argument(
"--evaluate_consensus",
default="val",
type=str,
help="determine the dataset used for the evaluation (on averaged model).",
)
parser.add_argument("--evaluate_consensus", default=False, type=str2bool)
parser.add_argument("--mask_momentum", default=False, type=str2bool)
parser.add_argument("--clip_grad", default=False, type=str2bool)
@@ -163,11 +169,13 @@ def get_args():
default=False,
help="evaluate model on validation set",
)
parser.add_argument("--eval_freq", default=1, type=int)
parser.add_argument("--summary_freq", default=100, type=int)
parser.add_argument("--timestamp", default=None, type=str)
parser.add_argument("--track_time", default=False, type=str2bool)
parser.add_argument("--track_detailed_time", default=False, type=str2bool)
parser.add_argument("--display_tracked_time", default=False, type=str2bool)
parser.add_argument("--evaluate_avg", default=False, type=str2bool)
# checkpoint
parser.add_argument("--resume", default=None, type=str)
......
# -*- coding: utf-8 -*-
from pcode.optim.sgd import SGD
from pcode.optim.sign_sgd import SignSGD
from pcode.optim.ef_sign_sgd import EF_SignSGD
from pcode.optim.dgc import DGC
from pcode.optim.parallel_choco import ParallelCHOCO
from pcode.optim.parallel_choco_v import ParallelCHOCO_V
from pcode.optim.ef_sign_sgd import EF_SignSGD
from pcode.optim.others.sign_sgd import SignSGD
from pcode.optim.others.local_sign_sgd import Local_SignSGD
from pcode.optim.others.local_sgd import LocalSGD
from pcode.optim.dcd_psgd import DCD_PSGD
from pcode.optim.ecd_psgd import ECD_PSGD
from pcode.optim.deep_squeeze import DeepSqueeze
from pcode.optim.others.local_ef_sign_sgd_v import Local_EFSignSGD_V
from pcode.optim.others.local_sign_sgd_v import Local_SignSGD_V
from pcode.optim.others.sign_sgd_v import SignSGD_V
from pcode.optim.others.adam import Adam
from pcode.optim.others.local_adam import LocalAdam
def define_optimizer(conf, model):
@@ -29,26 +36,41 @@ def define_optimizer(conf, model):
optim_class = SGD
elif conf.optimizer == "dgc":
optim_class = DGC
elif conf.optimizer == "sign_sgd":
optim_class = SignSGD
elif conf.optimizer == "ef_sign_sgd":
optim_class = EF_SignSGD
elif conf.optimizer == "dcd_psgd":
optim_class = DCD_PSGD
elif conf.optimizer == "ecd_psgd":
optim_class = ECD_PSGD
elif conf.optimizer == "parallel_choco":
optim_class = ParallelCHOCO
elif conf.optimizer == "parallel_choco_v":
optim_class = ParallelCHOCO_V
elif conf.optimizer == "ef_sign_sgd":
optim_class = EF_SignSGD
elif conf.optimizer == "sign_sgd":
optim_class = SignSGD
elif conf.optimizer == "local_sign_sgd":
optim_class = Local_SignSGD
elif conf.optimizer == "local_sgd":
optim_class = LocalSGD
elif conf.optimizer == "deep_squeeze":
optim_class = DeepSqueeze
elif conf.optimizer == "sign_sgd_v":
optim_class = SignSGD_V
elif conf.optimizer == "local_sign_sgd_v":
optim_class = Local_SignSGD_V
elif conf.optimizer == "local_ef_sign_sgd_v":
optim_class = Local_EFSignSGD_V
elif conf.optimizer == "adam":
optim_class = Adam
elif conf.optimizer == "local_adam":
optim_class = LocalAdam
else:
raise NotImplementedError
optimizer = optim_class(
return optim_class(
params,
lr=conf.lr,
momentum=conf.momentum_factor,
nesterov=conf.use_nesterov,
conf=conf,
)
return optimizer
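As the list of supported optimizers keeps growing, the same dispatch could be written as a lookup table. An editor's sketch, reusing the classes imported at the top of this file (params is built from model exactly as in define_optimizer above and is not repeated here):

_OPTIMIZER_CLASSES = {
    "sgd": SGD,
    "dgc": DGC,
    "dcd_psgd": DCD_PSGD,
    "ecd_psgd": ECD_PSGD,
    "parallel_choco": ParallelCHOCO,
    "parallel_choco_v": ParallelCHOCO_V,
    "ef_sign_sgd": EF_SignSGD,
    "sign_sgd": SignSGD,
    "local_sign_sgd": Local_SignSGD,
    "local_sgd": LocalSGD,
    "deep_squeeze": DeepSqueeze,
    "sign_sgd_v": SignSGD_V,
    "local_sign_sgd_v": Local_SignSGD_V,
    "local_ef_sign_sgd_v": Local_EFSignSGD_V,
    "adam": Adam,
    "local_adam": LocalAdam,
}

def define_optimizer_from_table(conf, params):
    # equivalent to the elif chain above; unknown names still fail loudly.
    if conf.optimizer not in _OPTIMIZER_CLASSES:
        raise NotImplementedError(conf.optimizer)
    return _OPTIMIZER_CLASSES[conf.optimizer](
        params,
        lr=conf.lr,
        momentum=conf.momentum_factor,
        nesterov=conf.use_nesterov,
        conf=conf,
    )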
This diff is collapsed.
@@ -5,23 +5,21 @@ import torch
import torchvision.transforms as transforms
__imagenet_stats = {'mean': [0.485, 0.456, 0.406],
'std': [0.229, 0.224, 0.225]}
__imagenet_stats = {"mean": [0.485, 0.456, 0.406], "std": [0.229, 0.224, 0.225]}
__imagenet_pca = {
'eigval': torch.Tensor([0.2175, 0.0188, 0.0045]),
'eigvec': torch.Tensor([
[-0.5675, 0.7192, 0.4009],
[-0.5808, -0.0045, -0.8140],
[-0.5836, -0.6948, 0.4203],
])
"eigval": torch.Tensor([0.2175, 0.0188, 0.0045]),
"eigvec": torch.Tensor(
[
[-0.5675, 0.7192, 0.4009],
[-0.5808, -0.0045, -0.8140],
[-0.5836, -0.6948, 0.4203],
]
),
}
def scale_crop(input_size, scale_size=None, normalize=__imagenet_stats):
t_list = [
transforms.CenterCrop(input_size),
transforms.ToTensor()
]
t_list = [transforms.CenterCrop(input_size), transforms.ToTensor()]
if normalize is not None:
t_list += [transforms.Normalize(**normalize)]
if scale_size != input_size:
@@ -30,10 +28,7 @@ def scale_crop(input_size, scale_size=None, normalize=__imagenet_stats):
def scale_random_crop(input_size, scale_size=None, normalize=__imagenet_stats):
t_list = [
transforms.RandomCrop(input_size),
transforms.ToTensor(),
]
t_list = [transforms.RandomCrop(input_size), transforms.ToTensor()]
if normalize is not None:
t_list += [transforms.Normalize(**normalize)]
if scale_size != input_size:
@@ -69,25 +64,27 @@ def inception_color_preproccess(input_size, normalize=__imagenet_stats):
transforms.RandomResizedCrop(input_size),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
ColorJitter(
brightness=0.4,
contrast=0.4,
saturation=0.4,
),
Lighting(0.1, __imagenet_pca['eigval'], __imagenet_pca['eigvec']),
ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
Lighting(0.1, __imagenet_pca["eigval"], __imagenet_pca["eigvec"]),
]
if normalize is not None:
t_list += [transforms.Normalize(**normalize)]
return transforms.Compose(t_list)
def get_transform(name='imagenet', input_size=None, scale_size=None,
normalize=None, augment=True, color_process=False):
def get_transform(
name="imagenet",
input_size=None,
scale_size=None,
normalize=None,
augment=True,
color_process=False,
):
normalize = normalize or __imagenet_stats
if 'imagenet' in name:
scale_size = scale_size or (36 if 'downsampled' in name else 256)
input_size = input_size or (32 if 'downsampled' in name else 224)
if "imagenet" in name:
scale_size = scale_size or (36 if "downsampled" in name else 256)
input_size = input_size or (32 if "downsampled" in name else 224)
if augment:
if color_process:
@@ -96,29 +93,34 @@ def get_transform(name='imagenet', input_size=None, scale_size=None,
preprocess_fn = inception_preproccess
return preprocess_fn(input_size, normalize=normalize)
else:
return scale_crop(input_size=input_size,
scale_size=scale_size, normalize=normalize)
elif 'cifar' in name:
return scale_crop(
input_size=input_size, scale_size=scale_size, normalize=normalize
)
elif "cifar" in name:
input_size = input_size or 32
if augment:
scale_size = scale_size or 40
return pad_random_crop(input_size, scale_size=scale_size,
normalize=normalize)
return pad_random_crop(
input_size, scale_size=scale_size, normalize=normalize
)
else:
scale_size = scale_size or 32
return scale_crop(input_size=input_size,
scale_size=scale_size, normalize=normalize)
elif name == 'mnist':
normalize = {'mean': [0.5], 'std': [0.5]}
return scale_crop(
input_size=input_size, scale_size=scale_size, normalize=normalize
)
elif name == "mnist":
normalize = {"mean": [0.5], "std": [0.5]}
input_size = input_size or 28
if augment:
scale_size = scale_size or 32
return pad_random_crop(input_size, scale_size=scale_size,
normalize=normalize)
return pad_random_crop(
input_size, scale_size=scale_size, normalize=normalize
)
else:
scale_size = scale_size or 32
return scale_crop(input_size=input_size,
scale_size=scale_size, normalize=normalize)
return scale_crop(
input_size=input_size, scale_size=scale_size, normalize=normalize
)
class Lighting(object):
@@ -134,26 +136,28 @@ class Lighting(object):
return img
alpha = img.new().resize_(3).normal_(0, self.alphastd)
rgb = self.eigvec.type_as(img).clone()\
.mul(alpha.view(1, 3).expand(3, 3))\
.mul(self.eigval.view(1, 3).expand(3, 3))\
.sum(1).squeeze()
rgb = (
self.eigvec.type_as(img)
.clone()
.mul(alpha.view(1, 3).expand(3, 3))
.mul(self.eigval.view(1, 3).expand(3, 3))
.sum(1)
.squeeze()
)
return img.add(rgb.view(3, 1, 1).expand_as(img))
class Grayscale(object):
def __call__(self, img):
gs = img.clone()
gs[0].mul_(0.299).add_(0.587, gs[1]).add_(0.114, gs[2])
gs[0].mul_(0.299).add_(gs[1], alpha=0.587).add_(gs[2], alpha=0.114)
gs[1].copy_(gs[0])
gs[2].copy_(gs[0])
return gs
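The one substantive change in this block tracks the newer torch.Tensor.add_ signature, where the scalar multiplier is passed as the alpha keyword instead of positionally; both forms compute the same weighted sum. A small standalone check (editor's example):

import torch

x, y = torch.zeros(3), torch.ones(3)
x.add_(y, alpha=0.587)  # current form: x += 0.587 * y
# the removed form, x.add_(0.587, y), computed the same thing but is deprecated
# in recent PyTorch releases.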
class Saturation(object):
def __init__(self, var):
self.var = var
@@ -164,7 +168,6 @@ class Saturation(object):
class Brightness(object):
def __init__(self, var):
self.var = var
@@ -175,7 +178,6 @@ class Brightness(object):
class Contrast(object):
def __init__(self, var):
self.var = var
@@ -203,7 +205,6 @@ class RandomOrder(object):
class ColorJitter(RandomOrder):
def __init__(self, brightness=0.4, contrast=0.4, saturation=0.4):
self.transforms = []
if brightness != 0:
......
@@ -23,21 +23,16 @@ import pcode.utils.auxiliary as auxiliary
def train_and_validate(
conf, model, criterion, scheduler, optimizer, metrics, data_loader
):
print("=>>>> start training and validation.")
print("=>>>> start training and validation.\n")
# define runtime stat tracker and start the training.
tracker_tr = RuntimeTracker(
metrics_to_track=metrics.metric_names, on_cuda=conf.graph.on_cuda
)
# prepare the dataloader for the consensus evaluation.
_data_loader = {
"val_loader": _define_cv_dataset(
conf, partition_type=None, dataset_type="train", force_shuffle=True
)
}
# get the timer.
timer = conf.timer
# break until finish expected full epoch training.
print("=>>>> enter the training.\n")
while True:
@@ -46,6 +41,7 @@ def train_and_validate(
# configure local step.
for _input, _target in data_loader["train_loader"]:
model.train()
scheduler.step(optimizer)
# load data
with timer("load_data", epoch=scheduler.epoch_):
@@ -59,9 +55,8 @@ def train_and_validate(
with timer("backward_pass", epoch=scheduler.epoch_):
loss.backward()
with timer("sync_and_apply_grad", epoch=scheduler.epoch_):
with timer("sync_complete", epoch=scheduler.epoch_):
n_bits_to_transmit = optimizer.step(timer=timer, scheduler=scheduler)
scheduler.step()
# display the logging info.
display_training_stat(conf, scheduler, tracker_tr, n_bits_to_transmit)
@@ -84,9 +79,20 @@ def train_and_validate(
# evaluate (and only inference) on the whole training loader.
if (
"train" in conf.evaluate_consensus or scheduler.is_stop()
conf.evaluate_consensus or scheduler.is_stop()
) and not conf.train_fast:
# prepare the dataloader for the consensus evaluation.
_data_loader = {
"val_loader": _define_cv_dataset(
conf,
partition_type=None,
dataset_type="train",
force_shuffle=True,
)
}
# evaluate on the local model.
conf.logger.log("eval the local model on full training data.")
validate(
conf,
model,
......@@ -95,14 +101,38 @@ def train_and_validate(
scheduler,
metrics,
data_loader=_data_loader,
on_averaged_model=True,
on_dataset="train",
label="eval_local_model_on_full_training_data",
force_evaluate_on_averaged_model=False,
)
# evaluate on the averaged model.
conf.logger.log("eval the averaged model on full training data.")
copied_model = copy.deepcopy(
model.module
if "DataParallel" == model.__class__.__name__
else model
)
optimizer.world_aggregator.agg_model(copied_model, op="avg")
validate(
conf,
copied_model,
optimizer,
criterion,
scheduler,
metrics,
data_loader=_data_loader,
label="eval_averaged_model_on_full_training_data",
force_evaluate_on_averaged_model=False,
)
# determine if the training is finished.
if scheduler.is_stop():
# save json.
conf.logger.save_json()
# temporary hack to exit the multiprocessing-based ParallelCHOCO
if optimizer.__class__.__name__ == "ParallelCHOCO":
error_handler.abort()
return
# display tracking time.
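The block added above evaluates an explicit consensus model: the local model is deep-copied and its parameters are averaged across all workers via optimizer.world_aggregator.agg_model(copied_model, op="avg") before validation. A hedged sketch of what such an "avg" aggregation over the whole world boils down to (the repo's aggregator may differ in backend and buffer handling):

import torch.distributed as dist

def average_model_across_world(model, world_size):
    # parameter-wise all-reduce followed by division: every worker ends up
    # holding the arithmetic mean of all local models.
    for param in model.parameters():
        dist.all_reduce(param.data, op=dist.ReduceOp.SUM)
        param.data.div_(world_size)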
@@ -136,14 +166,7 @@ def do_validate(conf, model, optimizer, criterion, scheduler, metrics, data_load
# wait until the whole group enters this function, and then evaluate.
print("Enter validation phase.")
performance = validate(
conf,
model,
optimizer,
criterion,
scheduler,
metrics,
data_loader=data_loader,
on_averaged_model="val" in conf.evaluate_consensus,
conf, model, optimizer, criterion, scheduler, metrics, data_loader
)
# remember best performance and display the val info.
@@ -178,8 +201,8 @@ def validate(
scheduler,
metrics,
data_loader,
on_averaged_model=True,
on_dataset="val",
label="local_model",
force_evaluate_on_averaged_model=True,
):
"""A function for model evaluation."""
@@ -206,26 +229,18 @@ def validate(
global_performance = tracker_te.evaluate_global_metrics()
return global_performance
# evaluate the averaged local model.
# evaluate the averaged local model on the validation dataset.
if (
conf.graph_topology != "complete"
and not conf.train_fast
and conf.evaluate_consensus
and on_averaged_model
and force_evaluate_on_averaged_model
):
conf.logger.log(
f"eval the averaged model on full {'training' if on_dataset == 'train' else 'val'} data."
)
copied_model = copy.deepcopy(
model.module if "DataParallel" == model.__class__.__name__ else model
)
optimizer.world_aggregator.agg_model(copied_model, op="avg")
_evaluate(
copied_model,
label="averaged_model_on_full_val"
if on_dataset == "val"
else "averaged_model_on_full_train",
)
_evaluate(copied_model, label="averaged_model")
# get the l2 distance of the local model to the averaged model
conf.logger.log_metric(
@@ -238,14 +253,6 @@ def validate(
tags={"split": "test", "type": "averaged_model"},
)
# evaluate each local model
conf.logger.log(
f"eval the local model on full {'training' if on_dataset == 'train' else 'val'} data."
)
global_performance = _evaluate(
model,
label="local_model_on_full_val"
if on_dataset == "val"
else "local_model_on_full_train",
)
# evaluate each local model on the validation dataset.
global_performance = _evaluate(model, label=label)
return global_performance
# -*- coding: utf-8 -*-
from copy import deepcopy
import numpy as np
import torch
from pcode.create_dataset import load_data_batch
from pcode.utils.checkpoint import save_to_checkpoint
from pcode.utils.logging import (
display_training_stat,
@@ -10,8 +11,10 @@ from pcode.utils.logging import (
dispaly_best_test_stat,
)
from pcode.utils.stat_tracker import RuntimeTracker
from pcode.utils.timer import Timer
from pcode.utils.auxiliary import get_model_difference
import pcode.utils.error_handler as error_handler
import pcode.utils.auxiliary as auxiliary
from pcode.create_dataset import load_data_batch
# sys.excepthook = error_handler.global_except_hook
@@ -21,6 +24,9 @@ def train_and_validate(
conf, model, criterion, scheduler, optimizer, metrics, data_loader
):
print("=>>>> start training and validation.\n")
assert (
optimizer.__class__.__name__ != "ParallelCHOCO"
), "NLP tasks do not currently support the multiprocessing-based ParallelCHOCO (please use optimizer=parallel_choco_v instead)."