Commit b3bb25af authored by Nouamane Laanait
Browse files

adding distributed hyperparameter search and associated mods to runtime.train()

parent 76a59811
Loading
Loading
Loading
Loading
Loading
+109 −45
Original line number Diff line number Diff line
@@ -2,25 +2,29 @@
Created on 10/15/17.
@author: Numan Laanait, Mike Matheson
"""

import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)
#logging.getLogger('tensorflow').disabled = True
import tensorflow as tf
import numpy as np
import argparse
#mikem
import json
import time
import sys
import os
import subprocess, shlex
import shutil
try:
   import horovod.tensorflow as hvd
except:
   print( "< ERROR > Could not import horovod module" )
   raise

sys.path.append('../')
from stemdl import runtime
from stemdl import io_utils

tf.logging.set_verbosity(tf.logging.ERROR)

def add_bool_argument(cmdline, shortname, longname=None, default=False, help=None):
    if longname is None:
        shortname, longname = None, shortname
@@ -63,28 +67,49 @@ def main():
                         help="""Hyper parameters.""")
    cmdline.add_argument( '--ilr', default=None, type=float,
                         help="""Initial learning rate ( hyper parameter).""")
    cmdline.add_argument( '--epochs_per_decay', default=None, type=float,
                         help="""Number of epochs per lr decay ( hyper parameter).""")
    cmdline.add_argument( '--warm_steps', default=int(1e6), type=int,
                         help="""Number of Steps to do linear warm-up.""")
    cmdline.add_argument( '--save_steps', default=int(1e3), type=int,
                         help="""Number of Steps to save""")
    cmdline.add_argument( '--validate_steps', default=int(1e3), type=int,
                         help="""Number of Steps to validate.""")
    cmdline.add_argument( '--decay_steps', default=int(1e3), type=int,
                         help="""Number of steps per lr decay ( hyper parameter).""")
    cmdline.add_argument( '--summary_steps', default=int(1e3), type=int,
                         help="""Number of steps to save summaries.""")
    cmdline.add_argument( '--scaling', default=None, type=float,
                         help="""Scaling (hyper parameter).""")
    cmdline.add_argument( '--bn_decay', default=None, type=float,
                         help="""Batch norm decay (hyper parameter).""")
    cmdline.add_argument('--save_epochs', default=0.5, type=float,
                         help="""Number of epochs to save checkpoint. """)
    cmdline.add_argument('--validate_epochs', default=1.0, type=float,
                         help="""Number of epochs to validate """)
    cmdline.add_argument('--mode', default='train', type=str,
                         help="""train or eval (:validates from checkpoint)""")
    cmdline.add_argument('--cpu_threads', default=10, type=int,
                         help="""cpu threads per rank""")
    cmdline.add_argument('--mixing', default=0.0, type=float,
                         help="""weight of noise layer""")
    cmdline.add_argument('--net_type', default=None, type=str,
                         help=""" Type of network: classifier, regressor, hybrid""")
    cmdline.add_argument('--accumulate_step', default=0, type=int,
                         help="""cpu threads per rank""")
    cmdline.add_argument( '--filetype', default=None, type=str,
                         help=""" lmdb or tfrecord""")
    cmdline.add_argument( '--hvd_group', default=None, type=int,
                         help="""number of horovod message groups""")
    cmdline.add_argument( '--grad_ckpt', default=None, type=str,
                         help="""gradient-checkpointing:collection,memory,speed""")
    cmdline.add_argument( '--max_time', default=int(1e18), type=int,
                         help="""maximum time to run training loop""")
    add_bool_argument( cmdline, '--fp16', default=None,
                         help="""Train with half-precision.""")
    add_bool_argument( cmdline, '--fp32', default=None,
                         help="""Train with single-precision.""")
    add_bool_argument( cmdline, '--restart', default=None,
                         help="""Restart training from checkpoint.""")
    add_bool_argument( cmdline, '--nvme', default=None,
                         help="""Copy data to burst buffer.""")
    add_bool_argument( cmdline, '--debug', default=None,
                         help="""Debug print commands.""")
    add_bool_argument( cmdline, '--hvd_fp16', default=None,
                         help="""horovod message compression""")
   
    
    FLAGS, unknown_args = cmdline.parse_known_args()
    if len(unknown_args) > 0:
@@ -100,9 +125,10 @@ def main():
    else :
       params = io_utils.get_dict_from_json('input_flags.json')
       params[ 'input_flags' ] = 'input_flags.json'

    params['no_jit'] = True 
    params[ 'start_time' ] = time.time( )
    params[ 'cmdline' ] = 'unknown'
    params['accumulate_step'] = FLAGS.accumulate_step
    if FLAGS.batch_size is not None :
        params[ 'batch_size' ] = FLAGS.batch_size
    if FLAGS.log_frequency is not None :
@@ -123,14 +149,31 @@ def main():
        params[ 'IMAGE_FP16' ] = False
    if FLAGS.restart is not None :
        params[ 'restart' ] = True
    if FLAGS.save_epochs is not None:
        params['epochs_per_saving'] = FLAGS.save_epochs
    if FLAGS.validate_epochs is not None:
        params['epochs_per_validation'] = FLAGS.validate_epochs
    if FLAGS.mode == 'train':
        params['mode'] = 'train'
    if FLAGS.mode == 'eval':
        params['mode'] = 'eval'
    if FLAGS.cpu_threads is not None:
        params['IO_threads'] = FLAGS.cpu_threads
    if FLAGS.filetype is not None:
        params['filetype'] = FLAGS.filetype
    if FLAGS.debug is not None:
        params['debug'] = FLAGS.debug
    else: 
        params['debug'] = False
    params['save_step'] = FLAGS.save_steps 
    params['validate_step']= FLAGS.validate_steps 
    params['summary_step']= FLAGS.summary_steps 
    params['hvd_group'] = FLAGS.hvd_group
    params['max_time'] = FLAGS.max_time
    if FLAGS.hvd_fp16 is not None:
        params['hvd_fp16'] = hvd.Compression.fp16
    else: 
        params['hvd_fp16'] = hvd.Compression.none
    params['nvme'] = FLAGS.nvme
    params['grad_ckpt'] = FLAGS.grad_ckpt 

    # Add other params
    params.setdefault( 'restart', False )
@@ -138,10 +181,11 @@ def main():
    checkpt_dir = params[ 'checkpt_dir' ]
    # Also need a directory within the checkpoint dir for event files coming from eval
    eval_dir = os.path.join( checkpt_dir, '_eval' )
    if hvd.rank( ) == 0 :
        print( 'Creating checkpoint directory %s' % checkpt_dir )
        tf.gfile.MakeDirs( checkpt_dir )
        tf.gfile.MakeDirs( eval_dir )
    #if hvd.rank() == 0:
        #print('ENVIRONMENT VARIABLES: %s' %format(os.environ))
    #    print( 'Creating checkpoint directory %s' % checkpt_dir )
    #tf.gfile.MakeDirs( checkpt_dir )
    #tf.gfile.MakeDirs( eval_dir )

    if params[ 'gpu_trace' ] :
        if tf.gfile.Exists( params[ 'trace_dir' ] ) :
@@ -152,49 +196,69 @@ def main():

    params['train_dir'] = checkpt_dir
    params['eval_dir'] = eval_dir

    # load network config file and hyper_parameters
    network_config = io_utils.load_json_network_config(params['network_config'])
    hyper_params = io_utils.load_json_hyper_params(params['hyper_params'])

    if FLAGS.ilr  is not None :
       hyper_params[ 'initial_learning_rate' ] = FLAGS.ilr
       #hyper_params[ 'initial_learning_rate' ] = 1e-5 
    if FLAGS.scaling  is not None :
       hyper_params[ 'scaling' ] = FLAGS.scaling
    if FLAGS.epochs_per_decay is not None :
       hyper_params[ 'num_epochs_per_decay' ] = FLAGS.epochs_per_decay
    if FLAGS.bn_decay is not None :
       hyper_params[ 'batch_norm' ][ 'decay' ] = FLAGS.bn_decay
    if FLAGS.mixing is not None:
       hyper_params['mixing'] = FLAGS.mixing
    if FLAGS.net_type is not None:
       hyper_params['network_type'] = FLAGS.net_type
    if FLAGS.warm_steps >= 1:
       hyper_params['warm_up'] = True
       hyper_params['num_steps_in_warm_up'] = FLAGS.warm_steps 
       hyper_params['num_steps_per_warm_up'] = FLAGS.warm_steps
    else: 
       hyper_params['warm_up'] = False 
       hyper_params['num_steps_in_warm_up'] = 1 
       hyper_params['num_steps_per_warm_up'] = 1 
    hyper_params['num_steps_per_decay'] = FLAGS.decay_steps 
    #cap max warm-up learning rate by ilr
    hyper_params["warm_up_max_learning_rate"] = min(1, hyper_params['initial_learning_rate'] * hvd.size())

    # print relevant params passed to training 
    if hvd.rank( ) == 0 :
       if os.path.isfile( 'cmd.log' ) :
          cmd = open( "cmd.log", "r" )
          cmdline = cmd.readline( )
          params[ 'cmdline' ] = cmdline

       print( "network_config.json" )
       _input = json.dumps( network_config, indent=3, sort_keys=False)
       print( "%s" % _input )

       print( "input_flags.json" )
       _input = json.dumps( params, indent=3, sort_keys=False)
       print( "### hyper_params.json" )
       _input = json.dumps( hyper_params, indent=3, sort_keys=False)
       print( "%s" % _input )
       
       print( "hyper_params.json" )
       _input = json.dumps( hyper_params, indent=3, sort_keys=False)
       print("### params passed at CLI")
       _input = json.dumps(vars(FLAGS), indent=4)
       print("%s" % _input) 
  
    # train or evaluate
    if params['mode'] == 'train':
        runtime.train_horovod_mod(network_config, hyper_params, params)
        runtime.train(network_config, hyper_params, params)
    elif params['mode'] == 'eval':
        params[ 'IMAGE_FP16' ] = False
        runtime.validate_ckpt(network_config, hyper_params, params, last_model=False, sleep=0)
        params['output'] = True
        params['debug'] = True 
        runtime.validate_ckpt(network_config, hyper_params, params, last_model=True, sleep=-1, num_batches=None)
        
    # copy checkpoints from nvme
    if FLAGS.nvme is not None:
        if hvd.rank() == 0:
            print('copying files from bb...')
            nvme_staging(params['data_dir'],params)
    
def nvme_staging(data_dir, params):
    """Copy checkpoint files from the node-local burst buffer back to GPFS.

    Recursively copies ``params['checkpt_dir']`` into the directory named by
    the ``CKPT_DIR`` environment variable (the persistent GPFS target).

    Args:
        data_dir: Unused; retained for call-site compatibility.
        params: Dict containing key ``'checkpt_dir'``, the path of the local
            (burst-buffer) checkpoint directory to copy.

    Raises:
        RuntimeError: If the ``CKPT_DIR`` environment variable is not set.
        subprocess.CalledProcessError: If the copy command exits non-zero.
    """
    gpfs_ckpt_dir = os.environ.get('CKPT_DIR')
    if gpfs_ckpt_dir is None:
        # Previously this would silently copy into a directory literally
        # named "None"; fail loudly instead.
        raise RuntimeError('CKPT_DIR environment variable is not set')
    # Build argv as a list (shell=False): robust to whitespace in paths and
    # avoids shell-quoting issues that shlex.split on an interpolated string
    # could not handle.
    cp_args = ['cp', '-r', params['checkpt_dir'], gpfs_ckpt_dir]
    subprocess.run(cp_args, check=True)
    return

# Script entry point: delegate to main() (CLI parsing, setup, train/eval).
if __name__ == '__main__':
    main()
+345 −0

File added.

Preview size limit exceeded, changes collapsed.