This patch swaps Megatron's environment-variable-based distributed setup for an MPI-driven one: every process derives its rank and world size from `MPI.COMM_WORLD`, and rank 0 discovers and broadcasts the master address before `torch.distributed` is initialized. It also threads a `max_seq_length_dec` argument through `build_train_valid_test_datasets`.

megatron/arguments.py (+6 −2)

```diff
@@ -19,6 +19,8 @@
 import argparse
 import os
 
 import torch
+from mpi4py import MPI
+import subprocess
 
 def parse_args(extra_args_provider=None, defaults={},
                ignore_unknown_args=False):
@@ -54,8 +56,10 @@ def parse_args(extra_args_provider=None, defaults={},
     args = parser.parse_args()
 
     # Distributed args.
-    args.rank = int(os.getenv('RANK', '0'))
-    args.world_size = int(os.getenv("WORLD_SIZE", '1'))
+    comm = MPI.COMM_WORLD
+    args.rank = comm.Get_rank()
+    args.world_size = comm.Get_size()
 
     # Tensor model parallel size.
     args.tensor_model_parallel_size = min(
         args.tensor_model_parallel_size, args.world_size)
```

megatron/data/dataset_utils.py (+9 −5)

```diff
@@ -442,11 +442,15 @@ def build_train_valid_test_datasets(data_prefix, data_impl, splits_string,
     valid_datasets = []
     test_datasets = []
     for i in range(len(prefixes)):
-        train_ds, valid_ds, test_ds = _build_train_valid_test_datasets(
-            prefixes[i], data_impl, splits_string,
+        train_ds, valid_ds, test_ds = _build_train_valid_test_datasets(prefixes[i], data_impl, splits_string,
             datasets_train_valid_test_num_samples[i],
-            max_seq_length, masked_lm_prob, short_seq_prob,
-            seed, skip_warmup, binary_head, dataset_type=dataset_type)
+            max_seq_length, masked_lm_prob, short_seq_prob,
+            seed, skip_warmup, binary_head, max_seq_length_dec,
+            dataset_type=dataset_type)
         if train_ds:
             train_datasets.append(train_ds)
         if valid_ds:
```

megatron/initialize.py (+23 −2)

```diff
@@ -32,6 +32,9 @@
 from megatron.global_vars import set_global_variables
 from megatron.mpu import (set_tensor_model_parallel_rank,
                           set_tensor_model_parallel_world_size)
+import os
+from mpi4py import MPI
+import subprocess
 
 def initialize_megatron(extra_args_provider=None, args_defaults={},
                         ignore_unknown_args=False, allow_no_cuda=False):
@@ -177,10 +180,28 @@ def _initialize_distributed():
             args.local_rank = device
         torch.cuda.set_device(device)
     # Call the init process
+    comm = MPI.COMM_WORLD
+    rank = comm.Get_rank()
+    world_size = comm.Get_size()
+
+    master_addr = None
+    if rank == 0:
+        hostname_cmd = ["hostname -I"]
+        result = subprocess.check_output(hostname_cmd, shell=True)
+        master_addr = result.decode('utf-8').split()[0]
+    master_addr = comm.bcast(master_addr, root=0)
+
+    proc_name = MPI.Get_processor_name()
+    all_procs = comm.allgather(proc_name)
+    local_rank = sum([i == proc_name for i in all_procs[:rank]])
+
+    os.environ['RANK'] = str(rank)
+    os.environ['WORLD_SIZE'] = str(world_size)
+    os.environ['LOCAL_RANK'] = str(local_rank)
+    os.environ['MASTER_ADDR'] = master_addr
+    os.environ['MASTER_PORT'] = str(29500)
+
+    init_method=None
     torch.distributed.init_process_group(
         backend=args.distributed_backend,
         world_size=args.world_size, rank=args.rank,
-        timeout=timedelta(minutes=10))
+        timeout=timedelta(minutes=10), init_method=init_method)
 
     # Set the tensor model-parallel, pipeline model-parallel, and
     # data-parallel communicators.
```
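Taken together, the `initialize.py` hunk implements an MPI-driven rendezvous: rank 0 resolves its IP address and broadcasts it, each rank computes a per-node local rank by counting lower-numbered ranks that report the same processor name, and everything is exported as the environment variables that `torch.distributed`'s default `env://` rendezvous reads. Below is a minimal, self-contained sketch of the same bootstrap, runnable under `mpirun`; the script name is hypothetical, `gloo` is substituted for the patch's `nccl` so it also runs on CPU-only machines, and port 29500 plus the `hostname -I` trick are taken directly from the patch.

```python
# mpi_rendezvous.py -- minimal sketch of the bootstrap in the patch above.
# Run with e.g.: mpirun -np 4 python mpi_rendezvous.py
import os
import subprocess
from datetime import timedelta

import torch
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
world_size = comm.Get_size()

# Rank 0 resolves its own IP and broadcasts it so every rank agrees
# on the rendezvous address.
master_addr = None
if rank == 0:
    out = subprocess.check_output("hostname -I", shell=True)
    master_addr = out.decode("utf-8").split()[0]
master_addr = comm.bcast(master_addr, root=0)

# The local (per-node) rank is the number of lower-numbered global
# ranks running on the same host.
proc_name = MPI.Get_processor_name()
all_procs = comm.allgather(proc_name)
local_rank = sum(p == proc_name for p in all_procs[:rank])

# Export the variables that init_process_group's default env://
# rendezvous reads when init_method is left as None.
os.environ["RANK"] = str(rank)
os.environ["WORLD_SIZE"] = str(world_size)
os.environ["LOCAL_RANK"] = str(local_rank)
os.environ["MASTER_ADDR"] = master_addr
os.environ["MASTER_PORT"] = "29500"

# gloo here so the sketch works without GPUs; the patch uses nccl.
torch.distributed.init_process_group(
    backend="gloo", timeout=timedelta(minutes=10))
print(f"rank {rank}/{world_size}, local_rank {local_rank}, "
      f"master {master_addr}")
torch.distributed.barrier()
torch.distributed.destroy_process_group()
```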
The same MPI bootstrap is repeated in the checkpoint-merging tool, this time with a hard-coded `nccl` backend:

tools/merge_mp_partitions.py (+26 −0)

```diff
@@ -32,6 +32,9 @@
 from megatron.checkpointing import get_checkpoint_tracker_filename
 from megatron.global_vars import set_global_variables, get_args
 from megatron.global_vars import rebuild_tokenizer
+from mpi4py import MPI
+import subprocess
+from datetime import timedelta
 
 def split_into_partitions(tensor, num_partitions, partition_dim, stride):
@@ -194,6 +197,29 @@ def main():
     # so trick it into thinking we are plenty of processes
     os.environ["WORLD_SIZE"] = f'{2**31}'
 
+    comm = MPI.COMM_WORLD
+    rank = comm.Get_rank()
+    world_size = comm.Get_size()
+
+    master_addr = None
+    if rank == 0:
+        hostname_cmd = ["hostname -I"]
+        result = subprocess.check_output(hostname_cmd, shell=True)
+        master_addr = result.decode('utf-8').split()[0]
+    master_addr = comm.bcast(master_addr, root=0)
+
+    proc_name = MPI.Get_processor_name()
+    all_procs = comm.allgather(proc_name)
+    local_rank = sum([i == proc_name for i in all_procs[:rank]])
+
+    os.environ['RANK'] = str(rank)
+    os.environ['WORLD_SIZE'] = str(world_size)
+    os.environ['LOCAL_RANK'] = str(local_rank)
+    os.environ['MASTER_ADDR'] = master_addr
+    os.environ['MASTER_PORT'] = str(29500)
+
+    init_method=None
+    torch.distributed.init_process_group(
+        backend="nccl",
+        timeout=timedelta(minutes=10), init_method=init_method)
+
     # Args
     set_global_variables(extra_args_provider=get_mp_merge_args,
                          args_defaults = {'use_cpu_initialization': True,
```
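A note on `init_method=None`: when neither `init_method` nor a `store` is given, `torch.distributed.init_process_group` falls back to the `env://` rendezvous, which is exactly why each hunk exports `MASTER_ADDR`, `MASTER_PORT`, `RANK`, and `WORLD_SIZE` before the call. With this bootstrap in place the job no longer needs a launcher that sets those variables; something like `mpirun -np 16 python tools/merge_mp_partitions.py ...` (flags illustrative) is enough, since every rank derives its identity from `MPI.COMM_WORLD`. Two caveats worth flagging: `hostname -I` is Linux-specific and `.split()[0]` takes the first of possibly several addresses, so on multi-homed nodes the broadcast address may not be the one reachable over the high-speed fabric; and in the merge tool the new bootstrap overwrites the `WORLD_SIZE = 2**31` value that the preceding "trick it into thinking we are plenty of processes" hack just set.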