"""
Created on 7/17/16 10:08 AM
@author: Suhas Somnath, Chris Smith
"""

from __future__ import division, print_function, absolute_import

import numpy as np
import psutil
import joblib

from ..io.hdf_utils import checkIfMain, check_for_old, get_attributes
from ..io.io_hdf5 import ioHDF5
from ..io.io_utils import recommendCores, getAvailableMem


def calc_func(some_func, args):
    """

    Parameters
    ----------
    some_func
    args

    Returns
    -------

    """

    return some_func(*args)


def setup_func(args):
    """

    Parameters
    ----------
    args

    Returns
    -------

    """

    return calc_func(*args)
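
# Example of how these helpers could be used (hypothetical; nothing in this module calls
# them directly): each work item is a (function, argument-list) pair that setup_func
# unpacks before calling the function:
#
#     work_items = [(np.mean, [vector]) for vector in some_2d_array]
#     results = [setup_func(item) for item in work_items]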


class Process(object):
    """
    Encapsulates the typical steps performed when applying a processing function to a dataset.

    Sub-classes are expected to override the process-specific methods such as _unit_function,
    _create_results_datasets, _write_results_chunk, and _get_existing_datasets.
    """

    def __init__(self, h5_main, h5_results_grp=None, cores=None, max_mem_mb=4*1024, verbose=False):
        """
        Parameters
        ----------
        h5_main : h5py.Dataset instance
            The dataset over which the analysis will be performed. This dataset should be linked to the spectroscopic
            indices and values, and position indices and values datasets.
        h5_results_grp : h5py.Datagroup object, optional
            Datagroup containing partially computed results
        cores : uint, optional
            How many cores to use for the computation.
            Default - all available cores - 2
        max_mem_mb : uint, optional
            How much memory (in MB) to use for the computation.
            Default - 4096 MB
        verbose : bool, optional. Default = False
            Whether or not to print debugging statements
        """

        if h5_main.file.mode != 'r+':
            raise TypeError('Need to ensure that the file is in r+ mode to write results back to the file')

        # Checking if dataset is "Main"
        if checkIfMain(h5_main):
            self.h5_main = h5_main
            self.hdf = ioHDF5(self.h5_main.file)
        else:
            raise ValueError('Provided dataset is not a "Main" dataset with necessary ancillary datasets')

        self.verbose = verbose
        self._max_pos_per_read = None
        self._max_mem_mb = None

        self._start_pos = 0
        self._end_pos = self.h5_main.shape[0]

        # Determining the max size of the data that can be put into memory
        self._set_memory_and_cores(cores=cores, mem=max_mem_mb)

        self.duplicate_h5_groups = []
        self.process_name = None  # Reset this in the extended classes
        self.parms_dict = None

        self._results = None
        self.h5_results_grp = h5_results_grp
        if self.h5_results_grp is not None:
            self._extract_params(h5_results_grp)

        # DON'T check for duplicates since parms_dict has not yet been initialized.
        # Sub classes will check by themselves if they are interested.

    def _check_for_duplicates(self):
        """
        Checks for instances where the process was applied to the same dataset with the same parameters

        Returns
        -------
        duplicate_h5_groups : list of h5py.Datagroup objects
            List of groups satisfying the above conditions
        """
        duplicate_h5_groups = check_for_old(self.h5_main, self.process_name, new_parms=self.parms_dict)
        if self.verbose:
            print('Checking for duplicates:')
        if duplicate_h5_groups is not None:
            print('WARNING! ' + self.process_name + ' has already been performed with the same parameters before. '
                                                    'Consider reusing results')
            print(duplicate_h5_groups)
        return duplicate_h5_groups

    def _extract_params(self, h5_partial_group):
        """
        Extracts the necessary parameters from the provided h5 group to resume computation

        Parameters
        ----------
        h5_partial_group : h5py.Datagroup object
            Datagroup containing partially computed results

        """
        self.parms_dict = get_attributes(h5_partial_group)
        self._start_pos = self.parms_dict.pop('last_pixel')

        if self._start_pos == self.h5_main.shape[0] - 1:
            raise ValueError('The last computed pixel shows that the computation was already complete')

    def _set_memory_and_cores(self, cores=1, mem=1024):
        """
        Checks hardware limitations such as memory, # cpus and sets the recommended datachunk sizes and the
        number of cores to be used by analysis methods.

        Parameters
        ----------
        cores : uint, optional
            How many cores to use for the computation.
            Default - 1
        mem : uint, optional
            The amount of memory in MB to use for the computation.
            Default - 1024
        """

        if cores is None:
            # Leave a couple of logical cores for the rest of the system, but never drop below 1
            self._cores = max(1, psutil.cpu_count() - 2)
        else:
            cores = int(abs(cores))
            self._cores = min(psutil.cpu_count(), max(1, cores))

        _max_mem_mb = getAvailableMem() / 1E6  # in MB

        self._max_mem_mb = min(_max_mem_mb, mem)

        max_data_chunk = self._max_mem_mb / self._cores

        # Now calculate the number of positions that can be stored in memory in one go.
        mb_per_position = self.h5_main.dtype.itemsize * self.h5_main.shape[1] / 1e6
        self._max_pos_per_read = int(np.floor(max_data_chunk / mb_per_position))

        if self.verbose:
            print('Allowed to read {} pixels per chunk'.format(self._max_pos_per_read))
            print('Allowed to use up to', str(self._cores), 'cores and', str(self._max_mem_mb), 'MB of memory')
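
        # Worked example with hypothetical numbers: a float32 dataset with 16384 spectral
        # values occupies 4 * 16384 / 1e6 ~ 0.066 MB per position. With mem=1024 MB shared
        # across 2 cores, max_data_chunk is 512 MB, so roughly 7800 positions can be read
        # in each call to _read_data_chunk().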

    @staticmethod
    def _unit_function(*args):
        raise NotImplementedError('Please override the _unit_function specific to your process')

    def _read_data_chunk(self):
        """
        Reads a chunk of data for the intended computation into memory

       Parameters
        -----
        verbose : bool, optional
            Whether or not to print log statements
182
183
184
185
        """
        if self._start_pos < self.h5_main.shape[0]:
            self._end_pos = int(min(self.h5_main.shape[0], self._start_pos + self._max_pos_per_read))
            self.data = self.h5_main[self._start_pos:self._end_pos, :]
            if self.verbose:
                print('Reading pixels {} to {} of {}'.format(self._start_pos, self._end_pos, self.h5_main.shape[0]))

            # DON'T update the start position

        else:
            if self.verbose:
                print('Finished reading all data!')
            self.data = None

    def _write_results_chunk(self):
        """
        Writes the computed results into appropriate datasets.

        This method must be overridden by the sub-class since the results to be written are process specific and are
        expected to be at least as large as the source dataset.
        """
        # Now update the start position
        self._start_pos = self._end_pos
        raise NotImplementedError('Please override the _write_results_chunk specific to your process')

    def _create_results_datasets(self):
        """
        Process specific call that will write the h5 group, results dataset(s) and the corresponding spectroscopic
        datasets, and also link the results dataset(s) to the spectroscopic datasets. It is recommended that the
        ancillary datasets be populated within this function.
        """
        raise NotImplementedError('Please override the _create_results_datasets specific to your process')

    def _get_existing_datasets(self):
        """
        The purpose of this function is to allow processes to resume from partly computed results

        Start with self.h5_results_grp
        """
        raise NotImplementedError('Please override the _get_existing_datasets specific to your process')

    def compute(self, *args, **kwargs):
        """

        Parameters
        ----------
        kwargs:
            processors: int
                number of processors to use. Default all processors on the system except for 1.

        Returns
        -------

        """
        if self._start_pos == 0:
            # starting fresh
            self._create_results_datasets()
        else:
            # resuming from previous checkpoint
            self._get_existing_datasets()

        self._read_data_chunk()
        while self.data is not None:
            self._results = parallel_compute(self.data, self._unit_function, cores=self._cores,
                                             lengthy_computation=False,
                                             func_args=args, func_kwargs=kwargs)
            self._write_results_chunk()
            self._read_data_chunk()

        print('Completed computation on the entire dataset.')

        return self.h5_results_grp


def parallel_compute(data, func, cores=1, lengthy_computation=False, func_args=list(), func_kwargs=dict()):
    """
    Applies the provided function to each position in the data, optionally using multiple cores

    Parameters
    ----------
    data : numpy.ndarray
        Data to map function to. Function will be mapped to the first axis of data
    func : callable
        Function to map to data
    cores : uint, optional
        Number of logical cores to use to compute
        Default - 1 (serial computation)
    lengthy_computation : bool, optional
        Whether or not each computation is expected to take substantial time.
        Sometimes the time for adding more cores can outweigh the time per core
        Default - False
    func_args : list, optional
        arguments to be passed to the function
    func_kwargs : dict, optional
        keyword arguments to be passed onto function

    Returns
    -------
    results : list
        List of computational results
    """

    if not callable(func):
        raise TypeError('Function argument is not callable')

    cores = recommendCores(data.shape[0], requested_cores=cores,
                           lengthy_computation=lengthy_computation)

    if cores > 1:
        values = [joblib.delayed(func)(x, *func_args, **func_kwargs) for x in data]
        results = joblib.Parallel(n_jobs=cores)(values)

        # Finished the parallel computation
        print('Finished parallel computation')

    else:
        print("Computing serially ...")
        results = [func(vector, *func_args, **func_kwargs) for vector in data]

    return results
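

# Minimal usage sketch of parallel_compute (hypothetical data and function; shown as a
# comment because this module is only importable as part of the package):
#
#     def find_peak_position(vector):
#         return np.argmax(vector)
#
#     raw_data = np.random.rand(512, 1024)  # 512 positions x 1024 spectral points
#     peak_positions = parallel_compute(raw_data, find_peak_position, cores=4)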