Commit 8242741b authored by Marius van den Beek

Fail jobs that error during job submission

Previously, jobs that failed to submit (bad resource requests, malformed
scripts, no qsub available, staging problems) would remain in the waiting
state until Pulsar was restarted.
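
The change below records a preprocessing failure in the job directory's metadata instead of only logging it, so a later status check can resolve the job to a failed state. A minimal sketch of that flag-based flow, assuming a hypothetical in-memory stand-in for Pulsar's JobDirectory and lower-case strings for its status values:

# Sketch only: JobDirectory here is a simplified stand-in for Pulsar's real
# job directory; the metadata keys mirror the ones the commit adds.
PREPROCESSED = "preprocessed"
PREPROCESSING_FAILED = "preprocessing_failed"


class JobDirectory:
    def __init__(self):
        self.metadata = {}
        self.files = {}

    def store_metadata(self, key, value):
        self.metadata[key] = value

    def has_metadata(self, key):
        return key in self.metadata

    def write_file(self, name, contents):
        self.files[name] = contents


def preprocess(job_directory, stage_files):
    try:
        stage_files()
        job_directory.store_metadata(PREPROCESSED, True)
    except Exception as e:
        # New behavior: persist the failure so the job can be failed later,
        # instead of leaving it stuck in a waiting state.
        job_directory.store_metadata(PREPROCESSING_FAILED, True)
        job_directory.store_metadata("return_code", 1)
        job_directory.write_file("stderr", str(e))


def resolve_status(job_directory):
    if job_directory.has_metadata(PREPROCESSING_FAILED):
        return "failed"
    return "complete" if job_directory.has_metadata(PREPROCESSED) else "preprocessing"


def failing_staging():
    raise RuntimeError("qsub: command not found")


job_directory = JobDirectory()
preprocess(job_directory, failing_staging)
assert resolve_status(job_directory) == "failed"
assert job_directory.files["stderr"] == "qsub: command not found"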
parent 95f4fc31
@@ -35,8 +35,8 @@ class CliQueueManager(ExternalBaseManager):
         submission_command = job_interface.submit(script_path)
         cmd_out = shell.execute(submission_command)
         if cmd_out.returncode != 0:
-            log.warn("Failed to submit job - command was %s" % submission_command)
-            raise Exception("Failed to submit job, error was:\n %s" % cmd_out.stderr)
+            log.warn("Failed to submit job - command was:\n%s" % submission_command)
+            raise Exception("Failed to submit job, error was:\n%s" % cmd_out.stderr)
         external_id = parse_external_id(cmd_out.stdout.strip())
         if not external_id:
             message_template = "Failed to obtain externl id for job_id %s and submission_command %s"
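
This hunk only tightens the log and error messages, but the surrounding submit-and-check pattern is what feeds the new failure path: run the scheduler's submit command and treat any non-zero return code as a submission failure. A rough sketch of that pattern with subprocess (in Pulsar, shell.execute and parse_external_id come from the CLI job interface; the qsub invocation and id regex here are illustrative):

import re
import subprocess


def submit(script_path):
    # Illustrative qsub invocation; the real command line is built by the
    # configured job interface and varies by DRM.
    submission_command = ["qsub", script_path]
    cmd_out = subprocess.run(submission_command, capture_output=True, text=True)
    if cmd_out.returncode != 0:
        # Surface the scheduler's stderr instead of silently leaving the job waiting.
        raise Exception("Failed to submit job, error was:\n%s" % cmd_out.stderr)
    # e.g. TORQUE prints an id like "12345.head-node" on stdout
    match = re.match(r"\d+", cmd_out.stdout.strip())
    if not match:
        raise Exception("Failed to obtain external id for submission_command %s" % submission_command)
    return match.group(0)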
@@ -17,12 +17,13 @@ log = logging.getLogger(__name__)
 DEFAULT_DO_MONITOR = False
 
-DECACTIVATE_FAILED_MESSAGE = "Failed to deactivate job with job id %s. May be problems when starting Pulsar next."
-ACTIVATE_FAILED_MESSAGE = "Failed to activate job wiht job id %s. This job may not recover properly upon Pulsar restart."
+DECACTIVATE_FAILED_MESSAGE = "Failed to deactivate job with job id %s. May cause problems on next Pulsar start."
+ACTIVATE_FAILED_MESSAGE = "Failed to activate job with job id %s. This job may not recover properly upon Pulsar restart."
 
 JOB_FILE_FINAL_STATUS = "final_status"
 JOB_FILE_POSTPROCESSED = "postprocessed"
 JOB_FILE_PREPROCESSED = "preprocessed"
+JOB_FILE_PREPROCESSING_FAILED = "preprocessing_failed"
 
 JOB_METADATA_RUNNING = "running"
 
 DEFAULT_MIN_POLLING_INTERVAL = 0.5
@@ -80,9 +81,12 @@ class StatefulManagerProxy(ManagerProxy):
                 with job_directory.lock("status"):
                     job_directory.store_metadata(JOB_FILE_PREPROCESSED, True)
                 self.active_jobs.activate_job(job_id)
-            except Exception:
-                log.exception("Failed job preprocess for %s:", job_id)
-                self.__state_change_callback(status.FAILED, job_id)
+            except Exception as e:
+                with job_directory.lock("status"):
+                    job_directory.store_metadata(JOB_FILE_PREPROCESSING_FAILED, True)
+                job_directory.store_metadata("return_code", 1)
+                job_directory.write_file("stderr", str(e))
+                log.exception("Failed job preprocessing for job %s:", job_id)
 
         new_thread_for_job(self, "preprocess", job_id, do_preprocess, daemon=False)
 
@@ -107,7 +111,10 @@ class StatefulManagerProxy(ManagerProxy):
         complete status from proxy.
         """
         state_change = None
-        if not job_directory.has_metadata(JOB_FILE_PREPROCESSED):
+        if job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED):
+            proxy_status = status.COMPLETE
+            state_change = "to_complete"
+        elif not job_directory.has_metadata(JOB_FILE_PREPROCESSED):
             proxy_status = status.PREPROCESSING
         elif job_directory.has_metadata(JOB_FILE_FINAL_STATUS):
             proxy_status = job_directory.load_metadata(JOB_FILE_FINAL_STATUS)
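
Mapping a recorded preprocessing failure to status.COMPLETE looks surprising, but it is what pulls the job into the normal completion path (deactivation and postprocessing) rather than reporting it as forever PREPROCESSING; the job is only marked FAILED at the end of postprocessing. A condensed sketch of the new resolution order, assuming a job_directory object with the has_metadata/load_metadata accessors used above (the real method's trailing branches are not shown in this hunk):

def resolve_proxy_status(job_directory):
    # Order matters: a recorded preprocessing failure must win over the
    # "not yet preprocessed" check, or the job would appear stuck.
    if job_directory.has_metadata("preprocessing_failed"):
        return "complete", "to_complete"
    if not job_directory.has_metadata("preprocessed"):
        return "preprocessing", None
    if job_directory.has_metadata("final_status"):
        return job_directory.load_metadata("final_status"), None
    return "running", None  # placeholder for branches elided from the diff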
@@ -143,17 +150,20 @@ class StatefulManagerProxy(ManagerProxy):
                 deactivate_method(job_id)
             except Exception:
                 log.exception("Failed to deactivate via proxied manager job %s" % job_id)
-        if proxy_status == status.COMPLETE:
+        if proxy_status in [ status.COMPLETE, status.CANCELLED]:
             self.__handle_postprocessing(job_id)
 
     def __handle_postprocessing(self, job_id):
         def do_postprocess():
             postprocess_success = False
+            job_directory = self._proxied_manager.job_directory(job_id)
             try:
-                postprocess_success = postprocess(self._proxied_manager.job_directory(job_id), self.__postprocess_action_executor)
+                postprocess_success = postprocess(job_directory, self.__postprocess_action_executor)
             except Exception:
                 log.exception("Failed to postprocess results for job id %s" % job_id)
             final_status = status.COMPLETE if postprocess_success else status.FAILED
+            if job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED):
+                final_status = status.FAILED
             self.__state_change_callback(final_status, job_id)
 
         new_thread_for_job(self, "postprocess", job_id, do_postprocess, daemon=False)
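
Postprocessing still runs for these jobs, so outputs such as the recorded stderr file are staged back as usual, but the final status reported through the state-change callback is forced to FAILED whenever the preprocessing-failed flag is present. A compressed sketch of that decision, using a plain dict in place of the job directory metadata:

def final_status_for(metadata, postprocess_success):
    # A successful postprocess cannot turn a job with a recorded
    # preprocessing failure back into a COMPLETE job.
    if metadata.get("preprocessing_failed"):
        return "failed"
    return "complete" if postprocess_success else "failed"


assert final_status_for({"preprocessing_failed": True}, True) == "failed"
assert final_status_for({}, True) == "complete"
assert final_status_for({}, False) == "failed"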