Commit 07f989cd authored by IamTao

add more example scripts; enable evaluating the local model and the averaged model on both the training and validation datasets.
parent b923ab8e
@@ -2,7 +2,7 @@
Our experiments heavily rely on `Docker` and `Kubernetes`. For the detailed experimental environment setup, please refer to the Dockerfile under the `environments` folder.
-## Use case of distributed training (centralized)
+## Use case of distributed training (centralized/decentralized)
A brief explanation of the arguments used in the code.
* Arguments related to *distributed training*:
  * The `n_mpi_process` and `n_sub_process` indicate the number of nodes and the number of GPUs per node, respectively. The data-parallel wrapper is adapted and applied locally on each node.
@@ -16,11 +16,12 @@ A brief explanation of the arguments used in the code.
  * The `graph_topology` specifies the communication topology used for training, e.g., `ring`, `complete`, `social`.
  * The `optimizer` decides the type of distributed training, e.g., centralized SGD or decentralized SGD.
  * The `comm_op` specifies the communication compressor to use, e.g., `sign+norm`, `random-k`, `top-k`.
  * The `consensus_stepsize` determines the consensus stepsize for the different decentralized algorithms (e.g., `parallel_choco`, `deep_squeeze`).
* Arguments related to *learning*:
  * The `lr_scaleup`, `lr_warmup`, and `lr_warmup_epochs` decide whether to scale up and/or warm up the learning rate. For more details, please check `pcode/create_scheduler.py`; a sketch of the idea follows this list.
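
For intuition, here is a minimal sketch of how scale-up and warmup can compose. It is an illustration only, not the project's implementation (which lives in `pcode/create_scheduler.py`); the function name and the linear-warmup convention are assumptions.

```python
# Illustrative only: the authoritative logic lives in pcode/create_scheduler.py;
# the function name and the linear-warmup convention are assumptions.
def warmup_scaled_lr(base_lr, scaleup_factor, warmup_epochs, epoch):
    """Scale the target lr, then ramp up to it linearly during warmup."""
    target_lr = base_lr * scaleup_factor  # e.g., factor = graph size for `graph`
    if epoch < warmup_epochs:
        # linear warmup from base_lr to target_lr.
        return base_lr + (target_lr - base_lr) * epoch / warmup_epochs
    return target_lr


# e.g., --lr 0.1 with a scale-up factor of 8 and 5 warmup epochs:
print([round(warmup_scaled_lr(0.1, 8, 5, e), 2) for e in range(7)])
# [0.1, 0.24, 0.38, 0.52, 0.66, 0.8, 0.8]
```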
### Examples
-The script below trains `ResNet-20` with `CIFAR-10`, as an example of centralized training algorithm `CHOCO`.
+The script below trains `ResNet-20` on `CIFAR-10`, as an example of the decentralized training algorithm `CHOCO`. More examples can be found in `exps`.
```bash
OMP_NUM_THREADS=2 MKL_NUM_THREADS=2 $HOME/conda/envs/pytorch-py3.6/bin/python run.py \
--arch resnet20 --optimizer parallel_choco \
@@ -34,6 +35,6 @@
--lr_scheduler MultiStepLR --lr_decay 0.1 --lr_milestones 150,225 \
--comm_op sign --consensus_stepsize 0.5 --compress_ratio 0.9 --quantize_level 16 --is_biased True \
--weight_decay 1e-4 --use_nesterov True --momentum_factor 0.9 \
---hostfile hostfile --graph_topology complete --track_time True --display_tracked_time True \
+--hostfile hostfile --graph_topology ring --track_time True --display_tracked_time True \
--python_path $HOME/conda/envs/pytorch-py3.6/bin/python --mpi_path $HOME/.openmpi/
```
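# decentralized sgd for rnn_lm on wikitext2 (social topology).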
$HOME/conda/envs/pytorch-py3.6/bin/python run.py \
--arch rnn_lm --rnn_n_hidden 650 --rnn_n_layers 3 --rnn_bptt_len 30 \
--rnn_clip 0.4 --rnn_use_pretrained_emb False --rnn_tie_weights True --drop_rate 0.40 \
--optimizer sgd --avg_model True --experiment test \
--data wikitext2 --pin_memory True \
--batch_size 32 --base_batch_size 24 --num_workers 2 \
--num_epochs 300 --partition_data random --reshuffle_per_epoch False --stop_criteria epoch \
--n_mpi_process 32 --n_sub_process 1 --world 0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1 --on_cuda True --use_ipc False --comm_device cuda \
--lr 2.5 --lr_scaleup True --lr_scaleup_factor graph --lr_warmup True --lr_warmup_epochs 5 \
--lr_scheduler MultiStepLR --lr_decay 0.1 --lr_milestones 150,225 \
--weight_decay 0 --use_nesterov False --momentum_factor 0 \
--hostfile hostfile --graph_topology social --track_time True --display_tracked_time True \
--python_path $HOME/conda/envs/pytorch-py3.6/bin/python --mpi_path $HOME/.openmpi/
# centralized sgd with complete topology.
$HOME/conda/envs/pytorch-py3.6/bin/python run.py \
--arch resnet20 --optimizer sgd \
--avg_model True --experiment test \
--data cifar10 --pin_memory True \
--batch_size 128 --base_batch_size 64 --num_workers 0 \
--num_epochs 300 --partition_data random --reshuffle_per_epoch True --stop_criteria epoch \
--n_mpi_process 8 --n_sub_process 1 --world 0,0,0,0,0,0,0,0 --on_cuda True --use_ipc False --comm_device cuda \
--lr 0.1 --lr_scaleup True --lr_scaleup_factor graph --lr_warmup True --lr_warmup_epochs 5 \
--lr_scheduler MultiStepLR --lr_decay 0.1 --lr_milestones 150,225 \
--weight_decay 1e-4 --use_nesterov True --momentum_factor 0.9 \
--hostfile hostfile --graph_topology complete --track_time True --display_tracked_time True \
--python_path $HOME/conda/envs/pytorch-py3.6/bin/python --mpi_path $HOME/.openmpi/
# decentralized sgd with ring topology.
$HOME/conda/envs/pytorch-py3.6/bin/python run.py \
--arch resnet20 --optimizer sgd \
--avg_model True --experiment test \
--data cifar10 --pin_memory True \
--batch_size 128 --base_batch_size 64 --num_workers 0 \
--num_epochs 300 --partition_data random --reshuffle_per_epoch True --stop_criteria epoch \
--n_mpi_process 8 --n_sub_process 1 --world 0,0,0,0,0,0,0,0 --on_cuda True --use_ipc False --comm_device cuda \
--lr 0.1 --lr_scaleup True --lr_scaleup_factor graph --lr_warmup True --lr_warmup_epochs 5 \
--lr_scheduler MultiStepLR --lr_decay 0.1 --lr_milestones 150,225 \
--weight_decay 1e-4 --use_nesterov True --momentum_factor 0.9 \
--hostfile hostfile --graph_topology ring --track_time True --display_tracked_time True \
--python_path $HOME/conda/envs/pytorch-py3.6/bin/python --mpi_path $HOME/.openmpi/
# parallel_choco with sign + norm for ring topology.
$HOME/conda/envs/pytorch-py3.6/bin/python run.py \
--arch resnet20 --optimizer parallel_choco \
--avg_model True --experiment test \
--data cifar10 --pin_memory True \
--batch_size 128 --base_batch_size 64 --num_workers 0 \
--num_epochs 300 --partition_data random --reshuffle_per_epoch True --stop_criteria epoch \
--n_mpi_process 8 --n_sub_process 1 --world 0,0,0,0,0,0,0,0 --on_cuda True --use_ipc False --comm_device cuda \
--lr 0.1 --lr_scaleup True --lr_scaleup_factor graph --lr_warmup True --lr_warmup_epochs 5 \
--lr_scheduler MultiStepLR --lr_decay 0.1 --lr_milestones 150,225 \
--weight_decay 1e-4 --use_nesterov True --momentum_factor 0.9 \
--comm_op sign --consensus_stepsize 0.4 --compress_ratio 0.9 --quantize_level 16 --is_biased True \
--hostfile hostfile --graph_topology ring --track_time True --display_tracked_time True \
--python_path $HOME/conda/envs/pytorch-py3.6/bin/python --mpi_path $HOME/.openmpi/
# parallel_choco with sign + norm for social topology.
$HOME/conda/envs/pytorch-py3.6/bin/python run.py \
--arch resnet20 --optimizer parallel_choco \
--avg_model True --experiment test \
--data cifar10 --pin_memory True \
--batch_size 128 --base_batch_size 64 --num_workers 0 \
--num_epochs 300 --partition_data random --reshuffle_per_epoch True --stop_criteria epoch \
--n_mpi_process 32 --n_sub_process 1 --world 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 --on_cuda True --use_ipc False --comm_device cuda \
--lr 0.1 --lr_scaleup True --lr_scaleup_factor graph --lr_warmup True --lr_warmup_epochs 5 \
--lr_scheduler MultiStepLR --lr_decay 0.1 --lr_milestones 150,225 \
--weight_decay 1e-4 --use_nesterov True --momentum_factor 0.9 \
--comm_op sign --consensus_stepsize 0.4 --compress_ratio 0.9 --quantize_level 16 --is_biased True \
--hostfile hostfile --graph_topology social --track_time True --display_tracked_time True \
--python_path $HOME/conda/envs/pytorch-py3.6/bin/python --mpi_path $HOME/.openmpi/
# dcd_psgd with quantize_qsgd for ring topology.
$HOME/conda/envs/pytorch-py3.6/bin/python run.py \
--arch resnet20 --optimizer dcd_psgd \
--avg_model True --experiment test \
--data cifar10 --pin_memory True \
--batch_size 128 --base_batch_size 64 --num_workers 0 \
--num_epochs 300 --partition_data random --reshuffle_per_epoch True --stop_criteria epoch \
--n_mpi_process 8 --n_sub_process 1 --world 0,0,0,0,0,0,0,0 --on_cuda True --use_ipc False --comm_device cuda \
--lr 0.1 --lr_scaleup True --lr_scaleup_factor graph --lr_warmup True --lr_warmup_epochs 5 \
--lr_scheduler MultiStepLR --lr_decay 0.1 --lr_milestones 150,225 \
--weight_decay 1e-4 --use_nesterov True --momentum_factor 0.9 \
--comm_op quantize_qsgd --compress_ratio 0.9 --quantize_level 16 --is_biased True \
--hostfile hostfile --graph_topology ring --track_time True --display_tracked_time True \
--python_path $HOME/conda/envs/pytorch-py3.6/bin/python --mpi_path $HOME/.openmpi/
# ecd_psgd with compress_top_k for ring topology.
$HOME/conda/envs/pytorch-py3.6/bin/python run.py \
--arch resnet20 --optimizer ecd_psgd \
--avg_model True --experiment test \
--data cifar10 --pin_memory True \
--batch_size 128 --base_batch_size 64 --num_workers 0 \
--num_epochs 300 --partition_data random --reshuffle_per_epoch True --stop_criteria epoch \
--n_mpi_process 8 --n_sub_process 1 --world 0,0,0,0,0,0,0,0 --on_cuda True --use_ipc False --comm_device cuda \
--lr 0.1 --lr_scaleup True --lr_scaleup_factor graph --lr_warmup True --lr_warmup_epochs 5 \
--lr_scheduler MultiStepLR --lr_decay 0.1 --lr_milestones 150,225 \
--weight_decay 1e-4 --use_nesterov True --momentum_factor 0.9 \
--comm_op compress_top_k --compress_ratio 0.9 --quantize_level 16 --is_biased True \
--hostfile hostfile --graph_topology ring --track_time True --display_tracked_time True \
--python_path $HOME/conda/envs/pytorch-py3.6/bin/python --mpi_path $HOME/.openmpi/
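# parallel_choco with sign + norm for resnet50 on imagenet (ring topology).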
/home/lin/conda/envs/pytorch-py3.6/bin/python run.py \
--arch resnet50 --optimizer parallel_choco \
--avg_model True --experiment test \
--data imagenet --use_lmdb_data True --data_dir /mlodata1/tlin/dataset/ILSVRC/ --pin_memory True \
--batch_size 512 --base_batch_size 256 --num_workers 2 \
--num_epochs 90 --partition_data random --reshuffle_per_epoch True --stop_criteria epoch \
--on_cuda True --n_mpi_process 8 --n_sub_process 4 --world 0,1,2,3,0,1,2,3,0,1,2,3,0,1,2,3,0,1,2,3,0,1,2,3,0,1,2,3,0,1,2,3 \
--lr 0.1 --lr_scaleup True --lr_scaleup_factor graph --lr_warmup True --lr_warmup_epochs 5 \
--lr_scheduler MultiStepLR --lr_decay 0.1 --lr_milestones 30,60,80 \
--weight_decay 1e-4 --use_nesterov True --momentum_factor 0.9 \
--comm_op sign --consensus_stepsize 0.5 \
--hostfile hostfile --graph_topology ring --track_time True --track_detailed_time True --display_tracked_time True \
--python_path /home/lin/conda/envs/pytorch-py3.6/bin/python --mpi_path /home/lin/.openmpi/ --summary_freq 100 \
--backend mpi --work_dir /mlodata1/tlin/decentralized_code --remote_exec False --clean_python False --mpi_env LD_LIBRARY_PATH=/home/lin/.openmpi/lib:/usr/local/nvidia/lib:/usr/local/nvidia/lib64
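
(In the ImageNet run above, `--n_mpi_process 8 --n_sub_process 4` together with the 32-entry `--world 0,1,2,3,...` list suggests eight MPI processes that each drive four local GPUs; this reading follows the `n_mpi_process`/`n_sub_process` description at the top of this README.)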
@@ -113,7 +113,12 @@ def get_args():
parser.add_argument("--majority_vote", default=False, type=str2bool)
parser.add_argument("--consensus_stepsize", default=0.9, type=float)
parser.add_argument("--evaluate_consensus", default=False, type=str2bool)
parser.add_argument(
"--evaluate_consensus",
default="val",
type=str,
help="determine the dataset used for the evaluation (on averaged model).",
)
parser.add_argument("--mask_momentum", default=False, type=str2bool)
parser.add_argument("--clip_grad", default=False, type=str2bool)
......
# -*- coding: utf-8 -*-
import gc
import copy
import numpy as np
import torch
import torch.distributed as dist
-from pcode.create_dataset import define_dataset, load_data_batch
+from pcode.create_dataset import define_dataset, load_data_batch, _define_cv_dataset
from pcode.utils.checkpoint import save_to_checkpoint
from pcode.utils.logging import (
@@ -14,6 +15,7 @@ from pcode.utils.logging import (
)
from pcode.utils.stat_tracker import RuntimeTracker
import pcode.utils.error_handler as error_handler
+import pcode.utils.auxiliary as auxiliary
# sys.excepthook = error_handler.global_except_hook
@@ -27,6 +29,13 @@ def train_and_validate(
metrics_to_track=metrics.metric_names, on_cuda=conf.graph.on_cuda
)
+# prepare the dataloader for the consensus evaluation: a full (unpartitioned)
+# pass over the training split, stored under the "val_loader" key so that the
+# evaluation path can consume it unchanged.
+_data_loader = {
+    "val_loader": _define_cv_dataset(
+        conf, partition_type=None, dataset_type="train", force_shuffle=True
+    )
+}
# get the timer.
timer = conf.timer
# break until finish expected full epoch training.
@@ -73,6 +82,23 @@ def train_and_validate(
# refresh the logging cache at the beginning of each epoch.
tracker_tr.reset()
+# evaluate (inference only) on the whole training loader.
+if (
+    "train" in conf.evaluate_consensus or scheduler.is_stop()
+) and not conf.train_fast:
+    # evaluate the local model (and, inside `validate`, the averaged model)
+    # on the full training data.
+    validate(
+        conf,
+        model,
+        optimizer,
+        criterion,
+        scheduler,
+        metrics,
+        data_loader=_data_loader,
+        on_averaged_model=True,
+        on_dataset="train",
+    )
# determine if the training is finished.
if scheduler.is_stop():
# save json.
@@ -110,7 +136,14 @@ def do_validate(conf, model, optimizer, criterion, scheduler, metrics, data_load
# wait until the whole group enters this function, and then evaluate.
print("Enter validation phase.")
performance = validate(
-    conf, model, optimizer, criterion, scheduler, metrics, data_loader
+    conf,
+    model,
+    optimizer,
+    criterion,
+    scheduler,
+    metrics,
+    data_loader=data_loader,
+    on_averaged_model="val" in conf.evaluate_consensus,
)
# remember best performance and display the val info.
@@ -145,7 +178,8 @@ def validate(
scheduler,
metrics,
data_loader,
label="local_model",
on_averaged_model=True,
on_dataset="val",
):
"""A function for model evaluation."""
@@ -172,6 +206,46 @@
global_performance = tracker_te.evaluate_global_metrics()
return global_performance
-# evaluate each local model on the validation dataset.
-global_performance = _evaluate(model, label=label)
+# evaluate the averaged local model.
+if (
+    conf.graph_topology != "complete"
+    and not conf.train_fast
+    and conf.evaluate_consensus
+    and on_averaged_model
+):
+    conf.logger.log(
+        f"eval the averaged model on full {'training' if on_dataset == 'train' else 'val'} data."
+    )
+    copied_model = copy.deepcopy(
+        model.module if "DataParallel" == model.__class__.__name__ else model
+    )
+    optimizer.world_aggregator.agg_model(copied_model, op="avg")
+    _evaluate(
+        copied_model,
+        label="averaged_model_on_full_val"
+        if on_dataset == "val"
+        else "averaged_model_on_full_train",
+    )
+    # get the l2 distance of the local model to the averaged model.
+    conf.logger.log_metric(
+        name="stat",
+        values={
+            "rank": conf.graph.rank,
+            "epoch": scheduler.epoch_,
+            "distance": auxiliary.get_model_difference(model, copied_model),
+        },
+        tags={"split": "test", "type": "averaged_model"},
+    )
+# evaluate each local model.
+conf.logger.log(
+    f"eval the local model on full {'training' if on_dataset == 'train' else 'val'} data."
+)
+global_performance = _evaluate(
+    model,
+    label="local_model_on_full_val"
+    if on_dataset == "val"
+    else "local_model_on_full_train",
+)
return global_performance
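
For orientation, here is a rough sketch of what the two helpers used above could look like. `optimizer.world_aggregator.agg_model(op="avg")` and `pcode.utils.auxiliary.get_model_difference` are the project's own utilities; the stand-ins below (names included) are assumptions built on plain `torch.distributed`.

```python
# Sketch only: stand-ins for optimizer.world_aggregator.agg_model(op="avg")
# and pcode.utils.auxiliary.get_model_difference; the real implementations
# may differ. Assumes torch.distributed has already been initialized.
import torch
import torch.distributed as dist


def agg_model_avg(model):
    """All-reduce every parameter across ranks and divide by the world size."""
    world_size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.data, op=dist.ReduceOp.SUM)
        param.data.div_(world_size)


def get_model_difference(model_a, model_b):
    """l2 distance between the flattened parameters of two models."""
    flat_a = torch.cat([p.data.view(-1) for p in model_a.parameters()])
    flat_b = torch.cat([p.data.view(-1) for p in model_b.parameters()])
    return (flat_a - flat_b).norm(p=2).item()
```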
@@ -2,6 +2,7 @@
import numpy as np
import torch
+from pcode.create_dataset import load_data_batch
from pcode.utils.checkpoint import save_to_checkpoint
from pcode.utils.logging import (
display_training_stat,
@@ -10,7 +11,7 @@ from pcode.utils.logging import (
)
from pcode.utils.stat_tracker import RuntimeTracker
import pcode.utils.error_handler as error_handler
-from pcode.create_dataset import load_data_batch
+import pcode.utils.auxiliary as auxiliary
# sys.excepthook = error_handler.global_except_hook
@@ -53,13 +54,13 @@ def train_and_validate(
_input = batch.text[
:,
conf.graph.rank
-* conf.batch_size : (conf.graph.rank + 1)
+* conf.batch_size: (conf.graph.rank + 1)
* conf.batch_size,
]
_target = batch.target[
:,
conf.graph.rank
-* conf.batch_size : (conf.graph.rank + 1)
+* conf.batch_size: (conf.graph.rank + 1)
* conf.batch_size,
]
_input, _target = load_data_batch(conf, _input, _target)
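
(In the hunk above, each MPI rank slices out its own `conf.batch_size`-wide block of columns from the shared `batch.text`/`batch.target`, i.e., the language-model batch is partitioned across processes by column.)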
@@ -83,12 +84,14 @@ def train_and_validate(
with timer("sync_complete", epoch=scheduler.epoch_):
# `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
-torch.nn.utils.clip_grad_norm_(model.parameters(), conf.rnn_clip)
+torch.nn.utils.clip_grad_norm_(
+    model.parameters(), conf.rnn_clip)
n_bits_to_transmit = optimizer.step(timer=timer)
scheduler.step()
# display the logging info.
-display_training_stat(conf, scheduler, tracker_tr, n_bits_to_transmit)
+display_training_stat(
+    conf, scheduler, tracker_tr, n_bits_to_transmit)
# finish one epoch of training and decide whether to validate the model.
if scheduler.epoch_ % 1 == 0:
@@ -123,10 +126,12 @@ def train_and_validate(
def inference(conf, model, criterion, metrics, _input, _target, _hidden, tracker=None):
"""Inference on the given model and get loss and accuracy."""
output, _hidden = model(_input, _hidden)
-loss = criterion(output.view(-1, conf.n_tokens), _target.contiguous().view(-1))
+loss = criterion(output.view(-1, conf.n_tokens),
+                 _target.contiguous().view(-1))
performance = metrics.evaluate(loss, output, _target)
if tracker is not None:
-tracker.update_metrics([loss.item()] + performance, n_samples=_input.size(0))
+tracker.update_metrics(
+    [loss.item()] + performance, n_samples=_input.size(0))
return loss, _hidden
@@ -169,6 +174,7 @@ def validate(
metrics,
data_loader,
label="local_model",
+force_evaluate_on_averaged_model=True,
):
"""A function for model evaluation."""
......