# NOTE: stray "Newer"/"Older" web-viewer navigation text removed from the top of this file.
from __future__ import (absolute_import, division, print_function)
from mantid.kernel import Direction, FloatArrayProperty, IntArrayBoundedValidator, \
IntArrayProperty, StringListValidator
from mantid.api import AlgorithmFactory, DataProcessorAlgorithm, FileAction, FileProperty, \
MultipleFileProperty, Progress, PropertyMode, WorkspaceProperty
from mantid.simpleapi import AlignAndFocusPowder, CloneWorkspace, \
ConvertUnits, CreateGroupingWorkspace, CropWorkspace, DeleteWorkspace, \
Divide, EditInstrumentGeometry, GetIPTS, Load, LoadMask, LoadIsawDetCal, LoadNexusProcessed, \
MaskDetectors, NormaliseByCurrent, PreprocessDetectorsToMD, Rebin, \
RenameWorkspace, ReplaceSpecialValues, RemovePromptPulse, SaveAscii, SaveFocusedXYE, \
SaveGSS, SaveNexusProcessed, mtd
import os
import numpy as np
class SNAPReduce(DataProcessorAlgorithm):
    """Reduce powder-diffraction runs from the SNAP instrument.

    Loads the requested runs, aligns and focuses them into d-spacing,
    optionally normalizes (by a workspace, a processed NeXus file, or a
    peak-clipped background extracted from the data) and saves the
    results in several formats.
    """

    # Cache for the IPTS directory resolved by get_IPTS_Local(); class-level
    # default, replaced by the looked-up path on first use.
    IPTS_dir = None
def get_IPTS_Local(self, run):
    """Return the IPTS directory for *run*, caching the first lookup.

    Only the first call actually queries GetIPTS; subsequent calls
    reuse the cached path regardless of the run number passed.
    """
    if self.IPTS_dir is not None:
        return self.IPTS_dir
    self.IPTS_dir = GetIPTS(Instrument='SNAP',
                            RunNumber=str(run))
    return self.IPTS_dir
def smooth(self, data, order):
    """Smooth *data* with a linearly-weighted moving average.

    Each point i is replaced by a weighted average of its neighbours,
    with weights decreasing linearly away from i: e.g. for an order of 7
    the point i is weighted 4, i+/-1 weighted 3, i+/-2 weighted 2 and
    i+/-3 weighted 1.  The input is only the y values.

    :param data: sequence of y values to smooth
    :param order: width of the smoothing window
    :return: numpy array of smoothed y values, same length as data
    """
    sm = np.zeros(len(data))
    # maximum weight, given to the central point i
    factor = order / 2 + 1
    for i in range(len(data)):
        # weighted sum and sum of weights for point i
        temp = 0
        ave = 0
        for r in range(max(0, i - int(order / 2)),
                       min(i + int(order / 2), len(data) - 1) + 1):
            temp = temp + (factor - abs(r - i)) * data[r]
            ave = ave + factor - abs(r - i)
        sm[i] = temp / ave
    return sm
def LLS_transformation(self, input):
    """Apply the log-log-square-root (LLS) transform to *input*.

    This transforms data to be more sensitive to weak peaks.  The
    function is reversed by the Inv_LLS_transformation function below.

    :param input: numpy array of (non-negative) intensities
    :return: numpy array of LLS-transformed values
    """
    out = np.log(np.log((input + 1) ** 0.5 + 1) + 1)
    return out
def Inv_LLS_transformation(self, input):
# See Function LLS function above
return out
def peak_clip(self, data, win=30, decrese=True, LLS=True, smooth_window=0):
    """Estimate a peak-free background for *data* by iterative peak clipping.

    :param data: numpy array of y values for one spectrum
    :param win: maximum half-width of the clipping window
    :param decrese: if True, scan window sizes from large to small
    :param LLS: if True, clip in LLS-transformed space (more sensitive
        to weak peaks); the result is transformed back afterwards
    :param smooth_window: if > 0, pre-smooth the data with this window
    :return: background estimate, rescaled so it touches the data at the
        point where data and estimate are closest
    """
    start_data = np.copy(data)

    window = win
    self.log().information(str(smooth_window))

    if smooth_window > 0:
        data = self.smooth(data, smooth_window)
    if LLS:
        data = self.LLS_transformation(data)

    temp = data.copy()

    # sequence of window half-widths to scan over
    if decrese:
        scan = list(range(window + 1, 0, -1))
    else:
        scan = list(range(1, window + 1))

    for w in scan:
        for i in range(len(temp)):
            if i < w or i > (len(temp) - w - 1):
                continue  # window would run past either end of the data
            win_array = temp[i - w:i + w + 1].copy()
            win_array_reversed = win_array[::-1]
            average = (win_array + win_array_reversed) / 2
            # floor division: a float slice index raises TypeError on Python 3
            temp[i] = np.min(average[:len(average) // 2])

    if LLS:
        temp = self.Inv_LLS_transformation(temp)

    self.log().information(str(min(start_data - temp)))

    # rescale the clipped background to touch the data at the closest point
    index = np.where((start_data - temp) == min(start_data - temp))[0][0]
    output = temp * (start_data[index] / temp[index])

    return output
def category(self):
    """Return the category under which this algorithm is listed."""
    return "Diffraction\\Reduction"
def PyInit(self):
    """Declare the input properties of the SNAP reduction algorithm."""
    validator = IntArrayBoundedValidator()
    validator.setLower(0)
    self.declareProperty(IntArrayProperty("RunNumbers", values=[0], direction=Direction.Input,
                                          validator=validator),
                         "Run numbers to process, comma separated")

    self.declareProperty("LiveData", False,
                         "Read live data - requires a saved run in the current IPTS "
                         + "with the same Instrument configuration as the live run")

    mask = ["None", "Horizontal", "Vertical",
            "Masking Workspace", "Custom - xml masking file"]
    self.declareProperty("Masking", "None", StringListValidator(mask),
                         "Mask to be applied to the data")
    self.declareProperty(WorkspaceProperty("MaskingWorkspace", "",
                                           Direction.Input, PropertyMode.Optional),
                         "The workspace containing the mask.")
    self.declareProperty(FileProperty(name="MaskingFilename", defaultValue="",
                                      direction=Direction.Input,
                                      action=FileAction.OptionalLoad),
                         doc="The file containing the xml mask.")

    # bug fix: the allowed-values list must be wrapped in StringListValidator
    # and passed as the validator= argument (it was a bare list, a SyntaxError)
    self.declareProperty(name="Calibration", defaultValue="Convert Units",
                         validator=StringListValidator(['Convert Units', 'Calibration File',
                                                        'DetCal File']),
                         direction=Direction.Input,
                         doc="The type of conversion to d_spacing to be used.")
    self.declareProperty(FileProperty(name="CalibrationFilename", defaultValue="",
                                      direction=Direction.Input,
                                      action=FileAction.OptionalLoad),
                         doc="The calibration file to convert to d_spacing.")
    self.declareProperty(MultipleFileProperty(name='DetCalFilename',
                                              extensions=['.detcal'],
                                              action=FileAction.OptionalLoad),
                         'ISAW DetCal file')

    self.declareProperty(FloatArrayProperty("Binning", [0.5, -0.004, 7.0]),
                         "Min, Step, and Max of d-space bins. Logarithmic binning is used if Step is negative.")

    nor_corr = ["None", "From Workspace",
                "From Processed Nexus", "Extracted from Data"]
    self.declareProperty("Normalization", "None", StringListValidator(nor_corr),
                         "If needed what type of input to use as normalization, Extracted from "
                         + "Data uses a background determination that is peak independent.This "
                         + "implemantation can be tested in algorithm SNAP Peak Clipping Background")
    self.declareProperty(FileProperty(name="NormalizationFilename", defaultValue="",
                                      direction=Direction.Input,
                                      action=FileAction.OptionalLoad),
                         doc="The file containing the processed nexus for normalization.")
    self.declareProperty(WorkspaceProperty("NormalizationWorkspace", "",
                                           Direction.Input, PropertyMode.Optional),
                         "The workspace containing the normalization data.")

    # bug fix: the next two descriptions were copy-pasted from LiveData and
    # described the wrong property
    self.declareProperty("PeakClippingWindowSize", 10,
                         "Window size used by the peak-clipping background "
                         + "estimation (Normalization='Extracted from Data')")
    self.declareProperty("SmoothingRange", 10,
                         "Smoothing range used by the peak-clipping background "
                         + "estimation (Normalization='Extracted from Data')")

    grouping = ["All", "Column", "Banks", "Modules", "2_4 Grouping"]
    self.declareProperty("GroupDetectorsBy", "All", StringListValidator(grouping),
                         "Detector groups to use for future focussing: "
                         + "All detectors as one group, Groups (East,West for "
                         + "SNAP), Columns for SNAP, detector banks")

    mode = ["Set-Up", "Production"]
    self.declareProperty("ProcessingMode", "Production", StringListValidator(mode),
                         "Set-Up Mode is used for establishing correct parameters. Production "
                         + "Mode only Normalized workspace is kept for each run.")

    self.declareProperty(name="OptionalPrefix", defaultValue="",
                         direction=Direction.Input,
                         doc="Optional Prefix to be added to workspaces and output filenames")

    self.declareProperty("SaveData", False,
                         "Save data in the following formats: Ascii- "
                         + "d-spacing ,Nexus Processed,GSAS and Fullprof")

    self.declareProperty(FileProperty(name="OutputDirectory", defaultValue="",
                                      action=FileAction.OptionalDirectory),
                         doc='Default value is proposal shared directory')
def validateInputs(self):
issues = dict()
# cross check masking
masking = self.getProperty("Masking").value
if masking in ("None", "Horizontal", "Vertical"):
pass
elif masking in ("Custom - xml masking file"):
filename = self.getProperty("MaskingFilename").value
issues[
"MaskingFilename"] = "Masking=\"%s\" requires a filename" % masking
mask_workspace = self.getPropertyValue("MaskingWorkspace")
if mask_workspace is None or len(mask_workspace) <= 0:
issues["MaskingWorkspace"] = "Must supply masking workspace"
raise ValueError("Masking value \"%s\" not supported" % masking)
# cross check normalization
normalization = self.getProperty("Normalization").value
if normalization in ("None", "Extracted from Data"):
pass
elif normalization == "From Workspace":
norm_workspace = self.getPropertyValue("NormalizationWorkspace")
issues['NormalizationWorkspace'] = 'Cannot be unset'
elif normalization == "From Processed Nexus":
filename = self.getProperty("NormalizationFilename").value
issues["NormalizationFilename"] = "Normalization=\"%s\" requires a filename" \
raise ValueError("Normalization value \"%s\" not supported" % normalization)
# cross check method of converting to d-spacing
calibration = self.getProperty('Calibration').value
if calibration == 'Convert Units':
pass
elif calibration == 'Calibration File':
filename = self.getProperty('CalibrationFilename').value
if len(filename) <= 0:
issues['CalibrationFilename'] \
= "Calibration=\"%s\" requires a filename" % calibration
elif calibration == 'DetCal File':
filenames = self.getProperty('DetCalFilename').value
if len(filenames) <= 0:
issues['DetCalFilename'] \
= "Calibration=\"%s\" requires a filename" % calibration
if len(filenames) > 2:
issues['DetCalFilename'] \
= "Calibration=\"%s\" requires one or two filenames" % calibration
else:
raise ValueError("Calibration value \"%s\" not supported" % calibration)
masking = self.getProperty("Masking").value
maskFile = None
# none and workspace are special
if masking == 'None':
pass
elif masking == "Masking Workspace":
maskWSname = str(self.getProperty("MaskingWorkspace").value)
# deal with files
elif masking == 'Custom - xml masking file':
maskWSname = 'CustomMask'
maskFile = self.getProperty('MaskingFilename').value
# TODO not reading the correct mask file geometry
elif masking == 'Horizontal' or masking == 'Vertical':
maskWSname = masking + 'Mask' # append the work 'Mask' for the wksp name
if not mtd.doesExist(maskWSname): # only load if it isn't already loaded
maskFile = '/SNS/SNAP/shared/libs/%s_Mask.xml' % masking
if maskFile is not None:
LoadMask(InputFile=maskFile, Instrument='SNAP', OutputWorkspace=maskWSname)
if maskWSname is None:
maskWSname = ''
def _generateNormalization(self, WS, normType, normWS):
if normType == 'None':
return None
elif normType == "Extracted from Data":
window = self.getProperty("PeakClippingWindowSize").value
smooth_range = self.getProperty("SmoothingRange").value
peak_clip_WS = str(WS).replace('_red', '_normalizer')
peak_clip_WS = CloneWorkspace(InputWorkspace=WS, OutputWorkspace=peak_clip_WS)
n_histo = peak_clip_WS.getNumberHistograms()
for h in range(n_histo):
peak_clip_WS.setY(h, self.peak_clip(peak_clip_WS.readY(h), win=window, decrese=True,
LLS=True, smooth_window=smooth_range))
else: # other values are already held in normWS
return normWS
def _save(self, saveDir, basename, outputWksp):
if not self.getProperty("SaveData").value:
return
self.log().notice('Writing to \'' + saveDir + '\'')
SaveNexusProcessed(InputWorkspace=outputWksp,
Filename=os.path.join(saveDir, 'nexus', basename + '.nxs'))
SaveAscii(InputWorkspace=outputWksp,
Filename=os.path.join(saveDir, 'd_spacing', basename + '.dat'))
ConvertUnits(InputWorkspace=outputWksp, OutputWorkspace='WS_tof',
Target="TOF", AlignBins=False)
# GSAS and FullProf require data in time-of-flight
SaveGSS(InputWorkspace='WS_tof',
Filename=os.path.join(saveDir, 'gsas', basename + '.gsa'),
Format='SLOG', SplitFiles=False, Append=False, ExtendedHeader=True)
SaveFocusedXYE(InputWorkspace='WS_tof',
Filename=os.path.join(
saveDir, 'fullprof', basename + '.dat'),
SplitFiles=True, Append=False)
DeleteWorkspace(Workspace='WS_tof')
def PyExec(self):
    """Reduce each requested run: load, focus, normalize, and save."""
    # Retrieve all relevant settings
    in_Runs = self.getProperty("RunNumbers").value
    maskWSname = self._getMaskWSname()
    progress = Progress(self, 0., .25, 3)

    # either type of file-based calibration is stored in the same variable
    calib = self.getProperty("Calibration").value
    if calib == "Calibration File":
        cal_File = self.getProperty("CalibrationFilename").value
        progress.report('loaded calibration')
    elif calib == 'DetCal File':
        cal_File = self.getProperty('DetCalFilename').value
        progress.report('loaded detcal')
    else:
        cal_File = None  # 'Convert Units' needs no calibration file
        progress.report('')

    params = self.getProperty("Binning").value
    norm = self.getProperty("Normalization").value

    if norm == "From Processed Nexus":
        norm_File = self.getProperty("NormalizationFilename").value
        normalizationWS = 'normWS'
        LoadNexusProcessed(Filename=norm_File, OutputWorkspace=normalizationWS)
        progress.report('loaded normalization')
    elif norm == "From Workspace":
        normalizationWS = str(self.getProperty("NormalizationWorkspace").value)
        progress.report('')
    else:
        normalizationWS = None  # may be generated from the data per run
        progress.report('')

    group_to_real = {'Banks': 'Group', 'Modules': 'bank', '2_4 Grouping': '2_4Grouping'}
    group = self.getProperty('GroupDetectorsBy').value
    real_name = group_to_real.get(group, group)

    if not mtd.doesExist(group):
        if group == '2_4 Grouping':
            group = '2_4_Grouping'
        # TODO take instrument from the data being loaded
        CreateGroupingWorkspace(InstrumentName='SNAP', GroupDetectorsBy=real_name,
                                OutputWorkspace=group)
        progress.report('create grouping')
    else:
        progress.report('')

    Process_Mode = self.getProperty("ProcessingMode").value
    prefix = self.getProperty("OptionalPrefix").value

    Tag = 'SNAP'  # instrument prefix for workspace and file names

    # --------------------------- REDUCE DATA -----------------------------
    progStart = .25
    progDelta = (1. - progStart) / len(in_Runs)
    for runnumber in in_Runs:
        self.log().notice("processing run %s" % runnumber)
        self.log().information(str(self.get_IPTS_Local(runnumber)))

        # put together output names
        new_Tag = Tag
        if len(prefix) > 0:
            new_Tag += '_' + prefix
        basename = '%s_%s_%s' % (new_Tag, runnumber, group)

        if self.getProperty("LiveData").value:
            # Need to update the new_Tag and basename variables
            raise RuntimeError('Live data is not currently supported')
        else:
            Load(Filename='SNAP' + str(runnumber), OutputWorkspace=basename + '_red',
                 startProgress=progStart,
                 endProgress=progStart + .25 * progDelta)
            progStart += .25 * progDelta
        redWS = basename + '_red'

        # overwrite geometry with detcal files
        if calib == 'DetCal File':
            LoadIsawDetCal(InputWorkspace=redWS, Filename=cal_File)

        # create unfocussed data if in set-up mode
        if Process_Mode == "Set-Up":
            unfocussedWksp = '{}_{}_d'.format(new_Tag, runnumber)
        else:
            unfocussedWksp = ''

        # TODO the right thing with the calibration information
        AlignAndFocusPowder(InputWorkspace=redWS, OutputWorkspace=redWS,
                            TMax=50000,
                            Params=params,  # binning parameters in d-space
                            RemovePromptPulseWidth=1600,
                            MaskWorkspace=maskWSname,  # can be empty string
                            GroupingWorkspace=group,
                            UnfocussedWorkspace=unfocussedWksp,  # can be empty string
                            PreserveEvents=False,
                            Dspacing=True,  # bin in d-spacing
                            startProgress=progStart,
                            endProgress=progStart + .5 * progDelta)
        progStart += .5 * progDelta

        # the rest takes up .25 percent of the run processing
        progress = Progress(self, progStart, progStart + .25 * progDelta, 2)

        # AlignAndFocusPowder leaves the data in time-of-flight
        ConvertUnits(InputWorkspace=redWS, OutputWorkspace=redWS,
                     Target='dSpacing', EMode='Elastic')

        # AlignAndFocus doesn't necessarily rebin the data correctly
        if Process_Mode == "Set-Up":
            Rebin(InputWorkspace=unfocussedWksp, Params=params,
                  OutputWorkspace=unfocussedWksp)

        NormaliseByCurrent(InputWorkspace=redWS, OutputWorkspace=redWS)

        # normalize the data as requested
        normalizationWS = self._generateNormalization(redWS, norm, normalizationWS)
        normalizedWS = None
        if normalizationWS is not None:
            normalizedWS = basename + '_nor'
            Divide(LHSWorkspace=redWS, RHSWorkspace=normalizationWS,
                   OutputWorkspace=normalizedWS)
            ReplaceSpecialValues(InputWorkspace=normalizedWS,
                                 OutputWorkspace=normalizedWS,
                                 NaNValue='0', NaNError='0',
                                 InfinityValue='0', InfinityError='0')
            progress.report('normalized')
        else:
            progress.report()

        # Edit instrument geometry to make final workspace smaller on disk
        det_table = PreprocessDetectorsToMD(InputWorkspace=redWS,
                                            OutputWorkspace='__SNAP_det_table')
        polar = np.degrees(det_table.column('TwoTheta'))
        azi = np.degrees(det_table.column('Azimuthal'))
        EditInstrumentGeometry(Workspace=redWS, L2=det_table.column('L2'),
                               Polar=polar, Azimuthal=azi)
        if normalizedWS is not None:
            EditInstrumentGeometry(Workspace=normalizedWS, L2=det_table.column('L2'),
                                   Polar=polar, Azimuthal=azi)
        mtd.remove('__SNAP_det_table')
        progress.report('simplify geometry')

        # determine output workspace name; the normalized data wins when present
        if normalizedWS is not None:
            outputWksp = normalizedWS
        else:
            outputWksp = redWS

        if norm == "Extracted from Data":
            if Process_Mode == "Production":
                DeleteWorkspace(Workspace=redWS)
                DeleteWorkspace(Workspace=normalizationWS)

        # Save requested formats
        saveDir = self.getPropertyValue("OutputDirectory").strip()
        if len(saveDir) <= 0:
            self.log().notice('Using default save location')
            saveDir = os.path.join(self.get_IPTS_Local(runnumber), 'shared', 'data')
        self._save(saveDir, basename, outputWksp)

        # set workspace as an output so it gets history
        propertyName = 'OutputWorkspace_' + str(outputWksp)
        self.declareProperty(WorkspaceProperty(
            propertyName, outputWksp, Direction.Output))
        self.setProperty(propertyName, outputWksp)

        # declare some things as extra outputs in set-up
        if Process_Mode != "Production":  # TODO set them as workspace properties
            # bug fix: 'OuputWorkspace_' typo made these inconsistent with the
            # property declared above
            propNames = ['OutputWorkspace_' + str(it) for it in ['d', 'norm', 'normalizer']]
            wkspNames = ['%s_%s_d' % (new_Tag, runnumber), basename + '_red',
                         '%s_%s_normalizer' % (new_Tag, runnumber)]
            for (propName, wkspName) in zip(propNames, wkspNames):
                if mtd.doesExist(wkspName):
                    self.declareProperty(WorkspaceProperty(propName, wkspName, Direction.Output))
                    self.setProperty(propName, wkspName)