diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index 44b633a738d68f75d16187278a6deab34b63e6ea..7d19eb550541584e716a918e4dbbc5e486f3c180 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -282,6 +282,16 @@ class BaseAction(object): def staging_action_local(self): return self.staging == STAGING_ACTION_LOCAL + def to_dict(self): + return dict(action_type=self.action_type) + + def __str__(self): + as_dict = self.to_dict() + attribute_str = "" + for key, value in as_dict.items(): + attribute_str += "%s=%s" % (key, value) + return "FileAction[%s]" % attribute_str + class NoneAction(BaseAction): """ This action indicates the corresponding path does not require any diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index 8feeefc4a2b07b1a2a151057a8d804054bf75861..37f2e4965c372db0297fd53e73b59da24d5b7a1d 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -111,7 +111,7 @@ class ResultsCollector(object): if name in self.downloaded_working_directory_files: continue if self.client_outputs.dynamic_match(name): - log.info("collecting %s" % name) + log.debug("collecting dynamic output %s" % name) output_file = join(working_directory, self.pulsar_outputs.path_helper.local_name(name)) if self._attempt_collect_output(output_type='output_workdir', path=output_file, name=name): self.downloaded_working_directory_files.append(name) @@ -129,6 +129,7 @@ class ResultsCollector(object): return collected def _collect_output(self, output_type, action, name): + log.info("collecting output %s with action %s" % (name, action)) return self.output_collector.collect_output(self, output_type, action, name) diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py index 69178217acaf95fbc1c232f6f9e33f0a67d5f322..a13792b0089c9a6c30576cf06c34a0853b060448 100644 --- a/pulsar/managers/staging/post.py +++ b/pulsar/managers/staging/post.py @@ -55,7 +55,8 @@ class PulsarServerOutputCollector(object): name = os.path.basename(action.path) pulsar_path = self.job_directory.calculate_path(name, output_type) - self.action_executor.execute(lambda: action.write_from_path(pulsar_path)) + description = "staging out file %s via %s" % (pulsar_path, action) + self.action_executor.execute(lambda: action.write_from_path(pulsar_path), description) def __pulsar_outputs(job_directory): diff --git a/pulsar/managers/staging/pre.py b/pulsar/managers/staging/pre.py index 668897d8fc1544c0c119c9025df517fd435a84e6..901008206162cb79050072181de79a01c411b454 100644 --- a/pulsar/managers/staging/pre.py +++ b/pulsar/managers/staging/pre.py @@ -12,7 +12,8 @@ def preprocess(job_directory, setup_actions, action_executor): input_type = setup_action["type"] action = from_dict(setup_action["action"]) path = job_directory.calculate_path(name, input_type) - log.debug("Staging %s '%s' via %s to %s", input_type, name, action.__class__.__name__, path) - action_executor.execute(lambda: action.write_to_path(path)) + description = "Staging %s '%s' via %s to %s" % (input_type, name, action, path) + log.debug(description) + action_executor.execute(lambda: action.write_to_path(path), "action[%s]" % description) __all__ = ['preprocess'] diff --git a/pulsar/managers/util/retry.py b/pulsar/managers/util/retry.py index 2c0860bef72eea48c8c8912d748022e1fa35d8bd..4d9da3c2cefcef5f77176c24b4a3fd232f5af414 100644 --- a/pulsar/managers/util/retry.py +++ b/pulsar/managers/util/retry.py @@ -27,13 +27,16 @@ class RetryActionExecutor(object): self.errback = kwds.get("errback", self.__default_errback) self.catch = kwds.get("catch", DEFAULT_CATCH) - self.description = kwds.get("description", DEFAULT_DESCRIPTION) + self.default_description = kwds.get("description", DEFAULT_DESCRIPTION) - def execute(self, action): + def execute(self, action, description=None): def on_error(exc, intervals, retries, interval=0): interval = next(intervals) if self.errback: - self.errback(exc, interval) + errback_args = [exc, interval] + if description is not None: + errback_args.append(description) + self.errback(exc, interval, description) return interval return _retry_over_time( @@ -46,10 +49,11 @@ class RetryActionExecutor(object): errback=on_error, ) - def __default_errback(self, exc, interval): + def __default_errback(self, exc, interval, description=None): + description = description or self.default_description log.info( "Failed to execute %s, retrying in %s seconds.", - self.description, + description, interval, exc_info=True ) diff --git a/test/retry_action_test.py b/test/retry_action_test.py index e8ef38b81b50af600c46a0b1a06d1e6a09eddbfa..3345a5690bc2680193592f33c4399002c4bd1351 100644 --- a/test/retry_action_test.py +++ b/test/retry_action_test.py @@ -25,7 +25,7 @@ def test_third_execution_fine(): RetryActionExecutor(max_retries=2, interval_start=.01, interval_step=.01).execute(action_tracker.execute) except Exception: exception_raised = True - assert action_tracker.count == 3 + assert action_tracker.count == 3, action_tracker.count assert not exception_raised