From 2b3942dbd083698225e358f2bb8ec94d3969426c Mon Sep 17 00:00:00 2001 From: John Chilton <jmchilton@gmail.com> Date: Tue, 5 May 2015 13:45:27 -0400 Subject: [PATCH] Even more logging. - Log every upload not just dynamically collected files (more cohesive with download logging). - Log all staging actions (in and out) with a complete description of FileAction. - Log full description of FileAction object on each staging up/staging down failure. Requested by @natefoo. --- pulsar/client/action_mapper.py | 10 ++++++++++ pulsar/client/staging/down.py | 3 ++- pulsar/managers/staging/post.py | 3 ++- pulsar/managers/staging/pre.py | 5 +++-- pulsar/managers/util/retry.py | 14 +++++++++----- test/retry_action_test.py | 2 +- 6 files changed, 27 insertions(+), 10 deletions(-) diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index 44b633a7..7d19eb55 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -282,6 +282,16 @@ class BaseAction(object): def staging_action_local(self): return self.staging == STAGING_ACTION_LOCAL + def to_dict(self): + return dict(action_type=self.action_type) + + def __str__(self): + as_dict = self.to_dict() + attribute_str = "" + for key, value in as_dict.items(): + attribute_str += "%s=%s" % (key, value) + return "FileAction[%s]" % attribute_str + class NoneAction(BaseAction): """ This action indicates the corresponding path does not require any diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index 8feeefc4..37f2e496 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -111,7 +111,7 @@ class ResultsCollector(object): if name in self.downloaded_working_directory_files: continue if self.client_outputs.dynamic_match(name): - log.info("collecting %s" % name) + log.debug("collecting dynamic output %s" % name) output_file = join(working_directory, self.pulsar_outputs.path_helper.local_name(name)) if self._attempt_collect_output(output_type='output_workdir', path=output_file, name=name): self.downloaded_working_directory_files.append(name) @@ -129,6 +129,7 @@ class ResultsCollector(object): return collected def _collect_output(self, output_type, action, name): + log.info("collecting output %s with action %s" % (name, action)) return self.output_collector.collect_output(self, output_type, action, name) diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py index 69178217..a13792b0 100644 --- a/pulsar/managers/staging/post.py +++ b/pulsar/managers/staging/post.py @@ -55,7 +55,8 @@ class PulsarServerOutputCollector(object): name = os.path.basename(action.path) pulsar_path = self.job_directory.calculate_path(name, output_type) - self.action_executor.execute(lambda: action.write_from_path(pulsar_path)) + description = "staging out file %s via %s" % (pulsar_path, action) + self.action_executor.execute(lambda: action.write_from_path(pulsar_path), description) def __pulsar_outputs(job_directory): diff --git a/pulsar/managers/staging/pre.py b/pulsar/managers/staging/pre.py index 668897d8..90100820 100644 --- a/pulsar/managers/staging/pre.py +++ b/pulsar/managers/staging/pre.py @@ -12,7 +12,8 @@ def preprocess(job_directory, setup_actions, action_executor): input_type = setup_action["type"] action = from_dict(setup_action["action"]) path = job_directory.calculate_path(name, input_type) - log.debug("Staging %s '%s' via %s to %s", input_type, name, action.__class__.__name__, path) - action_executor.execute(lambda: action.write_to_path(path)) + description = "Staging %s '%s' via %s to %s" % (input_type, name, action, path) + log.debug(description) + action_executor.execute(lambda: action.write_to_path(path), "action[%s]" % description) __all__ = ['preprocess'] diff --git a/pulsar/managers/util/retry.py b/pulsar/managers/util/retry.py index 2c0860be..4d9da3c2 100644 --- a/pulsar/managers/util/retry.py +++ b/pulsar/managers/util/retry.py @@ -27,13 +27,16 @@ class RetryActionExecutor(object): self.errback = kwds.get("errback", self.__default_errback) self.catch = kwds.get("catch", DEFAULT_CATCH) - self.description = kwds.get("description", DEFAULT_DESCRIPTION) + self.default_description = kwds.get("description", DEFAULT_DESCRIPTION) - def execute(self, action): + def execute(self, action, description=None): def on_error(exc, intervals, retries, interval=0): interval = next(intervals) if self.errback: - self.errback(exc, interval) + errback_args = [exc, interval] + if description is not None: + errback_args.append(description) + self.errback(exc, interval, description) return interval return _retry_over_time( @@ -46,10 +49,11 @@ class RetryActionExecutor(object): errback=on_error, ) - def __default_errback(self, exc, interval): + def __default_errback(self, exc, interval, description=None): + description = description or self.default_description log.info( "Failed to execute %s, retrying in %s seconds.", - self.description, + description, interval, exc_info=True ) diff --git a/test/retry_action_test.py b/test/retry_action_test.py index e8ef38b8..3345a569 100644 --- a/test/retry_action_test.py +++ b/test/retry_action_test.py @@ -25,7 +25,7 @@ def test_third_execution_fine(): RetryActionExecutor(max_retries=2, interval_start=.01, interval_step=.01).execute(action_tracker.execute) except Exception: exception_raised = True - assert action_tracker.count == 3 + assert action_tracker.count == 3, action_tracker.count assert not exception_raised -- GitLab