Commit 438bfb62 authored by Somnath, Suhas's avatar Somnath, Suhas
Browse files

Updated to MPI-ized Process class in pyUSID

parent 95e57a70
...@@ -16,6 +16,14 @@ from pyUSID.io.hdf_utils import write_main_dataset, create_results_group, create ...@@ -16,6 +16,14 @@ from pyUSID.io.hdf_utils import write_main_dataset, create_results_group, create
from pyUSID.io.write_utils import Dimension from pyUSID.io.write_utils import Dimension
from pyUSID import USIDataset from pyUSID import USIDataset
from .utils.giv_utils import do_bayesian_inference, bayesian_inference_on_period from .utils.giv_utils import do_bayesian_inference, bayesian_inference_on_period
try:
from mpi4py import MPI
if MPI.COMM_WORLD.Get_size() == 1:
# mpi4py available but NOT called via mpirun or mpiexec => single node
MPI = None
except ImportError:
# mpi4py not even present! Single node by default:
MPI = None
cap_dtype = np.dtype({'names': ['Forward', 'Reverse'], cap_dtype = np.dtype({'names': ['Forward', 'Reverse'],
'formats': [np.float32, np.float32]}) 'formats': [np.float32, np.float32]})
...@@ -50,7 +58,7 @@ class GIVBayesian(Process): ...@@ -50,7 +58,7 @@ class GIVBayesian(Process):
self.num_x_steps = int(num_x_steps) self.num_x_steps = int(num_x_steps)
if self.num_x_steps % 4 == 0: if self.num_x_steps % 4 == 0:
self.num_x_steps = ((self.num_x_steps // 2) + 1) * 2 self.num_x_steps = ((self.num_x_steps // 2) + 1) * 2
if self.verbose: if self.verbose and self.mpi_rank == 0:
print('ensuring that half steps should be odd, num_x_steps is now', self.num_x_steps) print('ensuring that half steps should be odd, num_x_steps is now', self.num_x_steps)
self.h5_main = USIDataset(self.h5_main) self.h5_main = USIDataset(self.h5_main)
...@@ -80,6 +88,8 @@ class GIVBayesian(Process): ...@@ -80,6 +88,8 @@ class GIVBayesian(Process):
self.forward_results = None self.forward_results = None
self._bayes_parms = None self._bayes_parms = None
self.__first_batch = True
def test(self, pix_ind=None, show_plots=True): def test(self, pix_ind=None, show_plots=True):
""" """
Tests the inference on a single pixel (randomly chosen unless manually specified) worth of data. Tests the inference on a single pixel (randomly chosen unless manually specified) worth of data.
...@@ -95,6 +105,9 @@ class GIVBayesian(Process): ...@@ -95,6 +105,9 @@ class GIVBayesian(Process):
------- -------
fig, axes fig, axes
""" """
if self.mpi_rank > 0:
return
if pix_ind is None: if pix_ind is None:
pix_ind = np.random.randint(0, high=self.h5_main.shape[0]) pix_ind = np.random.randint(0, high=self.h5_main.shape[0])
other_params = self.parms_dict.copy() other_params = self.parms_dict.copy()
...@@ -104,7 +117,7 @@ class GIVBayesian(Process): ...@@ -104,7 +117,7 @@ class GIVBayesian(Process):
return bayesian_inference_on_period(self.h5_main[pix_ind], self.single_ao, self.parms_dict['freq'], return bayesian_inference_on_period(self.h5_main[pix_ind], self.single_ao, self.parms_dict['freq'],
show_plots=show_plots, **other_params) show_plots=show_plots, **other_params)
def _set_memory_and_cores(self, cores=1, mem=1024): def _set_memory_and_cores(self, cores=None, mem=None):
""" """
Checks hardware limitations such as memory, # cpus and sets the recommended datachunk sizes and the Checks hardware limitations such as memory, # cpus and sets the recommended datachunk sizes and the
number of cores to be used by analysis methods. number of cores to be used by analysis methods.
...@@ -124,8 +137,9 @@ class GIVBayesian(Process): ...@@ -124,8 +137,9 @@ class GIVBayesian(Process):
# raw, compensated current, resistance, variance # raw, compensated current, resistance, variance
self._max_pos_per_read = self._max_pos_per_read // 4 # Integer division self._max_pos_per_read = self._max_pos_per_read // 4 # Integer division
# Since these computations take far longer than functional fitting, do in smaller batches: # Since these computations take far longer than functional fitting, do in smaller batches:
self._max_pos_per_read = min(100, self._max_pos_per_read) self._max_pos_per_read = min(1000, self._max_pos_per_read)
if self.verbose:
if self.verbose and self.mpi_rank == 0:
print('Max positions per read set to {}'.format(self._max_pos_per_read)) print('Max positions per read set to {}'.format(self._max_pos_per_read))
def _create_results_datasets(self): def _create_results_datasets(self):
...@@ -135,35 +149,36 @@ class GIVBayesian(Process): ...@@ -135,35 +149,36 @@ class GIVBayesian(Process):
# create all h5 datasets here: # create all h5 datasets here:
num_pos = self.h5_main.shape[0] num_pos = self.h5_main.shape[0]
if self.verbose: if self.verbose and self.mpi_rank == 0:
print('Now creating the datasets') print('Now creating the datasets')
h5_group = create_results_group(self.h5_main, self.process_name) self.h5_results_grp = create_results_group(self.h5_main, self.process_name)
self.h5_results_grp = h5_group
write_simple_attrs(h5_group, {'algorithm_author': 'Kody J. Law', 'last_pixel': 0})
write_simple_attrs(h5_group, self.parms_dict)
if self.verbose: write_simple_attrs(self.h5_results_grp, {'algorithm_author': 'Kody J. Law', 'last_pixel': 0})
print('created group: {}'.format(h5_group.name)) write_simple_attrs(self.h5_results_grp, self.parms_dict)
print(get_attributes(h5_group))
if self.verbose and self.mpi_rank == 0:
print('created group: {} with attributes:'.format(self.h5_results_grp.name))
print(get_attributes(self.h5_results_grp))
# One of those rare instances when the result is exactly the same as the source # One of those rare instances when the result is exactly the same as the source
self.h5_i_corrected = create_empty_dataset(self.h5_main, np.float32, 'Corrected_Current', h5_group=h5_group) self.h5_i_corrected = create_empty_dataset(self.h5_main, np.float32, 'Corrected_Current', h5_group=self.h5_results_grp)
if self.verbose: if self.verbose and self.mpi_rank == 0:
print('Created I Corrected') print('Created I Corrected')
print_tree(h5_group) # print_tree(self.h5_results_grp)
# For some reason, we cannot specify chunks or compression!
# The resistance dataset requires the creation of a new spectroscopic dimension # The resistance dataset requires the creation of a new spectroscopic dimension
self.h5_resistance = write_main_dataset(h5_group, (num_pos, self.num_x_steps), 'Resistance', 'Resistance', self.h5_resistance = write_main_dataset(self.h5_results_grp, (num_pos, self.num_x_steps), 'Resistance', 'Resistance',
'GOhms', None, Dimension('Bias', 'V', self.num_x_steps), 'GOhms', None, Dimension('Bias', 'V', self.num_x_steps),
dtype=np.float32, chunks=(1, self.num_x_steps), compression='gzip', dtype=np.float32, # chunks=(1, self.num_x_steps), #compression='gzip',
h5_pos_inds=self.h5_main.h5_pos_inds, h5_pos_inds=self.h5_main.h5_pos_inds,
h5_pos_vals=self.h5_main.h5_pos_vals) h5_pos_vals=self.h5_main.h5_pos_vals)
if self.verbose: if self.verbose and self.mpi_rank == 0:
print('Created Resistance') print('Created Resistance')
print_tree(h5_group) # print_tree(self.h5_results_grp)
assert isinstance(self.h5_resistance, USIDataset) # only here for PyCharm assert isinstance(self.h5_resistance, USIDataset) # only here for PyCharm
self.h5_new_spec_vals = self.h5_resistance.h5_spec_vals self.h5_new_spec_vals = self.h5_resistance.h5_spec_vals
...@@ -171,21 +186,23 @@ class GIVBayesian(Process): ...@@ -171,21 +186,23 @@ class GIVBayesian(Process):
# The variance is identical to the resistance dataset # The variance is identical to the resistance dataset
self.h5_variance = create_empty_dataset(self.h5_resistance, np.float32, 'R_variance') self.h5_variance = create_empty_dataset(self.h5_resistance, np.float32, 'R_variance')
if self.verbose: if self.verbose and self.mpi_rank == 0:
print('Created Variance') print('Created Variance')
print_tree(h5_group) # print_tree(self.h5_results_grp)
# The capacitance dataset requires new spectroscopic dimensions as well # The capacitance dataset requires new spectroscopic dimensions as well
self.h5_cap = write_main_dataset(h5_group, (num_pos, 1), 'Capacitance', 'Capacitance', 'pF', None, self.h5_cap = write_main_dataset(self.h5_results_grp, (num_pos, 1), 'Capacitance', 'Capacitance', 'pF', None,
Dimension('Direction', '', [1]), h5_pos_inds=self.h5_main.h5_pos_inds, Dimension('Direction', '', [1]), h5_pos_inds=self.h5_main.h5_pos_inds,
h5_pos_vals=self.h5_main.h5_pos_vals, dtype=cap_dtype, compression='gzip', h5_pos_vals=self.h5_main.h5_pos_vals, dtype=cap_dtype, #compression='gzip',
aux_spec_prefix='Cap_Spec_') aux_spec_prefix='Cap_Spec_')
if self.verbose: if self.verbose and self.mpi_rank == 0:
print('Created Capacitance') print('Created Capacitance')
print_tree(h5_group) # print_tree(self.h5_results_grp)
print('Done!') print('Done creating all results datasets!')
if self.mpi_size > 1:
self.mpi_comm.Barrier()
self.h5_main.file.flush() self.h5_main.file.flush()
def _get_existing_datasets(self): def _get_existing_datasets(self):
...@@ -204,7 +221,8 @@ class GIVBayesian(Process): ...@@ -204,7 +221,8 @@ class GIVBayesian(Process):
""" """
if self.verbose: if self.verbose:
print('Started accumulating all results') print('Rank {} - Started accumulating results for this chunk'.format(self.mpi_rank))
num_pixels = len(self.forward_results) num_pixels = len(self.forward_results)
cap_mat = np.zeros((num_pixels, 2), dtype=np.float32) cap_mat = np.zeros((num_pixels, 2), dtype=np.float32)
r_inf_mat = np.zeros((num_pixels, self.num_x_steps), dtype=np.float32) r_inf_mat = np.zeros((num_pixels, self.num_x_steps), dtype=np.float32)
...@@ -245,26 +263,19 @@ class GIVBayesian(Process): ...@@ -245,26 +263,19 @@ class GIVBayesian(Process):
# Now write to h5 files: # Now write to h5 files:
if self.verbose: if self.verbose:
print('Finished accumulating results. Writing to h5') print('Rank {} - Finished accumulating results. Writing results of chunk to h5'.format(self.mpi_rank))
if self._start_pos == 0: if self.__first_batch:
self.h5_new_spec_vals[0, :] = full_results['x'] # Technically this needs to only be done once self.h5_new_spec_vals[0, :] = full_results['x'] # Technically this needs to only be done once
self.__first_batch = False
pos_slice = slice(self._start_pos, self._end_pos) # Get access to the private variable:
self.h5_cap[pos_slice] = np.atleast_2d(stack_real_to_compound(cap_mat, cap_dtype)).T pos_in_batch = self._get_pixels_in_current_batch()
self.h5_variance[pos_slice] = r_var_mat
self.h5_resistance[pos_slice] = r_inf_mat
self.h5_i_corrected[pos_slice] = i_cor_sin_mat
# Leaving in this provision that will allow restarting of processes self.h5_cap[pos_in_batch, :] = np.atleast_2d(stack_real_to_compound(cap_mat, cap_dtype)).T
self.h5_results_grp.attrs['last_pixel'] = self._end_pos self.h5_variance[pos_in_batch, :] = r_var_mat
self.h5_resistance[pos_in_batch, :] = r_inf_mat
self.h5_main.file.flush() self.h5_i_corrected[pos_in_batch, :] = i_cor_sin_mat
print('Finished processing up to pixel ' + str(self._end_pos) + ' of ' + str(self.h5_main.shape[0]))
# Now update the start position
self._start_pos = self._end_pos
def _unit_computation(self, *args, **kwargs): def _unit_computation(self, *args, **kwargs):
""" """
...@@ -282,22 +293,24 @@ class GIVBayesian(Process): ...@@ -282,22 +293,24 @@ class GIVBayesian(Process):
# first roll the data # first roll the data
rolled_raw_data = np.roll(self.data, self.roll_pts, axis=1) rolled_raw_data = np.roll(self.data, self.roll_pts, axis=1)
# Ensure that the bias has a positive slope. Multiply current by -1 accordingly # Ensure that the bias has a positive slope. Multiply current by -1 accordingly
if self.verbose:
print('Rank {} beginning parallel compute for Forward'.format(self.mpi_rank))
self.reverse_results = parallel_compute(rolled_raw_data[:, :half_v_steps] * -1, do_bayesian_inference, self.reverse_results = parallel_compute(rolled_raw_data[:, :half_v_steps] * -1, do_bayesian_inference,
cores=self._cores, cores=self._cores,
func_args=[self.rolled_bias[:half_v_steps] * -1, self.ex_freq], func_args=[self.rolled_bias[:half_v_steps] * -1, self.ex_freq],
func_kwargs=self._bayes_parms, lengthy_computation=True, func_kwargs=self._bayes_parms, lengthy_computation=False,
verbose=self.verbose) verbose=self.verbose)
if self.verbose: if self.verbose:
print('Finished processing forward sections. Now working on reverse sections....') print('Rank {} finished processing forward sections. Now working on reverse sections....'.format(self.mpi_rank))
self.forward_results = parallel_compute(rolled_raw_data[:, half_v_steps:], do_bayesian_inference, self.forward_results = parallel_compute(rolled_raw_data[:, half_v_steps:], do_bayesian_inference,
cores=self._cores, cores=self._cores,
func_args=[self.rolled_bias[half_v_steps:], self.ex_freq], func_args=[self.rolled_bias[half_v_steps:], self.ex_freq],
func_kwargs=self._bayes_parms, lengthy_computation=True, func_kwargs=self._bayes_parms, lengthy_computation=False,
verbose=self.verbose) verbose=self.verbose)
if self.verbose: if self.verbose:
print('Finished processing reverse loops') print('Rank {} Finished processing reverse loops (and this chunk)'.format(self.mpi_rank))
def compute(self, override=False, *args, **kwargs): def compute(self, override=False, *args, **kwargs):
""" """
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment