tasks.py 13.9 KB
Newer Older
1
2
3
4
5
6
7
"""Submit a job to a remote server and handle logging.

This module can be used either with Celery, in which case the tasks run in the
background on a Celery worker, or as a normal function call, in which case the
call blocks the current thread of execution.

"""
8
from __future__ import absolute_import, print_function, unicode_literals
9

10
11
import collections
import fnmatch
Jose Borreguero's avatar
Jose Borreguero committed
12
from pathlib import PosixPath
13
import time
Jose Borreguero's avatar
Jose Borreguero committed
14
from typing import Optional, Union
Leal, Ricardo's avatar
Leal, Ricardo committed
15
from django.utils import timezone
Tanner Hobson's avatar
Tanner Hobson committed
16

Tanner Hobson's avatar
Tanner Hobson committed
17
from django.core.files import File
Leal, Ricardo's avatar
Leal, Ricardo committed
18
19
from celery.utils.log import get_task_logger

20
from .models import Job, Log, Result
Leal, Ricardo's avatar
Leal, Ricardo committed
21
from .wrapper.local import LocalWrapper
Leal, Ricardo's avatar
Leal, Ricardo committed
22
from .wrapper.remote import RemoteWrapper
Tanner Hobson's avatar
Tanner Hobson committed
23

24
logger = get_task_logger(__name__)
25
26


Tanner Hobson's avatar
Tanner Hobson committed
27
28
29
try:
    from celery import shared_task
except ImportError:
    # NOTE(review): the module-level ``logger`` comes from an unconditional
    # ``from celery.utils.log import get_task_logger`` earlier in this file,
    # so when Celery is missing that import fails first and this fallback is
    # never reached -- confirm whether that import should be guarded as well.
    logger.warning(
        'Could not import Celery. '
        'Tasks will not be implemented by Celery\'s queue.'
    )

    def shared_task(func):
        """Stand in for Celery's decorator when Celery is not installed.

        The decorated function is returned unchanged, except that it gains a
        ``delay`` attribute which simply calls the function synchronously
        with the arguments it is given.

        """

        def run_now(*args, **kwargs):
            # No queue is available: execute in the caller's thread.
            return func(*args, **kwargs)

        func.delay = run_now
        return func


42
class LogPolicy(object):
    """Enumerate the ways a running job's output can be recorded."""

    #: Don't log anything from the running job.
    LOG_NONE = 0

    #: Create Log objects immediately when they are received.
    LOG_LIVE = 1

    #: Combine all of stdout and stderr at the end of the job.
    LOG_TOTAL = 2
53

Tanner Hobson's avatar
Tanner Hobson committed
54

55
def is_matching(filename, patterns=None):
    """Check if a filename matches the list of positive and negative patterns.

    Positive patterns are strings like ``"1.txt"``, ``"[23].txt"``, or
    ``"*.txt"``.

    Negative patterns are strings like ``"!1.txt"``, ``"![23].txt"``, or
    ``"!*.txt"``.

    Each pattern is checked in turn, and the last pattern that matches the
    filename decides the outcome, so the list of patterns ``["!*.txt",
    "1.txt"]`` will still match ``"1.txt"``.

    :param str filename: the file name to test
    :param patterns: the patterns to apply in order; ``None`` (the default)
        is treated as ``['*']``, which accepts every filename
    :type patterns: list(str) or None
    :return: ``True`` if the filename is accepted, ``False`` otherwise (also
        ``False`` when no pattern matches at all)
    :rtype: bool

    >>> from django_remote_submission.tasks import is_matching
    >>> is_matching("1.txt", patterns=["1.txt"])
    True
    >>> is_matching("1.txt", patterns=["[12].txt"])
    True
    >>> is_matching("1.txt", patterns=["*.txt"])
    True
    >>> is_matching("1.txt", patterns=["1.txt", "!*.txt"])
    False
    >>> is_matching("1.txt", patterns=["!*.txt", "[12].txt"])
    True

    """
    if patterns is None:
        patterns = ['*']

    # Later patterns override earlier ones, but only when they actually match
    # the filename; a non-matching pattern leaves the current verdict alone.
    matched = False
    for pattern in patterns:
        negated = pattern.startswith('!')
        glob = pattern[1:] if negated else pattern
        if fnmatch.fnmatch(filename, glob):
            matched = not negated

    return matched
95
96


97
class LogContainer(object):
    """Manage logs sent by a job according to the log policy.

    .. testsetup::

       from django_remote_submission.models import Job, Server, Interpreter
       from django.contrib.auth import get_user_model
       python3 = Interpreter(name='Python 3', path='/bin/python3', arguments=['-u'])
       server = Server(title='Remote', hostname='foo.invalid', port=22)
       user = get_user_model()(username='john')
       job = Job(title='My Job', program='print("hello world")',
           remote_directory='/tmp/', remote_filename='foobar.py',
           owner=user, server=server, interpreter=python3,
       )

    >>> from django_remote_submission.tasks import LogContainer, LogPolicy
    >>> from datetime import datetime
    >>> now = datetime(year=2017, month=1, day=2, hour=3, minute=4, second=5)
    >>> logs = LogContainer(job, LogPolicy.LOG_LIVE)
    >>> logs.write_stdout(now, 'hello world')  # doctest: +SKIP
    >>> Log.objects.get()  # doctest: +SKIP
    <Log: 2017-01-02 03:04:05 My Job>

    """

    #: One buffered piece of output together with the time it was produced.
    LogLine = collections.namedtuple('LogLine', ['now', 'output'])

    def __init__(self, job, log_policy):
        """Instantiate a log container.

        :param models.Job job: the job these logs are coming from
        :param LogPolicy log_policy: the policy to use for logging

        """
        #: The job that these logs are coming from.
        self.job = job

        #: The policy to use when logging.
        self.log_policy = log_policy

        #: The list of log lines that came from stdout.
        self._stdout = []

        #: The list of log lines that came from stderr.
        self._stderr = []

    def _write(self, lst, now, output):
        """Buffer one log entry in the given list, flushing when logging live.

        Nothing is buffered under :const:`LogPolicy.LOG_NONE`. Under
        :const:`LogPolicy.LOG_LIVE` the buffers are flushed right after the
        entry is added.

        :param lst: either ``self._stdout`` or ``self._stderr``
        :param datetime.datetime now: the time this line was produced
        :param str output: the line of output from the job

        """
        policy = self.log_policy

        if policy != LogPolicy.LOG_NONE:
            entry = LogContainer.LogLine(now=now, output=output)
            lst.append(entry)

        if policy == LogPolicy.LOG_LIVE:
            self.flush()

    def write_stdout(self, now, output):
        """Record output that the job produced on its stdout stream.

        :param datetime.datetime now: the time this output was produced
        :param str output: the output that was produced

        """
        self._write(self._stdout, now, output)

    def write_stderr(self, now, output):
        """Record output that the job produced on its stderr stream.

        :param datetime.datetime now: the time this output was produced
        :param str output: the output that was produced

        """
        self._write(self._stderr, now, output)

    def flush(self):
        """Flush the stdout and stderr lists to Django models.

        Each non-empty buffer becomes a single ``Log`` row whose content is
        the concatenation of the buffered output and whose time is that of
        the most recent buffered line; the buffer is then emptied.

        If the :attr:`log_policy` is :const:`LogPolicy.LOG_TOTAL`, this method
        will need to be called at the end of the job to ensure all the data
        gets written out.

        Calling it with empty buffers does nothing, so it can be called at
        the end of the job regardless of which log policy is used.

        """
        # stdout is always written before stderr.
        for stream, lines in (('stdout', self._stdout), ('stderr', self._stderr)):
            if not lines:
                continue

            Log.objects.create(
                time=lines[-1].now,
                content=''.join(entry.output for entry in lines),
                stream=stream,
                job=self.job,
            )

            # Empty the buffer in place so the same list object keeps being used.
            del lines[:]
216

217

218
@shared_task
def submit_job_to_server(
    job_pk,
    password=None,
    key_filename=None,
    username=None,
    timeout=None,
    log_policy=LogPolicy.LOG_LIVE,
    store_results=None,
    remote=True,
):
    """Submit a job to the remote server.

    The job's program is written to ``remote_filename`` inside
    ``remote_directory``, run with the job's interpreter, and its output is
    recorded according to ``log_policy``. Afterwards every file in the working
    directory that is not older than the script and matches ``store_results``
    is saved as a :class:`models.Result`.

    This can be used as a Celery task, if the library is installed and running.

    :param int job_pk: the primary key of the :class:`models.Job` to submit
    :param str password: the password of the user submitting the job
    :param key_filename: the path to the private key file
    :param str username: the username of the user submitting, if it is
        different from the owner of the job
    :param datetime.timedelta timeout: the timeout for running the job
    :param LogPolicy log_policy: the policy to use for logging
    :param list(str) store_results: the patterns to use for the results to
        store (see :func:`is_matching`); ``None`` stores every candidate file
    :param bool remote: Either runs this task locally on the host or in a remote server.
    :return: a mapping from each stored result's remote filename to the
        primary key of its :class:`models.Result`
    :rtype: dict
    :raises KeyError: if the script file is missing from the directory
        listing after the job has run

    """
    wrapper_cls = RemoteWrapper if remote else LocalWrapper

    job = Job.objects.get(pk=job_pk)

    if username is None:
        username = job.owner.username

    wrapper = wrapper_cls(
        hostname=job.server.hostname,
        username=username,
        port=job.server.port,
    )

    logs = LogContainer(
        job=job,
        log_policy=log_policy,
    )

    with wrapper.connect(password, key_filename):
        wrapper.chdir(job.remote_directory)

        with wrapper.open(job.remote_filename, 'wt') as f:
            f.write(job.program)

        # NOTE(review): presumably this pause makes sure files written by the
        # job get a modification time later than the script's, since results
        # are selected by mtime below -- confirm.
        time.sleep(1)

        job.status = Job.STATUS.submitted
        job.save()

        interp = job.interpreter.path
        workdir = job.remote_directory
        args = job.interpreter.arguments
        filename = job.remote_filename

        try:
            job_status = wrapper.exec_command(
                [interp] + args + [filename],
                workdir,
                timeout=timeout,
                stdout_handler=logs.write_stdout,
                stderr_handler=logs.write_stderr,
            )
        finally:
            # Persist whatever output was captured even if the command raised;
            # otherwise buffered lines (LOG_TOTAL policy) would be lost.
            logs.flush()

        job.status = Job.STATUS.success if job_status else Job.STATUS.failure
        job.save()

        # Materialise the listing: it is traversed twice below, and the
        # identity check against ``script_attr`` needs the same objects.
        file_attrs = list(wrapper.listdir_attr())
        file_map = {attr.filename: attr for attr in file_attrs}
        script_attr = file_map[job.remote_filename]
        script_mtime = script_attr.st_mtime

        results = []
        for attr in file_attrs:
            if attr is script_attr:
                # The submitted script itself is never a result.
                continue
            if attr.st_mtime < script_mtime:
                # Older than the script, so it was not produced by this run.
                continue
            if not is_matching(attr.filename, store_results):
                continue

            result = Result.objects.create(
                remote_filename=attr.filename,
                job=job,
            )

            with wrapper.open(attr.filename, 'rb') as f:
                result.local_file.save(attr.filename, File(f), save=True)

            results.append(result)

    return {r.remote_filename: r.pk for r in results}
Leal, Ricardo's avatar
Leal, Ricardo committed
317
318


Leal, Ricardo's avatar
Leal, Ricardo committed
319
@shared_task
def copy_job_to_server(
    job_pk,
    password=None,
    key_filename=None,
    username=None,
    timeout=None,
    log_policy=LogPolicy.LOG_LIVE,
    store_results=None,
    remote=True,
):
    """Copy a job file to the remote server without running it.

    The job's program is written to ``remote_filename`` inside
    ``remote_directory``, a single stdout ``Log`` entry reporting the copy is
    saved, and the job ends in the ``success`` status.

    This can be used as a Celery task, if the library is installed and running.

    :param int job_pk: the primary key of the :class:`models.Job` to copy
    :param str password: the password of the user submitting the job
    :param key_filename: the path to the private key file
    :param str username: the username of the user submitting, if it is
        different from the owner of the job
    :param datetime.timedelta timeout: accepted but not used by this task
    :param LogPolicy log_policy: accepted but not used by this task
    :param list(str) store_results: accepted but not used by this task
    :param bool remote: Either runs this task locally on the host or in a remote server.
    :return: always an empty dictionary
    :rtype: dict

    """
    # Must stay the first statement: it reports the names in scope on entry.
    logger.debug("copy_job_to_server: %s", locals().keys())

    if remote:
        wrapper_cls = RemoteWrapper
    else:
        wrapper_cls = LocalWrapper

    job = Job.objects.get(pk=job_pk)
    login = job.owner.username if username is None else username
    server = job.server

    wrapper = wrapper_cls(
        hostname=server.hostname,
        username=login,
        port=server.port,
    )

    with wrapper.connect(password, key_filename):
        wrapper.chdir(job.remote_directory)

        with wrapper.open(job.remote_filename, 'wt') as script:
            script.write(job.program)

        time.sleep(1)

        job.status = Job.STATUS.submitted
        job.save()

        stamp = timezone.now()
        message = 'File {} successfully copied to {}.'.format(
            job.remote_filename,
            job.remote_directory,
        )
        Log(time=stamp, content=message, stream='stdout', job=job).save()

        job.status = Job.STATUS.success
        job.save()

    return {}
Leal, Ricardo's avatar
Leal, Ricardo committed
387
388


Leal, Ricardo's avatar
Leal, Ricardo committed
389
@shared_task
def copy_key_to_server(
    public_key_filename: Optional[Union[PosixPath, str]],
    username: str,
    password: str,
    hostname: str,
    port: int = 22,
    remote: bool = True,
) -> None:
    r"""Copy the client key to the server so later connections need no password.

    The connection is opened with the password, and the wrapper's
    ``deploy_key_if_it_does_not_exist`` is asked to install the key, so the
    key will not be copied if it already exists on the remote server.

    This can be used as a Celery task, if the library is installed and running.

    :param public_key_filename: absolute path to the public SSH key file
    :param str username: the username of the user submitting
    :param str password: the password of the user submitting the job
    :param str hostname: The hostname used to connect to the server
    :param int port: The port to connect to for SSH (usually 22)
    :param bool remote: Either runs this task locally on the host or in a remote server.

    """
    if remote:
        wrapper_cls = RemoteWrapper
    else:
        wrapper_cls = LocalWrapper

    connection = wrapper_cls(hostname=hostname, username=username, port=port)

    with connection.connect(password):
        connection.deploy_key_if_it_does_not_exist(public_key_filename)

    return None


@shared_task
def delete_key_from_server(
    public_key_filename: Optional[Union[PosixPath, str]],
    username: str,
    password: Optional[str],
    key_filename: Optional[str],
    hostname: str,
    port: int = 22,
    remote: bool = True,
) -> None:
    r"""Delete the client key from the server so later connections need a password.

    This can be used at the logout of the session.

    This can be used as a Celery task, if the library is installed and running.

    :param public_key_filename: the path to the public key file
    :param username: the username of the user submitting
    :param password: the password of the user submitting the job
    :param key_filename: the path to the private key file
    :param hostname: The hostname used to connect to the server
    :param port: The port to connect to for SSH (usually 22)
    :param remote: Either runs this task locally on the host or in a remote server.
    """
    if remote:
        wrapper_cls = RemoteWrapper
    else:
        wrapper_cls = LocalWrapper

    connection = wrapper_cls(hostname=hostname, username=username, port=port)

    with connection.connect(password=password, key_filename=key_filename):
        connection.delete_key(public_key_filename)

    return None
446
447
448
449
450
451
452
453


# DEBUG: remove this mock task
@shared_task
def clean_expired_sessions():
    r"""Emit a debug marker to stdout and to a scratch log file.

    Placeholder task: it does not clean anything. It prints a fixed message
    and appends the same message to ``/tmp/clean_expired_sessions.log``.
    """
    message = "DEBUG: clean_expired_sessions"
    print(message)  # print to celery's log file /var/log/celery.log

    # Use a context manager so the file handle is closed (and the line
    # flushed) deterministically instead of being left to garbage collection.
    with open("/tmp/clean_expired_sessions.log", "a") as log_file:
        log_file.write(message + "\n")