Loading scripts/reduce_CG1D.py +9 −9 Original line number Diff line number Diff line Loading @@ -11,21 +11,22 @@ from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowEngineEr # standard imports from datetime import datetime import logging from pathlib import Path from typing import Union import sys # declare the conda environment for this to run in CONDA_ENV = "imars3d-dev" ERROR_GENERAL = 1 # if more errors, they could be turned into enum.Enum ERROR_GENERAL = -1 # for things that aren't workflow SCAN_INCOMPLETE = 10 # TODO need to discus this WORKFLOW_SUCCESS: int = WorkflowEngineExitCodes.SUCCESS.value WORKFLOW_ERROR: int = WorkflowEngineExitCodes.SUCCESS.value logger = logger_autoredux.getChild("reduce_CG1D") def _validate_inputs(inputfile: Path, outputdir: Path) -> int: """This returns the number of inputs that are broken. The result is useful for a return code""" input_checking = 0 if not inputfile.is_file(): logger.error(f"'{inputfile}' is not a file") Loading Loading @@ -68,8 +69,6 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: # convert the parameters to make the rest easier inputfile = Path(inputfile) outputdir = Path(outputdir) logging.info("INPUT:", inputfile) # TODO remove logging.info("OUTPUT:", outputdir) # TODO remove # verify the inputs are sensible input_checking = _validate_inputs(inputfile, outputdir) Loading @@ -79,7 +78,7 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: # step 0: check if data is ready for reduction if not auto_reduction_ready(inputfile): logger.warning("Data incomplete, waiting for next try.") return 1 return SCAN_INCOMPLETE # step 1: load the template configuration file to memory try: Loading Loading @@ -112,12 +111,13 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: save_config(config_dict, config_fn) # step 4: call the auto reduction with updated dict workflow = WorkflowEngineAuto(config_dict) try: workflow = WorkflowEngineAuto(config_dict) workflow.run() return WORKFLOW_SUCCESS except WorkflowEngineError: return WORKFLOW_ERROR except WorkflowEngineError as e: logger.exception("Failed to create and run workflow") return e.exit_code if __name__ == "__main__": Loading src/imars3d/backend/workflow/engine.py +20 −13 Original line number Diff line number Diff line Loading @@ -13,17 +13,24 @@ import importlib from typing import Any, Optional class WorkflowEngineExitCodes(Enum): r"""Exit codes to be used with workflow engine errors.""" SUCCESS = 0 ERROR_GENERAL = 1 ERROR_VALIDATION = 2 class WorkflowEngineError(RuntimeError): """Base class for workflow engine errors.""" pass exit_code = WorkflowEngineExitCodes.ERROR_GENERAL.value class WorkflowEngineExitCodes(Enum): r"""Exit codes to be used with workflow engine errors.""" class WorkflowValidationError(WorkflowEngineError): """Class for workflow validation errors.""" SUCCESS = 0 ERROR_GENERAL = 1 exit_code = WorkflowEngineExitCodes.ERROR_GENERAL.value class WorkflowEngine: Loading @@ -48,7 +55,7 @@ class WorkflowEngine: def validate_type(outputs): if not isinstance(outputs, (list, tuple)): raise WorkflowEngineError("Task outputs must be a list or a tuple") raise WorkflowValidationError("Task outputs must be a list or a tuple") validate_type(task_outputs) if function_outputs is not None: Loading @@ -57,7 +64,7 @@ class WorkflowEngine: # import pdb; pdb.set_trace() if len(task_outputs) != len(function_outputs): error = "Task and Function have different number of outputs" raise WorkflowEngineError(error) raise WorkflowValidationError(error) return function_outputs def __init__(self) -> None: Loading Loading @@ -152,11 +159,11 @@ class WorkflowEngineAuto(WorkflowEngine): tasks = self.config["tasks"] if len(tasks) >= 2: if tasks[0]["function"] != self.load_data_function: raise WorkflowEngineError("Incomplete Workflow: Workflow must begin with a load data task.") raise WorkflowValidationError("Incomplete Workflow: Workflow must begin with a load data task.") if tasks[len(tasks) - 1]["function"] != self.save_data_function: raise WorkflowEngineError("Incomplete Workflow: Workflow must end with a save data task") raise WorkflowValidationError("Incomplete Workflow: Workflow must end with a save data task") elif len(tasks) == 1: raise WorkflowEngineError( raise WorkflowValidationError( "Incomplete Workflow: Workflow does not contain at minimum a load task and a save task." ) Loading @@ -175,7 +182,7 @@ class WorkflowEngineAuto(WorkflowEngine): unacounted = set(task.get("inputs", set())) - set(peek.paramdict) if unacounted: pnames = ", ".join([f'"{p}"' for p in unacounted]) raise WorkflowEngineError(f"Parameter(s) {pnames} are not input parameters of {task['function']}") raise WorkflowValidationError(f"Parameter(s) {pnames} are not input parameters of {task['function']}") # assess each function parameter. Is it missing? missing = set([]) for pname, param in peek.paramdict.items(): Loading @@ -195,7 +202,7 @@ class WorkflowEngineAuto(WorkflowEngine): if pname not in registry: missing.add(pname) if missing: raise WorkflowEngineError(f"input(s) {', '.join(missing)} for task {task['name']} are missing") raise WorkflowValidationError(f"input(s) {', '.join(missing)} for task {task['name']} are missing") def _dryrun(self) -> None: r"""Verify validity of the workflow configuration. Loading @@ -205,7 +212,7 @@ class WorkflowEngineAuto(WorkflowEngine): Raises ------ WorkflowEngineError WorkflowValidationError one or more global inputs are not the output(s) of any previous task(s). """ # registry stores parameters that have already been set or computed. Initialize with metadata Loading tests/unit/backend/test_cli.py +3 −2 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import pytest from json.decoder import JSONDecodeError TIFF_DIR = "tests/data/imars3d-data/HFIR/CG1D/IPTS-25777/raw/ct_scans/iron_man" TIFF_RANDOM = TIFF_DIR + "/20191030_ironman_small_0070_233_740_0405.tiff" def test_bad(JSON_DIR): Loading @@ -20,11 +21,11 @@ def test_good(JSON_DIR): @pytest.mark.datarepo def test_outputdir_not_writable(): assert main_CG1D(TIFF_DIR, "this/dir/doesnt/exist") != 0 assert main_CG1D(TIFF_RANDOM, "this/dir/doesnt/exist") == 1 def test_input_dir_doesnt_exist(): assert main_CG1D("this/dir/doesnt/exist", "/tmp/") != 0 assert main_CG1D("this/dir/doesnt/exist", "/tmp/") == 1 if __name__ == "__main__": Loading tests/unit/backend/workflow/test_engine.py +3 −3 Original line number Diff line number Diff line # package imports from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowEngineError from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowValidationError # third party imports import numpy as np Loading Loading @@ -131,7 +131,7 @@ class TestWorkflowEngineAuto: workflow = WorkflowEngineAuto(config_bad) workflow.load_data_function = f"{__name__}.load_data" workflow.save_data_function = f"{__name__}.save_data" with pytest.raises(WorkflowEngineError, match="ct for task task0 are missing"): with pytest.raises(WorkflowValidationError, match="ct for task task0 are missing"): workflow._dryrun() def test_dryrun_missing_rot(self, config): Loading @@ -141,7 +141,7 @@ class TestWorkflowEngineAuto: workflow = WorkflowEngineAuto(config_bad) workflow.load_data_function = f"{__name__}.load_data" workflow.save_data_function = f"{__name__}.save_data" with pytest.raises(WorkflowEngineError, match="rot_center for task task4 are missing"): with pytest.raises(WorkflowValidationError, match="rot_center for task task4 are missing"): workflow._dryrun() def test_run(self, config): Loading Loading
scripts/reduce_CG1D.py +9 −9 Original line number Diff line number Diff line Loading @@ -11,21 +11,22 @@ from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowEngineEr # standard imports from datetime import datetime import logging from pathlib import Path from typing import Union import sys # declare the conda environment for this to run in CONDA_ENV = "imars3d-dev" ERROR_GENERAL = 1 # if more errors, they could be turned into enum.Enum ERROR_GENERAL = -1 # for things that aren't workflow SCAN_INCOMPLETE = 10 # TODO need to discus this WORKFLOW_SUCCESS: int = WorkflowEngineExitCodes.SUCCESS.value WORKFLOW_ERROR: int = WorkflowEngineExitCodes.SUCCESS.value logger = logger_autoredux.getChild("reduce_CG1D") def _validate_inputs(inputfile: Path, outputdir: Path) -> int: """This returns the number of inputs that are broken. The result is useful for a return code""" input_checking = 0 if not inputfile.is_file(): logger.error(f"'{inputfile}' is not a file") Loading Loading @@ -68,8 +69,6 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: # convert the parameters to make the rest easier inputfile = Path(inputfile) outputdir = Path(outputdir) logging.info("INPUT:", inputfile) # TODO remove logging.info("OUTPUT:", outputdir) # TODO remove # verify the inputs are sensible input_checking = _validate_inputs(inputfile, outputdir) Loading @@ -79,7 +78,7 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: # step 0: check if data is ready for reduction if not auto_reduction_ready(inputfile): logger.warning("Data incomplete, waiting for next try.") return 1 return SCAN_INCOMPLETE # step 1: load the template configuration file to memory try: Loading Loading @@ -112,12 +111,13 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: save_config(config_dict, config_fn) # step 4: call the auto reduction with updated dict workflow = WorkflowEngineAuto(config_dict) try: workflow = WorkflowEngineAuto(config_dict) workflow.run() return WORKFLOW_SUCCESS except WorkflowEngineError: return WORKFLOW_ERROR except WorkflowEngineError as e: logger.exception("Failed to create and run workflow") return e.exit_code if __name__ == "__main__": Loading
src/imars3d/backend/workflow/engine.py +20 −13 Original line number Diff line number Diff line Loading @@ -13,17 +13,24 @@ import importlib from typing import Any, Optional class WorkflowEngineExitCodes(Enum): r"""Exit codes to be used with workflow engine errors.""" SUCCESS = 0 ERROR_GENERAL = 1 ERROR_VALIDATION = 2 class WorkflowEngineError(RuntimeError): """Base class for workflow engine errors.""" pass exit_code = WorkflowEngineExitCodes.ERROR_GENERAL.value class WorkflowEngineExitCodes(Enum): r"""Exit codes to be used with workflow engine errors.""" class WorkflowValidationError(WorkflowEngineError): """Class for workflow validation errors.""" SUCCESS = 0 ERROR_GENERAL = 1 exit_code = WorkflowEngineExitCodes.ERROR_GENERAL.value class WorkflowEngine: Loading @@ -48,7 +55,7 @@ class WorkflowEngine: def validate_type(outputs): if not isinstance(outputs, (list, tuple)): raise WorkflowEngineError("Task outputs must be a list or a tuple") raise WorkflowValidationError("Task outputs must be a list or a tuple") validate_type(task_outputs) if function_outputs is not None: Loading @@ -57,7 +64,7 @@ class WorkflowEngine: # import pdb; pdb.set_trace() if len(task_outputs) != len(function_outputs): error = "Task and Function have different number of outputs" raise WorkflowEngineError(error) raise WorkflowValidationError(error) return function_outputs def __init__(self) -> None: Loading Loading @@ -152,11 +159,11 @@ class WorkflowEngineAuto(WorkflowEngine): tasks = self.config["tasks"] if len(tasks) >= 2: if tasks[0]["function"] != self.load_data_function: raise WorkflowEngineError("Incomplete Workflow: Workflow must begin with a load data task.") raise WorkflowValidationError("Incomplete Workflow: Workflow must begin with a load data task.") if tasks[len(tasks) - 1]["function"] != self.save_data_function: raise WorkflowEngineError("Incomplete Workflow: Workflow must end with a save data task") raise WorkflowValidationError("Incomplete Workflow: Workflow must end with a save data task") elif len(tasks) == 1: raise WorkflowEngineError( raise WorkflowValidationError( "Incomplete Workflow: Workflow does not contain at minimum a load task and a save task." ) Loading @@ -175,7 +182,7 @@ class WorkflowEngineAuto(WorkflowEngine): unacounted = set(task.get("inputs", set())) - set(peek.paramdict) if unacounted: pnames = ", ".join([f'"{p}"' for p in unacounted]) raise WorkflowEngineError(f"Parameter(s) {pnames} are not input parameters of {task['function']}") raise WorkflowValidationError(f"Parameter(s) {pnames} are not input parameters of {task['function']}") # assess each function parameter. Is it missing? missing = set([]) for pname, param in peek.paramdict.items(): Loading @@ -195,7 +202,7 @@ class WorkflowEngineAuto(WorkflowEngine): if pname not in registry: missing.add(pname) if missing: raise WorkflowEngineError(f"input(s) {', '.join(missing)} for task {task['name']} are missing") raise WorkflowValidationError(f"input(s) {', '.join(missing)} for task {task['name']} are missing") def _dryrun(self) -> None: r"""Verify validity of the workflow configuration. Loading @@ -205,7 +212,7 @@ class WorkflowEngineAuto(WorkflowEngine): Raises ------ WorkflowEngineError WorkflowValidationError one or more global inputs are not the output(s) of any previous task(s). """ # registry stores parameters that have already been set or computed. Initialize with metadata Loading
tests/unit/backend/test_cli.py +3 −2 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import pytest from json.decoder import JSONDecodeError TIFF_DIR = "tests/data/imars3d-data/HFIR/CG1D/IPTS-25777/raw/ct_scans/iron_man" TIFF_RANDOM = TIFF_DIR + "/20191030_ironman_small_0070_233_740_0405.tiff" def test_bad(JSON_DIR): Loading @@ -20,11 +21,11 @@ def test_good(JSON_DIR): @pytest.mark.datarepo def test_outputdir_not_writable(): assert main_CG1D(TIFF_DIR, "this/dir/doesnt/exist") != 0 assert main_CG1D(TIFF_RANDOM, "this/dir/doesnt/exist") == 1 def test_input_dir_doesnt_exist(): assert main_CG1D("this/dir/doesnt/exist", "/tmp/") != 0 assert main_CG1D("this/dir/doesnt/exist", "/tmp/") == 1 if __name__ == "__main__": Loading
tests/unit/backend/workflow/test_engine.py +3 −3 Original line number Diff line number Diff line # package imports from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowEngineError from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowValidationError # third party imports import numpy as np Loading Loading @@ -131,7 +131,7 @@ class TestWorkflowEngineAuto: workflow = WorkflowEngineAuto(config_bad) workflow.load_data_function = f"{__name__}.load_data" workflow.save_data_function = f"{__name__}.save_data" with pytest.raises(WorkflowEngineError, match="ct for task task0 are missing"): with pytest.raises(WorkflowValidationError, match="ct for task task0 are missing"): workflow._dryrun() def test_dryrun_missing_rot(self, config): Loading @@ -141,7 +141,7 @@ class TestWorkflowEngineAuto: workflow = WorkflowEngineAuto(config_bad) workflow.load_data_function = f"{__name__}.load_data" workflow.save_data_function = f"{__name__}.save_data" with pytest.raises(WorkflowEngineError, match="rot_center for task task4 are missing"): with pytest.raises(WorkflowValidationError, match="rot_center for task task4 are missing"): workflow._dryrun() def test_run(self, config): Loading