From 8242741ba15e5b753fbd7f2845156552071adfa0 Mon Sep 17 00:00:00 2001 From: Marius van den Beek <m.vandenbeek@gmail.com> Date: Thu, 14 Apr 2016 20:24:46 +0200 Subject: [PATCH] Fail jobs that error during job submission Previously jobs that failed to submit (bad resource requests, malformed scripts, no qsub available, staging problems) would remain in waiting status until pulsar was restarted. --- pulsar/managers/queued_cli.py | 4 ++-- pulsar/managers/stateful.py | 26 ++++++++++++++++++-------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/pulsar/managers/queued_cli.py b/pulsar/managers/queued_cli.py index 9f6fb80e..65fc442c 100644 --- a/pulsar/managers/queued_cli.py +++ b/pulsar/managers/queued_cli.py @@ -35,8 +35,8 @@ class CliQueueManager(ExternalBaseManager): submission_command = job_interface.submit(script_path) cmd_out = shell.execute(submission_command) if cmd_out.returncode != 0: - log.warn("Failed to submit job - command was %s" % submission_command) - raise Exception("Failed to submit job, error was:\n %s" % cmd_out.stderr) + log.warn("Failed to submit job - command was:\n%s" % submission_command) + raise Exception("Failed to submit job, error was:\n%s" % cmd_out.stderr) external_id = parse_external_id(cmd_out.stdout.strip()) if not external_id: message_template = "Failed to obtain externl id for job_id %s and submission_command %s" diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py index 7e335da0..6e25f2f6 100644 --- a/pulsar/managers/stateful.py +++ b/pulsar/managers/stateful.py @@ -17,12 +17,13 @@ log = logging.getLogger(__name__) DEFAULT_DO_MONITOR = False -DECACTIVATE_FAILED_MESSAGE = "Failed to deactivate job with job id %s. May be problems when starting Pulsar next." -ACTIVATE_FAILED_MESSAGE = "Failed to activate job wiht job id %s. This job may not recover properly upon Pulsar restart." +DECACTIVATE_FAILED_MESSAGE = "Failed to deactivate job with job id %s. May cause problems on next Pulsar start." +ACTIVATE_FAILED_MESSAGE = "Failed to activate job with job id %s. This job may not recover properly upon Pulsar restart." JOB_FILE_FINAL_STATUS = "final_status" JOB_FILE_POSTPROCESSED = "postprocessed" JOB_FILE_PREPROCESSED = "preprocessed" +JOB_FILE_PREPROCESSING_FAILED = "preprocessing_failed" JOB_METADATA_RUNNING = "running" DEFAULT_MIN_POLLING_INTERVAL = 0.5 @@ -80,9 +81,12 @@ class StatefulManagerProxy(ManagerProxy): with job_directory.lock("status"): job_directory.store_metadata(JOB_FILE_PREPROCESSED, True) self.active_jobs.activate_job(job_id) - except Exception: - log.exception("Failed job preprocess for %s:", job_id) - self.__state_change_callback(status.FAILED, job_id) + except Exception as e: + with job_directory.lock("status"): + job_directory.store_metadata(JOB_FILE_PREPROCESSING_FAILED, True) + job_directory.store_metadata("return_code", 1) + job_directory.write_file("stderr", str(e)) + log.exception("Failed job preprocessing for job %s:", job_id) new_thread_for_job(self, "preprocess", job_id, do_preprocess, daemon=False) @@ -107,7 +111,10 @@ class StatefulManagerProxy(ManagerProxy): complete status from proxy. """ state_change = None - if not job_directory.has_metadata(JOB_FILE_PREPROCESSED): + if job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED): + proxy_status = status.COMPLETE + state_change = "to_complete" + elif not job_directory.has_metadata(JOB_FILE_PREPROCESSED): proxy_status = status.PREPROCESSING elif job_directory.has_metadata(JOB_FILE_FINAL_STATUS): proxy_status = job_directory.load_metadata(JOB_FILE_FINAL_STATUS) @@ -143,17 +150,20 @@ class StatefulManagerProxy(ManagerProxy): deactivate_method(job_id) except Exception: log.exception("Failed to deactivate via proxied manager job %s" % job_id) - if proxy_status == status.COMPLETE: + if proxy_status in [ status.COMPLETE, status.CANCELLED]: self.__handle_postprocessing(job_id) def __handle_postprocessing(self, job_id): def do_postprocess(): postprocess_success = False + job_directory = self._proxied_manager.job_directory(job_id) try: - postprocess_success = postprocess(self._proxied_manager.job_directory(job_id), self.__postprocess_action_executor) + postprocess_success = postprocess(job_directory, self.__postprocess_action_executor) except Exception: log.exception("Failed to postprocess results for job id %s" % job_id) final_status = status.COMPLETE if postprocess_success else status.FAILED + if job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED): + final_status = status.FAILED self.__state_change_callback(final_status, job_id) new_thread_for_job(self, "postprocess", job_id, do_postprocess, daemon=False) -- GitLab