Skip to content
Snippets Groups Projects
common.py 15.3 KiB
Newer Older
from __future__ import (absolute_import, division, print_function)

import mantid.kernel as kernel
import mantid.simpleapi as mantid
from isis_powder.routines.common_enums import InputBatchingEnum
def crop_banks_in_tof(bank_list, crop_values_list):
    """
    Crops the each bank by the specified tuple values from a list of tuples in TOF. The number
    of tuples must match the number of banks to crop. A list of [(100,200), (150,250)] would crop
    bank 1 to the values 100, 200 and bank 2 to 150 and 250 in TOF.
    :param bank_list: The list of workspaces each containing one bank of data to crop
    :param crop_values_list: The cropping values to apply to each bank
    :return: A list of cropped workspaces
    """
    if not isinstance(crop_values_list, list):
        raise ValueError("The cropping values were not in a list type")
    if len(bank_list) != len(crop_values_list):
        raise RuntimeError("The number of TOF cropping values does not match the number of banks for this instrument")
    output_list = []
    for spectra, cropping_values in zip(bank_list, crop_values_list):
        output_list.append(crop_in_tof(ws_to_crop=spectra, x_min=cropping_values[0], x_max=cropping_values[-1]))

    return output_list


def crop_in_tof(ws_to_crop, x_min=None, x_max=None):
    """
    Crops workspaces to the specified minimum and maximum values in TOF. If x_min or x_max is
    not specified the lower/upper end of the workspace is not modified
    :param ws_to_crop: The workspace to apply the cropping. This can either be a list of single workspace.
    :param x_min: (Optional) The minimum value in TOF to crop to
    :param x_max: (Optional) The maximum value in TOF to crop to
    :return: The cropped workspace
    """
            cropped_ws.append(_crop_single_ws_in_tof(ws, x_max=x_max, x_min=x_min))
    else:
        cropped_ws = _crop_single_ws_in_tof(ws_to_crop, x_max=x_max, x_min=x_min)
def dictionary_key_helper(dictionary, key, throws=True, exception_msg=None):
    """
    Checks if the key is in the dictionary and performs various different actions if it is not depending on
    the user parameters. If set to not throw it will return none. Otherwise it will throw a custom user message
    or the default python exception depending on what the user has specified.
    :param dictionary: The dictionary to search for the key
    :param key: The key to search for in the dictionary
    :param throws: (Optional) Defaults to true, whether this should throw on a key not being present
    :param exception_msg: (Optional) The error message to print in the KeyError instead of the default Python message
    :return: The key if it was found, None if throws was set to false and the key was not found.
    """
    if key in dictionary:
        return dictionary[key]
    elif not throws:
        return None
    elif exception_msg:
        # Print user specified message
        raise KeyError(exception_msg)
    else:
        # Raise default python key error:
        this_throws = dictionary[key]
        return this_throws  # Never gets this far just makes linters happy


def extract_ws_spectra(ws_to_split):
    """
    Extracts individual spectra from the workspace into a list of workspaces. Each workspace will contain
    one of the spectra from the workspace and will have the form of "<ws_name>_<spectrum number>".
    :param ws_to_split: The workspace to split into individual workspaces
    :return: A list of extracted workspaces - one per spectra in the original workspace
    """
    num_spectra = ws_to_split.getNumberHistograms()
    for i in range(0, num_spectra):
        output_name = ws_to_split.getName() + "-" + str(i + 1)
        # Have to use crop workspace as extract single spectrum struggles with the variable bin widths
        spectra_bank_list.append(mantid.CropWorkspace(InputWorkspace=ws_to_split, OutputWorkspace=output_name,
                                                      StartWorkspaceIndex=i, EndWorkspaceIndex=i))
    return spectra_bank_list


def extract_and_crop_spectra(focused_ws, instrument):
    """
    Extracts the individual spectra from a focused workspace (see extract_ws_spectra) then applies
    the user specified cropping. This is the smallest cropping window for use on focused data that
    is applied on a bank by bank basis. It then returns the extracted and cropped workspaces as a list.
    :param focused_ws: The focused workspace to extract and crop the spectra of
    :param instrument: The instrument object associated to this workspace
    :return: The extracted and cropped workspaces as a list
    """
    ws_spectra = extract_ws_spectra(ws_to_split=focused_ws)
    ws_spectra = instrument._crop_banks_to_user_tof(ws_spectra)
def generate_run_numbers(run_number_string):
    """
    Generates a list of run numbers as a list from the input. This input can be either a string or int type
    and uses the same syntax that Mantid supports i.e. 1-10 generates 1,2,3...9,10 inclusive and commas can specify
    breaks between runs
    :param run_number_string: The string or int to convert into a list of run numbers
    :return: A list of run numbers generated from the string
    """
    # Check its not a single run
    if isinstance(run_number_string, int) or run_number_string.isdigit():
        return [int(run_number_string)]  # Cast into a list and return

    # If its a string we must parse it
    run_number_string = run_number_string.strip()
    run_boundaries = run_number_string.replace('_', '-')  # Accept either _ or - delimiters
    run_list = _run_number_generator(processed_string=run_boundaries)
    return run_list
def get_monitor_ws(ws_to_process, run_number_string, instrument):
    """
    Extracts the monitor spectrum into its own individual workspaces from the input workspace
    based on the number of this spectrum in the instrument object. The run number is used
    to determine the monitor spectrum number on instruments who potentially have differing
    monitor numbers depending on the age of the run.
    :param ws_to_process: The workspace to extract the monitor from
    :param run_number_string: The run number as a string to determine the correct monitor position
    :param instrument: The instrument to query for the monitor position
    :return: The extracted monitor as a workspace
    """
    number_list = generate_run_numbers(run_number_string)
    monitor_spectra = instrument._get_monitor_spectra_index(number_list[0])
    load_monitor_ws = mantid.ExtractSingleSpectrum(InputWorkspace=ws_to_process, WorkspaceIndex=monitor_spectra)
def load_current_normalised_ws_list(run_number_string, instrument, input_batching=None):
    """
    Loads a workspace using Mantid and then performs current normalisation on it. Additionally it will either
    load a range of runs individually or summed depending on the user specified behaviour queried from the instrument.
    This can behaviour can be overridden by using the optional parameter input_batching. For
    example if the caller must have the workspaces summed regardless of user selection. The input_batching must be
    from common_enums.InputBatchingEnum
    :param run_number_string: The run number string to turn into a list of run(s) to load
    :param instrument: The instrument to query for the behaviour regarding summing workspaces
    :param input_batching: (Optional) Used to override the user specified choice where a specific batching is required
    :return: The normalised workspace(s) as a list.
    """
    if not input_batching:
        input_batching = instrument._get_input_batching_mode()

    run_information = instrument._get_run_details(run_number_string=run_number_string)
    raw_ws_list = _load_raw_files(run_number_string=run_number_string, instrument=instrument)

    if input_batching.lower() == InputBatchingEnum.Summed.lower() and len(raw_ws_list) > 1:
        summed_ws = _sum_ws_range(ws_list=raw_ws_list)
        remove_intermediate_workspace(raw_ws_list)
        raw_ws_list = [summed_ws]
    normalised_ws_list = _normalise_workspaces(ws_list=raw_ws_list, run_details=run_information,
                                               instrument=instrument)
    return normalised_ws_list
def remove_intermediate_workspace(workspaces):
    """
    Removes the specified workspace(s) from the ADS. Can accept lists of workspaces. It
    does not perform any checks whether the ADS contains those workspaces and it is up
    to the caller to ensure this is a safe operation
    :param workspaces: The workspace(s) either individual or a list of to remove from the ADS
    :return: None
    """
    if isinstance(workspaces, list):
        for ws in workspaces:
            mantid.DeleteWorkspace(ws)
    else:
        mantid.DeleteWorkspace(workspaces)
def spline_vanadium_for_focusing(focused_vanadium_spectra, num_splines):
    """
    Splines a list of workspaces in TOF and returns the splines in new workspaces in a
    list of said splined workspaces. The input workspaces should have any Bragg peaks
    masked before performing this step.
    :param focused_vanadium_spectra: The workspaces to spline as a list
    :param num_splines: The coefficient to use within SplineBackground
    :return: A list of splined workspaces
    """
    for bank_index, ws in enumerate(focused_vanadium_spectra):
        out_name = "spline_bank_" + str(bank_index + 1)
        tof_ws_list.append(mantid.ConvertUnits(InputWorkspace=ws, Target="TOF", OutputWorkspace=out_name))

    splined_ws_list = []
    for ws in tof_ws_list:
        splined_ws_list.append(mantid.SplineBackground(InputWorkspace=ws, OutputWorkspace=ws, NCoeff=num_splines))

    return splined_ws_list


def subtract_sample_empty(ws_to_correct, empty_sample_ws_string, instrument):
    """
    Loads the list of empty runs specified by the empty_sample_ws_string and subtracts
    them from the workspace specified. Returns the subtracted workspace.
    :param ws_to_correct: The workspace to subtract the empty instrument runs from
    :param empty_sample_ws_string: The empty run numbers to subtract from the workspace
    :param instrument: The instrument object these runs belong to
    :return: The workspace with the empty runs subtracted
    """
    if empty_sample_ws_string:
        empty_sample = load_current_normalised_ws_list(run_number_string=empty_sample_ws_string, instrument=instrument,
                                                       input_batching=InputBatchingEnum.Summed)
        mantid.Minus(LHSWorkspace=ws_to_correct, RHSWorkspace=empty_sample[0], OutputWorkspace=ws_to_correct)
        remove_intermediate_workspace(empty_sample)
def _crop_single_ws_in_tof(ws_to_rebin, x_max, x_min):
    """
    Implementation of cropping the single workspace in TOF. First converts to TOF, crops then converts
    back to the original unit.
    :param ws_to_rebin: The workspace to crop
    :param x_max: (Optional) The minimum TOF values to crop to
    :param x_min: (Optional) The maximum TOF values to crop to
    :return: The cropped workspace with the original units
    """
    previous_units = ws_to_rebin.getAxis(0).getUnit().unitID()
    if previous_units != "TOF":
        ws_to_rebin = mantid.ConvertUnits(InputWorkspace=ws_to_rebin, Target="TOF", OutputWorkspace=ws_to_rebin)
    cropped_ws = mantid.CropWorkspace(InputWorkspace=ws_to_rebin, OutputWorkspace=ws_to_rebin,
                                      XMin=x_min, XMax=x_max)
    if previous_units != "TOF":
        cropped_ws = mantid.ConvertUnits(InputWorkspace=cropped_ws, Target=previous_units, OutputWorkspace=cropped_ws)
def _normalise_workspaces(ws_list, instrument, run_details):
    """
    Normalises the workspace list by current by calling the instrument implementation
    :param ws_list: A list of workspace to perform normalisation on
    :param instrument: The instrument these runs belong to
    :param run_details: The run details object associated to this run
    :return: The list of workspaces normalised by current.
    """
    output_list = []
    for ws in ws_list:
        output_list.append(instrument._normalise_ws_current(ws_to_correct=ws, run_details=run_details))
def _check_load_range(list_of_runs_to_load):
    """
    Checks the length of the list of runs which are about to be loaded to determine
    if they are larger than a threshold. Currently this is 1000 as it allows
    users to automatically process all runs within a cycle but detects if they
    have missed a digit which is usually 10,000 runs.
    :param list_of_runs_to_load: The generated run numbers to load as a list
    :return: None - If the list is greater than the specified maximum a ValueError is raised.
    """
    maximum_range_len = 1000  # If more than this number of runs is entered probably wrong
    if len(list_of_runs_to_load) > maximum_range_len:
        raise ValueError("More than " + str(maximum_range_len) + " runs were selected."
                         " Found " + str(len(list_of_runs_to_load)) + " Aborting.")


def _load_raw_files(run_number_string, instrument):
    """
    Uses the run number string to generate a list of run numbers to load in
    :param run_number_string: The run number string to generate
    :param instrument: The instrument to generate the prefix filename for these runs
    :return: A list of loaded workspaces
    """
    run_number_list = generate_run_numbers(run_number_string=run_number_string)
    load_raw_ws = _load_list_of_files(run_number_list, instrument)
def _load_list_of_files(run_numbers_list, instrument):
    """
    Loads files based on the list passed to it. If the list is
    greater than the maximum range it will raise an exception
    see _check_load_range for more details
    :param run_numbers_list: The list of runs to load
    :param instrument: The instrument to generate the prefix for
    :return: The loaded workspaces as a list
    """
    _check_load_range(list_of_runs_to_load=run_numbers_list)

    for run_number in run_numbers_list:
        file_name = instrument._generate_input_file_name(run_number=run_number)
        read_ws = mantid.Load(Filename=file_name)
        read_ws_list.append(mantid.RenameWorkspace(InputWorkspace=read_ws, OutputWorkspace=file_name))
    return read_ws_list


def _sum_ws_range(ws_list):
    """
    Sums a list of workspaces into a single workspace. This will take the name
    of the first and last workspaces in the list and take the form: "summed_<first>_<last>"
    :param ws_list: The workspaces as a list to sum into a single workspace
    :return: A single summed workspace
    """
    out_ws_name = "summed_" + ws_list[0].name() + '_' + ws_list[-1].name()
    summed_ws = mantid.MergeRuns(InputWorkspaces=ws_list, OutputWorkspace=out_ws_name)
def _run_number_generator(processed_string):
    """
    Uses Mantid to generate a list of run numbers from a string input
    :param processed_string: The string representation of run numbers to convert into a list
    :return: A list of run numbers based on the input string
    """
    try:
        number_generator = kernel.IntArrayProperty('array_generator', processed_string)
        return number_generator.value.tolist()
    except RuntimeError:
        raise RuntimeError("Could not generate run numbers from this input: " + processed_string)