diff --git a/scripts/corelli/calibration/database.py b/scripts/corelli/calibration/database.py index 874c8a80e9664e5f519a064d83adffb5b32c585b..70d2aa4dbd7d6af16f82617b27e45f7d1a5331fb 100644 --- a/scripts/corelli/calibration/database.py +++ b/scripts/corelli/calibration/database.py @@ -495,6 +495,9 @@ def load_calibration_set(input_workspace: Union[str, Workspace], # the correct calibration is the one with a date previous to `run_start` filename = date_to_file[available_dates[available_dates_index - 1]] break + # final boundary case: run_start > latest available_date + if len(available_dates) > 0 and available_dates[-1] < run_start: + filename = date_to_file[available_dates[-1]] if filename is not None: logger.notice(f'Found {filename} for {str(input_workspace)} with run start {run_start}') instrument_tables[table_type] = LoadNexusProcessed(Filename=filename, diff --git a/scripts/test/corelli/calibration/test_database.py b/scripts/test/corelli/calibration/test_database.py index 721ed203d337ad52100bb566f8fa4212f2464be5..416c515bc45eb04bdfa079c248bd6e40d4cdc30d 100644 --- a/scripts/test/corelli/calibration/test_database.py +++ b/scripts/test/corelli/calibration/test_database.py @@ -6,17 +6,20 @@ # Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS # SPDX - License - Identifier: GPL - 3.0 + -from os import path, remove -import unittest +from contextlib import contextmanager +from datetime import datetime from numpy.testing import assert_allclose +from os import path, remove import pathlib -from datetime import datetime +import shutil import tempfile +from typing import List +import unittest from mantid import AnalysisDataService, config from mantid.api import mtd, WorkspaceGroup from mantid.dataobjects import TableWorkspace -from mantid.simpleapi import CreateEmptyTableWorkspace, DeleteWorkspaces, LoadNexusProcessed +from mantid.simpleapi import CreateEmptyTableWorkspace, CreateSampleWorkspace, DeleteWorkspaces, LoadNexusProcessed from corelli.calibration.database import (combine_spatial_banks, combine_temporal_banks, day_stamp, filename_bank_table, has_valid_columns, init_corelli_table, load_bank_table, load_calibration_set, @@ -321,8 +324,93 @@ class TestCorelliDatabase(unittest.TestCase): database.cleanup() - def tearDown(self) -> None: + def test_load_calibration_set(self) -> None: + r""" + 1. create an empty "database" + 1.1. create a workspace with a particular daystamp + 1.2. try to find a file in the database + 2. create a database with only one calibration file with daystamp 20200601 + 1.1 create a workspace with the following daystamps and see in which cases the calibration file is loaded + 20200101, 20200601, 20201201 + 3. create a database with two calibration files with day-stamsp 20200401 and 20200801 + 3.1 create a workspace with the following day-stamps and see which (in any) calibration is selected + 20200101, 20200401, 20200601, 20200801, 20201201 + """ + @contextmanager + def mock_database(day_stamps: List[int]): + r"""create a database with mock calibration files""" + dir_path = tempfile.mkdtemp() + path = pathlib.Path(dir_path) + for daystamp in day_stamps: + file_path = path / f'calibration_corelli_{daystamp}.nxs.h5' + with open(str(file_path), 'w') as fp: + fp.write('mock') + try: + yield dir_path + finally: + shutil.rmtree(dir_path) + + def set_daystamp(input_workspace: str, daystamp: int): + r"""Update the run_start log entry of a workspace + :param input_workspace: handle to a workspace (not its name!) + :param daystamp: 8-digit integer + """ + x = str(daystamp) + run_start = f'{x[0:4]}-{x[4:6]}-{x[6:]}T20:54:07.265105667' + run = input_workspace.getRun() + run.addProperty(name='run_start', value=run_start, replace=True) + + workspace = CreateSampleWorkspace(OutputWorkspace='test_load_calibration_set') + set_daystamp(workspace, 20200101) + + # empty calibration database (corner case) + with mock_database([]) as database_path: + instrument_tables = load_calibration_set(workspace, database_path) + assert list(instrument_tables) == [None, None] + + # database with only one calibration file (corner case) + with mock_database([20200601]) as database_path: + set_daystamp(workspace, 20200101) # no calibration found + assert list(load_calibration_set(workspace, database_path)) == [None, None] + + set_daystamp(workspace, 20200601) + with self.assertRaises(RuntimeError) as ar: + load_calibration_set(workspace, database_path) # should pick calibration 20200601 + self.assertEqual('20200601' in str(ar.exception), True) + + set_daystamp(workspace, 20201201) + with self.assertRaises(RuntimeError) as ar: + load_calibration_set(workspace, database_path) + self.assertEqual('calibration_corelli_20200601.nxs.h5' in str(ar.exception), True) + + # database with two calibration files (general case) + with mock_database([20200401, 20200801]) as database_path: + set_daystamp(workspace, '20200101') + assert list(load_calibration_set(workspace, database_path)) == [None, None] + + set_daystamp(workspace, '20200401') + with self.assertRaises(RuntimeError) as ar: + load_calibration_set(workspace, database_path) + self.assertEqual('calibration_corelli_20200401.nxs.h5' in str(ar.exception), True) + + set_daystamp(workspace, '20200601') + with self.assertRaises(RuntimeError) as ar: + load_calibration_set(workspace, database_path) + self.assertEqual('calibration_corelli_20200401.nxs.h5' in str(ar.exception), True) + + set_daystamp(workspace, '20200801') + with self.assertRaises(RuntimeError) as ar: + load_calibration_set(workspace, database_path) + self.assertEqual('calibration_corelli_20200801.nxs.h5' in str(ar.exception), True) + + set_daystamp(workspace, '20201201') + with self.assertRaises(RuntimeError) as ar: + load_calibration_set(workspace, database_path) + self.assertEqual('calibration_corelli_20200801.nxs.h5' in str(ar.exception), True) + workspace.delete() + + def tearDown(self) -> None: date: str = datetime.now().strftime('%Y%m%d') # format YYYYMMDD remove(filename_bank_table(10, self.database_path, date)) remove(filename_bank_table(20, self.database_path, date))