from __future__ import (absolute_import, division, print_function) import mantid.simpleapi as mantid from isis_powder.abstract_inst import AbstractInst from isis_powder.polaris_routines import polaris_calib_factory import isis_powder.common as common class Polaris(AbstractInst): _lower_lambda_range = 0.25 _upper_lambda_range = 2.50 # TODO populate this _focus_tof_binning_params = None # TODO _calibration_grouping_names = None _number_of_banks = 5 def __init__(self, user_name=None, calibration_dir=None, raw_data_dir=None, output_dir=None, input_file_ext=".raw", sample_empty_name=None): # TODO move TT_mode PEARL specific super(Polaris, self).__init__(user_name=user_name, calibration_dir=calibration_dir, raw_data_dir=raw_data_dir, output_dir=output_dir, default_input_ext=input_file_ext) self._masking_file_name = "VanaPeaks.dat" self._sample_empty = sample_empty_name # Abstract implementation def _get_lambda_range(self): return self._lower_lambda_range, self._upper_lambda_range def _get_focus_tof_binning(self): return self._focus_tof_binning_params def _get_create_van_tof_binning(self): return self._create_van_calib_tof_binning def _get_default_group_names(self): return self._calibration_grouping_names def _get_calibration_full_paths(self, cycle): # TODO implement this properly offset_file_name, grouping_file_name, vanadium_file_name = polaris_calib_factory.get_calibration_file(cycle) calibration_dir = self.calibration_dir calibration_full_path = calibration_dir + offset_file_name grouping_full_path = calibration_dir + grouping_file_name vanadium_absorb_full_path = None vanadium_full_path = self.raw_data_dir + vanadium_file_name calibration_details = {"calibration": calibration_full_path, "grouping": grouping_full_path, "vanadium_absorption": vanadium_absorb_full_path, "vanadium": vanadium_full_path} return calibration_details @staticmethod def _generate_inst_file_name(run_number): return "POL" + str(run_number) # TODO check this is correct @staticmethod def _get_instrument_alg_save_ranges(instrument=''): alg_range = 5 save_range = 0 # TODO set save range return alg_range, save_range @staticmethod def _get_cycle_information(run_number): return {"cycle": "", # TODO implement properly "instrument_version": ""} def _normalise_ws(self, ws_to_correct, monitor_ws=None, spline_terms=20): normalised_ws = mantid.NormaliseByCurrent(InputWorkspace=ws_to_correct) return normalised_ws def _mask_noisy_detectors(self, vanadium_ws): summed_van_ws = mantid.Integration(InputWorkspace=vanadium_ws) # TODO do they want this masking detectors with too high a contribution? mantid.MaskDetectorsIf(InputWorkspace=summed_van_ws, InputCalFile=self._grouping_file_path, OutputCalFile=self._cal_file_path, Mode="DeselectIf", Operator="LessEqual", Value=10) def _calculate_solid_angle_efficiency_corrections(self, vanadium_ws): solid_angle_ws = mantid.SolidAngle(InputWorkspace=vanadium_ws) solid_angle_multiplicand = mantid.CreateSingleValuedWorkspace(DataValue=str(100)) solid_angle_ws = mantid.Multiply(LHSWorkspace=solid_angle_ws, RHSWorkspace=solid_angle_multiplicand) common.remove_intermediate_workspace(solid_angle_multiplicand) efficiency_ws = mantid.Divide(LHSWorkspace=vanadium_ws, RHSWorkspace=solid_angle_ws) efficiency_ws = mantid.ConvertUnits(InputWorkspace=efficiency_ws, Target="Wavelength") efficiency_ws = mantid.Integration(InputWorkspace=efficiency_ws, RangeLower=self._lower_lambda_range, RangeUpper=self._upper_lambda_range) corrections_ws = mantid.Multiply(LHSWorkspace=solid_angle_ws, RHSWorkspace=efficiency_ws) corrections_divisor_ws = mantid.CreateSingleValuedWorkspace(DataValue=str(100000)) corrections_ws = mantid.Divide(LHSWorkspace=corrections_ws, RHSWorkspace=corrections_divisor_ws) common.remove_intermediate_workspace(corrections_divisor_ws) common.remove_intermediate_workspace(solid_angle_ws) common.remove_intermediate_workspace(efficiency_ws) return corrections_ws def _subtract_sample_empty(self, input_sample): # TODO move this to be generated by calibration factory so we don't have to use the full fname if self._sample_empty is not None: empty_sample_path = self.calibration_dir + self._sample_empty empty_sample = mantid.Load(Filename=empty_sample_path) empty_sample = self._normalise_ws(empty_sample) input_sample = mantid.Minus(LHSWorkspace=input_sample, RHSWorkspace=empty_sample) common.remove_intermediate_workspace(empty_sample) return input_sample def _apply_solid_angle_efficiency_corr(self, ws_to_correct, vanadium_number=None, vanadium_path=None): assert(vanadium_number or vanadium_path) if vanadium_number: solid_angle_vanadium_ws = common._load_raw_files(run_number=vanadium_number, instrument=self) else: solid_angle_vanadium_ws = mantid.Load(Filename=vanadium_path) solid_angle_vanadium_ws = self._normalise_ws(solid_angle_vanadium_ws) corrections = self._calculate_solid_angle_efficiency_corrections(solid_angle_vanadium_ws) corrected_ws = mantid.Divide(LHSWorkspace=ws_to_correct, RHSWorkspace=corrections) common.remove_intermediate_workspace(solid_angle_vanadium_ws) common.remove_intermediate_workspace(corrections) common.remove_intermediate_workspace(ws_to_correct) ws_to_correct = corrected_ws return ws_to_correct def _focus_processing(self, run_number, input_workspace, perform_vanadium_norm): self._get_cycle_information(run_number=run_number) def _spline_background(self, focused_vanadium_ws, spline_number, instrument_version=''): if spline_number is None: spline_number = 100 mode = "spline" # TODO support spline modes for all instruments extracted_spectra = _extract_vanadium_spectra(focused_vanadium_ws, self._number_of_banks) if mode == "spline": output = self._mask_spline_vanadium_ws(vanadium_spectra_list=extracted_spectra, spline_coefficient=spline_number) for ws in extracted_spectra: common.remove_intermediate_workspace(ws) return output def _generate_vanadium_absorb_corrections(self, calibration_full_paths, ws_to_match): absorb_ws = mantid.CloneWorkspace(InputWorkspace=ws_to_match) # TODO move all of this into defaults cylinder_sample_height = str(4) cylinder_sample_radius = str(0.4) attenuation_cross_section = str(4.88350) scattering_cross_section = str(5.15775) sample_number_density = str(0.0718956) number_of_slices = str(10) number_of_annuli = str(10) number_of_wavelength_points = str(100) exp_method = "Normal" # TODO move all of the above into defaults absorb_ws = mantid.CylinderAbsorption(InputWorkspace=absorb_ws, CylinderSampleHeight=cylinder_sample_height, CylinderSampleRadius=cylinder_sample_radius, AttenuationXSection=attenuation_cross_section, ScatteringXSection=scattering_cross_section, SampleNumberDensity=sample_number_density, NumberOfSlices=number_of_slices, NumberOfAnnuli=number_of_annuli, NumberOfWavelengthPoints=number_of_wavelength_points, ExpMethod=exp_method) return absorb_ws def _read_masking_file(self): all_banks_masking_list = [] bank_masking_list = [] mask_path = self.raw_data_dir + self._masking_file_name ignore_line_prefixes = (' ', '\n', '\t', '#') # Matches whitespace or # symbol with open(mask_path) as mask_file: for line in mask_file: if line.startswith(ignore_line_prefixes): # Push back onto new bank all_banks_masking_list.append(bank_masking_list) bank_masking_list = [] else: line.rstrip() bank_masking_list.append(line.split()) return all_banks_masking_list def _mask_spline_vanadium_ws(self, vanadium_spectra_list, spline_coefficient): masked_workspace = _apply_masking(workspaces_to_mask=vanadium_spectra_list, mask_list=self._read_masking_file()) index = 0 output_list = [] for ws in masked_workspace: index += 1 output_ws_name = "splined_vanadium_ws-" + str(index) splined_ws = mantid.SplineBackground(InputWorkspace=ws, OutputWorkspace=output_ws_name, WorkspaceIndex=0, NCoeff=spline_coefficient) output_list.append(splined_ws) return output_list # Class private implementation def _extract_vanadium_spectra(vanadium_ws, num_banks): vanadium_spectra_list = [] for i in range(0, num_banks): output_name = "vanadium-" + str(i + 1) # Have to use crop workspace as extract single spectrum struggles with the variable bin widths vanadium_spectra_list.append(mantid.CropWorkspace(InputWorkspace=vanadium_ws, OutputWorkspace=output_name, StartWorkspaceIndex=i, EndWorkspaceIndex=i)) return vanadium_spectra_list def _apply_masking(workspaces_to_mask, mask_list): index = 0 output_workspaces = [] for ws in workspaces_to_mask: output_workspaces.append(ws) for bank_mask_list in mask_list: if not bank_mask_list: continue output_name = "masked_vanadium-" + str(index + 1) for mask_params in bank_mask_list: out_workspace = mantid.MaskBins(InputWorkspace=output_workspaces[index], OutputWorkspace=output_name, XMin=mask_params[0], XMax=mask_params[1]) output_workspaces[index] = out_workspace index += 1 return output_workspaces def _divide_sample_vanadium_splines(sample_ws, vanadium_splines_ws)