Newer
Older
from __future__ import (absolute_import, division, print_function)
import mantid.simpleapi as mantid
import isis_powder.routines.common as common
from isis_powder.routines.common_enums import INPUT_BATCHING
def focus(run_number_string, instrument, perform_vanadium_norm, absorb, sample_details=None):
input_batching = instrument._get_input_batching_mode()
if input_batching == INPUT_BATCHING.Individual:
return _individual_run_focusing(instrument=instrument, perform_vanadium_norm=perform_vanadium_norm,
run_number=run_number_string, absorb=absorb, sample_details=sample_details)
elif input_batching == INPUT_BATCHING.Summed:
return _batched_run_focusing(instrument, perform_vanadium_norm, run_number_string, absorb=absorb,
sample_details=sample_details)
else:
raise ValueError("Input batching not passed through. Please contact development team.")
def _focus_one_ws(input_workspace, run_number, instrument, perform_vanadium_norm, absorb, sample_details, vanadium_path):
run_details = instrument._get_run_details(run_number_string=run_number)
if perform_vanadium_norm:
_test_splined_vanadium_exists(instrument, run_details)
# Subtract empty instrument runs, as long as this run isn't an empty and user hasn't turned empty subtraction off
if not common.runs_overlap(run_number, run_details.empty_runs) and instrument.should_subtract_empty_inst():
input_workspace = common.subtract_summed_runs(ws_to_correct=input_workspace, instrument=instrument,
empty_sample_ws_string=run_details.empty_runs)
# Subtract a sample empty if specified
if run_details.sample_empty:
input_workspace = common.subtract_summed_runs(ws_to_correct=input_workspace, instrument=instrument,
empty_sample_ws_string=run_details.sample_empty,
scale_factor=instrument._inst_settings.sample_empty_scale)
# Crop to largest acceptable TOF range
input_workspace = instrument._crop_raw_to_expected_tof_range(ws_to_crop=input_workspace)
# Correct for absorption / multiple scattering if required
if absorb:
input_workspace = instrument._apply_absorb_corrections(run_details=run_details, ws_to_correct=input_workspace)
else:
# Set sample material if specified by the user
if sample_details is not None:
mantid.SetSample(InputWorkspace=input_workspace,
Geometry=common.generate_sample_geometry(sample_details),
Material=common.generate_sample_material(sample_details))
aligned_ws = mantid.AlignDetectors(InputWorkspace=input_workspace,
CalibrationFile=run_details.offset_file_path)
# Focus the spectra into banks
focused_ws = mantid.DiffractionFocussing(InputWorkspace=aligned_ws,
GroupingFileName=run_details.grouping_file_path)
calibrated_spectra = _apply_vanadium_corrections(instrument=instrument,
input_workspace=focused_ws,
perform_vanadium_norm=perform_vanadium_norm,
vanadium_splines=vanadium_path)
output_spectra = instrument._crop_banks_to_user_tof(calibrated_spectra)
bin_widths = instrument._get_instrument_bin_widths()
if bin_widths:
# Reduce the bin width if required on this instrument
output_spectra = common.rebin_workspace_list(workspace_list=output_spectra,
bin_width_list=bin_widths)
# Output
d_spacing_group, tof_group = instrument._output_focused_ws(output_spectra, run_details=run_details)
common.keep_single_ws_unit(d_spacing_group=d_spacing_group, tof_group=tof_group,
unit_to_keep=instrument._get_unit_to_keep())
# Tidy workspaces from Mantid
common.remove_intermediate_workspace(input_workspace)
common.remove_intermediate_workspace(aligned_ws)
common.remove_intermediate_workspace(focused_ws)
common.remove_intermediate_workspace(output_spectra)
return d_spacing_group
def _apply_vanadium_corrections(instrument, input_workspace, perform_vanadium_norm, vanadium_splines):
input_workspace = mantid.ConvertUnits(InputWorkspace=input_workspace, OutputWorkspace=input_workspace, Target="TOF")
split_data_spectra = common.extract_ws_spectra(input_workspace)
if perform_vanadium_norm:
processed_spectra = _divide_by_vanadium_splines(spectra_list=split_data_spectra,
vanadium_splines=vanadium_splines,
instrument=instrument)
processed_spectra = split_data_spectra
return processed_spectra
def _batched_run_focusing(instrument, perform_vanadium_norm, run_number_string, absorb, sample_details):
read_ws_list = common.load_current_normalised_ws_list(run_number_string=run_number_string,
instrument=instrument)
run_details = instrument._get_run_details(run_number_string=run_number_string)
if perform_vanadium_norm:
vanadium_splines = mantid.LoadNexus(Filename=run_details.splined_vanadium_file_path)
output = None
for ws in read_ws_list:
output = _focus_one_ws(input_workspace=ws, run_number=run_number_string, instrument=instrument,
perform_vanadium_norm=perform_vanadium_norm, absorb=absorb,
sample_details=sample_details, vanadium_path=vanadium_splines)
return output
def _divide_one_spectrum_by_spline(spectrum, spline,instrument):
rebinned_spline = mantid.RebinToWorkspace(WorkspaceToRebin=spline, WorkspaceToMatch=spectrum, StoreInADS=False)
if instrument.get_instrument_prefix() == "GEM":
divided = mantid.Divide(LHSWorkspace=spectrum, RHSWorkspace=rebinned_spline, OutputWorkspace=spectrum,
StoreInADS=False)
output = _crop_spline_to_percent_of_max(rebinned_spline, divided)
mantid.mtd['output'] = output
return output
divided = mantid.Divide(LHSWorkspace=spectrum, RHSWorkspace=rebinned_spline, OutputWorkspace=spectrum)
return divided
def _divide_by_vanadium_splines(spectra_list, vanadium_splines, instrument):
if hasattr(vanadium_splines, "OutputWorkspace"): # vanadium_splines is a group
vanadium_splines = vanadium_splines.OutputWorkspace
num_splines = len(vanadium_splines)
num_spectra = len(spectra_list)
raise RuntimeError("Mismatch between number of banks in vanadium and number of banks in workspace to focus"
"\nThere are {} banks for vanadium but {} for the run".format(num_splines, num_spectra))
output_list = [_divide_one_spectrum_by_spline(data_ws, van_ws, instrument)
for data_ws, van_ws in zip(spectra_list, vanadium_splines)]
return output_list
output_list = [_divide_one_spectrum_by_spline(spectra_list[0], vanadium_splines)]
common.remove_intermediate_workspace(vanadium_splines)
return output_list
def _individual_run_focusing(instrument, perform_vanadium_norm, run_number, absorb, sample_details):
# Load and process one by one
run_numbers = common.generate_run_numbers(run_number_string=run_number)
run_details = instrument._get_run_details(run_number_string=run_number)
if perform_vanadium_norm:
vanadium_splines = mantid.LoadNexus(Filename=run_details.splined_vanadium_file_path)
output = None
ws = common.load_current_normalised_ws_list(run_number_string=run, instrument=instrument)
output = _focus_one_ws(input_workspace=ws[0], run_number=run, instrument=instrument, absorb=absorb,
perform_vanadium_norm=perform_vanadium_norm, sample_details=sample_details,
vanadium_path=vanadium_splines)
return output
def _test_splined_vanadium_exists(instrument, run_details):
# Check the necessary splined vanadium file has been created
if not os.path.isfile(run_details.splined_vanadium_file_path):
raise ValueError("Processed vanadium runs not found at this path: "
+ str(run_details.splined_vanadium_file_path) +
" \nHave you run the method to create a Vanadium spline with these settings yet?\n")
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def _crop_spline_to_percent_of_max(spline, input_ws):
y_val = 0
spline_spectrum = spline.readY(0)
for i in range(spline.blocksize()):
temp = spline_spectrum[i]
if float(y_val) < float(temp):
y_val = temp
y_val = y_val/100
x_min = 0
before_min = True
x_list = input_ws.readX(0)
x_max = x_list[-1]
for i in range(spline.blocksize()):
if before_min:
if float(y_val) > float(spline_spectrum[i]):
x_min = x_list[i]
else:
before_min = False
else:
if float(y_val) < float(spline_spectrum[i]):
x_max = x_list[i]
output = mantid.CropWorkspace(inputWorkspace=input_ws, XMin=x_min, XMax=x_max, StoreInADS=False)
return output