Commit ae56ca02 authored by Yin, Junqi's avatar Yin, Junqi

add graph shuffle and hybrid feature

parent 9c966420
@@ -19,6 +19,10 @@ EXPERIMENT=resnet50
TOPOLOGY=complete
# ddp only supports complete topology
DDP=True
# shuffle graph
SHUFFLE_GRAPH=True
# hybrid mode
HYBRID=False
# stage data to nvme
if [ "$EXPERIMENT" == "resnet50" ]; then
@@ -41,6 +45,8 @@ sed -i "s/TODO_GPURANKS/$WORLD/" run.sh
sed -i "s/TODO_TOPOLOGY/$TOPOLOGY/" run.sh
sed -i "s/TODO_TIMESTAMP/$NOW/" run.sh
sed -i "s/TODO_DDP/$DDP/" run.sh
sed -i "s/TODO_DDP/$SHUFFLE_GRAPH/" run.sh
sed -i "s/TODO_DDP/$HYBRID/" run.sh
if [ "$single_rank_per_node" = true ]; then
jsrun -n${NNODES} -a1 -g6 -c42 -r1 --smpiargs "-gpu" --bind=rs --launch_distribution=packed ./run.sh
......
@@ -21,25 +21,9 @@ import pcode.utils.op_paths as op_paths
import pcode.utils.stat_tracker as stat_tracker
import pcode.utils.logging as logging
from pcode.utils.timer import Timer
import platform
from torch.nn.parallel import DistributedDataParallel as DDP
def print_neighbors(conf, save=True):
    neighbors_info = conf.graph.get_neighborhood()
    neighbor_ranks = [
        neighbor_rank
        for neighbor_rank in neighbors_info.keys()
        if neighbor_rank != conf.graph.rank
    ]
    print("NEIGHBOR_INFO: rank %d is on node %s with neighbors: %s"
          % (conf.graph.rank, platform.node(), tuple(neighbor_ranks)))
    if save:
        conf.logger.log(f"rank: {conf.graph.rank}")
        conf.logger.log(f"node: {platform.node()}")
        conf.logger.log(f"neighbors: {neighbor_ranks}")
        conf.logger.log(f"matrix: {conf.graph.matrix}")
from pcode.utils.topology import print_neighbors
def setup_ddp(backend):
""""Initialize DDP"""
import subprocess
......
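The body of setup_ddp is elided above. As a hedged sketch only: DDP bring-up on a multi-node launch typically resolves a master address (here presumably via subprocess from the scheduler) and joins a process group. setup_ddp_sketch and its environment handling are assumptions for illustration, not the repo's code.

import os
import torch.distributed as dist

def setup_ddp_sketch(backend, rank, world_size, master_addr):
    # Point every rank at the same rendezvous endpoint, then join the
    # process group so DistributedDataParallel can wrap the model.
    os.environ["MASTER_ADDR"] = master_addr
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)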
@@ -229,6 +229,8 @@ def get_args():
parser.add_argument("--local_rank", default=None, type=str)
parser.add_argument("--clean_python", default=False, type=str2bool)
parser.add_argument("--ddp", type=str2bool, default=False)
parser.add_argument("--shuffle_graph_per_epoch", type=str2bool, default=False)
parser.add_argument("--hybrid", type=str2bool, default=False)
# parse conf.
conf = parser.parse_args()
......
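Both new flags go through str2bool, so the True/False strings substituted by the launch script parse into real booleans. The repo's implementation is not shown in this diff; a common version (an assumption, for reference) is:

def str2bool(v):
    # argparse type= hook mapping textual booleans to bool.
    if isinstance(v, bool):
        return v
    if v.lower() in ("yes", "true", "t", "1"):
        return True
    if v.lower() in ("no", "false", "f", "0"):
        return False
    raise ValueError("cannot interpret %r as a boolean" % v)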
@@ -16,6 +16,7 @@ from pcode.utils.logging import (
from pcode.utils.stat_tracker import RuntimeTracker
import pcode.utils.error_handler as error_handler
import pcode.utils.auxiliary as auxiliary
from pcode.utils.topology import print_neighbors
# sys.excepthook = error_handler.global_except_hook
@@ -152,7 +153,18 @@ def train_and_validate(
        del data_loader
        gc.collect()
        data_loader = define_dataset(conf)

        # shuffle graph: seed with the epoch number so every rank
        # draws the same permutation inside shuffle_graph().
        if conf.shuffle_graph_per_epoch:
            print("\nReshuffle the graph.")
            np.random.seed(int(scheduler.epoch_))
            conf.graph.shuffle_graph()
            print_neighbors(conf)

        # hybrid mode: pull the decentralized run back toward
        # consensus with a global model average.
        if conf.hybrid and not conf.is_centralized:
            print("\nHybrid mode on.")
            optimizer.world_aggregator.agg_model(model, op="avg")
def inference(model, criterion, metrics, _input, _target, tracker=None):
"""Inference on the given model and get loss and accuracy."""
......
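In hybrid mode, agg_model(model, op="avg") re-synchronizes all ranks with a global parameter average on top of the decentralized gossip updates. Assuming the aggregator averages parameters over the full process group, it is roughly equivalent to the sketch below; global_average_ is an illustrative name, not the repo's API.

import torch.distributed as dist

def global_average_(model):
    # Sum each parameter tensor across every rank, then normalize,
    # leaving all ranks holding the globally averaged model.
    world_size = dist.get_world_size()
    for param in model.parameters():
        dist.all_reduce(param.data, op=dist.ReduceOp.SUM)
        param.data /= world_size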
@@ -7,7 +7,23 @@ import networkx
import torch
import torch.distributed as dist
import platform
def print_neighbors(conf, save=True):
    neighbors_info = conf.graph.get_neighborhood()
    neighbor_ranks = [
        neighbor_rank
        for neighbor_rank in neighbors_info.keys()
        if neighbor_rank != conf.graph.rank
    ]
    print("NEIGHBOR_INFO: rank %d is on node %s with neighbors: %s"
          % (conf.graph.rank, platform.node(), tuple(neighbor_ranks)))
    if save:
        conf.logger.log(f"rank: {conf.graph.rank}")
        conf.logger.log(f"node: {platform.node()}")
        conf.logger.log(f"neighbors: {neighbor_ranks}")
        conf.logger.log(f"matrix: {conf.graph.matrix}")
class UndirectedGraph(ABC):
    @property
@@ -298,6 +314,20 @@ class RingGraph(PhysicalLayout):
        vals = row.data
        return {int(c): v for c, v in zip(cols, vals)}
    def shuffle_graph(self):
        # Rebuild the ring mixing matrix over a random permutation of
        # the ranks: each node keeps weight 1/3 for itself and assigns
        # 1/3 to each of its two neighbors on the permuted ring, so the
        # matrix stays symmetric and doubly stochastic for size >= 3.
        size = self._n_mpi_process
        permutation = np.random.permutation(size)
        m = np.zeros((size, size))
        w = 1.0 / 3
        for i in range(size):
            cur = permutation[i]
            m[cur][cur] = w
            prev = permutation[(i - 1) % size]
            nxt = permutation[(i + 1) % size]
            m[cur][prev] = m[prev][cur] = w
            m[cur][nxt] = m[nxt][cur] = w
        self._mixing_matrix = sp.sparse.csr_matrix(m)
class TorusGraph(PhysicalLayout):
    def __init__(self, n_mpi_process, n_sub_process, world, comm_device, on_cuda, rank):
@@ -354,7 +384,6 @@ class TorusGraph(PhysicalLayout):
        row = self._mixing_matrix[self._rank]
        return {c: v for c, v in zip(range(len(row)), row) if v != 0}
class ExpanderGraph(PhysicalLayout):
    def __init__(self, n_mpi_process, n_sub_process, world, comm_device, on_cuda, rank):
        super(ExpanderGraph, self).__init__(
......
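A quick standalone check (not part of the commit) that the shuffled ring built by RingGraph.shuffle_graph is a valid gossip matrix; build_shuffled_ring below simply mirrors that method's logic outside the class.

import numpy as np

def build_shuffled_ring(size, seed):
    # Same construction as RingGraph.shuffle_graph: a ring over a
    # random permutation, weight 1/3 per edge and per self-loop.
    np.random.seed(seed)
    permutation = np.random.permutation(size)
    m = np.zeros((size, size))
    w = 1.0 / 3
    for i in range(size):
        cur = permutation[i]
        m[cur][cur] = w
        prev = permutation[(i - 1) % size]
        nxt = permutation[(i + 1) % size]
        m[cur][prev] = m[prev][cur] = w
        m[cur][nxt] = m[nxt][cur] = w
    return m

m = build_shuffled_ring(size=8, seed=0)
assert np.allclose(m, m.T)              # symmetric weights
assert np.allclose(m.sum(axis=1), 1.0)  # each row sums to 1
assert np.allclose(m.sum(axis=0), 1.0)  # each column sums to 1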
@@ -55,4 +55,6 @@ python -u main.py \
--world TODO_GPURANKS \
--on_cuda True \
--comm_device cuda \
--ddp TODO_DDP
--ddp TODO_DDP \
--shuffle_graph_per_epoch TODO_SHUFFLE_GRAPH \
--hybrid TODO_HYBRID