Commit 96525675 authored by Stansberry, Dale's avatar Stansberry, Dale
Browse files

- Better start-up sync for slow filesystem

parent 48affa75
......@@ -121,6 +121,7 @@ def generateConfiguration( hostname, master, nodeinfo ):
conf = open( SPARK_LOCAL_DIR + "/ready", "w" )
conf.write( '1\n' )
conf.close()
execCommand( "sync" )
def execCommand( a_command ):
proc = subprocess.Popen( a_command, shell = True,
......@@ -153,17 +154,20 @@ if __name__ == '__main__':
wait_time = 0
SPARK_LOCAL_DIR = SPARK_DEPLOY_DIR + '/nodes/' + hostname
if hostname == master:
print 'Waiting for worker nodes to configure'
for node in nodeinfo:
nodename = node["NAME"]
if nodename != master:
while not os.path.isfile( SPARK_DEPLOY_DIR + "/nodes/" + nodename + "/ready" ):
time.sleep( 1 )
wait_time += 1
if wait_time >= 60:
print 'Spark worker node ' + nodename + ' did not configure - aborting.'
sys.exit( 1 )
#print 'Waiting for worker nodes to configure'
#for node in nodeinfo:
# nodename = node["NAME"]
# if nodename != master:
# print ' testing ' + SPARK_DEPLOY_DIR + "/nodes/" + nodename + "/ready"
# while not os.path.isfile( SPARK_DEPLOY_DIR + "/nodes/" + nodename + "/ready" ):
# time.sleep( 1 )
# wait_time += 1
# if wait_time >= 60:
# print 'Spark worker node ' + nodename + ' did not configure - aborting.'
# sys.exit( 1 )
print 'Starting spark master on ' + master + ' at ', time.time()
execCommand( SPARK_HOME + "/sbin/start-master.sh" )
......@@ -172,16 +176,41 @@ if __name__ == '__main__':
outf.write( master )
outf.close()
print 'Waiting for worker nodes to start'
for node in nodeinfo:
nodename = node["NAME"]
if nodename != master:
print ' testing ' + SPARK_DEPLOY_DIR + "/nodes/" + nodename + "/started"
while not os.path.isfile( SPARK_DEPLOY_DIR + "/nodes/" + nodename + "/started" ):
time.sleep( 1 )
wait_time += 1
if wait_time >= 180:
print 'Spark worker node ' + nodename + ' did not start - aborting.'
sys.exit( 1 )
outf = open( SPARK_DEPLOY_DIR + "/nodes/ready", "w" )
outf.write( '1\n' )
outf.close()
execCommand( "sync" )
else:
print hostname + ' waiting for spark master to start'
while not os.path.isfile( SPARK_DEPLOY_DIR + "/nodes/master" ):
time.sleep( 1 )
wait_time += 1
if wait_time >= 60:
if wait_time >= 120:
print 'Master node never started - aborting.'
sys.exit( 1 )
print 'Starting spark slave on ' + hostname + ' at ', time.time()
execCommand( SPARK_HOME + "/sbin/start-slave.sh spark://" + master + ':7077' )
conf = open( SPARK_LOCAL_DIR + "/started", "w" )
conf.write( '1\n' )
conf.close()
execCommand( "sync" )
print 'Configuration on ' + hostname + ' finished.'
sys.exit( 0 )
......@@ -13,7 +13,7 @@ deploy_dir = None
num_nodes = '###'
account = 'xxx####'
walltime = '00:00:00'
deploy_timeout = '30'
deploy_timeout = '180'
# Default cluster settings (for Rhea)
worker_memory = '100g'
......@@ -94,6 +94,18 @@ def generateScript():
outf.write( 'SPARK_SUBMIT="$SPARK_HOME/bin/spark-submit --driver-memory '+driver_memory+' --executor-memory '+executor_memory+' --master spark://$MASTERNODE:7077"\n\n' )
outf.write( 'READYFILE=$SPARK_DEPLOY_DIR/nodes/ready\n')
outf.write( 'timeout=180\n')
outf.write( 'while [ ! -f $READYFILE ]\n')
outf.write( 'do\n')
outf.write( ' sleep 1\n')
outf.write( ' let timeout=timeout-1\n')
outf.write( ' if [ $timeout -eq 0 ]; then\n')
outf.write( ' echo >&2 Spark never started. Inspect per-node logfiles in deployment directory for diagnostics.\n')
outf.write( ' exit 1\n')
outf.write( ' fi\n')
outf.write( 'done\n\n' )
outf.write( "echo =============== Running spark job ===============\n\n" )
outf.write( "# Edit/add the following line with your Spark program and arguments specified...\n" )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment