Newer
Older
from __future__ import (absolute_import, division, print_function)
import mantid.simpleapi as mantid
import numpy as numpy
from isis_powder.abstract_inst import AbstractInst
import isis_powder.common as common
from isis_powder import pearl_calib_factory
from isis_powder import pearl_cycle_factory
from isis_powder.pearl_routines import fmode_output
class Pearl(AbstractInst):
# # Instrument default settings
_default_input_ext = '.raw'
_default_group_names = "bank1,bank2,bank3,bank4"
_lambda_lower = 0.03
_lambda_upper = 6.00
_focus_tof_binning = "1500,-0.0006,19900"
_create_van_first_tof_binning = "100,-0.0006,19990"
_create_van_second_tof_binning = "150,-0.0006,19900"
def __init__(self, user_name=None, calibration_dir=None, raw_data_dir=None, output_dir=None,
input_file_ext=".raw", tt_mode="TT88"):
super(Pearl, self).__init__(user_name=user_name, calibration_dir=calibration_dir, raw_data_dir=raw_data_dir,
output_dir=output_dir, default_input_ext=input_file_ext)
self._tt_mode = tt_mode
# This advanced option disables appending the current cycle to the
# path given for raw files.
self._disable_appending_cycle_to_raw_dir = False
# Old API support
self._old_atten_file = None
self._old_api_uses_full_paths = False
pearl_mc_absorption_file_name = "PRL112_DC25_10MM_FF.OUT" # TODO
self._attenuation_full_path = os.path.join(calibration_dir, pearl_mc_absorption_file_name)
# --- Abstract Implementation ---- #
def _get_default_group_names(self):
return self._default_group_names
def _get_lambda_range(self):
return self._lambda_lower, self._lambda_upper
def _get_create_van_tof_binning(self):
return_dict = {"1": self._create_van_first_tof_binning,
"2": self._create_van_second_tof_binning}
return return_dict
# Methods #
def _get_calibration_full_paths(self, run_number):
cycle_dict = self._get_cycle_information(run_number=run_number)
calibration_file, grouping_file, van_absorb, van_file =\
pearl_calib_factory.get_calibration_filename(cycle=cycle_dict["cycle"], tt_mode=self._tt_mode)
calibration_dir = self.calibration_dir
calibration_full_path = os.path.join(calibration_dir, calibration_file)
grouping_full_path = os.path.join(calibration_dir, grouping_file)
van_absorb_full_path = os.path.join(calibration_dir, van_absorb)
van_file_full_path = os.path.join(calibration_dir, van_file)
# TODO when we move PEARL to save out splined vanadium files support them below
calibration_details = {"calibration": calibration_full_path,
"grouping": grouping_full_path,
"vanadium_absorption": van_absorb_full_path,
"vanadium": van_file_full_path,
"calibrated_vanadium": van_file_full_path}
return calibration_details
def _get_cycle_information(run_number):
run_input = ""
if not run_number.isdigit():
# Only take first valid number as it is probably of the form 12345_12350
for character in run_number:
if character.isdigit():
run_input += character
else:
break
else:
run_input = run_number
cycle, instrument_version = pearl_cycle_factory.get_cycle_dir(run_input)
cycle_information = {'cycle': cycle,
'instrument_version': instrument_version}
return cycle_information
def _get_instrument_alg_save_ranges(instrument=''):
return _get_instrument_ranges(instrument_version=instrument)
def _generate_inst_file_name(run_number):
return _gen_file_name(run_number=run_number)
# Hook overrides
def _attenuate_workspace(self, input_workspace):
return self._run_attenuate_workspace(input_workspace=input_workspace)
def _create_calibration(self, calibration_runs, offset_file_name, grouping_file_name):
input_ws = common._load_current_normalised_ws(number=calibration_runs, instrument=self)
cycle_information = self._get_cycle_information(calibration_runs)
if cycle_information["instrument_version"] == "new" or cycle_information["instrument_version"] == "new2":
input_ws = mantid.Rebin(InputWorkspace=input_ws, Params="100,-0.0006,19950")
d_spacing_cal = mantid.ConvertUnits(InputWorkspace=input_ws, Target="dSpacing")
d_spacing_cal = mantid.Rebin(InputWorkspace=d_spacing_cal, Params="1.8,0.002,2.1")
if cycle_information["instrument_version"] == "new2":
cross_cor_ws = mantid.CrossCorrelate(InputWorkspace=d_spacing_cal, ReferenceSpectra=20,
WorkspaceIndexMin=9, WorkspaceIndexMax=1063, XMin=1.8, XMax=2.1)
elif cycle_information["instrument_version"] == "new":
cross_cor_ws = mantid.CrossCorrelate(InputWorkspace=d_spacing_cal, ReferenceSpectra=20,
WorkspaceIndexMin=9, WorkspaceIndexMax=943, XMin=1.8, XMax=2.1)
else:
cross_cor_ws = mantid.CrossCorrelate(InputWorkspace=d_spacing_cal, ReferenceSpectra=500,
WorkspaceIndexMin=1, WorkspaceIndexMax=1440, XMin=1.8, XMax=2.1)
if self._old_api_uses_full_paths: # Workaround for old API setting full paths
grouping_file_path = grouping_file_name
offset_file_path = offset_file_name
else:
offset_file_path = os.path.join(self.calibration_dir, offset_file_name)
grouping_file_path = os.path.join(self.calibration_dir, grouping_file_name)
# Ceo Cell refined to 5.4102(3) so 220 is 1.912795
offset_output_path = mantid.GetDetectorOffsets(InputWorkspace=cross_cor_ws, Step=0.002, DReference=1.912795,
XMin=-200, XMax=200, GroupingFileName=offset_file_path)
del offset_output_path # This isn't used so delete it to keep linters happy
aligned_ws = mantid.AlignDetectors(InputWorkspace=input_ws, CalibrationFile=offset_file_path)
cal_grouped_ws = mantid.DiffractionFocussing(InputWorkspace=aligned_ws, GroupingFileName=grouping_file_path)
common.remove_intermediate_workspace(d_spacing_cal)
common.remove_intermediate_workspace(cross_cor_ws)
common.remove_intermediate_workspace(aligned_ws)
common.remove_intermediate_workspace(cal_grouped_ws)
def _create_calibration_silicon(self, calibration_runs, cal_file_name, grouping_file_name):
self._do_silicon_calibration(calibration_runs, cal_file_name, grouping_file_name)
def _normalise_ws(self, ws_to_correct, monitor_ws=None, spline_terms=20):
return self._normalise_current_ws(ws_to_correct=ws_to_correct, load_monitor_ws=monitor_ws,
spline_terms=spline_terms)
def _get_monitor_spectra(self, run_number):
return self._get_monitor_spectrum(run_number=run_number)
def _skip_appending_cycle_to_raw_dir(self):
return self._disable_appending_cycle_to_raw_dir
def _spline_background(self, focused_vanadium_ws, spline_number, instrument_version=''):
if instrument_version == "new2":
out_list = _spline_new2_background(in_workspace=focused_vanadium_ws, num_splines=spline_number,
instrument_version=instrument_version)
elif instrument_version == "new":
out_list = _spline_new_background(in_workspace=focused_vanadium_ws, num_splines=spline_number,
instrument_version=instrument_version)
elif instrument_version == "old":
out_list = _spline_old_instrument_background(in_workspace=focused_vanadium_ws)
else:
raise ValueError("Spline Background - PEARL: Instrument version unknown")
return out_list
def _do_tof_rebinning_focus(self, input_workspace):
input_workspace = mantid.Rebin(InputWorkspace=input_workspace, Params=self._focus_tof_binning)
return input_workspace
def _focus_processing(self, run_number, input_workspace, perform_vanadium_norm):
return self._perform_focus_loading(run_number, input_workspace, perform_vanadium_norm)
def _process_focus_output(self, processed_spectra, run_number, attenuate=False):
return fmode_output.generate_and_save_focus_output(self, processed_spectra=processed_spectra,
run_number=run_number, perform_attenuation=attenuate,
focus_mode=self._focus_mode)
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def _load_monitor(self, number, cycle):
input_dir = self._generate_raw_data_cycle_dir(run_cycle=cycle)
# TODO refactor all of this
if isinstance(number, int):
full_file_path = self._generate_input_full_path(run_number=number, input_dir=input_dir)
mspectra = self._get_monitor_spectra(number)
load_monitor_ws = mantid.LoadRaw(Filename=full_file_path, SpectrumMin=mspectra, SpectrumMax=mspectra,
LoadLogFiles="0")
else:
load_monitor_ws = common._load_monitor_sum_range(files=number, input_dir=input_dir, instrument=self)
return load_monitor_ws
def _apply_van_calibration_tof_rebinning(self, vanadium_ws, tof_rebin_pass, return_units):
tof_rebin_param_dict = self._get_create_van_tof_binning()
tof_rebin_param = tof_rebin_param_dict[str(tof_rebin_pass)]
rebinned_ws = mantid.ConvertUnits(InputWorkspace=vanadium_ws, Target="TOF")
rebinned_ws = mantid.Rebin(InputWorkspace=rebinned_ws, Params=tof_rebin_param)
rebinned_ws = mantid.ConvertUnits(InputWorkspace=rebinned_ws, Target=return_units)
common.remove_intermediate_workspace(vanadium_ws)
vanadium_ws = rebinned_ws
return vanadium_ws
def _generate_vanadium_absorb_corrections(self, calibration_full_paths, ws_to_match):
raise NotImplementedError("Generating absorption corrections needs to be implemented correctly")
# TODO are these values applicable to all instruments
shape_ws = mantid.CloneWorkspace(InputWorkspace=ws_to_match)
mantid.CreateSampleShape(InputWorkspace=shape_ws, ShapeXML='<sphere id="sphere_1"> <centre x="0" y="0" z= "0" />\
<radius val="0.005" /> </sphere>')
absorb_ws = \
mantid.AbsorptionCorrection(InputWorkspace=shape_ws, AttenuationXSection="5.08",
ScatteringXSection="5.1", SampleNumberDensity="0.072",
NumberOfWavelengthPoints="25", ElementSize="0.05")
mantid.SaveNexus(Filename=calibration_full_paths["vanadium_absorption"],
InputWorkspace=absorb_ws, Append=False)
common.remove_intermediate_workspace(shape_ws)
return absorb_ws
def _calibration_rebin_to_workspace(self, ws_to_rebin, ws_to_match):
rebinned_ws = mantid.RebinToWorkspace(WorkspaceToRebin=ws_to_rebin, WorkspaceToMatch=ws_to_match)
common.remove_intermediate_workspace(ws_to_rebin)
ws_to_rebin = rebinned_ws
return ws_to_rebin
def correct_sample_vanadium(self, focused_ws, index, vanadium_ws=None):
data_ws = mantid.ExtractSingleSpectrum(InputWorkspace=focused_ws, WorkspaceIndex=index)
data_ws = mantid.ConvertUnits(InputWorkspace=data_ws, Target="TOF")
data_ws = mantid.Rebin(InputWorkspace=data_ws, Params=self._focus_tof_binning)
if vanadium_ws:
data_processed = "van_processed" + str(index) # Workaround for Mantid overwriting the WS in a loop
vanadium_ws = mantid.Rebin(InputWorkspace=vanadium_ws, Params=self._focus_tof_binning)
data_ws = mantid.Divide(LHSWorkspace=data_ws, RHSWorkspace=vanadium_ws, OutputWorkspace=data_processed)
else:
data_processed = "processed-" + str(index)
mantid.CropWorkspace(InputWorkspace=data_ws, XMin=0.1, OutputWorkspace=data_processed)
if vanadium_ws:
mantid.Scale(InputWorkspace=data_processed, Factor=10, OutputWorkspace=data_processed)
common.remove_intermediate_workspace(data_ws)
return data_processed
# Implementation of instrument specific steps
def _run_attenuate_workspace(self, input_workspace):
if self._old_atten_file is None: # For old API support
attenuation_path = self._attenuation_full_path
else:
attenuation_path = self._old_atten_file
wc_attenuated = mantid.PearlMCAbsorption(attenuation_path)
wc_attenuated = mantid.ConvertToHistogram(InputWorkspace=wc_attenuated, OutputWorkspace=wc_attenuated)
wc_attenuated = mantid.RebinToWorkspace(WorkspaceToRebin=wc_attenuated, WorkspaceToMatch=input_workspace,
OutputWorkspace=wc_attenuated)
pearl_attenuated_ws = mantid.Divide(LHSWorkspace=input_workspace, RHSWorkspace=wc_attenuated)
common.remove_intermediate_workspace(workspace_name=wc_attenuated)
return pearl_attenuated_ws
def _do_silicon_calibration(self, runs_to_process, cal_file_name, grouping_file_name):
# TODO fix all of this as the script is too limited to be useful
create_si_ws = common._load_current_normalised_ws(number=runs_to_process, instrument=self)
cycle_details = self._get_cycle_information(runs_to_process)
instrument_version = cycle_details["instrument_version"]
if instrument_version == "new" or instrument_version == "new2":
create_si_ws = mantid.Rebin(InputWorkspace=create_si_ws, Params="100,-0.0006,19950")
create_si_d_spacing_ws = mantid.ConvertUnits(InputWorkspace=create_si_ws, Target="dSpacing")
if instrument_version == "new2":
create_si_d_spacing_rebin_ws = mantid.Rebin(InputWorkspace=create_si_d_spacing_ws, Params="1.71,0.002,2.1")
create_si_cross_corr_ws = mantid.CrossCorrelate(InputWorkspace=create_si_d_spacing_rebin_ws,
ReferenceSpectra=20, WorkspaceIndexMin=9,
WorkspaceIndexMax=1063, XMin=1.71, XMax=2.1)
elif instrument_version == "new":
create_si_d_spacing_rebin_ws = mantid.Rebin(InputWorkspace=create_si_d_spacing_ws, Params="1.85,0.002,2.05")
create_si_cross_corr_ws = mantid.CrossCorrelate(InputWorkspace=create_si_d_spacing_rebin_ws,
ReferenceSpectra=20, WorkspaceIndexMin=9,
WorkspaceIndexMax=943, XMin=1.85, XMax=2.05)
elif instrument_version == "old":
create_si_d_spacing_rebin_ws = mantid.Rebin(InputWorkspace=create_si_d_spacing_ws, Params="3,0.002,3.2")
create_si_cross_corr_ws = mantid.CrossCorrelate(InputWorkspace=create_si_d_spacing_rebin_ws,
ReferenceSpectra=500, WorkspaceIndexMin=1,
WorkspaceIndexMax=1440, XMin=3, XMax=3.2)
else:
raise NotImplementedError("The instrument version is not supported for creating a silicon calibration")
common.remove_intermediate_workspace(create_si_d_spacing_ws)
common.remove_intermediate_workspace(create_si_d_spacing_rebin_ws)
calibration_output_path = self.calibration_dir + cal_file_name
create_si_offsets_ws = mantid.GetDetectorOffsets(InputWorkspace=create_si_cross_corr_ws,
Step=0.002, DReference=1.920127251, XMin=-200, XMax=200,
GroupingFileName=calibration_output_path)
create_si_aligned_ws = mantid.AlignDetectors(InputWorkspace=create_si_ws,
CalibrationFile=calibration_output_path)
grouping_output_path = self.calibration_dir + grouping_file_name
create_si_grouped_ws = mantid.DiffractionFocussing(InputWorkspace=create_si_aligned_ws,
GroupingFileName=grouping_output_path)
del create_si_offsets_ws, create_si_grouped_ws
def _normalise_current_ws(self, ws_to_correct, load_monitor_ws, spline_terms):
get_monitor_ws = mantid.ConvertUnits(InputWorkspace=load_monitor_ws, Target="Wavelength")
common.remove_intermediate_workspace(load_monitor_ws)
lmin, lmax = self._get_lambda_range()
get_monitor_ws = mantid.CropWorkspace(InputWorkspace=get_monitor_ws, XMin=lmin, XMax=lmax)
ex_regions = numpy.zeros((2, 4))
ex_regions[:, 0] = [3.45, 3.7]
ex_regions[:, 1] = [2.96, 3.2]
ex_regions[:, 2] = [2.1, 2.26]
ex_regions[:, 3] = [1.73, 1.98]
for reg in range(0, 4):
get_monitor_ws = mantid.MaskBins(InputWorkspace=get_monitor_ws, XMin=ex_regions[0, reg],
XMax=ex_regions[1, reg])
monitor_ws = mantid.SplineBackground(InputWorkspace=get_monitor_ws, WorkspaceIndex=0, NCoeff=spline_terms)
normalised_ws = mantid.ConvertUnits(InputWorkspace=ws_to_correct, Target="Wavelength")
normalised_ws = mantid.NormaliseToMonitor(InputWorkspace=normalised_ws, MonitorWorkspace=monitor_ws,
IntegrationRangeMin=0.6, IntegrationRangeMax=5.0)
normalised_ws = mantid.ConvertUnits(InputWorkspace=normalised_ws, Target="TOF")
common.remove_intermediate_workspace(get_monitor_ws)
common.remove_intermediate_workspace(monitor_ws)
return normalised_ws
def _get_monitor_spectrum(self, run_number):
if run_number < 71009:
if self._focus_mode == "trans":
elif self._focus_mode == "all":
elif self._focus_mode == "novan":
else:
raise ValueError("Mode not set or supported")
else:
mspectra = 1
return mspectra
# Support for old API - This can be removed when PEARL_routines is removed
def _old_api_constructor_set(self, user_name=None, calibration_dir=None, raw_data_dir=None, output_dir=None,
input_file_ext=None, tt_mode=None):
# Any param can be set so check each individually
if user_name is not None:
self._user_name = user_name
if calibration_dir is not None:
self._calibration_dir = calibration_dir
if raw_data_dir is not None:
self._raw_data_dir = raw_data_dir
if output_dir is not None:
self._output_dir = output_dir
if input_file_ext is not None:
self.default_input_ext = input_file_ext
if tt_mode is not None:
self._tt_mode = tt_mode
def _old_api_set_tt_mode(self, tt_mode):
self._tt_mode = tt_mode
def _old_api_set_calib_dir(self, calib_dir):
self._calibration_dir = calib_dir
def _old_api_set_raw_data_dir(self, raw_data_dir):
self._disable_appending_cycle_to_raw_dir = True
self._raw_data_dir = raw_data_dir
def _old_api_set_output_dir(self, output_dir):
self._output_dir = output_dir
def _old_api_set_ext(self, ext):
self.default_input_ext = ext
def _old_api_set_atten(self, atten_file):
self._old_atten_file = _old_api_get_file_name(atten_file)
def _old_api_set_full_paths(self, val):
self._old_api_uses_full_paths = val
def _PEARL_filename_is_full_path(self):
return self._old_api_uses_full_paths
def _get_focus_tof_binning(self):
return self._focus_tof_binning
def _old_api_get_file_name(in_path):
# Gets the filename from a full path
return os.path.basename(in_path)
# Implementation of static methods
def _gen_file_name(run_number):
digit = len(str(run_number))
if run_number < 71009:
number_of_digits = 5
filename = "PRL"
else:
number_of_digits = 8
filename = "PEARL"
for i in range(0, number_of_digits - digit):
filename += "0"
filename += str(run_number)
return filename
def _get_instrument_ranges(instrument_version):
if instrument_version == "new" or instrument_version == "old": # New and old have identical ranges
alg_range = 12
save_range = 3
elif instrument_version == "new2":
alg_range = 14
save_range = 5
else:
raise ValueError("Instrument version unknown")
return alg_range, save_range
def _spline_new2_background(in_workspace, num_splines, instrument_version):
# remove bragg peaks before spline
alg_range, unused = _get_instrument_ranges(instrument_version)
van_stripped_ws = _strip_peaks_new_inst(in_workspace, alg_range)
# run twice on low angle as peaks are very broad
for i in range(0, 2):
van_stripped_ws = mantid.StripPeaks(InputWorkspace=van_stripped_ws, FWHM=100, Tolerance=10,
WorkspaceIndex=12)
van_stripped_ws = mantid.StripPeaks(InputWorkspace=van_stripped_ws, FWHM=60, Tolerance=10,
WorkspaceIndex=13)
van_stripped_ws = mantid.ConvertUnits(InputWorkspace=van_stripped_ws, Target="TOF")
splined_ws_list = _perform_spline_range(instrument_version, num_splines, van_stripped_ws)
common.remove_intermediate_workspace(van_stripped_ws)
return splined_ws_list
def _spline_new_background(in_workspace, num_splines, instrument_version):
# Remove bragg peaks before spline
alg_range, unused = _get_instrument_ranges(instrument_version)
van_stripped = _strip_peaks_new_inst(in_workspace, alg_range)
van_stripped = mantid.ConvertUnits(InputWorkspace=van_stripped, Target="TOF")
splined_ws_list = _perform_spline_range(instrument_version, num_splines, van_stripped)
common.remove_intermediate_workspace(van_stripped)
return splined_ws_list
def _strip_peaks_new_inst(input_ws, alg_range):
van_stripped_ws = mantid.StripPeaks(InputWorkspace=input_ws, FWHM=15, Tolerance=8, WorkspaceIndex=0)
for i in range(1, alg_range):
van_stripped_ws = mantid.StripPeaks(InputWorkspace=van_stripped_ws, FWHM=15, Tolerance=8,
WorkspaceIndex=i)
return van_stripped_ws
def _perform_spline_range(instrument_version, num_splines, stripped_ws):
ws_range, unused = _get_instrument_ranges(instrument_version)
splined_ws_list = []
for i in range(0, ws_range):
out_ws_name = "spline" + str(i + 1)
splined_ws_list.append(mantid.SplineBackground(InputWorkspace=stripped_ws, OutputWorkspace=out_ws_name,
WorkspaceIndex=i, NCoeff=num_splines))
return splined_ws_list
def _spline_old_instrument_background(in_workspace):
van_stripped = mantid.ConvertUnits(InputWorkspace=in_workspace, Target="dSpacing")
# remove bragg peaks before spline
van_stripped = mantid.StripPeaks(InputWorkspace=van_stripped, FWHM=15, Tolerance=6, WorkspaceIndex=0)
van_stripped = mantid.StripPeaks(InputWorkspace=van_stripped, FWHM=15, Tolerance=6, WorkspaceIndex=2)
van_stripped = mantid.StripPeaks(InputWorkspace=van_stripped, FWHM=15, Tolerance=6, WorkspaceIndex=3)
van_stripped = mantid.StripPeaks(InputWorkspace=van_stripped, FWHM=40, Tolerance=12, WorkspaceIndex=1)
van_stripped = mantid.StripPeaks(InputWorkspace=van_stripped, FWHM=60, Tolerance=12, WorkspaceIndex=1)
# Mask low d region that is zero before spline
for reg in range(0, 4):
if reg == 1:
van_stripped = mantid.MaskBins(InputWorkspace=van_stripped, XMin=0, XMax=0.14, SpectraList=reg)
else:
van_stripped = mantid.MaskBins(InputWorkspace=van_stripped, XMin=0, XMax=0.06, SpectraList=reg)
van_stripped = mantid.ConvertUnits(InputWorkspace=van_stripped, Target="TOF")
splined_ws_list = []
for i in range(0, 4):
out_ws_name = "spline" + str(i+1)
if i == 1:
coeff = 80
else:
coeff = 100
splined_ws_list.append(mantid.SplineBackground(InputWorkspace=van_stripped, OutputWorkspace=out_ws_name,
WorkspaceIndex=i, NCoeff=coeff))
common.remove_intermediate_workspace(van_stripped)
return splined_ws_list