Unverified commit b62cb5b2 authored by Suhas Somnath, committed by GitHub

Merge pull request #193 from pycroscopy/sxm_dev

Add support for Nanonis's sxm file format
parents 35a3c155 80ab5609
@@ -42,5 +42,5 @@ def read_nanonis_file(file_path):
data = reader(file_path)
return data.header, data.signals
return data, file_ext
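# Illustrative sketch (not part of the diff): how a caller could use the new
# (data, file_ext) return value of read_nanonis_file. The file name below is
# hypothetical; `data` is assumed to be one of the reader objects defined
# further down (Grid, Scan or Spec), all of which expose .header and .signals.
data, file_ext = read_nanonis_file('example_scan.sxm')
print(file_ext)               # '.sxm'
print(sorted(data.header))    # parsed header entries
print(sorted(data.signals))   # signal/channel names mapped to numpy arrays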
import os
import numpy as np
import warnings
_end_tags = dict(grid=':HEADER_END:', scan='SCANIT_END', spec='[DATA]')
@@ -32,12 +33,6 @@ class NanonisFile(object):
"""
def __init__(self, fname):
"""
Parameters
----------
fname : str
Path to a Nanonis data file ('.3ds', '.sxm', or '.dat').
"""
self.datadir, self.basename = os.path.split(fname)
self.fname = fname
self.filetype = self._determine_filetype()
@@ -61,14 +56,18 @@ class NanonisFile(object):
'sxm', or 'dat'.
"""
if self.fname[-3:] == '3ds':
_, fname_ext = os.path.splitext(self.fname)
if fname_ext == '.3ds':
return 'grid'
elif self.fname[-3:] == 'sxm':
elif fname_ext == '.sxm':
return 'scan'
elif self.fname[-3:] == 'dat':
elif fname_ext == '.dat':
return 'spec'
else:
raise UnhandledFileError('{} is not a supported filetype or does not exist'.format(self.basename))
raise UnhandledFileError(
('{} is not a supported filetype or does not exist')
.format(self.basename)
)
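# Illustrative sketch (not part of the diff): os.path.splitext keeps the
# leading dot in the extension, which is why the new comparisons above use
# '.3ds', '.sxm' and '.dat' rather than slicing the last three characters.
import os
print(os.path.splitext('grid_spectroscopy.3ds'))  # ('grid_spectroscopy', '.3ds')
print(os.path.splitext('scan001.sxm'))            # ('scan001', '.sxm')
print(os.path.splitext('bias_spec.dat'))          # ('bias_spec', '.dat')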
def read_raw_header(self, byte_offset):
"""
@@ -90,7 +89,7 @@ class NanonisFile(object):
"""
with open(self.fname, 'rb') as f:
return f.read(byte_offset).decode()
return f.read(byte_offset).decode('utf-8', errors='replace')
def start_byte(self):
"""
@@ -116,14 +115,22 @@ class NanonisFile(object):
for line in f:
# Convert from bytes to str
entry = line.strip().decode()
try:
entry = line.strip().decode()
except UnicodeDecodeError:
warnings.warn(
('{} has non-utf-8 characters, replacing them.'
.format(f.name))
)
entry = line.strip().decode('utf-8', errors='replace')
if tag in entry:
byte_offset = f.tell()
break
if byte_offset == -1:
raise FileHeaderNotFoundError(
'Could not find the {} end tag in {}'.format(tag, self.basename)
('Could not find the {} end tag in {}'
.format(tag, self.basename))
)
return byte_offset
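# Illustrative sketch (not part of the diff): what decoding with
# errors='replace' does to bytes that are not valid UTF-8 -- they become the
# U+FFFD replacement character instead of raising UnicodeDecodeError.
line = b'Comment\tcaf\xe9\r\n'
decoded = line.strip().decode('utf-8', errors='replace')
print(decoded)  # 'Comment<TAB>caf' followed by the U+FFFD replacement character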
@@ -157,6 +164,10 @@ class Grid(NanonisFile):
----------
fname : str
Filename for grid file.
header_override : dict, optional
A dict of key: value pairs that override the corresponding entries
parsed from the header, should they be wrong or missing. Keys in
header_override must match the keys in Grid.header_raw (a usage
sketch follows the __init__ shown below).
Attributes
----------
@@ -173,10 +184,11 @@
If fname does not have a '.3ds' extension.
"""
def __init__(self, fname):
def __init__(self, fname, header_override=None):
_is_valid_file(fname, ext='3ds')
super(Grid, self).__init__(fname)
self.header = _parse_3ds_header(self.header_raw)
super().__init__(fname)
self.header = _parse_3ds_header(self.header_raw,
header_override=header_override)
self.signals = self._load_data()
self.signals['sweep_signal'] = self._derive_sweep_signal()
self.signals['topo'] = self._extract_topo()
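# Illustrative sketch (not part of the diff): patching a wrong or missing 3ds
# header entry at load time via header_override. The file name and values are
# hypothetical; the keys must match the raw header keys (see Grid.header_raw),
# e.g. 'Points' and 'Grid dim'.
grid = Grid('example_grid.3ds',
            header_override={'Points': '512', 'Grid dim': '64 x 64'})
print(grid.header['num_sweep_signal'])  # 512, parsed from the overridden 'Points'
print(grid.header['dim_px'])            # [64, 64], parsed from 'Grid dim'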
@@ -238,7 +250,8 @@ class Grid(NanonisFile):
sweep_start, sweep_end = self.signals['params'][0, 0, :2]
num_sweep_signal = self.header['num_sweep_signal']
return np.linspace(sweep_start, sweep_end, num_sweep_signal, dtype=np.float32)
return np.linspace(sweep_start, sweep_end, num_sweep_signal,
dtype=np.float32)
def _extract_topo(self):
"""
@@ -300,7 +313,7 @@ class Scan(NanonisFile):
def __init__(self, fname):
_is_valid_file(fname, ext='sxm')
super(Scan, self).__init__(fname)
super().__init__(fname)
self.header = _parse_sxm_header(self.header_raw)
# data begins with 4 byte code, add 4 bytes to offset instead
@@ -371,7 +384,7 @@ class Spec(NanonisFile):
def __init__(self, fname):
_is_valid_file(fname, ext='dat')
super(Spec, self).__init__(fname)
super().__init__(fname)
self.header = _parse_dat_header(self.header_raw)
self.signals = self._load_data()
@@ -396,7 +409,8 @@ class Spec(NanonisFile):
column_names = f.readline().strip('\n').split('\t')
f.close()
header_lines = len(self.header) + 4
specdata = np.genfromtxt(self.fname, delimiter='\t', skip_header=header_lines)
specdata = np.genfromtxt(self.fname, delimiter='\t',
skip_header=header_lines)
for i, name in enumerate(column_names):
data_dict[name] = specdata[:, i]
@@ -418,7 +432,7 @@ class FileHeaderNotFoundError(Exception):
pass
def _parse_3ds_header(header_raw):
def _parse_3ds_header(header_raw, header_override):
"""
Parse raw header string.
@@ -445,56 +459,102 @@ def _parse_3ds_header(header_raw):
key, val = _split_header_entry(entry)
raw_dict[key] = val
# Override existing raw_dict entries, or create new ones, using header_override
if header_override is not None:
for key, val in header_override.items():
raw_dict[key] = val
# Transfer parameters from raw_dict to header_dict
# Get the expected parameters first
header_dict = dict()
# grid dimensions in pixels
dim_px_str = raw_dict.pop('Grid dim', '1 x 1')
header_dict['dim_px'] = [int(val) for val in dim_px_str.split(' x ')]
# grid frame center position, size, angle
grid_str = raw_dict.pop('Grid settings', [0, 0, 0, 0])
header_dict['pos_xy'] = [float(val) for val in grid_str[:2]]
header_dict['size_xy'] = [float(val) for val in grid_str[2:4]]
header_dict['angle'] = float(grid_str[-1])
# sweep signal
header_dict['sweep_signal'] = raw_dict.pop('Sweep Signal', 'Bias (V)')
# fixed parameters
header_dict['fixed_parameters'] = raw_dict.pop('Fixed parameters', [''])
# experimental parameters
header_dict['experimental_parameters'] = raw_dict.pop('Experiment parameters', [''])
# number of parameters (each 4 bytes)
header_dict['num_parameters'] = int(raw_dict.pop('# Parameters (4 byte)',
len(header_dict['fixed_parameters']) +
len(header_dict['experimental_parameters'])))
# experiment size in bytes
header_dict['experiment_size'] = int(raw_dict.pop('Experiment size (bytes)', header_dict['num_parameters'] * 2))
# number of points of sweep signal
header_dict['num_sweep_signal'] = int(raw_dict.pop('Points', 1))
# channel names
header_dict['channels'] = raw_dict.pop('Channels', ['Input 1'])
header_dict['num_channels'] = len(header_dict['channels'])
# measure delay
header_dict['measure_delay'] = float(raw_dict.pop('Delay before measuring (s)', 0))
# metadata
header_dict['experiment_name'] = raw_dict.pop('Experiment', 'Experiment')
header_dict['start_time'] = raw_dict.pop('Start time', '')
header_dict['end_time'] = raw_dict.pop('End time', '')
header_dict['user'] = raw_dict.pop('User', 'User')
header_dict['comment'] = raw_dict.pop('Comment', '')
try:
# grid dimensions in pixels
header_dict['dim_px'] = [int(val)
for val in raw_dict['Grid dim'].split(' x ')]
raw_dict.pop('Grid dim')
# grid frame center position, size, angle.
# Assumes raw_dict['Grid settings'] holds 5 values
# (indices 0-4: center x, center y, size x, size y, angle)
header_dict['pos_xy'] = [float(val)
for val in raw_dict['Grid settings'][:2]]
header_dict['size_xy'] = [float(val)
for val in raw_dict['Grid settings'][2:4]]
header_dict['angle'] = float(raw_dict['Grid settings'][4])
raw_dict.pop('Grid settings')
# sweep signal
header_dict['sweep_signal'] = raw_dict['Sweep Signal']
raw_dict.pop('Sweep Signal')
# fixed parameters
header_dict['fixed_parameters'] = raw_dict['Fixed parameters']
raw_dict.pop('Fixed parameters')
# experimental parameters
header_dict['experimental_parameters'] = raw_dict[
'Experiment parameters']
raw_dict.pop('Experiment parameters')
# number of parameters (each 4 bytes)
header_dict['num_parameters'] = int(raw_dict['# Parameters (4 byte)'])
raw_dict.pop('# Parameters (4 byte)')
# experiment size in bytes
header_dict['experiment_size'] = int(
raw_dict['Experiment size (bytes)'])
raw_dict.pop('Experiment size (bytes)')
# number of points of sweep signal
header_dict['num_sweep_signal'] = int(raw_dict['Points'])
raw_dict.pop('Points')
# channel names
header_dict['channels'] = raw_dict['Channels']
if isinstance(header_dict['channels'], str):
# will be a str if there is only one channel;
# wrap it in a list so the number of channels can be counted properly
header_dict['channels'] = [header_dict['channels']]
header_dict['num_channels'] = len(header_dict['channels'])
raw_dict.pop('Channels')
# measure delay
header_dict['measure_delay'] = float(
raw_dict['Delay before measuring (s)'])
raw_dict.pop('Delay before measuring (s)')
# metadata
header_dict['experiment_name'] = raw_dict['Experiment']
header_dict['start_time'] = raw_dict['Start time']
header_dict['end_time'] = raw_dict['End time']
header_dict['user'] = raw_dict['User']
header_dict['comment'] = raw_dict['Comment']
raw_dict.pop('Experiment')
raw_dict.pop('Start time')
raw_dict.pop('End time')
raw_dict.pop('User')
raw_dict.pop('Comment')
except (KeyError, ValueError) as e:
msg = (' You can edit your header file or provide an '
'override value in header_override')
# guide user to using override dict
if isinstance(e, KeyError):
raise KeyError(
'[{key}] is missing from header.'.format(key=e.args[0]) + msg
)
elif isinstance(e, ValueError):
print(e.args)
raise ValueError('Unexpected value found in header.' + msg)
else:
raise
# Add all remaining parameters to header_dict unchanged
header_dict.update(raw_dict)
# fold remaining header entries into dict
for key, val in raw_dict.items():
header_dict[key] = val
return header_dict
@@ -541,10 +601,12 @@ def _parse_sxm_header(header_raw):
break
if header_entries[j][0] == '\t':
count += 1
header_dict[entry.strip(':').lower()] = _parse_scan_header_table(header_entries[i + 1:i + count])
header_dict[entry.strip(':').lower()] = _parse_scan_header_table(
header_entries[i + 1:i + count])
continue
if entry.startswith(':'):
header_dict[entry.strip(':').lower()] = header_entries[i + 1].strip()
header_dict[entry.strip(':').lower()] = (header_entries[i + 1]
.strip())
for key in entries_to_be_split:
header_dict[key] = header_dict[key].split()
@@ -576,7 +638,13 @@ def _parse_dat_header(header_raw):
header_entries = header_entries[:-3]
header_dict = dict()
for entry in header_entries:
key, val, _ = entry.split('\t')
# homogenize the entries: strip the trailing tab that ends each key/value
# pair, then make sure one tab remains so entries with empty values still
# split cleanly into (key, '')
if entry[-1] == '\t':
entry = entry[:-1]
if '\t' not in entry:
entry += '\t'
key, val = entry.split('\t')
header_dict[key] = val
return header_dict
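# Illustrative sketch (not part of the diff): the normalization above applied
# to two typical .dat header entries (the values are made up). Each entry ends
# up splitting into exactly one (key, value) pair, even when the value is empty.
entry = 'Bias (V)\t5.000E-1\t'
entry = entry[:-1]            # strip the trailing tab -> 'Bias (V)\t5.000E-1'
print(entry.split('\t'))      # ['Bias (V)', '5.000E-1']

entry = 'Comment\t'
entry = entry[:-1]            # -> 'Comment', no tab left
entry += '\t'                 # put one back -> 'Comment\t'
print(entry.split('\t'))      # ['Comment', '']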
@@ -693,5 +761,6 @@ def _is_valid_file(fname, ext):
"""
Detect if invalid file is being initialized by class.
"""
if fname[-3:] != ext:
_, fname_ext = os.path.splitext(fname)
if fname_ext[1:] != ext:
raise UnhandledFileError('{} is not a {} file'.format(fname, ext))
# -*- coding: utf-8 -*-
from __future__ import division, print_function, absolute_import, unicode_literals
from __future__ import (division, print_function, absolute_import,
unicode_literals)
import os
import numpy as np
import h5py
from pyUSID.io.hdf_utils import create_indexed_group, write_main_dataset, write_simple_attrs, Dimension, \
write_ind_val_dsets
from pyUSID.io.hdf_utils import (create_indexed_group, write_main_dataset,
write_simple_attrs, Dimension,
write_ind_val_dsets)
from pyUSID.io.translator import Translator
from pyUSID.io.write_utils import get_aux_dset_slicing
from .df_utils.nanonis_utils import read_nanonis_file
@@ -15,16 +17,17 @@ from .df_utils.nanonis_utils import read_nanonis_file
class NanonisTranslator(Translator):
"""
Translate Nanonis data files (3ds, sxm, and dat) into Pycroscopy compatible HDF5 files
Translator for Nanonis data files.
This translator provides methods to translate Nanonis data files
(3ds, sxm, and dat) into Pycroscopy-compatible HDF5 files.
Parameters
----------
filepath : str
Path to the input data file.
"""
def __init__(self, filepath, *args, **kwargs):
"""
Parameters
----------
filepath : str
Path to the input data file.
"""
super(Translator, self).__init__(*args, **kwargs)
filepath = os.path.abspath(filepath)
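# Illustrative sketch (not part of the diff): intended use of the translator.
# The file path and channel name are hypothetical, and the method that lists
# the available channels is the one whose body appears in the next hunk (its
# name is not visible in this diff, so it is not shown here).
tran = NanonisTranslator('example_scan.sxm')
h5_path = tran.translate()  # translate every channel found in the file
# or restrict the translation to specific channels, e.g.:
# h5_path = tran.translate(data_channels=['Z forward'], verbose=True)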
@@ -48,7 +51,7 @@ class NanonisTranslator(Translator):
self._read_data(self.data_path)
print("The following channels were found in the file:")
for channel in self.parm_dict['channels']:
for channel in self.parm_dict['channel_parms'].keys():
print(channel)
print('You may specify which channels to use when calling translate.')
@@ -57,7 +60,7 @@ class NanonisTranslator(Translator):
def translate(self, data_channels=None, verbose=False):
"""
Translates the data in the Nanonis file into a Pycroscopy compatible HDF5 file.
Translate the data into a Pycroscopy compatible HDF5 file.
Parameters
----------
@@ -77,8 +80,8 @@ class NanonisTranslator(Translator):
self._read_data(self.data_path)
if data_channels is None:
print('No channels specified. All channels in file will be used.')
data_channels = self.parm_dict['channels']
print('No channels specified. All channels in file will be used.')
data_channels = self.parm_dict['channel_parms'].keys()
if verbose:
print('Using the following channels')
@@ -90,34 +93,38 @@ class NanonisTranslator(Translator):
h5_file = h5py.File(self.h5_path, 'w')
# Create measurement group and assign attributes
meas_grp = create_indexed_group(h5_file, 'Measurement')
write_simple_attrs(
meas_grp, self.parm_dict['meas_parms']
)
dc_offset = self.data_dict['sweep_signal']
spec_label, spec_units = self.parm_dict['sweep_signal'].split()
spec_units = spec_units.strip('()')
spec_dim = Dimension(spec_label, spec_units, dc_offset)
# Create datasets for positional and spectroscopic indices and values
spec_dim = self.data_dict['Spectroscopic Dimensions']
pos_dims = self.data_dict['Position Dimensions']
h5_pos_inds, h5_pos_vals = write_ind_val_dsets(meas_grp, pos_dims,
is_spectral=False)
h5_spec_inds, h5_spec_vals = write_ind_val_dsets(meas_grp, spec_dim,
is_spectral=True)
h5_pos_inds, h5_pos_vals = write_ind_val_dsets(meas_grp, pos_dims, is_spectral=False)
h5_spec_inds, h5_spec_vals = write_ind_val_dsets(meas_grp, spec_dim, is_spectral=True)
# Create the datasets for all the channels
num_points = h5_pos_inds.shape[0]
for data_channel in data_channels:
raw_data = self.data_dict[data_channel].reshape([num_points, -1]) * 1E9 # Convert to nA
raw_data = self.data_dict[data_channel].reshape([num_points, -1])
chan_grp = create_indexed_group(meas_grp, 'Channel')
data_label, data_unit = data_channel.rsplit(maxsplit=1)
data_unit = data_unit.strip('()')
data_label = data_channel
data_unit = self.parm_dict['channel_parms'][data_channel]['Unit']
write_simple_attrs(
chan_grp, self.parm_dict['channel_parms'][data_channel]
)
write_main_dataset(chan_grp, raw_data, 'Raw_Data',
data_label, data_unit,
None, None,
h5_pos_inds=h5_pos_inds, h5_pos_vals=h5_pos_vals,
h5_spec_inds=h5_spec_inds, h5_spec_vals=h5_spec_vals)
h5_pos_inds=h5_pos_inds,
h5_pos_vals=h5_pos_vals,
h5_spec_inds=h5_spec_inds,
h5_spec_vals=h5_spec_vals)
h5_file.flush()
h5_file.close()
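# Illustrative sketch (not part of the diff): inspecting the HDF5 file written
# by translate() above. The group and dataset names assume the usual pyUSID
# conventions (indexed groups such as 'Measurement_000'/'Channel_000' and a
# 'Raw_Data' main dataset); treat the exact paths as an assumption.
import h5py
h5_path = 'example_scan.h5'  # hypothetical path returned by translate()
with h5py.File(h5_path, 'r') as h5_f:
    h5_f.visit(print)  # e.g. Measurement_000/Channel_000/Raw_Data, ...
    raw = h5_f['Measurement_000/Channel_000/Raw_Data'][()]
    print(raw.shape)   # (number of positions, number of spectral points)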
@@ -125,13 +132,13 @@ class NanonisTranslator(Translator):
return self.h5_path
def _read_data(self, grid_file_path):
def _read_data(self, file_path):
"""
Handles reading the data from the file and extracting the needed parameters for translating.
Extract data and parameters from Nanonis files.
Parameters
----------
grid_file_path : str
file_path : str
File path to the source data file
Returns
@@ -139,37 +146,209 @@ class NanonisTranslator(Translator):
None
"""
header_dict, signal_dict = read_nanonis_file(grid_file_path)
data, file_ext = read_nanonis_file(file_path)
header_dict = data.header
signal_dict = data.signals
if file_ext == '.3ds':
parm_dict, data_dict = self._parse_3ds_parms(header_dict,
signal_dict)
elif file_ext == '.sxm':
parm_dict, data_dict = self._parse_sxm_parms(header_dict,
signal_dict)
else:
parm_dict, data_dict = self._parse_dat_parms(header_dict,
signal_dict)
self.parm_dict = parm_dict
self.data_dict = data_dict
return
@staticmethod
def _parse_sxm_parms(header_dict, signal_dict):
"""
Parse sxm files.
Parameters
----------
header_dict : dict
signal_dict : dict
Returns
-------
parm_dict : dict
data_dict : dict
"""
parm_dict = dict()
for key, parm_grid in zip(header_dict['fixed_parameters'] + header_dict['experimental_parameters'],
signal_dict['params'].T):
parm_dict[key] = parm_grid
data_dict = dict()
# Create dictionary with measurement parameters
meas_parms = {key: value for key, value in header_dict.items()
if value is not None}
info_dict = meas_parms.pop('data_info')
parm_dict['meas_parms'] = meas_parms
# Create dictionary with channel parameters
channel_parms = dict()
channel_names = info_dict['Name']
single_channel_parms = {name: dict() for name in channel_names}
for field_name, field_value in info_dict.items():
for channel_name, value in zip(channel_names, field_value):
single_channel_parms[channel_name][field_name] = value
for value in single_channel_parms.values():
if value['Direction'] == 'both':
value['Direction'] = ['forward', 'backward']
else:
value['Direction'] = [value['Direction']]
scan_dir = meas_parms['scan_dir']
for name, parms in single_channel_parms.items():
for direction in parms['Direction']:
key = ' '.join((name, direction))
channel_parms[key] = dict(parms)
channel_parms[key]['Direction'] = direction
data = signal_dict[name][direction]
if scan_dir == 'up':
data = np.flip(data, axis=0)
if direction == 'backward':
data = np.flip(data, axis=1)
data_dict[key] = data
parm_dict['channel_parms'] = channel_parms
# Position dimensions
num_cols, num_rows = header_dict['scan_pixels']
width, height = header_dict['scan_range']
pos_names = ['X', 'Y']
pos_units = ['nm', 'nm']
pos_vals = np.vstack([
np.linspace(0, width, num_cols),
np.linspace(0, height, num_rows),
])
pos_vals *= 1e9
pos_dims = [Dimension(name, unit, values) for name, unit, values
in zip(pos_names, pos_units, pos_vals)]
data_dict['Position Dimensions'] = pos_dims
# Spectroscopic dimensions
spec_dims = Dimension('arb.', 'a. u.', np.arange(1, dtype=np.float32))
data_dict['Spectroscopic Dimensions'] = spec_dims
return parm_dict, data_dict
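# Illustrative sketch (not part of the diff): how the zip above turns the
# column-oriented 'data_info' table into one parameter dict per channel. The
# table contents are made up, but the column names follow the ones used above
# ('Name', 'Unit', 'Direction').
info_dict = {'Name': ['Z', 'Current'],
             'Unit': ['m', 'A'],
             'Direction': ['both', 'both']}
single_channel_parms = {name: dict() for name in info_dict['Name']}
for field_name, field_value in info_dict.items():
    for channel_name, value in zip(info_dict['Name'], field_value):
        single_channel_parms[channel_name][field_name] = value
print(single_channel_parms['Z'])
# {'Name': 'Z', 'Unit': 'm', 'Direction': 'both'}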
@staticmethod
def _parse_3ds_parms(header_dict, signal_dict):
"""
Parse 3ds files.
parm_dict['channels'] = header_dict['channels']
parm_dict['sweep_signal'] = header_dict['sweep_signal']
nx, ny = header_dict['dim_px']
parm_dict['num_cols'] = nx
parm_dict['num_rows'] = ny
Parameters
----------
header_dict : dict
signal_dict : dict
Returns
-------
parm_dict : dict
data_dict : dict
num_points = nx * ny
pos_vals = np.hstack([parm_dict['X (m)'].reshape(-1, 1), parm_dict['Y (m)'].reshape(-1, 1)])
z_data = signal_dict['Z (m)'][:, :, 0].reshape([num_points, -1])