Unverified Commit c9411fb1 authored by Raj Giridharagopal's avatar Raj Giridharagopal Committed by GitHub
Browse files

Merge pull request #211 from pycroscopy/raj

Edits to PiFM Translator, Added a Bandpass Filter class to processing/fft
parents 73281a8a 1bb41133
......@@ -42,6 +42,7 @@ class PiFMTranslator(Translator):
self.write_spectrograms()
self.write_images()
self.write_spectra()
self.write_ps_spectra()
return self.h5_f
......@@ -83,6 +84,7 @@ class PiFMTranslator(Translator):
spectrogram_desc = {}
img_desc = {}
spectrum_desc = {}
pspectrum_desc = {}
with open(self.path,'r', encoding="ISO-8859-1") as f:
## can be made more concise...by incorporating conditons with loop control
lines = f.readlines()
......@@ -108,17 +110,28 @@ class PiFMTranslator(Translator):
#filename wavelengths, phys units wavelengths.
spectrogram_desc[file_desc[0]] = file_desc[1:]
if sline[0].startswith('AFMSpectrumDescBegin'):
no_descriptors = 3
file_desc = []
for i in range(no_descriptors):
line_desc = [val.strip() for val in lines[index+i+1].split(':')]
file_desc.append(line_desc[1])
#file name, position x, position y
spectrum_desc[file_desc[0]] = file_desc[1:]
line_desc = [val.strip() for val in lines[index+1].split(':')][1]
if 'powerspectrum' in line_desc:
no_descriptors = 2
for i in range(no_descriptors):
line_desc = [val.strip() for val in lines[index+i+1].split(':')]
file_desc.append(line_desc[1])
#file name, position x, position y
pspectrum_desc[file_desc[0]] = file_desc[1:]
else:
no_descriptors = 7
for i in range(no_descriptors):
line_desc = [val.strip() for val in lines[index+i+1].split(':')]
file_desc.append(line_desc[1])
#file name, position x, position y
spectrum_desc[file_desc[0]] = file_desc[1:]
f.close()
self.img_desc = img_desc
self.spectrogram_desc = spectrogram_desc
self.spectrum_desc = spectrum_desc
self.pspectrum_desc = pspectrum_desc
def read_spectrograms(self):
"""reads spectrograms, associated spectral values, and saves them in two dictionaries"""
......@@ -159,6 +172,12 @@ class PiFMTranslator(Translator):
spectra[file_name] = spectrum_f[:,1]
with open(os.path.join(self.directory, file_name)) as f:
spectra_x_y_dim_name[file_name] = f.readline().strip('\n').split('\t')
for file_name, descriptors in self.pspectrum_desc.items():
spectrum_f = np.loadtxt(os.path.join(self.directory, file_name), skiprows=1)
spectra_spec_vals[file_name] = spectrum_f[:, 0]
spectra[file_name] = spectrum_f[:,1]
with open(os.path.join(self.directory, file_name)) as f:
spectra_x_y_dim_name[file_name] = f.readline().strip('\n').split('\t')
self.spectra = spectra
self.spectra_spec_vals = spectra_spec_vals
self.spectra_x_y_dim_name = spectra_x_y_dim_name
......@@ -302,7 +321,7 @@ class PiFMTranslator(Translator):
def write_spectra(self):
if bool(self.spectrum_desc):
for spec_f, descriptors in self.spectrogram_desc.items():
for spec_f, descriptors in self.spectrum_desc.items():
#create new measurement group for ea spectrum
self.h5_meas_grp = usid.hdf_utils.create_indexed_group(self.h5_f, 'Measurement_')
x_name = self.spectra_x_y_dim_name[spec_f][0].split(' ')[0]
......@@ -312,12 +331,12 @@ class PiFMTranslator(Translator):
spec_i_spec_dims = usid.write_utils.Dimension(x_name, x_unit, self.spectra_spec_vals[spec_f])
spec_i_pos_dims = [usid.write_utils.Dimension('X',
self.params_dictionary['XPhysUnit'].replace('\xb5','u'),
float(descriptors[0])),
np.array([float(descriptors[1])])),
usid.write_utils.Dimension('Y',
self.params_dictionary['YPhysUnit'].replace('\xb5','u'),
float(descriptors[1]))]
np.array([float(descriptors[1])]))]
#write data to a channel in the measurement group
spec_i_ch = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'Channel_')
spec_i_ch = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'Spectrum_')
h5_raw = usid.hdf_utils.write_main_dataset(spec_i_ch, # parent HDF5 group
(1, len(self.spectra_spec_vals[spec_f])), # shape of Main dataset
'Raw_Spectrum',
......@@ -333,3 +352,40 @@ class PiFMTranslator(Translator):
'YLoc': descriptors[1]})
h5_raw[:, :] = self.spectra[spec_f].reshape(h5_raw.shape)
def write_ps_spectra(self):
if bool(self.pspectrum_desc):
for spec_f, descriptors in self.pspectrum_desc.items():
# create new measurement group for ea spectrum
self.h5_meas_grp = usid.hdf_utils.create_indexed_group(self.h5_f, 'Measurement_')
x_name = self.spectra_x_y_dim_name[spec_f][0].split(' ')[0]
x_unit = self.spectra_x_y_dim_name[spec_f][0].split(' ')[1]
y_name = self.spectra_x_y_dim_name[spec_f][1].split(' ')[0]
y_unit = self.spectra_x_y_dim_name[spec_f][1].split(' ')[1]
spec_i_spec_dims = usid.write_utils.Dimension(x_name, x_unit, self.spectra_spec_vals[spec_f])
spec_i_pos_dims = [usid.write_utils.Dimension('X',
self.params_dictionary['XPhysUnit'].replace(
'\xb5', 'u'),
np.array([0])),
usid.write_utils.Dimension('Y',
self.params_dictionary['YPhysUnit'].replace(
'\xb5', 'u'),
np.array([0]))]
# write data to a channel in the measurement group
spec_i_ch = usid.hdf_utils.create_indexed_group(self.h5_meas_grp, 'PowerSpectrum_')
h5_raw = usid.hdf_utils.write_main_dataset(spec_i_ch, # parent HDF5 group
(1, len(self.spectra_spec_vals[spec_f])),
# shape of Main dataset
'Raw_Spectrum',
# Name of main dataset
y_name,
# Physical quantity contained in Main dataset
y_unit, # Units for the physical quantity
# Position dimensions
pos_dims=spec_i_pos_dims, spec_dims=spec_i_spec_dims,
# Spectroscopic dimensions
dtype=np.float32, # data type / precision
main_dset_attrs={'XLoc': 0,
'YLoc': 0})
h5_raw[:, :] = self.spectra[spec_f].reshape(h5_raw.shape)
......@@ -12,6 +12,7 @@ from __future__ import division, print_function, absolute_import
import numpy as np # for all array, data operations
import matplotlib.pyplot as plt # for all plots
from scipy.special import erf
from scipy import signal as sps
from collections import Iterable
from warnings import warn
......@@ -494,3 +495,68 @@ class HarmonicPassFilter(FrequencyFilter):
# end
# end
# F_AI_vec = removeNoiseFreqs(F_AI_vec,sampRate,freqs,freqWidths);
class BandPassFilter(FrequencyFilter):
def __init__(self, signal_length, samp_rate, f_center, f_width,
fir=False, fir_taps=1999):
"""
Builds a bandpass filter
Parameters
----------
signal_length : unsigned int
Points in the FFT. Assuming Signal in frequency space (ie - after FFT shifting)
samp_rate : unsigned integer
Sampling rate
f_center : unsigned integer
Center frequency for filter
f_width : unsigned integer
Frequency width of the pass band
fir : bool, optional
True uses a finite impulse response (FIR) response instead of a standard boxcar. FIR is causal
fir_taps : int
Number of taps (length of filter) for finite impulse response filter
Returns
-------
bpf : 1D numpy array describing the bandpass filter
"""
if f_center >= 0.5 * samp_rate:
raise ValueError('Filter cutoff exceeds Nyquist rate')
self.f_center = f_center
self.f_width = f_width
super(BandPassFilter, self).__init__(signal_length, samp_rate)
cent = int(round(0.5 * signal_length))
ind = int(round(signal_length * (f_center / samp_rate)))
sz = int(round(cent * f_width / samp_rate))
bpf = np.zeros(signal_length, dtype=np.float32)
# Finite Impulse Response or Boxcar
if not fir:
bpf[cent - ind - sz:cent - ind + sz + 1] = 1
bpf[cent + ind - sz:cent + ind + sz + 1] = 1
else:
freq_low = (f_center - f_width) / (0.5 * samp_rate)
freq_high = (f_center + f_width) / (0.5 * samp_rate)
band = [freq_low, freq_high]
taps = sps.firwin(int(fir_taps), band, pass_zero=False, window='blackman')
bpf = np.abs(np.fft.fftshift(np.fft.fft(taps, n=signal_length)))
self.value = bpf
def get_parms(self):
basic_parms = super(BandPassFilter, self).get_parms()
prefix = 'band_pass_'
this_parms = {prefix + 'start_freq': self.f_center, prefix + 'band_width': self.f_width}
this_parms.update(basic_parms)
return this_parms
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment