Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
DNSMergeRuns.py 8.11 KiB
from __future__ import (absolute_import, division, print_function)
import mantid.simpleapi as api
from mantid.api import PythonAlgorithm, AlgorithmFactory, MatrixWorkspaceProperty,  WorkspaceGroup
from mantid.kernel import Direction, StringArrayProperty, StringListValidator, V3D, StringArrayLengthValidator
import numpy as np


class DNSMergeRuns(PythonAlgorithm):
    """
    Merges given runs into one matrix workspace.
    This algorithm is written for the DNS @ MLZ,
    but can be adjusted for other instruments if needed.
    """
    properties_to_compare = ['omega', 'slit_i_left_blade_position',
                             'slit_i_right_blade_position', 'slit_i_lower_blade_position',
                             'slit_i_upper_blade_position', 'polarisation', 'polarisation_comment', 'flipper']

    def __init__(self):
        """
        Init
        """
        PythonAlgorithm.__init__(self)
        self.workspace_names = []
        self.xaxis = '2theta'
        self.outws_name = None

    def category(self):
        """
        Returns category
        """
        return 'Workflow\\MLZ\\DNS'

    def name(self):
        """
        Returns name
        """
        return "DNSMergeRuns"

    def summary(self):
        return "Merges runs performed at different detector bank positions into one matrix workspace."

    def PyInit(self):

        validator = StringArrayLengthValidator()
        validator.setLengthMin(1)                               # group of workspaces may be given

        self.declareProperty(StringArrayProperty(name="WorkspaceNames", direction=Direction.Input, validator=validator),
                             doc="List of Workspace names to merge.")
        self.declareProperty(MatrixWorkspaceProperty("OutputWorkspace", "", direction=Direction.Output),
                             doc="A workspace name to save the merged data.")
        H_AXIS = ["2theta", "|Q|", "d-Spacing"]
        self.declareProperty("HorizontalAxis", "2theta", StringListValidator(H_AXIS),
                             doc="X axis in the merged workspace")
        return

    def _expand_groups(self, input_list):
        """
            returns names of the grouped workspaces
        """
        workspaces = []
        for wsname in input_list:
            wks = api.AnalysisDataService.retrieve(wsname)
            if isinstance(wks, WorkspaceGroup):
                workspaces.extend(wks.getNames())
            else:
                workspaces.append(wsname)
        self.log().notice("Workspaces: " + str(workspaces))
        return workspaces

    def validateInputs(self):
        issues = dict()
        input_list = self.getProperty("WorkspaceNames").value

        for wsname in input_list:
            if not api.AnalysisDataService.doesExist(wsname):
                issues["WorkspaceNames"] = "Workspace " + wsname + " does not exist"

        workspace_names = self._expand_groups(input_list)
        if len(workspace_names) < 2:
            issues["WorkspaceNames"] = "At least 2 workspaces required."

        ws0 = api.AnalysisDataService.retrieve(workspace_names[0])
        ndims = ws0.getNumDims()
        nhists = ws0.getNumberHistograms()
        nblocks = ws0.blocksize()
        # workspaces must exist and have the same dimensions
        for wsname in workspace_names[1:]:
            wks = api.AnalysisDataService.retrieve(wsname)
            if wks.getNumDims() != ndims:
                issues["WorkspaceNames"] = "Number of dimensions for workspace " + wks.getName() + \
                    " does not match to one for " + ws0.getName()
            if wks.getNumberHistograms() != nhists:
                issues["WorkspaceNames"] = "Number of histohrams for workspace " + wks.getName() + \
                    " does not match to one for " + ws0.getName()
            if wks.blocksize() != nblocks:
                issues["WorkspaceNames"] = "Number of blocks for workspace " + wks.getName() + \
                    " does not match to one for " + ws0.getName()
        # workspaces must have the same wavelength and normalization
        result = api.CompareSampleLogs(workspace_names, 'wavelength,normalized', 0.01)
        if len(result) > 0:
            issues["WorkspaceNames"] = "Cannot merge workspaces with different " + result

        return issues

    def _merge_workspaces(self):
        """
        merges given workspaces into one
            @param norm If True, normalization workspaces will be merged
        """
        arr = []
        beamDirection = V3D(0, 0, 1)
        # merge workspaces, existance has been checked by _can_merge function
        for ws_name in self.workspace_names:
            wks = api.AnalysisDataService.retrieve(ws_name)
            samplePos = wks.getInstrument().getSample().getPos()
            n_hists = wks.getNumberHistograms()
            two_theta = np.array([wks.getDetector(i).getTwoTheta(samplePos, beamDirection) for i in range(0, n_hists)])
            # round to approximate hardware accuracy 0.05 degree ~ 1 mrad
            two_theta = np.round(two_theta, 4)
            dataY = np.rot90(wks.extractY())[0]
            dataE = np.rot90(wks.extractE())[0]
            for i in range(n_hists):
                arr.append([two_theta[i], dataY[i], dataE[i]])
        data = np.array(arr)
        # sum up intensities for dublicated angles
        data_sorted = data[np.argsort(data[:, 0])]
        # unique values
        unX = np.unique(data_sorted[:, 0])
        if len(data_sorted[:, 0]) - len(unX) > 0:
            arr = []
            self.log().information("There are dublicated 2Theta angles in the dataset. Sum up the intensities.")
            # we must sum up the values
            for i in range(len(unX)):
                idx = np.where(np.fabs(data_sorted[:, 0] - unX[i]) < 1e-4)
                new_y = sum(data_sorted[idx][:, 1])
                err = data_sorted[idx][:, 2]
                new_e = np.sqrt(np.dot(err, err))
                arr.append([unX[i], new_y, new_e])
            data = np.array(arr)

        # define x axis
        wks = api.AnalysisDataService.retrieve(self.workspace_names[0])
        wavelength = wks.getRun().getProperty('wavelength').value
        if self.xaxis == "2theta":
            data[:, 0] = np.round(np.degrees(data[:, 0]), 2)
            unitx = "Degrees"
        elif self.xaxis == "|Q|":
            data[:, 0] = np.fabs(4.0*np.pi*np.sin(0.5*data[:, 0])/wavelength)
            unitx = "MomentumTransfer"
        elif self.xaxis == "d-Spacing":
            data[:, 0] = np.fabs(0.5*wavelength/np.sin(0.5*data[:, 0]))
            unitx = "dSpacing"
        else:
            message = "The value for X axis " + self.xaxis + " is invalid! Cannot merge."
            self.log().error(message)
            raise RuntimeError(message)

        data_sorted = data[np.argsort(data[:, 0])]
        api.CreateWorkspace(dataX=data_sorted[:, 0], dataY=data_sorted[:, 1], dataE=data_sorted[:, 2],
                            UnitX=unitx, OutputWorkspace=self.outws_name)
        outws = api.AnalysisDataService.retrieve(self.outws_name)
        # assume that all input workspaces have the same YUnits and YUnitLabel
        wks = api.AnalysisDataService.retrieve(self.workspace_names[0])
        outws.setYUnit(wks.YUnit())
        outws.setYUnitLabel(wks.YUnitLabel())

        return

    def PyExec(self):
        # Input
        input_list = self.getProperty("WorkspaceNames").value
        self.outws_name = self.getProperty("OutputWorkspace").valueAsStr
        self.xaxis = self.getProperty("HorizontalAxis").value

        self.workspace_names = self._expand_groups(input_list)
        self.log().information("Workspaces to merge: %i" % (len(self.workspace_names)))
        # produce warnings is some optional sample logs do not match
        result = api.CompareSampleLogs(self.workspace_names, self.properties_to_compare, 1e-2)

        self._merge_workspaces()

        outws = api.AnalysisDataService.retrieve(self.outws_name)
        api.CopyLogs(self.workspace_names[0], outws)
        # remove logs which do not match
        if result:
            api.RemoveLogs(outws, result)

        self.setProperty("OutputWorkspace", outws)
        return


# Register algorithm with Mantid
AlgorithmFactory.subscribe(DNSMergeRuns)