-
John Chilton authoredJohn Chilton authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
persistence_test.py 4.82 KiB
from contextlib import contextmanager
from os.path import exists, join
import time
from pulsar.managers.queued import QueueManager
from pulsar.managers.stateful import StatefulManagerProxy
from pulsar.tools.authorization import get_authorizer
from .test_utils import (
temp_directory,
TestDependencyManager
)
from galaxy.job_metrics import NULL_JOB_INSTRUMENTER
from galaxy.util.bunch import Bunch
TEST_JOB_ID = "4"
TEST_STAGED_FILE = "cow"
TEST_COMMAND_TOUCH_FILE = "ran"
def test_launched_job_recovery():
"""Tests persistence and recovery of launched managers jobs."""
with _app() as app:
staging_directory = app.staging_directory
queue1 = StatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=0))
job_id = queue1.setup_job(TEST_JOB_ID, 'tool1', '1.0.0')
touch_file = join(staging_directory, TEST_COMMAND_TOUCH_FILE)
queue1.preprocess_and_launch(job_id, {"command_line": 'touch %s' % touch_file})
time.sleep(.4)
assert not exists(touch_file)
queue1.shutdown()
_setup_manager_that_executes(app)
assert exists(touch_file)
def test_preprocessing_job_recovery():
"""Tests persistence and recovery of preprocessing managers jobs (clean)."""
with _app() as app:
_setup_job_with_unexecuted_preprocessing_directive(app)
staging_directory = app.staging_directory
staged_file = join(staging_directory, TEST_JOB_ID, "inputs", TEST_STAGED_FILE)
touch_file = join(staging_directory, TEST_COMMAND_TOUCH_FILE)
# File shouldn't have been staged because we hacked stateful proxy manager to not
# run preprocess.
assert not exists(staged_file)
_setup_manager_that_preprocesses(app)
assert exists(staged_file)
assert not exists(touch_file)
_setup_manager_that_executes(app)
assert exists(touch_file)
def test_preprocessing_job_recovery_dirty():
"""Tests persistence and recovery of preprocessing managers jobs (dirty)."""
# Same test as above, but simulating existing files from a previous partial
# preprocess.
with _app() as app:
_setup_job_with_unexecuted_preprocessing_directive(app)
staging_directory = app.staging_directory
staged_file = join(staging_directory, TEST_JOB_ID, "inputs", TEST_STAGED_FILE)
touch_file = join(staging_directory, TEST_COMMAND_TOUCH_FILE)
# File shouldn't have been staged because we hacked stateful proxy manager to not
# run preprocess.
assert not exists(staged_file)
# write out partial contents, make sure preprocess writes over this with the correct
# contents.
open(staged_file, "wb").write(b"co")
_setup_manager_that_preprocesses(app)
assert exists(staged_file)
assert open(staged_file, "rb").read() == b"cow file"
assert not exists(touch_file)
_setup_manager_that_executes(app)
assert exists(touch_file)
def _setup_manager_that_preprocesses(app):
# Setup a manager that will preprocess the job but won't execute it.
# Now start a real stateful manager proxy and watch the file get staged.
queue2 = StatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=0))
try:
queue2.recover_active_jobs()
time.sleep(1)
finally:
try:
queue2.shutdown()
except Exception:
pass
def _setup_job_with_unexecuted_preprocessing_directive(app):
staging_directory = app.staging_directory
queue1 = DoesntPreprocessStatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=0))
job_id = queue1.setup_job(TEST_JOB_ID, 'tool1', '1.0.0')
action = {"name": TEST_STAGED_FILE, "type": "input", "action": {"action_type": "message", "contents": "cow file"}}
remote_staging = {"setup": [action]}
touch_file = join(staging_directory, TEST_COMMAND_TOUCH_FILE)
queue1.preprocess_and_launch(job_id, {"command_line": "touch '%s'" % touch_file, "remote_staging": remote_staging})
queue1.shutdown()
def _setup_manager_that_executes(app):
queue2 = StatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=1))
try:
queue2.recover_active_jobs()
time.sleep(1)
finally:
try:
queue2.shutdown()
except Exception:
pass
@contextmanager
def _app():
with temp_directory() as staging_directory:
app = Bunch(
staging_directory=staging_directory,
persistence_directory=staging_directory,
authorizer=get_authorizer(None),
dependency_manager=TestDependencyManager(),
job_metrics=Bunch(default_job_instrumenter=NULL_JOB_INSTRUMENTER),
)
yield app
class DoesntPreprocessStatefulManagerProxy(StatefulManagerProxy):
def _launch_prepreprocessing_thread(self, job_id, launch_config):
pass