"""Translator that converts PiFM measurements described by an ANFATEC parameter file into a pyUSID HDF5 file."""

import os

import h5py
import numpy as np
import pyUSID as usid
from pyUSID import USIDataset
from pyUSID.io import write_utils
from pyUSID.io.translator import Translator

class PiFMTranslator(Translator):
    """
    Translator that writes images, spectrograms, point spectra and their associated ancillary datasets to an
    HDF5 (.h5) file in the pyUSID data structure.
    """
    def __init__(self, path=None, *args, **kwargs):
        """
        :param path: path to the ANFATEC parameter file describing the scan. The image, spectrogram and
            spectrum files it references are looked up in the same directory.
        :param args: extra positional arguments, forwarded unchanged to the base ``Translator``.
        :param kwargs: extra keyword arguments, forwarded unchanged to the base ``Translator``.
        """
        # Initialise the base class so any state it sets up exists on this instance.
        super().__init__(*args, **kwargs)
        self.path = path

    def get_path(self):
        """Store the resolved full path, its directory and the file name of ``self.path`` on the instance."""
        resolved = os.path.realpath(self.path)
        # The directory comes from the resolved path; the file name comes from the path exactly as given.
        self.full_path, self.directory, self.basename = (
            resolved,
            os.path.dirname(resolved),
            os.path.basename(self.path),
        )

    #these dictionary parameters will be written to hdf5 file under measurement attributes
    def read_anfatec_params(self):
        """
        Read the scan parameters from the ANFATEC parameter file into ``self.params_dictionary``.

        Every ``key : value`` line up to the first line starting with ``FileDesc`` is stored as a pair of
        stripped strings. Lines whose key is empty or starts with ``;`` are skipped. ``self.x_len`` and
        ``self.y_len`` are set from the ``xPixel`` and ``yPixel`` entries.

        :raises KeyError: if the file defines no ``xPixel`` or ``yPixel`` entry.
        """
        params_dictionary = {}
        with open(self.path, 'r', encoding="ISO-8859-1") as f:
            for line in f:
                # Split on the first colon only, so values that contain ':' themselves (e.g. a time) stay whole.
                key, sep, value = line.partition(':')
                key = key.strip()
                if sep and key and not key.startswith(';'):
                    params_dictionary[key] = value.strip()
                # In ANFATEC parameter files, all attributes are written before file references.
                if key.startswith('FileDesc'):
                    break
        self.params_dictionary = params_dictionary
        self.x_len, self.y_len = int(params_dictionary['xPixel']), int(params_dictionary['yPixel'])

    def read_file_desc(self):
        """
        Read the image, spectrogram and point-spectrum file descriptions from the parameter file.

        Each description block opens with a marker line and is followed by a fixed number of ``key : value``
        lines, of which the first holds the file name. The results are stored as ``filename: [descriptors]``:

        * ``self.img_desc`` (``FileDescBegin``, 5 lines): caption, scale, physical unit, offset
        * ``self.spectrogram_desc`` (``FileDesc2Begin``, 10 lines): caption, bytes per pixel, scale, physical
          unit, offset, datatype, bytes per reading, wavelength file name, wavelength physical unit
        * ``self.spectrum_desc`` (``AFMSpectrumDescBegin``, 3 lines): position x, position y

        :raises IndexError: if a block is cut short by the end of the file or one of its lines has no ``:``.
        """
        img_desc = {}
        spectrogram_desc = {}
        spectrum_desc = {}
        # (marker that opens a block, number of descriptor lines that follow it, dictionary to fill)
        sections = (
            ('FileDescBegin', 5, img_desc),
            ('FileDesc2Begin', 10, spectrogram_desc),
            ('AFMSpectrumDescBegin', 3, spectrum_desc),
        )
        with open(self.path, 'r', encoding="ISO-8859-1") as f:
            lines = f.readlines()
        for index, line in enumerate(lines):
            tag = line.split(':', 1)[0].strip()
            for marker, no_descriptors, target in sections:
                if not tag.startswith(marker):
                    continue
                # Split on the first colon only, so a value that contains ':' is not truncated.
                file_desc = [lines[index + offset].split(':', 1)[1].strip()
                             for offset in range(1, no_descriptors + 1)]
                target[file_desc[0]] = file_desc[1:]
        self.img_desc = img_desc
        self.spectrogram_desc = spectrogram_desc
        self.spectrum_desc = spectrum_desc

    def read_spectrograms(self):
        """
        Read every spectrogram listed in ``self.spectrogram_desc`` together with its spectral values.

        Results are stored in two dictionaries keyed by spectrogram file name:
        ``self.spectrograms`` holds a float array of shape ``(x_len, y_len, n_spectral_values)`` scaled by the
        spectrogram's scale descriptor, and ``self.spectrogram_spec_vals`` holds the values loaded from the
        matching ``<name>Wavelengths.txt`` file.

        :raises ValueError: if the binary file does not hold exactly ``y_len * x_len * n_spectral_values``
            values.
        """
        spectrograms = {}
        spectrogram_spec_vals = {}
        suffix = '.int'
        for file_name, descriptors in self.spectrogram_desc.items():
            # str.strip('.int') would remove any of the characters '.', 'i', 'n', 't' from BOTH ends of the
            # name (e.g. 'scan.int' -> 'sca'), so only a trailing '.int' extension is dropped here.
            stem = file_name[:-len(suffix)] if file_name.endswith(suffix) else file_name
            # load and save spectroscopic values
            spec_vals_i = np.loadtxt(os.path.join(self.directory, stem + 'Wavelengths.txt'))
            spectrogram_spec_vals[file_name] = spec_vals_i
            # load and save spectrograms
            raw = np.fromfile(os.path.join(self.directory, file_name), dtype='i4')
            # The file is split first into y_len lines, then each line into x_len point spectra, so it is
            # viewed as (y, x, spectrum) and the first two axes are swapped to obtain (x, y, spectrum).
            by_line = raw.reshape(self.y_len, self.x_len, len(spec_vals_i))
            cube = np.zeros((self.x_len, self.y_len, len(spec_vals_i)))
            cube[...] = by_line.transpose(1, 0, 2) * float(descriptors[2])
            spectrograms[file_name] = cube
        self.spectrograms = spectrograms
        self.spectrogram_spec_vals = spectrogram_spec_vals

    def read_imgs(self):
        """
        Read every image listed in ``self.img_desc`` and store it in ``self.imgs``.

        ``self.imgs`` maps the image file name to a float array of shape ``(x_len, y_len)`` holding the raw
        32-bit integer values multiplied by the image's scale descriptor.

        :raises ValueError: if a file does not hold exactly ``y_len * x_len`` values.
        """
        imgs = {}
        for file_name, descriptors in self.img_desc.items():
            raw = np.fromfile(os.path.join(self.directory, file_name), dtype='i4')
            # The file is split into y_len lines of x_len pixels; transposing the (y, x) view gives the
            # (x, y) layout without assigning one pixel at a time.
            img = np.zeros((self.x_len, self.y_len))
            img[...] = raw.reshape(self.y_len, self.x_len).T * float(descriptors[1])
            imgs[file_name] = img
        self.imgs = imgs

    def read_spectra(self):
        """
        Read every point spectrum listed in ``self.spectrum_desc``.

        Each spectrum file is a tab-separated text file with one header line followed by numeric rows. Three
        dictionaries keyed by file name are stored on the instance: ``self.spectra_spec_vals`` (first column),
        ``self.spectra`` (second column) and ``self.spectra_x_y_dim_name`` (the header line split on tabs).
        """
        spectra = {}
        spectra_spec_vals = {}
        spectra_x_y_dim_name = {}
        for file_name in self.spectrum_desc:
            # Open once with the same encoding used for the parameter file, so a header containing e.g. the
            # micro sign does not depend on the platform's default encoding.
            with open(os.path.join(self.directory, file_name), 'r', encoding="ISO-8859-1") as f:
                header = f.readline().strip('\n').split('\t')
                # ndmin=2 keeps a file with a single data row two-dimensional, so column indexing still works.
                spectrum_f = np.loadtxt(f, ndmin=2)
            spectra_spec_vals[file_name] = spectrum_f[:, 0]
            spectra[file_name] = spectrum_f[:, 1]
            spectra_x_y_dim_name[file_name] = header
        self.spectra = spectra
        self.spectra_spec_vals = spectra_spec_vals
        self.spectra_x_y_dim_name = spectra_x_y_dim_name

    def make_pos_vals_inds_dims(self):
        """
        Build the position indices, position values and position dimensions of the scan grid.

        The grid is centred on ``xCenter`` / ``yCenter`` and spans ``XScanRange`` / ``YScanRange`` with
        ``self.x_len`` by ``self.y_len`` points, all read from ``self.params_dictionary``. The results are
        stored as ``self.pos_ind``, ``self.pos_val`` and ``self.pos_dims``.
        """
        x_range = float(self.params_dictionary['XScanRange'])
        y_range = float(self.params_dictionary['YScanRange'])
        x_center = float(self.params_dictionary['xCenter'])
        y_center = float(self.params_dictionary['yCenter'])

        x_start, x_end = x_center - (x_range / 2), x_center + (x_range / 2)
        y_start, y_end = y_center - (y_range / 2), y_center + (y_range / 2)

        # np.arange with a non-integer step can return one point too many or too few because of rounding;
        # np.linspace with an explicit count always yields exactly x_len / y_len points with the same spacing.
        x_linspace = np.linspace(x_start, x_end, num=self.x_len, endpoint=False)
        # assumes y scan direction: down; scan angle: 0 deg
        y_linspace = -np.linspace(y_start, y_end, num=self.y_len, endpoint=False)
        pos_ind, pos_val = write_utils.build_ind_val_matrices(unit_values=(x_linspace, y_linspace),
                                                              is_spectral=False)
        # Dimension uses ascii encoding, which can not encode the micron symbol,
        # so we replace it, if present, with the letter u.
        x_unit = self.params_dictionary['XPhysUnit'].replace('\xb5', 'u')
        y_unit = self.params_dictionary['YPhysUnit'].replace('\xb5', 'u')
        # Use the module-level ``write_utils`` import, the same one used for build_ind_val_matrices above.
        pos_dims = [write_utils.Dimension('X', x_unit, self.x_len),
                    write_utils.Dimension('Y', y_unit, self.y_len)]
        self.pos_ind, self.pos_val, self.pos_dims = pos_ind, pos_val, pos_dims

    def create_hdf5_file(self):
        """
        Create the output HDF5 file next to the parameter file and add a new measurement group to it.

        The file handle is stored in ``self.h5_f`` and the new ``Measurement_`` group in ``self.h5_meas_grp``;
        the scan parameters in ``self.params_dictionary`` are written as attributes of that group.
        """
        # Swap only the extension. A plain str.replace('.txt', '.h5') leaves names without '.txt' unchanged,
        # in which case opening with mode 'w' would truncate the parameter file itself.
        h5_path = os.path.join(self.directory, os.path.splitext(self.basename)[0] + '.h5')
        try:
            self.h5_f = h5py.File(h5_path, mode='w')
        # Mode 'w' overwrites an existing file, so an OSError means the file could not be created or
        # truncated (presumably because it is already open); fall back to opening it for in-place updates.
        except OSError:
            self.h5_f = h5py.File(h5_path, mode='r+')
        self.h5_meas_grp = usid.hdf_utils.create_indexed_group(self.h5_f, 'Measurement_')
        usid.hdf_utils.write_simple_attrs(self.h5_meas_grp, self.params_dictionary)

    def write_spectrograms(self):
        """
        Write every spectrogram in ``self.spectrogram_desc`` to its own ``Channel_`` group of the current
        measurement group, as a main dataset named ``Raw_Data`` with a ``Wavelength`` spectroscopic dimension.
        """
        if not self.spectrogram_desc:
            return
        # Attribute names in the order of the spectrogram descriptors they are filled from.
        attr_names = ('Caption', 'Bytes_Per_Pixel', 'Scale', 'Physical_Units', 'Offset', 'Datatype',
                      'Bytes_Per_Reading', 'Wavelength_File', 'Wavelength_Units')
        for spectrogram_f, descriptors in self.spectrogram_desc.items():
            channel_grp = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'Channel_')
            wavelengths = self.spectrogram_spec_vals[spectrogram_f]
            wavelength_dim = usid.write_utils.Dimension('Wavelength', descriptors[8], wavelengths)
            main_shape = (self.x_len * self.y_len, len(wavelengths))
            dset_attrs = {name: descriptors[position] for position, name in enumerate(attr_names)}
            h5_main = usid.hdf_utils.write_main_dataset(
                channel_grp,       # parent HDF5 group
                main_shape,        # shape of Main dataset
                'Raw_Data',        # name of main dataset
                'Spectrogram',     # physical quantity contained in Main dataset
                descriptors[3],    # units for the physical quantity
                self.pos_dims,     # position dimensions
                wavelength_dim,    # spectroscopic dimensions
                dtype=np.float32,  # data type / precision
                main_dset_attrs=dset_attrs)
            h5_main.h5_pos_vals[:, :] = self.pos_val
            # NOTE(review): the stored cube is (x, y, wavelength), so this C-order reshape makes y the
            # fastest-varying position index - confirm this matches the ordering of self.pos_val.
            h5_main[:, :] = self.spectrograms[spectrogram_f].reshape(h5_main.shape)

    def _first_main_dataset(self, channel_name):
        """Return the first Main dataset in ``channel_name`` of the current measurement group, or None."""
        try:
            channel_grp = self.h5_meas_grp[channel_name]
        except KeyError:
            return None
        main_dsets = usid.hdf_utils.get_all_main(channel_grp)
        return USIDataset(main_dsets[0]) if main_dsets else None

    def write_images(self):
        """
        Write every image in ``self.img_desc`` to its own ``Channel_`` group of the current measurement group.

        Ancillary datasets are shared where possible. Position indices/values are linked to the main dataset
        of ``Channel_000`` when it exists. Spectroscopic indices/values are linked to the first of
        ``Channel_000`` / ``Channel_001`` whose first spectroscopic dimension has size 1 (i.e. an image).
        Whatever cannot be linked is created anew for the image being written.
        """
        for img_f, descriptors in self.img_desc.items():
            # Defaults: nothing to link to, so build new ancillary datasets for this image.
            h5_pos_inds = h5_pos_vals = None
            pos_dims = self.pos_dims
            write_pos_vals = True
            h5_spec_inds = h5_spec_vals = None
            spec_dims = None

            # At most two channels need to be checked: Channel_000 and, if that one is a spectrogram,
            # Channel_001.
            first_main = self._first_main_dataset('Channel_000')
            image_main = None
            if first_main is not None:
                h5_pos_inds = first_main.h5_pos_inds
                h5_pos_vals = first_main.h5_pos_vals
                pos_dims = None
                write_pos_vals = False
                if first_main.spec_dim_sizes[0] == 1:
                    image_main = first_main
                else:
                    second_main = self._first_main_dataset('Channel_001')
                    if second_main is not None and second_main.spec_dim_sizes[0] == 1:
                        image_main = second_main

            if image_main is not None:
                h5_spec_inds = image_main.h5_spec_inds
                h5_spec_vals = image_main.h5_spec_vals
            else:
                # No image channel exists yet (or neither inspected channel is one): create the single-valued
                # spectroscopic dimension instead of leaving these variables undefined.
                spec_dims = usid.write_utils.Dimension('arb', 'a.u', 1)

            channel_i = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'Channel_')
            h5_raw = usid.hdf_utils.write_main_dataset(channel_i,  # parent HDF5 group
                                                       (self.x_len * self.y_len, 1),  # shape of Main dataset
                                                       'Raw_' + descriptors[0].replace('-', '_'),
                                                       # Name of main dataset
                                                       descriptors[0],
                                                       # Physical quantity contained in Main dataset
                                                       descriptors[2],  # Units for the physical quantity
                                                       h5_pos_inds=h5_pos_inds,
                                                       h5_pos_vals=h5_pos_vals,
                                                       # Position dimensions
                                                       pos_dims=pos_dims,
                                                       # Spectroscopic dimensions
                                                       h5_spec_inds=h5_spec_inds,
                                                       h5_spec_vals=h5_spec_vals,
                                                       spec_dims=spec_dims,
                                                       dtype=np.float32,  # data type / precision
                                                       main_dset_attrs={'Caption': descriptors[0],
                                                                        'Scale': descriptors[1],
                                                                        'Physical_Units': descriptors[2],
                                                                        'Offset': descriptors[3]})
            h5_raw[:, :] = self.imgs[img_f].reshape(h5_raw.shape)
            if write_pos_vals:
                h5_raw.h5_pos_vals[:, :] = self.pos_val

    def write_spectra(self):
        """
        Write every point spectrum in ``self.spectrum_desc`` to a new ``Measurement_`` group of its own.

        The spectroscopic dimension and the physical quantity are named after the two columns of the spectrum
        file header (``name unit``). The position dimensions hold the single x / y location taken from the
        spectrum descriptors. ``self.h5_meas_grp`` is left pointing at the last measurement group created.
        """
        # Iterate the point-spectrum descriptions: the keys of self.spectra, self.spectra_spec_vals and
        # self.spectra_x_y_dim_name are spectrum file names, not spectrogram file names.
        for spec_f, descriptors in self.spectrum_desc.items():
            # create new measurement group for each spectrum
            self.h5_meas_grp = usid.hdf_utils.create_indexed_group(self.h5_f, 'Measurement_')
            x_header = self.spectra_x_y_dim_name[spec_f][0].split(' ')
            y_header = self.spectra_x_y_dim_name[spec_f][1].split(' ')
            x_name, x_unit = x_header[0], x_header[1]
            y_name, y_unit = y_header[0], y_header[1]
            spec_vals = self.spectra_spec_vals[spec_f]
            spec_i_spec_dims = usid.write_utils.Dimension(x_name, x_unit, spec_vals)
            # Each location is passed as a one-element array: Dimension takes the values of the dimension,
            # and a bare float is neither an array nor a point count.
            spec_i_pos_dims = [usid.write_utils.Dimension('X',
                                                          self.params_dictionary['XPhysUnit'].replace('\xb5', 'u'),
                                                          np.array([float(descriptors[0])])),
                               usid.write_utils.Dimension('Y',
                                                          self.params_dictionary['YPhysUnit'].replace('\xb5', 'u'),
                                                          np.array([float(descriptors[1])]))]
            # write data to a channel in the measurement group
            spec_i_ch = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'Channel_')
            h5_raw = usid.hdf_utils.write_main_dataset(spec_i_ch,  # parent HDF5 group
                                                       (1, len(spec_vals)),  # shape of Main dataset
                                                       'Raw_Spectrum',  # Name of main dataset
                                                       y_name,  # Physical quantity contained in Main dataset
                                                       y_unit,  # Units for the physical quantity
                                                       # Position and spectroscopic dimensions
                                                       pos_dims=spec_i_pos_dims, spec_dims=spec_i_spec_dims,
                                                       dtype=np.float32,  # data type / precision
                                                       main_dset_attrs={'XLoc': descriptors[0],
                                                                        'YLoc': descriptors[1]})
            h5_raw[:, :] = self.spectra[spec_f].reshape(h5_raw.shape)

    def translate(self):
        """
        Run the whole translation: resolve the paths, read the scan parameters, file descriptions and data
        files, then create the HDF5 file and write the spectrograms, images and point spectra to it.

        :return: h5 file (the open file handle stored in ``self.h5_f``).
        """
        # Reading steps: each one stores its results on the instance for the later steps to use.
        self.get_path()
        self.read_anfatec_params()
        self.read_file_desc()
        self.read_spectrograms()
        self.read_imgs()
        self.read_spectra()
        self.make_pos_vals_inds_dims()
        # Writing steps: spectrograms go first, so that images written afterwards can link to their
        # position datasets.
        self.create_hdf5_file()
        self.write_spectrograms()
        self.write_images()
        self.write_spectra()
        return self.h5_f