# model.py
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2019 ISIS Rutherford Appleton Laboratory UKRI,
#   NScD Oak Ridge National Laboratory, European Spallation Source,
#   Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
import csv
from os import path, makedirs
from matplotlib import gridspec
import matplotlib.pyplot as plt

from Engineering.gui.engineering_diffraction.tabs.common import vanadium_corrections, path_handling
from Engineering.gui.engineering_diffraction.settings.settings_helper import get_setting
from mantid.simpleapi import EnggFocus, logger, AnalysisDataService as Ads, SaveNexus, SaveGSS, SaveFocusedXYE, \
    LoadAscii

# Workspace name for the sample run input. Not referenced in the visible part of this file;
# presumably used by other modules of the focus tab -- TODO confirm.
SAMPLE_RUN_WORKSPACE_NAME = "engggui_focusing_input_ws"
# Shared stem of the focused output workspace names. focus_run builds each name as
# "<run number>_" + this stem + "<bank>" (or "cropped" when specific spectra are focused).
FOCUSED_OUTPUT_WORKSPACE_NAME = "engggui_focusing_output_ws_bank_"


class FocusModel(object):
    """
    Model behind the focus tab: runs EnggFocus on sample runs, saves the focused workspaces in
    several file formats, writes the numerical sample logs to CSV and optionally plots the result.
    """

    def focus_run(self, sample_paths, banks, plot_output, instrument, rb_num, spectrum_numbers):
        """
        Focus some data using the current calibration.
        :param sample_paths: The paths to the data to be focused.
        :param banks: The banks that should be focused.
        :param plot_output: True if the output should be plotted.
        :param instrument: The instrument that the data came from.
        :param rb_num: The experiment number, used to create directories. Can be None
        :param spectrum_numbers: The specific spectra that should be focused. Used instead of banks.
        """
        # Focusing needs the vanadium correction workspaces; without them there is nothing to do.
        # NOTE(review): this only bails out when BOTH workspaces are missing. If exactly one is
        # missing, the retrieve calls below are expected to fail instead -- confirm this is intended.
        if not Ads.doesExist(vanadium_corrections.INTEGRATED_WORKSPACE_NAME) and not Ads.doesExist(
                vanadium_corrections.CURVES_WORKSPACE_NAME):
            logger.warning("Vanadium correction workspaces were not found; no data has been focused.")
            return
        integration_workspace = Ads.retrieve(vanadium_corrections.INTEGRATED_WORKSPACE_NAME)
        curves_workspace = Ads.retrieve(vanadium_corrections.CURVES_WORKSPACE_NAME)
        output_workspaces = []  # List of collated workspaces to plot.
        full_calib_path = get_setting(path_handling.INTERFACES_SETTINGS_GROUP,
                                      path_handling.ENGINEERING_PREFIX, "full_calibration")
        if full_calib_path is not None and path.exists(full_calib_path):
            full_calib_workspace = LoadAscii(full_calib_path, OutputWorkspace="det_pos", Separator="Tab")
        else:
            full_calib_workspace = None
        if spectrum_numbers is None:
            # Bank mode: one focused workspace per requested bank for every run.
            for sample_path in sample_paths:
                sample_workspace = path_handling.load_workspace(sample_path)
                run_no = path_handling.get_run_number_from_path(sample_path, instrument)
                workspaces_for_run = []
                for name in banks:
                    output_workspace_name = str(run_no) + "_" + FOCUSED_OUTPUT_WORKSPACE_NAME + str(name)
                    self._run_focus(sample_workspace, output_workspace_name, integration_workspace,
                                    curves_workspace, name, full_calib_workspace)
                    workspaces_for_run.append(output_workspace_name)
                    # Save the output to the file system.
                    self._save_output(instrument, sample_path, name, output_workspace_name, rb_num)
                output_workspaces.append(workspaces_for_run)
                self._output_sample_logs(instrument, run_no, sample_workspace, rb_num)
        else:
            # Spectrum mode: a single "cropped" workspace per run; no bank is passed to EnggFocus.
            for sample_path in sample_paths:
                sample_workspace = path_handling.load_workspace(sample_path)
                run_no = path_handling.get_run_number_from_path(sample_path, instrument)
                output_workspace_name = str(run_no) + "_" + FOCUSED_OUTPUT_WORKSPACE_NAME + "cropped"
                self._run_focus(sample_workspace, output_workspace_name, integration_workspace,
                                curves_workspace, None, full_calib_workspace, spectrum_numbers)
                output_workspaces.append([output_workspace_name])
                self._save_output(instrument, sample_path, "cropped", output_workspace_name, rb_num)
                self._output_sample_logs(instrument, run_no, sample_workspace, rb_num)
        # Plot the output
        if plot_output:
            for ws_names in output_workspaces:
                self._plot_focused_workspaces(ws_names)

    @staticmethod
    def _run_focus(input_workspace,
                   output_workspace,
                   vanadium_integration_ws,
                   vanadium_curves_ws,
                   bank,
                   full_calib_ws=None,
                   spectrum_numbers=None):
        """
        Run the EnggFocus algorithm for one bank or for a set of spectrum numbers.
        :param input_workspace: The sample workspace to focus.
        :param output_workspace: The name of the focused output workspace.
        :param vanadium_integration_ws: The vanadium integration workspace.
        :param vanadium_curves_ws: The vanadium curves workspace.
        :param bank: The bank to focus, or None to focus spectrum_numbers instead.
        :param full_calib_ws: Optional detector positions workspace; only passed on when not None.
        :param spectrum_numbers: The spectra to focus; only used when bank is None.
        :return: Whatever EnggFocus returns.
        :raises RuntimeError: If EnggFocus fails; the original error is chained as the cause.
        """
        kwargs = {
            "InputWorkspace": input_workspace,
            "OutputWorkspace": output_workspace,
            "VanIntegrationWorkspace": vanadium_integration_ws,
            "VanCurvesWorkspace": vanadium_curves_ws
        }
        if full_calib_ws is not None:
            kwargs["DetectorPositions"] = full_calib_ws

        if bank is None:
            kwargs["SpectrumNumbers"] = spectrum_numbers
        else:
            kwargs["Bank"] = bank

        try:
            return EnggFocus(**kwargs)
        except RuntimeError as e:
            message = ("Error in focusing, Could not run the EnggFocus algorithm successfully for bank "
                       + str(bank) + ". Error Description: " + str(e))
            logger.error(message)
            # Keep the exception type callers expect, but preserve the message and the cause.
            raise RuntimeError(message) from e

    @staticmethod
    def _plot_focused_workspaces(focused_workspaces):
        """
        Plot each named workspace in its own subplot, side by side in a single figure.
        :param focused_workspaces: The names of the workspaces to plot.
        """
        if not focused_workspaces:
            # A grid with zero columns cannot be created, so there is nothing to draw.
            return
        fig = plt.figure()
        gs = gridspec.GridSpec(1, len(focused_workspaces))
        for index, ws_name in enumerate(focused_workspaces):
            ax = fig.add_subplot(gs[index], projection="mantid")
            ax.plot(Ads.retrieve(ws_name), wkspIndex=0)
            ax.set_title(ws_name)
        fig.show()

    def _save_output(self, instrument, sample_path, bank, sample_workspace, rb_num):
        """
        Save a focused workspace to the file system. Saves separate copies to a User directory if an rb number has
        been set.
        :param instrument: The instrument the data is from.
        :param sample_path: The path to the data file that was focused.
        :param bank: The name of the bank being saved.
        :param sample_workspace: The name of the workspace to be saved.
        :param rb_num: Usually an experiment id, defines the name of the user directory.
        """
        self._save_focused_output_files_as_nexus(instrument, sample_path, bank, sample_workspace,
                                                 rb_num)
        self._save_focused_output_files_as_gss(instrument, sample_path, bank, sample_workspace,
                                               rb_num)
        self._save_focused_output_files_as_topas_xye(instrument, sample_path, bank, sample_workspace,
                                                     rb_num)
        logger.notice(f"\n\nFocus files saved to: \"{path.join(path_handling.get_output_path(), 'Focus')}\"\n\n")
        if rb_num:
            output_path = path.join(path_handling.get_output_path(), 'User', rb_num, 'Focus')
            logger.notice(f"\n\nFocus files saved to: \"{output_path}\"\n\n")

    @staticmethod
    def _focus_output_dirs(rb_num):
        """
        List the directories that focus output is written to.
        :param rb_num: The experiment number; when set, the User/<rb_num>/Focus directory is included.
        :return: The general Focus directory, followed by the user Focus directory if rb_num is set.
        """
        directories = [path.join(path_handling.get_output_path(), "Focus")]
        if rb_num:
            directories.append(path.join(path_handling.get_output_path(), "User", rb_num, "Focus"))
        return directories

    def _save_focused_output_files_as_gss(self, instrument, sample_path, bank, sample_workspace,
                                          rb_num):
        """
        Save the workspace with SaveGSS (".gss") to every focus output directory.
        :param instrument: The instrument the data is from.
        :param sample_path: The path to the data file that was focused.
        :param bank: The name of the bank being saved.
        :param sample_workspace: The name of the workspace to be saved.
        :param rb_num: The experiment number; when set a copy is also saved to the user directory.
        """
        file_name = self._generate_output_file_name(instrument, sample_path, bank, ".gss")
        for directory in self._focus_output_dirs(rb_num):
            SaveGSS(InputWorkspace=sample_workspace, Filename=path.join(directory, file_name))

    def _save_focused_output_files_as_nexus(self, instrument, sample_path, bank, sample_workspace,
                                            rb_num):
        """
        Save the workspace with SaveNexus (".nxs") to every focus output directory.
        :param instrument: The instrument the data is from.
        :param sample_path: The path to the data file that was focused.
        :param bank: The name of the bank being saved.
        :param sample_workspace: The name of the workspace to be saved.
        :param rb_num: The experiment number; when set a copy is also saved to the user directory.
        """
        file_name = self._generate_output_file_name(instrument, sample_path, bank, ".nxs")
        for directory in self._focus_output_dirs(rb_num):
            SaveNexus(InputWorkspace=sample_workspace, Filename=path.join(directory, file_name))

    def _save_focused_output_files_as_topas_xye(self, instrument, sample_path, bank,
                                                sample_workspace, rb_num):
        """
        Save the workspace with SaveFocusedXYE in TOPAS format (".abc") to every focus output directory.
        :param instrument: The instrument the data is from.
        :param sample_path: The path to the data file that was focused.
        :param bank: The name of the bank being saved.
        :param sample_workspace: The name of the workspace to be saved.
        :param rb_num: The experiment number; when set a copy is also saved to the user directory.
        """
        file_name = self._generate_output_file_name(instrument, sample_path, bank, ".abc")
        for directory in self._focus_output_dirs(rb_num):
            SaveFocusedXYE(InputWorkspace=sample_workspace,
                           Filename=path.join(directory, file_name),
                           SplitFiles=False,
                           Format="TOPAS")

    @staticmethod
    def _write_sample_logs(output_path, log_values):
        """
        Write sample log values to a CSV file with a "Sample Log", "Avg Value" header row.
        :param output_path: The path of the CSV file to (over)write.
        :param log_values: Mapping of sample log name to its single value.
        """
        with open(output_path, "w", newline="") as logfile:
            writer = csv.writer(logfile)
            # The header must be written as a row; csv.writer's second positional argument is the
            # dialect, so passing the header there silently drops it.
            writer.writerow(["Sample Log", "Avg Value"])
            writer.writerows(log_values.items())

    @staticmethod
    def _output_sample_logs(instrument, run_number, workspace, rb_num):
        """
        Write the numerical sample logs of a workspace to "<instrument>_<run_number>_sample_logs.csv"
        in the Focus directory and, if an rb number is set, in the user Focus directory as well.
        :param instrument: The instrument the data is from.
        :param run_number: The run number of the sample, used in the file name.
        :param workspace: The sample workspace whose run logs are written out.
        :param rb_num: The experiment number; when set a copy is also written to the user directory.
        """
        output_dict = {}
        sample_run = workspace.getRun()
        # Collect numerical sample logs.
        for name in sample_run.keys():
            try:
                output_dict[name] = sample_run.getPropertyAsSingleValue(name)
            except ValueError:
                logger.information(f"Could not convert {name} to a numerical value. It will not be included in the "
                                   f"sample logs output file.")
        # Formatting (rather than "+") also accepts run numbers that are not already strings.
        file_name = f"{instrument}_{run_number}_sample_logs.csv"
        for directory in FocusModel._focus_output_dirs(rb_num):
            makedirs(directory, exist_ok=True)
            FocusModel._write_sample_logs(path.join(directory, file_name), output_dict)

    @staticmethod
    def _generate_output_file_name(instrument, sample_path, bank, suffix):
        """
        Build the output file name "<instrument>_<run number>_bank_<bank><suffix>".
        :param instrument: The instrument the data is from.
        :param sample_path: The path to the data file that was focused; the run number is taken from it.
        :param bank: The name of the bank being saved.
        :param suffix: The file extension, including the leading dot.
        :return: The file name, without any directory.
        """
        run_no = path_handling.get_run_number_from_path(sample_path, instrument)
        # Formatting (rather than "+") also accepts bank identifiers that are not already strings.
        return f"{instrument}_{run_no}_bank_{bank}{suffix}"