"""
Created on 7/17/16 10:08 AM
@author: Suhas Somnath, Chris Smith
"""

from __future__ import division, print_function, absolute_import

import numpy as np
import psutil
import joblib

from ..io.hdf_utils import checkIfMain
from ..io.io_hdf5 import ioHDF5
from ..io.io_utils import recommendCores, getAvailableMem


def calc_func(some_func, args):
    """
    Calls the provided function with the provided arguments

    Parameters
    ----------
    some_func : callable
        Function to call
    args : tuple
        Arguments to pass to ``some_func``

    Returns
    -------
    Output of ``some_func(*args)``
    """

    return some_func(*args)


def setup_func(args):
    """
    Unpacks a (function, arguments) pair and evaluates it via calc_func

    Parameters
    ----------
    args : tuple
        Tuple containing the function to call followed by its arguments

    Returns
    -------
    Output of the packed function call
    """

    return calc_func(*args)
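
# The two helpers above let a single-argument mapper (e.g. multiprocessing.Pool.map)
# dispatch arbitrary (function, arguments) pairs. A minimal sketch of that pattern
# (illustrative only, not part of the module API):
#
#   tasks = [(np.mean, ([1.0, 2.0, 3.0],)), (np.max, ([4, 5],))]
#   [setup_func(task) for task in tasks]  # -> [2.0, 5]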


class Process(object):
    """
    Encapsulates the typical steps performed when applying a processing function to a dataset.
    """

    def __init__(self, h5_main, cores=1, max_mem_mb=1024):
        """
        Parameters
        ----------
        h5_main : h5py.Dataset instance
            The dataset over which the analysis will be performed. This dataset should be linked to the spectroscopic
            indices and values, and position indices and values datasets.
        cores : uint, optional
            How many cores to use for the computation. Default: 1
        max_mem_mb : uint, optional
            How much memory (in MB) to use for the computation. Default: 1024
        """

        # Checking if dataset is "Main"
        if checkIfMain(h5_main):
            self.h5_main = h5_main
            self.hdf = ioHDF5(self.h5_main.file)
        else:
            raise ValueError('Provided dataset is not a "Main" dataset with necessary ancillary datasets')

        # Determining the max size of the data that can be put into memory
        self._set_memory_and_cores(cores=cores, mem=max_mem_mb)

        self._start_pos = 0
        self._end_pos = self.h5_main.shape[0]

        self._results = None
        self.h5_results_grp = None

    def _set_memory_and_cores(self, cores=1, mem=1024, verbose=False):
        """
        Checks hardware limitations such as memory and the number of CPU cores, and sets the
        recommended data chunk sizes and the number of cores to be used by analysis methods.

        Parameters
        ----------
        cores : uint, optional
            How many cores to use for the computation. Default: 1
        mem : uint, optional
            The amount of memory in MB to use in the computation. Default: 1024
        verbose : bool, optional
            Whether or not to print log statements
        """

        if cores is None:
            # Leave two cores free for the rest of the system, but always use at least one
            self._cores = max(1, psutil.cpu_count() - 2)
        else:
            cores = int(abs(cores))
            self._cores = min(psutil.cpu_count(), max(1, cores))

        _max_mem_mb = getAvailableMem() / 1E6  # in MB

        self._max_mem_mb = min(_max_mem_mb, mem)

        # Each core gets an equal share of the memory budget for its data chunk
        max_data_chunk = self._max_mem_mb / self._cores

        # Now calculate the number of positions that can be stored in memory in one go.
        mb_per_position = self.h5_main.dtype.itemsize * self.h5_main.shape[1] / 1e6
        self._max_pos_per_read = int(np.floor(max_data_chunk / mb_per_position))

        if verbose:
            print('Allowed to read {} pixels per chunk'.format(self._max_pos_per_read))
            print('Allowed to use up to', str(self._cores), 'cores and', str(self._max_mem_mb), 'MB of memory')
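
    # Worked example for the arithmetic above (illustrative numbers only):
    # with 1024 MB allowed across 4 cores, each core gets a 256 MB chunk.
    # For float32 data (4 bytes per element) with 1e5 spectroscopic points per
    # position, one position occupies ~0.4 MB, so ~640 positions fit per read.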

    @staticmethod
    def _unit_function(*args):
        raise NotImplementedError('Please override the _unit_function specific to your process')

    def _read_data_chunk(self, verbose=False):
        """
        Reads a chunk of data for the intended computation into memory

       Parameters
        -----
        verbose : bool, optional
            Whether or not to print log statements
133
134
135
136
        """
        if self._start_pos < self.h5_main.shape[0]:
            self._end_pos = int(min(self.h5_main.shape[0], self._start_pos + self._max_pos_per_read))
            self.data = self.h5_main[self._start_pos:self._end_pos, :]
            if verbose:
                print('Reading pixels {} to {} of {}'.format(self._start_pos, self._end_pos, self.h5_main.shape[0]))

            # DON'T update the start position here - it is advanced by _write_results_chunk
            # only after the results for this chunk have been written

        else:
            if verbose:
                print('Finished reading all data!')
            self.data = None

    def _write_results_chunk(self):
        """
        Writes the computed results into appropriate datasets.

        This method must be overridden for each process, since the processed data is expected
        to be at least as large as the source dataset.
        """
        # Now update the start position
        self._start_pos = self._end_pos
        raise NotImplementedError('Please override the _write_results_chunk specific to your process')

    def _create_results_datasets(self):
        """
        Process-specific call that creates the h5 group and results dataset, writes the
        corresponding spectroscopic datasets, and links the results dataset to them. It is
        recommended that the ancillary datasets be populated within this function.
        """
        raise NotImplementedError('Please override the _create_results_datasets specific to your process')

    def compute(self, *args, **kwargs):
        """

        Parameters
        ----------
        kwargs:
            processors: int
                number of processors to use. Default all processors on the system except for 1.

        Returns
        -------

        """

        self._create_results_datasets()
        self._start_pos = 0

        self._read_data_chunk()
        while self.data is not None:
            self._results = parallel_compute(self.data, self._unit_function, cores=self._cores,
                                             lengthy_computation=False,
                                             func_args=args, func_kwargs=kwargs)
            self._write_results_chunk()
            self._read_data_chunk()

        print('Completed computation on all data chunks')

        return self.h5_results_grp


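# A minimal sketch of how one might subclass Process (illustrative only; the
# class name and unit function below are hypothetical, not part of this module):
#
#   class MeanProcess(Process):
#
#       @staticmethod
#       def _unit_function(vector):
#           return np.mean(vector)
#
#       def _create_results_datasets(self):
#           ...  # create self.h5_results_grp and the results dataset(s)
#
#       def _write_results_chunk(self):
#           ...  # write self._results, then advance self._start_pos
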
def parallel_compute(data, func, cores=1, lengthy_computation=False, func_args=None, func_kwargs=None):
    """
    Applies the provided function to the provided data, optionally using multiple cores

    Parameters
    ----------
    data : numpy.ndarray
        Data to map function to. Function will be mapped to the first axis of data
    func : callable
        Function to map to data
    cores : uint, optional
        Number of logical cores to use to compute
        Default - 1 (serial computation)
    lengthy_computation : bool, optional
        Whether or not each computation is expected to take substantial time.
        Sometimes the time for adding more cores can outweigh the time per core
        Default - False
    func_args : list, optional
        arguments to be passed to the function
    func_kwargs : dict, optional
        keyword arguments to be passed to the function

    Returns
    -------
    results : list
        List of computational results
    """

    if not callable(func):
        raise TypeError('Function argument is not callable')

    # Guard against the mutable-default-argument pitfall
    if func_args is None:
        func_args = list()
    if func_kwargs is None:
        func_kwargs = dict()

    cores = recommendCores(data.shape[0], requested_cores=cores,
                           lengthy_computation=lengthy_computation)

    if cores > 1:
        values = [joblib.delayed(func)(x, *func_args, **func_kwargs) for x in data]
        results = joblib.Parallel(n_jobs=cores)(values)

        # All workers have finished their portions of the computation
        print('Finished parallel computation')

    else:
        print("Computing serially ...")
        results = [func(vector, *func_args, **func_kwargs) for vector in data]

    return results
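
# Example usage of parallel_compute (an illustrative sketch, not part of the
# module API -- np.mean stands in for a real unit function):
#
#   data = np.random.rand(100, 16)
#   results = parallel_compute(data, np.mean, cores=2)  # mean of each row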