Commit b9f824e9 authored by Stansberry, Dale

- Moved from old code-int project

parent 077842a9
This is a software setup tool to deploy and execute Spark jobs on Rhea.
Two scripts are provided:
spark_setup.py - Configures a Spark deployment and generates a PBS script
spark_deploy.py - Deploys Spark on Rhea (called by the generated PBS script)
Note: Spark 1.6 (stand-alone) must be installed and the SPARK_HOME environment variable set to point to the Spark installation directory. The Rhea production environment will eventually include a Spark module that configures your environment to point to a global installation of Spark. The two scripts mentioned above will then be pre-installed in the SPARK_HOME/bin and SPARK_HOME/sbin directories, respectively. Until Spark is officially supported, it must be installed and configured manually.
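For example, a manual setup might look like the following (the installation path is only a placeholder; adjust it to wherever Spark 1.6 was unpacked):

    export SPARK_HOME=$HOME/spark-1.6.0-bin-hadoop2.6
    export PATH=$SPARK_HOME/bin:$PATH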
To configure a Spark job, run spark_setup.py with the following parameters (run it with -h for the full option list; an example invocation follows the list):
-s <file> : The PBS script file to generate
-a <account> : Name of account to charge
-n <num> : Number of nodes
-w <time> : Maximum walltime
-d <path> : Spark deployment directory
-p : Include python support (if needed for the Spark application)
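For example, the following invocation (the account name, node count, walltime, and deployment path are placeholders) generates spark_test.pbs for a four-node, one-hour job with python support:

    $SPARK_HOME/bin/spark_setup.py -s spark_test.pbs -a ABC123 -n 4 -w 01:00:00 -d /lustre/scratch/$USER/spark_test -p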
The deployment directory must be unique for each Spark job being executed and should be located in a scratch space (Spark uses this directory to write temporary files). After running spark_setup.py, the specified deployment directory will be created (or re-initialized if it already exists) and template configuration files/scripts will be copied into the "templates" subdirectory under the deployment directory. If needed, these template files may be modified before the Spark job is submitted. When the job is submitted, the template files are copied into per-node configuration directories and used by Spark to configure the worker nodes. An example of the resulting layout is shown below.
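Based on spark_setup.py and spark_deploy.py, the deployment directory should end up with roughly this layout (the per-hostname entries are created on each node when the job starts):

    <deploy-dir>/
        templates/           spark-env.sh.template, spark-defaults.conf.template, log4j.properties.template
        nodes/
            master           (written by the master node at startup)
            <hostname>/conf  per-node Spark configuration
            <hostname>/local per-node scratch space
            <hostname>/logs  per-node Spark logs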
The PBS script generated by spark_setup.py must be edited to specify the Spark application and any arguments on the spark-submit line (this line is initially commented out); an example is shown below. You may execute multiple Spark jobs within one PBS script (just copy/paste the spark-submit line). When the PBS script exits, the deployed Spark cluster is shut down.
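For example, assuming a PySpark application at $HOME/my_app.py (the path and arguments are placeholders), the uncommented line could read:

    $SPARK_SUBMIT $HOME/my_app.py arg1 arg2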
#!/sw/rhea/python/2.7.9/rhel6.6_gnu4.4.7/bin/python
import os
import sys
import socket
import time
import json
import shutil
import stat
import re
import subprocess
import errno  # used for EEXIST checks in createConfigDirs
def collectNodeInfo():
# Collect the names of allocated nodes...
# Rank 0 node will be the master, all others will be workers.
# The ${PBS_NODEFILE} file exists only on the master node
nodefilename = os.environ["PBS_NODEFILE"]
hostlist=[]
if os.path.isfile( nodefilename ):
nodefile = open( nodefilename, 'r' )
nodenameset = set([])
for line in nodefile:
nodenameset.add(line.strip())
nodenamelist = list( nodenameset )
        # TODO: this implementation is inefficient (it re-reads /etc/hosts for every node)
        for nodename in nodenamelist:
            # Match the node's short name against the third column of /etc/hosts
hostsfile = open( '/etc/hosts', 'r' )
output = []
for line in hostsfile:
if len( line.strip().split()) > 2:
if re.match(nodename+'\\b',line.strip().split()[2]):
hostdict={"IP":line.strip().split()[0], \
"FULLNAME": line.strip().split()[1], \
"NAME": line.strip().split()[2].lower()}
hostlist.append(hostdict)
return hostlist
def createConfigDirs( hostname ):
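    # Create the per-node local, conf, and log directories under
    # SPARK_DEPLOY_DIR/nodes/<hostname>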
LOCAL_DIR = SPARK_DEPLOY_DIR + '/nodes/' + hostname + "/local"
CONF_DIR = SPARK_DEPLOY_DIR + '/nodes/' + hostname + "/conf"
LOG_DIR = SPARK_DEPLOY_DIR + '/nodes/' + hostname + "/logs"
if not os.path.isdir( LOCAL_DIR ):
os.makedirs( LOCAL_DIR )
if not os.path.isdir( CONF_DIR ):
try:
os.makedirs( CONF_DIR )
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
if not os.path.isdir( LOG_DIR ):
try:
os.makedirs( LOG_DIR )
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
def generateConfiguration( hostname, master, nodeinfo ):
# Generate common configuration files
    # Original templates are located in SPARK_HOME/conf
# Local Templates are located in SPARK_DEPLOY_DIR/templates
# Each compute node reads configurations from SPARK_DEPLOY_DIR/nodes/hostname/conf
createConfigDirs( hostname )
SPARK_LOCAL_DIR = SPARK_DEPLOY_DIR + '/nodes/' + hostname
# find the ip address of the master node (master node only)
masterip = None
for node in nodeinfo:
if master == node["NAME"]:
masterip = node["IP"]
shutil.copyfile(
SPARK_DEPLOY_DIR + "/templates/spark-env.sh.template",
SPARK_LOCAL_DIR + "/conf/spark-env.sh" )
conf = open( SPARK_LOCAL_DIR + "/conf/spark-env.sh", "a" )
conf.write( '\n# Per-node settings generated by spark_deploy:\n' )
if masterip != None:
conf.write( 'SPARK_MASTER_IP=' + masterip + '\n' )
conf.write( 'SPARK_LOG_DIR=' + SPARK_LOCAL_DIR + '/logs\n' )
conf.write( 'SPARK_WORKER_DIR=' + SPARK_LOCAL_DIR + '\n' )
conf.write( 'SPARK_LOCAL_DIRS=' + SPARK_LOCAL_DIR + '/local\n' )
if python_support:
conf.write( 'SPARK_PYTHON_SUPPORT=1\n' )
conf.close()
os.chmod( SPARK_LOCAL_DIR + "/conf/spark-env.sh", stat.S_IRWXU )
shutil.copyfile(
SPARK_DEPLOY_DIR + "/templates/spark-defaults.conf.template",
SPARK_LOCAL_DIR + "/conf/spark-defaults.conf" )
conf = open( SPARK_LOCAL_DIR + "/conf/spark-defaults.conf", "a" )
conf.write( '\n# Per-node settings generated by spark_deploy:\n' )
conf.write( 'spark.master spark://' + master + ':7077\n' )
conf.write( 'spark.driver.host ' + master + '\n' )
conf.write( 'spark.local.dir ' + SPARK_LOCAL_DIR + '/local\n' )
conf.close()
shutil.copyfile(
SPARK_DEPLOY_DIR + "/templates/log4j.properties.template",
SPARK_LOCAL_DIR + "/conf/log4j.properties" )
# Slaves file (master node only)
if masterip != None:
outfile = open( SPARK_LOCAL_DIR + "/conf/slaves",'w')
for node in nodeinfo:
nodename = node["NAME"]
if nodename != master:
outfile.write( nodename + "\n" )
outfile.close()
conf = open( SPARK_LOCAL_DIR + "/ready", "w" )
conf.write( '1\n' )
conf.close()
def execCommand( a_command ):
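    # Run a shell command, capturing stdout and stderr together;
    # raises CalledProcessError if the command exits with a non-zero status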
proc = subprocess.Popen( a_command, shell = True,
stdout = subprocess.PIPE, stderr = subprocess.STDOUT,
universal_newlines = True )
output = proc.communicate()
retcode = proc.poll()
if retcode:
        raise subprocess.CalledProcessError( retcode, a_command, output = output[0] )
return output
if __name__ == '__main__':
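    # spark_deploy.py runs once per allocated node (launched via mpirun from the
    # generated PBS script): argv[1] = SPARK_HOME, argv[2] = deployment directory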
SPARK_HOME = sys.argv[1]
SPARK_DEPLOY_DIR = sys.argv[2]
hostname = socket.gethostname()
master = os.environ["HOSTNAME"].lower()
print 'Configuring spark on ' + hostname + ' at ' + SPARK_DEPLOY_DIR + ', master: ' + master
if "SPARK_PYTHON_SUPPORT" in os.environ:
python_support = True
else:
python_support = False
nodeinfo = collectNodeInfo()
generateConfiguration( hostname, master, nodeinfo )
os.environ["SPARK_CONF_DIR"] = SPARK_DEPLOY_DIR + "/nodes/" + hostname + "/conf"
wait_time = 0
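    # Rendezvous: each worker writes a "ready" file once it is configured; the
    # master waits for every "ready" file, starts the Spark master, then writes
    # the "master" file that workers poll for before starting their slave daemons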
if hostname == master:
print 'Waiting for worker nodes to configure'
for node in nodeinfo:
nodename = node["NAME"]
if nodename != master:
while not os.path.isfile( SPARK_DEPLOY_DIR + "/nodes/" + nodename + "/ready" ):
time.sleep( 1 )
wait_time += 1
if wait_time >= 60:
print 'Spark worker node ' + nodename + ' did not configure - aborting.'
sys.exit( 1 )
print 'Starting spark master on ' + master + ' at ', time.time()
execCommand( SPARK_HOME + "/sbin/start-master.sh" )
outf = open( SPARK_DEPLOY_DIR + "/nodes/master", "w" )
outf.write( master )
outf.close()
else:
while not os.path.isfile( SPARK_DEPLOY_DIR + "/nodes/master" ):
time.sleep( 1 )
wait_time += 1
if wait_time >= 60:
print 'Master node never started - aborting.'
sys.exit( 1 )
print 'Starting spark slave on ' + hostname + ' at ', time.time()
execCommand( SPARK_HOME + "/sbin/start-slave.sh spark://" + master + ':7077' )
print 'Configuration on ' + hostname + ' finished.'
sys.exit( 0 )
#!/usr/bin/python
import os
import sys
import getopt
import time
import shutil
verbose = False
python_support = False
script_file = 'spark.pbs'
spark_home = None
deploy_dir = None
num_nodes = '###'
account = 'xxx####'
walltime = '00:00:00'
deploy_timeout = '30'
# Default cluster settings (for Rhea)
worker_memory = '100g'
worker_cores = '30'
driver_memory = '64g'
executor_memory = '4g'
def printUsage():
    print 'Usage: spark_setup.py [options]'
print 'Options:'
print ' -h,-?,--help : Show help'
print ' -s,--script <file> : Generate PBS script <file>'
print ' -a,--account <acc> : Name of account to charge'
print ' -n,--nodes <num> : Number of nodes'
print ' -w,--wall <time> : Maximum walltime'
print ' -d,--deploy-dir <path> : Spark deployment directory *'
print ' -t,--timeout <sec> : Set deployment timeout (default = 30 sec)'
print ' -p,--python : Include python support'
print ' --spark-home <path> : Override/set SPARK_HOME env variable **\n'
    print ' --worker-mem <arg> : Worker memory'
    print ' --worker-cores <arg> : Worker cores'
    print ' --driver-mem <arg> : Driver memory'
    print ' --exec-mem <arg> : Executor memory\n'
print '* The deployment directory must be accessible from login AND compute'
print ' nodes, and will also be used for scratch space.\n'
print '** Overriding SPARK_HOME is not recommended, use with caution\n\n'
print 'Note: It is not necessary to re-run spark_setup for subsequent spark'
print 'deployments unless the SPARK_HOME and/or deployment directories are changed.'
print 'The generated PBS script may be edited directly to change or add PBS'
print 'options.'
def printOptions():
print 'PBS script :', script_file
print 'Account :', account
print 'Num nodes :', num_nodes
print 'Wall time :', walltime
print 'Deployment dir :', deploy_dir
print 'Python support :', python_support
print 'SPARK_HOME :', spark_home
print 'Worker memory :', worker_memory
print 'Worker cores :', worker_cores
print 'Driver memory :', driver_memory
print 'Executor memory:', executor_memory
def generateScript():
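    # Write a PBS batch script that launches spark_deploy.py on every node via
    # mpirun, waits for the master file to appear, and defines SPARK_SUBMIT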
outf = open( script_file, 'w' )
outf.write( "#!/bin/bash\n\n" )
outf.write( '#PBS -A ' + account + '\n' )
outf.write( '#PBS -l walltime=' + walltime + ',nodes=' + num_nodes + '\n' )
outf.write( '\n' )
outf.write( 'source ${MODULESHOME}/init/bash\n\n' )
outf.write( 'module load python\n' )
if python_support == True:
outf.write( 'export SPARK_PYTHON_SUPPORT=1\n' )
outf.write( 'export SPARK_HOME=' + spark_home + '\n' )
if deploy_dir != None:
outf.write( 'export SPARK_DEPLOY_DIR=' + deploy_dir + '\n\n' )
outf.write( 'if [ -d "$SPARK_DEPLOY_DIR/nodes" ]; then\n' )
outf.write( ' rm -rf $SPARK_DEPLOY_DIR/nodes/*\n' )
outf.write( ' sync\n' )
outf.write( 'fi\n\n' )
outf.write( "echo =============== Running spark deployment ===============\n\n" )
outf.write( 'cd $WORKDIR\n' )
outf.write( 'mpirun -n ' + num_nodes + ' --npernode 1 $SPARK_HOME/sbin/spark_deploy.py $SPARK_HOME $SPARK_DEPLOY_DIR &\n\n' )
outf.write( "MASTERFILE=$SPARK_DEPLOY_DIR/nodes/master\n" )
outf.write( "timeout=" + deploy_timeout + "\n" )
outf.write( "while [ ! -f $MASTERFILE ]\n" )
outf.write( "do\n" )
outf.write( " sleep 1\n" )
outf.write( " let timeout=timeout-1\n" )
outf.write( " if [ $timeout -eq 0 ]; then\n" )
    outf.write( '    echo "Spark deployment failed!" >&2\n' )
outf.write( " exit 1\n" )
outf.write( " fi\n" )
outf.write( "done\n\n" )
    outf.write( "MASTERNODE=$(<$MASTERFILE)\n" )
    outf.write( "echo Master: $MASTERNODE\n\n" )
    outf.write( 'export SPARK_SUBMIT="$SPARK_HOME/bin/spark-submit --driver-memory ' + driver_memory + ' --master spark://$MASTERNODE:7077"\n\n' )
outf.write( "echo =============== Running spark job ===============\n\n" )
outf.write( "# Edit/add the following line to with your Spark program and arguments specified...\n\n" )
outf.write( "# $SPARK_SUBMIT <application> [arguments...]\n")
outf.write( "# $SPARK_HOME/bin/spark-submit --driver-memory 100g --master spark://$MASTERNODE:7077 <application> [arguments...]\n\n" )
outf.write( "echo =============== Spark job finished ===============\n\n" )
#outf.write( "$SPARK_HOME/sbin/stop-all.sh\n" )
outf.close()
def setupDeployDir():
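    # Create the deployment directory tree (templates/, nodes/) and copy the
    # Spark configuration templates, appending the static cluster settings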
try:
if not os.path.exists( deploy_dir ):
os.makedirs( deploy_dir )
if not os.path.exists( deploy_dir + '/templates' ):
os.makedirs( deploy_dir + '/templates')
if not os.path.exists( deploy_dir + '/nodes' ):
os.makedirs( deploy_dir + '/nodes')
shutil.copyfile( spark_home + "/conf/spark-env.sh.template",
deploy_dir + "/templates/spark-env.sh.template" )
# Append default (static) settings to spark-env.sh template
conf = open( deploy_dir + "/templates/spark-env.sh.template", "a" )
conf.write( '\n# Default (static) settings generated by spark_setup:\n' )
conf.write( 'SPARK_HOME='+ spark_home + '\n' )
conf.write( 'SPARK_DRIVER_MEMORY='+driver_memory+'\n' )
conf.write( 'SPARK_WORKER_INSTANCES=1\n' )
conf.write( 'SPARK_WORKER_MEMORY='+worker_memory+'\n' )
conf.write( 'SPARK_WORKER_CORES='+worker_cores+'\n' )
conf.write( '\n# Support for modules:\n' )
conf.write( 'source /etc/profile\n' )
conf.write( 'source $MODULESHOME/init/bash\n' )
if python_support == True:
conf.write( 'module load python\n' )
conf.write( 'module load python_pip\n' )
conf.close()
# Append default (static) settings to spark-defaults.conf template
shutil.copyfile( spark_home + "/conf/spark-defaults.conf.template",
deploy_dir + "/templates/spark-defaults.conf.template" )
conf = open( deploy_dir + "/templates/spark-defaults.conf.template", "a" )
conf.write( '\n# Default (static) settings generated by spark_setup:\n' )
conf.write( 'spark.driver.memory '+driver_memory+'\n' )
conf.write( 'spark.driver.maxResultSize 0\n' )
conf.write( 'spark.executor.memory '+executor_memory+'\n' )
#conf.write( 'spark.shuffle.consolidateFiles true\n' )
conf.write( '\n' )
conf.close()
shutil.copyfile( spark_home + "/conf/log4j.properties.template",
deploy_dir + "/templates/log4j.properties.template" )
except Exception as e:
print 'Failed creating deployment directories.'
print 'Please ensure the deployment path is valid and accessible.\n'
print e
sys.exit(1)
try:
    opts, args = getopt.getopt( sys.argv[1:],
        '?hvs:d:a:n:w:t:p',
        ['verbose','help','script=','deploy-dir=','account=','wall=','nodes=','timeout=',
         'spark-home=','python','worker-mem=','worker-cores=','driver-mem=','exec-mem='] )
if len(args) > 0:
print 'Unrecognized options: ', args
printUsage()
sys.exit(1)
except getopt.GetoptError as e:
print e
printUsage()
sys.exit(1)
for opt, arg in opts:
if opt in ( '-?', '-h', '--help' ):
printUsage()
sys.exit()
elif opt in ( '-s', '--script' ):
script_file = arg
elif opt in ( '-d', '--deploy-dir' ):
deploy_dir = arg
elif opt in ( '-n', '--nodes' ):
num_nodes = arg
elif opt in ( '-a', '--account' ):
account = arg
elif opt in ( '-w', '--wall' ):
walltime = arg
    elif opt == '--spark-home':
spark_home = arg
elif opt in ( '-p', '--python' ):
python_support = True
elif opt in ( '-t', '--timeout' ):
deploy_timeout = arg
elif opt in ( '-v', '--verbose' ):
verbose = True
    elif opt == '--worker-mem':
        worker_memory = arg
    elif opt == '--worker-cores':
        worker_cores = arg
    elif opt == '--driver-mem':
        driver_memory = arg
    elif opt == '--exec-mem':
        executor_memory = arg
if spark_home == None:
if "SPARK_HOME" in os.environ:
spark_home = os.environ["SPARK_HOME"]
else:
        print 'SPARK_HOME must either be exported or specified with the --spark-home option'
sys.exit(1)
if verbose == True:
printOptions()
if script_file != None:
generateScript()
if deploy_dir != None:
setupDeployDir()