Unverified commit 274f0af5 authored by Coleman Kendrick, committed by GitHub

Merge pull request #30920 from mantidproject/PD157_strain_diagnostic

Relative strain diagnostic plot ornl-next
parents 1bec11d7 af96ea8e
Showing 507 additions and 4 deletions
@@ -212,3 +212,10 @@ def get_single_workspace_log_value(ws_index, *, log_values=None, matrix_ws=None,
return 0
return log_values[ws_index]
def colormap_as_plot_color(number_colors: int, colormap_name: str = 'viridis', cmap=None):
if not cmap:
cmap = cm.get_cmap(name=colormap_name)
for i in range(number_colors):
yield cmap(float(i) / number_colors)
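A minimal usage sketch for the new ``colormap_as_plot_color`` generator (illustrative only; it assumes a working Mantid environment and the ``mantid.plots.utility`` import path shown later in this diff):

import matplotlib.pyplot as plt
from mantid.plots.utility import colormap_as_plot_color

# draw three lines, one color per evenly spaced sample from the requested colormap
fig, ax = plt.subplots()
for i, color in enumerate(colormap_as_plot_color(3, colormap_name='viridis')):
    ax.plot([0, 1], [i, i + 1], color=color)
fig.show()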
@@ -292,4 +292,68 @@ A mask can also be applied with a ``MaskWorkspace`` to hide pixels from the plot
LoadDiffCal(Filename="NOM_calibrate_d131573_2019_08_18.h5", WorkspaceName="old")
fig, ax = diagnostics.difc_plot2d("new_cal", "old_cal", instr_ws="new_group", mask="new_mask")
Relative Strain
###############
Plotting the relative strain, that is the ratio of the observed d-spacing for a peak to the nominal d value
(:math:`\frac{observed}{expected}`), can be used as another method to check the calibration consistency at
the pixel level. The relative strain is plotted along the Y-axis for each detector pixel, with the mean and
standard deviation reported on the plot. A solid black line is drawn at the mean, and two dashed lines are
drawn above and below the mean by a threshold fraction of the mean (one percent by default). This can be
used to determine which pixels are bad with respect to a chosen threshold.
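For a concrete sense of the quantity and of the default band, here is a short illustrative computation
(the numbers are made up for demonstration only):

.. code::

    import numpy as np

    expected = 1.2615                        # nominal peak position in d-spacing
    observed = np.array([1.2618, 1.2601])    # observed positions for two pixels (illustrative)
    rel_strain = observed / expected         # values near 1.0 indicate a consistent calibration
    mean = rel_strain.mean()
    band = 0.01 * mean                       # default threshold: one percent of the mean
    bad = (rel_strain > mean + band) | (rel_strain < mean - band)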
Below is an example of the relative strain plot for VULCAN at peak position 1.2615:
.. figure:: /images/VULCAN_relstrain_diagnostic.png
   :width: 400px
The plot shown above can be generated from the following script:
.. code::

    import numpy as np
    from mantid.simpleapi import (LoadEventAndCompress, LoadInstrument, PDCalibration, Rebin)
    from Calibration.tofpd import diagnostics

    FILENAME = 'VULCAN_192227.nxs.h5'
    CALFILE = 'VULCAN_Calibration_CC_4runs_hybrid.h5'

    peakpositions = np.asarray(
        (0.3117, 0.3257, 0.3499, 0.3916, 0.4205, 0.4645, 0.4768, 0.4996, 0.515, 0.5441, 0.5642, 0.6307, 0.6867,
         0.7283, 0.8186, 0.892, 1.0758, 1.2615, 2.06))

    LoadEventAndCompress(Filename=FILENAME, OutputWorkspace='ws', FilterBadPulses=0)
    LoadInstrument(Workspace='ws', InstrumentName="VULCAN", RewriteSpectraMap='True')
    Rebin(InputWorkspace='ws', OutputWorkspace='ws', Params=(5000, -.002, 70000))

    PDCalibration(InputWorkspace='ws', TofBinning=(5000, -.002, 70000),
                  PeakPositions=peakpositions,
                  MinimumPeakHeight=5,
                  OutputCalibrationTable='calib',
                  DiagnosticWorkspaces='diag')

    dspacing = diagnostics.collect_peaks('diag_dspacing', 'dspacing', donor='diag_fitted',
                                         infotype='dspacing')
    strain = diagnostics.collect_peaks('diag_dspacing', 'strain', donor='diag_fitted')
    fig, ax = diagnostics.plot_peakd('strain', 1.2615, drange=(0, 200000), plot_regions=True, show_bad_cnt=True)
To plot the relative strain for multiple peaks, an array of positions can be passed instead of a single value.
For example, using ``peakpositions`` in place of ``1.2615`` in the above example results in the relative strain for
all peaks being plotted as shown below.
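For example, reusing the workspaces and ``peakpositions`` array from the script above:

.. code::

    fig, ax = diagnostics.plot_peakd('strain', peakpositions, drange=(0, 200000),
                                     plot_regions=True, show_bad_cnt=True)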
.. figure:: /images/VULCAN_relstrain_all.png
The vertical lines shown in the plot are drawn between detector regions and can be used to report the
count of bad pixels found in each region. A solid vertical line indicates the start of a region,
while a dashed vertical line indicates the end of a region. The vertical lines can be turned off
with ``plot_regions=False``, and the per-region bad-pixel counts can be hidden with ``show_bad_cnt=False``.
When ``plot_regions=False`` but ``show_bad_cnt=True``, a single count of bad pixels over the entire
range is shown at the bottom center of the plot.
As seen in the above example, the x-range of the plot can be narrowed using the ``drange`` option,
which accepts a tuple of the starting and ending detector IDs to plot.
To adjust the dashed lines above and below the mean, a fraction of the mean can be passed to the ``threshold`` option, as in the sketch below.
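For example, a sketch combining these options on the same ``strain`` workspace (the ``threshold`` value here is arbitrary):

.. code::

    fig, ax = diagnostics.plot_peakd('strain', 1.2615, plot_regions=False, show_bad_cnt=True,
                                     drange=(0, 200000), threshold=0.02)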
.. categories:: Calibration
docs/source/images/VULCAN_relstrain_all.png (236 KiB)
docs/source/images/VULCAN_relstrain_diagnostic.png (48.1 KiB)
@@ -18,6 +18,7 @@ New features
- :ref:`PDCalibration <algm-PDCalibration>` now supports workspaces with grouped detectors (i.e. more than one detector per spectrum).
- New diagnostic plotting tool `Calibration.tofpd.diagnostics.plot2d` which adds markers for expected peak positions.
- New diagnostic plotting tool `Calibration.tofpd.diagnostics.difc_plot2d` which plots the change in DIFC between two instrument calibrations.
- New diagnostic plotting tool `Calibration.tofpd.diagnostics.plot_peakd` which plots the d-spacing relative strain of peak positions.
Improvements
############
@@ -68,7 +68,7 @@ class ColorSelector(QWidget):
self.prev_color = color
def set_color(self, color_hex):
self.line_edit.setText(color_hex)
self.line_edit.setText(convert_color_to_hex(color_hex))
def set_line_edit(self, color_hex):
self.line_edit.setText(color_hex)
from mantid.plots.resampling_image.samplingimage import imshow_sampling
from mantid.plots.datafunctions import get_axes_labels
from mantid.simpleapi import CalculateDIFC, LoadDiffCal, mtd
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright &copy; 2021 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
from typing import Union
import matplotlib.pyplot as plt
from matplotlib.patches import Circle
from matplotlib.collections import PatchCollection
import matplotlib.offsetbox as pltbox
import numpy as np
from mantid.api import WorkspaceFactory
from mantid.dataobjects import TableWorkspace, Workspace2D
from mantid.plots.datafunctions import get_axes_labels
from mantid.plots.resampling_image.samplingimage import imshow_sampling
from mantid.plots.utility import colormap_as_plot_color
from mantid.simpleapi import CalculateDIFC, LoadDiffCal, mtd
# Diamond peak positions in d-space which may differ from actual sample
DIAMOND = np.asarray((2.06,1.2615,1.0758,0.892,0.8186,0.7283,0.6867,0.6307,0.5642,0.5441,0.515,0.4996,0.4768,
@@ -74,6 +86,45 @@ def _get_difc_ws(wksp, instr_ws=None):
return difc_ws
def __get_regions(x: np.ndarray):
"""
Find the beginning and end of all sequences in x
Ex: [1, 2, 3, 5, 6, 8, 9] will return [(1, 3), (5, 6), (8, 9)]
:param x: Array of detector IDs (in ascending order)
:return: List of tuples with start,stop indices to mark detector regions
"""
regions = []
# Compute a +/- 1 shift in indices and compare them to find gaps in the sequence
lower = np.asarray((x + 1)[:-1], dtype=int)
upper = np.asarray((x - 1)[1:], dtype=int)
gaps = lower <= upper
lower, upper = lower[gaps], upper[gaps]
i = 0
# Use the start and stop of each gap to find sequential regions instead
for start, stop in zip(lower, upper):
if i == 0:
# Starting edge case - the start of x until the start of the first gap
regions.append((int(np.min(x)), int(start - 1)))
else:
# Mark the region between the end of the previous gap and the start of the next gap
regions.append((int(upper[i - 1] + 1), int(start - 1)))
i = i + 1
# Add ending region if there was at least one region found:
if i > 0:
# Ending edge case: the end of the last gap until the end of the input
regions.append((int(upper[i - 1] + 1), int(np.max(x))))
return regions
def __get_bad_counts(y, mean=None, band=0.01):
# Counts pixels in y that are outside of +/- mean*band
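# e.g. (illustrative) mean=1.0 and band=0.01 give an acceptance window of [0.99, 1.01],
# so y = [0.995, 1.02, 0.98] would count 2 bad pixels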
if not mean:
mean = np.mean(y)
top = mean + mean * band
bot = mean - mean * band
return len(y[(y > top) | (y < bot)])
def plot2d(workspace, tolerance: float=0.001, peakpositions: np.ndarray=DIAMOND,
xmin: float=np.nan, xmax: float=np.nan, horiz_markers=[]):
TOLERANCE_COLOR = 'w' # color to mark the area within tolerance
@@ -223,3 +274,304 @@ def difc_plot2d(calib_new, calib_old=None, instr_ws=None, mask=None, vrange=(0,1
fig.show()
return fig, ax
def __calculate_strain(obs: dict, exp: np.ndarray):
"""Computes the fraction of observed d-spacing values to expected (diamond) peak positions"""
return __calculate_dspacing(obs) / exp
def __calculate_difference(obs: dict, exp: np.ndarray):
"""Calculate the difference between d-spacing values and expected d-spacing values"""
return np.abs(__calculate_dspacing(obs) - exp) / exp
def __calculate_dspacing(obs: dict):
"""Extract the dspacing values from a row in output dspace table workspace from PDCalibration"""
# Trim the table row from the PDCalibration output by removing the detid, chisq, and normchisq columns
return np.asarray(list(obs.values())[1:-2])
def __create_outputws(donor: Union[str, Workspace2D], numSpec, numPeaks):
'''The resulting workspace needs to be added to the ADS'''
# convert the d-space table to a Workspace2d
if donor:
donor = mtd[str(donor)]
else:
donor = 'Workspace2D'
output = WorkspaceFactory.create(donor, NVectors=numSpec,
XLength=numPeaks, YLength=numPeaks)
output.getAxis(0).setUnit('dSpacing')
return output
def collect_peaks(wksp: Union[str, TableWorkspace], outputname: str, donor: Union[str, Workspace2D] = None,
infotype: str = 'strain'):
"""
Calculate different types of information for each peak position in wksp and create a new Workspace2D
where each column is a peak position and each row is the info for each workspace index in donor
:param wksp: Input TableWorkspace (i.e, dspacing table from PDCalibration)
:param outputname: Name of the Workspace2D to create with the information
:param donor: Name of the donor Workspace2D, used to create the shape of the output
:param infotype: 'strain', 'difference', or 'dspacing' to specify which kind of data to output
:return: Created Workspace2D with fractional difference, relative difference, or dspacing values
"""
if infotype not in ['strain', 'difference', 'dspacing']:
raise ValueError('Do not know how to calculate "{}"'.format(infotype))
wksp = mtd[str(wksp)]
numSpec = int(wksp.rowCount())
peak_names = [item for item in wksp.getColumnNames()
if item not in ['detid', 'chisq', 'normchisq']]
peaks = np.asarray([float(item[1:]) for item in peak_names])
numPeaks = len(peaks)
# convert the d-space table to a Workspace2d
output = __create_outputws(donor, numSpec, numPeaks)
for i in range(numSpec): # TODO get the detID correct
output.setX(i, peaks)
if infotype == 'strain':
output.setY(i, __calculate_strain(wksp.row(i), peaks))
elif infotype == 'difference':
output.setY(i, __calculate_difference(wksp.row(i), peaks))
elif infotype == 'dspacing':
output.setY(i, __calculate_dspacing(wksp.row(i)))
else:
raise ValueError(f'Do not know how to calculate {infotype}')
# add the workspace to the AnalysisDataService
mtd.addOrReplace(outputname, output)
return mtd[outputname]
def collect_fit_result(wksp: Union[str, TableWorkspace], outputname: str, peaks, donor: Union[str, Workspace2D] = None,
infotype: str = 'centre', chisq_max: float = 1.e4):
"""
Extracts different information about fit results from a TableWorkspace and places into a Workspace2D
This assumes that the input is sorted by wsindex then peakindex
:param wksp: Input TableWorkspace from PDCalibration or FitPeaks (should have infotype as a column)
:param outputname: Name of output Workspace2D created
:param peaks: Array of peak positions
:param donor: Optional Workspace2D to use to determine output size
:param infotype: Type of fit information to extract ("centre", "width", "height", or "intensity")
:param chisq_max: Max chisq value that should be included in output, data gets set to nan if above this
:return: Created Workspace2D with infotype extracted from wksp
"""
KNOWN_COLUMNS = ['centre', 'width', 'height', 'intensity']
if infotype not in KNOWN_COLUMNS:
raise ValueError(f'Do not know how to extract "{infotype}"')
wksp = mtd[str(wksp)]
for name in KNOWN_COLUMNS + ['wsindex', 'peakindex', 'chi2']:
if name not in wksp.getColumnNames():
raise RuntimeError('did not find column "{}" in workspace "{}"'.format(name, str(wksp)))
wsindex = np.asarray(wksp.column('wsindex'))
wsindex_unique = np.unique(wsindex)
chi2 = np.asarray(wksp.column('chi2'))
observable = np.asarray(wksp.column(infotype))
# set values to nan where the chisq is too high
observable[chi2 > chisq_max] = np.nan
# convert the numpy arrays to a Workspace2d
numPeaks = len(peaks)
if donor:
numSpec = mtd[str(donor)].getNumberHistograms()
else:
numSpec = len(wsindex_unique)
output = __create_outputws(donor, numSpec, numPeaks)
for i in range(len(wsindex_unique)):
start = int(np.searchsorted(wsindex, wsindex_unique[i]))
i = int(i) # to be compliant with mantid API
output.setX(i, peaks)
output.setY(i, observable[start:start+numPeaks])
mtd.addOrReplace(outputname, output)
return mtd[outputname]
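# Illustrative usage (mirroring the unit test at the end of this diff):
#   collect_fit_result('diag_fitparam', 'center_tof', [1.2615], infotype='centre')
# collects the fitted peak centres (in TOF) for the single diamond peak at 1.2615 Angstrom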
def extract_peak_info(wksp: Union[str, Workspace2D], outputname: str, peak_position: float):
"""
Extract information about a single peak from a Workspace2D. The input workspace is expected to have
a common x-axis of observed d-spacing. The output workspace will be a single spectrum with the x-axis
being the detector ID; the y-values and errors are taken from the input workspace at the requested peak.
"""
# confirm that the input is a workspace pointer
wksp = mtd[str(wksp)]
numSpec = wksp.getNumberHistograms()
# get the index into the x/y arrays of the peak position
peak_index = wksp.readX(0).searchsorted(peak_position)
# create a workspace to put the result into
single = WorkspaceFactory.create('Workspace2D', NVectors=1,
XLength=wksp.getNumberHistograms(),
YLength=wksp.getNumberHistograms())
single.setTitle('d-spacing={}\\A'.format(wksp.readX(0)[peak_index]))
# get a handle to map the detector positions
detids = wksp.detectorInfo().detectorIDs()
have_detids = bool(len(detids) > 0)
# fill in the data values
x = single.dataX(0)
y = single.dataY(0)
e = single.dataE(0)
start_detid = np.searchsorted(detids, 0)
for wksp_index in range(numSpec):
if have_detids:
x[wksp_index] = detids[start_detid + wksp_index]
else:
x[wksp_index] = wksp_index
y[wksp_index] = wksp.readY(wksp_index)[peak_index]
e[wksp_index] = wksp.readE(wksp_index)[peak_index]
# add the workspace to the AnalysisDataService
mtd.addOrReplace(outputname, single)
return mtd[outputname]
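# Illustrative usage (mirroring the unit test at the end of this diff):
#   extract_peak_info('strain', 'single_strain', 1.2615)
# yields a one-spectrum workspace of relative strain versus detector ID for that peak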
# flake8: noqa: C901
def plot_peakd(wksp: Union[str, Workspace2D], peak_positions: Union[float, list], plot_regions=True, show_bad_cnt=True,
drange=(0, 0), threshold=0.01):
"""
Plots the peak d-spacing value for each peak position in peaks
:param wksp: Workspace returned from collect_peaks
:param peak_positions: List of peak positions, or single peak position
:param plot_regions: Whether to show vertical lines indicating detector regions
:param show_bad_cnt: Whether to display number of pixels that fall outside of threshold from mean
:param drange: A tuple of the minimum and maximum detector ID to plot, plots full range if (0,0) or out of bounds
:param threshold: The fraction of the mean to display horizontal bars at +/- the mean
:return: plot, plot axes
"""
ylabel = "rel strain"
peaks = peak_positions
if isinstance(peak_positions, float):
peaks = [peak_positions]
ylabel += " (peak {})".format(peaks[0])
if len(peaks) == 0:
raise ValueError("Expected one or more peak positions")
if not mtd.doesExist(str(wksp)):
raise ValueError("Could not find provided workspace in ADS")
fig, ax = plt.subplots()
ax.set_xlabel("detector IDs")
ax.set_ylabel(ylabel)
# Hold the mean and stddev for each peak to compute total at end
means = []
stddevs = []
lens = []
ax.set_prop_cycle(color=colormap_as_plot_color(len(peaks), cmap=plt.get_cmap("jet")))
regions = []
plotted_peaks = []
# Use full detector range if not specified
max_detid = int(np.max(mtd[str(wksp)].detectorInfo().detectorIDs()))
if drange == (0, 0) or drange > (0, max_detid):
drange = (0, max_detid)
print("Specified drange out of bounds, using {}".format(drange))
# Plot data for each peak position
for peak in peaks:
print("Processing peak position {}".format(peak))
single = extract_peak_info(wksp, 'single', peak)
# get x and y arrays from single peak ws
x = single.dataX(0)
y = single.dataY(0)
# filter out any nans
y_val = y[~np.isnan(y)]
try:
dstart = single.yIndexOfX(drange[0])
except ValueError:
# If detector id not valid, find next closest range
dstart = single.yIndexOfX(int(x.searchsorted(drange[0])) - 1)
print("Specified starting detector range was not valid, adjusted to detID={}".format(dstart))
try:
dend = single.yIndexOfX(drange[1])
except ValueError:
dend = single.yIndexOfX(int(x.searchsorted(drange[1])))
print("Specified ending detector range was not valid, adjusted to detID={}".format(dend))
if dstart >= dend:
raise RuntimeError("Detector start region ({}) must be less than the end region ({})".format(dstart, dend))
cut_id = (dstart, dend)
# skip if y was entirely nans or detector slice yields empty region
if len(y_val) == 0 or len(y_val[cut_id[0]:cut_id[1]]) == 0:
print("No valid y-data was found for peak {}, so it will be skipped".format(peak))
continue
lens.append(len(y_val[cut_id[0]:cut_id[1]]))
means.append(np.mean(y_val[cut_id[0]:cut_id[1]]))
stddevs.append(np.std(y_val[cut_id[0]:cut_id[1]]))
# Draw vertical lines between detector regions
if not regions:
regions = __get_regions(x[cut_id[0]:cut_id[1]])
if plot_regions:
for region in regions:
ax.axvline(x=region[0], lw=0.5, color="black")
ax.axvline(x=region[1], lw=0.5, color="black", ls="--")
plotted_peaks.append(peak)
ax.plot(x[cut_id[0]:cut_id[1]], y[cut_id[0]:cut_id[1]], marker="x", linestyle="None",
label="{:0.6f}".format(peak))
ax.legend(bbox_to_anchor=(1, 1), loc="upper left")
# If every peak had nans, raise error
if len(means) == 0 or len(stddevs) == 0:
raise RuntimeError("No valid peak data was found for provided peak positions")
# Calculate total mean and stddev of all peaks
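# (pooled statistics: weight each peak's mean by its number of points, then recover the combined
#  variance from E[X^2] - (E[X])^2 using the per-peak means and standard deviations)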
total_mean = np.sum(np.asarray(means) * np.asarray(lens)) / np.sum(lens)
sq_sum = np.sum(lens * (np.power(stddevs, 2) + np.power(means, 2)))
total_stddev = np.sqrt((sq_sum / np.sum(lens)) - total_mean ** 2)
# Get bad counts in regions over all peaks using the total mean
if show_bad_cnt and regions:
region_cnts = [0] * len(regions)
for peak in plotted_peaks:
single = extract_peak_info(wksp, 'single', peak)
y = single.dataY(0)
for i in range(len(regions)):
det_start = single.yIndexOfX(regions[i][0])
det_end = single.yIndexOfX(regions[i][1])
region_cnts[i] += __get_bad_counts(y[det_start:det_end], mean=total_mean, band=threshold)
# Display the bad counts for each region, or the overall count
if plot_regions:
for i in range(len(regions)):
mid_region = regions[i][0] + 0.5 * (regions[i][1] - regions[i][0])
ax.annotate("{}".format(region_cnts[i]), xy=(mid_region, 0.05), xycoords=('data', 'axes fraction'),
clip_on=True, ha="center")
else:
overall_cnt = int(np.sum(region_cnts))
ax.annotate("{}".format(overall_cnt), xy=(0.5, 0.05), xycoords=('axes fraction', 'axes fraction'),
clip_on=True, ha="center")
# Draw solid line at mean
ax.axhline(total_mean, color="black", lw=2.5) # default lw=1.5
# Upper and lower calibration lines at +/- threshold fraction of the mean (one percent by default)
band = total_mean * threshold
ax.axhline(total_mean + band, color="black", ls="--")
ax.axhline(total_mean - band, color="black", ls="--")
# Add mean and stddev text annotations
stat_str = "Mean = {:0.6f} Stdev = {:1.5e}".format(total_mean, total_stddev)
plt_text = pltbox.AnchoredText(stat_str, loc="upper center", frameon=True)
ax.add_artist(plt_text)
plt.show()
return fig, ax
@@ -6,6 +6,8 @@ set(TEST_PY_FILES
test_tube_calib.py
)
add_subdirectory(tofpd)
check_tests_valid(${CMAKE_CURRENT_SOURCE_DIR} ${TEST_PY_FILES})
pyunittest_add_test(${CMAKE_CURRENT_SOURCE_DIR} python.Calibration
# Unit tests for Calibration.tofpd
set(TEST_PY_FILES
test_diagnostics.py
)
check_tests_valid(${CMAKE_CURRENT_SOURCE_DIR} ${TEST_PY_FILES})
pyunittest_add_test(${CMAKE_CURRENT_SOURCE_DIR} python.Calibration.tofpd
${TEST_PY_FILES})
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright &copy; 2021 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright &copy; 2021 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
import unittest
from mantid.simpleapi import CompareWorkspaces, DeleteWorkspaces, LoadNexusProcessed, UnGroupWorkspace
from Calibration.tofpd import diagnostics
class TestDiagnostics(unittest.TestCase):
PEAK=1.2615
@classmethod
def setUpClass(cls):
LoadNexusProcessed(Filename="VULCAN_192227_diagnostics.nxs", OutputWorkspace="diagtest")
UnGroupWorkspace("diagtest")
cls.workspaces = ["diag_dspacing", "diag_fitted", "diag_fitparam", "strain", "single_strain", "difference",
"single_diff", "center_tof"]
@classmethod
def tearDownClass(cls) -> None:
if len(cls.workspaces) > 0:
DeleteWorkspaces(cls.workspaces)
def test_collect_peaks_strain(self):
test_strain = diagnostics.collect_peaks('diag_dspacing', 'test_strain', infotype='strain')
result = CompareWorkspaces(test_strain, "strain", Tolerance=1e-6)
self.assertTrue(result)
DeleteWorkspaces(test_strain, result)
def test_collect_peaks_diff(self):
test_diff = diagnostics.collect_peaks('diag_dspacing', 'test_diff', infotype='difference')
result = CompareWorkspaces(test_diff, "difference", Tolerance=1e-6)
self.assertTrue(result)
DeleteWorkspaces(test_diff, result)
def test_extract_peak_info(self):
test_single_strain = diagnostics.extract_peak_info('strain', 'test_single_strain', self.PEAK)
result = CompareWorkspaces(test_single_strain, "single_strain", Tolerance=1e-6)
self.assertTrue(result)
DeleteWorkspaces(test_single_strain, result)
test_single_diff = diagnostics.extract_peak_info('difference', 'test_single_diff', self.PEAK)
result = CompareWorkspaces(test_single_diff, "single_diff", Tolerance=1e-6)
self.assertTrue(result)
DeleteWorkspaces(test_single_diff, result)
def test_collect_fit_result(self):
test_center = diagnostics.collect_fit_result('diag_fitparam', 'test_center', [self.PEAK], infotype='centre')
result = CompareWorkspaces(test_center, "center_tof", Tolerance=1e-6)
self.assertTrue(result)
DeleteWorkspaces(test_center, result)
if __name__ == '__main__':
unittest.main()