Commit 94dad287 authored by nlaanait's avatar nlaanait
Browse files

Complete Restructuring of the Model Class and addition of the Optimize Class....

Complete Restructuring of the Model Class and addition of the Optimize Class. Paralleling computing of BESHO guesses has been tested and is functional.
parent 8c9fef23
......@@ -6,6 +6,6 @@ from . import guess_methods
from .guess_methods import GuessMethods
from . import model
from .model import Model
__all__ = ['GuessMethods', 'Model', 'BESHOmodel', 'utils']
from .optimize import Optimize
__all__ = ['GuessMethods', 'Model', 'BESHOmodel', 'utils', 'Optimize']
__all__ += utils.__all__
......@@ -12,11 +12,13 @@ from ..io.hdf_utils import buildReducedSpec, copyRegionRefs, linkRefs, getAuxDat
copyAttributes
from ..io.microdata import MicroDataset, MicroDataGroup
from .guess_methods import GuessMethods
import multiprocessing as mp
try:
import multiprocess as mp
except ImportError:
raise ImportError()
import multiprocessing as mp
# try:
# import multiprocess as mp
# except ImportError:
# raise ImportError()
sho32 = np.dtype([('Amplitude [V]', np.float32), ('Frequency [Hz]', np.float32),
('Quality Factor', np.float32), ('Phase [rad]', np.float32),
......@@ -171,13 +173,13 @@ class BESHOmodel(Model):
--------
None
"""
if self.__start_pos < self.h5_main.shape[0]:
self.__end_pos = int(min(self.h5_main.shape[0], self.__start_pos + self._max_pos_per_read))
self.data = self.h5_main[self.__start_pos:self.__end_pos, :]
print('Reading pixels {} to {} of {}'.format(self.__start_pos, self.__end_pos, self.h5_main.shape[0]))
if self._start_pos < self.h5_main.shape[0]:
self.__end_pos = int(min(self.h5_main.shape[0], self._start_pos + self._max_pos_per_read))
self.data = self.h5_main[self._start_pos:self.__end_pos, :]
print('Reading pixels {} to {} of {}'.format(self._start_pos, self.__end_pos, self.h5_main.shape[0]))
# Now update the start position
self.__start_pos = self.__end_pos
self._start_pos = self.__end_pos
else:
print('Finished reading all data!')
self.data = None
......@@ -200,10 +202,10 @@ class BESHOmodel(Model):
None
"""
if self.data is None:
self.__end_pos = int(min(self.h5_main.shape[0], self.__start_pos + self._max_pos_per_read))
self.guess = self.h5_guess[self.__start_pos:self.__end_pos, :]
self.__end_pos = int(min(self.h5_main.shape[0], self._start_pos + self._max_pos_per_read))
self.guess = self.h5_guess[self._start_pos:self.__end_pos, :]
else:
self.guess = self.h5_guess[self.__start_pos:self.__end_pos, :]
self.guess = self.h5_guess[self._start_pos:self.__end_pos, :]
# At this point the self.data object is the raw data that needs to be reshaped to a single UDVS step:
self.guess = reshapeToOneStep(self.guess, self.num_udvs_steps)
# don't keep the R^2.
......@@ -236,7 +238,8 @@ class BESHOmodel(Model):
# ask super to take care of the rest, which is a standardized operation
super(BESHOmodel, self)._setResults(is_guess)
def computeGuess(self, strategy='wavelet_peaks', options={"peak_widths": np.array([10,200])}, **kwargs):
def doGuess(self, processors=4, strategy='wavelet_peaks',
options={"peak_widths": np.array([10,200]),"peak_step":20}, **kwargs):
"""
Parameters
......@@ -246,7 +249,7 @@ class BESHOmodel(Model):
Default is 'Wavelet_Peaks'.
Can be one of ['wavelet_peaks', 'relative_maximum', 'gaussian_processes']. For updated list, run GuessMethods.methods
options: dict
Default {"peaks_widths": np.array([10,200])}}.
Default Options for wavelet_peaks{"peaks_widths": np.array([10,200]), "peak_step":20}.
Dictionary of options passed to strategy. For more info see GuessMethods documentation.
kwargs:
......@@ -257,53 +260,54 @@ class BESHOmodel(Model):
-------
"""
processors = min(processors, self._maxCpus)
self._createGuessDatasets()
self.__start_pos = 0
processors = kwargs.get("processors", self._maxCpus)
gm = GuessMethods()
if strategy in gm.methods:
func = gm.__getattribute__(strategy)(frequencies=self.freq_vec, **options)
results = list()
if self._parallel:
# start pool of workers
print('Computing Guesses In parallel ... launching %i kernels...' % processors)
pool = mp.Pool(processors)
self._getDataChunk()
while self.data is not None: # as long as we have not reached the end of this data set:
# apply guess to this data chunk:
tasks = [vector for vector in self.data]
chunk = int(self.data.shape[0] / processors)
jobs = pool.imap(func, tasks, chunksize=chunk)
# get Results from different processes
print('Extracting Guesses...')
temp = [j for j in jobs]
# Reformat the data to the appropriate type and or do additional computation now
results.append(self._reformatResults(temp, strategy))
# read the next chunk
self._getDataChunk()
# Finished reading the entire data set
print('closing %i kernels...' % processors)
pool.close()
else:
print("Computing Guesses In Serial ...")
self._getDataChunk()
while self.data is not None: # as long as we have not reached the end of this data set:
temp = [func(vector) for vector in self.data]
results.append(self._reformatResults(temp, strategy))
# read the next chunk
self._getDataChunk()
# reorder to get one numpy array out
self.guess = np.hstack(tuple(results))
print('Completed computing guess. Writing to file.')
# Write to file
self._setResults(is_guess=True)
else:
warn('Error: %s is not implemented in pycroscopy.analysis.GuessMethods to find guesses' % strategy)
self._start_pos = 0
super(BESHOmodel, self).doGuess(processors=processors, strategy=strategy, **options)
# processors = kwargs.get("processors", self._maxCpus)
# gm = GuessMethods()
# if strategy in gm.methods:
# func = gm.__getattribute__(strategy)(frequencies=self.freq_vec, **options)
# results = list()
# if self._parallel:
# # start pool of workers
# print('Computing Guesses In parallel ... launching %i kernels...' % processors)
# pool = mp.Pool(processors)
# self._getDataChunk()
# while self.data is not None: # as long as we have not reached the end of this data set:
# # apply guess to this data chunk:
# tasks = [vector for vector in self.data]
# chunk = int(self.data.shape[0] / processors)
# jobs = pool.imap(func, tasks, chunksize=chunk)
# # get Results from different processes
# print('Extracting Guesses...')
# temp = [j for j in jobs]
# # Reformat the data to the appropriate type and or do additional computation now
# results.append(self._reformatResults(temp, strategy))
# # read the next chunk
# self._getDataChunk()
#
# # Finished reading the entire data set
# print('closing %i kernels...' % processors)
# pool.close()
# else:
# print("Computing Guesses In Serial ...")
# self._getDataChunk()
# while self.data is not None: # as long as we have not reached the end of this data set:
# temp = [func(vector) for vector in self.data]
# results.append(self._reformatResults(temp, strategy))
# # read the next chunk
# self._getDataChunk()
#
# # reorder to get one numpy array out
# self.guess = np.hstack(tuple(results))
# print('Completed computing guess. Writing to file.')
#
# # Write to file
# self._setResults(is_guess=True)
# else:
# warn('Error: %s is not implemented in pycroscopy.analysis.GuessMethods to find guesses' % strategy)
def computeFit(self, strategy='SHO', options={}, **kwargs):
......@@ -328,7 +332,7 @@ class BESHOmodel(Model):
"""
self._createFitDataset()
self.__start_pos = 0
self._start_pos = 0
parallel = ''
processors = kwargs.get("processors", self._maxCpus)
......
......@@ -23,8 +23,8 @@ class GuessMethods(object):
def __init__(self):
self.methods = ['wavelet_peaks', 'relative_maximum', 'gaussian_processes', 'complex_gaussian']
@classmethod
def wavelet_peaks(self, *args, **kwargs):
@staticmethod
def wavelet_peaks(*args, **kwargs):
"""
This is a wrapper over scipy.signal.find_peaks_cwt() that finds peaks in the data using wavelet convolution.
......@@ -70,8 +70,8 @@ class GuessMethods(object):
except KeyError:
warn('Error: Please specify "peak_widths" kwarg to use this method')
@classmethod
def absolute_maximum(self, *args, **kwargs):
@staticmethod
def absolute_maximum( *args, **kwargs):
"""
Finds maximum in 1d-array
Parameters
......@@ -88,16 +88,16 @@ class GuessMethods(object):
return vec_max
return fastpeak
@classmethod
def relative_maximum(self, *args, **kwargs):
@staticmethod
def relative_maximum(*args, **kwargs):
pass
@classmethod
def gaussianProcesses(self, *args, **kwargs):
@staticmethod
def gaussianProcesses(*args, **kwargs):
pass
@classmethod
def complex_gaussian(self, *args, **kwargs):
@staticmethod
def complex_gaussian(*args, **kwargs):
"""
Sets up the needed parameters for the analytic approximation for the
Gaussian fit of complex data.
......
......@@ -11,10 +11,12 @@ import scipy
from .guess_methods import GuessMethods
from ..io.hdf_utils import checkIfMain, getAuxData
from ..io.io_hdf5 import ioHDF5
try:
import multiprocess as mp
except ImportError:
raise ImportError()
from .optimize import Optimize
import multiprocessing as mp
# try:
# import multiprocess as mp
# except ImportError:
# raise ImportError()
class Model(object):
......@@ -23,7 +25,7 @@ class Model(object):
This abstract class should be extended to cover different types of imaging modalities.
"""
def __init__(self, h5_main, variables=['Frequency']):
def __init__(self, h5_main, variables=['Frequency'], parallel=True):
"""
For now, we assume that the guess dataset has not been generated for this dataset but we will relax this requirement
after testing the basic components.
......@@ -48,17 +50,12 @@ class Model(object):
warn('Provided dataset is not a "Main" dataset with necessary ancillary datasets')
return
# Checking if parallel processing will be used
try:
import multiprocess
self._parallel = True
except ImportError:
warn("Multiprocess package (pip,github) is needed for parallel computation.\nSwitching to serial version.")
self._parallel = False
self._parallel=parallel
# Determining the max size of the data that can be put into memory
self._setMemoryAndCPUs()
self.__start_pos = 0
self._start_pos = 0
self.__end_pos = self.h5_main.shape[0]
self.h5_guess = None
self.h5_fit = None
......@@ -136,13 +133,13 @@ class Model(object):
Returns:
--------
"""
if self.__start_pos < self.h5_main.shape[0]:
self.__end_pos = int(min(self.h5_main.shape[0], self.__start_pos + self._max_pos_per_read))
self.data = self.h5_main[self.__start_pos:self.__end_pos, :]
print('Reading pixels {} to {} of {}'.format(self.__start_pos, self.__end_pos, self.h5_main.shape[0]))
if self._start_pos < self.h5_main.shape[0]:
self.__end_pos = int(min(self.h5_main.shape[0], self._start_pos + self._max_pos_per_read))
self.data = self.h5_main[self._start_pos:self.__end_pos, :]
print('Reading pixels {} to {} of {}'.format(self._start_pos, self.__end_pos, self.h5_main.shape[0]))
# Now update the start position
self.__start_pos = self.__end_pos
self._start_pos = self.__end_pos
else:
print('Finished reading all data!')
self.data = None
......@@ -162,10 +159,10 @@ class Model(object):
--------
"""
if self.data is None:
self.__end_pos = int(min(self.h5_main.shape[0], self.__start_pos + self._max_pos_per_read))
self.guess = self.h5_guess[self.__start_pos:self.__end_pos, :]
self.__end_pos = int(min(self.h5_main.shape[0], self._start_pos + self._max_pos_per_read))
self.guess = self.h5_guess[self._start_pos:self.__end_pos, :]
else:
self.guess = self.h5_guess[self.__start_pos:self.__end_pos, :]
self.guess = self.h5_guess[self._start_pos:self.__end_pos, :]
def _setResults(self, is_guess=False):
"""
......@@ -234,7 +231,8 @@ class Model(object):
self.fit = None # replace with actual h5 dataset
pass
def computeGuess(self, strategy='wavelet_peaks', options={"peak_widths": np.array([10,200])}, **kwargs):
def doGuess(self, processors=4, strategy='wavelet_peaks',
options={"peak_widths": np.array([10,200]), "peak_step":20}, **kwargs):
"""
Parameters
......@@ -244,7 +242,7 @@ class Model(object):
Default is 'Wavelet_Peaks'.
Can be one of ['wavelet_peaks', 'relative_maximum', 'gaussian_processes']. For updated list, run GuessMethods.methods
options: dict
Default {"peaks_widths": np.array([10,200])}}.
Default, options for wavelet_peaks {"peaks_widths": np.array([10,200]), "peak_step":20}.
Dictionary of options passed to strategy. For more info see GuessMethods documentation.
kwargs:
......@@ -257,42 +255,18 @@ class Model(object):
"""
self._createGuessDatasets()
self.__start_pos = 0
processors = kwargs.get("processors", self._maxCpus)
self._start_pos = 0
self._getDataChunk()
processors = min(processors, self._maxCpus)
gm = GuessMethods()
results = list()
if strategy in gm.methods:
func = gm.__getattribute__(strategy)(**options)
results = list()
if self._parallel:
# start pool of workers
print('Computing Guesses In parallel ... launching %i kernels...' % processors)
pool = mp.Pool(processors)
self._getDataChunk()
while self.data is not None: # as long as we have not reached the end of this data set:
# apply guess to this data chunk:
tasks = [vector for vector in self.data]
chunk = int(self.data.shape[0] / processors)
jobs = pool.imap(func, tasks, chunksize=chunk)
# get Results from different processes
print('Extracting Guesses...')
temp = [j for j in jobs]
# Reformat the data to the appropriate type and or do additional computation now
results.append(self._reformatResults(temp, strategy))
# read the next chunk
self._getDataChunk()
# Finished reading the entire data set
print('closing %i kernels...' % processors)
pool.close()
else:
print("Computing Guesses In Serial ...")
print("Using %s to find guesses...\n" % (strategy))
while self.data is not None:
opt = Optimize(data=self.data, parallel=self._parallel)
temp = opt.computeGuess(processors=processors, strategy=strategy, options=options)
results.append(self._reformatResults(temp, strategy))
self._getDataChunk()
while self.data is not None: # as long as we have not reached the end of this data set:
temp = [func(vector) for vector in self.data]
results.append(self._reformatResults(temp, strategy))
# read the next chunk
self._getDataChunk()
# reorder to get one numpy array out
self.guess = np.hstack(tuple(results))
......@@ -303,6 +277,46 @@ class Model(object):
else:
warn('Error: %s is not implemented in pycroscopy.analysis.GuessMethods to find guesses' % strategy)
# if self._parallel:
# # start pool of workers
# print('Computing Guesses In parallel ... launching %i kernels...' % processors)
# pool = mp.Pool(processors)
# self._getDataChunk()
# while self.data is not None: # as long as we have not reached the end of this data set:
# opt = Optimize(data=self.data, parallel=self._)
# # apply guess to this data chunk:
# tasks = [vector for vector in self.data]
# chunk = int(self.data.shape[0] / processors)
# jobs = pool.imap(func, tasks, chunksize=chunk)
# # get Results from different processes
# print('Extracting Guesses...')
# temp = [j for j in jobs]
# # Reformat the data to the appropriate type and or do additional computation now
# results.append(self._reformatResults(temp, strategy))
# # read the next chunk
# self._getDataChunk()
#
# # Finished reading the entire data set
# print('closing %i kernels...' % processors)
# pool.close()
# else:
# print("Computing Guesses In Serial ...")
# self._getDataChunk()
# while self.data is not None: # as long as we have not reached the end of this data set:
# temp = [func(vector) for vector in self.data]
# results.append(self._reformatResults(temp, strategy))
# # read the next chunk
# self._getDataChunk()
#
# # reorder to get one numpy array out
# self.guess = np.hstack(tuple(results))
# print('Completed computing guess. Writing to file.')
#
# # Write to file
# self._setResults(is_guess=True)
# else:
# warn('Error: %s is not implemented in pycroscopy.analysis.GuessMethods to find guesses' % strategy)
def _reformatResults(self, results, strategy='wavelet_peaks'):
"""
......
"""
Created on 7/17/16 10:08 AM
@author: Numan Laanait, Suhas Somnath
"""
from warnings import warn
import numpy as np
import sys
import multiprocessing as mp
from .guess_methods import GuessMethods
def targetFunc(args, **kwargs):
    """
    Module-level (and therefore picklable) wrapper that allows a
    multiprocessing.Pool to map the guess computation over data vectors.

    Parameters
    ----------
    args : tuple
        Task tuple whose first element is a single data vector and whose
        last element is the Optimize instance carrying the configured
        strategy/options.
    kwargs : dict
        Unused; accepted for pool-mapping interface compatibility.

    Returns
    -------
    Guess result produced by applying the instance's guess function to
    the data vector.
    """
    vector, optimizer = args[0], args[-1]
    guess_func = Optimize._guessFunc(optimizer)
    return guess_func(vector)
class Optimize(object):
    """
    Carries out guess/fit computations over a raw data matrix, either
    serially or distributed over a multiprocessing pool. Used internally
    by the Model class.
    """

    def __init__(self, data=np.array([]), parallel=True):
        """
        Parameters
        ----------
        data : numpy.ndarray
            Matrix of raw data, one vector (pixel) per row.
        parallel : bool
            If True, computeGuess() distributes work over a process pool.
        """
        if isinstance(data, np.ndarray):
            self.data = data
        else:
            warn('Error: data must be numpy.ndarray. Exiting...')
            sys.exit()
        self._parallel = parallel

    def _guessFunc(self):
        """
        Build the per-vector guess function configured by the most recent
        computeGuess() call (reads self.strategy and self.options).

        Returns
        -------
        callable or None
            Function mapping one data vector to a guess, or None if the
            configured strategy is not implemented.
        """
        gm = GuessMethods()
        if self.strategy in gm.methods:
            # GuessMethods strategies are factories: calling one with the
            # options returns the actual per-vector guess function.
            return gm.__getattribute__(self.strategy)(**self.options)
        else:
            # BUG FIX: original referenced the undefined local name
            # 'strategy' here, raising NameError instead of warning.
            warn('Error: %s is not implemented in pycroscopy.analysis.GuessMethods to find guesses' % self.strategy)

    def computeGuess(self, processors=1, strategy='wavelet_peaks',
                     options=None, **kwargs):
        """
        Compute a guess for every vector (row) in self.data.

        Parameters
        ----------
        processors : int
            Number of worker processes for the parallel path. Default 1.
        strategy : str
            Default is 'wavelet_peaks'. Can be one of ['wavelet_peaks',
            'relative_maximum', 'gaussian_processes']. For an updated list,
            run GuessMethods.methods.
        options : dict, optional
            Options passed to the strategy factory. Defaults to the
            wavelet_peaks options {"peak_widths": np.array([10, 200]),
            "peak_step": 20}. See GuessMethods documentation.
        kwargs : dict
            Unused; accepted for interface compatibility.

        Returns
        -------
        list or None
            One guess result per row of self.data, or None when the
            strategy is not implemented.
        """
        if options is None:
            # Avoid the shared-mutable-default-argument pitfall.
            options = {"peak_widths": np.array([10, 200]), "peak_step": 20}
        self.strategy = strategy
        self.options = options
        gm = GuessMethods()
        if strategy in gm.methods:
            if self._parallel:
                # start pool of workers
                print('Computing Jobs In parallel ... launching %i kernels...' % processors)
                pool = mp.Pool(processors)
                try:
                    # Each task carries self so workers can rebuild the
                    # guess function (see module-level targetFunc).
                    tasks = [(vector, self) for vector in self.data]
                    # BUG FIX: chunksize must be >= 1; the original could
                    # pass 0 when rows < processors, which imap rejects.
                    chunk = max(1, int(self.data.shape[0] / processors))
                    jobs = pool.imap(targetFunc, tasks, chunksize=chunk)
                    # Collect results from the worker processes.
                    results = [j for j in jobs]
                    print('Extracted Results...')
                finally:
                    # BUG FIX: the original returned before closing the
                    # pool, leaking worker processes.
                    print('closing %i kernels...' % processors)
                    pool.close()
                    pool.join()
                return results
            else:
                print("Computing Guesses In Serial ...")
                # BUG FIX: the original called self._guessFunc(vector), but
                # _guessFunc takes no arguments; build the guess function
                # once and apply it to each vector.
                func = self._guessFunc()
                return [func(vector) for vector in self.data]
        else:
            warn('Error: %s is not implemented in pycroscopy.analysis.GuessMethods to find guesses' % strategy)
......@@ -1387,13 +1387,13 @@ def visualizeSHOResults(h5_main, save_plots=True, show_plots=True):
plt_title = grp_name + '_Loops'
if save_plots:
plt_path = os.path.join(folder_path, basename + '_' + plt_title + '.png')
plotSHOLoops(dc_vec, phase_mat * amp_mat, 'DC Bias', 'Piezoresponse (a.u.)', title=plt_title,
save_path=plt_path)
plotSHOLoops(dc_vec, phase_mat * amp_mat, 'DC Bias', 'Piezoresponse (a.u.)', title=plt_title)
# save_path=plt_path)
plt_title = grp_name + '_Snaps'
if save_plots:
plt_path = os.path.join(folder_path, basename + '_' + plt_title + '.png')
plotVSsnapshots(phase_mat.reshape(num_rows, num_cols, phase_mat.shape[1]), title=plt_title,
save_path=plt_path)
plotVSsnapshots(phase_mat.reshape(num_rows, num_cols, phase_mat.shape[1]), title=plt_title)
# save_path=plt_path)
else: # BE-Line can only visualize the amplitude and phase maps:
amp_mat = amp_mat.reshape(num_rows, num_cols)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment