#!/usr/bin/env python
import os
import re
import string
import sys
from ConfigParser import ConfigParser

from Pegasus.DAX3 import *

# Directory containing this script (symlinks resolved); the transformation
# catalog written below points at executables under its bin/ subdirectory.
DAXGEN_DIR = os.path.dirname(os.path.realpath(__file__))
# Directory that format_template() reads template files from.
TEMPLATE_DIR = os.path.join(DAXGEN_DIR, "templates")

# Matches a "{{key}}" placeholder; keys may contain letters, digits,
# '.', '_' and '-'. Raw string so the brace escapes reach the regex engine.
_TEMPLATE_VAR = re.compile(r"\{\{([A-Za-z0-9._-]+)\}\}")


def format_template(name, outfile, **kwargs):
    """Fill in the template called 'name' and write the result to 'outfile'.

    The template is read from TEMPLATE_DIR. Every "{{key}}" placeholder in
    it is replaced by str(kwargs[key]).

    Args:
        name: File name of the template, relative to TEMPLATE_DIR.
        outfile: Path of the file to write; it is created or truncated.
        **kwargs: Values for the placeholders, keyed by placeholder name.

    Raises:
        KeyError: If the template references a key not present in kwargs.
        IOError/OSError: If the template cannot be read or the output
            cannot be written.
    """
    templatefile = os.path.join(TEMPLATE_DIR, name)
    # Use context managers so both handles are closed even on error.
    with open(templatefile) as f:
        template = f.read()

    def repl(match):
        key = match.group(1)
        try:
            return str(kwargs[key])
        except KeyError:
            # Same exception type as before, but say which template and key.
            raise KeyError("Template '%s' references undefined variable '%s'"
                           % (name, key))

    data = _TEMPLATE_VAR.sub(repl, template)

    with open(outfile, "w") as f:
        f.write(data)

class ACMEWorkflow(object):
    """Generate a Pegasus workflow for a multi-stage ACME run.

    The workflow consists of a DAX file plus a replica catalog, a
    transformation catalog and an env.sh file, all written into 'outdir'.
    Settings are read from the [acme] section of the config: casename,
    mppwidth, stop_option, and the comma-separated lists stop_n and
    walltime (one entry per stage).
    """

    def __init__(self, outdir, config):
        """Read the workflow settings.

        Args:
            outdir: Directory where the workflow is written.
            config: ConfigParser-like object providing get(section, option).

        Raises:
            Exception: If stop_n and walltime have different lengths.
        """
        self.outdir = outdir
        self.config = config
        self.daxfile = os.path.join(self.outdir, "dax.xml")
        # Maps logical file name -> file:// URL, filled by add_replica().
        self.replicas = {}

        self.casename = config.get("acme", "casename")
        self.mppwidth = config.get("acme", "mppwidth")
        self.stop_option = config.get("acme", "stop_option")
        self.stop_n = self._split_list(config.get("acme", "stop_n"))
        self.walltime = self._split_list(config.get("acme", "walltime"))

        if len(self.stop_n) != len(self.walltime):
            raise Exception("stop_n should have the same number of entries as walltime")

    @staticmethod
    def _split_list(value):
        """Split a comma-separated config value into stripped strings."""
        return [item.strip() for item in value.split(",")]

    def generate_env(self):
        """Write env.sh, which defines CASENAME, into the output directory."""
        path = os.path.join(self.outdir, "env.sh")
        with open(path, "w") as f:
            # Terminate the line so the file is a well-formed text file.
            f.write("CASENAME=%s\n" % self.casename)

    def add_replica(self, name, path):
        """Add a replica entry to the replica catalog for the workflow.

        Args:
            name: Logical file name used in the DAX.
            path: Local path of the file; it is made absolute so that the
                resulting file:// URL is valid even for a relative outdir.
        """
        url = "file://%s" % os.path.abspath(path)
        self.replicas[name] = url

    def generate_replica_catalog(self):
        """Write the replica catalog for this workflow to rc.txt."""
        path = os.path.join(self.outdir, "rc.txt")
        with open(path, "w") as f:
            # Sort so the catalog is identical from run to run.
            for name, url in sorted(self.replicas.items()):
                f.write('%-30s %-100s pool="local"\n' % (name, url))

    def generate_transformation_catalog(self):
        """Write the transformation catalog for this workflow to tc.txt.

        The executables are referenced under DAXGEN_DIR/bin, and the
        acme-run hostcount is taken from the configured mppwidth.
        """
        path = os.path.join(self.outdir, "tc.txt")
        values = {"daxgen_dir": DAXGEN_DIR, "mppwidth": self.mppwidth}
        with open(path, "w") as f:
            f.write("""
tr acme-run {
    site local {
        pfn "file://%(daxgen_dir)s/bin/acme-run.sh"
        arch "x86_64"
        os "linux"
        type "STAGEABLE"
        profile pegasus "exitcode.successmsg" "SUCCESSFUL TERMINATION"
        profile globus "hostcount" "%(mppwidth)s"
        profile globus "jobtype" "single"
    }
}

tr acme-output {
    site local {
        pfn "file://%(daxgen_dir)s/bin/acme-output.sh"
        arch "x86_64"
        os "linux"
        type "STAGEABLE"
        profile globus "hostcount" "1"
        profile globus "jobtype" "single"
    }
}

tr acme-amwg {
    site local {
        pfn "file://%(daxgen_dir)s/bin/acme-amwg.sh"
        arch "x86_64"
        os "linux"
        type "STAGEABLE"
        profile globus "hostcount" "1"
        profile globus "jobtype" "single"
        profile pegasus "exitcode.successmsg" "NORMAL EXIT FROM SCRIPT"
        profile pegasus "exitcode.failuremsg" "CONVERT NOT FOUND"
        profile pegasus "exitcode.failuremsg" "Segmentation fault"
    }
}
""" % values)

    def generate_amwg_script(self, stage, first_yr, nyrs):
        """Generate the amwg script for one stage and register it as a replica.

        Args:
            stage: Stage number, used in the generated file name.
            first_yr: First year passed to the template.
            nyrs: Number of years passed to the template.

        Returns:
            The file name of the generated script (not the full path).
        """
        name = "diag140804.stage%s.csh" % stage
        path = os.path.join(self.outdir, name)
        kw = {
            "first_yr": first_yr,
            "nyrs": nyrs,
            "casename": self.casename,
            "stage": stage,
        }
        format_template("diag140804.csh", path, **kw)
        self.add_replica(name, path)
        return name

    def _add_diagnostics_job(self, dax, stage, index, tot_years):
        """Add the acme-amwg diagnostics job for one stage to the DAX.

        Nothing is added (only a warning is printed) when at most one year
        has been simulated so far.
        """
        if tot_years <= 1:
            print("WARNING: First stage does not have enough years for diagnostics")
            return

        # The first year doesn't count, do no more than 5 years
        nyrs = min(tot_years - 1, 5)

        # Years start at 1, not 0
        first_yr = tot_years - nyrs + 1

        # Create the amwg script
        script_name = self.generate_amwg_script(index, first_yr, nyrs)
        script = File(script_name)

        diagnostics = File("amwg-stage%s" % index)

        # Add the job
        diag = Job(name="acme-amwg")
        diag.addArguments(script)
        diag.uses(script, link=Link.INPUT)
        diag.uses(diagnostics, link=Link.OUTPUT, register=False, transfer=True)
        diag.addProfile(Profile(namespace="globus", key="maxwalltime", value="30"))
        dax.addJob(diag)
        dax.depends(diag, stage)

    def generate_dax(self):
        """Generate a workflow (DAX, config files, and replica catalog).

        For every (stop_n, walltime) pair one acme-run job and one
        acme-output job are added; each run job depends on the previous
        stage's output job. Diagnostics jobs are added only when the stop
        option is 'nyear' or 'nyears'. The DAX is written to self.daxfile.

        Raises:
            ValueError: If a stop_n entry is not an integer.
        """
        dax = ADAG(self.casename)

        amwg = self.stop_option in ("nyear", "nyears")
        if not amwg:
            print("WARNING: Diagnostics not added to workflow unless stop option is 'nyears'. Current setting is '%s'" % self.stop_option)

        last = None
        tot_years = 0

        # Stages are numbered from 1.
        for i, (stop_n, walltime) in enumerate(zip(self.stop_n, self.walltime), 1):
            stage = Job(name="acme-run")
            if i > 1:
                stage.addArguments("-continue")
            stage.addArguments("-stage %s -stop %s -n %s" % (i, self.stop_option, stop_n))
            stage.addProfile(Profile(namespace="globus", key="maxwalltime", value=walltime))
            dax.addJob(stage)

            if last is not None:
                dax.depends(stage, last)

            output = File("%s.stage%s.tar.gz" % (self.casename, i))

            archive = Job(name="acme-output")
            archive.addArguments("-stage %s" % i)
            archive.uses(output, link=Link.OUTPUT, register=False, transfer=True)
            archive.addProfile(Profile(namespace="globus", key="maxwalltime", value="30"))
            dax.addJob(archive)
            dax.depends(archive, stage)

            # Figure out how many years we have at this point
            tot_years += int(stop_n)

            # Add diagnostics job for atmosphere
            if amwg:
                self._add_diagnostics_job(dax, stage, i, tot_years)

            last = archive

        # Write the DAX file
        dax.writeXMLFile(self.daxfile)

    def generate_workflow(self):
        """Create the output directory and write all workflow files into it.

        Raises:
            Exception: If the output directory already exists.
        """
        if os.path.isdir(self.outdir):
            raise Exception("Directory exists: %s" % self.outdir)

        # Create the output directory
        self.outdir = os.path.abspath(self.outdir)
        os.makedirs(self.outdir)

        self.generate_dax()
        self.generate_replica_catalog()
        self.generate_transformation_catalog()
        self.generate_env()

def main():
    """Command-line entry point: generate a workflow from a config file.

    Expects exactly two command-line arguments, CONFIGFILE and OUTDIR.
    Raises Exception when the argument count is wrong or CONFIGFILE is
    not an existing file.
    """
    arguments = sys.argv[1:]
    if len(arguments) != 2:
        raise Exception("Usage: %s CONFIGFILE OUTDIR" % sys.argv[0])

    config_path, target_dir = arguments
    if not os.path.isfile(config_path):
        raise Exception("No such file: %s" % config_path)

    # Load the settings from the config file
    parser = ConfigParser()
    parser.read(config_path)

    # Build the workflow inside the target directory
    ACMEWorkflow(target_dir, parser).generate_workflow()


# Run the generator only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()