From 8ad0fbeb6eee97ef70df61389c9f3ed5561d2766 Mon Sep 17 00:00:00 2001 From: John Chilton <jmchilton@gmail.com> Date: Sun, 10 Apr 2016 15:48:58 -0400 Subject: [PATCH] Revise cleaning up jobs. xref #94 --- pulsar/client/staging/down.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index e03a0fe0..10a3423b 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -1,3 +1,4 @@ +"""Code run on the client side for unstaging complete Pulsar jobs.""" from os.path import join from os.path import relpath from contextlib import contextmanager @@ -11,8 +12,10 @@ log = getLogger(__name__) def finish_job(client, cleanup_job, job_completed_normally, client_outputs, pulsar_outputs): - """ Responsible for downloading results from remote server and cleaning up - Pulsar staging directory (if needed.) + """Process for "un-staging" a complete Pulsar job. + + This function is responsible for downloading results from remote + server and cleaning up Pulsar staging directory (if needed.) """ collection_failure_exceptions = [] if job_completed_normally: @@ -20,7 +23,7 @@ def finish_job(client, cleanup_job, job_completed_normally, client_outputs, puls action_mapper = FileActionMapper(client) results_stager = ResultsCollector(output_collector, action_mapper, client_outputs, pulsar_outputs) collection_failure_exceptions = results_stager.collect() - __clean(collection_failure_exceptions, cleanup_job, client) + _clean(collection_failure_exceptions, cleanup_job, client) return collection_failure_exceptions @@ -157,9 +160,15 @@ class DownloadExceptionTracker(object): self.collection_failure_exceptions.append(e) -def __clean(collection_failure_exceptions, cleanup_job, client): +def _clean(collection_failure_exceptions, cleanup_job, client): failed = (len(collection_failure_exceptions) > 0) - if (not failed and cleanup_job != "never") or cleanup_job == "always": + do_clean = (not failed and cleanup_job != "never") or cleanup_job == "always" + if do_clean: + message = "Cleaning up job (failed [%s], cleanup_job [%s])" + else: + message = "Skipping job cleanup (failed [%s], cleanup_job [%s])" + log.debug(message % (failed, cleanup_job)) + if do_clean: try: client.clean() except Exception: -- GitLab