Loading scripts/reduce_CG1D.py +31 −14 Original line number Diff line number Diff line Loading @@ -8,10 +8,12 @@ from imars3d.backend import extract_info_from_path from imars3d.backend import substitute_template from imars3d.backend.autoredux import logger as logger_autoredux from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowEngineError, WorkflowEngineExitCodes from imars3d.backend.util.functions import to_time_str # standard imports from datetime import datetime import logging from pathlib import Path import shutil from typing import Union import sys Loading Loading @@ -70,54 +72,69 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: inputfile = Path(inputfile) outputdir = Path(outputdir) time_str = to_time_str() # date stamp for log and configuration files # create log file to capture the root logger, in order to also capture messages from the backend log_fn = outputdir / f"reduce_CG1D_{time_str}.log" log_fh = logging.FileHandler(log_fn) log_fh.setLevel(logging.INFO) logging.getLogger().addHandler(log_fh) # verify the inputs are sensible input_checking = _validate_inputs(inputfile, outputdir) if input_checking > 0: return input_checking # step 0: check if data is ready for reduction # check if data is ready for reduction if not auto_reduction_ready(inputfile): logger.warning("Data incomplete, waiting for next try.") return SCAN_INCOMPLETE # step 1: load the template configuration file to memory # load the template configuration file to memory try: config_path = _find_template_config() config_dict = load_template_config(config_path) except FileNotFoundError as e: logger.error(str(e)) except FileNotFoundError: logger.exception("Unable to load the template configuration") return ERROR_GENERAL # step 2: extract info from inputfile # extract info from inputfile update_dict = extract_info_from_path(str(inputfile)) assert update_dict["instrument"] == "CG1D", "Instrument is not CG1D" update_dict["outputdir"] = str(outputdir) update_dict["workingdir"] = str(outputdir) # step 3: update the dict and save dict to disk # update the dict and save dict to disk try: config_dict = substitute_template(config_dict, update_dict) except Exception as e: logger.error(str(e)) except Exception: logger.exception("Unable to update the template configuration") return ERROR_GENERAL # save config file to working directory # NOTE: # i.e. ironman_20221108_154015.json exp_name = config_dict["name"].replace(" ", "_") now = datetime.now() time_str = now.strftime("%Y%m%d_%H%M%S") config_fn = outputdir / f"{exp_name}_{time_str}.json" save_config(config_dict, config_fn) # step 4: call the auto reduction with updated dict # call the auto reduction with updated dict try: workflow = WorkflowEngineAuto(config_dict) workflow.run() return WORKFLOW_SUCCESS exit_code = WORKFLOW_SUCCESS except WorkflowEngineError as e: logger.exception("Failed to create and run workflow") return e.exit_code exit_code = e.exit_code # move files to image directory if auto-reduction is successful logging.shutdown() # flushing and closing all handlers target_dir = workflow.registry["save_dir"] if exit_code == WORKFLOW_SUCCESS: shutil.move(config_fn, target_dir) shutil.move(log_fn, target_dir) return exit_code if __name__ == "__main__": Loading scripts/reduce_CG1D_config_template.json +1 −1 Original line number Diff line number Diff line Loading @@ -110,7 +110,7 @@ "filename": "test", "outputdir" : "outputdir" }, "outputs": [] "outputs": ["save_dir"] } ] } src/imars3d/backend/dataio/data.py +15 −26 Original line number Diff line number Diff line #!/usr/bin/env python3 """Data handling for iMars3D.""" import re from datetime import datetime import logging import param from imars3d.backend.util.functions import clamp_max_workers # package imports from imars3d.backend.dataio.metadata import MetaData from imars3d.backend.util.functions import clamp_max_workers, to_time_str # third party imports import numpy as np import param import tifffile from tqdm.contrib.concurrent import process_map # standard imports from functools import partial from pathlib import Path from fnmatch import fnmatchcase import logging from pathlib import Path import re from typing import Optional, Tuple, List, Callable from tqdm.contrib.concurrent import process_map from imars3d.backend.dataio.metadata import MetaData # ignore warnings generated by importing dxchange import warnings Loading Loading @@ -475,22 +480,6 @@ def _extract_rotation_angles( return rotation_angles def _to_time_str(value: datetime) -> str: """ Convert the supplied datetime to a formatted string. Parameters ---------- value: datetime object to format correctly Returns ------- The datetime as YYYYMMDDhhmm """ return value.strftime("%Y%m%d%H%M") def _save_data(filename: Path, data: np.ndarray, rot_angles: np.ndarray = None) -> None: if data is None: raise ValueError("Failed to supply data") Loading Loading @@ -549,7 +538,7 @@ class save_data(param.ParameterizedFunction): if params.data is None: raise ValueError("Did not supply data") save_dir = Path(params.outputbase) / f"{params.name}_{_to_time_str(datetime.now())}" save_dir = Path(params.outputbase) / f"{params.name}_{to_time_str()}" # save the data as tiffs _save_data(filename=save_dir / params.name, data=params.data, rot_angles=params.rot_angles) Loading Loading @@ -597,7 +586,7 @@ class save_checkpoint(param.ParameterizedFunction): # sanitize arguments params = param.ParamOverrides(self, params) save_dir = params.outputbase / f"{params.name}_chkpt_{_to_time_str(datetime.now())}" save_dir = params.outputbase / f"{params.name}_chkpt_{to_time_str()}" # save the data as tiffs _save_data(filename=save_dir / params.name, data=params.data, rot_angles=params.rot_angles) Loading src/imars3d/backend/util/functions.py +19 −1 Original line number Diff line number Diff line #!/usr/bin/env python3 """Util for imars3d.""" import resource # standard imports from datetime import datetime import multiprocessing import resource from typing import Union Loading @@ -14,3 +16,19 @@ def clamp_max_workers(max_workers: Union[int, None]) -> int: if max_workers is None: max_workers = 0 return min(resource.RLIMIT_NPROC, max(1, multiprocessing.cpu_count() - 2)) if max_workers <= 0 else max_workers def to_time_str(value: datetime = datetime.now()) -> str: """ Convert the supplied datetime to a formatted string. Parameters ---------- value: datetime object to format correctly Returns ------- The datetime as YYYYMMDDhhmm """ return value.strftime("%Y%m%d%H%M") src/imars3d/backend/workflow/engine.py +5 −0 Original line number Diff line number Diff line Loading @@ -70,6 +70,11 @@ class WorkflowEngine: def __init__(self) -> None: self._registry: Optional[dict] = None # will store set or computed parameters @property def registry(self): r"""Read only registry.""" return self._registry def _instrospect_task_function(self, function_str: str) -> namedtuple: r"""Obtain information from the function associated to one task in the workflow. Loading Loading
scripts/reduce_CG1D.py +31 −14 Original line number Diff line number Diff line Loading @@ -8,10 +8,12 @@ from imars3d.backend import extract_info_from_path from imars3d.backend import substitute_template from imars3d.backend.autoredux import logger as logger_autoredux from imars3d.backend.workflow.engine import WorkflowEngineAuto, WorkflowEngineError, WorkflowEngineExitCodes from imars3d.backend.util.functions import to_time_str # standard imports from datetime import datetime import logging from pathlib import Path import shutil from typing import Union import sys Loading Loading @@ -70,54 +72,69 @@ def main(inputfile: Union[str, Path], outputdir: Union[str, Path]) -> int: inputfile = Path(inputfile) outputdir = Path(outputdir) time_str = to_time_str() # date stamp for log and configuration files # create log file to capture the root logger, in order to also capture messages from the backend log_fn = outputdir / f"reduce_CG1D_{time_str}.log" log_fh = logging.FileHandler(log_fn) log_fh.setLevel(logging.INFO) logging.getLogger().addHandler(log_fh) # verify the inputs are sensible input_checking = _validate_inputs(inputfile, outputdir) if input_checking > 0: return input_checking # step 0: check if data is ready for reduction # check if data is ready for reduction if not auto_reduction_ready(inputfile): logger.warning("Data incomplete, waiting for next try.") return SCAN_INCOMPLETE # step 1: load the template configuration file to memory # load the template configuration file to memory try: config_path = _find_template_config() config_dict = load_template_config(config_path) except FileNotFoundError as e: logger.error(str(e)) except FileNotFoundError: logger.exception("Unable to load the template configuration") return ERROR_GENERAL # step 2: extract info from inputfile # extract info from inputfile update_dict = extract_info_from_path(str(inputfile)) assert update_dict["instrument"] == "CG1D", "Instrument is not CG1D" update_dict["outputdir"] = str(outputdir) update_dict["workingdir"] = str(outputdir) # step 3: update the dict and save dict to disk # update the dict and save dict to disk try: config_dict = substitute_template(config_dict, update_dict) except Exception as e: logger.error(str(e)) except Exception: logger.exception("Unable to update the template configuration") return ERROR_GENERAL # save config file to working directory # NOTE: # i.e. ironman_20221108_154015.json exp_name = config_dict["name"].replace(" ", "_") now = datetime.now() time_str = now.strftime("%Y%m%d_%H%M%S") config_fn = outputdir / f"{exp_name}_{time_str}.json" save_config(config_dict, config_fn) # step 4: call the auto reduction with updated dict # call the auto reduction with updated dict try: workflow = WorkflowEngineAuto(config_dict) workflow.run() return WORKFLOW_SUCCESS exit_code = WORKFLOW_SUCCESS except WorkflowEngineError as e: logger.exception("Failed to create and run workflow") return e.exit_code exit_code = e.exit_code # move files to image directory if auto-reduction is successful logging.shutdown() # flushing and closing all handlers target_dir = workflow.registry["save_dir"] if exit_code == WORKFLOW_SUCCESS: shutil.move(config_fn, target_dir) shutil.move(log_fn, target_dir) return exit_code if __name__ == "__main__": Loading
scripts/reduce_CG1D_config_template.json +1 −1 Original line number Diff line number Diff line Loading @@ -110,7 +110,7 @@ "filename": "test", "outputdir" : "outputdir" }, "outputs": [] "outputs": ["save_dir"] } ] }
src/imars3d/backend/dataio/data.py +15 −26 Original line number Diff line number Diff line #!/usr/bin/env python3 """Data handling for iMars3D.""" import re from datetime import datetime import logging import param from imars3d.backend.util.functions import clamp_max_workers # package imports from imars3d.backend.dataio.metadata import MetaData from imars3d.backend.util.functions import clamp_max_workers, to_time_str # third party imports import numpy as np import param import tifffile from tqdm.contrib.concurrent import process_map # standard imports from functools import partial from pathlib import Path from fnmatch import fnmatchcase import logging from pathlib import Path import re from typing import Optional, Tuple, List, Callable from tqdm.contrib.concurrent import process_map from imars3d.backend.dataio.metadata import MetaData # ignore warnings generated by importing dxchange import warnings Loading Loading @@ -475,22 +480,6 @@ def _extract_rotation_angles( return rotation_angles def _to_time_str(value: datetime) -> str: """ Convert the supplied datetime to a formatted string. Parameters ---------- value: datetime object to format correctly Returns ------- The datetime as YYYYMMDDhhmm """ return value.strftime("%Y%m%d%H%M") def _save_data(filename: Path, data: np.ndarray, rot_angles: np.ndarray = None) -> None: if data is None: raise ValueError("Failed to supply data") Loading Loading @@ -549,7 +538,7 @@ class save_data(param.ParameterizedFunction): if params.data is None: raise ValueError("Did not supply data") save_dir = Path(params.outputbase) / f"{params.name}_{_to_time_str(datetime.now())}" save_dir = Path(params.outputbase) / f"{params.name}_{to_time_str()}" # save the data as tiffs _save_data(filename=save_dir / params.name, data=params.data, rot_angles=params.rot_angles) Loading Loading @@ -597,7 +586,7 @@ class save_checkpoint(param.ParameterizedFunction): # sanitize arguments params = param.ParamOverrides(self, params) save_dir = params.outputbase / f"{params.name}_chkpt_{_to_time_str(datetime.now())}" save_dir = params.outputbase / f"{params.name}_chkpt_{to_time_str()}" # save the data as tiffs _save_data(filename=save_dir / params.name, data=params.data, rot_angles=params.rot_angles) Loading
src/imars3d/backend/util/functions.py +19 −1 Original line number Diff line number Diff line #!/usr/bin/env python3 """Util for imars3d.""" import resource # standard imports from datetime import datetime import multiprocessing import resource from typing import Union Loading @@ -14,3 +16,19 @@ def clamp_max_workers(max_workers: Union[int, None]) -> int: if max_workers is None: max_workers = 0 return min(resource.RLIMIT_NPROC, max(1, multiprocessing.cpu_count() - 2)) if max_workers <= 0 else max_workers def to_time_str(value: datetime = datetime.now()) -> str: """ Convert the supplied datetime to a formatted string. Parameters ---------- value: datetime object to format correctly Returns ------- The datetime as YYYYMMDDhhmm """ return value.strftime("%Y%m%d%H%M")
src/imars3d/backend/workflow/engine.py +5 −0 Original line number Diff line number Diff line Loading @@ -70,6 +70,11 @@ class WorkflowEngine: def __init__(self) -> None: self._registry: Optional[dict] = None # will store set or computed parameters @property def registry(self): r"""Read only registry.""" return self._registry def _instrospect_task_function(self, function_str: str) -> namedtuple: r"""Obtain information from the function associated to one task in the workflow. Loading