Loading drtsans/tof/eqsans/elastic_reference_normalization.py +320 −34 Original line number Diff line number Diff line Loading @@ -47,6 +47,33 @@ class ReferenceWavelengths: self.error_vec = errors class ReferenceWavelengths2D: """ Class for keeping track of reference wavelength for each momentum transfer Q (1D) """ def __init__(self, qx_values, qy_values, ref_wavelengths, intensities, errors): """Initialization Parameters ---------- qx_values: ~numpy.ndarray (1, qx_values_size * num_wavelengths) 2D vector for Qx qy_values: ~numpy.ndarray (1, qy_values_size * num_wavelengths) 2D vector for Qy ref_wavelength: int wavelength value of ref wavelength intensities: ~numpy.ndarray (qx_values_size, qy_values_size, 1) 3D vector for intensities of (Qx, Qy, 1) errors: ~numpy.ndarray 3D vector for errors of (Qx, Qy, 1) """ self.qx = qx_values self.qy = qy_values self.ref_wl_vec = ref_wavelengths self.intensity_vec = intensities self.error_vec = errors def reshape_q_wavelength_matrix(i_of_q): """Reshape I(Q) into a mesh grid of (Q, wavelength) and limit Q into q_min and q_max Loading Loading @@ -404,7 +431,145 @@ def determine_reference_wavelength_q1d_mesh( return ReferenceWavelengths(q_vec, min_wl_vec, min_intensity_vec, min_error_vec) def determine_reference_wavelength_q2d(i_of_q): def copy_array_and_mask(array, mask): """Make a copy of a Numpy Array but replace items hidden by the mask np.null Parameters ---------- array: numpy.ndarray ... mask: numpy.ndarray Returns ------- numpy.ndarray Masked copy of the input array """ arrayCopy = array.copy() arrayCopy[~mask] = np.nan return arrayCopy def determine_common_mod_q2d_range_mesh(qx, qy, wavelength, intensity_array): """Determine mask that represents the common intensities between wavelengths Parameters ---------- qx: numpy.ndarray ... qy: numpy.ndarray ... wavelength: numpy.ndarray ... intensity_array: numpy.ndarray Returns ------- numpy.ndarray Mask for common intensities across wavelengths """ intensity_3d = intensity_array.transpose().reshape((len(np.unique(wavelength)), qx.shape[0], qy.shape[0])) # assume all positions exist in each intensity matrix mask = np.full((intensity_3d.shape[1], intensity_3d.shape[2]), True) # filter out positions by logical and'ing the mask with the non-nan mask of each wl for intensity_set in intensity_3d: subMask = ~np.isnan(intensity_set) mask = np.logical_and(mask, subMask) mask = mask.transpose() return mask def calculate_K_2d(i_of_q): """Calculates K, K Error, P, and S for a given 2D IQazimuthal Parameters ---------- i_of_q: ~drtsans.dataobjects.IQazimuthal I(qx, qy, wavelength) Returns ------- tuple K vector, K error vector, P vector, S vector """ # Calculate P(wl), S(wl) p_vec = np.empty(len(np.unique(i_of_q.wavelength))) s_vec = np.zeros_like(p_vec) k_error2_vec = np.zeros_like(p_vec) # will qx ever differ from qy in size/nan ? mask = determine_common_mod_q2d_range_mesh(i_of_q.qx, i_of_q.qy, i_of_q.wavelength, i_of_q.intensity) ref_wavelength_vec = determine_reference_wavelength_q2d(i_of_q, mask) # reshape vectors from 2D to 3D of shape (wavelength, qx_unique.len, qy_unique.len) # initial shape of vectors is assumed to be 2D with each additonal wavelength values concated on the end # e.g. wavelength1.intensity = [1,1] wavelength2.intensity = [.2,.2] # [1,1] [.2,.2] # i_of_q.intensity = [1,1,.2,.2] # [1,1,.2,.2] # # intensity_3d = [ # [[1,1], # [1,1]], # [[.2,.2], # [.2,.2]] # ] num_wl = len(np.unique(i_of_q.wavelength)) intensity_3d = i_of_q.intensity.transpose().reshape((num_wl, i_of_q.qx.shape[0], i_of_q.qy.shape[0])) error_3d = i_of_q.error.transpose().reshape((num_wl, i_of_q.qx.shape[0], i_of_q.qy.shape[0])) for i_wl, lambda_i in enumerate(np.unique(i_of_q.wavelength)): # mask vectors to only be values from the intersection of non nan intensity values currentIntensity = copy_array_and_mask(intensity_3d[i_wl].transpose(), mask) currentError = copy_array_and_mask(error_3d[i_wl].transpose(), mask) refIntensity = copy_array_and_mask(ref_wavelength_vec.intensity_vec, mask) refIntensity = refIntensity.transpose() refError = copy_array_and_mask(ref_wavelength_vec.error_vec, mask) refError = refError.transpose() # P(wl) = sum_q I(qx, qy, ref_wl) * I(qx, qy, wl) p_value = np.nansum( refIntensity * currentIntensity ) # S(wl) = sum_q I(qx, qy, wl)**2 s_value = np.nansum(currentIntensity ** 2) # assign p_vec[i_wl] = p_value s_vec[i_wl] = s_value # now calculate error term0 = currentError ** 2 term1 = (( refIntensity * s_value - 2.0 * currentIntensity * p_value ) / (s_value ** 2)) ** 2 term2 = refError ** 2 term3 = (currentIntensity / s_value) ** 2 k_error2_vec[i_wl] = np.nansum((term0 * term1) + (term2 * term3)) # Calculate K k_vec = p_vec / s_vec return k_vec, k_error2_vec, p_vec, s_vec def determine_reference_wavelength_q2d(i_of_q, mask): """Determine the reference wavelength for each Q. The reference wavelength of a specific Q or (qx, qy) Loading @@ -415,40 +580,35 @@ def determine_reference_wavelength_q2d(i_of_q): ---------- i_of_q: ~drtsans.dataobjects.IQazimuthal I(qx, qy, wavelength) ... mask: numpy.ndarray Returns ------- numpy.ndarray 3D array of (qx, qy, wavelength) ReferenceWavelengths2D Reference wavelengths for each momentum transfer Q and the corresponding intensity and error """ # Construct nd array for qx, qy, I and wavelength combo_matrix = np.array([i_of_q.qx, i_of_q.qy, i_of_q.intensity, i_of_q.wavelength]) combo_matrix = combo_matrix.transpose() # good for filter and slicing # Create a list of unique Q q2d_vec = np.array([i_of_q.qx, i_of_q.qy]) q2d_vec.transpose() unique_q_vec = np.unique(q2d_vec, axis=0) # Init return vector ref_wavelength_vec = np.ndarray(shape=(unique_q_vec.shape[0], 3), dtype="float") # Remove all the I(q, wl) with intensity as nan or infinity combo_matrix = combo_matrix[np.isfinite(combo_matrix)] # For each Q, search wavelength min for index, q_value in unique_q_vec: # filter out the items with desired Q value combo_filter_qx = combo_matrix[combo_matrix[:, 0] == q_value[0]] combo_filter_qy = combo_filter_qx[combo_filter_qx[:, 1] == q_value[1]] # find minimum wavelength and add to output array min_wl = np.min(combo_filter_qy[:, 2]) # set value ref_wavelength_vec[index][0] = q_value[0] ref_wavelength_vec[index][1] = q_value[1] ref_wavelength_vec[index][2] = min_wl return ref_wavelength_vec # Construct 3D versions to easily index by wavelength num_wl = len(np.unique(i_of_q.wavelength)) intensity_3d = i_of_q.intensity.transpose().reshape((num_wl, i_of_q.qx.shape[0], i_of_q.qy.shape[0])) error_3d = i_of_q.error.transpose().reshape((num_wl, i_of_q.qx.shape[0], i_of_q.qy.shape[0])) # determine index of shortest wavelength min_wl = np.min(i_of_q.wavelength) min_wl_index = np.where(np.unique(i_of_q.wavelength) == min_wl)[0] # pull out properties of min wavelength to store in ReferenceWavelengths2D # 2d vector containing intensities for qx and qy min_intensity_vec = intensity_3d[min_wl_index].transpose().copy() # 2d vector containing intensity errors for qx and qy min_error_vec = error_3d[min_wl_index].transpose().copy() # 2d vector containing qx values min_qx = i_of_q.qx[min_wl_index] # 2d vector containing qy values min_qy = i_of_q.qy[min_wl_index] return ReferenceWavelengths2D(min_qx, min_qy, min_wl, min_intensity_vec, min_error_vec) def normalize_intensity_q1d( Loading Loading @@ -496,8 +656,8 @@ def normalize_intensity_q1d( """ # Sanity check assert wl_vec.shape[0] == intensity_array.shape[1] assert q_vec.shape[0] == error_array.shape[0] assert wl_vec.shape[0] == intensity_array.shape[1] # wavelength as lambda assert q_vec.shape[0] == error_array.shape[0] # points as q assert intensity_array.shape == error_array.shape # Normalized intensities Loading Loading @@ -528,7 +688,7 @@ def normalize_intensity_q1d( # y_matrix[i, :] corresponds to a single q_i/r_i # y_matrix[:, j] corresponds to a single q_j/r_j # Term 2 # Term 1 # t2 += [delta I(q', wl)]**2 * Y(q, q'', wl)**2 / S(lw)**4 t2sum_vec = ( error_array[qmin_index:i_qmax, i_wl] ** 2 Loading @@ -544,9 +704,9 @@ def normalize_intensity_q1d( / s_vec[i_wl] ** 2 ) # term 1 # outside of qmin and qmax: t1 = [delta I(q, wl)]**2 * [P(wl) / S(wl)]**2 t1sum_vec = (error_array[:, i_wl] * p_vec[i_wl] / s_vec[i_wl]) ** 2 # term 2 t1sum_vec[qmin_index : qmax_index + 1] = ( error_array[qmin_index : qmax_index + 1, i_wl] ** 2 * ( Loading @@ -562,3 +722,129 @@ def normalize_intensity_q1d( ) return normalized_intensity_array, np.sqrt(normalized_error2_array) def einsum_multiply(A, B): return np.einsum('ij,kl', A, B) def normalize_intensity_q2d( wl_vec, qx, qy, intensity_array, error_array, ref_wl_ints_errs, k_vec, p_vec, s_vec, mask ): """Normalize Q1D intensities and errors Parameters ---------- wl_vec: ~numpy.ndarray 1D vector of wavelength (in ascending order) qx: ~numpy.ndarray 2D vector of Q (in ascending order) qy: ~numpy.ndarray 2D vector of Q (in ascending order) intensity_array: ~numpy.ndarray 2D array of intensities, shape[0] = number of Q, shape[1] = number of wavelength error_array: ~numpy.ndarray 2D array of errors, shape[0] = number of Q, shape[1] = number of wavelength ref_wl_ints_errs: ReferenceWavelengths instance of ReferenceWavelengths containing intensities and errors k_vec: ~numpy.ndarray calculated K vector p_vec: ~numpy.ndarray calculated P vector s_vec: ~numpy.ndarray calculated S vector mask: ~numpy.ndarray boolean array that represents common intensities across wavelengths Returns ------- tuple normalized I(Q2D), normalized error(Q2D) """ # Sanity check assert wl_vec.shape[0] == intensity_array.shape[0] # wavelength as lambda assert wl_vec.shape[1] == intensity_array.shape[1] assert qx.shape[0] == error_array.shape[0] # points as q assert qx.shape[1] == error_array.shape[1] assert intensity_array.shape == error_array.shape # Reshape vectors to be easily indexed by wavelength # Refer to calculate_K_2d() for an input datashape outline sizeZ = len(np.unique(wl_vec)) sizeX = qx.shape[0] sizeY = qy.shape[0] intensity_3d = intensity_array.transpose().reshape((sizeZ, sizeX, sizeY)) error_3d = error_array.transpose().reshape((sizeZ, sizeX, sizeY)) # Init Normalized intensities normalized_intensity_array = intensity_3d * k_vec.reshape((3, 1, 1)) normalized_error2_array = np.zeros_like(error_3d) # Reshape ri_vec = ref_wl_ints_errs.intensity_vec.transpose().reshape((sizeX, sizeY)) re_vec = ref_wl_ints_errs.error_vec.transpose().reshape((sizeX, sizeY)) for i_wl in range(0, sizeZ): # Collect current wavelength's properties and mask for intersetion across all wavelengths intensity_vec = intensity_3d[i_wl] error_vec = error_3d[i_wl] i_mn = intensity_vec i_kl = intensity_vec.transpose() i8_mn = error_vec i8_kl = error_vec.transpose() iref_mn = ri_vec iref_kl = ri_vec.transpose() iref8_mn = re_vec # Calculate Y: Y_mn = (I_kl * R_mn * S) - (I_kl * P * 2 * I_mn) # you would have k,l grid of m,n grids y_mn = np.nan_to_num((einsum_multiply(i_kl, iref_mn) * s_vec[i_wl])) y_mn -= np.nan_to_num((einsum_multiply(i_kl, i_mn) * p_vec[i_wl] * 2)) # what if m,n = 0,0 and k,l = 1,1 y_kl = np.nan_to_num((i_kl * iref_kl * s_vec[i_wl])) y_kl -= np.nan_to_num((i_kl * p_vec[i_wl] * 2 * i_kl)) # Term 1 for each kl, so it would only be one member of y_mn t1sum_vec = np.einsum('klmn->kl',np.nan_to_num( (y_mn ** 2) * (i8_mn ** 2) / (s_vec[i_wl] ** 4)) ) # Term 2 # outside of kl t2sum_vec = (i8_kl ** 2) * (p_vec[i_wl] / s_vec[i_wl]) ** 2 # inside kl kl_in_mn_mask = ~np.isnan(i_mn) t2sum_vec[kl_in_mn_mask] = ( (i8_kl ** 2) * ( ((p_vec[i_wl] ** 2) * (s_vec[i_wl] ** 2)) + (2 * p_vec[i_wl] * s_vec[i_wl] * y_kl) ) / (s_vec[i_wl] ** 4) )[kl_in_mn_mask] # Term 3 # sum over m,n you can pull out kl terms t3sum_vec = np.nansum( (iref8_mn ** 2) * (((i_kl * i_mn) / s_vec[i_wl]) ** 2) ) # sum up normalized_error2_array[i_wl] += (t1sum_vec + t2sum_vec + t3sum_vec) return normalized_intensity_array, np.sqrt(normalized_error2_array) tests/unit/new/drtsans/tof/eqsans/test_elastic_reference_normalization_2d.py 0 → 100644 +138 −0 Original line number Diff line number Diff line import pytest from drtsans.dataobjects import IQazimuthal from drtsans.tof.eqsans.elastic_reference_normalization import ( calculate_K_2d, determine_common_mod_q2d_range_mesh, determine_reference_wavelength_q2d, normalize_intensity_q2d ) import numpy as np def create_testing_iq2d(): """Create a test data I(Q, wavelength) as the attached EXCEL spreadsheet attached in gitlab story SANS 834 https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/uploads/d268b5ddc440becf9c677e5e0e69e9b8/test_elastic_1_.xlsx Returns ------- """ # Intensity vector intensity_vec_3 = np.ones((11, 11), dtype=np.float64) intensity_vec_3[3:8, 3:8] = np.nan intensity_vec_4 = np.full((11, 11), np.nan, dtype=np.float64) intensity_vec_4[1:10, 1:10] = 1.2 intensity_vec_4[4:7, 4:7] = np.nan intensity_vec_5 = np.full((11, 11), np.nan, dtype=np.float64) intensity_vec_5[2:9, 2:9] = 0.9 intensity_vec_5[5, 5] = np.nan intensity_vec = intensity_vec_3 intensity_vec = np.concatenate((intensity_vec, intensity_vec_4), axis=1) intensity_vec = np.concatenate((intensity_vec, intensity_vec_5), axis=1) # Error error_vec_3 = np.full((11, 11), 0.02, dtype=np.float64) error_vec_3[3:8, 3:8] = np.nan error_vec_4 = np.full((11, 11), np.nan, dtype=np.float64) error_vec_4[1:10, 1:10] = 0.03 error_vec_4[4:7, 4:7] = np.nan error_vec_5 = np.full((11, 11), np.nan, dtype=np.float64) error_vec_5[2:9, 2:9] = 0.02 error_vec_5[5, 5] = np.nan error_vec = error_vec_3 error_vec = np.concatenate((error_vec, error_vec_4), axis=1) error_vec = np.concatenate((error_vec, error_vec_5), axis=1) # Q vector vec_q = np.linspace(-0.1, 0.1, num=11, endpoint=True, dtype=np.float64) qx_matrix, qy_matrix = np.meshgrid(vec_q, vec_q, indexing='ij') qx_vec = qx_matrix qx_vec = np.concatenate((qx_vec, qx_matrix), axis=1) qx_vec = np.concatenate((qx_vec, qx_matrix), axis=1) qy_vec = qx_matrix qy_vec = np.concatenate((qy_vec, qy_matrix), axis=1) qy_vec = np.concatenate((qy_vec, qy_matrix), axis=1) # Wavelength vector wavelength_vec = np.full((11, 11), 3., dtype=np.float64) wavelength_vec = np.concatenate((wavelength_vec, np.full((11, 11), 4., dtype=np.float64)), axis=1) wavelength_vec = np.concatenate((wavelength_vec, np.full((11, 11), 5., dtype=np.float64)), axis=1) # Construct IQmod i_of_q = IQazimuthal(intensity=intensity_vec, error=error_vec, qx=qx_vec, qy=qy_vec, wavelength=wavelength_vec) expected_output = np.ones((11, 11), dtype=np.float64) expected_output[5, 5] = np.nan expected_output_before_renormalization = np.ones((11, 11), dtype=np.float64) expected_output_before_renormalization[1:10, 1:10] = 1.1 expected_output_before_renormalization[2:9, 2:9] = 31. / 30. expected_output_before_renormalization[3:8, 3:8] = 1.05 expected_output_before_renormalization[4:7, 4:7] = 0.9 expected_output_before_renormalization[5, 5] = np.nan expected_normalized_error_vec_3 = np.full((11, 11), 0.022360679774998, dtype=np.float64) expected_normalized_error_vec_3[3:8, 3:8] = np.nan expected_normalized_error_vec_4 = np.full((11, 11), np.nan, dtype=np.float64) expected_normalized_error_vec_4[1:10, 1:10] = 0.027239931881135 expected_normalized_error_vec_4[3:8, 3:8] = 0.026266529187458 expected_normalized_error_vec_4[4:7, 4:7] = np.nan expected_normalized_error_vec_5 = np.full((11, 11), np.nan, dtype=np.float64) expected_normalized_error_vec_5[2:9, 2:9] = 0.023921166824012 expected_normalized_error_vec_5[3:8, 3:8] = 0.023044955171311 expected_normalized_error_vec_5[5, 5] = np.nan expected_normalized_error_vec = expected_normalized_error_vec_3 expected_normalized_error_vec = np.concatenate( (expected_normalized_error_vec, expected_normalized_error_vec_4), axis=1 ) expected_normalized_error_vec = np.concatenate( (expected_normalized_error_vec, expected_normalized_error_vec_5), axis=1 ) return i_of_q, expected_output, expected_output_before_renormalization, expected_normalized_error_vec def test_verify_q2d_functions(): np.set_printoptions(edgeitems=30, linewidth=100000) i_of_q, expected_output, expected_output_before_renormalization, expected_normalized_error_vec = create_testing_iq2d() # noqa E501 # print(i_of_q) k_vec, k_error2_vec, p_vec, s_vec = calculate_K_2d(i_of_q) np.testing.assert_allclose(k_vec, [1, 0.833333333333333, 1.11111111111111], rtol=1e-5, atol=0) np.testing.assert_allclose(p_vec, [24,28.8,21.6], rtol=1e-5, atol=0) np.testing.assert_allclose(s_vec, [24,34.56,19.44], rtol=1e-5, atol=0) np.testing.assert_allclose(k_error2_vec, [3.33333E-05, 2.96586E-05, 4.59788E-05], rtol=1e-5, atol=0) mask = determine_common_mod_q2d_range_mesh(i_of_q.qx, i_of_q.qy, i_of_q.wavelength, i_of_q.intensity) ref_wavelength_vec = determine_reference_wavelength_q2d(i_of_q, mask) normalized_intensity_array, normalized_error2_array = normalize_intensity_q2d( i_of_q.wavelength, i_of_q.qx, i_of_q.qy, i_of_q.intensity, i_of_q.error, ref_wavelength_vec, k_vec, p_vec, s_vec, mask ) ma = np.ma.MaskedArray(normalized_intensity_array.transpose(), mask=np.isnan(normalized_intensity_array)) averaged_normalized_intensity_array = np.ma.average(ma,axis=2) np.testing.assert_allclose(averaged_normalized_intensity_array, expected_output, rtol=1e-5, atol=0) np.testing.assert_allclose(normalized_error2_array, expected_normalized_error_vec.transpose().reshape((3, 11, 11)), rtol=1e-5, atol=0) # noqa E501 if __name__ == '__main__': pytest.main([__file__]) Loading
drtsans/tof/eqsans/elastic_reference_normalization.py +320 −34 Original line number Diff line number Diff line Loading @@ -47,6 +47,33 @@ class ReferenceWavelengths: self.error_vec = errors class ReferenceWavelengths2D: """ Class for keeping track of reference wavelength for each momentum transfer Q (1D) """ def __init__(self, qx_values, qy_values, ref_wavelengths, intensities, errors): """Initialization Parameters ---------- qx_values: ~numpy.ndarray (1, qx_values_size * num_wavelengths) 2D vector for Qx qy_values: ~numpy.ndarray (1, qy_values_size * num_wavelengths) 2D vector for Qy ref_wavelength: int wavelength value of ref wavelength intensities: ~numpy.ndarray (qx_values_size, qy_values_size, 1) 3D vector for intensities of (Qx, Qy, 1) errors: ~numpy.ndarray 3D vector for errors of (Qx, Qy, 1) """ self.qx = qx_values self.qy = qy_values self.ref_wl_vec = ref_wavelengths self.intensity_vec = intensities self.error_vec = errors def reshape_q_wavelength_matrix(i_of_q): """Reshape I(Q) into a mesh grid of (Q, wavelength) and limit Q into q_min and q_max Loading Loading @@ -404,7 +431,145 @@ def determine_reference_wavelength_q1d_mesh( return ReferenceWavelengths(q_vec, min_wl_vec, min_intensity_vec, min_error_vec) def determine_reference_wavelength_q2d(i_of_q): def copy_array_and_mask(array, mask): """Make a copy of a Numpy Array but replace items hidden by the mask np.null Parameters ---------- array: numpy.ndarray ... mask: numpy.ndarray Returns ------- numpy.ndarray Masked copy of the input array """ arrayCopy = array.copy() arrayCopy[~mask] = np.nan return arrayCopy def determine_common_mod_q2d_range_mesh(qx, qy, wavelength, intensity_array): """Determine mask that represents the common intensities between wavelengths Parameters ---------- qx: numpy.ndarray ... qy: numpy.ndarray ... wavelength: numpy.ndarray ... intensity_array: numpy.ndarray Returns ------- numpy.ndarray Mask for common intensities across wavelengths """ intensity_3d = intensity_array.transpose().reshape((len(np.unique(wavelength)), qx.shape[0], qy.shape[0])) # assume all positions exist in each intensity matrix mask = np.full((intensity_3d.shape[1], intensity_3d.shape[2]), True) # filter out positions by logical and'ing the mask with the non-nan mask of each wl for intensity_set in intensity_3d: subMask = ~np.isnan(intensity_set) mask = np.logical_and(mask, subMask) mask = mask.transpose() return mask def calculate_K_2d(i_of_q): """Calculates K, K Error, P, and S for a given 2D IQazimuthal Parameters ---------- i_of_q: ~drtsans.dataobjects.IQazimuthal I(qx, qy, wavelength) Returns ------- tuple K vector, K error vector, P vector, S vector """ # Calculate P(wl), S(wl) p_vec = np.empty(len(np.unique(i_of_q.wavelength))) s_vec = np.zeros_like(p_vec) k_error2_vec = np.zeros_like(p_vec) # will qx ever differ from qy in size/nan ? mask = determine_common_mod_q2d_range_mesh(i_of_q.qx, i_of_q.qy, i_of_q.wavelength, i_of_q.intensity) ref_wavelength_vec = determine_reference_wavelength_q2d(i_of_q, mask) # reshape vectors from 2D to 3D of shape (wavelength, qx_unique.len, qy_unique.len) # initial shape of vectors is assumed to be 2D with each additonal wavelength values concated on the end # e.g. wavelength1.intensity = [1,1] wavelength2.intensity = [.2,.2] # [1,1] [.2,.2] # i_of_q.intensity = [1,1,.2,.2] # [1,1,.2,.2] # # intensity_3d = [ # [[1,1], # [1,1]], # [[.2,.2], # [.2,.2]] # ] num_wl = len(np.unique(i_of_q.wavelength)) intensity_3d = i_of_q.intensity.transpose().reshape((num_wl, i_of_q.qx.shape[0], i_of_q.qy.shape[0])) error_3d = i_of_q.error.transpose().reshape((num_wl, i_of_q.qx.shape[0], i_of_q.qy.shape[0])) for i_wl, lambda_i in enumerate(np.unique(i_of_q.wavelength)): # mask vectors to only be values from the intersection of non nan intensity values currentIntensity = copy_array_and_mask(intensity_3d[i_wl].transpose(), mask) currentError = copy_array_and_mask(error_3d[i_wl].transpose(), mask) refIntensity = copy_array_and_mask(ref_wavelength_vec.intensity_vec, mask) refIntensity = refIntensity.transpose() refError = copy_array_and_mask(ref_wavelength_vec.error_vec, mask) refError = refError.transpose() # P(wl) = sum_q I(qx, qy, ref_wl) * I(qx, qy, wl) p_value = np.nansum( refIntensity * currentIntensity ) # S(wl) = sum_q I(qx, qy, wl)**2 s_value = np.nansum(currentIntensity ** 2) # assign p_vec[i_wl] = p_value s_vec[i_wl] = s_value # now calculate error term0 = currentError ** 2 term1 = (( refIntensity * s_value - 2.0 * currentIntensity * p_value ) / (s_value ** 2)) ** 2 term2 = refError ** 2 term3 = (currentIntensity / s_value) ** 2 k_error2_vec[i_wl] = np.nansum((term0 * term1) + (term2 * term3)) # Calculate K k_vec = p_vec / s_vec return k_vec, k_error2_vec, p_vec, s_vec def determine_reference_wavelength_q2d(i_of_q, mask): """Determine the reference wavelength for each Q. The reference wavelength of a specific Q or (qx, qy) Loading @@ -415,40 +580,35 @@ def determine_reference_wavelength_q2d(i_of_q): ---------- i_of_q: ~drtsans.dataobjects.IQazimuthal I(qx, qy, wavelength) ... mask: numpy.ndarray Returns ------- numpy.ndarray 3D array of (qx, qy, wavelength) ReferenceWavelengths2D Reference wavelengths for each momentum transfer Q and the corresponding intensity and error """ # Construct nd array for qx, qy, I and wavelength combo_matrix = np.array([i_of_q.qx, i_of_q.qy, i_of_q.intensity, i_of_q.wavelength]) combo_matrix = combo_matrix.transpose() # good for filter and slicing # Create a list of unique Q q2d_vec = np.array([i_of_q.qx, i_of_q.qy]) q2d_vec.transpose() unique_q_vec = np.unique(q2d_vec, axis=0) # Init return vector ref_wavelength_vec = np.ndarray(shape=(unique_q_vec.shape[0], 3), dtype="float") # Remove all the I(q, wl) with intensity as nan or infinity combo_matrix = combo_matrix[np.isfinite(combo_matrix)] # For each Q, search wavelength min for index, q_value in unique_q_vec: # filter out the items with desired Q value combo_filter_qx = combo_matrix[combo_matrix[:, 0] == q_value[0]] combo_filter_qy = combo_filter_qx[combo_filter_qx[:, 1] == q_value[1]] # find minimum wavelength and add to output array min_wl = np.min(combo_filter_qy[:, 2]) # set value ref_wavelength_vec[index][0] = q_value[0] ref_wavelength_vec[index][1] = q_value[1] ref_wavelength_vec[index][2] = min_wl return ref_wavelength_vec # Construct 3D versions to easily index by wavelength num_wl = len(np.unique(i_of_q.wavelength)) intensity_3d = i_of_q.intensity.transpose().reshape((num_wl, i_of_q.qx.shape[0], i_of_q.qy.shape[0])) error_3d = i_of_q.error.transpose().reshape((num_wl, i_of_q.qx.shape[0], i_of_q.qy.shape[0])) # determine index of shortest wavelength min_wl = np.min(i_of_q.wavelength) min_wl_index = np.where(np.unique(i_of_q.wavelength) == min_wl)[0] # pull out properties of min wavelength to store in ReferenceWavelengths2D # 2d vector containing intensities for qx and qy min_intensity_vec = intensity_3d[min_wl_index].transpose().copy() # 2d vector containing intensity errors for qx and qy min_error_vec = error_3d[min_wl_index].transpose().copy() # 2d vector containing qx values min_qx = i_of_q.qx[min_wl_index] # 2d vector containing qy values min_qy = i_of_q.qy[min_wl_index] return ReferenceWavelengths2D(min_qx, min_qy, min_wl, min_intensity_vec, min_error_vec) def normalize_intensity_q1d( Loading Loading @@ -496,8 +656,8 @@ def normalize_intensity_q1d( """ # Sanity check assert wl_vec.shape[0] == intensity_array.shape[1] assert q_vec.shape[0] == error_array.shape[0] assert wl_vec.shape[0] == intensity_array.shape[1] # wavelength as lambda assert q_vec.shape[0] == error_array.shape[0] # points as q assert intensity_array.shape == error_array.shape # Normalized intensities Loading Loading @@ -528,7 +688,7 @@ def normalize_intensity_q1d( # y_matrix[i, :] corresponds to a single q_i/r_i # y_matrix[:, j] corresponds to a single q_j/r_j # Term 2 # Term 1 # t2 += [delta I(q', wl)]**2 * Y(q, q'', wl)**2 / S(lw)**4 t2sum_vec = ( error_array[qmin_index:i_qmax, i_wl] ** 2 Loading @@ -544,9 +704,9 @@ def normalize_intensity_q1d( / s_vec[i_wl] ** 2 ) # term 1 # outside of qmin and qmax: t1 = [delta I(q, wl)]**2 * [P(wl) / S(wl)]**2 t1sum_vec = (error_array[:, i_wl] * p_vec[i_wl] / s_vec[i_wl]) ** 2 # term 2 t1sum_vec[qmin_index : qmax_index + 1] = ( error_array[qmin_index : qmax_index + 1, i_wl] ** 2 * ( Loading @@ -562,3 +722,129 @@ def normalize_intensity_q1d( ) return normalized_intensity_array, np.sqrt(normalized_error2_array) def einsum_multiply(A, B): return np.einsum('ij,kl', A, B) def normalize_intensity_q2d( wl_vec, qx, qy, intensity_array, error_array, ref_wl_ints_errs, k_vec, p_vec, s_vec, mask ): """Normalize Q1D intensities and errors Parameters ---------- wl_vec: ~numpy.ndarray 1D vector of wavelength (in ascending order) qx: ~numpy.ndarray 2D vector of Q (in ascending order) qy: ~numpy.ndarray 2D vector of Q (in ascending order) intensity_array: ~numpy.ndarray 2D array of intensities, shape[0] = number of Q, shape[1] = number of wavelength error_array: ~numpy.ndarray 2D array of errors, shape[0] = number of Q, shape[1] = number of wavelength ref_wl_ints_errs: ReferenceWavelengths instance of ReferenceWavelengths containing intensities and errors k_vec: ~numpy.ndarray calculated K vector p_vec: ~numpy.ndarray calculated P vector s_vec: ~numpy.ndarray calculated S vector mask: ~numpy.ndarray boolean array that represents common intensities across wavelengths Returns ------- tuple normalized I(Q2D), normalized error(Q2D) """ # Sanity check assert wl_vec.shape[0] == intensity_array.shape[0] # wavelength as lambda assert wl_vec.shape[1] == intensity_array.shape[1] assert qx.shape[0] == error_array.shape[0] # points as q assert qx.shape[1] == error_array.shape[1] assert intensity_array.shape == error_array.shape # Reshape vectors to be easily indexed by wavelength # Refer to calculate_K_2d() for an input datashape outline sizeZ = len(np.unique(wl_vec)) sizeX = qx.shape[0] sizeY = qy.shape[0] intensity_3d = intensity_array.transpose().reshape((sizeZ, sizeX, sizeY)) error_3d = error_array.transpose().reshape((sizeZ, sizeX, sizeY)) # Init Normalized intensities normalized_intensity_array = intensity_3d * k_vec.reshape((3, 1, 1)) normalized_error2_array = np.zeros_like(error_3d) # Reshape ri_vec = ref_wl_ints_errs.intensity_vec.transpose().reshape((sizeX, sizeY)) re_vec = ref_wl_ints_errs.error_vec.transpose().reshape((sizeX, sizeY)) for i_wl in range(0, sizeZ): # Collect current wavelength's properties and mask for intersetion across all wavelengths intensity_vec = intensity_3d[i_wl] error_vec = error_3d[i_wl] i_mn = intensity_vec i_kl = intensity_vec.transpose() i8_mn = error_vec i8_kl = error_vec.transpose() iref_mn = ri_vec iref_kl = ri_vec.transpose() iref8_mn = re_vec # Calculate Y: Y_mn = (I_kl * R_mn * S) - (I_kl * P * 2 * I_mn) # you would have k,l grid of m,n grids y_mn = np.nan_to_num((einsum_multiply(i_kl, iref_mn) * s_vec[i_wl])) y_mn -= np.nan_to_num((einsum_multiply(i_kl, i_mn) * p_vec[i_wl] * 2)) # what if m,n = 0,0 and k,l = 1,1 y_kl = np.nan_to_num((i_kl * iref_kl * s_vec[i_wl])) y_kl -= np.nan_to_num((i_kl * p_vec[i_wl] * 2 * i_kl)) # Term 1 for each kl, so it would only be one member of y_mn t1sum_vec = np.einsum('klmn->kl',np.nan_to_num( (y_mn ** 2) * (i8_mn ** 2) / (s_vec[i_wl] ** 4)) ) # Term 2 # outside of kl t2sum_vec = (i8_kl ** 2) * (p_vec[i_wl] / s_vec[i_wl]) ** 2 # inside kl kl_in_mn_mask = ~np.isnan(i_mn) t2sum_vec[kl_in_mn_mask] = ( (i8_kl ** 2) * ( ((p_vec[i_wl] ** 2) * (s_vec[i_wl] ** 2)) + (2 * p_vec[i_wl] * s_vec[i_wl] * y_kl) ) / (s_vec[i_wl] ** 4) )[kl_in_mn_mask] # Term 3 # sum over m,n you can pull out kl terms t3sum_vec = np.nansum( (iref8_mn ** 2) * (((i_kl * i_mn) / s_vec[i_wl]) ** 2) ) # sum up normalized_error2_array[i_wl] += (t1sum_vec + t2sum_vec + t3sum_vec) return normalized_intensity_array, np.sqrt(normalized_error2_array)
tests/unit/new/drtsans/tof/eqsans/test_elastic_reference_normalization_2d.py 0 → 100644 +138 −0 Original line number Diff line number Diff line import pytest from drtsans.dataobjects import IQazimuthal from drtsans.tof.eqsans.elastic_reference_normalization import ( calculate_K_2d, determine_common_mod_q2d_range_mesh, determine_reference_wavelength_q2d, normalize_intensity_q2d ) import numpy as np def create_testing_iq2d(): """Create a test data I(Q, wavelength) as the attached EXCEL spreadsheet attached in gitlab story SANS 834 https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/uploads/d268b5ddc440becf9c677e5e0e69e9b8/test_elastic_1_.xlsx Returns ------- """ # Intensity vector intensity_vec_3 = np.ones((11, 11), dtype=np.float64) intensity_vec_3[3:8, 3:8] = np.nan intensity_vec_4 = np.full((11, 11), np.nan, dtype=np.float64) intensity_vec_4[1:10, 1:10] = 1.2 intensity_vec_4[4:7, 4:7] = np.nan intensity_vec_5 = np.full((11, 11), np.nan, dtype=np.float64) intensity_vec_5[2:9, 2:9] = 0.9 intensity_vec_5[5, 5] = np.nan intensity_vec = intensity_vec_3 intensity_vec = np.concatenate((intensity_vec, intensity_vec_4), axis=1) intensity_vec = np.concatenate((intensity_vec, intensity_vec_5), axis=1) # Error error_vec_3 = np.full((11, 11), 0.02, dtype=np.float64) error_vec_3[3:8, 3:8] = np.nan error_vec_4 = np.full((11, 11), np.nan, dtype=np.float64) error_vec_4[1:10, 1:10] = 0.03 error_vec_4[4:7, 4:7] = np.nan error_vec_5 = np.full((11, 11), np.nan, dtype=np.float64) error_vec_5[2:9, 2:9] = 0.02 error_vec_5[5, 5] = np.nan error_vec = error_vec_3 error_vec = np.concatenate((error_vec, error_vec_4), axis=1) error_vec = np.concatenate((error_vec, error_vec_5), axis=1) # Q vector vec_q = np.linspace(-0.1, 0.1, num=11, endpoint=True, dtype=np.float64) qx_matrix, qy_matrix = np.meshgrid(vec_q, vec_q, indexing='ij') qx_vec = qx_matrix qx_vec = np.concatenate((qx_vec, qx_matrix), axis=1) qx_vec = np.concatenate((qx_vec, qx_matrix), axis=1) qy_vec = qx_matrix qy_vec = np.concatenate((qy_vec, qy_matrix), axis=1) qy_vec = np.concatenate((qy_vec, qy_matrix), axis=1) # Wavelength vector wavelength_vec = np.full((11, 11), 3., dtype=np.float64) wavelength_vec = np.concatenate((wavelength_vec, np.full((11, 11), 4., dtype=np.float64)), axis=1) wavelength_vec = np.concatenate((wavelength_vec, np.full((11, 11), 5., dtype=np.float64)), axis=1) # Construct IQmod i_of_q = IQazimuthal(intensity=intensity_vec, error=error_vec, qx=qx_vec, qy=qy_vec, wavelength=wavelength_vec) expected_output = np.ones((11, 11), dtype=np.float64) expected_output[5, 5] = np.nan expected_output_before_renormalization = np.ones((11, 11), dtype=np.float64) expected_output_before_renormalization[1:10, 1:10] = 1.1 expected_output_before_renormalization[2:9, 2:9] = 31. / 30. expected_output_before_renormalization[3:8, 3:8] = 1.05 expected_output_before_renormalization[4:7, 4:7] = 0.9 expected_output_before_renormalization[5, 5] = np.nan expected_normalized_error_vec_3 = np.full((11, 11), 0.022360679774998, dtype=np.float64) expected_normalized_error_vec_3[3:8, 3:8] = np.nan expected_normalized_error_vec_4 = np.full((11, 11), np.nan, dtype=np.float64) expected_normalized_error_vec_4[1:10, 1:10] = 0.027239931881135 expected_normalized_error_vec_4[3:8, 3:8] = 0.026266529187458 expected_normalized_error_vec_4[4:7, 4:7] = np.nan expected_normalized_error_vec_5 = np.full((11, 11), np.nan, dtype=np.float64) expected_normalized_error_vec_5[2:9, 2:9] = 0.023921166824012 expected_normalized_error_vec_5[3:8, 3:8] = 0.023044955171311 expected_normalized_error_vec_5[5, 5] = np.nan expected_normalized_error_vec = expected_normalized_error_vec_3 expected_normalized_error_vec = np.concatenate( (expected_normalized_error_vec, expected_normalized_error_vec_4), axis=1 ) expected_normalized_error_vec = np.concatenate( (expected_normalized_error_vec, expected_normalized_error_vec_5), axis=1 ) return i_of_q, expected_output, expected_output_before_renormalization, expected_normalized_error_vec def test_verify_q2d_functions(): np.set_printoptions(edgeitems=30, linewidth=100000) i_of_q, expected_output, expected_output_before_renormalization, expected_normalized_error_vec = create_testing_iq2d() # noqa E501 # print(i_of_q) k_vec, k_error2_vec, p_vec, s_vec = calculate_K_2d(i_of_q) np.testing.assert_allclose(k_vec, [1, 0.833333333333333, 1.11111111111111], rtol=1e-5, atol=0) np.testing.assert_allclose(p_vec, [24,28.8,21.6], rtol=1e-5, atol=0) np.testing.assert_allclose(s_vec, [24,34.56,19.44], rtol=1e-5, atol=0) np.testing.assert_allclose(k_error2_vec, [3.33333E-05, 2.96586E-05, 4.59788E-05], rtol=1e-5, atol=0) mask = determine_common_mod_q2d_range_mesh(i_of_q.qx, i_of_q.qy, i_of_q.wavelength, i_of_q.intensity) ref_wavelength_vec = determine_reference_wavelength_q2d(i_of_q, mask) normalized_intensity_array, normalized_error2_array = normalize_intensity_q2d( i_of_q.wavelength, i_of_q.qx, i_of_q.qy, i_of_q.intensity, i_of_q.error, ref_wavelength_vec, k_vec, p_vec, s_vec, mask ) ma = np.ma.MaskedArray(normalized_intensity_array.transpose(), mask=np.isnan(normalized_intensity_array)) averaged_normalized_intensity_array = np.ma.average(ma,axis=2) np.testing.assert_allclose(averaged_normalized_intensity_array, expected_output, rtol=1e-5, atol=0) np.testing.assert_allclose(normalized_error2_array, expected_normalized_error_vec.transpose().reshape((3, 11, 11)), rtol=1e-5, atol=0) # noqa E501 if __name__ == '__main__': pytest.main([__file__])