Skip to content
Snippets Groups Projects
Commit 6edd45dc authored by John Chilton's avatar John Chilton
Browse files

Implemented test case for restart causes MQ to get finalize message.

parent e7e89340
No related branches found
No related tags found
No related merge requests found
......@@ -123,3 +123,10 @@ class PulsarApp(object):
def __setup_job_metrics(self, conf):
job_metrics_config_file = conf.get("job_metrics_config_file", "job_metrics_conf.xml")
self.job_metrics = JobMetrics(job_metrics_config_file)
@property
def only_manager(self):
# convience method for tests, etc... where when we know there
# is only one manager.
assert len(self.managers) == 1
return self.managers.values()[0]
......@@ -72,9 +72,8 @@ def submit_job(manager, job_config):
jobs_directory = os.path.abspath(os.path.join(job_directory, os.pardir))
command_line = command_line.replace('__PULSAR_JOBS_DIRECTORY__', jobs_directory)
if remote_staging:
# TODO: Handle __PULSAR_JOB_DIRECTORY__ config files, metadata files, etc...
manager.handle_remote_staging(job_id, remote_staging)
# TODO: Handle __PULSAR_JOB_DIRECTORY__ config files, metadata files, etc...
manager.handle_remote_staging(job_id, remote_staging)
dependencies_description = dependencies.DependenciesDescription.from_dict(dependencies_description)
return manager.launch(
......
......@@ -15,8 +15,8 @@ def postprocess(job_directory, action_executor):
# Returns True iff outputs were collected.
try:
staging_config = job_directory.load_metadata("staging_config", None)
if staging_config:
return __collect_outputs(job_directory, staging_config, action_executor)
collected = __collect_outputs(job_directory, staging_config, action_executor)
return collected
finally:
job_directory.write_file("postprocessed", "")
return False
......
......@@ -59,7 +59,7 @@ def clean(manager, job_id):
@PulsarController(path="/jobs/{job_id}/submit", method="POST")
def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='[]', env='[]'):
def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='{}', env='[]'):
submit_params = loads(params)
setup_params = loads(setup_params)
dependencies_description = loads(dependencies_description)
......
import threading
from .test_utils import (
TempDirectoryTestCase,
skip_unless_module,
restartable_pulsar_app_provider,
)
from pulsar.manager_endpoint_util import (
submit_job,
status_dict,
)
from pulsar.client.amqp_exchange_factory import get_exchange
from pulsar.managers.util.drmaa import DrmaaSessionFactory
import time
class RestartTestCase(TempDirectoryTestCase):
@skip_unless_module("drmaa")
@skip_unless_module("kombu")
def test_restart_finishes_job(self):
mq_url = "memory://test1092"
app_conf = dict(message_queue_url=mq_url)
app_conf["managers"] = {"manager_restart": {'type': 'queued_drmaa'}}
with restartable_pulsar_app_provider(app_conf=app_conf, web=False) as app_provider:
job_id = '12345'
with app_provider.new_app() as app:
manager = app.only_manager
job_info = {
'job_id': job_id,
'command_line': 'sleep 1000',
'setup': True,
}
submit_job(manager, job_info)
# TODO: unfortunate breaking of abstractions here.
time.sleep(.2)
external_id = manager._proxied_manager._external_id(job_id)
print status_dict(manager, job_id)
drmaa_session = DrmaaSessionFactory().get()
drmaa_session.kill(external_id)
drmaa_session.close()
time.sleep(.2)
consumer = SimpleConsumer(queue="status_update", url=mq_url, manager="manager_restart")
consumer.start()
with app_provider.new_app() as app:
time.sleep(.3)
consumer.join()
assert len(consumer.messages) == 1, len(consumer.messages)
assert consumer.messages[0]["status"] == "complete"
class SimpleConsumer(object):
def __init__(self, queue, url, manager="_default_"):
self.queue = queue
self.url = url
self.manager = manager
self.active = True
self.exchange = get_exchange("memory://test1092", manager, {})
self.messages = []
def start(self):
t = threading.Thread(target=self._run)
t.start()
self.thread = t
def join(self):
self.active = False
self.thread.join(10)
def _run(self):
self.exchange.consume("status_update", self._callback, check=self)
def _callback(self, body, message):
self.messages.append(body)
message.ack()
def __nonzero__(self):
return self.active
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment