Commit 9c966420 authored by Yin, Junqi's avatar Yin, Junqi

add DDP support

parent 61bededb
......@@ -16,7 +16,9 @@ single_rank_per_node=false
EXPERIMENT=resnet50
# centralized: complete; decentralized: ring, torus, expander, margulis_expander, social
TOPOLOGY=ring
TOPOLOGY=complete
# ddp only supports complete topology
DDP=True
# stage data to nvme
if [ "$EXPERIMENT" == "resnet50" ]; then
......@@ -38,6 +40,7 @@ NOW=$(date '+%Y%m%d%H%M%S')
sed -i "s/TODO_GPURANKS/$WORLD/" run.sh
sed -i "s/TODO_TOPOLOGY/$TOPOLOGY/" run.sh
sed -i "s/TODO_TIMESTAMP/$NOW/" run.sh
sed -i "s/TODO_DDP/$DDP/" run.sh
if [ "$single_rank_per_node" = true ]; then
jsrun -n${NNODES} -a1 -g6 -c42 -r1 --smpiargs "-gpu" --bind=rs --launch_distribution=packed ./run.sh
......
......@@ -22,6 +22,7 @@ import pcode.utils.stat_tracker as stat_tracker
import pcode.utils.logging as logging
from pcode.utils.timer import Timer
import platform
from torch.nn.parallel import DistributedDataParallel as DDP
def print_neighbors(conf, save=True):
neighbors_info = conf.graph.get_neighborhood()
......@@ -40,7 +41,7 @@ def print_neighbors(conf, save=True):
def setup_ddp(backend):
""""Initialize Apex DDP"""
""""Initialize DDP"""
import subprocess
try:
get_master = "echo $(cat {} | sort | uniq | grep -v batch | grep -v login | head -1)".format(os.environ['LSB_DJOB_HOSTFILE'])
......@@ -106,16 +107,21 @@ def main(conf):
model = create_model.define_model(conf, data_loader=data_loader)
# define the optimizer.
optimizer = create_optimizer.define_optimizer(conf, model)
if not conf.ddp:
optimizer = create_optimizer.define_optimizer(conf, model)
# define the lr scheduler.
scheduler = create_scheduler.Scheduler(conf)
# add model with data-parallel wrapper.
if conf.graph.on_cuda:
if conf.n_sub_process > 1:
if conf.n_sub_process > 1 and not conf.ddp:
model = torch.nn.DataParallel(model, device_ids=conf.graph.device)
if conf.ddp:
model = DDP(model)
optimizer = torch.optim.SGD(model.module.parameters(),
lr=conf.lr,
momentum=conf.momentum_factor)
# (optional) reload checkpoint
try:
checkpoint.maybe_resume_from_checkpoint(conf, model, optimizer, scheduler)
......@@ -136,7 +142,7 @@ def main(conf):
criterion = nn.CrossEntropyLoss(reduction="mean")
criterion = criterion.cuda() if conf.graph.on_cuda else criterion
metrics = create_metrics.Metrics(
model.module if "DataParallel" == model.__class__.__name__ else model,
model.module if "DataParallel" in model.__class__.__name__ else model,
task="language_modeling",
)
......@@ -156,7 +162,7 @@ def main(conf):
criterion = nn.CrossEntropyLoss(reduction="mean")
criterion = criterion.cuda() if conf.graph.on_cuda else criterion
metrics = create_metrics.Metrics(
model.module if "DataParallel" == model.__class__.__name__ else model,
model.module if "DataParallel" in model.__class__.__name__ else model,
task="classification",
)
......
......@@ -228,6 +228,7 @@ def get_args():
parser.add_argument("--comm_device", type=str, default="cuda")
parser.add_argument("--local_rank", default=None, type=str)
parser.add_argument("--clean_python", default=False, type=str2bool)
parser.add_argument("--ddp", type=str2bool, default=False)
# parse conf.
conf = parser.parse_args()
......
......@@ -56,8 +56,11 @@ def train_and_validate(
loss.backward()
with timer("sync_complete", epoch=scheduler.epoch_):
n_bits_to_transmit = optimizer.step(timer=timer, scheduler=scheduler)
if not conf.ddp:
n_bits_to_transmit = optimizer.step(timer=timer, scheduler=scheduler)
else:
optimizer.step()
n_bits_to_transmit = np.nan
# display the logging info.
display_training_stat(conf, scheduler, tracker_tr, n_bits_to_transmit)
......@@ -80,7 +83,7 @@ def train_and_validate(
# evaluate (and only inference) on the whole training loader.
if (
conf.evaluate_consensus or scheduler.is_stop()
) and not conf.train_fast and conf.data != "imagenet":
) and not conf.train_fast and not conf.ddp and conf.data != "imagenet":
# prepare the dataloader for the consensus evaluation.
_data_loader = {
"val_loader": _define_cv_dataset(
......
......@@ -78,4 +78,5 @@ python -u main.py \
--n_sub_process TODO_NSUB \
--world TODO_GPURANKS \
--on_cuda True \
--comm_device cuda
--comm_device cuda \
--ddp TODO_DDP
......@@ -29,38 +29,12 @@ python -u main.py \
--lr_onecycle_num_epoch 46 \
--lr_schedule_scheme custom_one_cycle \
--optimizer sgd \
--adam_beta_1 0.9 \
--adam_beta_2 0.999 \
--adam_eps 1e-08 \
--graph_topology TODO_TOPOLOGY \
--comm_op sign \
--compress_ratio 0.9 \
--compress_warmup_values 0.75,0.9375,0.984375,0.996,0.999 \
--compress_warmup_epochs 0 \
--quantize_level 16 \
--is_biased True \
--majority_vote False \
--consensus_stepsize 0.5 \
--evaluate_consensus False \
--mask_momentum False \
--clip_grad False \
--local_step 1 \
--turn_on_local_step_from 0 \
--momentum_factor 0.9 \
--use_nesterov True \
--weight_decay 0.0001 \
--drop_rate 0.0 \
--densenet_growth_rate 12 \
--densenet_bc_mode False \
--densenet_compression 0.5 \
--wideresnet_widen_factor 4 \
--rnn_n_hidden 200 \
--rnn_n_layers 2 \
--rnn_bptt_len 35 \
--rnn_clip 0.25 \
--rnn_use_pretrained_emb True \
--rnn_tie_weights True \
--rnn_weight_norm False \
--manual_seed 6 \
--evaluate False \
--eval_freq 1 \
......@@ -72,8 +46,6 @@ python -u main.py \
--evaluate_avg False \
--checkpoint ./data/checkpoint \
--save_all_models False \
--user lin \
--project distributed_adam_type_algorithm \
--experiment test \
--backend mpi \
--use_ipc False \
......@@ -82,4 +54,5 @@ python -u main.py \
--n_sub_process TODO_NSUB \
--world TODO_GPURANKS \
--on_cuda True \
--comm_device cuda
--comm_device cuda \
--ddp TODO_DDP
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment