"Framework/Examples/PropertyAlgorithm.cpp" did not exist on "00d938145ca5a6c4e22b3de6b6182c9247f6e380"
-
Samuel Jackson authoredSamuel Jackson authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
RefineSatellitePeaks.py 8.87 KiB
from mantid.kernel import *
from mantid.api import *
from mantid.simpleapi import *
import numpy as np
from scipy.spatial import KDTree
from scipy.cluster.vq import kmeans2
import scipy.cluster.hierarchy as hcluster
class RefineSatellitePeaks(DataProcessorAlgorithm):
def PyInit(self):
self.declareProperty(IPeaksWorkspaceProperty(name="MainPeaks",
defaultValue="",
direction=Direction.Input),
doc="Main integer HKL peaks. Q vectors will be calculated relative to these peaks.")
self.declareProperty(IPeaksWorkspaceProperty(name="SatellitePeaks",
defaultValue="",
direction=Direction.Input),
doc="Positions of seed satellite peaks. These will be used to define the q vectors for each satellite.")
self.declareProperty(WorkspaceProperty(name="MDWorkspace",
defaultValue="",
direction=Direction.Input),
doc="Workspace to search for satellites peak in.")
self.declareProperty(IPeaksWorkspaceProperty(name="OutputWorkspace",
defaultValue="",
direction=Direction.Output),
doc="All found satellite peaks. These will be given with satellite coordinates.")
self.declareProperty('NumOfQs', -1, direction=Direction.Input,
doc="The number of satellite peaks to look for. If this option is not set to the default then all the \
provided satellites will be grouped into exactly this number of q vectors")
self.declareProperty('ClusterThreshold', 1.5, direction=Direction.Input,
doc="Threshold for automaticallty deciding on the number of q vectors to use. If NumOfQs found is set then this \
is property is ignored.")
self.declareProperty('PeakRadius', 0.1, direction=Direction.Input,
doc="The peak radius used to integrate the satellite peaks. This is passed direclty to IntegratePeaksMD")
self.declareProperty('BackgroundInnerRadius', 0.1, direction=Direction.Input,
doc="The inner background radius used to integrate the satellite peaks. This is passed direclty to \
IntegratePeaksMD")
self.declareProperty('BackgroundOuterRadius', 0.2, direction=Direction.Input,
doc="The outer background radius used to integrate satellite peaks. This is passed direclty to \
IntegratePeaksMD")
self.declareProperty('I/sigma', 2, direction=Direction.Input,
doc="The I/sigma threshold use to identify if peaks exist. This is passed direclty to FilterPeaks")
def PyExec(self):
k = self.getProperty("NumOfQs").value
peak_radius = self.getProperty("PeakRadius").value
background_radii = (self.getProperty("BackgroundInnerRadius").value, self.getProperty("BackgroundOuterRadius").value)
I_over_sigma = self.getProperty("I/sigma").value
cluster_threshold = self.getProperty("ClusterThreshold").value
# if user did not specify the number of qs then
# set the k value to None
if k == -1:
k = None
md = self.getProperty("MDWorkspace").value
nuclear = self.getProperty("MainPeaks").value
sats = self.getProperty("SatellitePeaks").value
nuclear_hkls = self.get_hkls(nuclear)
sats_hkls = self.get_hkls(sats)
qs = self.find_q_vectors(nuclear_hkls, sats_hkls)
clusters, k = self.cluster_qs(qs, threshold=cluster_threshold)
qs = self.average_clusters(qs, clusters)
predicted_satellites = self.create_fractional_peaks_workspace(qs, nuclear)
centroid_satellites = CentroidPeaksMD(md, predicted_satellites, PeakRadius=peak_radius)
satellites_int_spherical = IntegratePeaksMD(md, centroid_satellites, PeakRadius=peak_radius,
BackgroundInnerRadius=background_radii[0], BackgroundOuterRadius=background_radii[1], IntegrateIfOnEdge=True)
satellites_int_spherical = FilterPeaks(satellites_int_spherical, FilterVariable="Intensity", FilterValue=0, Operator=">")
satellites_int_spherical = FilterPeaks(satellites_int_spherical, FilterVariable="Signal/Noise",
FilterValue=I_over_sigma, Operator=">")
DeleteWorkspace(predicted_satellites)
DeleteWorkspace(centroid_satellites)
name = self.getPropertyValue("OutputWorkspace")
RenameWorkspace(satellites_int_spherical, name)
self.log().notice("Q vectors are: \n{}".format(qs))
self.setProperty("OutputWorkspace", satellites_int_spherical)
def get_hkls(self, peaks_workspace):
"""Return a 2D numpy array from a peaks workspace.
:param peaks_workpace: the PeaksWorkspace to extract HKL values from
:returns: np.ndarry -- 2D array of HKL values
"""
return np.array([np.array([peak['h'], peak['k'], peak['l']]) for peak in peaks_workspace])
def cluster_qs(self, qs, k=None, threshold=1.5):
"""Cluster q vectors into discrete groups.
Classifies each of the q vectors into a number of clusters. The number of clusters used is decided by the parameters passed:
* If the k parameter is supplied then the q vectors are grouped into k clusters using kmeans.
* If the threshold parameter is supplied then the q vectors a split into groups based on cophenetic distance.
:param qs: list of q vectors to cluster. Each element should be a numpy array of length three.
:param k: number of clusters to use (optional).
:param threshold: cophenetic distance cut off point for new clusters (optional)
:returns: tuple (clusters, k)
Where:
list -- clusters is a list of cluster indicies which each q belongs to
int -- k is the number of clusters used
"""
if k is not None:
centroid, clusters = kmeans2(qs, k)
else:
clusters = hcluster.fclusterdata(qs, threshold, criterion="distance")
return clusters, len(set(clusters))
def average_clusters(self, qs, clusters):
"""Find the centroid of the clusters.
For each q vector, group them by their designated cluster and then compute
the average of the group.
:param qs: list of q vectors. Each element should be a numpy array.
:param clusters: the indicies of the cluster that each q belongs to.
:returns: np.ndarry -- the list of centroids for each cluster.
"""
averaged_qs = []
for cluster_index in set(clusters):
averaged_qs.append(np.mean(qs[clusters==cluster_index], axis=0))
return np.array(averaged_qs)
def find_q_vectors(self, nuclear_hkls, sats_hkls):
"""Find the q vector between the nuclear HKLs and the satellite peaks
Given a list of HKL positions and a list of fractional HKL positions of
satellite peaks, find the difference between each satellite and its nearest
integer HKL.
:param nuclear_hkls: the positions of integer HKL peaks.
:param sats_hkl: the positions of fractional "satellite" HKL peaks.
:returns: np.ndarray -- array of q vectors.
"""
peak_map = KDTree(nuclear_hkls)
qs = []
for sat in sats_hkls:
distance, index = peak_map.query(sat, k=1)
if distance > 2:
#peak to far away from satellite ignore
continue
nearest_peak = nuclear_hkls[index]
qs.append(sat - nearest_peak)
return np.array(qs)
def create_fractional_peaks_workspace(self, qs, nuclear):
"""Generate a peaks workspace of possible satellite peaks from a list of q vectors.
:param qs: list of q vectors to use to generate fractional peaks.
:param nuclear: list of integer HKL peak positions.
:returns: PeaksWorkspace -- containing predicted locations of satellite peaks.
"""
predicted_satellites = nuclear.clone()
for _ in range(predicted_satellites.getNumberPeaks()):
predicted_satellites.removePeak(0)
for q in qs:
predicted_q = PredictFractionalPeaks(nuclear, HOffset=q[0], KOffset=q[1], LOffset=q[2])
predicted_satellites = CombinePeaksWorkspaces(predicted_satellites, predicted_q)
DeleteWorkspace(predicted_q)
return predicted_satellites
AlgorithmFactory.subscribe(RefineSatellitePeaks)