Loading scripts/reduce_CG1D.py +4 −8 Original line number Diff line number Diff line Loading @@ -11,16 +11,14 @@ from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowEngineEr # standard imports from datetime import datetime import logging from pathlib import Path from typing import Union import sys # declare the conda environment for this to run in CONDA_ENV = "imars3d-dev" ERROR_GENERAL = 1 # if more errors, they could be turned into enum.Enum ERROR_GENERAL = -1 # for things that aren't workflow WORKFLOW_SUCCESS: int = WorkflowEngineExitCodes.SUCCESS.value WORKFLOW_ERROR: int = WorkflowEngineExitCodes.ERROR_GENERAL.value logger = logger_autoredux.getChild("reduce_CG1D") Loading Loading @@ -68,8 +66,6 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: # convert the parameters to make the rest easier inputfile = Path(inputfile) outputdir = Path(outputdir) logging.info("INPUT:", inputfile) # TODO remove logging.info("OUTPUT:", outputdir) # TODO remove # verify the inputs are sensible input_checking = _validate_inputs(inputfile, outputdir) Loading @@ -79,7 +75,7 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: # step 0: check if data is ready for reduction if not auto_reduction_ready(inputfile): logger.warning("Data incomplete, waiting for next try.") return 1 return ERROR_GENERAL # TODO need to discus this # step 1: load the template configuration file to memory try: Loading Loading @@ -116,9 +112,9 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: workflow = WorkflowEngineAuto(config_dict) workflow.run() return WORKFLOW_SUCCESS except WorkflowEngineError: except WorkflowEngineError as e: logger.exception("Failed to create and run workflow") return WORKFLOW_ERROR return e.exit_code if __name__ == "__main__": Loading src/imars3d/backend/workflow/engine.py +20 −13 Original line number Diff line number Diff line Loading @@ -13,17 +13,24 @@ import importlib from typing import Any, Optional class WorkflowEngineExitCodes(Enum): r"""Exit codes to be used with workflow engine errors.""" SUCCESS = 0 ERROR_GENERAL = 1 ERROR_VALIDATION = 2 class WorkflowEngineError(RuntimeError): """Base class for workflow engine errors.""" pass exit_code = WorkflowEngineExitCodes.ERROR_GENERAL.value class WorkflowEngineExitCodes(Enum): r"""Exit codes to be used with workflow engine errors.""" class WorkflowValidationError(WorkflowEngineError): """Class for workflow validation errors.""" SUCCESS = 0 ERROR_GENERAL = 1 exit_code = WorkflowEngineExitCodes.ERROR_GENERAL.value class WorkflowEngine: Loading @@ -48,7 +55,7 @@ class WorkflowEngine: def validate_type(outputs): if not isinstance(outputs, (list, tuple)): raise WorkflowEngineError("Task outputs must be a list or a tuple") raise WorkflowValidationError("Task outputs must be a list or a tuple") validate_type(task_outputs) if function_outputs is not None: Loading @@ -57,7 +64,7 @@ class WorkflowEngine: # import pdb; pdb.set_trace() if len(task_outputs) != len(function_outputs): error = "Task and Function have different number of outputs" raise WorkflowEngineError(error) raise WorkflowValidationError(error) return function_outputs def __init__(self) -> None: Loading Loading @@ -152,11 +159,11 @@ class WorkflowEngineAuto(WorkflowEngine): tasks = self.config["tasks"] if len(tasks) >= 2: if tasks[0]["function"] != self.load_data_function: raise WorkflowEngineError("Incomplete Workflow: Workflow must begin with a load data task.") raise WorkflowValidationError("Incomplete Workflow: Workflow must begin with a load data task.") if tasks[len(tasks) - 1]["function"] != self.save_data_function: raise WorkflowEngineError("Incomplete Workflow: Workflow must end with a save data task") raise WorkflowValidationError("Incomplete Workflow: Workflow must end with a save data task") elif len(tasks) == 1: raise WorkflowEngineError( raise WorkflowValidationError( "Incomplete Workflow: Workflow does not contain at minimum a load task and a save task." ) Loading @@ -175,7 +182,7 @@ class WorkflowEngineAuto(WorkflowEngine): unacounted = set(task.get("inputs", set())) - set(peek.paramdict) if unacounted: pnames = ", ".join([f'"{p}"' for p in unacounted]) raise WorkflowEngineError(f"Parameter(s) {pnames} are not input parameters of {task['function']}") raise WorkflowValidationError(f"Parameter(s) {pnames} are not input parameters of {task['function']}") # assess each function parameter. Is it missing? missing = set([]) for pname, param in peek.paramdict.items(): Loading @@ -195,7 +202,7 @@ class WorkflowEngineAuto(WorkflowEngine): if pname not in registry: missing.add(pname) if missing: raise WorkflowEngineError(f"input(s) {', '.join(missing)} for task {task['name']} are missing") raise WorkflowValidationError(f"input(s) {', '.join(missing)} for task {task['name']} are missing") def _dryrun(self) -> None: r"""Verify validity of the workflow configuration. Loading @@ -205,7 +212,7 @@ class WorkflowEngineAuto(WorkflowEngine): Raises ------ WorkflowEngineError WorkflowValidationError one or more global inputs are not the output(s) of any previous task(s). """ # registry stores parameters that have already been set or computed. Initialize with metadata Loading tests/unit/backend/workflow/test_engine.py +3 −3 Original line number Diff line number Diff line # package imports from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowEngineError from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowValidationError # third party imports import numpy as np Loading Loading @@ -131,7 +131,7 @@ class TestWorkflowEngineAuto: workflow = WorkflowEngineAuto(config_bad) workflow.load_data_function = f"{__name__}.load_data" workflow.save_data_function = f"{__name__}.save_data" with pytest.raises(WorkflowEngineError, match="ct for task task0 are missing"): with pytest.raises(WorkflowValidationError, match="ct for task task0 are missing"): workflow._dryrun() def test_dryrun_missing_rot(self, config): Loading @@ -141,7 +141,7 @@ class TestWorkflowEngineAuto: workflow = WorkflowEngineAuto(config_bad) workflow.load_data_function = f"{__name__}.load_data" workflow.save_data_function = f"{__name__}.save_data" with pytest.raises(WorkflowEngineError, match="rot_center for task task4 are missing"): with pytest.raises(WorkflowValidationError, match="rot_center for task task4 are missing"): workflow._dryrun() def test_run(self, config): Loading Loading
scripts/reduce_CG1D.py +4 −8 Original line number Diff line number Diff line Loading @@ -11,16 +11,14 @@ from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowEngineEr # standard imports from datetime import datetime import logging from pathlib import Path from typing import Union import sys # declare the conda environment for this to run in CONDA_ENV = "imars3d-dev" ERROR_GENERAL = 1 # if more errors, they could be turned into enum.Enum ERROR_GENERAL = -1 # for things that aren't workflow WORKFLOW_SUCCESS: int = WorkflowEngineExitCodes.SUCCESS.value WORKFLOW_ERROR: int = WorkflowEngineExitCodes.ERROR_GENERAL.value logger = logger_autoredux.getChild("reduce_CG1D") Loading Loading @@ -68,8 +66,6 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: # convert the parameters to make the rest easier inputfile = Path(inputfile) outputdir = Path(outputdir) logging.info("INPUT:", inputfile) # TODO remove logging.info("OUTPUT:", outputdir) # TODO remove # verify the inputs are sensible input_checking = _validate_inputs(inputfile, outputdir) Loading @@ -79,7 +75,7 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: # step 0: check if data is ready for reduction if not auto_reduction_ready(inputfile): logger.warning("Data incomplete, waiting for next try.") return 1 return ERROR_GENERAL # TODO need to discus this # step 1: load the template configuration file to memory try: Loading Loading @@ -116,9 +112,9 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: workflow = WorkflowEngineAuto(config_dict) workflow.run() return WORKFLOW_SUCCESS except WorkflowEngineError: except WorkflowEngineError as e: logger.exception("Failed to create and run workflow") return WORKFLOW_ERROR return e.exit_code if __name__ == "__main__": Loading
src/imars3d/backend/workflow/engine.py +20 −13 Original line number Diff line number Diff line Loading @@ -13,17 +13,24 @@ import importlib from typing import Any, Optional class WorkflowEngineExitCodes(Enum): r"""Exit codes to be used with workflow engine errors.""" SUCCESS = 0 ERROR_GENERAL = 1 ERROR_VALIDATION = 2 class WorkflowEngineError(RuntimeError): """Base class for workflow engine errors.""" pass exit_code = WorkflowEngineExitCodes.ERROR_GENERAL.value class WorkflowEngineExitCodes(Enum): r"""Exit codes to be used with workflow engine errors.""" class WorkflowValidationError(WorkflowEngineError): """Class for workflow validation errors.""" SUCCESS = 0 ERROR_GENERAL = 1 exit_code = WorkflowEngineExitCodes.ERROR_GENERAL.value class WorkflowEngine: Loading @@ -48,7 +55,7 @@ class WorkflowEngine: def validate_type(outputs): if not isinstance(outputs, (list, tuple)): raise WorkflowEngineError("Task outputs must be a list or a tuple") raise WorkflowValidationError("Task outputs must be a list or a tuple") validate_type(task_outputs) if function_outputs is not None: Loading @@ -57,7 +64,7 @@ class WorkflowEngine: # import pdb; pdb.set_trace() if len(task_outputs) != len(function_outputs): error = "Task and Function have different number of outputs" raise WorkflowEngineError(error) raise WorkflowValidationError(error) return function_outputs def __init__(self) -> None: Loading Loading @@ -152,11 +159,11 @@ class WorkflowEngineAuto(WorkflowEngine): tasks = self.config["tasks"] if len(tasks) >= 2: if tasks[0]["function"] != self.load_data_function: raise WorkflowEngineError("Incomplete Workflow: Workflow must begin with a load data task.") raise WorkflowValidationError("Incomplete Workflow: Workflow must begin with a load data task.") if tasks[len(tasks) - 1]["function"] != self.save_data_function: raise WorkflowEngineError("Incomplete Workflow: Workflow must end with a save data task") raise WorkflowValidationError("Incomplete Workflow: Workflow must end with a save data task") elif len(tasks) == 1: raise WorkflowEngineError( raise WorkflowValidationError( "Incomplete Workflow: Workflow does not contain at minimum a load task and a save task." ) Loading @@ -175,7 +182,7 @@ class WorkflowEngineAuto(WorkflowEngine): unacounted = set(task.get("inputs", set())) - set(peek.paramdict) if unacounted: pnames = ", ".join([f'"{p}"' for p in unacounted]) raise WorkflowEngineError(f"Parameter(s) {pnames} are not input parameters of {task['function']}") raise WorkflowValidationError(f"Parameter(s) {pnames} are not input parameters of {task['function']}") # assess each function parameter. Is it missing? missing = set([]) for pname, param in peek.paramdict.items(): Loading @@ -195,7 +202,7 @@ class WorkflowEngineAuto(WorkflowEngine): if pname not in registry: missing.add(pname) if missing: raise WorkflowEngineError(f"input(s) {', '.join(missing)} for task {task['name']} are missing") raise WorkflowValidationError(f"input(s) {', '.join(missing)} for task {task['name']} are missing") def _dryrun(self) -> None: r"""Verify validity of the workflow configuration. Loading @@ -205,7 +212,7 @@ class WorkflowEngineAuto(WorkflowEngine): Raises ------ WorkflowEngineError WorkflowValidationError one or more global inputs are not the output(s) of any previous task(s). """ # registry stores parameters that have already been set or computed. Initialize with metadata Loading
tests/unit/backend/workflow/test_engine.py +3 −3 Original line number Diff line number Diff line # package imports from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowEngineError from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowValidationError # third party imports import numpy as np Loading Loading @@ -131,7 +131,7 @@ class TestWorkflowEngineAuto: workflow = WorkflowEngineAuto(config_bad) workflow.load_data_function = f"{__name__}.load_data" workflow.save_data_function = f"{__name__}.save_data" with pytest.raises(WorkflowEngineError, match="ct for task task0 are missing"): with pytest.raises(WorkflowValidationError, match="ct for task task0 are missing"): workflow._dryrun() def test_dryrun_missing_rot(self, config): Loading @@ -141,7 +141,7 @@ class TestWorkflowEngineAuto: workflow = WorkflowEngineAuto(config_bad) workflow.load_data_function = f"{__name__}.load_data" workflow.save_data_function = f"{__name__}.save_data" with pytest.raises(WorkflowEngineError, match="rot_center for task task4 are missing"): with pytest.raises(WorkflowValidationError, match="rot_center for task task4 are missing"): workflow._dryrun() def test_run(self, config): Loading