Commit 940a2307 authored by Somnath, Suhas

First draft of class and basic parallel compute function

parent f98249a9
@@ -4,121 +4,133 @@ Created on 7/17/16 10:08 AM
"""
from __future__ import division, print_function, absolute_import
from warnings import warn
import sys
import numpy as np
import psutil
import scipy
import itertools
import multiprocessing as mp
from ..io.hdf_utils import checkIfMain
from ..io.io_hdf5 import ioHDF5
from ..io.io_utils import recommendCores
class Process(object):
"""
Encapsulates the typical steps performed when applying a processing function to a dataset.
Parameters
----------
h5_main : h5py.Dataset instance
The dataset over which the analysis will be performed. This dataset should be linked to the spectroscopic
        indices and values, and position indices and values datasets.
    cores : uint, optional
        How many cores to use for the computation. Default: None - all logical cores except two will be used.
    """
    def __init__(self, h5_main, cores=None):
"""
Parameters
----------
h5_main : h5py.Dataset instance
The dataset over which the analysis will be performed. This dataset should be linked to the spectroscopic
indices and values, and position indices and values datasets.
        cores : uint, optional
            How many cores to use for the computation. Default: None - all logical cores except two will be used
"""
# Checking if dataset is "Main"
if checkIfMain(h5_main):
self.h5_main = h5_main
self.hdf = ioHDF5(self.h5_main.file)
        else:
            raise ValueError('Provided dataset is not a "Main" dataset with necessary ancillary datasets')
# Determining the max size of the data that can be put into memory
        self._set_memory_and_cores(cores=cores)
self._start_pos = 0
self._end_pos = self.h5_main.shape[0]
        self._results = None
        self.h5_results_grp = None

    def _set_memory_and_cores(self, cores=None, verbose=False):
"""
Checks hardware limitations such as memory, # cpus and sets the recommended datachunk sizes and the
number of cores to be used by analysis methods.
Returns
-------
None
Parameters
----------
cores : uint, optional
Default - None
How many cores to use for the computation
verbose : bool, optional
Whether or not to print log statements
"""
        if cores is None:
            self._cores = psutil.cpu_count() - 2
        else:
            cores = int(abs(cores))
            self._cores = min(psutil.cpu_count(), max(1, cores))

        self._max_mem_mb = psutil.virtual_memory().available / 1e6  # in MB
        max_data_chunk = self._max_mem_mb / self._cores

        # Now calculate the number of positions that can be stored in memory in one go.
        mb_per_position = self.h5_main.dtype.itemsize * self.h5_main.shape[1] / 1e6
        self._max_pos_per_read = int(np.floor(max_data_chunk / mb_per_position))
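        # Worked example with assumed numbers: a float32 dataset with 16384 spectral values per position occupies
        # 4 * 16384 / 1e6 ~ 0.066 MB per position, so ~2000 MB of memory per core would allow roughly 30,000
        # positions to be read in a single chunk.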
        if verbose:
            print('Allowed to read {} pixels per chunk'.format(self._max_pos_per_read))
            print('Allowed to use up to', str(self._cores), 'cores and', str(self._max_mem_mb), 'MB of memory')
    def _unit_function(*args):
        raise NotImplementedError('Please override the _unit_function specific to your model')
def _read_data_chunk(self, verbose=False):
"""
Reads a chunk of data for the intended computation into memory
        Parameters
        ----------
verbose : bool, optional
Whether or not to print log statements
"""
if self._start_pos < self.h5_main.shape[0]:
self._end_pos = int(min(self.h5_main.shape[0], self._start_pos + self._max_pos_per_read))
self.data = self.h5_main[self._start_pos:self._end_pos, :]
            if verbose:
                print('Reading pixels {} to {} of {}'.format(self._start_pos, self._end_pos, self.h5_main.shape[0]))
# DON'T update the start position
else:
            if verbose:
                print('Finished reading all data!')
self.data = None
    def _write_results_chunk(self):
        """
        Writes the computed results into appropriate datasets.

        This needs to be rewritten since the processed data is expected to be at least as large as the dataset
        """
# Now update the start position
self._start_pos = self._end_pos
        raise NotImplementedError('Please override the _write_results_chunk specific to your model')
def _create_results_datasets(self):
"""
        Process specific call that will write the h5 group, results dataset, corresponding spectroscopic datasets and also
        link the results dataset to the spectroscopic datasets. It is recommended that the ancillary datasets be populated
within this function.
"""
        raise NotImplementedError('Please override the _create_results_datasets specific to your model')
    def compute(self, *args, **kwargs):
"""
        Creates the results datasets, reads the data in chunks, maps the unit function onto each chunk (in parallel
        where possible), and writes the results of each chunk back to the file.

        Parameters
        ----------
        args : optional
            Arguments that will be passed on to the unit function
        kwargs : optional
            Keyword arguments that will be passed on to the unit function
@@ -131,40 +143,78 @@ class Process(object):
self._create_results_datasets()
self._start_pos = 0
self._read_data_chunk()
while self.data is not None:
self._results = parallel_compute(self.data, self._unit_function, cores=self._cores,
lengthy_computation=False, *args, **kwargs)
self._write_results_chunk()
self._read_data_chunk()
        print('Completed the computation on all data chunks.')
return self.h5_results_grp
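# ---------------------------------------------------------------------------
# Editor's illustrative sketch - not part of this commit: one way a subclass
# could fill in the override points of Process. The class name MeanProcess,
# the h5_main handle, and the use of np.mean as the per-position computation
# are assumptions for illustration only.
#
#   class MeanProcess(Process):
#
#       @staticmethod
#       def _unit_function(task, *args, **kwargs):
#           # parallel_compute (below) packs each task as a (vector, args, kwargs)
#           # tuple when running in parallel and calls func(vector, args, kwargs)
#           # when running serially, so unpack the spectrum defensively
#           vector = task[0] if isinstance(task, tuple) else task
#           return np.mean(vector)
#
#       def _create_results_datasets(self):
#           # create the HDF5 group and dataset(s) that will hold the results
#           # and assign the group to self.h5_results_grp
#           pass
#
#       def _write_results_chunk(self):
#           # write self._results for positions [self._start_pos, self._end_pos)
#           # to the results dataset(s), then advance the start position
#           self._start_pos = self._end_pos
#
#   proc = MeanProcess(h5_main, cores=2)
#   h5_results_grp = proc.compute()
# ---------------------------------------------------------------------------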
def parallel_compute(data, func, cores=1, lengthy_computation=False, *args, **kwargs):
"""
    Computes the provided function over the first axis of the given data using multiple cores
Parameters
----------
data : numpy.ndarray
Data to map function to. Function will be mapped to the first axis of data
func : callable
Function to map to data
cores : uint, optional
Number of logical cores to use to compute
Default - 1 (serial computation)
lengthy_computation : bool, optional
Whether or not each computation is expected to take substantial time.
Sometimes the time for adding more cores can outweigh the time per core
Default - False
kwargs : dict, optional
keyword arguments to be passed onto function
Returns
-------
results : list
List of computational results
"""
if not callable(func):
raise TypeError('Function argument is not callable')
cores = recommendCores(data.shape[0], requested_cores=cores,
lengthy_computation=lengthy_computation)
if cores > 1:
# Vectorize tasks
if sys.version_info.major == 3:
zip_fun = zip
else:
zip_fun = itertools.izip
# tasks = [(vector, kwargs) for vector in data]
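        # Each task is a (vector, args, kwargs) tuple. Note that pool.imap will pass the entire tuple to func as a
        # single argument, whereas the serial branch below calls func(vector, args, kwargs) with three separate
        # arguments, so unit functions need to accept either form.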
tasks = zip_fun(data, itertools.repeat(args), itertools.repeat(kwargs))
chunk = int(data.shape[0] / cores)
# start pool of workers
print('Computing in parallel ... launching %i kernels...' % cores)
pool = mp.Pool(processes=cores)
# Map them across processors
jobs = pool.imap(func, tasks, chunksize=chunk)
# get Results from different processes
results = [j for j in jobs]
        # Finished computing on the entire data set
print('Extracted Results... Closing %i kernels...' % cores)
pool.close()
else:
print("Computing serially ...")
results = [func(vector, args, kwargs) for vector in data]
return results
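# ---------------------------------------------------------------------------
# Editor's illustrative sketch - not part of this commit: parallel_compute()
# can also be used on its own to map a function over the first axis of an
# in-memory array. The defensive unpacking mirrors the two calling
# conventions above (a single (vector, args, kwargs) tuple in parallel mode,
# three positional arguments in serial mode). All names are assumptions.
#
#   def spectrum_mean(task, *args, **kwargs):
#       vector = task[0] if isinstance(task, tuple) else task
#       return np.mean(vector)
#
#   data = np.random.rand(128, 1024)
#   means = parallel_compute(data, spectrum_mean, cores=2)
# ---------------------------------------------------------------------------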