gpt2_data_loader.py → megatron/data/gpt2_dataset.py  +3 −78

@@ -13,71 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

 """GPT2 dataset."""

-import json
-import os
 import numpy as np
 import torch
-from torch.multiprocessing import Lock
 from torch.utils.data import Dataset
-
-from megatron import mpu
-from megatron.data_utils.samplers import DistributedBatchSampler
-from megatron.data_utils.tokenization_gpt2 import GPT2Tokenizer
-
-
-def make_gpt2_dataloaders(args):
-    # Input parameters.
-    input_data_sizes_file = args.input_data_sizes_file
-    seq_length = args.seq_length
-    initial_seed = args.seed
-
-    # Data parallel arguments.
-    world_size = mpu.get_data_parallel_world_size()
-    rank = mpu.get_data_parallel_rank()
-    global_batch_size = args.batch_size * world_size
-    num_workers = args.num_workers
-
-    def make_data_loader_(data_path):
-        # Build the dataset.
-        dataset = GPT2Dataset(data_path, input_data_sizes_file,
-                              seq_length, initial_seed)
-        # Use a simple sampler with distributed batch sampler.
-        sampler = torch.utils.data.SequentialSampler(dataset)
-        batch_sampler = DistributedBatchSampler(sampler=sampler,
-                                                batch_size=global_batch_size,
-                                                drop_last=True,
-                                                rank=rank,
-                                                world_size=world_size)
-        # Torch dataloader.
-        return torch.utils.data.DataLoader(dataset,
-                                           batch_sampler=batch_sampler,
-                                           num_workers=num_workers,
-                                           pin_memory=True)
-
-    train = make_data_loader_(args.train_data)
-    valid = make_data_loader_(args.valid_data)
-    test = make_data_loader_(args.test_data)
-
-    args.do_train = False
-    args.do_valid = False
-    args.do_test = False
-    if train is not None:
-        args.do_train = True
-    if valid is not None:
-        args.do_valid = True
-    if test is not None:
-        args.do_test = True
-
-    # Tokenizer.
-    tokenizer = GPT2Tokenizer.from_pretrained('gpt2', cache_dir=args.cache_dir)
-    eod_token = tokenizer.encoder['<|endoftext|>']
-    num_tokens = eod_token + 1
-
-    return (train, valid, test), num_tokens, eod_token


 class GPT2Dataset(Dataset):

@@ -89,8 +33,6 @@ class GPT2Dataset(Dataset):
         self.seq_length = seq_length
         self.initial_seed = initial_seed
         self.max_epochs = max_epochs
-        # Lock for building the dataset.
-        self.lock = Lock()

         # Shard stuff.
         # Dictionary from shard name to its size (number of elements).

@@ -120,13 +62,11 @@ class GPT2Dataset(Dataset):
         # data index in the shard.
         data_idx = idx - self.shards_start_index[shard_index]
         # Load the shard if it is not in memory.
-        #self.lock.acquire()
         if self.shards_data[shard_index] is None:
             print('global rank {} is building data for shard index {} ...'.
                   format(torch.distributed.get_rank(), shard_index))
             self.build_dataset_(shard_index)
-        #assert self.shards_data[shard_index] is not None
-        #self.lock.release()
         # Start index.
         start_index = self.shards_sample_index[shard_index][data_idx]
         # Add one for label shift.

@@ -194,18 +134,3 @@ class GPT2Dataset(Dataset):
             size = self.shard_size_dict[shard]
             self.shards_start_index[i] = self.shards_start_index[i-1] + \
                                          size // self.seq_length
-
-
-'''
-if __name__ == '__main__':
-
-    print('gpt2 data loader ...')
-    path = '/raid/mshoeybi/data/gpt2/adlr/reddit_all_ftfy_lg200/npys'
-    dataset = GPT2Dataset(path, 'sizes.txt', 1024, 1234, 100)
-    print('dataset contains {} samples'.format(dataset.data_length))
-    for i in range(len(dataset)):
-        if i % 512000 == 0:
-            print(i)
-        data = dataset[i]
-'''
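The dataset above keeps a cumulative shards_start_index table (built as a running sum of shard_size // seq_length) and, inside __getitem__, turns a global sample index into a shard index plus an offset within that shard, building the shard lazily on first access. Below is a minimal sketch of that index arithmetic, assuming the table is laid out exactly as in the diff; locate_sample is a hypothetical helper written for illustration, not code from the PR.

import numpy as np

def locate_sample(shards_start_index, idx):
    """Hypothetical helper: map a global sample index to (shard_index, data_idx).

    shards_start_index[i] is the first global sample index served by shard i,
    built as a running sum of (shard_size // seq_length) as in the diff above.
    """
    starts = np.asarray(shards_start_index)
    # Last shard whose start index is <= idx.
    shard_index = int(np.searchsorted(starts, idx, side='right')) - 1
    # Offset of the sample inside that shard.
    data_idx = idx - int(starts[shard_index])
    return shard_index, data_idx

# Example: three shards holding 4, 4, and 2 samples respectively.
starts = [0, 4, 8]
assert locate_sample(starts, 5) == (1, 1)
assert locate_sample(starts, 9) == (2, 1)

With starts = [0, 4, 8], global index 5 falls in shard 1 at offset 1, which is the same mapping the data_idx line in __getitem__ computes once the right shard has been found.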
megatron/tokenizer/tokenizer.py  +1 −2

@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-"""Megatron tokenizer."""
+"""Megatron tokenizers."""

 from abc import ABC
 from abc import abstractmethod

@@ -100,7 +100,6 @@ class AbstractTokenizer(ABC):
                                   'tokenizer'.format(self.name))

-
 class _BertWordPieceTokenizer(AbstractTokenizer):
     """Original BERT wordpiece tokenizer."""
pretrain_gpt2.py  +68 −27

@@ -15,18 +15,22 @@
 """Pretrain GPT2"""

+import os
+
 import torch

-from gpt2_data_loader import make_gpt2_dataloaders
 from megatron import get_args
 from megatron import get_timers
+from megatron import get_tokenizer
 from megatron import mpu
 from megatron import print_rank_0
+from megatron.data.gpt2_dataset import GPT2Dataset
+from megatron.data_utils.samplers import DistributedBatchSampler
 from megatron.model import GPT2Model
 from megatron.training import pretrain
 from megatron.utils import get_ltor_masks_and_position_ids
 from megatron.utils import reduce_losses
-import os


 def model_provider():
     """Build the model."""

@@ -87,7 +91,6 @@ def get_batch(data_iterator):
 def forward_step(data_iterator, model):
     """Forward step."""
-    args = get_args()
     timers = get_timers()

     # Get the batch.

@@ -109,6 +112,56 @@ def forward_step(data_iterator, model):
     return loss, {'lm loss': reduced_loss[0]}


+def make_gpt2_dataloaders():
+    """Build gpt2 dataloaders."""
+    args = get_args()
+
+    # Input parameters.
+    input_data_sizes_file = args.input_data_sizes_file
+    seq_length = args.seq_length
+    initial_seed = args.seed
+
+    # Data parallel arguments.
+    world_size = mpu.get_data_parallel_world_size()
+    rank = mpu.get_data_parallel_rank()
+    global_batch_size = args.batch_size * world_size
+    num_workers = args.num_workers
+
+    def make_data_loader_(data_path):
+        # Build the dataset.
+        dataset = GPT2Dataset(data_path, input_data_sizes_file,
+                              seq_length, initial_seed)
+        # Use a simple sampler with distributed batch sampler.
+        sampler = torch.utils.data.SequentialSampler(dataset)
+        batch_sampler = DistributedBatchSampler(sampler=sampler,
+                                                batch_size=global_batch_size,
+                                                drop_last=True,
+                                                rank=rank,
+                                                world_size=world_size)
+        # Torch dataloader.
+        return torch.utils.data.DataLoader(dataset,
+                                           batch_sampler=batch_sampler,
+                                           num_workers=num_workers,
+                                           pin_memory=True)
+
+    train = make_data_loader_(os.path.join(args.data_path, 'train'))
+    valid = make_data_loader_(os.path.join(args.data_path, 'valid'))
+    test = make_data_loader_(os.path.join(args.data_path, 'test'))
+
+    args.do_train = False
+    args.do_valid = False
+    args.do_test = False
+    if train is not None:
+        args.do_train = True
+    if valid is not None:
+        args.do_valid = True
+    if test is not None:
+        args.do_test = True
+
+    return (train, valid, test)
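In the new make_gpt2_dataloaders above, every data-parallel rank builds the same GPT2Dataset, wraps it in a SequentialSampler, and hands it to DistributedBatchSampler with global_batch_size = args.batch_size * world_size, so each rank reads only its own batch_size-sized slice of every global batch. The sketch below illustrates that slicing with a simplified stand-in sampler and a toy dataset; both are invented for the example and are not megatron's DistributedBatchSampler or GPT2Dataset.

import torch
from torch.utils.data import BatchSampler, DataLoader, Dataset, SequentialSampler

class SimpleDistributedBatchSampler(BatchSampler):
    """Illustrative stand-in: form batches of global_batch_size indices,
    then keep only this rank's contiguous slice of each batch."""

    def __init__(self, sampler, global_batch_size, drop_last, rank, world_size):
        super().__init__(sampler, global_batch_size, drop_last)
        assert global_batch_size % world_size == 0
        self.rank = rank
        self.world_size = world_size
        self.local_batch_size = global_batch_size // world_size

    def __iter__(self):
        for batch in super().__iter__():
            start = self.rank * self.local_batch_size
            yield batch[start:start + self.local_batch_size]

class RangeDataset(Dataset):
    """Toy dataset used only to exercise the sampler."""
    def __init__(self, n):
        self.n = n
    def __len__(self):
        return self.n
    def __getitem__(self, idx):
        return torch.tensor(idx)

# Rank 1 of 4 with per-rank batch size 2, i.e. global batch size 8.
dataset = RangeDataset(32)
batch_sampler = SimpleDistributedBatchSampler(
    SequentialSampler(dataset), global_batch_size=8,
    drop_last=True, rank=1, world_size=4)
loader = DataLoader(dataset, batch_sampler=batch_sampler, num_workers=0)
print(next(iter(loader)))  # tensor([2, 3]): this rank's slice of the first global batch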
 def get_train_val_test_data():
     """Load the data on rank zero and broadcast number of tokens to all GPUs."""
     args = get_args()

@@ -118,35 +171,23 @@ def get_train_val_test_data():
     # Data loader only on rank 0 of each model parallel group.
     if mpu.get_model_parallel_rank() == 0:
-        args.cache_dir = 'cache'
-        args.train_data = os.path.join(args.data_path, 'train')
-        args.valid_data = os.path.join(args.data_path, 'valid')
-        args.test_data = os.path.join(args.data_path, 'test')
-        (train_data, val_data, test_data), num_tokens, \
-            eod_token = make_gpt2_dataloaders(args)
-        # pad.
-        from megatron.tokenizer.tokenizer import _vocab_size_with_padding
-        num_tokens = _vocab_size_with_padding(num_tokens, args)
-        print_rank_0('> found end-of-document token: {}'.format(eod_token))
-        token_counts = torch.cuda.LongTensor([num_tokens, eod_token,
-                                              int(args.do_train),
-                                              int(args.do_valid),
-                                              int(args.do_test)])
+        (train_data, val_data, test_data) = make_gpt2_dataloaders()
+        flags = torch.cuda.LongTensor([int(args.do_train),
+                                       int(args.do_valid),
+                                       int(args.do_test)])
     else:
-        token_counts = torch.cuda.LongTensor([0, 0, 0, 0, 0])
+        flags = torch.cuda.LongTensor([0, 0, 0])

     # Broadcast num tokens.
-    torch.distributed.broadcast(token_counts,
+    torch.distributed.broadcast(flags,
                                 mpu.get_model_parallel_src_rank(),
                                 group=mpu.get_model_parallel_group())
-    num_tokens = token_counts[0].item()
-    eod_token = token_counts[1].item()
-    args.do_train = token_counts[2].item()
-    args.do_valid = token_counts[3].item()
-    args.do_test = token_counts[4].item()
+    args.do_train = flags[0].item()
+    args.do_valid = flags[1].item()
+    args.do_test = flags[2].item()

-    args.eod_token = eod_token
+    tokenizer = get_tokenizer()
+    args.eod_token = tokenizer.eod_id

     return train_data, val_data, test_data
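Only rank 0 of each model-parallel group builds the dataloaders, so the do_train/do_valid/do_test flags are packed into a three-element cuda LongTensor and broadcast from the group's source rank; the vocabulary size and end-of-document id no longer ride along, because every rank now reads them from the shared get_tokenizer() instance. Here is a minimal sketch of the same broadcast pattern, assuming torch.distributed is already initialized and each rank owns a CUDA device; broadcast_data_flags is a hypothetical helper name, not part of the PR.

import torch

def broadcast_data_flags(do_train, do_valid, do_test, src_rank, group=None):
    """Hypothetical helper mirroring the pattern above: the source rank packs
    its dataset-availability flags into a small LongTensor and every rank in
    the group receives the same values."""
    if torch.distributed.get_rank() == src_rank:
        flags = torch.cuda.LongTensor([int(do_train), int(do_valid), int(do_test)])
    else:
        # Non-source ranks allocate a buffer of the same shape to receive into.
        flags = torch.cuda.LongTensor([0, 0, 0])
    torch.distributed.broadcast(flags, src_rank, group=group)
    return bool(flags[0].item()), bool(flags[1].item()), bool(flags[2].item())

# Usage on every rank of the group (values passed by non-source ranks are ignored):
# do_train, do_valid, do_test = broadcast_data_flags(True, True, False, src_rank=0)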