Skip to content
Snippets Groups Projects
polaris_algs.py 13.7 KiB
Newer Older
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
#     NScD Oak Ridge National Laboratory, European Spallation Source
#     & Institut Laue - Langevin
# SPDX - License - Identifier: GPL - 3.0 +
from __future__ import (absolute_import, division, print_function)

import mantid.simpleapi as mantid
from six import string_types
from isis_powder.routines import absorb_corrections, common
from isis_powder.routines.common_enums import WORKSPACE_UNITS
from isis_powder.routines.run_details import create_run_details_object, get_cal_mapping_dict
from isis_powder.polaris_routines import polaris_advanced_config
def calculate_van_absorb_corrections(ws_to_correct, multiple_scattering, is_vanadium):
    mantid.MaskDetectors(ws_to_correct, SpectraList=list(range(1, 55)))
    absorb_dict = polaris_advanced_config.absorption_correction_params
    sample_details_obj = absorb_corrections.create_vanadium_sample_details_obj(config_dict=absorb_dict)
    ws_to_correct = absorb_corrections.run_cylinder_absorb_corrections(
        ws_to_correct=ws_to_correct, multiple_scattering=multiple_scattering, sample_details_obj=sample_details_obj,
        is_vanadium=is_vanadium)
def _get_run_numbers_for_key(current_mode_run_numbers, key):
    err_message = "this must be under the relevant Rietveld or PDF mode."
    return common.cal_map_dictionary_key_helper(current_mode_run_numbers, key=key,
                                                append_to_error_message=err_message)


def _get_current_mode_dictionary(run_number_string, inst_settings):
    mapping_dict = get_cal_mapping_dict(run_number_string, inst_settings.cal_mapping_path)
    if inst_settings.mode is None:
        ws = mantid.Load('POLARIS'+run_number_string+'.nxs')
        mode, cropping_vals = _determine_chopper_mode(ws)
        inst_settings.mode = mode
        inst_settings.focused_cropping_values = cropping_vals
        mantid.DeleteWorkspace(ws)
    # Get the current mode "Rietveld" or "PDF" run numbers
    return common.cal_map_dictionary_key_helper(mapping_dict, inst_settings.mode)


def get_run_details(run_number_string, inst_settings, is_vanadium_run):
    mode_run_numbers = _get_current_mode_dictionary(run_number_string, inst_settings)
    # Get empty and vanadium
    err_message = "this must be under the relevant Rietveld or PDF mode."
    empty_runs = common.cal_map_dictionary_key_helper(mode_run_numbers,
                                                      key="empty_run_numbers", append_to_error_message=err_message)
    vanadium_runs = common.cal_map_dictionary_key_helper(mode_run_numbers, key="vanadium_run_numbers",
                                                         append_to_error_message=err_message)
    grouping_file_name = inst_settings.grouping_file_name
    return create_run_details_object(run_number_string=run_number_string, inst_settings=inst_settings,
                                     is_vanadium_run=is_vanadium_run, empty_run_number=empty_runs,
                                     vanadium_string=vanadium_runs, grouping_file_name=grouping_file_name)
def save_unsplined_vanadium(vanadium_ws, output_path):
    for ws_index in range(vanadium_ws.getNumberOfEntries()):
        ws = vanadium_ws.getItem(ws_index)
        previous_units = ws.getAxis(0).getUnit().unitID()

        if previous_units != WORKSPACE_UNITS.tof:
            ws = mantid.ConvertUnits(InputWorkspace=ws, Target=WORKSPACE_UNITS.tof)

        ws = mantid.RenameWorkspace(InputWorkspace=ws, OutputWorkspace="van_bank_{}".format(ws_index + 1))
        converted_workspaces.append(ws)

    converted_group = mantid.GroupWorkspaces(",".join(ws.name() for ws in converted_workspaces))
    mantid.SaveNexus(InputWorkspace=converted_group, Filename=output_path, Append=False)
    mantid.DeleteWorkspace(converted_group)
def generate_ts_pdf(run_number, focus_file_path, merge_banks=False, q_lims=None, cal_file_name=None,
                    sample_details=None):
    focused_ws = _obtain_focused_run(run_number, focus_file_path)
    focused_ws = mantid.ConvertUnits(InputWorkspace=focused_ws, Target="MomentumTransfer", EMode='Elastic')
    self_scattering_correction = _calculate_self_scattering_correction(run_number, cal_file_name, sample_details)

    ws_group_list = []
    for i in range(self_scattering_correction.getNumberHistograms()):
        ws_name = 'correction_' + str(i)
        mantid.ExtractSpectra(InputWorkspace=self_scattering_correction, OutputWorkspace=ws_name,
                              WorkspaceIndexList=[i])
        ws_group_list.append(ws_name)
    self_scattering_correction = mantid.GroupWorkspaces(InputWorkspaces=ws_group_list)
    self_scattering_correction = mantid.RebinToWorkspace(WorkspaceToRebin=self_scattering_correction,
                                                         WorkspaceToMatch=focused_ws)
    focused_ws = mantid.Subtract(LHSWorkspace=focused_ws, RHSWorkspace=self_scattering_correction)
    if merge_banks:
        merged_ws = _merge_workspace_with_limits(focused_ws, q_lims)
        pdf_output = mantid.PDFFourierTransform(Inputworkspace=merged_ws, InputSofQType="S(Q)-1", PDFType="G(r)",
        pdf_output = mantid.PDFFourierTransform(Inputworkspace='focused_ws', InputSofQType="S(Q)-1",
                                                PDFType="G(r)", Filter=True)
        pdf_output = mantid.RebinToWorkspace(WorkspaceToRebin=pdf_output, WorkspaceToMatch=pdf_output[4],
                                             PreserveEvents=True)
    common.remove_intermediate_workspace('self_scattering_correction')
def _obtain_focused_run(run_number, focus_file_path):
    """
    Searches for the focused workspace to use (based on user specified run number) in the ADS and then the output
    directory.
    If unsuccessful, a ValueError exception is thrown.
    :param run_number: The run number to search for.
    :param focus_file_path: The expected file path for the focused file.
    :return: The focused workspace.
    """
    # Try the ADS first to avoid undesired loading
    if mantid.mtd.doesExist('%s-Results-TOF-Grp' % run_number):
        focused_ws = mantid.mtd['%s-Results-TOF-Grp' % run_number]
    elif mantid.mtd.doesExist('%s-Results-D-Grp' % run_number):
        focused_ws = mantid.mtd['%s-Results-D-Grp' % run_number]
    else:
        # Check output directory
        print('No loaded focused files found. Searching in output directory...')
        try:
            focused_ws = mantid.LoadNexus(Filename=focus_file_path, OutputWorkspace='focused_ws').OutputWorkspace
        except ValueError:
            raise ValueError("Could not find focused file for run number:%s\n"
                             "Please ensure a focused file has been produced and is located in the output directory."
                             % run_number)
    return focused_ws


def _merge_workspace_with_limits(focused_ws, q_lims):
    min_x = np.inf
    max_x = -np.inf
    num_x = -np.inf
    ws_max_range = 0
    largest_range_spectrum = 0
    for i in range(focused_ws.size()):
        x_data = focused_ws[i].dataX(0)
        min_x = min(np.min(x_data), min_x)
        max_x = max(np.max(x_data), max_x)
        num_x = max(x_data.size, num_x)
        ws_range = np.max(x_data)-np.min(x_data)
        if ws_range > ws_max_range:
            largest_range_spectrum = i + 1
    if min_x == -np.inf or max_x == np.inf:
        raise AttributeError("Workspace x range contains an infinite value.")
    focused_ws = mantid.Rebin(InputWorkspace=focused_ws, Params=[min_x, (max_x-min_x)/num_x, max_x])
    while focused_ws.size() > 1:
        mantid.ConjoinWorkspaces(InputWorkspace1=focused_ws[0],
                                 InputWorkspace2=focused_ws[1])
    focused_ws_conjoined = focused_ws[0]
    mantid.MatchSpectra(InputWorkspace=focused_ws_conjoined, OutputWorkspace=focused_ws_conjoined,
                        ReferenceSpectrum=largest_range_spectrum)
    q_min, q_max = _load_qlims(q_lims)
    for i in range(q_min.size):
        pdf_x_array = focused_ws_conjoined.readX(i)
        tmp1 = np.where(pdf_x_array >= q_min[i])
        tmp2 = np.amin(tmp1)
        q_min[i] = pdf_x_array[tmp2]
        q_max[i] = pdf_x_array[np.amax(np.where(pdf_x_array <= q_max[i]))]
        bin_width = min(pdf_x_array[1] - pdf_x_array[0], bin_width)

    if min_x == -np.inf or max_x == np.inf:
        raise AttributeError("Q lims contains an infinite value.")
    focused_data_combined = mantid.CropWorkspaceRagged(InputWorkspace=focused_ws_conjoined, XMin=q_min, XMax=q_max)
    focused_data_combined = mantid.Rebin(InputWorkspace=focused_data_combined,
                                         Params=[min(q_min), bin_width, max(q_max)])
    focused_data_combined = mantid.SumSpectra(InputWorkspace=focused_data_combined,
                                              WeightedSum=True,
    common.remove_intermediate_workspace(focused_ws_conjoined)
    return focused_data_combined


def _load_qlims(q_lims):
    if isinstance(q_lims, string_types):
        q_min = []
        q_max = []
        try:
            with open(q_lims, 'r') as f:
                line_list = [line.rstrip('\n') for line in f]
                for line in line_list[1:]:
                    value_list = line.split()
                    q_min.append(float(value_list[2]))
                    q_max.append(float(value_list[3]))
            q_min = np.array(q_min)
            q_max = np.array(q_max)
        except IOError as exc:
            raise RuntimeError("q_lims path is not valid: {}".format(exc))
    elif isinstance(q_lims, (list, tuple)) or isinstance(q_lims, np.ndarray):
        q_min = q_lims[0, :]
        q_max = q_lims[1, :]
    else:
        raise RuntimeError("q_lims type is not valid. Expected a string filename or an array.")
def _calculate_self_scattering_correction(run_number, cal_file_name, sample_details):
    raw_ws = mantid.Load(Filename='POLARIS'+str(run_number)+'.nxs')
    mantid.SetSample(InputWorkspace=raw_ws,
                     Geometry=common.generate_sample_geometry(sample_details),
                     Material=common.generate_sample_material(sample_details))
    # find the closest monitor to the sample for incident spectrum
    raw_spec_info = raw_ws.spectrumInfo()
    incident_index = None
    for i in range(raw_spec_info.size()):
        if raw_spec_info.isMonitor(i):
            l2 = raw_spec_info.position(i)[2]
            if not incident_index:
                incident_index = i
            else:
                if raw_spec_info.position(incident_index)[2] < l2 < 0:
                    incident_index = i
    monitor = mantid.ExtractSpectra(InputWorkspace=raw_ws, WorkspaceIndexList=[incident_index])
    monitor = mantid.ConvertUnits(InputWorkspace=monitor, Target="Wavelength")
    x_data = monitor.dataX(0)
    min_x = np.min(x_data)
    max_x = np.max(x_data)
    width_x = (max_x - min_x) / x_data.size
    fit_spectra = mantid.FitIncidentSpectrum(InputWorkspace=monitor,
                                             BinningForCalc=[min_x, 1 * width_x, max_x],
                                             BinningForFit=[min_x, 10 * width_x, max_x],
                                             FitSpectrumWith="CubicSpline")
    self_scattering_correction = mantid.CalculatePlaczekSelfScattering(InputWorkspace=raw_ws,
                                                                       IncidentSpecta=fit_spectra)
    cal_workspace = mantid.LoadCalFile(InputWorkspace=self_scattering_correction,
                                       CalFileName=cal_file_name,
                                       Workspacename='cal_workspace',
                                       MakeOffsetsWorkspace=False,
                                       MakeMaskWorkspace=False)
    self_scattering_correction = mantid.DiffractionFocussing(InputWorkspace=self_scattering_correction,
                                                             GroupingFilename=cal_file_name)
    n_pixel = np.zeros(self_scattering_correction.getNumberHistograms())
    for i in range(cal_workspace.getNumberHistograms()):
        grouping = cal_workspace.dataY(i)
        if grouping[0] > 0:
            n_pixel[int(grouping[0] - 1)] += 1
    correction_ws = mantid.CreateWorkspace(DataY=n_pixel, DataX=[0, 1],
                                           NSpec=self_scattering_correction.getNumberHistograms())
    self_scattering_correction = mantid.Divide(LHSWorkspace=self_scattering_correction, RHSWorkspace=correction_ws)
    mantid.ConvertToDistribution(Workspace=self_scattering_correction)
    self_scattering_correction = mantid.ConvertUnits(InputWorkspace=self_scattering_correction,
                                                     Target="MomentumTransfer", EMode='Elastic')
    common.remove_intermediate_workspace('cal_workspace_group')
    common.remove_intermediate_workspace(correction_ws)
    common.remove_intermediate_workspace(fit_spectra)
    common.remove_intermediate_workspace(monitor)
    common.remove_intermediate_workspace(raw_ws)
    return self_scattering_correction
def _determine_chopper_mode(ws):
    if ws.getRun().hasProperty('Frequency'):
        frequency = ws.getRun()['Frequency'].lastValue()
        print("No chopper mode provided")
        if frequency == 50:
            print("automatically chose Rietveld")
            return 'Rietveld', polaris_advanced_config.rietveld_focused_cropping_values
        if frequency == 0:
            print("automatically chose PDF")
            return 'PDF', polaris_advanced_config.pdf_focused_cropping_values
    else:
        raise ValueError("Chopper frequency not in log data. Please specify a chopper mode")