Commit c758f0d1 authored by Somnath, Suhas's avatar Somnath, Suhas
Browse files

Updated to work with the MPI-ized Process class in pyUSID

parent c4aca462
...@@ -25,7 +25,6 @@ class SignalFilter(Process): ...@@ -25,7 +25,6 @@ class SignalFilter(Process):
write_condensed=False, num_pix=1, phase_rad=0, **kwargs): write_condensed=False, num_pix=1, phase_rad=0, **kwargs):
""" """
Filters the entire h5 dataset with the given filtering parameters. Filters the entire h5 dataset with the given filtering parameters.
Parameters Parameters
---------- ----------
h5_main : h5py.Dataset object h5_main : h5py.Dataset object
...@@ -94,7 +93,7 @@ class SignalFilter(Process): ...@@ -94,7 +93,7 @@ class SignalFilter(Process):
scaling_factor = 1 + 2 * self.write_filtered + 0.25 * self.write_condensed scaling_factor = 1 + 2 * self.write_filtered + 0.25 * self.write_condensed
self._max_pos_per_read = int(self._max_pos_per_read / scaling_factor) self._max_pos_per_read = int(self._max_pos_per_read / scaling_factor)
if self.verbose: if self.verbose and self.mpi_rank == 0:
print('Allowed to read {} pixels per chunk'.format(self._max_pos_per_read)) print('Allowed to read {} pixels per chunk'.format(self._max_pos_per_read))
self.parms_dict = dict() self.parms_dict = dict()
...@@ -119,7 +118,6 @@ class SignalFilter(Process): ...@@ -119,7 +118,6 @@ class SignalFilter(Process):
def test(self, pix_ind=None, excit_wfm=None, **kwargs): def test(self, pix_ind=None, excit_wfm=None, **kwargs):
""" """
Tests the signal filter on a single pixel (randomly chosen unless manually specified) worth of data. Tests the signal filter on a single pixel (randomly chosen unless manually specified) worth of data.
Parameters Parameters
---------- ----------
pix_ind : int, optional. default = random pix_ind : int, optional. default = random
...@@ -129,11 +127,12 @@ class SignalFilter(Process): ...@@ -129,11 +127,12 @@ class SignalFilter(Process):
length of a single pixel's data. For example, in the case of G-mode, where a single scan line is yet to be length of a single pixel's data. For example, in the case of G-mode, where a single scan line is yet to be
broken down into pixels, the excitation waveform for a single pixel can br provided to automatically broken down into pixels, the excitation waveform for a single pixel can br provided to automatically
break the raw and filtered responses also into chunks of the same size. break the raw and filtered responses also into chunks of the same size.
Returns Returns
------- -------
fig, axes fig, axes
""" """
if self.mpi_rank > 0:
return
if pix_ind is None: if pix_ind is None:
pix_ind = np.random.randint(0, high=self.h5_main.shape[0]) pix_ind = np.random.randint(0, high=self.h5_main.shape[0])
return test_filter(self.h5_main[pix_ind], frequency_filters=self.frequency_filters, excit_wfm=excit_wfm, return test_filter(self.h5_main[pix_ind], frequency_filters=self.frequency_filters, excit_wfm=excit_wfm,
...@@ -148,6 +147,7 @@ class SignalFilter(Process): ...@@ -148,6 +147,7 @@ class SignalFilter(Process):
self.h5_results_grp = create_results_group(self.h5_main, self.process_name) self.h5_results_grp = create_results_group(self.h5_main, self.process_name)
self.parms_dict.update({'last_pixel': 0, 'algorithm': 'pycroscopy_SignalFilter'}) self.parms_dict.update({'last_pixel': 0, 'algorithm': 'pycroscopy_SignalFilter'})
write_simple_attrs(self.h5_results_grp, self.parms_dict) write_simple_attrs(self.h5_results_grp, self.parms_dict)
assert isinstance(self.h5_results_grp, h5py.Group) assert isinstance(self.h5_results_grp, h5py.Group)
...@@ -156,6 +156,9 @@ class SignalFilter(Process): ...@@ -156,6 +156,9 @@ class SignalFilter(Process):
h5_comp_filt = self.h5_results_grp.create_dataset('Composite_Filter', h5_comp_filt = self.h5_results_grp.create_dataset('Composite_Filter',
data=np.float32(self.composite_filter)) data=np.float32(self.composite_filter))
if self.verbose and self.mpi_rank==0:
print('Rank {} - Finished creating the Composite_Filter dataset'.format(self.mpi_rank))
# First create the position datsets if the new indices are smaller... # First create the position datsets if the new indices are smaller...
if self.num_effective_pix != self.h5_main.shape[0]: if self.num_effective_pix != self.h5_main.shape[0]:
# TODO: Do this part correctly. See past solution: # TODO: Do this part correctly. See past solution:
...@@ -169,26 +172,36 @@ class SignalFilter(Process): ...@@ -169,26 +172,36 @@ class SignalFilter(Process):
pos_descriptor.append(Dimension(name, units, np.arange(leng))) pos_descriptor.append(Dimension(name, units, np.arange(leng)))
ds_pos_inds, ds_pos_vals = build_ind_val_dsets(pos_descriptor, is_spectral=False, verbose=self.verbose) ds_pos_inds, ds_pos_vals = build_ind_val_dsets(pos_descriptor, is_spectral=False, verbose=self.verbose)
h5_pos_vals.data = np.atleast_2d(new_pos_vals) # The data generated above varies linearly. Override. h5_pos_vals.data = np.atleast_2d(new_pos_vals) # The data generated above varies linearly. Override.
""" """
h5_pos_inds_new, h5_pos_vals_new = write_ind_val_dsets(self.h5_results_grp, h5_pos_inds_new, h5_pos_vals_new = write_ind_val_dsets(self.h5_results_grp,
Dimension('pixel', 'a.u.', self.num_effective_pix), Dimension('pixel', 'a.u.', self.num_effective_pix),
is_spectral=False, verbose=self.verbose) is_spectral=False, verbose=self.verbose and self.mpi_rank==0)
if self.verbose and self.mpi_rank==0:
print('Rank {} - Created the new position ancillary dataset'.format(self.mpi_rank))
else: else:
h5_pos_inds_new = self.h5_main.h5_pos_inds h5_pos_inds_new = self.h5_main.h5_pos_inds
h5_pos_vals_new = self.h5_main.h5_pos_vals h5_pos_vals_new = self.h5_main.h5_pos_vals
if self.verbose and self.mpi_rank==0:
print('Rank {} - Reusing source datasets position datasets'.format(self.mpi_rank))
if self.noise_threshold is not None: if self.noise_threshold is not None:
self.h5_noise_floors = write_main_dataset(self.h5_results_grp, (self.num_effective_pix, 1), 'Noise_Floors', self.h5_noise_floors = write_main_dataset(self.h5_results_grp, (self.num_effective_pix, 1), 'Noise_Floors',
'Noise', 'a.u.', None, Dimension('arb', '', [1]), 'Noise', 'a.u.', None, Dimension('arb', '', [1]),
dtype=np.float32, aux_spec_prefix='Noise_Spec_', dtype=np.float32, aux_spec_prefix='Noise_Spec_',
h5_pos_inds=h5_pos_inds_new, h5_pos_vals=h5_pos_vals_new, h5_pos_inds=h5_pos_inds_new, h5_pos_vals=h5_pos_vals_new,
verbose=self.verbose) verbose=self.verbose and self.mpi_rank==0)
if self.verbose and self.mpi_rank==0:
print('Rank {} - Finished creating the Noise_Floors dataset'.format(self.mpi_rank))
if self.write_filtered: if self.write_filtered:
# Filtered data is identical to Main_Data in every way - just a duplicate # Filtered data is identical to Main_Data in every way - just a duplicate
self.h5_filtered = create_empty_dataset(self.h5_main, self.h5_main.dtype, 'Filtered_Data', self.h5_filtered = create_empty_dataset(self.h5_main, self.h5_main.dtype, 'Filtered_Data',
h5_group=self.h5_results_grp) h5_group=self.h5_results_grp)
if self.verbose and self.mpi_rank==0:
print('Rank {} - Finished creating the Filtered dataset'.format(self.mpi_rank))
self.hot_inds = None self.hot_inds = None
...@@ -199,7 +212,13 @@ class SignalFilter(Process): ...@@ -199,7 +212,13 @@ class SignalFilter(Process):
self.h5_condensed = write_main_dataset(self.h5_results_grp, (self.num_effective_pix, len(self.hot_inds)), self.h5_condensed = write_main_dataset(self.h5_results_grp, (self.num_effective_pix, len(self.hot_inds)),
'Condensed_Data', 'Complex', 'a. u.', None, condensed_spec, 'Condensed_Data', 'Complex', 'a. u.', None, condensed_spec,
h5_pos_inds=h5_pos_inds_new, h5_pos_vals=h5_pos_vals_new, h5_pos_inds=h5_pos_inds_new, h5_pos_vals=h5_pos_vals_new,
dtype=np.complex, verbose=self.verbose) dtype=np.complex, verbose=self.verbose and self.mpi_rank==0)
if self.verbose and self.mpi_rank==0:
print('Rank {} - Finished creating the Condensed dataset'.format(self.mpi_rank))
if self.mpi_size > 1:
self.mpi_comm.Barrier()
self.h5_main.file.flush()
def _get_existing_datasets(self): def _get_existing_datasets(self):
""" """
...@@ -216,30 +235,21 @@ class SignalFilter(Process): ...@@ -216,30 +235,21 @@ class SignalFilter(Process):
""" """
Writes data chunks back to the file Writes data chunks back to the file
""" """
# Get access to the private variable:
pos_slice = slice(self._start_pos, self._end_pos) pos_in_batch = self._get_pixels_in_current_batch()
if self.write_condensed: if self.write_condensed:
self.h5_condensed[pos_slice] = self.condensed_data self.h5_condensed[pos_in_batch, :] = self.condensed_data
if self.noise_threshold is not None: if self.noise_threshold is not None:
self.h5_noise_floors[pos_slice] = np.atleast_2d(self.noise_floors) self.h5_noise_floors[pos_in_batch, :] = np.atleast_2d(self.noise_floors)
if self.write_filtered: if self.write_filtered:
self.h5_filtered[pos_slice] = self.filtered_data self.h5_filtered[pos_in_batch, :] = self.filtered_data
# Leaving in this provision that will allow restarting of processes
self.h5_results_grp.attrs['last_pixel'] = self._end_pos
self.h5_main.file.flush()
print('Finished processing upto pixel ' + str(self._end_pos) + ' of ' + str(self.h5_main.shape[0]))
# Now update the start position # Not responsible for checkpointing anymore. Process class handles this.
self._start_pos = self._end_pos
def _unit_computation(self, *args, **kwargs): def _unit_computation(self, *args, **kwargs):
""" """
Processing per chunk of the dataset Processing per chunk of the dataset
Parameters Parameters
---------- ----------
args : list args : list
...@@ -275,3 +285,4 @@ class SignalFilter(Process): ...@@ -275,3 +285,4 @@ class SignalFilter(Process):
# do np.roll on data # do np.roll on data
# self.data = np.roll(self.data, 0, axis=1) # self.data = np.roll(self.data, 0, axis=1)
pass pass
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment