diff --git a/pulsar/managers/queued_cli.py b/pulsar/managers/queued_cli.py index 9f6fb80e43ddce7c8c6b06c5025c84ac4edce8e3..65fc442c0612890f77b7f0b404d89553454495f4 100644 --- a/pulsar/managers/queued_cli.py +++ b/pulsar/managers/queued_cli.py @@ -35,8 +35,8 @@ class CliQueueManager(ExternalBaseManager): submission_command = job_interface.submit(script_path) cmd_out = shell.execute(submission_command) if cmd_out.returncode != 0: - log.warn("Failed to submit job - command was %s" % submission_command) - raise Exception("Failed to submit job, error was:\n %s" % cmd_out.stderr) + log.warn("Failed to submit job - command was:\n%s" % submission_command) + raise Exception("Failed to submit job, error was:\n%s" % cmd_out.stderr) external_id = parse_external_id(cmd_out.stdout.strip()) if not external_id: message_template = "Failed to obtain externl id for job_id %s and submission_command %s" diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py index 7e335da011f3f46a12418a1253a45a934533a669..6e25f2f6d096518513955628fbad93a4d4cbf81c 100644 --- a/pulsar/managers/stateful.py +++ b/pulsar/managers/stateful.py @@ -17,12 +17,13 @@ log = logging.getLogger(__name__) DEFAULT_DO_MONITOR = False -DECACTIVATE_FAILED_MESSAGE = "Failed to deactivate job with job id %s. May be problems when starting Pulsar next." -ACTIVATE_FAILED_MESSAGE = "Failed to activate job wiht job id %s. This job may not recover properly upon Pulsar restart." +DECACTIVATE_FAILED_MESSAGE = "Failed to deactivate job with job id %s. May cause problems on next Pulsar start." +ACTIVATE_FAILED_MESSAGE = "Failed to activate job with job id %s. This job may not recover properly upon Pulsar restart." JOB_FILE_FINAL_STATUS = "final_status" JOB_FILE_POSTPROCESSED = "postprocessed" JOB_FILE_PREPROCESSED = "preprocessed" +JOB_FILE_PREPROCESSING_FAILED = "preprocessing_failed" JOB_METADATA_RUNNING = "running" DEFAULT_MIN_POLLING_INTERVAL = 0.5 @@ -80,9 +81,12 @@ class StatefulManagerProxy(ManagerProxy): with job_directory.lock("status"): job_directory.store_metadata(JOB_FILE_PREPROCESSED, True) self.active_jobs.activate_job(job_id) - except Exception: - log.exception("Failed job preprocess for %s:", job_id) - self.__state_change_callback(status.FAILED, job_id) + except Exception as e: + with job_directory.lock("status"): + job_directory.store_metadata(JOB_FILE_PREPROCESSING_FAILED, True) + job_directory.store_metadata("return_code", 1) + job_directory.write_file("stderr", str(e)) + log.exception("Failed job preprocessing for job %s:", job_id) new_thread_for_job(self, "preprocess", job_id, do_preprocess, daemon=False) @@ -107,7 +111,10 @@ class StatefulManagerProxy(ManagerProxy): complete status from proxy. """ state_change = None - if not job_directory.has_metadata(JOB_FILE_PREPROCESSED): + if job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED): + proxy_status = status.COMPLETE + state_change = "to_complete" + elif not job_directory.has_metadata(JOB_FILE_PREPROCESSED): proxy_status = status.PREPROCESSING elif job_directory.has_metadata(JOB_FILE_FINAL_STATUS): proxy_status = job_directory.load_metadata(JOB_FILE_FINAL_STATUS) @@ -143,17 +150,20 @@ class StatefulManagerProxy(ManagerProxy): deactivate_method(job_id) except Exception: log.exception("Failed to deactivate via proxied manager job %s" % job_id) - if proxy_status == status.COMPLETE: + if proxy_status in [ status.COMPLETE, status.CANCELLED]: self.__handle_postprocessing(job_id) def __handle_postprocessing(self, job_id): def do_postprocess(): postprocess_success = False + job_directory = self._proxied_manager.job_directory(job_id) try: - postprocess_success = postprocess(self._proxied_manager.job_directory(job_id), self.__postprocess_action_executor) + postprocess_success = postprocess(job_directory, self.__postprocess_action_executor) except Exception: log.exception("Failed to postprocess results for job id %s" % job_id) final_status = status.COMPLETE if postprocess_success else status.FAILED + if job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED): + final_status = status.FAILED self.__state_change_callback(final_status, job_id) new_thread_for_job(self, "postprocess", job_id, do_postprocess, daemon=False)