Skip to content
Snippets Groups Projects
Commit d51717b9 authored by Marina Ganeva's avatar Marina Ganeva
Browse files

TOFTOF Vanadium correction algorithm.

parent 98ca6aea
No related branches found
No related tags found
No related merge requests found
from mantid.api import PythonAlgorithm, AlgorithmFactory, WorkspaceProperty, Progress, InstrumentValidator
from mantid.kernel import Direction
import mantid.simpleapi as api
import numpy as np
from scipy import integrate
import scipy as sp
import mlzutils
# Test for keyword Vanadium or vanadium in Title - compulsory entries
class ComputeCalibrationCoefVan(PythonAlgorithm):
""" Calculate coefficients to normalize by Vanadium and correct Debye Waller factor
"""
def __init__(self):
"""
Init
"""
PythonAlgorithm.__init__(self)
self.vanaws = None
self.defaultT = 293.0 # K, default temperature if not given
self.Mvan = 50.942 # [g/mol], Vanadium molar mass
self.DebyeT = 389.0 # K, Debye temperature for Vanadium
def category(self):
""" Return category
"""
return "PythonAlgorithms;CorrectionFunctions\\EfficiencyCorrections"
def name(self):
""" Return summary
"""
return "ComputeCalibrationCoefVan"
def summary(self):
return "Calculate coefficients for detector efficiency correction using the Vanadium data."
def PyInit(self):
""" Declare properties
"""
self.declareProperty(WorkspaceProperty("VanadiumWorkspace", "", direction=Direction.Input,
validator=InstrumentValidator()),
"Input Vanadium workspace")
self.declareProperty(WorkspaceProperty("OutputWorkspace", "", direction=Direction.Output),
"Name the workspace that will contain the calibration coefficients")
return
def validateInputs(self):
issues = dict()
inws = self.getProperty("VanadiumWorkspace").value
run = inws.getRun()
if not run.hasProperty('wavelength'):
issues['VanadiumWorkspace'] = "Input workspace must have wavelength sample log."
else:
try:
float(run.getProperty('wavelength').value)
except ValueError:
issues['VanadiumWorkspace'] = "Invalid value for wavelength sample log. Wavelength must be a number."
outws = self.getPropertyValue("OutputWorkspace")
if not outws:
issues["OutputWorkspace"] = "Name of the output workspace must not be empty."
return issues
def get_temperature(self):
"""
tries to get temperature from the sample logs
in the case of fail, default value is returned
"""
run = self.vanaws.getRun()
if not run.hasProperty('temperature'):
self.log().warning("Temperature sample log is not present in " + self.vanaws.getName() +
"T=293K is assumed for Debye-Waller factor.")
return self.defaultT
try:
temperature = float(run.getProperty('temperature').value)
except ValueError, err:
self.log().warning("Error of getting temperature: " + err +
"T=293K is assumed for Debye-Waller factor.")
return self.defaultT
return temperature
def PyExec(self):
""" Main execution body
"""
self.vanaws = self.getProperty("VanadiumWorkspace").value # returns workspace instance
outws_name = self.getPropertyValue("OutputWorkspace") # returns workspace name (string)
nhist = self.vanaws.getNumberHistograms()
prog_reporter = Progress(self, start=0.0, end=1.0, nreports=nhist+1)
# calculate array of Debye-Waller factors
dwf = self.calculate_dwf()
# for each detector: fit gaussian to get peak_centre and fwhm
# sum data in the range [peak_centre - 3*fwhm, peak_centre + 3*fwhm]
dataX = self.vanaws.readX(0)
coefY = np.zeros(nhist)
coefE = np.zeros(nhist)
for idx in range(nhist):
prog_reporter.report("Setting %dth spectrum" % idx)
dataY = self.vanaws.readY(idx)
if np.max(dataY) != 0:
dataE = self.vanaws.readE(idx)
peak_centre, sigma = mlzutils.do_fit_gaussian(self.vanaws, idx, self.log(), cleanup_fit=False)
fwhm = sigma*2.*np.sqrt(2.*np.log(2.))
idxmin = (np.fabs(dataX-peak_centre+3.*fwhm)).argmin()
idxmax = (np.fabs(dataX-peak_centre-3.*fwhm)).argmin()
self.log().debug("Sigma=%f, centre=%f, fwhm=%f, idxmin=%d, idxmax=%d" % (sigma, peak_centre, fwhm, idxmin, idxmax))
coefY[idx] = dwf[idx]*sum(dataY[idxmin:idxmax+1])
coefE[idx] = dwf[idx]*sum(dataE[idxmin:idxmax+1])
else:
coefY[idx] = 0.
coefE[idx] = 0.
# create X array, X data are the same for all detectors, so
coefX = np.zeros(nhist)
coefX.fill(dataX[0])
api.CreateWorkspace(OutputWorkspace=outws_name, DataX=coefX, DataY=coefY, DataE=coefE, NSpec=nhist, UnitX="TOF")
outws = api.AnalysisDataService.retrieve(outws_name)
api.CopyLogs(self.vanaws, outws)
self.log().information("Workspace with calibration coefficients " + outws.getName() + " has been created.")
self.setProperty("OutputWorkspace", outws)
def calculate_dwf(self):
"""
Calculates Debye-Waller factor according to
Sears and Shelley Acta Cryst. A 47, 441 (1991)
"""
run = self.vanaws.getRun()
nhist = self.vanaws.getNumberHistograms()
thetasort = np.zeros(nhist) # theta in radians !!!NOT 2Theta
instrument = self.vanaws.getInstrument()
detID_offset = 0
try:
instrument.getDetector(0)
except RuntimeError:
detID_offset = 1
for i in range(nhist):
det = instrument.getDetector(i + detID_offset)
thetasort[i] = 0.5*self.vanaws.detectorSignedTwoTheta(det)
temperature = self.get_temperature() # T in K
wlength = float(run.getLogData('wavelength').value) # Wavelength, Angstrom
mass_vana = 0.001*self.Mvan/sp.constants.N_A # Vanadium mass, kg
temp_ratio = temperature/self.DebyeT
if temp_ratio < 1.e-3:
integral = 0.5
else:
integral = integrate.quad(lambda x: x/sp.tanh(0.5*x/temp_ratio), 0, 1)[0]
msd = 3.*sp.constants.hbar**2/(2.*mass_vana*sp.constants.k * self.DebyeT)*integral*1.e20
return np.exp(-msd*(4.*sp.pi*sp.sin(thetasort)/wlength)**2)
# Register algorithm with Mantid.
AlgorithmFactory.subscribe(ComputeCalibrationCoefVan)
# pylint: disable=assignment-from-none
import mantid.simpleapi as api
import numpy as np
def cleanup(wslist):
......@@ -154,3 +156,82 @@ def compare_mandatory(wslist, plist, logger, tolerance=0.01):
"Workspaces: " + ", ".join(wslist) + "\n Values: " + str(properties)
logger.error(message)
raise RuntimeError(message)
def remove_fit_workspaces(prefix):
ws1 = prefix + '_Parameters'
ws2 = prefix + '_NormalisedCovarianceMatrix'
cleanup([ws1, ws2])
def do_fit_gaussian(workspace, index, logger, cleanup_fit=True):
"""
Calculates guess values on peak centre, sigma and peak height.
Uses them as an input to run a fit algorithm
@ param workspace --- input workspace
@ param index --- the spectrum with which WorkspaceIndex to fit
@ param cleanup --- if False, the intermediate workspaces created by Fit algorithm will not be removed
@ returns peak_centre --- fitted peak centre
@ returns sigma --- fitted sigma
"""
nhist = workspace.getNumberHistograms()
if index > nhist:
message = "Index " + str(index) + " is out of range for the workspace " + workspace.getName()
logger.error(message)
raise RuntimeError(message)
x_values = np.array(workspace.readX(index))
y_values = np.array(workspace.readY(index))
# get peak centre position
imax = np.argmax(y_values)
height = y_values[imax]
# check for zero or negative signal
if height <= 0:
logger.warning("Workspace %s, detector %d has maximum <= 0" % (workspace.getName(), index))
return [0, 0]
try_centre = x_values[imax]
# guess sigma
indices = np.argwhere(y_values > 0.5*height)
nentries = len(indices)
if nentries < 3:
message = "Spectrum " + str(index) + " in workspace " + workspace.getName() +\
" has too narrow peak. Cannot guess sigma. Check your data."
logger.error(message)
raise RuntimeError(message)
# fwhm = sigma * (2.*np.sqrt(2.*np.log(2.)))
fwhm = np.fabs(x_values[indices[nentries - 1, 0]] - x_values[indices[0, 0]])
sigma = fwhm/(2.*np.sqrt(2.*np.log(2.)))
# execute Fit algorithm
myfunc = 'name=Gaussian, Height='+str(height)+', PeakCentre='+str(try_centre)+', Sigma='+str(sigma)
startX = try_centre - 3.0*fwhm
endX = try_centre + 3.0*fwhm
prefix = "Fit" + workspace.getName() + str(index)
retvals = api.Fit(InputWorkspace=workspace.getName(), WorkspaceIndex=index, StartX=startX, EndX=endX,
Function=myfunc, Output=prefix, OutputParametersOnly=True)
if not retvals or len(retvals) < 4:
message = "For detector " + str(index) + " in workspace " + workspace.getName() +\
" failed to retrieve fit results. Input guess parameters are " + str(myfunc)
logger.error(message)
if cleanup_fit:
remove_fit_workspaces(prefix)
raise RuntimeError(message)
fitStatus = retvals[0]
paramTable = retvals[3]
if fitStatus != 'success':
message = "For detector " + str(index) + " in workspace " + workspace.getName() +\
"fit has been terminated with status " + fitStatus + ". Input guess parameters are " + str(myfunc)
logger.error(message)
if cleanup_fit:
remove_fit_workspaces(prefix)
raise RuntimeError(message)
result = paramTable.column(1)[1:3]
if cleanup_fit:
remove_fit_workspaces(prefix)
# return list: [peak_centre, sigma]
return result
......@@ -8,6 +8,7 @@ set ( TEST_PY_FILES
CalculateSampleTransmissionTest.py
CheckForSampleLogsTest.py
ConjoinSpectraTest.py
ComputeCalibrationCoefVanTest.py
CorrectLogTimesTest.py
CreateLeBailFitInputTest.py
CreateMDTest.py
......
import unittest
from mantid.simpleapi import DeleteWorkspace, CreateSampleWorkspace, AddSampleLog, EditInstrumentGeometry,\
CloneWorkspace, CheckWorkspacesMatch
from testhelpers import run_algorithm
from mantid.api import AnalysisDataService
from scipy.constants import N_A, hbar, k
import numpy as np
class ComputeCalibrationCoefVanTest(unittest.TestCase):
def setUp(self):
# input_ws = Load(Filename="TOFTOFTestdata.nxs")
input_ws = CreateSampleWorkspace(Function="User Defined",
UserDefinedFunction="name=LinearBackground, A0=0.3;name=Gaussian, \
PeakCentre=5, Height=10, Sigma=0.3", NumBanks=2, BankPixelWidth=1,
XMin=0, XMax=10, BinWidth=0.1, BankDistanceFromSample=4.0)
self._input_ws = input_ws
AddSampleLog(self._input_ws, LogName='wavelength', LogText='4.0', LogType='Number', LogUnit='Angstrom')
def test_output(self):
outputWorkspaceName = "output_ws"
alg_test = run_algorithm("ComputeCalibrationCoefVan", VanadiumWorkspace=self._input_ws,
OutputWorkspace=outputWorkspaceName)
self.assertTrue(alg_test.isExecuted())
wsoutput = AnalysisDataService.retrieve(outputWorkspaceName)
# Output = Vanadium ws
self.assertEqual(wsoutput.getRun().getLogData('run_title').value,
self._input_ws.getRun().getLogData('run_title').value)
# Structure of output workspace
for i in range(wsoutput.getNumberHistograms()):
self.assertIsNotNone(wsoutput.readY(i)[0])
for j in range(1, wsoutput.blocksize()):
self.assertEqual(wsoutput.readY(i)[j], wsoutput.readY(i)[0])
# Size of output workspace
self.assertEqual(wsoutput.getNumberHistograms(), self._input_ws.getNumberHistograms())
DeleteWorkspace(wsoutput)
return
def test_sum(self):
outputWorkspaceName = "output_ws"
alg_test = run_algorithm("ComputeCalibrationCoefVan", VanadiumWorkspace=self._input_ws,
OutputWorkspace=outputWorkspaceName)
self.assertTrue(alg_test.isExecuted())
wsoutput = AnalysisDataService.retrieve(outputWorkspaceName)
# check whether sum is calculated correctly, for theta=0, dwf=1
y_sum = sum(self._input_ws.readY(0)[27:75])
self.assertAlmostEqual(y_sum, wsoutput.readY(0)[0])
DeleteWorkspace(wsoutput)
def test_dwf(self):
outputWorkspaceName = "output_ws"
# change theta to make dwf != 1
EditInstrumentGeometry(self._input_ws, L2="4,8", Polar="0,15", Azimuthal="0,0", DetectorIDs="1,2")
alg_test = run_algorithm("ComputeCalibrationCoefVan", VanadiumWorkspace=self._input_ws,
OutputWorkspace=outputWorkspaceName)
self.assertTrue(alg_test.isExecuted())
wsoutput = AnalysisDataService.retrieve(outputWorkspaceName)
# check dwf calculation
y_sum = sum(self._input_ws.readY(1)[27:75])
det2 = self._input_ws.getInstrument().getDetector(2)
mvan = 0.001*50.942/N_A
Bcoef = 4.736767162094296*1e+20*hbar*hbar/(2.0*mvan*k*389.0)
dwf = np.exp(-1.0*Bcoef*(4.0*np.pi*np.sin(0.5*self._input_ws.detectorSignedTwoTheta(det2))/4.0)**2)
self.assertAlmostEqual(y_sum*dwf, wsoutput.readY(1)[0])
DeleteWorkspace(wsoutput)
def test_input_not_modified(self):
backup = CloneWorkspace(self._input_ws)
outputWorkspaceName = "output_ws"
alg_test = run_algorithm("ComputeCalibrationCoefVan", VanadiumWorkspace=self._input_ws,
OutputWorkspace=outputWorkspaceName)
self.assertTrue(alg_test.isExecuted())
self.assertEqual("Success!", CheckWorkspacesMatch(backup, self._input_ws))
DeleteWorkspace(backup)
def tearDown(self):
if AnalysisDataService.doesExist(self._input_ws.getName()):
DeleteWorkspace(self._input_ws)
if __name__ == "__main__":
unittest.main()
.. algorithm::
.. summary::
.. alias::
.. properties::
Description
-----------
Algorithm creates a workspace with detector sensitivity correction coefficients using the given Vanadium workspace. The correction coefficients are calculated as follows.
1. Calculate the Debye-Waller factor according to Sears and Shelley *Acta Cryst. A* **47**, 441 (1991):
:math:`D_i = \exp\left(-B_i\cdot\frac{4\pi\sin\theta_i}{\lambda^2}\right)`
:math:`B_i = \frac{3\hbar^2\cdot 10^{20}}{2m_VkT_m}\cdot J(y)`
where :math:`J(y) = 0.5` if :math:`y < 10^{-3}`, otherwise
:math:`J(y) = \int_0^1 x\cdot\mathrm{coth}\left(\frac{x}{2y}\right)\,\mathrm{d}x`
where :math:`y=T/T_m` is the ratio of the temperature during the experiment :math:`T` to the Debye temperature :math:`T_m = 389K`, :math:`m_V` is the Vanadium atomic mass (in kg) and :math:`\theta_i` is the polar angle of the i-th detector retrieved using the *detectorSignedTwoTheta* function.
.. warning::
If sample log *temperature* is not present in the given Vanadium workspace or temperature is set to an invalid value, T=293K will be taken for the Debye-Waller factor calculation. Algorithm will produce warning in this case.
2. Perform Gaussian fit of the data to find out the position of the peak centre and FWHM. These values are used to calculate sum :math:`S_i` as
:math:`S_i = \sum_{x = x_C - 3\,\mathrm{fwhm}}^{x_C + 3\,\mathrm{fwhm}} Y_i(x)`
where :math:`x_C` is the peak centre position and :math:`Y_i(x)` is the coresponding to :math:`x` :math:`Y` value for i-th detector.
3. Finally, the correction coefficients :math:`K_i` are calculated as
:math:`K_i = D_i\times S_i`
Workspace containing these correction coefficients is created as an output and can be used as a RHS workspace in :ref:`algm-Divide` to apply correction to the LHS workspace.
.. note::
If gaussian fit fails, algorithm terminates with an error message. The error message contains name of the workspace and detector number.
Restrictions on the input workspace
###################################
The valid input workspace:
- must have an instrument set
- must have a *wavelength* sample log
Usage
-----
**Example**
.. testcode:: ExComputeCalibrationCoefVan
# load Vanadium data
wsVana = LoadMLZ(Filename='TOFTOFTestdata.nxs')
# calculate correction coefficients
wsCoefs = ComputeCalibrationCoefVan(wsVana)
print 'Spectrum 4 of the output workspace is filled with: ', round(wsCoefs.readY(999)[0])
# wsCoefs can be used as rhs with Divide algorithm to apply correction to the data
wsCorr = wsVana/wsCoefs
print 'Spectrum 4 of the input workspace is filled with: ', round(wsVana.readY(999)[0], 1)
print 'Spectrum 4 of the corrected workspace is filled with: ', round(wsCorr.readY(999)[0], 5)
Output:
.. testoutput:: ExComputeCalibrationCoefVan
Spectrum 4 of the output workspace is filled with: 6596.0
Spectrum 4 of the input workspace is filled with: 1.0
Spectrum 4 of the corrected workspace is filled with: 0.00015
.. categories::
.. sourcelink::
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment