Unverified commit c7715b36 authored by Tao Lin, committed by GitHub

Merge pull request #2 from IamTao/master

update the ChocoSGD code in the public repo.
parents 129295de e08e4a5c
*iccluster
*.vscode
*Makefile
*test.sh
@@ -22,10 +22,11 @@ If you use the code, please cite the following papers:
```
and
```
@inproceedings{koloskova*2020decentralized,
  title={Decentralized Deep Learning with Arbitrary Communication Compression},
  author={Anastasia Koloskova* and Tao Lin* and Sebastian U Stich and Martin Jaggi},
  booktitle={International Conference on Learning Representations},
  year={2020},
  url={https://openreview.net/forum?id=SkgGCkrKvH}
}
```
*.vscode
Makefile
# CHOCO-SGD
Deep Learning code for the main experiments of the paper [Decentralized Deep Learning with Arbitrary Communication Compression](https://arxiv.org/abs/1907.09356).
# Getting started
Our experiments heavily rely on `Docker` and `Kubernetes`. For the detailed experimental environment setup, please refer to the Dockerfiles under the `environments` folder.
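For example, the worker image used by the Kubernetes manifests later in this diff could be built locally as follows (the exact Dockerfile path under `environments` is an assumption; adapt it to the file that matches your CUDA setup):
```bash
# Build the experiment image from the environment definition
# (hypothetical Dockerfile path; the image tag matches the k8s manifests below).
docker build -t itamtao/pytorch-mpi:cuda10 -f environments/Dockerfile .
```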
## Use case of distributed training (centralized/decentralized)
A brief explanation of the main arguments used in the code:
* Arguments related to *distributed training*:
    * The `n_mpi_process` and `n_sub_process` indicate the number of nodes and the number of GPUs per node. The data-parallel wrapper is adapted and applied locally for each node.
@@ -20,24 +16,25 @@ Some simple explanation of the arguments used in the code.
    * The `graph_topology` specifies the communication topology used for decentralized training, e.g., `ring`, `social`.
    * The `optimizer` decides the type of distributed training, e.g., centralized SGD or decentralized SGD.
    * The `comm_op` specifies the communication compressor to use, e.g., `sign+norm`, `random-k`, `top-k`.
    * The `consensus_stepsize` determines the consensus stepsize for the different decentralized algorithms (e.g., `parallel_choco`, `deep_squeeze`); see the sketch after this list.
* Arguments related to *learning*:
    * The `lr_schedule_scheme` and `lr_change_epochs` indicate a stepwise learning rate schedule, with decay factor `10` at epochs `150` and `225`.
    * The `lr_scaleup`, `lr_warmup` and `lr_warmup_epochs` decide whether to scale up and/or warm up the learning rate. For more details, please check `pcode/create_scheduler.py`.
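For intuition, `parallel_choco` (CHOCO-SGD) lets every worker maintain a compressed public copy of its parameters, and the consensus stepsize controls how strongly a worker mixes its own parameters with its neighbors' copies. A sketch in the paper's notation (a summary of the algorithm, not the exact code):
```latex
% CHOCO-SGD gossip step (sketch). x_i: worker i's parameters after its local
% SGD step; \hat{x}_j: the publicly shared compressed copies; w_{ij}: mixing
% weights of the chosen graph topology; \gamma: the consensus stepsize.
x_i^{(t+1)} = x_i^{(t+1/2)} + \gamma \sum_{j:\,(i,j) \in E} w_{ij} \left( \hat{x}_j^{(t)} - \hat{x}_i^{(t)} \right)
```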
### Examples
The script below trains `ResNet-20` on `CIFAR-10`, as an example of the decentralized training algorithm `CHOCO` (`parallel_choco`) with the `sign+norm` communication compressor. More examples can be found in `exps`.
```bash
OMP_NUM_THREADS=2 MKL_NUM_THREADS=2 $HOME/conda/envs/pytorch-py3.6/bin/python run.py \
    --arch resnet20 --optimizer parallel_choco \
    --avg_model True --experiment demo --manual_seed 6 \
    --data cifar10 --pin_memory True \
    --batch_size 128 --base_batch_size 64 --num_workers 2 \
    --num_epochs 300 --partition_data random --reshuffle_per_epoch True --stop_criteria epoch \
    --n_mpi_process 16 --n_sub_process 1 --world 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 \
    --on_cuda True --use_ipc False \
    --lr 0.1 --lr_scaleup True --lr_warmup True --lr_warmup_epochs 5 \
    --lr_scheduler MultiStepLR --lr_decay 0.1 --lr_milestones 150,225 \
    --comm_op sign --consensus_stepsize 0.5 --compress_ratio 0.9 --quantize_level 16 --is_biased True \
    --weight_decay 1e-4 --use_nesterov True --momentum_factor 0.9 \
    --hostfile hostfile --graph_topology ring --track_time True --display_tracked_time True \
    --python_path $HOME/conda/envs/pytorch-py3.6/bin/python --mpi_path $HOME/.openmpi/
```
@@ -108,4 +108,4 @@ EXPOSE 22
# expose port for notebook.
EXPOSE 8888
# expose port for tensorboard.
EXPOSE 6666
@@ -23,28 +23,35 @@ RUN $HOME/conda/bin/conda update -n base conda
RUN $HOME/conda/bin/conda create -y --name pytorch-py$PYTHON_VERSION python=$PYTHON_VERSION numpy pyyaml scipy ipython mkl mkl-include
RUN $HOME/conda/bin/conda install --name pytorch-py$PYTHON_VERSION -c soumith magma-cuda100
RUN $HOME/conda/bin/conda install --name pytorch-py$PYTHON_VERSION scikit-learn
RUN $HOME/conda/envs/pytorch-py$PYTHON_VERSION/bin/pip install pytelegraf pymongo influxdb kubernetes jinja2
ENV PATH $HOME/conda/envs/pytorch-py$PYTHON_VERSION/bin:$PATH
# install pytorch, torchvision, torchtext.
RUN git clone --recursive https://github.com/pytorch/pytorch
RUN cd pytorch && \
    git checkout tags/v1.3.0 && \
    git submodule sync && \
    git submodule update --init && \
    TORCH_CUDA_ARCH_LIST="3.5 3.7 5.2 6.0 6.1 7.0+PTX" TORCH_NVCC_FLAGS="-Xfatbin -compress-all" \
    CMAKE_PREFIX_PATH="$(dirname $(which $HOME/conda/bin/conda))/../" \
    pip install -v .
RUN git clone https://github.com/pytorch/vision.git && cd vision && git checkout v0.4.0 && python setup.py install
RUN $HOME/conda/envs/pytorch-py$PYTHON_VERSION/bin/pip install --upgrade git+https://github.com/pytorch/text
RUN $HOME/conda/envs/pytorch-py$PYTHON_VERSION/bin/pip install spacy
RUN $HOME/conda/envs/pytorch-py$PYTHON_VERSION/bin/python -m spacy download en
RUN $HOME/conda/envs/pytorch-py$PYTHON_VERSION/bin/python -m spacy download de
# install bit2byte.
RUN git clone https://github.com/tvogels/signSGD-with-Majority-Vote.git && \
    cd signSGD-with-Majority-Vote/main/bit2byte-extension/ && \
    $HOME/conda/envs/pytorch-py$PYTHON_VERSION/bin/python setup.py develop --user
# install other python related softwares.
RUN $HOME/conda/bin/conda install --name pytorch-py$PYTHON_VERSION -y opencv protobuf
RUN $HOME/conda/bin/conda install --name pytorch-py$PYTHON_VERSION -y networkx
RUN $HOME/conda/bin/conda install --name pytorch-py$PYTHON_VERSION -y -c anaconda pandas
RUN $HOME/conda/bin/conda install --name pytorch-py$PYTHON_VERSION -y -c conda-forge tabulate
RUN $HOME/conda/envs/pytorch-py$PYTHON_VERSION/bin/pip install lmdb tensorboard_logger pyarrow msgpack msgpack_numpy mpi4py
RUN $HOME/conda/bin/conda install --name pytorch-py$PYTHON_VERSION -c conda-forge python-blosc
RUN $HOME/conda/bin/conda clean -ya
@@ -10,4 +10,4 @@ $HOME/conda/envs/pytorch-py3.6/bin/python run.py \
--lr_schedule_scheme custom_multistep --lr_change_epochs 150,225 --lr_decay 10 \
--weight_decay 0 --use_nesterov False --momentum_factor 0 \
--hostfile iccluster/hostfile2 --graph_topology social --track_time True --display_tracked_time True \
--python_path $HOME/conda/envs/pytorch-py3.6/bin/python --mpi_path $HOME/.openmpi/ --evaluate_avg True
/home/lin/conda/envs/pytorch-py3.6/bin/python run.py \
--arch resnet50 --optimizer sgd \
--avg_model True --experiment plain_decentralized_timing \
--data imagenet --use_lmdb_data True --data_dir /mlodata1/ILSVRC/ --pin_memory True \
--batch_size 128 --base_batch_size 256 --num_workers 16 --eval_freq 1 \
--num_epochs 90 --partition_data random --reshuffle_per_epoch True --stop_criteria epoch \
--on_cuda True --n_mpi_process 8 --n_sub_process 4 --world 0,1,2,3,0,1,2,3,0,1,2,3,0,1,2,3,0,1,2,3,0,1,2,3,0,1,2,3,0,1,2,3 \
--lr 0.1 --lr_scaleup True --lr_scaleup_factor graph --lr_warmup True --lr_warmup_epochs 5 \
--lr_schedule_scheme custom_multistep --lr_change_epochs 30,60,80 \
--weight_decay 1e-4 --use_nesterov True --momentum_factor 0.9 \
--comm_op quantize_qsgd --consensus_stepsize 0.4 --compress_ratio 0.9 --quantize_level 32 --is_biased True \
--hostfile gcloud/hostfile --graph_topology ring --track_time True --track_detailed_time True --display_tracked_time True \
--python_path /home/lin/conda/envs/pytorch-py3.6/bin/python --mpi_path /home/lin/.openmpi/ --evaluate_avg True --summary_freq 100 \
--backend mpi --work_dir /mlodata1/choco_decentralized_code --remote_exec False --clean_python False --mpi_env LD_LIBRARY_PATH=/home/lin/.openmpi/lib:/usr/local/nvidia/lib:/usr/local/nvidia/lib64
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: nfs-server
spec:
  replicas: 1
  selector:
    matchLabels:
      role: nfs-server
  template:
    metadata:
      labels:
        role: nfs-server
    spec:
      containers:
        - name: nfs-server
          image: gcr.io/google_containers/volume-nfs:0.8
          ports:
            - name: nfs
              containerPort: 2049
            - name: mountd
              containerPort: 20048
            - name: rpcbind
              containerPort: 111
          securityContext:
            privileged: true
          volumeMounts:
            - mountPath: /exports
              name: mypvc
      volumes:
        - name: mypvc
          gcePersistentDisk:
            pdName: gce-nfs-ssd-disk
            fsType: ext4
apiVersion: v1
kind: Service
metadata:
  name: nfs-server
spec:
  # clusterIP: 10.3.240.20
  ports:
    - name: nfs
      port: 2049
    - name: mountd
      port: 20048
    - name: rpcbind
      port: 111
  selector:
    role: nfs-server
  # type: "LoadBalancer"
apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs
spec:
  capacity:
    storage: 1024G
  accessModes:
    - ReadWriteMany
  nfs:
    # FIXED: Use internal DNS name
    server: nfs-server.default.svc.cluster.local
    path: "/"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: nfs
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ""
  resources:
    requests:
      storage: 1024G
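A typical way to bring up this NFS stack and check it (the manifest filenames are hypothetical; the resources above may live in one or several files):
```bash
# Create the NFS server, its service, and the PV/PVC.
kubectl apply -f nfs-server-deployment.yaml
kubectl apply -f nfs-server-service.yaml
kubectl apply -f nfs-pv-pvc.yaml

# Verify that the server is running and the claim is bound.
kubectl get pods -l role=nfs-server
kubectl get pv,pvc
```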
10.4.6.3 slots=4
10.4.1.4 slots=4
10.4.3.5 slots=4
10.4.7.3 slots=4
10.4.4.4 slots=4
10.4.5.3 slots=4
10.4.0.13 slots=4
10.4.2.3 slots=4
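Each line above follows Open MPI's hostfile format (`<ip> slots=<n>`); with 8 hosts and 4 slots each, up to 32 ranks can be launched. A quick sanity check (assuming passwordless SSH between the hosts):
```bash
# Print the host each of the 32 ranks lands on.
mpirun --hostfile hostfile -np 32 hostname
```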
# -*- coding: utf-8 -*-
import os
import re
import argparse
import subprocess


def str2bool(v):
    if v.lower() in ("yes", "true", "t", "y", "1"):
        return True
    elif v.lower() in ("no", "false", "f", "n", "0"):
        return False
    else:
        raise argparse.ArgumentTypeError("Boolean value expected.")


def get_args():
    # feed them to the parser.
    parser = argparse.ArgumentParser(description="Extract results for plots")

    # add arguments.
    parser.add_argument("--use_cuda", type=str2bool, default=True)
    parser.add_argument("--num_cpus", type=str, default="4")

    # parse args.
    args = parser.parse_args()
    return args


def write_txt(data, out_path, type="w"):
    """write the data to the txt file."""
    with open(out_path, type) as f:
        f.write(data)


def run_cmd(args):
    return subprocess.check_output(args).decode("utf-8").strip().split("\n")


def get_existing_pod_names():
    lines = run_cmd(["kubectl", "get", "pods"])[1:]
    existing_pods_info = [re.split(r"\s+", line) for line in lines]
    existing_pods_name = [
        l[0]
        for l in existing_pods_info
        if ("lin-master" in l[0] or "lin-worker" in l[0]) and "Running" in l[2]
    ]
    return existing_pods_name


def get_existing_pod_info(existing_pod_name):
    def get(pattern, lines):
        got_items = [re.findall(pattern, line, re.DOTALL) for line in lines]
        return [item for item in got_items if len(item) != 0][0][0]

    info = {}
    raw = run_cmd(["kubectl", "describe", "pod", existing_pod_name])
    info["name"] = get(r"^Name:\s+([\w-]+)", raw)
    print(" processing {}".format(info["name"]))
    info["namespace"] = get(r"^Namespace:\s+([\w-]+)", raw)
    info["ip"] = get(r"^IP:\s+([\d.]+)", raw)
    info["num_gpu"] = get(r"nvidia.com/gpu:\s+(\d)", raw)
    return info


def get_existing_pods_info(existing_pod_names):
    all_info = {}
    for existing_pod_name in existing_pod_names:
        all_info[existing_pod_name] = get_existing_pod_info(existing_pod_name)
    return all_info


def save_hostfile(args, all_info):
    ips = "\n".join(
        [
            "{} slots={}".format(
                info["ip"], info["num_gpu"] if args.use_cuda else args.num_cpus
            )
            for key, info in all_info.items()
        ]
    )
    write_txt(ips, "hostfile")
    return ips


def main(args):
    print(" get pod names.")
    existing_pod_names = get_existing_pod_names()
    if len(existing_pod_names) == 0:
        print(" no running pods found.")
        return

    print(" get pod info.")
    existing_pods_info = get_existing_pods_info(existing_pod_names)

    print(" get IPs and save them to path.")
    save_hostfile(args, existing_pods_info)


if __name__ == "__main__":
    args = get_args()
    main(args)
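The script above queries `kubectl` for running `lin-master`/`lin-worker` pods and writes their IPs and GPU counts to `hostfile`. A possible invocation (the script's filename is an assumption):
```bash
# Regenerate the hostfile from the currently running pods.
python extract_hostfile.py --use_cuda True
# Or allocate 4 CPU slots per pod instead of the GPU count:
python extract_hostfile.py --use_cuda False --num_cpus 4
```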
apiVersion: apps/v1beta2
kind: StatefulSet
metadata:
  name: lin-worker
  labels:
    name: lin-worker
    user: lin
spec:
  selector:
    matchLabels:
      name: lin-worker
  serviceName: lin-worker
  replicas: 8
  template:
    metadata:
      labels:
        name: lin-worker
        user: lin
    spec:
      # nodeSelector:
      #   cloud.google.com/gke-preemptible: "true"
      containers:
        - name: lin-worker
          image: itamtao/pytorch-mpi:cuda10
          imagePullPolicy: Always
          stdin: true
          tty: true
          command:
            - "/bin/bash"
            - "-c"
            - "--"
          args:
            - '/entrypoint.sh; /usr/local/bin/entrypoint.sh; sleep infinity'
          ports:
            - containerPort: 8888
              name: notebook
            - containerPort: 6006
              name: tensorboard
            - containerPort: 22
              name: ssh
          env:
            - name: ROLE
              value: "worker"
          resources:
            requests:
              nvidia.com/gpu: 4
            limits:
              nvidia.com/gpu: 4
          volumeMounts:
            - mountPath: /mlodata1
              name: my-pvc-nfs
            - mountPath: /dev/shm
              name: dshm
      volumes:
        - name: my-pvc-nfs
          persistentVolumeClaim:
            claimName: nfs
        - name: dshm
          emptyDir:
            medium: Memory
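The workers can then be created and inspected as usual (the manifest filename is hypothetical):
```bash
kubectl apply -f worker-statefulset.yaml
kubectl get pods -l name=lin-worker
# Open a shell on the first worker, e.g. to launch mpirun from there.
kubectl exec -it lin-worker-0 -- /bin/bash
```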
10.233.112.163 slots=2
10.233.107.37 slots=2
10.233.98.241 slots=2
10.233.75.198 slots=2
localhost slots=32
@@ -5,11 +5,10 @@ import datetime
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.multiprocessing as mp
from parameters import get_args
import pcode.create_dataset as create_dataset
import pcode.create_optimizer as create_optimizer
import pcode.create_metrics as create_metrics
@@ -21,6 +20,7 @@ import pcode.utils.checkpoint as checkpoint
import pcode.utils.op_paths as op_paths
import pcode.utils.stat_tracker as stat_tracker
import pcode.utils.logging as logging
from pcode.utils.timer import Timer


def init_distributed_world(conf, backend):
@@ -55,25 +55,37 @@ def main(conf):
    # init the config.
    init_config(conf)

    # define the timer for different operations.
    # if we choose the `train_fast` mode, then we will not track the time.
    conf.timer = Timer(
        verbosity_level=1 if conf.track_time and not conf.train_fast else 0,
        log_fn=conf.logger.log_metric,
        on_cuda=conf.on_cuda,
    )

    # create dataset.
    data_loader = create_dataset.define_dataset(conf, force_shuffle=True)

    # create model.
    model = create_model.define_model(conf, data_loader=data_loader)

    # define the optimizer.
    optimizer = create_optimizer.define_optimizer(conf, model)

    # define the lr scheduler.
    scheduler = create_scheduler.Scheduler(conf)

    # add model with data-parallel wrapper.
    if conf.graph.on_cuda:
        if conf.n_sub_process > 1:
            model = torch.nn.DataParallel(model, device_ids=conf.graph.device)

    # (optional) reload checkpoint.
    try:
        checkpoint.maybe_resume_from_checkpoint(conf, model, optimizer, scheduler)
    except RuntimeError as e:
        conf.logger.log(f"Resume Error: {e}")
        conf.resumed = False

    # train and evaluate model.
    if "rnn_lm" in conf.arch:
@@ -144,7 +156,8 @@ def init_config(conf):
        graph_topology=conf.graph_topology,
        world=conf.world,
        n_mpi_process=conf.n_mpi_process,  # the # of total main processes.
        # the # of subprocess for each main process.
        n_sub_process=conf.n_sub_process,
        comm_device=conf.comm_device,
        on_cuda=conf.on_cuda,
        rank=cur_rank,
@@ -177,4 +190,10 @@ def init_config(conf):
if __name__ == "__main__":
conf = get_args()
if conf.optimizer == "parallel_choco":
mp.set_start_method("forkserver", force=True)
# mp.set_start_method("spawn", force=True)
mp.set_sharing_strategy("file_system")
main(conf)
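# Note on the start method above (added remark; an assumption rather than the
# authors' statement): `parallel_choco` runs gossip communication in helper
# processes, so "forkserver" avoids forking a CUDA-initialized interpreter,
# and the "file_system" sharing strategy sidesteps per-process file-descriptor
# limits when tensors are shared between training and communication processes.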
@@ -81,7 +81,6 @@ def get_args():
"--lr_scaleup_factor",
type=str,
default="graph",
choices=["graph", "world"],
help="scale by the graph connection, or the world size",
)
parser.add_argument("--lr_warmup", type=str2bool, default=False)
@@ -122,13 +121,19 @@ def get_args():
parser.add_argument("--compress_warmup_epochs", default=0, type=int)
parser.add_argument("--quantize_level", default=None, type=int)
parser.add_argument("--is_biased", default=False, type=str2bool)
parser.add_argument("--majority_vote", default=False, type=str2bool)
parser.add_argument("--choco_consenus_stepsize", default=0.9, type=float)
parser.add_argument("--consensus_stepsize", default=0.9, type=float)
parser.add_argument("--evaluate_consensus", default=False, type=str2bool)
parser.add_argument("--mask_momentum", default=False, type=str2bool)
parser.add_argument("--clip_grad", default=False, type=str2bool)
parser.add_argument("--clip_grad_val", default=None, type=float)
parser.add_argument("--local_step", default=1, type=int)
parser.add_argument("--turn_on_local_step_from", default=0, type=int)
parser.add_argument("--local_adam_memory_treatment", default=None, type=str)
# momentum scheme
parser.add_argument("--momentum_factor", default=0.9, type=float)
parser.add_argument("--use_nesterov", default=False, type=str2bool)
@@ -16,9 +16,11 @@ def load_data_batch(conf, _input, _target):
def define_dataset(conf, force_shuffle=False):
    if "rnn_lm" in conf.arch:
        dataset = define_nlp_dataset(conf, force_shuffle)
    else:
        dataset = define_cv_dataset(conf, force_shuffle)

    print("Defined dataset.")
    return dataset
"""define loaders for different datasets."""
@@ -151,12 +153,13 @@ def _define_cv_dataset(conf, partition_type, dataset_type, force_shuffle=False):
        (
            "Data stat: we have {} samples for {}, "
            + "load {} data for process (rank {}). "
            + "The batch size is {}, number of batches is {}."
        ).format(
            len(dataset),
            dataset_type,
            len(data_to_load),
            conf.graph.rank,
            batch_size,
            len(data_loader),
        )
    )
@@ -24,6 +24,8 @@ def define_cv_model(conf):
model = models.__dict__["densenet"](conf)
elif "vgg" in conf.arch:
model = models.__dict__["vgg"](conf)
elif "lenet" in conf.arch:
model = models.__dict__["lenet"](conf)
else:
model = models.__dict__[conf.arch](conf)
@@ -3,9 +3,11 @@ from pcode.optim.sgd import SGD
from pcode.optim.dgc import DGC
from pcode.optim.parallel_choco import ParallelCHOCO
from pcode.optim.parallel_choco_v import ParallelCHOCO_V
from pcode.optim.ef_sign_sgd import EF_SignSGD
from pcode.optim.dcd_psgd import DCD_PSGD
from pcode.optim.ecd_psgd import ECD_PSGD
from pcode.optim.deep_squeeze import DeepSqueeze


def define_optimizer(conf, model):
@@ -32,12 +34,18 @@ def define_optimizer(conf, model):
        optim_class = ECD_PSGD
    elif conf.optimizer == "parallel_choco":
        optim_class = ParallelCHOCO
    elif conf.optimizer == "parallel_choco_v":
        optim_class = ParallelCHOCO_V
    elif conf.optimizer == "ef_sign_sgd":
        optim_class = EF_SignSGD
    elif conf.optimizer == "deep_squeeze":
        optim_class = DeepSqueeze
    else:
        raise NotImplementedError

    return optim_class(
        params,
        lr=conf.learning_rate,