Skip to content
Snippets Groups Projects
persistence_test.py 4.82 KiB
Newer Older
John Chilton's avatar
John Chilton committed
from contextlib import contextmanager
from os.path import exists, join
import time

from pulsar.managers.queued import QueueManager
from pulsar.managers.stateful import StatefulManagerProxy
from pulsar.tools.authorization import get_authorizer
John Chilton's avatar
John Chilton committed
from .test_utils import (
    temp_directory,
    TestDependencyManager
)
from galaxy.job_metrics import NULL_JOB_INSTRUMENTER
John Chilton's avatar
John Chilton committed
TEST_JOB_ID = "4"
TEST_STAGED_FILE = "cow"
TEST_COMMAND_TOUCH_FILE = "ran"
John Chilton's avatar
John Chilton committed

def test_launched_job_recovery():
    """Tests persistence and recovery of launched managers jobs."""
    with _app() as app:
        staging_directory = app.staging_directory
        queue1 = StatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=0))
John Chilton's avatar
John Chilton committed
        job_id = queue1.setup_job(TEST_JOB_ID, 'tool1', '1.0.0')
        touch_file = join(staging_directory, TEST_COMMAND_TOUCH_FILE)
        queue1.preprocess_and_launch(job_id, {"command_line": 'touch %s' % touch_file})
John Chilton's avatar
John Chilton committed
        assert not exists(touch_file)
        queue1.shutdown()
John Chilton's avatar
John Chilton committed
        _setup_manager_that_executes(app)
        assert exists(touch_file)


def test_preprocessing_job_recovery():
    """Tests persistence and recovery of preprocessing managers jobs (clean)."""
    with _app() as app:
        _setup_job_with_unexecuted_preprocessing_directive(app)
        staging_directory = app.staging_directory
        staged_file = join(staging_directory, TEST_JOB_ID, "inputs", TEST_STAGED_FILE)
        touch_file = join(staging_directory, TEST_COMMAND_TOUCH_FILE)

        # File shouldn't have been staged because we hacked stateful proxy manager to not
        # run preprocess.
        assert not exists(staged_file)

        _setup_manager_that_preprocesses(app)

        assert exists(staged_file)
        assert not exists(touch_file)

        _setup_manager_that_executes(app)
        assert exists(touch_file)


def test_preprocessing_job_recovery_dirty():
    """Tests persistence and recovery of preprocessing managers jobs (dirty)."""

    # Same test as above, but simulating existing files from a previous partial
    # preprocess.
    with _app() as app:
        _setup_job_with_unexecuted_preprocessing_directive(app)
        staging_directory = app.staging_directory
        staged_file = join(staging_directory, TEST_JOB_ID, "inputs", TEST_STAGED_FILE)
        touch_file = join(staging_directory, TEST_COMMAND_TOUCH_FILE)

        # File shouldn't have been staged because we hacked stateful proxy manager to not
        # run preprocess.
        assert not exists(staged_file)
        # write out partial contents, make sure preprocess writes over this with the correct
        # contents.
        open(staged_file, "wb").write(b"co")
        _setup_manager_that_preprocesses(app)

        assert exists(staged_file)
        assert open(staged_file, "rb").read() == b"cow file"
        assert not exists(touch_file)

        _setup_manager_that_executes(app)
        assert exists(touch_file)


def _setup_manager_that_preprocesses(app):
    # Setup a manager that will preprocess the job but won't execute it.

    # Now start a real stateful manager proxy and watch the file get staged.
    queue2 = StatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=0))
    try:
        queue2.recover_active_jobs()
        time.sleep(1)
    finally:
        try:
            queue2.shutdown()
        except Exception:
            pass


def _setup_job_with_unexecuted_preprocessing_directive(app):
    staging_directory = app.staging_directory
    queue1 = DoesntPreprocessStatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=0))
    job_id = queue1.setup_job(TEST_JOB_ID, 'tool1', '1.0.0')
    action = {"name": TEST_STAGED_FILE, "type": "input", "action": {"action_type": "message", "contents": "cow file"}}
    remote_staging = {"setup": [action]}
    touch_file = join(staging_directory, TEST_COMMAND_TOUCH_FILE)
    queue1.preprocess_and_launch(job_id, {"command_line": "touch '%s'" % touch_file, "remote_staging": remote_staging})
    queue1.shutdown()


def _setup_manager_that_executes(app):
    queue2 = StatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=1))
    try:
        queue2.recover_active_jobs()
    finally:
        try:
            queue2.shutdown()
        except Exception:
John Chilton's avatar
John Chilton committed


@contextmanager
def _app():
    with temp_directory() as staging_directory:
        app = Bunch(
            staging_directory=staging_directory,
            persistence_directory=staging_directory,
            authorizer=get_authorizer(None),
            dependency_manager=TestDependencyManager(),
            job_metrics=Bunch(default_job_instrumenter=NULL_JOB_INSTRUMENTER),
        )
        yield app


class DoesntPreprocessStatefulManagerProxy(StatefulManagerProxy):

    def _launch_prepreprocessing_thread(self, job_id, launch_config):
        pass