Skip to content
Snippets Groups Projects
load_utils_test.py 7.72 KiB
Newer Older
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
#     NScD Oak Ridge National Laboratory, European Spallation Source
#     & Institut Laue - Langevin
# SPDX - License - Identifier: GPL - 3.0 +
from mantid.py3compat import mock

from Muon.GUI.ElementalAnalysis.LoadWidget import load_utils as lutils

import mantid.simpleapi as mantid
from mantid import config
import numpy as np
from six import iteritems


class LoadUtilsTest(unittest.TestCase):
    def setUp(self):
        self.test_path = r"test\path\to\ral012345.rooth2020.dat"
        self.bad_path = r"test\path\to\ral012345.rooth2042"
        self.test_run = 5
        self.var_ws_name = "{}_Delayed_{}"
        self.test_ws_name = self.var_ws_name.format(1, self.test_run)
        self.test_ws_names = [self.var_ws_name.format(i, self.test_run) for i in range(1, 9)]

    def test_pad_run(self):
        tests = {123: "00123", 0: "00000", 12345: "12345", 123456: "123456"}
        for run, padded_run in iteritems(tests):
            self.assertEqual(lutils.pad_run(run), padded_run)
    @mock.patch('Muon.GUI.ElementalAnalysis.LoadWidget.load_utils.glob.iglob')
    @mock.patch('Muon.GUI.ElementalAnalysis.LoadWidget.load_utils.os.path.join')
    def test_that_search_user_dirs_explores_all_directories(self, mock_join, mock_iglob):
        expected_len = len(config["datasearch.directories"].split(";"))
        lutils.search_user_dirs(self.test_run)

        self.assertEqual(mock_join.call_count, expected_len)

    def test_get_detector_num_from_ws(self):
        self.assertEquals(lutils.get_detector_num_from_ws(self.test_ws_name), "1")

    def test_get_detectors_num(self):
        self.assertEqual(lutils.get_detectors_num(self.test_path), "1")
    def test_get_detectors_num_throws_with_bad_input(self):
        input = ['only.one_separator', 'one.much_longer32123.three']
        for value in input:
            with self.assertRaises(Exception):
                lutils.get_detectors_num(value)

    def test_get_end_num(self):
        self.assertEqual(lutils.get_end_num(self.test_path), "rooth2020")
        self.assertEqual(lutils.get_run_type(self.test_path), "Delayed")
        self.assertEqual(lutils.get_run_type('a.b10.c'), "Prompt")
        self.assertEqual(lutils.get_run_type('a.b99.c'), "Total")
        with self.assertRaises(KeyError):
            lutils.get_run_type(self.bad_path)

    def test_get_filename(self):
        self.assertEquals(lutils.get_filename(self.test_path, self.test_run), self.test_ws_name)
    def test_get_filename_returns_none_if_given_bad_path(self):
        self.assertEqual(lutils.get_filename(self.bad_path, self.test_run), None)

    def test_hyphenise(self):
        tests = {"1-5": [1, 2, 3, 4, 5], "1, 4-5": [1, 4, 5], "1-3, 5": [1, 3, 2, 5], "1, 3, 5": [1, 5, 3]}
        for out, arg in iteritems(tests):
            self.assertEqual(lutils.hyphenise(arg), out)
    def test_merge_workspaces(self):
        expected_output, workspaces = [], []
        detectors = range(1, 5)
        for detector in detectors:
            workspace = self.var_ws_name.format(detector, self.test_run)
            workspaces.append(workspace)
            mantid.CreateSampleWorkspace(OutputWorkspace=workspace)
            expected_output.append("{}; Detector {}".format(self.test_run, detector))

        self.assertEquals(lutils.merge_workspaces(self.test_run, workspaces), sorted(expected_output))
        # check call works with empty workspace list
        self.assertEqual(lutils.merge_workspaces(self.test_run, []), sorted(expected_output))

    def test_create_merged_workspace(self):
        workspace_list = []
Stephen's avatar
Stephen committed
        num_workspaces = lutils.num_files_per_detector
        workspace_names = []
        num_bins = 100
        X_data = np.linspace(0, 400, num_bins)
        # create data in each workspace based on y = mx + specNumber
        m = 0.1
        Yfunc = lambda x, specNo: m * x + specNo
        Efunc = lambda x, specNo: 2 * m * x + specNo
Stephen's avatar
Stephen committed
        for i in range(0, num_workspaces):
            name = "test_" + str(i)
            workspace_names.append(name)
            ws = mantid.WorkspaceFactory.create("Workspace2D", NVectors=1,
                                                XLength=num_bins, YLength=num_bins)
            mantid.mtd.add(name, ws)
            Y_data = Yfunc(X_data, i)
            E_data = Efunc(X_data, i)
            ws.setY(0, Y_data)
            ws.setX(0, X_data)
            ws.setE(0, E_data)
            workspace_list.append(ws.name())

        merged_ws = lutils.create_merged_workspace(workspace_list)
        # check number of bins, and number of histograms is correct
        self.assertEqual(merged_ws.getNumberHistograms(), num_workspaces)
        self.assertEqual(merged_ws.blocksize(), num_bins)
        # check that data has been copied over correctly into the new merged workspace
        for i in range(0, num_workspaces):
            self.assertTrue(np.array_equal(merged_ws.readX(i), X_data))
            self.assertTrue(np.array_equal(merged_ws.readY(i), Yfunc(X_data, i)))
            self.assertTrue(np.array_equal(merged_ws.readE(i), Efunc(X_data, i)))
Stephen's avatar
Stephen committed
    def test_merge_workspaces_returns_correctly_if_delayed_data_missing(self):
        num_files_per_detector = lutils.num_files_per_detector
        workspace_list = [None] * num_files_per_detector
        num_bins = 100
        X_data = np.linspace(0, 400, num_bins)
Stephen's avatar
Stephen committed
        names = ["Total", "Prompt"]
        # create data in each workspace based on y = mx + specNumber
        Yfunc = lambda x, specNo: 0.1 * x + specNo
        Efunc = lambda x, specNo: 2 * 0.1 * x + specNo
        for i in range(0, 2):
            name = names[i]
            ws = mantid.WorkspaceFactory.create("Workspace2D", NVectors=1,
                                                XLength=num_bins, YLength=num_bins)
            mantid.mtd.add(name, ws)
            Y_data = Yfunc(X_data, i)
            E_data = Efunc(X_data, i)
            ws.setY(0, Y_data)
            ws.setX(0, X_data)
            ws.setE(0, E_data)
            input_index = lutils.spectrum_index[name] - 1
            workspace_list[input_index] = name

        merged_ws = lutils.create_merged_workspace(workspace_list)
        self.assertEqual(merged_ws.getNumberHistograms(), num_files_per_detector)
        # check that total and prompt data were copied to the correct place
        for i in range(0, 2):
            name = names[i]
            input_index = lutils.spectrum_index[name] - 1
            self.assertTrue(np.array_equal(merged_ws.readX(input_index), X_data))
            self.assertTrue(np.array_equal(merged_ws.readY(input_index), Yfunc(X_data, i)))
            self.assertTrue(np.array_equal(merged_ws.readE(input_index), Efunc(X_data, i)))
        # check that the y data for delayed response is all zeros
Stephen's avatar
Stephen committed
        self.assertTrue(not np.any(merged_ws.readY(lutils.spectrum_index["Delayed"]-1)))

    def test_flatten_run_data(self):
        test_workspaces = [mantid.CreateSampleWorkspace(OutputWorkspace=name) for name in self.test_ws_names]
        workspaces = []
        for i in range(0, len(test_workspaces), 2):
            mantid.GroupWorkspaces(test_workspaces[i:i + 2], OutputWorkspace=name)
            workspaces.append(name)

        self.assertEquals(lutils.flatten_run_data(workspaces), [self.test_ws_names])
    def test_replace_workspace_name_suffix(self):
        tests = {self.test_ws_name: "suffix", "_".join([self.test_ws_name, "test"]): "suffix"}

        for workspace_name, suffix in iteritems(tests):
            self.assertEquals(lutils.replace_workspace_name_suffix(workspace_name, suffix),
                              self.var_ws_name.format(1, suffix))

if __name__ == "__main__":
    unittest.main()