Commit af0f5749 authored by Juve, Gideon's avatar Juve, Gideon
Browse files

Add multiple stages, update config

parent 88509201
......@@ -26,6 +26,22 @@ class ACMEWorkflow(object):
self.config = config
self.daxfile = os.path.join(self.outdir, "dax.xml")
self.replicas = {}
self.casename = config.get("acme", "casename")
self.mppwidth = config.get("acme", "mppwidth")
self.stop_option = config.get("acme", "stop_option")
self.stop_n = [x.strip() for x in config.get("acme", "stop_n").split(",")]
self.walltime = [x.strip() for x in config.get("acme", "walltime").split(",")]
if len(self.stop_n) != len(self.walltime):
raise Exception("stop_n should have the same number of entries as walltime")
def generate_env(self):
path = os.path.join(self.outdir, "env.sh")
f = open(path, "w")
try:
f.write("CASENAME=%s" % self.casename)
finally:
f.close()
def add_replica(self, name, path):
"Add a replica entry to the replica catalog for the workflow"
......@@ -54,26 +70,51 @@ tr acme {
arch "x86_64"
os "linux"
type "STAGEABLE"
profile pegasus "exitcode.successmsg" "SUCCESSFUL TERMINATION"
profile globus "count" "%s"
}
}
""" % DAXGEN_DIR)
""" % (DAXGEN_DIR, self.mppwidth))
finally:
f.close()
def generate_workflow(self):
def generate_dax(self):
"Generate a workflow (DAX, config files, and replica catalog)"
dax = ADAG("refinement")
dax = ADAG(self.casename)
last = None
i = 1
for stop_n, walltime in zip(self.stop_n, self.walltime):
stage = Job(name="acme", node_label="stage%s" % i)
stage.addArguments("-run stage%s -stop %s -n %s" % (i, self.stop_option, stop_n))
stage.addProfile(Profile(namespace="globus", key="maxwalltime", value=walltime))
# TODO Add output files
dax.addJob(stage)
stage = Job(name="acme")
stage.addArguments("-run stage1 -stop days -n 5")
dax.addJob(stage)
# TODO Add data analysis job
if last is not None:
dax.depends(stage, last)
last = stage
i+=1
# Write the DAX file
dax.writeXMLFile(self.daxfile)
self.generate_replica_catalog()
def generate_workflow(self):
if os.path.isdir(self.outdir):
raise Exception("Directory exists: %s" % self.outdir)
# Create the output directory
self.outdir = os.path.abspath(self.outdir)
os.makedirs(self.outdir)
self.generate_dax()
self.generate_replica_catalog()
self.generate_transformation_catalog()
self.generate_env()
def main():
if len(sys.argv) != 3:
......@@ -85,13 +126,6 @@ def main():
if not os.path.isfile(configfile):
raise Exception("No such file: %s" % configfile)
if os.path.isdir(outdir):
raise Exception("Directory exists: %s" % outdir)
# Create the output directory
outdir = os.path.abspath(outdir)
os.makedirs(outdir)
# Read the config file
config = ConfigParser()
config.read(configfile)
......
......@@ -17,6 +17,8 @@ else
exit 1
fi
source $WORKFLOW_DIR/env.sh
DIR=$(cd $(dirname $0) && pwd)
SUBMIT_DIR=$WORKFLOW_DIR/submit
DAX=$WORKFLOW_DIR/dax.xml
......@@ -34,7 +36,7 @@ pegasus-plan \
--conf $PP \
--dax $DAX \
--dir $SUBMIT_DIR \
--relative-dir MYCASE \
--relative-dir $CASENAME \
--sites $SITE \
--output-site $OUTPUT_SITE \
--cleanup none \
......
[acme]
# This is the name of the case. This should be the name of the directory
# relative to "shared-scratch" from the site catalog. For example, if
# your shared-scratch is /scratch/juve, and casename is mycase, then the
# case should be set up in /scratch/juve/mycase
casename = F1850.g37.case2
# This is the number of cores to use for each stage
mppwidth = 24
# This is the unit of simulation time
stop_option = days
# This is the number of simulation time units to run for each stage
stop_n = 5, 10, 10
# This is the walltime for each stage in minutes
walltime = 10, 20, 20
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment