Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Vasudevan, Rama K
pycroscopy
Commits
35a3c155
Unverified
Commit
35a3c155
authored
Jan 13, 2019
by
Suhas Somnath
Committed by
GitHub
Jan 13, 2019
Browse files
Merge pull request #201 from kongjy/master
translator for data from molecular vista's photoinduced force microscope
parents
99c7a05f
cc733d70
Changes
1
Hide whitespace changes
Inline
Side-by-side
pycroscopy/io/translators/pifm.py
0 → 100644
View file @
35a3c155
import
os
import
numpy
as
np
from
pyUSID.io.translator
import
Translator
from
pyUSID.io
import
write_utils
from
pyUSID
import
USIDataset
class
PiFMTranslator
(
Translator
):
"""
Class that writes images, spectrograms, point spectra and associated ancillary data sets to h5 file in pyUSID data
structure.
"""
def
__init__
(
self
,
path
=
None
):
self
.
path
=
path
# super(HyperspectralTranslator, self).__init__(*args, **kwargs)
def
get_path
(
self
):
"""writes full path, directory, and file name as attributes to class"""
# get paths/get params dictionary, img/spectrogram/spectrum descriptions
full_path
=
os
.
path
.
realpath
(
self
.
path
)
directory
=
os
.
path
.
dirname
(
full_path
)
# file name
basename
=
os
.
path
.
basename
(
self
.
path
)
self
.
full_path
=
full_path
self
.
directory
=
directory
self
.
basename
=
basename
#these dictionary parameters will be written to hdf5 file under measurement attributes
def
read_anfatec_params
(
self
):
"""reads the scan parameters and writes them to a dictionary"""
params_dictionary
=
{}
params
=
True
with
open
(
self
.
path
,
'r'
,
encoding
=
"ISO-8859-1"
)
as
f
:
for
line
in
f
:
if
params
:
sline
=
[
val
.
strip
()
for
val
in
line
.
split
(
':'
)]
if
len
(
sline
)
==
2
and
sline
[
0
][
0
]
!=
';'
:
params_dictionary
[
sline
[
0
]]
=
sline
[
1
]
#in ANFATEC parameter files, all attributes are written before file references.
if
sline
[
0
].
startswith
(
'FileDesc'
):
params
=
False
f
.
close
()
self
.
params_dictionary
=
params_dictionary
self
.
x_len
,
self
.
y_len
=
int
(
params_dictionary
[
'xPixel'
]),
int
(
params_dictionary
[
'yPixel'
])
def
read_file_desc
(
self
):
"""reads spectrogram, image, and spectra file descriptions and stores all to dictionary where
the key:value pairs are filename:[all descriptors]"""
spectrogram_desc
=
{}
img_desc
=
{}
spectrum_desc
=
{}
with
open
(
self
.
path
,
'r'
,
encoding
=
"ISO-8859-1"
)
as
f
:
## can be made more concise...by incorporating conditons with loop control
lines
=
f
.
readlines
()
for
index
,
line
in
enumerate
(
lines
):
sline
=
[
val
.
strip
()
for
val
in
line
.
split
(
':'
)]
#if true, then file describes image.
if
sline
[
0
].
startswith
(
'FileDescBegin'
):
no_descriptors
=
5
file_desc
=
[]
for
i
in
range
(
no_descriptors
):
line_desc
=
[
val
.
strip
()
for
val
in
lines
[
index
+
i
+
1
].
split
(
':'
)]
file_desc
.
append
(
line_desc
[
1
])
#img_desc['filename'] = caption, scale, physical unit, offset
img_desc
[
file_desc
[
0
]]
=
file_desc
[
1
:]
#if true, file describes spectrogram (ie hyperspectral image)
if
sline
[
0
].
startswith
(
'FileDesc2Begin'
):
no_descriptors
=
10
file_desc
=
[]
for
i
in
range
(
no_descriptors
):
line_desc
=
[
val
.
strip
()
for
val
in
lines
[
index
+
i
+
1
].
split
(
':'
)]
file_desc
.
append
(
line_desc
[
1
])
#caption, bytes perpixel, scale, physical unit, offset, offset, datatype, bytes per reading
#filename wavelengths, phys units wavelengths.
spectrogram_desc
[
file_desc
[
0
]]
=
file_desc
[
1
:]
if
sline
[
0
].
startswith
(
'AFMSpectrumDescBegin'
):
no_descriptors
=
3
file_desc
=
[]
for
i
in
range
(
no_descriptors
):
line_desc
=
[
val
.
strip
()
for
val
in
lines
[
index
+
i
+
1
].
split
(
':'
)]
file_desc
.
append
(
line_desc
[
1
])
#file name, position x, position y
spectrum_desc
[
file_desc
[
0
]]
=
file_desc
[
1
:]
f
.
close
()
self
.
img_desc
=
img_desc
self
.
spectrogram_desc
=
spectrogram_desc
self
.
spectrum_desc
=
spectrum_desc
def
read_spectrograms
(
self
):
"""reads spectrograms, associated spectral values, and saves them in two dictionaries"""
spectrograms
=
{}
spectrogram_spec_vals
=
{}
for
file_name
,
descriptors
in
self
.
spectrogram_desc
.
items
():
#load and save spectroscopic values
spec_vals_i
=
np
.
loadtxt
(
os
.
path
.
join
(
self
.
directory
,
file_name
.
strip
(
'.int'
)
+
'Wavelengths.txt'
))
spectrogram_spec_vals
[
file_name
]
=
spec_vals_i
#load and save spectrograms
spectrogram_i
=
np
.
fromfile
(
os
.
path
.
join
(
self
.
directory
,
file_name
),
dtype
=
'i4'
)
spectrograms
[
file_name
]
=
np
.
zeros
((
self
.
x_len
,
self
.
y_len
,
len
(
spec_vals_i
)))
for
y
,
line
in
enumerate
(
np
.
split
(
spectrogram_i
,
self
.
y_len
)):
for
x
,
pt_spectrum
in
enumerate
(
np
.
split
(
line
,
self
.
x_len
)):
spectrograms
[
file_name
][
x
,
y
,
:]
=
pt_spectrum
*
float
(
descriptors
[
2
])
self
.
spectrograms
=
spectrograms
self
.
spectrogram_spec_vals
=
spectrogram_spec_vals
def
read_imgs
(
self
):
"""reads images and saves to dictionary"""
imgs
=
{}
for
file_name
,
descriptors
in
self
.
img_desc
.
items
():
img_i
=
np
.
fromfile
(
os
.
path
.
join
(
self
.
directory
,
file_name
),
dtype
=
'i4'
)
imgs
[
file_name
]
=
np
.
zeros
((
self
.
x_len
,
self
.
y_len
))
for
y
,
line
in
enumerate
(
np
.
split
(
img_i
,
self
.
y_len
)):
for
x
,
pixel
in
enumerate
(
np
.
split
(
line
,
self
.
x_len
)):
imgs
[
file_name
][
x
,
y
]
=
pixel
*
float
(
descriptors
[
1
])
self
.
imgs
=
imgs
def
read_spectra
(
self
):
"""reads all point spectra and saves to dictionary"""
spectra
=
{}
spectra_spec_vals
=
{}
spectra_x_y_dim_name
=
{}
for
file_name
,
descriptors
in
self
.
spectrum_desc
.
items
():
spectrum_f
=
np
.
loadtxt
(
os
.
path
.
join
(
self
.
directory
,
file_name
),
skiprows
=
1
)
spectra_spec_vals
[
file_name
]
=
spectrum_f
[:,
0
]
spectra
[
file_name
]
=
spectrum_f
[:,
1
]
with
open
(
os
.
path
.
join
(
self
.
directory
,
file_name
))
as
f
:
spectra_x_y_dim_name
[
file_name
]
=
f
.
readline
().
strip
(
'
\n
'
).
split
(
'
\t
'
)
self
.
spectra
=
spectra
self
.
spectra_spec_vals
=
spectra_spec_vals
self
.
spectra_x_y_dim_name
=
spectra_x_y_dim_name
def
make_pos_vals_inds_dims
(
self
):
x_range
=
float
(
self
.
params_dictionary
[
'XScanRange'
])
y_range
=
float
(
self
.
params_dictionary
[
'YScanRange'
])
x_center
=
float
(
self
.
params_dictionary
[
'xCenter'
])
y_center
=
float
(
self
.
params_dictionary
[
'yCenter'
])
x_start
=
x_center
-
(
x_range
/
2
);
x_end
=
x_center
+
(
x_range
/
2
)
y_start
=
y_center
-
(
y_range
/
2
);
y_end
=
y_center
+
(
y_range
/
2
)
dx
=
x_range
/
self
.
x_len
dy
=
y_range
/
self
.
y_len
#assumes y scan direction:down; scan angle: 0 deg
y_linspace
=
-
np
.
arange
(
y_start
,
y_end
,
step
=
dy
)
x_linspace
=
np
.
arange
(
x_start
,
x_end
,
step
=
dx
)
pos_ind
,
pos_val
=
write_utils
.
build_ind_val_matrices
(
unit_values
=
(
x_linspace
,
y_linspace
),
is_spectral
=
False
)
#usid.write_utils.Dimension uses ascii encoding, which can not encode
# micron symbol, so we replace it, if present, with the letter u.
pos_dims
=
[
usid
.
write_utils
.
Dimension
(
'X'
,
self
.
params_dictionary
[
'XPhysUnit'
].
replace
(
'
\xb5
'
,
'u'
),
self
.
x_len
),
usid
.
write_utils
.
Dimension
(
'Y'
,
self
.
params_dictionary
[
'YPhysUnit'
].
replace
(
'
\xb5
'
,
'u'
),
self
.
y_len
)]
self
.
pos_ind
,
self
.
pos_val
,
self
.
pos_dims
=
pos_ind
,
pos_val
,
pos_dims
def
create_hdf5_file
(
self
):
h5_path
=
os
.
path
.
join
(
self
.
directory
,
self
.
basename
.
replace
(
'.txt'
,
'.h5'
))
try
:
self
.
h5_f
=
h5py
.
File
(
h5_path
,
mode
=
'w'
)
#if file already exists. (maybe there is a better way to check for this)
except
OSError
:
self
.
h5_f
=
h5py
.
File
(
h5_path
,
mode
=
'r+'
)
self
.
h5_meas_grp
=
usid
.
hdf_utils
.
create_indexed_group
(
self
.
h5_f
,
'Measurement_'
)
usid
.
hdf_utils
.
write_simple_attrs
(
self
.
h5_meas_grp
,
self
.
params_dictionary
)
def
write_spectrograms
(
self
):
if
bool
(
self
.
spectrogram_desc
):
for
spectrogram_f
,
descriptors
in
self
.
spectrogram_desc
.
items
():
channel_i
=
usid
.
hdf_utils
.
create_indexed_group
(
self
.
h5_meas_grp
,
'Channel_'
)
spec_vals_i
=
self
.
spectrogram_spec_vals
[
spectrogram_f
]
spectrogram_spec_dims
=
usid
.
write_utils
.
Dimension
(
'Wavelength'
,
descriptors
[
8
],
spec_vals_i
)
h5_raw
=
usid
.
hdf_utils
.
write_main_dataset
(
channel_i
,
# parent HDF5 group
(
self
.
x_len
*
self
.
y_len
,
len
(
spec_vals_i
)),
# shape of Main dataset
'Raw_Data'
,
# Name of main dataset
'Spectrogram'
,
# Physical quantity contained in Main dataset
descriptors
[
3
],
# Units for the physical quantity
self
.
pos_dims
,
# Position dimensions
spectrogram_spec_dims
,
# Spectroscopic dimensions
dtype
=
np
.
float32
,
# data type / precision
main_dset_attrs
=
{
'Caption'
:
descriptors
[
0
],
'Bytes_Per_Pixel'
:
descriptors
[
1
],
'Scale'
:
descriptors
[
2
],
'Physical_Units'
:
descriptors
[
3
],
'Offset'
:
descriptors
[
4
],
'Datatype'
:
descriptors
[
5
],
'Bytes_Per_Reading'
:
descriptors
[
6
],
'Wavelength_File'
:
descriptors
[
7
],
'Wavelength_Units'
:
descriptors
[
8
]})
h5_raw
.
h5_pos_vals
[:,
:]
=
self
.
pos_val
h5_raw
[:,
:]
=
self
.
spectrograms
[
spectrogram_f
].
reshape
(
h5_raw
.
shape
)
def
write_images
(
self
):
if
bool
(
self
.
img_desc
):
for
img_f
,
descriptors
in
self
.
img_desc
.
items
():
#check for existing spectrogram or image and link position/spec inds/vals
#at most two channels worth of need to be checked
try
:
str_main
=
str
(
usid
.
hdf_utils
.
get_all_main
(
self
.
h5_f
[
'Measurement_000/Channel_000'
]))
i_beg
=
str_main
.
find
(
'located at:
\n\t
'
)
+
14
i_end
=
str_main
.
find
(
'
\n
Data contains'
)
-
1
data_loc
=
str_main
[
i_beg
:
i_end
]
channel_data
=
USIDataset
(
self
.
h5_f
[
data_loc
])
h5_pos_inds
=
channel_data
.
h5_pos_inds
h5_pos_vals
=
channel_data
.
h5_pos_vals
pos_dims
=
None
write_pos_vals
=
False
if
channel_data
.
spec_dim_sizes
[
0
]
==
1
:
h5_spec_inds
=
channel_data
.
h5_spec_inds
h5_spec_vals
=
channel_data
.
h5_spec_vals
spec_dims
=
None
#if channel 000 is spectrogram, check next dataset
elif
channel_data
.
spec_dim_sizes
[
0
]
!=
1
:
str_main
=
str
(
usid
.
hdf_utils
.
get_all_main
(
self
.
h5_f
[
'Measurement_000/Channel_001'
]))
i_beg
=
str_main
.
find
(
'located at:
\n\t
'
)
+
14
i_end
=
str_main
.
find
(
'
\n
Data contains'
)
-
1
data_loc
=
str_main
[
i_beg
:
i_end
]
channel_data
=
USIDataset
(
self
.
h5_f
[
data_loc
])
#channel data is an image, & we link their spec inds/vals
if
channel_data
.
spec_dim_sizes
[
0
]
==
1
:
h5_spec_inds
=
channel_data
.
h5_spec_inds
h5_spec_vals
=
channel_data
.
h5_spec_vals
spec_dims
=
None
#in case where channel does not exist, we make new spec/pos inds/vals
except
KeyError
:
#pos dims
h5_pos_inds
=
None
h5_pos_vals
=
None
pos_dims
=
self
.
pos_dims
write_pos_vals
=
True
#spec dims
h5_spec_inds
=
None
h5_spec_vals
=
None
spec_dims
=
usid
.
write_utils
.
Dimension
(
'arb'
,
'a.u'
,
1
)
channel_i
=
usid
.
hdf_utils
.
create_indexed_group
(
self
.
h5_meas_grp
,
'Channel_'
)
h5_raw
=
usid
.
hdf_utils
.
write_main_dataset
(
channel_i
,
#parent HDF5 group
(
self
.
x_len
*
self
.
y_len
,
1
),
# shape of Main dataset
'Raw_'
+
descriptors
[
0
].
replace
(
'-'
,
'_'
),
# Name of main dataset
descriptors
[
0
],
# Physical quantity contained in Main dataset
descriptors
[
2
],
# Units for the physical quantity
h5_pos_inds
=
h5_pos_inds
,
h5_pos_vals
=
h5_pos_vals
,
# Position dimensions
pos_dims
=
pos_dims
,
# Spectroscopic dimensions
h5_spec_inds
=
h5_spec_inds
,
h5_spec_vals
=
h5_spec_vals
,
spec_dims
=
spec_dims
,
dtype
=
np
.
float32
,
# data type / precision
main_dset_attrs
=
{
'Caption'
:
descriptors
[
0
],
'Scale'
:
descriptors
[
1
],
'Physical_Units'
:
descriptors
[
2
],
'Offset'
:
descriptors
[
3
]})
h5_raw
[:,
:]
=
self
.
imgs
[
img_f
].
reshape
(
h5_raw
.
shape
)
if
write_pos_vals
:
h5_raw
.
h5_pos_vals
[:,
:]
=
self
.
pos_val
def
write_spectra
(
self
):
if
bool
(
self
.
spectrum_desc
):
for
spec_f
,
descriptors
in
self
.
spectrogram_desc
.
items
():
#create new measurement group for ea spectrum
self
.
h5_meas_grp
=
usid
.
hdf_utils
.
create_indexed_group
(
self
.
h5_f
,
'Measurement_'
)
x_name
=
self
.
spectra_x_y_dim_name
[
spec_f
][
0
].
split
(
' '
)[
0
]
x_unit
=
self
.
spectra_x_y_dim_name
[
spec_f
][
0
].
split
(
' '
)[
1
]
y_name
=
self
.
spectra_x_y_dim_name
[
spec_f
][
1
].
split
(
' '
)[
0
]
y_unit
=
self
.
spectra_x_y_dim_name
[
spec_f
][
1
].
split
(
' '
)[
1
]
spec_i_spec_dims
=
usid
.
write_utils
.
Dimension
(
x_name
,
x_unit
,
self
.
spectra_spec_vals
[
spec_f
])
spec_i_pos_dims
=
[
usid
.
write_utils
.
Dimension
(
'X'
,
self
.
params_dictionary
[
'XPhysUnit'
].
replace
(
'
\xb5
'
,
'u'
),
float
(
descriptors
[
0
])),
usid
.
write_utils
.
Dimension
(
'Y'
,
self
.
params_dictionary
[
'YPhysUnit'
].
replace
(
'
\xb5
'
,
'u'
),
float
(
descriptors
[
1
]))]
#write data to a channel in the measurement group
spec_i_ch
=
usid
.
hdf_utils
.
create_indexed_group
(
self
.
h5_meas_grp
,
'Channel_'
)
h5_raw
=
usid
.
hdf_utils
.
write_main_dataset
(
spec_i_ch
,
# parent HDF5 group
(
1
,
len
(
self
.
spectra_spec_vals
[
spec_f
])),
# shape of Main dataset
'Raw_Spectrum'
,
# Name of main dataset
y_name
,
# Physical quantity contained in Main dataset
y_unit
,
# Units for the physical quantity
# Position dimensions
pos_dims
=
spec_i_pos_dims
,
spec_dims
=
spec_i_spec_dims
,
# Spectroscopic dimensions
dtype
=
np
.
float32
,
# data type / precision
main_dset_attrs
=
{
'XLoc'
:
descriptors
[
0
],
'YLoc'
:
descriptors
[
1
]})
h5_raw
[:,
:]
=
self
.
spectra
[
spec_f
].
reshape
(
h5_raw
.
shape
)
def
translate
(
self
):
"""
:return: h5 file.
"""
self
.
get_path
()
self
.
read_anfatec_params
()
self
.
read_file_desc
()
self
.
read_spectrograms
()
self
.
read_imgs
()
self
.
read_spectra
()
self
.
make_pos_vals_inds_dims
()
self
.
create_hdf5_file
()
self
.
write_spectrograms
()
self
.
write_images
()
self
.
write_spectra
()
return
self
.
h5_f
\ No newline at end of file
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment