Skip to content
Snippets Groups Projects
Commit 27bbfbe0 authored by Juve, Gideon's avatar Juve, Gideon
Browse files

Add AMWG diagnostics stage

parent e59cdfd6
No related branches found
No related tags found
No related merge requests found
#!/bin/bash
set -e
if [ $# -ne 1 ]; then
echo "Usage: $0 diag140804.csh"
exit 1
fi
# Need to load the modules for the diagnostics code
module load ncl
module load nco
# Chmod the templated script for this stage
chmod 755 $1
# Run the script on a compute node
aprun -n 1 ./$1
#!/usr/bin/env python
import sys
import re
import string
from ConfigParser import ConfigParser
from Pegasus.DAX3 import *
......@@ -11,8 +12,13 @@ def format_template(name, outfile, **kwargs):
"This fills in the values for the template called 'name' and writes it to 'outfile'"
templatefile = os.path.join(TEMPLATE_DIR, name)
template = open(templatefile).read()
formatter = string.Formatter()
data = formatter.format(template, **kwargs)
def repl(match):
key = match.group(1)
return str(kwargs[key])
data = re.sub("\{\{([a-z0-9A-Z-._]+)\}\}", repl, template)
f = open(outfile, "w")
try:
f.write(data)
......@@ -86,16 +92,48 @@ tr acme-output {
profile globus "jobtype" "single"
}
}
""" % (DAXGEN_DIR, self.mppwidth, DAXGEN_DIR))
tr acme-amwg {
site local {
pfn "file://%s/bin/acme-amwg.sh"
arch "x86_64"
os "linux"
type "STAGEABLE"
profile globus "hostcount" "1"
profile globus "jobtype" "single"
}
}
""" % (DAXGEN_DIR, self.mppwidth, DAXGEN_DIR, DAXGEN_DIR))
finally:
f.close()
def generate_amwg_script(self, stage, first_yr, nyrs):
"Generate the amwg script with the appropriate config"
name = "diag140804.stage%s.csh" % stage
path = os.path.join(self.outdir, name)
kw = {
"first_yr": first_yr,
"nyrs": nyrs,
"casename": self.casename,
"stage": stage
}
format_template("diag140804.csh", path, **kw)
self.add_replica(name, path)
return name
def generate_dax(self):
"Generate a workflow (DAX, config files, and replica catalog)"
dax = ADAG(self.casename)
last = None
if self.stop_option in ["nyear", "nyears"]:
amwg = True
else:
print "WARNING: Diagnostics not added to workflow unles stop option is 'nyears'. Current setting is '%s'" % self.stop_option
amwg = False
tot_years = 0
i = 1
for stop_n, walltime in zip(self.stop_n, self.walltime):
stage = Job(name="acme-run")
......@@ -117,7 +155,34 @@ tr acme-output {
dax.addJob(archive)
dax.depends(archive, stage)
# TODO Add data analysis job
# Figure out how many years we have at this point
cur_years = int(stop_n)
tot_years = tot_years + cur_years
# Add diagnostics job for atmosphere
if amwg:
if tot_years <= 1:
print "WARNING: First stage does not have enough years for diagnostics"
else:
# The first year doesn't count, do no more than 5 years
nyrs = min(tot_years-1, 5)
# Years start at 1, not 0
first_yr = tot_years - nyrs + 1
# Create the amwg script
script_name = self.generate_amwg_script(i, first_yr, nyrs)
script = File(script_name)
diagnostics = File("amwg-stage%s" % i)
# Add the job
diag = Job(name="acme-amwg")
diag.addArguments(script)
diag.uses(script, link=Link.INPUT)
diag.uses(diagnostics, link=Link.OUTPUT, register=False, transfer=True)
dax.addJob(diag)
dax.depends(diag, stage)
last = archive
i+=1
......
......@@ -18,6 +18,7 @@
<file-server operation="all" url="gsiftp://hoppergrid.nersc.gov/project/projectdirs/m2187" />
</directory>
<profile namespace="env" key="PEGASUS_HOME">/project/projectdirs/m2187/pegasus/pegasus-4.4.0</profile>
<profile namespace="env" key="DIAG_HOME">/project/projectdirs/m2187/amwg/amwg_diagnostics</profile>
<profile namespace="globus" key="project">m2187</profile>
<profile namespace="globus" key="queue">regular</profile>
</site>
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment