-
David Fairbrother authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
polaris.py 10.77 KiB
from __future__ import (absolute_import, division, print_function)

import os

import mantid.simpleapi as mantid

from isis_powder.abstract_inst import AbstractInst
from isis_powder.polaris_routines import polaris_calib_factory
import isis_powder.common as common
class Polaris(AbstractInst):
    """Polaris implementation of the hooks declared by ``AbstractInst``.

    Several values are still placeholders (see the TODO markers). The attributes
    ``calibration_dir`` and ``raw_data_dir`` are read here but never assigned in
    this class - presumably they are set by ``AbstractInst.__init__``; verify.
    """

    # Wavelength limits returned by _get_lambda_range and used as the integration
    # range when calculating the solid angle efficiency corrections
    _lower_lambda_range = 0.25
    _upper_lambda_range = 2.50  # TODO populate this

    _focus_tof_binning_params = None  # TODO

    _calibration_grouping_names = None

    # Number of single-spectrum workspaces cropped out of a focused vanadium workspace
    _number_of_banks = 5

    def __init__(self, user_name=None, calibration_dir=None, raw_data_dir=None, output_dir=None,
                 input_file_ext=".raw", sample_empty_name=None):  # TODO move TT_mode PEARL specific
        """Forward the common settings to ``AbstractInst`` and store the Polaris-only ones.

        :param sample_empty_name: File name (joined onto ``calibration_dir``) of the empty
            sample run to subtract, or None to skip the subtraction
        """
        super(Polaris, self).__init__(user_name=user_name, calibration_dir=calibration_dir,
                                      raw_data_dir=raw_data_dir, output_dir=output_dir,
                                      default_input_ext=input_file_ext)

        # Name of the masking file, looked up inside raw_data_dir by _read_masking_file
        self._masking_file_name = "VanaPeaks.dat"
        self._sample_empty = sample_empty_name

    # Abstract implementation

    def _get_lambda_range(self):
        """Return the (lower, upper) wavelength limits as a tuple."""
        return self._lower_lambda_range, self._upper_lambda_range

    def _get_focus_tof_binning(self):
        """Return the focus TOF binning parameters (currently the None placeholder)."""
        return self._focus_tof_binning_params

    def _get_create_van_tof_binning(self):
        """Return the TOF binning parameters used when creating a vanadium calibration."""
        # NOTE(review): _create_van_calib_tof_binning is not defined anywhere in this class.
        # Unless AbstractInst provides it this raises AttributeError - confirm.
        return self._create_van_calib_tof_binning

    def _get_default_group_names(self):
        """Return the calibration grouping names (currently the None placeholder)."""
        return self._calibration_grouping_names

    def _get_calibration_full_paths(self, cycle):
        """Build the full paths of the calibration files for the given cycle.

        :param cycle: Passed straight to ``polaris_calib_factory.get_calibration_file``
        :return: Dictionary with the keys "calibration", "grouping", "vanadium_absorption"
            (always None at present) and "vanadium"
        """
        # TODO implement this properly
        offset_file_name, grouping_file_name, vanadium_file_name = \
            polaris_calib_factory.get_calibration_file(cycle)

        calibration_dir = self.calibration_dir

        # os.path.join rather than "+" so that directories given without a trailing
        # separator still produce a valid path
        calibration_details = {"calibration": os.path.join(calibration_dir, offset_file_name),
                               "grouping": os.path.join(calibration_dir, grouping_file_name),
                               "vanadium_absorption": None,
                               "vanadium": os.path.join(self.raw_data_dir, vanadium_file_name)}
        return calibration_details

    @staticmethod
    def _generate_inst_file_name(run_number):
        """Return the run number prefixed with "POL"."""
        return "POL" + str(run_number)  # TODO check this is correct

    @staticmethod
    def _get_instrument_alg_save_ranges(instrument=''):
        """Return the (alg_range, save_range) pair; the ``instrument`` argument is not used."""
        alg_range = 5
        save_range = 0  # TODO set save range
        return alg_range, save_range

    @staticmethod
    def _get_cycle_information(run_number):
        """Return a dictionary with empty "cycle" and "instrument_version" entries.

        The ``run_number`` argument is not used yet.
        """
        return {"cycle": "",  # TODO implement properly
                "instrument_version": ""}

    def _normalise_ws(self, ws_to_correct, monitor_ws=None, spline_terms=20):
        """Normalise a workspace with ``NormaliseByCurrent``.

        ``monitor_ws`` and ``spline_terms`` are accepted but not used by this implementation.

        :return: The normalised workspace
        """
        normalised_ws = mantid.NormaliseByCurrent(InputWorkspace=ws_to_correct)
        return normalised_ws

    def _mask_noisy_detectors(self, vanadium_ws):
        """Integrate the vanadium workspace and run ``MaskDetectorsIf`` on the result.

        Nothing is returned; the algorithm writes its result to ``OutputCalFile``.
        """
        summed_van_ws = mantid.Integration(InputWorkspace=vanadium_ws)
        # TODO do they want this masking detectors with too high a contribution?
        # NOTE(review): _grouping_file_path and _cal_file_path are not set in this class -
        # presumably provided by AbstractInst; verify.
        mantid.MaskDetectorsIf(InputWorkspace=summed_van_ws, InputCalFile=self._grouping_file_path,
                               OutputCalFile=self._cal_file_path, Mode="DeselectIf",
                               Operator="LessEqual", Value=10)

    def _calculate_solid_angle_efficiency_corrections(self, vanadium_ws):
        """Calculate the solid angle efficiency corrections workspace from a vanadium run.

        The intermediate workspaces created along the way are removed before returning.

        :param vanadium_ws: The vanadium workspace the corrections are derived from
        :return: The corrections workspace
        """
        # Solid angle of the vanadium workspace multiplied by 100
        solid_angle_ws = mantid.SolidAngle(InputWorkspace=vanadium_ws)
        solid_angle_multiplicand = mantid.CreateSingleValuedWorkspace(DataValue=str(100))
        solid_angle_ws = mantid.Multiply(LHSWorkspace=solid_angle_ws, RHSWorkspace=solid_angle_multiplicand)
        common.remove_intermediate_workspace(solid_angle_multiplicand)

        # Vanadium divided by the scaled solid angle, integrated over the wavelength range
        efficiency_ws = mantid.Divide(LHSWorkspace=vanadium_ws, RHSWorkspace=solid_angle_ws)
        efficiency_ws = mantid.ConvertUnits(InputWorkspace=efficiency_ws, Target="Wavelength")
        efficiency_ws = mantid.Integration(InputWorkspace=efficiency_ws,
                                           RangeLower=self._lower_lambda_range,
                                           RangeUpper=self._upper_lambda_range)

        # Corrections are (solid angle * efficiency) divided by 100000
        corrections_ws = mantid.Multiply(LHSWorkspace=solid_angle_ws, RHSWorkspace=efficiency_ws)
        corrections_divisor_ws = mantid.CreateSingleValuedWorkspace(DataValue=str(100000))
        corrections_ws = mantid.Divide(LHSWorkspace=corrections_ws, RHSWorkspace=corrections_divisor_ws)

        common.remove_intermediate_workspace(corrections_divisor_ws)
        common.remove_intermediate_workspace(solid_angle_ws)
        common.remove_intermediate_workspace(efficiency_ws)

        return corrections_ws

    def _subtract_sample_empty(self, input_sample):
        """Subtract the normalised empty sample run from ``input_sample`` if one was configured.

        :return: The subtracted workspace, or ``input_sample`` unchanged when no empty
            sample name was given to the constructor
        """
        # TODO move this to be generated by calibration factory so we don't have to use the full fname
        if self._sample_empty is not None:
            empty_sample_path = os.path.join(self.calibration_dir, self._sample_empty)
            empty_sample = mantid.Load(Filename=empty_sample_path)
            empty_sample = self._normalise_ws(empty_sample)
            input_sample = mantid.Minus(LHSWorkspace=input_sample, RHSWorkspace=empty_sample)
            common.remove_intermediate_workspace(empty_sample)
        return input_sample

    def _apply_solid_angle_efficiency_corr(self, ws_to_correct, vanadium_number=None, vanadium_path=None):
        """Divide a workspace by the solid angle efficiency corrections of a vanadium run.

        The vanadium is loaded by run number when ``vanadium_number`` is given, otherwise
        from ``vanadium_path``. The workspace passed in is removed once it has been corrected.

        :raises ValueError: If neither ``vanadium_number`` nor ``vanadium_path`` is provided
        :return: The corrected workspace
        """
        # Raised explicitly rather than asserted so the check survives running with -O
        if not (vanadium_number or vanadium_path):
            raise ValueError("Either a vanadium run number or a vanadium file path must be provided")

        if vanadium_number:
            solid_angle_vanadium_ws = common._load_raw_files(run_number=vanadium_number, instrument=self)
        else:
            solid_angle_vanadium_ws = mantid.Load(Filename=vanadium_path)

        solid_angle_vanadium_ws = self._normalise_ws(solid_angle_vanadium_ws)
        corrections = self._calculate_solid_angle_efficiency_corrections(solid_angle_vanadium_ws)

        corrected_ws = mantid.Divide(LHSWorkspace=ws_to_correct, RHSWorkspace=corrections)
        common.remove_intermediate_workspace(solid_angle_vanadium_ws)
        common.remove_intermediate_workspace(corrections)
        common.remove_intermediate_workspace(ws_to_correct)
        ws_to_correct = corrected_ws
        return ws_to_correct

    def _focus_processing(self, run_number, input_workspace, perform_vanadium_norm):
        """Placeholder: looks up the cycle information and discards it; returns None."""
        self._get_cycle_information(run_number=run_number)

    def _spline_background(self, focused_vanadium_ws, spline_number, instrument_version=''):
        """Crop out each bank of a focused vanadium workspace, then mask and spline it.

        :param focused_vanadium_ws: The focused vanadium workspace to take the spectra from
        :param spline_number: Spline coefficient; 100 is used when None is passed
        :param instrument_version: Not used by this implementation
        :return: List of splined workspaces
        """
        if spline_number is None:
            spline_number = 100

        mode = "spline"  # TODO support spline modes for all instruments
        extracted_spectra = _extract_vanadium_spectra(focused_vanadium_ws, self._number_of_banks)

        # NOTE: "spline" is the only mode handled - output would be unbound for any other value
        if mode == "spline":
            output = self._mask_spline_vanadium_ws(vanadium_spectra_list=extracted_spectra,
                                                   spline_coefficient=spline_number)

        for ws in extracted_spectra:
            common.remove_intermediate_workspace(ws)

        return output

    def _generate_vanadium_absorb_corrections(self, calibration_full_paths, ws_to_match):
        """Run ``CylinderAbsorption`` with hard-coded parameters on a clone of ``ws_to_match``.

        ``calibration_full_paths`` is accepted but not used by this implementation.

        :return: The workspace produced by ``CylinderAbsorption``
        """
        absorb_ws = mantid.CloneWorkspace(InputWorkspace=ws_to_match)

        # TODO move all of this into defaults
        cylinder_sample_height = str(4)
        cylinder_sample_radius = str(0.4)

        attenuation_cross_section = str(4.88350)
        scattering_cross_section = str(5.15775)
        sample_number_density = str(0.0718956)

        number_of_slices = str(10)
        number_of_annuli = str(10)
        number_of_wavelength_points = str(100)

        exp_method = "Normal"
        # TODO move all of the above into defaults

        absorb_ws = mantid.CylinderAbsorption(InputWorkspace=absorb_ws,
                                              CylinderSampleHeight=cylinder_sample_height,
                                              CylinderSampleRadius=cylinder_sample_radius,
                                              AttenuationXSection=attenuation_cross_section,
                                              ScatteringXSection=scattering_cross_section,
                                              SampleNumberDensity=sample_number_density,
                                              NumberOfSlices=number_of_slices,
                                              NumberOfAnnuli=number_of_annuli,
                                              NumberOfWavelengthPoints=number_of_wavelength_points,
                                              ExpMethod=exp_method)
        return absorb_ws

    def _read_masking_file(self):
        """Read the masking file from ``raw_data_dir`` into one list of entries per bank.

        Any line starting with a space, tab, newline or '#' closes the current bank and
        starts a new one. Every other line is split on whitespace and added to the current
        bank. Consecutive separator lines therefore produce empty bank lists.

        :return: List of banks, each a list of whitespace-split lines
        """
        all_banks_masking_list = []
        bank_masking_list = []

        mask_path = os.path.join(self.raw_data_dir, self._masking_file_name)

        ignore_line_prefixes = (' ', '\n', '\t', '#')  # Matches whitespace or # symbol

        with open(mask_path) as mask_file:
            for line in mask_file:
                if line.startswith(ignore_line_prefixes):
                    # Push back onto new bank
                    all_banks_masking_list.append(bank_masking_list)
                    bank_masking_list = []
                else:
                    # split() with no arguments already discards the trailing newline
                    bank_masking_list.append(line.split())

        # The final bank has no separator after it when the file ends on a data line,
        # so it has to be pushed here or its entries would be lost
        if bank_masking_list:
            all_banks_masking_list.append(bank_masking_list)

        return all_banks_masking_list

    def _mask_spline_vanadium_ws(self, vanadium_spectra_list, spline_coefficient):
        """Mask each vanadium spectrum using the masking file, then fit a spline background.

        :param vanadium_spectra_list: Workspaces to mask and spline, one per bank
        :param spline_coefficient: Passed to ``SplineBackground`` as ``NCoeff``
        :return: List of workspaces named "splined_vanadium_ws-<n>", with n counting from 1
        """
        masked_workspace = _apply_masking(workspaces_to_mask=vanadium_spectra_list,
                                          mask_list=self._read_masking_file())

        output_list = []
        for index, ws in enumerate(masked_workspace, start=1):
            output_ws_name = "splined_vanadium_ws-" + str(index)
            splined_ws = mantid.SplineBackground(InputWorkspace=ws, OutputWorkspace=output_ws_name,
                                                 WorkspaceIndex=0, NCoeff=spline_coefficient)
            output_list.append(splined_ws)

        return output_list
# Class private implementation
def _extract_vanadium_spectra(vanadium_ws, num_banks):
vanadium_spectra_list = []
for i in range(0, num_banks):
output_name = "vanadium-" + str(i + 1)
# Have to use crop workspace as extract single spectrum struggles with the variable bin widths
vanadium_spectra_list.append(mantid.CropWorkspace(InputWorkspace=vanadium_ws, OutputWorkspace=output_name,
StartWorkspaceIndex=i, EndWorkspaceIndex=i))
return vanadium_spectra_list
def _apply_masking(workspaces_to_mask, mask_list):
index = 0
output_workspaces = []
for ws in workspaces_to_mask:
output_workspaces.append(ws)
for bank_mask_list in mask_list:
if not bank_mask_list:
continue
output_name = "masked_vanadium-" + str(index + 1)
for mask_params in bank_mask_list:
out_workspace = mantid.MaskBins(InputWorkspace=output_workspaces[index], OutputWorkspace=output_name,
XMin=mask_params[0], XMax=mask_params[1])
output_workspaces[index] = out_workspace
index += 1
return output_workspaces
def _divide_sample_vanadium_splines(sample_ws, vanadium_splines_ws)