Commit 557c948c authored by Jessica's avatar Jessica
Browse files

adding pifm translator

parent 0b2b1f8c
import os
import numpy as np
from pyUSID.io.translator import Translator
from pyUSID.io import write_utils
from pyUSID import USIDataset
class PiFMTranslator(Translator):
"""Writes images, spectrograms, point spectra and associated ancillary data sets to h5 file."""
def __init__(self, path=None, *args, **kwargs):
self.path = path
super(HyperspectralTranslator, self).__init__(*args, **kwargs)
def get_path(self):
"""writes full path, directory, and file name as attributes to class"""
# get paths/get params dictionary, img/spectrogram/spectrum descriptions
full_path = os.path.realpath(self.path)
directory = os.path.dirname(full_path)
# file name
basename = os.path.basename(self.path)
self.full_path = full_path
self.directory = directory
self.basename = basename
#these dictionary parameters will be written to hdf5 file under measurement attributes
## make into function
def read_anfatec_params(self):
"""reads the scan parameters and writes them to a dictionary"""
params_dictionary = {}
params = True
with open(self.path, 'r', encoding="ISO-8859-1") as f:
for line in f:
if params:
sline = [val.strip() for val in line.split(':')]
if len(sline) == 2 and sline[0][0] != ';':
params_dictionary[sline[0]] = sline[1]
#in ANFATEC parameter files, all attributes are written before file references.
if sline[0].startswith('FileDesc'):
params = False
self.params_dictionary = params_dictionary
self.x_len, self.y_len = int(params_dictionary['xPixel']), int(params_dictionary['yPixel'])
def read_file_desc(self):
"""reads spectrogram, image, and spectra file descriptions and stores all to dictionary where
the key:value pairs are filename:[all descriptors]"""
spectrogram_desc = {}
img_desc = {}
spectrum_desc = {}
with open(path,'r', encoding="ISO-8859-1") as f:
## can be made more concise...by incorporating conditons with loop control
lines = f.readlines()
for index, line in enumerate(lines):
sline = [val.strip() for val in line.split(':')]
#if true, then file describes image.
if sline[0].startswith('FileDescBegin'):
no_descriptors = 5
file_desc = []
for i in range(no_descriptors):
line_desc = [val.strip() for val in lines[index+i+1].split(':')]
file_desc.append(line_desc[1])
#img_desc['filename'] = caption, scale, physical unit, offset
img_desc[file_desc[0]] = file_desc[1:]
#if true, file describes spectrogram (ie hyperspectral image)
if sline[0].startswith('FileDesc2Begin'):
no_descriptors = 10
file_desc = []
for i in range(no_descriptors):
line_desc = [val.strip() for val in lines[index+i+1].split(':')]
file_desc.append(line_desc[1])
#caption, bytes perpixel, scale, physical unit, offset, offset, datatype, bytes per reading
#filename wavelengths, phys units wavelengths.
spectrogram_desc[file_desc[0]] = file_desc[1:]
if sline[0].startswith('AFMSpectrumDescBegin'):
no_descriptors = 3
file_desc = []
for i in range(no_descriptors):
line_desc = [val.strip() for val in lines[index+i+1].split(':')]
file_desc.append(line_desc[1])
#file name, position x, position y
spectrum_desc[file_desc[0]] = file_desc[1:]
self.img_desc = img_desc
self.spectrogram_desc = spectrogram_desc
self.spectrum_desc = spectrum_desc
def read_spectrograms(self):
"""reads spectrograms, associated spectral values, and saves them in two dictionaries"""
spectrograms = {}
spectrogram_spec_vals = {}
for file_name, descriptors in self.spectrogram_desc.items():
#load and save spectroscopic values
spec_vals_i = np.loadtxt(os.path.join(self.directory, file_name.strip('.int') + 'Wavelengths.txt'))
spectrogram_spec_vals[file_name] = spec_vals_i
#load and save spectrograms
spectrogram_i = np.fromfile(os.path.join(self.directory, file_name), dtype='i4')
spectrograms[file_name] = np.zeros((self.x_len, self.y_len, len(spec_vals_i)))
for y, line in enumerate(np.split(spectrogram_i, self.y_len)):
for x, pt_spectrum in enumerate(np.split(line, self.x_len)):
spectrograms[file_name][x, y, :] = pt_spectrum * float(descriptors[2])
self.spectrograms = spectrograms
self.spectrogram_spec_vals = spectrogram_spec_vals
def read_imgs(self):
"""reads images and saves to dictionary"""
imgs = {}
for file_name, descriptors in self.img_desc.items():
img_i = np.fromfile(os.path.join(self.directory, file_name), dtype='i4')
imgs[file_name] = np.zeros((self.x_len, self.y_len))
for y, line in enumerate(np.split(img_i, self.y_len)):
for x, pixel in enumerate(np.split(line, self.x_len)):
imgs[file_name][x, y] = pixel * float(descriptors[1])
self.imgs = imgs
def read_spectra(self):
"""reads all point spectra and saves to dictionary"""
spectra = {}
spectra_spec_vals = {}
spectra_x_y_dim_name = {}
for file_name, descriptors in self.spectrum_desc.items():
spectrum_f = np.loadtxt(os.path.join(self.directory, file_name), skiprows=1)
spectra_spec_vals[file_name] = spectrum_f[:, 0]
spectra[file_name] = spectrum_f[:,1]
with open(os.path.join(self.directory, file_name)) as f:
spectra_x_y_dim_name[file_name] = f.readline().strip('\n').split('\t')
self.spectra = spectra
self.spectra_spec_vals = spectra_spec_vals
self.spectra_x_y_dim_name = spectra_x_y_dim_name
def make_pos_vals_inds_dims(self):
x_range = float(self.params_dictionary['XScanRange'])
y_range = float(self.params_dictionary['YScanRange'])
x_center = float(self.params_dictionary['xCenter'])
y_center = float(self.params_dictionary['yCenter'])
x_start = x_center-(x_range/2); x_end = x_center+(x_range/2)
y_start = y_center-(y_range/2); y_end = y_center+(y_range/2)
dx = x_range/self.x_len
dy = y_range/self.y_len
#assumes y scan direction:down; scan angle: 0 deg
y_linspace = -np.arange(y_start, y_end, step=dy)
x_linspace = np.arange(x_start, x_end, step=dx)
pos_ind, pos_val = write_utils.build_ind_val_matrices(unit_values=(x_linspace, y_linspace), is_spectral=False)
#usid.write_utils.Dimension uses ascii encoding, which can not encode
# micron symbol, so we replace it, if present, with the letter u.
pos_dims = [usid.write_utils.Dimension('X', self.params_dictionary['XPhysUnit'].replace('\xb5', 'u'), self.x_len),
usid.write_utils.Dimension('Y', self.params_dictionary['YPhysUnit'].replace('\xb5', 'u'), self.y_len)]
self.pos_ind, self.pos_val, self.pos_dims = pos_ind, pos_val, pos_dims
def create_hdf5_file(self):
h5_path = os.path.join(self.directory, self.basename.replace('.txt', '.h5'))
try:
self.h5_f = h5py.File(h5_path, mode='w')
#if file already exists. (maybe there is a better way to check for this)
except OSError:
self.h5_f = h5py.File(h5_path, mode='r+')
self.h5_meas_grp = usid.hdf_utils.create_indexed_group(self.h5_f, 'Measurement_')
usid.hdf_utils.write_simple_attrs(self.h5_meas_grp, self.params_dictionary)
def write_spectrograms(self):
if bool(self.spectrogram_desc):
for spectrogram_f, descriptors in self.spectrogram_desc.items():
channel_i = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'Channel_')
spec_vals_i = self.spectrogram_spec_vals[spectrogram_f]
spectrogram_spec_dims = usid.write_utils.Dimension('Wavelength', descriptors[8], spec_vals_i)
h5_raw = usid.hdf_utils.write_main_dataset(channel_i, # parent HDF5 group
(self.x_len *
self.y_len, len(spec_vals_i)), # shape of Main dataset
'Raw_Data', # Name of main dataset
'Spectrogram', # Physical quantity contained in Main dataset
descriptors[3], # Units for the physical quantity
self.pos_dims, # Position dimensions
spectrogram_spec_dims, # Spectroscopic dimensions
dtype=np.float32, # data type / precision
main_dset_attrs={'Caption': descriptors[0],
'Bytes_Per_Pixel': descriptors[1],
'Scale': descriptors[2],
'Physical_Units': descriptors[3],
'Offset': descriptors[4],
'Datatype': descriptors[5],
'Bytes_Per_Reading': descriptors[6],
'Wavelength_File': descriptors[7],
'Wavelength_Units': descriptors[8]})
h5_raw.h5_pos_vals[:, :] = self.pos_val
h5_raw[:, :] = self.spectrograms[spectrogram_f].reshape(h5_raw.shape)
def write_images(self):
if bool(self.img_desc):
for img_f, descriptors in self.img_desc.items():
#check for existing spectrogram or image and link position/spec inds/vals
#at most two channels worth of need to be checked
try:
str_main = str(usid.hdf_utils.get_all_main(self.h5_f['Measurement_000/Channel_000']))
i_beg = str_main.find('located at: \n\t') + 14
i_end = str_main.find('\nData contains') - 1
data_loc = str_main[i_beg:i_end]
channel_data = USIDataset(self.h5_f[data_loc])
h5_pos_inds = channel_data.h5_pos_inds
h5_pos_vals = channel_data.h5_pos_vals
pos_dims = None
write_pos_vals = False
if channel_data.spec_dim_sizes[0] == 1:
h5_spec_inds = channel_data.h5_spec_inds
h5_spec_vals = channel_data.h5_spec_vals
spec_dims = None
#if channel 000 is spectrogram, check next dataset
elif channel_data.spec_dim_sizes[0] !=1:
str_main = str(usid.hdf_utils.get_all_main(self.h5_f['Measurement_000/Channel_001']))
i_beg = str_main.find('located at: \n\t') + 14
i_end = str_main.find('\nData contains') - 1
data_loc = str_main[i_beg:i_end]
channel_data = USIDataset(self.h5_f[data_loc])
#channel data is an image, & we link their spec inds/vals
if channel_data.spec_dim_sizes[0] == 1:
h5_spec_inds = channel_data.h5_spec_inds
h5_spec_vals = channel_data.h5_spec_vals
spec_dims = None
#in case where channel does not exist, we make new spec/pos inds/vals
except KeyError:
#pos dims
h5_pos_inds = None
h5_pos_vals = None
pos_dims = self.pos_dims
write_pos_vals = True
#spec dims
h5_spec_inds = None
h5_spec_vals = None
spec_dims = usid.write_utils.Dimension('arb', 'a.u', 1)
channel_i = usid.hdf_utils.create_indexed_group(self.h5_meas_grp,'Channel_')
h5_raw = usid.hdf_utils.write_main_dataset(channel_i, #parent HDF5 group
(self.x_len * self.y_len, 1), # shape of Main dataset
'Raw_' + descriptors[0].replace('-', '_'),
# Name of main dataset
descriptors[0],
# Physical quantity contained in Main dataset
descriptors[2], # Units for the physical quantity
h5_pos_inds=h5_pos_inds,
h5_pos_vals=h5_pos_vals,
# Position dimensions
pos_dims=pos_dims,
# Spectroscopic dimensions
h5_spec_inds=h5_spec_inds,
h5_spec_vals=h5_spec_vals,
spec_dims=spec_dims,
dtype=np.float32, # data type / precision
main_dset_attrs={'Caption': descriptors[0],
'Scale': descriptors[1],
'Physical_Units': descriptors[2],
'Offset': descriptors[3]})
h5_raw[:, :] = self.imgs[img_f].reshape(h5_raw.shape)
if write_pos_vals:
h5_raw.h5_pos_vals[:, :] = self.pos_val
def write_spectra(self):
if bool(self.spectrum_desc):
for spec_f, descriptors in self.spectrogram_desc.items():
#create new measurement group for ea spectrum
self.h5_meas_grp = usid.hdf_utils.create_indexed_group(self.h5_f, 'Measurement_')
x_name = self.spectra_x_y_dim_name[spec_f][0].split(' ')[0]
x_unit = self.spectra_x_y_dim_name[spec_f][0].split(' ')[1]
y_name = self.spectra_x_y_dim_name[spec_f][1].split(' ')[0]
y_unit = self.spectra_x_y_dim_name[spec_f][1].split(' ')[1]
spec_i_spec_dims = usid.write_utils.Dimension(x_name, x_unit, self.spectra_spec_vals[spec_f])
spec_i_pos_dims = [usid.write_utils.Dimension('X',
self.params_dictionary['XPhysUnit'].replace('\xb5','u'),
float(descriptors[0])),
usid.write_utils.Dimension('Y',
self.params_dictionary['YPhysUnit'].replace('\xb5','u'),
float(descriptors[1]))]
#write data to a channel in the measurement group
spec_i_ch = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'Channel_')
h5_raw = usid.hdf_utils.write_main_dataset(spec_i_ch, # parent HDF5 group
(1, len(self.spectra_spec_vals[spec_f])), # shape of Main dataset
'Raw_Spectrum',
# Name of main dataset
y_name,
# Physical quantity contained in Main dataset
y_unit, # Units for the physical quantity
# Position dimensions
pos_dims=spec_i_pos_dims, spec_dims=spec_i_spec_dims,
# Spectroscopic dimensions
dtype=np.float32, # data type / precision
main_dset_attrs={'XLoc': descriptors[0],
'YLoc': descriptors[1]})
h5_raw[:, :] = self.spectra[spec_f].reshape(h5_raw.shape)
def translate(self):
self.get_path()
self.read_anfatec_params()
self.read_file_desc()
self.read_spectrograms()
self.read_imgs()
self.read_spectra()
self.make_pos_vals_inds_dims()
self.create_hdf5_file()
self.write_spectrograms()
self.write_images()
self.write_spectra()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment