# pylint: disable=invalid-name,no-init,too-many-lines
from __future__ import (absolute_import, division, print_function)
from mantid.kernel import Direction, FloatArrayProperty, IntArrayBoundedValidator, \
    IntArrayProperty, StringListValidator
from mantid.api import AlgorithmFactory, DataProcessorAlgorithm, FileAction, \
    FileFinder, FileProperty, PropertyMode, WorkspaceProperty
from mantid.simpleapi import AlignDetectors, CloneWorkspace, CompressEvents, \
    ConvertUnits, CreateGroupingWorkspace, CropWorkspace, DeleteWorkspace, \
    DiffractionFocussing, Divide, EditInstrumentGeometry, GetIPTS, GroupDetectors, \
    Load, LoadDetectorsGroupingFile, LoadMask, LoadNexusProcessed, LoadPreNexusLive, \
    MaskDetectors, NormaliseByCurrent, PreprocessDetectorsToMD, Rebin, RenameWorkspace, \
    ReplaceSpecialValues, RemovePromptPulse, SaveAscii, SaveFocusedXYE, \
    SaveGSS, SaveNexusProcessed, mtd
import os
import numpy as np


class SNAPReduce(DataProcessorAlgorithm):
    IPTS_dir = None

    def get_IPTS_Local(self, run):
        if self.IPTS_dir is None:
            self.IPTS_dir = GetIPTS(Instrument='SNAP',
                                    RunNumber=str(run))
        return self.IPTS_dir

    def get_Run_Log_Local(self, run):

        runs = list(range(run - 5, run + 1))
        file_Str = ''

        while len(runs) > 0:
            try:
                r = runs.pop()
                self.log().notice('\n run is : %s' % r)
                file_Str = FileFinder.findRuns(str(r))[0]
                break
            except RuntimeError:
                pass

        if file_Str == '':
            self.log().error('The SNAP run %s was not found on the server - '
                             'cannot determine the current IPTS. Choose '
                             'another run.' % run)
            return False
        else:
            self.log().notice('File is: %s' % file_Str)
            LogRun = int(file_Str.split('_')[1])
            self.log().notice('Log Run is: %s' % LogRun)
            return LogRun

    def smooth(self, data, order):
        # This smooths data with a linearly weighted average around point
        # i.  For example, for an order of 7, point i is weighted 4,
        # i +/- 1 is weighted 3, i +/- 2 is weighted 2 and i +/- 3 is
        # weighted 1.  The input is only the y values.
        sm = np.zeros(len(data))
        factor = order // 2 + 1
        for i in range(len(data)):
            temp = 0
            ave = 0
            for r in range(max(0, i - int(order / 2)),
                           min(i + int(order / 2), len(data) - 1) + 1):
                temp = temp + (factor - abs(r - i)) * data[r]
                ave = ave + factor - abs(r - i)
            sm[i] = temp / ave

        return sm
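
    # Worked illustration of the weighting (comment only): for order=7 the
    # weights over points i-3..i+3 are [1, 2, 3, 4, 3, 2, 1] (sum 16), so
    # smooth([0, 0, 0, 16, 0, 0, 0], 7)[3] == 4 * 16 / 16 == 4.0, while edge
    # points use only the part of the window that fits inside the data.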

    def LLS_transformation(self, input):
        # this transforms data to be more sensitive to weak peaks. The
        # function is reversed by the Inv_LLS function below
        out = np.log(np.log((input + 1)**0.5 + 1) + 1)

        return out

    def Inv_LLS_transformation(self, input):
        # See the LLS_transformation function above; this is its inverse
        out = (np.exp(np.exp(input) - 1) - 1)**2 - 1

        return out
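
    # Round-trip check (illustration only): the inverse transform undoes the
    # LLS transform exactly for non-negative input, e.g. for
    # y = np.array([0., 10., 1000.]) the expression
    # np.allclose(self.Inv_LLS_transformation(self.LLS_transformation(y)), y)
    # evaluates to True.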

    def peak_clip(self, data, win=30, decrease=True, LLS=True, smooth_window=0):
        start_data = np.copy(data)

        window = win
        self.log().information(str(smooth_window))

        if smooth_window > 0:
            data = self.smooth(data, smooth_window)

        if LLS:
            data = self.LLS_transformation(data)

        temp = data.copy()

        if decrease:
            scan = list(range(window + 1, 0, -1))
        else:
            scan = list(range(1, window + 1))

        for w in scan:
            for i in range(len(temp)):
                if i < w or i > (len(temp) - w - 1):
                    continue
                else:
                    win_array = temp[i - w:i + w + 1].copy()
                    win_array_reversed = win_array[::-1]
                    average = (win_array + win_array_reversed) / 2
                    temp[i] = np.min(average[:len(average) // 2])

        if LLS:
            temp = self.Inv_LLS_transformation(temp)

        self.log().information(str(min(start_data - temp)))

        index = np.where((start_data - temp) == min(start_data - temp))[0][0]

        output = temp * (start_data[index] / temp[index])

        return output
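
    # Usage note: in PyExec the clipped spectrum returned here is treated as a
    # peak-free background estimate and used as the denominator of a Divide,
    # i.e. the focused data are normalized by their own smooth background.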

    def category(self):
        return "Diffraction\\Reduction"

    def PyInit(self):

        validator = IntArrayBoundedValidator()
        validator.setLower(0)
        self.declareProperty(IntArrayProperty("RunNumbers", values=[0], direction=Direction.Input,
                                              validator=validator),
                             "Run numbers to process, comma separated")

        self.declareProperty("LiveData", False,
                             "Read live data - requires a saved run in the current IPTS "
                             + "with the same Instrument configuration as the live run")

        mask = ["None", "Horizontal", "Vertical",
                "Masking Workspace", "Custom - xml masking file"]
        self.declareProperty("Masking", "None", StringListValidator(mask),
                             "Mask to be applied to the data")

        self.declareProperty(WorkspaceProperty("MaskingWorkspace", "",
                                               Direction.Input, PropertyMode.Optional),
                             "The workspace containing the mask.")

        self.declareProperty(FileProperty(name="MaskingFilename", defaultValue="",
                                          direction=Direction.Input,
                                          action=FileAction.OptionalLoad),
                             doc="The file containing the xml mask.")

        self.declareProperty(name="Calibration", defaultValue="Convert Units",
                             validator=StringListValidator(
                                 ["Convert Units", "Calibration File"]),
                             direction=Direction.Input,
                             doc="The type of conversion to d_spacing to be used.")

        self.declareProperty(FileProperty(name="CalibrationFilename", defaultValue="",
                                          direction=Direction.Input,
                                          action=FileAction.OptionalLoad),
                             doc="The calibration file to convert to d_spacing.")

        self.declareProperty(FloatArrayProperty("Binning", [0.5, -0.004, 7.0]),
                             "Min, Step, and Max of d-space bins.  Logarithmic binning is used if Step is negative.")

        nor_corr = ["None", "From Workspace",
                    "From Processed Nexus", "Extracted from Data"]
        self.declareProperty("Normalization", "None", StringListValidator(nor_corr),
                             "If needed what type of input to use as normalization, Extracted from "
                             + "Data uses a background determination that is peak independent.This "
                             + "implemantation can be tested in algorithm SNAP Peak Clipping Background")

        self.declareProperty(FileProperty(name="NormalizationFilename", defaultValue="",
                                          direction=Direction.Input,
                                          action=FileAction.OptionalLoad),
                             doc="The file containing the processed nexus for normalization.")

        self.declareProperty(WorkspaceProperty("NormalizationWorkspace", "",
                                               Direction.Input, PropertyMode.Optional),
                             "The workspace containing the normalization data.")

        self.declareProperty("PeakClippingWindowSize", 10,
                             "Read live data - requires a saved run in the current "
                             + "IPTS with the same Instrumnet configuration")

        self.declareProperty("SmoothingRange", 10,
                             "Read live data - requires a saved run in the "
                             + "current IPTS with the same Instrumnet configuration")

        grouping = ["All", "Column", "Banks", "Modules", "2_4 Grouping"]
        self.declareProperty("GroupDetectorsBy", "All", StringListValidator(grouping),
                             "Detector groups to use for future focussing: "
                             + "All detectors as one group, Groups (East,West for "
                             + "SNAP), Columns for SNAP, detector banks")

        mode = ["Set-Up", "Production"]
        self.declareProperty("ProcessingMode", "Production", StringListValidator(mode),
                             "Set-Up Mode is used for establishing correct parameters. Production "
                             + "Mode only Normalized workspace is kept for each run.")

        self.declareProperty(name="OptionalPrefix", defaultValue="",
                             direction=Direction.Input,
                             doc="Optional Prefix to be added to workspaces and output filenames")

        self.declareProperty("SaveData", False,
                             "Save data in the following formats: Ascii- "
                             + "d-spacing ,Nexus Processed,GSAS and Fullprof")
        self.declareProperty(FileProperty(name="OutputDirectory", defaultValue="",
                                          action=FileAction.OptionalDirectory),
                             doc='Default value is proposal shared directory')

    def validateInputs(self):
        issues = dict()

        # cross check masking
        masking = self.getProperty("Masking").value
        if masking in ("None", "Horizontal", "Vertical"):
            pass
        elif masking in ("Custom - xml masking file"):
            filename = self.getProperty("MaskingFilename").value
            if len(filename) <= 0:
                issues["MaskingFilename"] = "Masking=\"%s\" requires a filename" % masking
        elif masking == "MaskingWorkspace":
            mask_workspace = self.getPropertyValue("MaskingWorkspace")
            if mask_workspace is None or len(mask_workspace) <= 0:
                issues["MaskingWorkspace"] = "Must supply masking workspace"
        else:
            raise RuntimeError("Masking value \"%s\" not supported" % masking)

        # cross check normalization
        normalization = self.getProperty("Normalization").value
        if normalization in ("None", "Extracted from Data"):
            pass
        elif normalization == "From Workspace":
            norm_workspace = self.getPropertyValue("NormalizationWorkspace")
        elif normalization == "From Processed Nexus":
            filename = self.getProperty("NormalizationFilename").value
            if len(filename) <= 0:
                issues["NormalizationFilename"] = "Normalization=\"%s\" requires a filename" \
Peterson, Peter's avatar
Peterson, Peter committed
                                                  % normalization
Peterson, Peter's avatar
Peterson, Peter committed
            raise RuntimeError(
                "Normalization value \"%s\" not supported" % normalization)

        return issues

    def PyExec(self):
        # Retrieve all relevant properties

        in_Runs = self.getProperty("RunNumbers").value

        masking = self.getProperty("Masking").value

        if masking == "Custom - xml masking file":
            LoadMask(InputFile=self.getProperty("MaskingFilename").value,
                     Instrument='SNAP', OutputWorkspace='CustomMask')
        elif masking == "Horizontal":
            if not mtd.doesExist('HorizontalMask'):
                LoadMask(InputFile='/SNS/SNAP/shared/libs/Horizontal_Mask.xml',
                         Instrument='SNAP', OutputWorkspace='HorizontalMask')
        elif masking == "Vertical":
            if not mtd.doesExist('VerticalMask'):
                LoadMask(InputFile='/SNS/SNAP/shared/libs/Vertical_Mask.xml',
                         Instrument='SNAP', OutputWorkspace='VerticalMask')
        elif masking == "Masking Workspace":
            mask_workspace = self.getProperty("MaskingWorkspace").value

        calib = self.getProperty("Calibration").value
        if calib == "Calibration File":
            cal_File = self.getProperty("CalibrationFilename").value

        params = self.getProperty("Binning").value
        norm = self.getProperty("Normalization").value

        if norm == "From Processed Nexus":
            norm_File = self.getProperty("Normalization filename").value
            Normalization = LoadNexusProcessed(Filename=norm_File)
        if norm == "From Workspace":
            normWS = self.getProperty("NormalizationWorkspace").value

        group = self.getProperty("GroupDetectorsBy").value

        if group == 'All':
            real_name = 'All'
        elif group == 'Column':
            real_name = 'Column'
        elif group == 'Banks':
            real_name = 'Group'
        elif group == 'Modules':
            real_name = 'bank'
        elif group == '2_4 Grouping':
            real_name = '2_4_Grouping'

        if not mtd.doesExist(group):
            if group == "2_4 Grouping":
                group = real_name
                LoadDetectorsGroupingFile(InputFile=r'/SNS/SNAP/shared/libs/SNAP_group_2_4.xml',
                                          OutputWorkspace=group)
            else:
                CreateGroupingWorkspace(InstrumentName='SNAP', GroupDetectorsBy=real_name,
                                        OutputWorkspace=group)

        Process_Mode = self.getProperty("ProcessingMode").value
        prefix = self.getProperty("OptionalPrefix").value
        save_Data = self.getProperty("SaveData").value
        # --------------------------- REDUCE DATA -----------------------------

        Tag = 'SNAP'
        for r in in_Runs:
            self.log().notice("processing run %s" % r)
            self.log().information(str(self.get_IPTS_Local(r)))
            if self.getProperty("LiveData").value:
                WS = LoadPreNexusLive(Instrument='SNAP')
            else:
                WS = Load(Filename='SNAP' + str(r), OutputWorkspace='WS')
                WS = NormaliseByCurrent(InputWorkspace=WS,
                                        OutputWorkspace='WS')

            WS = CompressEvents(InputWorkspace=WS, OutputWorkspace='WS')
            WS = CropWorkspace(InputWorkspace='WS',
                               OutputWorkspace='WS', XMax=50000)
            WS = RemovePromptPulse(InputWorkspace=WS, OutputWorkspace='WS',
                                   Width='1600', Frequency='60.4')

            if masking == "Custom - xml masking file":
                WS = MaskDetectors(Workspace=WS, MaskedWorkspace='CustomMask')
            elif masking == "Masking Workspace":
                WS = MaskDetectors(
                    Workspace=WS, MaskedWorkspace=mask_workspace)
            elif masking == "Vertical":
                WS = MaskDetectors(
                    Workspace=WS, MaskedWorkspace='VerticalMask')
            elif masking == "Horizontal":
                WS = MaskDetectors(
                    Workspace=WS, MaskedWorkspace='HorizontalMask')

            if calib == "Convert Units":
                WS_d = ConvertUnits(InputWorkspace='WS',
                                    Target='dSpacing', OutputWorkspace='WS_d')

            else:
                self.log().notice("\n calibration file : %s" % cal_File)
                WS_d = AlignDetectors(
                    InputWorkspace='WS', CalibrationFile=cal_File, OutputWorkspace='WS_d')
            WS_d = Rebin(InputWorkspace=WS_d, Params=params,
                         OutputWorkspace='WS_d')

            if group == "Low Res 5:6-high Res 1:4":
                WS_red = GroupDetectors(InputWorkspace=WS_d,
                                        MapFile=r'/SNS/SNAP/shared/libs/14_56_grouping.xml',
                                        PreserveEvents=False)
            else:
                WS_red = DiffractionFocussing(InputWorkspace=WS_d, GroupingWorkspace=group,
                                              PreserveEvents=False)

            if norm == "From Processed Nexus":
                WS_nor = Divide(LHSWorkspace=WS_red,
                                RHSWorkspace=Normalization)
            elif norm == "From Workspace":
                WS_nor = Divide(LHSWorkspace=WS_red, RHSWorkspace=normWS)
            elif norm == "Extracted from Data":

                window = self.getProperty("PeakClippingWindowSize").value
                smooth_range = self.getProperty("SmoothingRange").value

                peak_clip_WS = CloneWorkspace(WS_red)
                n_histo = peak_clip_WS.getNumberHistograms()

                x = peak_clip_WS.extractX()
                y = peak_clip_WS.extractY()
                e = peak_clip_WS.extractE()

                for h in range(n_histo):
                    peak_clip_WS.setX(h, x[h])
                    peak_clip_WS.setY(h, self.peak_clip(y[h], win=window, decrease=True,
                                                        LLS=True, smooth_window=smooth_range))
                    peak_clip_WS.setE(h, e[h])

                WS_nor = Divide(LHSWorkspace=WS_red, RHSWorkspace=peak_clip_WS)

            if norm != 'None':
                WS_nor = ReplaceSpecialValues(InputWorkspace=WS_nor,
                                              NaNValue='0', NaNError='0',
                                              InfinityValue='0', InfinityError='0')

            new_Tag = Tag
            if len(prefix) > 0:
                new_Tag += '_' + prefix
            # Edit instrument geometry to make final workspace smaller on disk
            det_table = PreprocessDetectorsToMD(InputWorkspace='WS_red',
                                                OutputWorkspace='__SNAP_det_table')
            polar = np.degrees(det_table.column('TwoTheta'))
            azi = np.degrees(det_table.column('Azimuthal'))
            EditInstrumentGeometry(Workspace="WS_red", L2=det_table.column('L2'),
                                   Polar=polar, Azimuthal=azi)
            if norm != "None":
                EditInstrumentGeometry(Workspace="WS_nor", L2=det_table.column('L2'),
                                       Polar=polar, Azimuthal=azi)
            mtd.remove('__SNAP_det_table')

            # Save requested formats
            if save_Data:
                saveDir = self.getProperty("OutputDirectory").value.strip()
                if len(saveDir) <= 0:
                    self.log().notice('Using default save location')
                    saveDir = os.path.join(
                        self.get_IPTS_Local(r), 'shared', 'data')
                self.log().notice('Writing to \'' + saveDir + '\'')

                basename = '%s_%s_%s' % (new_Tag, r, group)

                if norm == 'None':
                    WS_tof = ConvertUnits(
                        InputWorkspace="WS_red", Target="TOF", AlignBins=False)
                else:
                    WS_tof = ConvertUnits(
                        InputWorkspace="WS_nor", Target="TOF", AlignBins=False)

                if norm == 'None':
                    SaveNexusProcessed(InputWorkspace=WS_red,
                                       Filename=os.path.join(saveDir, 'nexus', basename + '.nxs'))
                    SaveAscii(InputWorkspace=WS_red,
                              Filename=os.path.join(saveDir, 'd_spacing', basename + '.dat'))
                else:
                    SaveNexusProcessed(InputWorkspace=WS_nor,
                                       Filename=os.path.join(saveDir, 'nexus', basename + '.nxs'))
                    SaveAscii(InputWorkspace=WS_nor,
                              Filename=os.path.join(saveDir, 'd_spacing', basename + '.dat'))

                SaveGSS(InputWorkspace=WS_tof,
                        Filename=os.path.join(
                            saveDir, 'gsas', basename + '.gsa'),
                        Format='SLOG', SplitFiles=False, Append=False, ExtendedHeader=True)
                SaveFocusedXYE(InputWorkspace=WS_tof,
                               Filename=os.path.join(
                                   saveDir, 'fullprof', basename + '.dat'),
                               SplitFiles=True, Append=False)
                DeleteWorkspace(Workspace='WS_tof')

            DeleteWorkspace(Workspace='WS')

            if Process_Mode == "Production":
                DeleteWorkspace(Workspace='WS_d')
                if norm != "None":
                    DeleteWorkspace(Workspace='WS_red')
                    RenameWorkspace(InputWorkspace=WS_nor,
                                    OutputWorkspace='%s_%s_%s_nor' % (new_Tag, r, group))
                else:
                    RenameWorkspace(InputWorkspace=WS_red,
                                    OutputWorkspace='%s_%s_%s_red' % (new_Tag, r, group))
                if norm == "Extracted from Data":
                    DeleteWorkspace(Workspace='peak_clip_WS')
            else:
                RenameWorkspace(InputWorkspace='WS_d',
                                OutputWorkspace='%s_%s_d' % (new_Tag, r))
                RenameWorkspace(InputWorkspace='WS_red',
                                OutputWorkspace='%s_%s_%s_red' % (new_Tag, r, group))
                if norm != "None":
                    RenameWorkspace(InputWorkspace='WS_nor',
                                    OutputWorkspace='%s_%s_%s_nor' % (new_Tag, r, group))
                if norm == "Extracted from Data":
                    RenameWorkspace(InputWorkspace='peak_clip_WS',
                                    OutputWorkspace='%s_%s_normalizer' % (new_Tag, r))

            propertyName = 'OutputWorkspace'
            if norm != "None":
                wksp = '%s_%s_%s_nor' % (new_Tag, r, group)
            else:
                wksp = '%s_%s_%s_red' % (new_Tag, r, group)
            self.declareProperty(WorkspaceProperty(
                propertyName, wksp, Direction.Output))
            self.setProperty(propertyName, wksp)
AlgorithmFactory.subscribe(SNAPReduce)
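

# Example usage from a Mantid Python session (a minimal sketch - the run
# number and the property choices below are placeholders, not values taken
# from this file):
#
#   from mantid.simpleapi import SNAPReduce
#   SNAPReduce(RunNumbers='12345',
#              Masking='None',
#              Calibration='Convert Units',
#              Binning='0.5,-0.004,7.0',
#              Normalization='Extracted from Data',
#              GroupDetectorsBy='Column',
#              ProcessingMode='Set-Up')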