Commit edb09351 authored by Laanait, Nouamane

Add a validation call before exiting the training loop, and update the Summit scripts

parent a475ff19
+76 −32
import os, subprocess, shlex, sys
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()

-def nvme_staging(data_dir):
+def nvme_staging(data_dir, data_tally, eval_data=False):
    user = os.environ.get('USER')
    nvme_dir = '/mnt/bb/%s' %(user)
-    #src = "%s/batch_train_%d.db" %(data_dir, comm_rank)
-    src = "%s/batch_train_0.db" %(data_dir)
-    trg = "%s/batch_train_%d.db" %(nvme_dir, comm_rank)
-    #cp_args = "cp -r %s/batch_%d.db %s/batch_%d.db" %(data_dir, comm_rank, nvme_dir, comm_rank)
+    index = comm_rank
+    if not eval_data:
+        # training data
+        src = os.path.join(data_dir, 'batch_train_%d.db' % comm_rank)
+        if not os.path.exists(src):
+            src = u'%s/batch_train_0.db' % data_dir
+        src = check_file(data_tally, src, mode="train")
+        trg = '%s/batch_train_%d.db' %(nvme_dir, comm_rank)
+        cp_args = "cp -r %s %s" %(src, trg)
+        cp_args = shlex.split(cp_args)
+        if not os.path.exists(trg):
+            try:
-            subprocess.run(cp_args, check=True)
+                subprocess.run(cp_args, check=True, timeout=120)
+                print("rank %d:staged %s" % (comm_rank, trg ))
+            except subprocess.SubprocessError as e:
+                print("rank %d: %s" % (comm_rank, format(e)))
+        # evaluation data
+        src = "%s/batch_eval_%d.db" %(data_dir, index)
+        if not os.path.exists(src):
+            src =  "%s/batch_eval_0.db" % data_dir
+        src = check_file(data_tally, src, mode="eval")
+        trg = "%s/batch_eval_%d.db" %(nvme_dir, comm_rank)
+        cp_args = "cp -r %s %s" %(src, trg)
+        cp_args = shlex.split(cp_args)
+        if not os.path.exists(trg):
+            try:
+                subprocess.run(cp_args, check=True, timeout=120)
+                print("rank %d:staged %s" % (comm_rank, trg ))
+            except subprocess.SubprocessError as e:
+                print("rank %d: %s" % (comm_rank, format(e)))
+    else:
+        src = "%s/batch_eval_%d.db" %(data_dir, index)
+        if not os.path.exists(src):
+            src =  "%s/batch_eval_0.db" % data_dir
+        src = check_file(data_tally, src, mode="eval")
+        trg = "%s/batch_eval_%d.db" %(nvme_dir, comm_rank)
+        cp_args = "cp -r %s %s" %(src, trg)
+        cp_args = shlex.split(cp_args)
+        if not os.path.exists(trg):
+            try:
+                subprocess.run(cp_args, check=True, timeout=120)
+                print("rank %d:staged %s" % (comm_rank, trg ))
+            except subprocess.SubprocessError as e:
+                print("rank %d: %s" % (comm_rank, format(e)))

+def check_file(tally_path, src, mode="train"):
+    tally_arr = np.load(tally_path)
+    mask = np.array([itm.find('_%s_' % mode) for itm in tally_arr['filepath']])
+    mask[mask >= 0] = 1
+    mask[mask < 0] = 0
+    mask = mask.astype(np.bool)
+    tally_arr = tally_arr[mask]
+    cnt = tally_arr['num_samples'][np.where(tally_arr['filepath'] == src)[0]]
+    if cnt <= 0 :
+        idx = np.where(tally_arr['num_samples'] > 4)[0]
+        rand = np.random.randint(0, idx.size)
+        new_src = tally_arr['filepath'][idx[rand]]
+        print("swapping %s with %s" %(src, new_src))
+        return new_src
+    return src

def nvme_staging_ftf(data_dir):
    user = os.environ.get('USER')
@@ -47,36 +98,29 @@ if __name__ == "__main__":
    user = os.environ.get('USER')
    mpi_host = MPI.Get_processor_name()
    nvme_dir = '/mnt/bb/%s' % user
-    if len(sys.argv) > 2:
-        data_dir = sys.argv[-2]
-        file_type = sys.argv[-1]
+    if len(sys.argv) > 3:
+        data_dir = sys.argv[-4]
+        file_type = sys.argv[-2]
+        data_tally = sys.argv[-3]
+        mode = sys.argv[-1]
+        eval_data = True if mode == 'eval' else False 
+        if not os.path.exists(data_tally):
+            print('data tally file path does not exists, exiting...')
+            sys.exit(1)
        if file_type == 'tfrecord':
            nvme_staging = nvme_staging_ftf
        #purge = bool(sys.argv[-1])
        local_rank_0 = not bool(comm_rank % 6)
        if local_rank_0:
            print('nvme contents on %s: %s '%(mpi_host,format(os.listdir(nvme_dir))))
        comm.Barrier()
        # purge
        #if purge: 
        #    nvme_purging()
        #comm.Barrier()
        #if local_rank_0: 
        #    print('nvme purged on %s' % mpi_host)
        #comm.Barrier()
        # check purge
        #if local_rank_0:
        #    print('nvme contents on %s: %s '%(mpi_host ,format(os.listdir(nvme_dir))))
        #comm.Barrier()
        # stage
-        if local_rank_0 : print('begin staging on all nodes')
-        nvme_staging(data_dir)
-        comm.Barrier()
+        if local_rank_0 : print('begin staging on %s' % mpi_host)
+        nvme_staging(data_dir, data_tally, eval_data=eval_data)
        if local_rank_0:
            print('all local ranks finished nvme staging on %s' % mpi_host)
        comm.Barrier()
        # check stage 
        if local_rank_0:
           print('nvme contents on %s: %s '%(mpi_host, format(os.listdir(nvme_dir))))
        comm.Barrier()
        sys.exit(0)
    else:
        print('Need paths to data and tally array, and file type')
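
Editor's note: a minimal sketch (not part of the commit) of the tally file check_file() consumes. The field names 'filepath' and 'num_samples' come from the code above; the dtypes, paths, and script name below are illustrative assumptions.

import numpy as np

# One row per staged .db shard; field dtypes are assumed
records = [('/gpfs/alpine/proj/data/batch_train_0.db', 128),
           ('/gpfs/alpine/proj/data/batch_train_1.db', 0),   # empty shard: check_file() swaps it for a random one with > 4 samples
           ('/gpfs/alpine/proj/data/batch_eval_0.db', 64)]
tally = np.array(records, dtype=[('filepath', 'U128'), ('num_samples', 'i8')])
np.save('data_tally.npy', tally)

# Matching the argv parsing above (sys.argv[-4:] = data_dir, data_tally, file_type, mode),
# a per-rank Summit launch might look like (script name hypothetical):
#   jsrun -n ${NUM_RANKS} python nvme_staging.py /gpfs/alpine/proj/data data_tally.npy db train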
+18 −12
@@ -24,6 +24,7 @@ from stemdl import runtime
from stemdl import io_utils

tf.logging.set_verbosity(tf.logging.ERROR)
+tf.config.optimizer.set_jit(False)

def add_bool_argument(cmdline, shortname, longname=None, default=False, help=None):
    if longname is None:
@@ -95,6 +96,8 @@ def main():
                         help="""number of horovod message groups""")
    cmdline.add_argument( '--grad_ckpt', default=None, type=str,
                         help="""gradient-checkpointing:collection,memory,speed""")
+    cmdline.add_argument( '--max_time', default=None, type=str,
+                         help="""maximum time to run training loop""")
    add_bool_argument( cmdline, '--fp16', default=None,
                         help="""Train with half-precision.""")
    add_bool_argument( cmdline, '--fp32', default=None,
@@ -124,7 +127,8 @@ def main():
       params = io_utils.get_dict_from_json('input_flags.json')
       params[ 'input_flags' ] = 'input_flags.json'
    params['no_jit'] = True 
-    params[ 'start_time' ] = time.time( )
+    params[ 'start_time' ] = float(os.environ["LSF_JOB_TIMESTAMP_VALUE"])
+    #params[ 'start_time' ] = 0 
    params[ 'cmdline' ] = 'unknown'
    params['accumulate_step'] = FLAGS.accumulate_step
    if FLAGS.batch_size is not None :
@@ -158,13 +162,14 @@ def main():
    if FLAGS.filetype is not None:
        params['filetype'] = FLAGS.filetype
    if FLAGS.debug is not None:
-        params['debug'] = FLAGS.debug
+        params['debug'] = True 
    else: 
        params['debug'] = False
    params['save_step'] = FLAGS.save_steps 
    params['validate_step']= FLAGS.validate_steps 
    params['summary_step']= FLAGS.summary_steps 
    params['hvd_group'] = FLAGS.hvd_group
+    params['max_time'] = float(FLAGS.max_time) * 60 - 300 # convert from min and give 300s to copy file from bb 
    if FLAGS.hvd_fp16 is not None:
        params['hvd_fp16'] = hvd.Compression.fp16
    else: 
@@ -178,11 +183,6 @@ def main():
    checkpt_dir = params[ 'checkpt_dir' ]
    # Also need a directory within the checkpoint dir for event files coming from eval
    eval_dir = os.path.join( checkpt_dir, '_eval' )
-    #if hvd.rank() == 0:
-        #print('ENVIRONMENT VARIABLES: %s' %format(os.environ))
-    #    print( 'Creating checkpoint directory %s' % checkpt_dir )
-    #tf.gfile.MakeDirs( checkpt_dir )
-    #tf.gfile.MakeDirs( eval_dir )

    if params[ 'gpu_trace' ] :
        if tf.gfile.Exists( params[ 'trace_dir' ] ) :
@@ -204,8 +204,14 @@ def main():
       hyper_params[ 'scaling' ] = FLAGS.scaling
    if FLAGS.bn_decay is not None :
       hyper_params[ 'batch_norm' ][ 'decay' ] = FLAGS.bn_decay
+    if FLAGS.warm_steps >= 1:
       hyper_params['warm_up'] = True
       hyper_params['num_steps_in_warm_up'] = FLAGS.warm_steps 
-    hyper_params['num_steps_per_warm_up'] = FLAGS.warm_steps/10 
+       hyper_params['num_steps_per_warm_up'] = FLAGS.warm_steps
+    else: 
+       hyper_params['warm_up'] = False 
+       hyper_params['num_steps_in_warm_up'] = 1 
+       hyper_params['num_steps_per_warm_up'] = 1 
    hyper_params['num_steps_per_decay'] = FLAGS.decay_steps 
    #cap max warm-up learning rate by ilr
    hyper_params["warm_up_max_learning_rate"] = min(1, hyper_params['initial_learning_rate'] * hvd.size())
@@ -230,13 +236,13 @@ def main():
        runtime.train(network_config, hyper_params, params)
    elif params['mode'] == 'eval':
        params[ 'IMAGE_FP16' ] = False
-        params['output'] = True
-        params['debug'] = True 
+        params['output'] = True if params['debug'] else False
        runtime.validate_ckpt(network_config, hyper_params, params, last_model=True, sleep=-1, num_batches=None)

    # copy checkpoints from nvme
-    if FLAGS.nvme is not None:
+    if FLAGS.nvme is not None and params['mode'] == 'train':
        if hvd.rank() == 0:
            time.sleep(10) # sleep to give time for storage to flush
            print('copying files from bb...')
            nvme_staging(params['data_dir'],params)
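
Editor's note: the two timing changes above are coupled. params['start_time'] is now the LSF job-launch timestamp rather than the moment main() runs, and params['max_time'] converts the --max_time flag from minutes to seconds while reserving 300 s to copy checkpoints off the burst buffer, e.g. --max_time=120 gives 120 * 60 - 300 = 6900 s of training. A minimal runnable sketch of how such a budget would be checked inside the training loop; the check itself is assumed to live in runtime.train and is not shown in this commit:

import os, time

os.environ.setdefault('LSF_JOB_TIMESTAMP_VALUE', str(time.time()))  # expected in the job environment (see diff above); stubbed here
start_time = float(os.environ['LSF_JOB_TIMESTAMP_VALUE'])           # epoch seconds at job launch
max_time = float('120') * 60 - 300                                  # --max_time=120 minutes -> 6900 s training budget

def out_of_time():
    # True once the rest of the allocation must be kept for the copy-back from the burst buffer
    return time.time() - start_time > max_time

print(out_of_time())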
    
+3 −0
@@ -415,6 +415,9 @@ def train(network_config, hyper_params, params, gpu_id=None):
            tf.keras.backend.clear_session()
            sess.close()
            return val_results, loss_results
+    # Do a validation before exiting
+    val = validate(network_config, hyper_params, params, sess, dset)
+    val_results.append((train_elf.last_step,val))
    tf.reset_default_graph()
    tf.keras.backend.clear_session()
    sess.close()
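
Editor's note: the three added lines run one last validation on the live session when the loop falls through (e.g. when the time budget expires), instead of only on the early-return path above. A condensed, runnable sketch of the resulting control flow; everything here is a hypothetical stand-in except the validate-before-exit pattern itself:

import time

class TrainElf:                              # stand-in for the trainer object in runtime.train
    last_step = 0

def validate(step):                          # stand-in returning a validation metric
    return {'val_metric_at_step': step}

def train_loop(train_elf, budget_s=0.1):
    val_results, start = [], time.time()
    while time.time() - start < budget_s:    # stands in for the max_time budget check
        train_elf.last_step += 1             # training work elided
    # Do a validation before exiting (the pattern added by this commit)
    val = validate(train_elf.last_step)
    val_results.append((train_elf.last_step, val))
    return val_results

print(train_loop(TrainElf())[-1])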