Skip to content
Snippets Groups Projects
Unverified Commit b92fbd8e authored by Pete Peterson's avatar Pete Peterson Committed by GitHub
Browse files

Merge pull request #23099 from mantidproject/mr_peak_fitting

Improve peak fitting
parents c29da12e 51e583d3
No related merge requests found
......@@ -7,7 +7,11 @@ import mantid.simpleapi
import math
import copy
import numpy as np
from scipy.optimize import curve_fit
import scipy.optimize as opt
DEAD_PIXELS = 10
NX_PIXELS = 304
NY_PIXELS = 256
class MRInspectData(PythonAlgorithm):
......@@ -34,10 +38,8 @@ class MRInspectData(PythonAlgorithm):
doc="If true, use the 2nd ROI in the meta-data for the background")
self.declareProperty("UseTightBck", False,
doc="If true, use the area on each side of the peak to compute the background")
self.declareProperty("BckWidth", 3,
self.declareProperty("BckWidth", 10,
doc="If UseTightBck is true, width of the background on each side of the peak")
self.declareProperty("HuberXCut", 0.0,
doc="Provide a Huber X value above which a run will be considered a direct beam")
self.declareProperty("ForcePeakROI", False,
doc="If true, use the PeakROI property as the ROI")
......@@ -66,7 +68,6 @@ class MRInspectData(PythonAlgorithm):
use_roi_bck=self.getProperty("UseROIBck").value,
use_tight_bck=self.getProperty("UseTightBck").value,
bck_offset=self.getProperty("BckWidth").value,
huber_x_cut=self.getProperty("HuberXCut").value,
force_peak_roi=self.getProperty("ForcePeakROI").value,
peak_roi=self.getProperty("PeakROI").value,
force_low_res_roi=self.getProperty("ForceLowResPeakROI").value,
......@@ -139,15 +140,11 @@ class DataInfo(object):
"""
Class to hold the relevant information from a run (scattering or direct beam).
"""
n_x_pixel = 304
n_y_pixel = 256
peak_range_offset = 0
tolerance = 0.02
huber_x_cut = 4.95
def __init__(self, ws, cross_section='', use_roi=True, update_peak_range=False, use_roi_bck=False,
use_tight_bck=False, bck_offset=3, huber_x_cut=4.95,
force_peak_roi=False, peak_roi=[0,0],
use_tight_bck=False, bck_offset=3, force_peak_roi=False, peak_roi=[0,0],
force_low_res_roi=False, low_res_roi=[0,0],
force_bck_roi=False, bck_roi=[0,0], event_threshold=10000):
self.cross_section = cross_section
......@@ -158,7 +155,6 @@ class DataInfo(object):
self.peak_range = [0,0]
self.low_res_range = [0,0]
self.background = [0,0]
self.huber_x_cut = huber_x_cut
self.n_events_cutoff = event_threshold
# ROI information
......@@ -344,79 +340,15 @@ class DataInfo(object):
logger.notice("Forcing background ROI: %s" % self.forced_bck_roi)
self.roi_background = self.forced_bck_roi
def determine_peak_range(self, ws, specular=True, max_pixel=304):
"""
Find the reflectivity peak
:param workspace ws: workspace to work with
:param bool specular: if True, we are looking for a specular peak
:param int max_pixel: max pixel above which to exclude peaks
"""
ws_summed = mantid.simpleapi.RefRoi(InputWorkspace=ws, IntegrateY=specular,
NXPixel=self.n_x_pixel, NYPixel=self.n_y_pixel,
ConvertToQ=False,
OutputWorkspace="ws_summed")
integrated = mantid.simpleapi.Integration(ws_summed)
integrated = mantid.simpleapi.Transpose(integrated)
x_values = integrated.readX(0)
y_values = integrated.readY(0)
e_values = integrated.readE(0)
ws_short = mantid.simpleapi.CreateWorkspace(DataX=x_values[self.peak_range_offset:max_pixel],
DataY=y_values[self.peak_range_offset:max_pixel],
DataE=e_values[self.peak_range_offset:max_pixel])
try:
specular_peak, low_res, _ = mantid.simpleapi.LRPeakSelection(InputWorkspace=ws_short)
except:
logger.notice("Peak finding error [specular=%s]: %s" % (specular, sys.exc_info()[1]))
return integrated, [0,0], [0,0]
if specular:
peak = [specular_peak[0]+self.peak_range_offset, specular_peak[1]+self.peak_range_offset]
else:
# The low-resolution range finder tends to be a bit tight.
# Broaden it by a third.
#TODO: Fix the range finder algorithm
broadening = (low_res[1]-low_res[0])/3.0
peak = [low_res[0]+self.peak_range_offset-broadening,
low_res[1]+self.peak_range_offset+broadening]
mantid.simpleapi.DeleteWorkspace(ws_short)
mantid.simpleapi.DeleteWorkspace(ws_summed)
return integrated, peak, [low_res[0]+self.peak_range_offset, low_res[1]+self.peak_range_offset]
@classmethod
def fit_peak(cls, signal_x, signal_y, peak):
"""
Fit a Gaussian peak to a curve
:param array signal_x: list of x values
:param array signal_y: list of y values
:param array peak: initial guess for the peak (one-sigma min and max)
"""
def gauss(x, *p):
A, mu, sigma, bck = p
if A < 0 or sigma < 5:
return -np.inf
return A*np.exp(-(x-mu)**2/(2.*sigma**2)) + bck
p0 = [np.max(signal_y), (peak[1]+peak[0])/2.0, (peak[1]-peak[0])/2.0, 0.0]
err_y = np.sqrt(np.fabs(signal_y))
# Using bounds would be great but only available with scipy>=0.17. bounds=(0, np.inf)
coeff, _ = curve_fit(gauss, signal_x, signal_y, sigma=err_y, p0=p0)
peak_position = coeff[1]
peak_width = math.fabs(3.0*coeff[2])
return peak_position, peak_width
def check_direct_beam(self, ws, peak_position=None):
"""
Determine whether this data is a direct beam
:param workspace ws: Workspace to inspect
:param float peak_position: reflectivity peak position
"""
huber_x = ws.getRun().getProperty("HuberX").getStatistics().mean
sangle = ws.getRun().getProperty("SANGLE").getStatistics().mean
self.theta_d = 180.0 / math.pi * mantid.simpleapi.MRGetTheta(ws, SpecularPixel=peak_position)
return not ((self.theta_d > self.tolerance or sangle > self.tolerance) and huber_x < self.huber_x_cut)
self.theta_d = 180.0 / math.pi * mantid.simpleapi.MRGetTheta(ws, SpecularPixel=peak_position,
UseSANGLE=False)
return not self.theta_d > self.tolerance
def determine_data_type(self, ws):
"""
......@@ -431,18 +363,15 @@ class DataInfo(object):
return
# Find reflectivity peak and low resolution ranges
# Those will be our defaults
integrated, peak, broad_range = self.determine_peak_range(ws, specular=True)
peak, low_res = fit_2d_peak(ws)
if self.use_tight_bck:
bck_range = [int(max(0.0, peak[0]-self.bck_offset)), int(min(NX_PIXELS, peak[1]+self.bck_offset))]
else:
bck_range = [int(max(0.0, peak[0]-2*self.bck_offset)), int(max(0.0, peak[0]-self.bck_offset))]
self.found_peak = copy.copy(peak)
self.found_low_res = copy.copy(low_res)
logger.notice("Run %s [%s]: Peak found %s" % (self.run_number, self.cross_section, peak))
signal_y = integrated.readY(0)
mantid.simpleapi.DeleteWorkspace(integrated)
signal_x = range(len(signal_y))
_, low_res, _ = self.determine_peak_range(ws, specular=False)
logger.notice("Run %s [%s]: Low-res found %s" %(self.run_number, self.cross_section, str(low_res)))
self.found_low_res = low_res
bck_range = None
# Process the ROI information
try:
......@@ -453,91 +382,34 @@ class DataInfo(object):
# Keep track of whether we actually used the ROI
self.use_roi_actual = False
if self.use_roi and not self.roi_peak == [0,0]:
# If we were asked to use the ROI but no peak is in it, use the peak we found
# If we were asked to use the ROI and there's a peak in it, use the ROI
if self.use_roi and not self.update_peak_range and not self.roi_peak == [0,0]:
logger.notice("Using ROI peak range: [%s %s]" % (self.roi_peak[0], self.roi_peak[1]))
self.use_roi_actual = True
peak = copy.copy(self.roi_peak)
if not self.roi_low_res == [0,0]:
low_res = copy.copy(self.roi_low_res)
if not self.roi_background == [0,0]:
bck_range = copy.copy(self.roi_background)
logger.notice("Using ROI peak range: [%s %s]" % (peak[0], peak[1]))
self.use_roi_actual = True
# Determine reflectivity peak position (center)
# Crop three sigmas around the peak before fitting
_width = int(peak[1]-peak[0])
signal_y_crop = signal_y[peak[1]-_width:peak[1]+1+_width]
signal_x_crop = signal_x[peak[0]-_width:peak[1]+1+_width]
# Calculate a reasonable peak position
#peak_mean = np.average(signal_x_crop, weights=signal_y_crop)
peak_position = (peak[1]+peak[0])/2.0
peak_width = (peak[1]-peak[0])/2.0
try:
# Try to find the peak position within the peak range we found
peak_position, peak_width = self.fit_peak(signal_x_crop, signal_y_crop, peak)
# If we are more than two sigmas away from the middle of the range,
# there's clearly a problem.
if np.abs(peak_position - (peak[1]+peak[0])/2.0) > np.abs(peak[1]-peak[0]):
logger.notice("Found peak position outside of given range [x=%s], switching to full detector" % peak_position)
peak_position = (peak[1]+peak[0])/2.0
peak_width = (peak[1]-peak[0])/2.0
raise RuntimeError("Bad peak position")
except:
# If we can't find a peak, try fitting over the full detector.
# If we do find a peak, then update the ranges rather than using
# what we currently have (which is probably given by the ROI).
logger.notice("Run %s [%s]: Could not fit a peak in the supplied peak range" %
(self.run_number, self.cross_section))
logger.notice(str(sys.exc_info()[1]))
try:
# Define a good default that is wide enough for the fit to work
default_width = (self.found_peak[1]-self.found_peak[0])/2.0
default_width = max(default_width, 10.0)
default_center = (self.found_peak[1]+self.found_peak[0])/2.0
default_peak = [default_center-default_width, default_center+default_width]
logger.notice("Run %s [%s]: Broad data region %s" % (self.run_number, self.cross_section, broad_range))
x_min = broad_range[0]+10
x_max = broad_range[1]-10
peak_position, peak_width = self.fit_peak(signal_x[x_min:x_max], signal_y[x_min:x_max], default_peak)
peak = [math.floor(peak_position-peak_width), math.floor(peak_position+peak_width)]
#low_res = [5, self.n_x_pixel-5]
low_res = self.found_low_res
self.use_roi_actual = False
logger.notice("Run %s [%s]: Peak not in supplied range! Found peak: %s low: %s" %
(self.run_number, self.cross_section, peak, low_res))
logger.notice("Run %s [%s]: Peak position: %s Peak width: %s" %
(self.run_number, self.cross_section, peak_position, peak_width))
except:
logger.notice(str(sys.exc_info()[1]))
logger.notice("Run %s [%s]: Gaussian fit failed to determine peak position" %
(self.run_number, self.cross_section))
# Update the specular peak range if needed
if self.update_peak_range:
peak[0] = math.floor(peak_position-peak_width)
peak[1] = math.ceil(peak_position+peak_width)
logger.notice("Updating peak range to: [%s %s]" % (peak[0], peak[1]))
self.use_roi_actual = False
elif self.use_roi and self.update_peak_range and not self.roi_peak == [0,0]:
logger.notice("Using fit peak range: [%s %s]" % (peak[0], peak[1]))
if not self.roi_low_res == [0,0]:
low_res = copy.copy(self.roi_low_res)
if not self.roi_background == [0,0]:
bck_range = copy.copy(self.roi_background)
# Store the information we found
self.peak_position = peak_position
self.peak_range = [int(max(0, peak[0])), int(min(peak[1], self.n_x_pixel))]
self.low_res_range = [int(max(0, low_res[0])), int(min(low_res[1], self.n_y_pixel))]
if not self.use_roi_bck or bck_range is None:
if self.use_tight_bck:
self.background = [self.peak_range[0]-self.bck_offset, self.peak_range[1]+self.bck_offset]
else:
self.background = [4, self.peak_range[0]-30]
else:
self.background = [int(bck_range[0]), int(bck_range[1])]
self.peak_position = (peak[1]+peak[0])/2.0
self.peak_range = [int(max(0, peak[0])), int(min(peak[1], NX_PIXELS))]
self.low_res_range = [int(max(0, low_res[0])), int(min(low_res[1], NY_PIXELS))]
self.background = [int(max(0, bck_range[0])), int(min(bck_range[1], NY_PIXELS))]
# Computed scattering angle
self.calculated_scattering_angle = 180.0 / math.pi * mantid.simpleapi.MRGetTheta(ws, SpecularPixel=peak_position)
self.calculated_scattering_angle = 180.0 / math.pi * mantid.simpleapi.MRGetTheta(ws, SpecularPixel=self.peak_position)
# Determine whether we have a direct beam
self.is_direct_beam = self.check_direct_beam(ws, peak_position)
self.is_direct_beam = self.check_direct_beam(ws, self.peak_position)
# Convenient data type
self.data_type = 0 if self.is_direct_beam else 1
......@@ -546,5 +418,173 @@ class DataInfo(object):
self.log()
def fit_2d_peak(workspace):
"""
Fit a 2D Gaussian peak
:param workspace: workspace to work with
"""
n_x = int(workspace.getInstrument().getNumberParameter("number-of-x-pixels")[0])
n_y = int(workspace.getInstrument().getNumberParameter("number-of-y-pixels")[0])
# Prepare data to fit
_integrated = mantid.simpleapi.Integration(InputWorkspace=workspace)
signal = _integrated.extractY()
z=np.reshape(signal, (n_x, n_y))
x = np.arange(0, n_x)
y = np.arange(0, n_y)
_x, _y = np.meshgrid(x, y)
_x = _x.T
_y = _y.T
code = coord_to_code(_x, _y).ravel()
data_to_fit = z.ravel()
err_y = np.sqrt(np.fabs(data_to_fit))
err_y[err_y<1] = 1
# Use the highest data point as a starting point for a simple Gaussian fit
x_dist = np.sum(z, 1)
y_dist = np.sum(z, 0)
center_x = np.argmax(x_dist)
center_y = np.argmax(y_dist)
# Gaussian fit
p0 = [np.max(z), center_x, 5, center_y, 50, 0]
try:
gauss_coef, _ = opt.curve_fit(gauss_simple, code, data_to_fit, p0=p0, sigma=err_y)
except:
logger.notice("Could not fit simple Gaussian")
gauss_coef = p0
# Keep track of the result
th = gauss_simple(code, *gauss_coef)
th = np.reshape(th, (n_x, n_y))
_chi2 = chi2(th, z)
guess_x = gauss_coef[1]
guess_wx = 2.0 * gauss_coef[2]
guess_y = gauss_coef[3]
guess_wy = 2.0 * gauss_coef[4]
guess_chi2 = _chi2
# Fit a polynomial background, as a starting point to fitting signal + background
try:
step_coef, _ = opt.curve_fit(poly_bck, code, data_to_fit, p0=[0, 0, 0, center_x, 0], sigma=err_y)
except:
logger.notice("Could not fit polynomial background")
step_coef = [0, 0, 0, center_x, 0]
th = poly_bck(code, *step_coef)
th = np.reshape(th, (n_x, n_y))
# Now fit a Gaussian + background
# A, mu_x, sigma_x, mu_y, sigma_y, poly_a, poly_b, poly_c, center, background
coef = [np.max(z), center_x, 5, center_y, 50,
step_coef[0], step_coef[1], step_coef[2], step_coef[3], step_coef[4]]
try:
coef, _ = opt.curve_fit(poly_bck_signal, code, data_to_fit, p0=coef, sigma=err_y)
except:
logger.notice("Could not fit Gaussian + polynomial")
th = poly_bck_signal(code, *coef)
th = np.reshape(th, (n_x, n_y))
_chi2 = chi2(th, z)
if _chi2 < guess_chi2:
guess_x = coef[1]
guess_wx = 2.0 * coef[2]
guess_y = coef[3]
guess_wy = 2.0 * coef[4]
guess_chi2 = _chi2
# Package the best results
x_min = max(0, int(guess_x-np.fabs(guess_wx)))
x_max = min(n_x-1, int(guess_x+np.fabs(guess_wx)))
y_min = max(0, int(guess_y-np.fabs(guess_wy)))
y_max = min(n_y-1, int(guess_y+np.fabs(guess_wy)))
return [x_min, x_max], [y_min, y_max]
def coord_to_code(x, y):
""" Utility function to encode pixel coordinates so we can unravel our distribution in a 1D array """
return 1000 * x + y
def code_to_coord(c):
""" Utility function to decode encoded coordinates """
i_x = c / 1000
i_y = c % 1000
return i_x, i_y
def poly_bck(value, *p):
"""
Polynomial function for background fit
f = a + b*(x-center) + c*(x-center)**2 + bck
where bck is a minimum threshold that is zero when the polynomial
has a value greater than it.
"""
coord = code_to_coord(value)
poly_a, poly_b, poly_c, center, background = p
values = poly_a + poly_b*(coord[0]-center) + poly_c*(coord[0]-center)**2
values[values<background] = background
values[coord[0]<DEAD_PIXELS] = 0
values[coord[0]>=NX_PIXELS-DEAD_PIXELS] = 0
values[coord[1]<DEAD_PIXELS] = 0
values[coord[1]>=NY_PIXELS-DEAD_PIXELS] = 0
return values
def poly_bck_signal(value, *p):
"""
Function for a polynomial + Gaussian signal
f = a + b*(x-center) + c*(x-center)**2 + Gaussian(x) + bck
where bck is a minimum threshold that is zero when the polynomial+Gaussian
has a value greater than it.
"""
coord = code_to_coord(value)
A, mu_x, sigma_x, mu_y, sigma_y, poly_a, poly_b, poly_c, center, background = p
if A<0 or sigma_x>50:
return np.zeros(len(value))
values = poly_a + poly_b * (coord[0] - center) + poly_c * (coord[0] - center)**2
values_g = A * np.exp(-(coord[0] - mu_x)**2 / (2. * sigma_x**2)-(coord[1] - mu_y)**2 / (2. * sigma_y**2))
values += values_g
values[values<background] = background
values[coord[0]<DEAD_PIXELS] = 0
values[coord[0]>=NX_PIXELS-DEAD_PIXELS] = 0
values[coord[1]<DEAD_PIXELS] = 0
values[coord[1]>=NY_PIXELS-DEAD_PIXELS] = 0
return values
def gauss_simple(value, *p):
"""
Gaussian function with threshold background
f = Gaussian(x) + bck
where bck is a minimum threshold that is zero when the Gaussian
has a value greater than it.
"""
coord = code_to_coord(value)
A, mu_x, sigma_x, mu_y, sigma_y, background = p
if A<0 or sigma_x>50:
return np.zeros(len(value))
values = A * np.exp(-(coord[0] - mu_x)**2 / (2. * sigma_x**2) - (coord[1] - mu_y)**2 / (2. * sigma_y**2))
values[values<background] = background
values[coord[0]<DEAD_PIXELS] = 0
values[coord[0]>=NX_PIXELS-DEAD_PIXELS] = 0
values[coord[1]<DEAD_PIXELS] = 0
values[coord[1]>=NY_PIXELS-DEAD_PIXELS] = 0
return values
def chi2(data, model):
""" Returns the chi^2 for a data set and model pair """
err = np.fabs(data.ravel())
err[err<=0] = 1
return np.sum((data.ravel() - model.ravel())**2 / err) / len(data.ravel())
# Register
AlgorithmFactory.subscribe(MRInspectData)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment