# -*- coding: utf-8 -*-
"""
Created on Wed Dec 07 16:04:34 2016

@author: Suhas Somnath, Chris R. Smith, Raj Giri
"""

8
from __future__ import division, print_function, absolute_import, unicode_literals
9
import sys
10
11
from os import path, remove  # File Path formatting
import numpy as np  # For array operations
12
import h5py
13
14
from igor import binarywave as bw

15
16
17
from sidpy.sid import Translator
from sidpy.hdf.hdf_utils import write_simple_attrs

18
from pyUSID.io.write_utils import VALUES_DTYPE, Dimension
19
20
from pyUSID.io.hdf_utils import create_indexed_group, write_main_dataset, \
    write_ind_val_dsets
21

22
23
24
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so that
# isinstance checks written against (str, unicode) work on both major versions.
if sys.version_info[0] == 3:
    unicode = str

25
26
27
28
29
30

class IgorIBWTranslator(Translator):
    """
    Translates Igor Binary Wave (.ibw) files containing images or force curves to .h5
    """

31
32
33
34
35
36
37
38
39
40
41
42
    @staticmethod
    def is_valid_file(file_path):
        """
        Decides whether this translator can read the provided file

        Parameters
        ----------
        file_path : str
            Path to raw data file

        Returns
        -------
        obj : str
            Absolute path to the file if it exists on disk and carries the
            ``ibw`` extension (this is the path to hand to translate()).
            None otherwise.

        Raises
        ------
        TypeError
            If file_path is not a string
        """
        if not isinstance(file_path, (str, unicode)):
            raise TypeError('file_path should be a string object')

        # Only files that actually exist on disk can be accepted
        if path.isfile(file_path):
            abs_path = path.abspath(file_path)
            _, dotted_ext = path.splitext(abs_path)
            # Drop the leading dot; the check is purely extension based
            if dotted_ext[1:] == 'ibw':
                return abs_path

        return None
59

60
    def translate(self, file_path, verbose=False, append_path='',
                  grp_name='Measurement', parm_encoding='utf-8'):
        """
        Translates the provided file to .h5

        Parameters
        ----------
        file_path : String / unicode
            Path of the .ibw file
        verbose : Boolean (Optional)
            Whether or not to show  print statements for debugging
        append_path : string (Optional)
            h5_file to add these data to, must be a path to the h5_file on disk
        grp_name : string (Optional)
            Change from default "Measurement" name to something specific
        parm_encoding : str, optional
            Codec to be used to decode the bytestrings into Python strings if needed.
            Default 'utf-8'

        Returns
        -------
        h5_path : String / unicode
            Path of the .h5 file that was written. This is append_path when
            one was given, otherwise a file next to the .ibw file with the
            same base name.

        Raises
        ------
        Exception
            If append_path is provided but does not exist on disk
        """
        file_path = path.abspath(file_path)

        # Work out where the results will go
        folder_path, file_name = path.split(file_path)
        # splitext rather than slicing off four characters, so that a file
        # name without a three-letter extension is not mangled
        base_name = path.splitext(file_name)[0]

        if append_path:
            if not path.exists(append_path):
                raise Exception('File does not exist. Check pathname.')
            h5_path = append_path
            h5_mode = 'r+'
        else:
            h5_path = path.join(folder_path, base_name + '.h5')
            h5_mode = 'w'

        # Load and parse the ibw file BEFORE touching the .h5 file, so that an
        # unreadable input neither deletes an existing .h5 file nor leaves a
        # half-written one behind
        ibw_obj = bw.load(file_path)
        ibw_wave = ibw_obj.get('wave')
        parm_dict = self._read_parms(ibw_wave, parm_encoding)
        chan_labels, chan_units = self._get_chan_labels(ibw_wave, parm_encoding)

        if verbose:
            print('Channels and units found:')
            print(chan_labels)
            print(chan_units)

        # Get the data to figure out if this is an image or a force curve
        images = ibw_wave.get('wData')

        if images.shape[-1] != len(chan_labels):
            # for layer 0 null set errors in older AR software
            chan_labels = chan_labels[1:]
            # The units were generated one-per-label, so the unit of the
            # dropped label has to go too. Otherwise the zip() below pairs
            # every channel with the unit of the label before it.
            chan_units = chan_units[1:]

        if images.ndim == 3:  # Image stack
            if verbose:
                print('Found image stack of size {}'.format(images.shape))
            type_suffix = 'Image'

            num_rows = parm_dict['ScanLines']
            num_cols = parm_dict['ScanPoints']

            images = images.transpose(2, 1, 0)  # now ordered as [chan, Y, X] image
            images = np.reshape(images, (images.shape[0], -1, 1))  # 3D [chan, Y*X points,1]

            pos_desc = [Dimension('X', 'm', np.linspace(0, parm_dict['FastScanSize'], num_cols)),
                        Dimension('Y', 'm', np.linspace(0, parm_dict['SlowScanSize'], num_rows))]

            spec_desc = Dimension('arb', 'a.u.', [1])

        else:  # single force curve
            if verbose:
                print('Found force curve of size {}'.format(images.shape))
            type_suffix = 'ForceCurve'

            images = np.atleast_3d(images)  # now [Z, chan, 1]
            images = images.transpose((1, 2, 0))  # [chan ,1, Z] force curve

            # Use the Z sensor channel (or, failing that, the Raw channel) as
            # the spectroscopic values.
            for z_label in ('ZSnsr', 'Raw'):
                if z_label in chan_labels:
                    chan_ind = chan_labels.index(z_label)
                    spec_data = VALUES_DTYPE(images[chan_ind]).squeeze()
                    break
            else:
                # We don't expect to come here. Fall back to plain indices
                spec_data = np.arange(images.shape[2])

            pos_desc = Dimension('X', 'm', [1])
            spec_desc = Dimension('Z', 'm', spec_data)

        # Prepare the .h5 file:
        if not append_path and path.exists(h5_path):
            remove(h5_path)

        # The context manager guarantees the file handle is released even if
        # one of the write calls below raises
        with h5py.File(h5_path, h5_mode) as h5_file:
            # Create measurement group
            meas_grp = create_indexed_group(h5_file, grp_name)

            # Write file and measurement level parameters
            global_parms = dict()
            global_parms['data_type'] = 'IgorIBW_' + type_suffix
            global_parms['translator'] = 'IgorIBW'
            write_simple_attrs(h5_file, global_parms)

            write_simple_attrs(meas_grp, parm_dict)

            # Create Position and spectroscopic datasets
            h5_pos_inds, h5_pos_vals = write_ind_val_dsets(meas_grp, pos_desc, is_spectral=False)
            h5_spec_inds, h5_spec_vals = write_ind_val_dsets(meas_grp, spec_desc, is_spectral=True)

            # One 'Channel' group with one main dataset per data channel.
            # All channels share the same ancillary datasets.
            for chan_data, chan_name, chan_unit in zip(images, chan_labels, chan_units):
                if verbose:
                    print('channel', chan_name)
                    print('unit', chan_unit)
                chan_grp = create_indexed_group(meas_grp, 'Channel')

                write_main_dataset(chan_grp, np.atleast_2d(chan_data), 'Raw_Data',
                                   chan_name, chan_unit,
                                   None, None,
                                   h5_pos_inds=h5_pos_inds, h5_pos_vals=h5_pos_vals,
                                   h5_spec_inds=h5_spec_inds, h5_spec_vals=h5_spec_vals,
                                   dtype=np.float32)

            if verbose:
                print('Finished preparing raw datasets')

        return h5_path

    @staticmethod
196
    def _read_parms(ibw_wave, codec='utf-8'):
197
198
199
200
201
202
203
        """
        Parses the parameters in the provided dictionary

        Parameters
        ----------
        ibw_wave : dictionary
            Wave entry in the dictionary obtained from loading the ibw file
204
205
206
        codec : str, optional
            Codec to be used to decode the bytestrings into Python strings if needed.
            Default 'utf-8'
207
208
209
210
211
212
213

        Returns
        -------
        parm_dict : dictionary
            Dictionary containing parameters
        """
        parm_string = ibw_wave.get('note')
214
        if type(parm_string) == bytes:
Raj's avatar
Raj committed
215
216
217
            try:
                parm_string = parm_string.decode(codec)
            except:
Chris Smith's avatar
Chris Smith committed
218
                parm_string = parm_string.decode('ISO-8859-1')  # for older AR software
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
        parm_string = parm_string.rstrip('\r')
        parm_list = parm_string.split('\r')
        parm_dict = dict()
        for pair_string in parm_list:
            temp = pair_string.split(':')
            if len(temp) == 2:
                temp = [item.strip() for item in temp]
                try:
                    num = float(temp[1])
                    parm_dict[temp[0]] = num
                    try:
                        if num == int(num):
                            parm_dict[temp[0]] = int(num)
                    except OverflowError:
                        pass
                except ValueError:
                    parm_dict[temp[0]] = temp[1]

        # Grab the creation and modification times:
        other_parms = ibw_wave.get('wave_header')
        for key in ['creationDate', 'modDate', 'bname']:
            try:
                parm_dict[key] = other_parms[key]
            except KeyError:
                pass
        return parm_dict

    @staticmethod
247
    def _get_chan_labels(ibw_wave, codec='utf-8'):
248
249
250
251
252
253
254
        """
        Retrieves the names of the data channels and default units

        Parameters
        ----------
        ibw_wave : dictionary
            Wave entry in the dictionary obtained from loading the ibw file
255
256
257
        codec : str, optional
            Codec to be used to decode the bytestrings into Python strings if needed.
            Default 'utf-8'
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275

        Returns
        -------
        labels : list of strings
            List of the names of the data channels
        default_units : list of strings
            List of units for the measurement in each channel
        """
        temp = ibw_wave.get('labels')
        labels = []
        for item in temp:
            if len(item) > 0:
                labels += item
        for item in labels:
            if item == '':
                labels.remove(item)

        default_units = list()
276
277
        for chan_ind, chan in enumerate(labels):
            # clean up channel names
278
279
            if type(chan) == bytes:
                chan = chan.decode(codec)
280
281
            if chan.lower().rfind('trace') > 0:
                labels[chan_ind] = chan[:chan.lower().rfind('trace') + 5]
Chris Smith's avatar
Chris Smith committed
282
283
            else:
                labels[chan_ind] = chan
284
            # Figure out (default) units
285
286
287
288
289
290
291
292
293
            if chan.startswith('Phase'):
                default_units.append('deg')
            elif chan.startswith('Current'):
                default_units.append('A')
            else:
                default_units.append('m')

        return labels, default_units

294
    def _parse_file_path(self, input_path):
295
296
297
298
        pass

    def _read_data(self):
        pass