Newer
Older
""" File contains Descriptors used describe run for direct inelastic reduction """
from Direct.PropertiesDescriptors import *
class RunList(object):
""" helper class to maintain list of runs used in RunDescriptor for summing
or subsequent processing range of files.
Supports basic operations with this list
def __init__(self,run_list,file_names=None,fext=None):
""" """
self._last_ind2sum = -1
self._file_path = None
self._fext = None
self.set_list2add(run_list,file_names,fext)
self._partial_sum_ws_name = None
#
def set_list2add(self,runs_to_add,fnames=None,fext=None):
"""Set run numbers to add together with possible file guess-es """
if not isinstance(runs_to_add,list):
raise KeyError('Can only set list of run numbers to add')
runs = []
for item in runs_to_add:
runs.append(int(item))
self._run_numbers = runs
self._set_fnames(fnames,fext)
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#--------------------------------------------------------------------------------------------------
#
def set_cashed_sum_ws(self,ws,new_ws_name=None):
""" store the name of a workspace in the class
as reference clone
"""
if new_ws_name:
old_name = ws.name()
if old_name != new_ws_name:
old_mon_name = old_name + '_monitors'
RenameWorkspace(ws,OutputWorkspace=new_ws_name)
if old_mon_name in mtd:
RenameWorkspace(old_mon_name,OutputWorkspace=new_ws_name + '_monitors')
else:
new_ws_name = ws.name()
self._partial_sum_ws_name = new_ws_name
#
def get_cashed_sum_ws(self):
""" """
if not (self._partial_sum_ws_name):
return None
if self._partial_sum_ws_name in mtd:
return mtd[self._partial_sum_ws_name]
else:
return None
#
def get_cashed_sum_clone(self):
""" """
origin = self.get_cashed_sum_ws()
if not origin:
return None
origin_name = origin.name()
mon_name = origin_name + '_monitors'
if mon_name in mtd:
CloneWorkspace(InputWorkspace=mon_name,OutputWorkspace=origin_name + '_clone_monitors')
ws = CloneWorkspace(InputWorkspace=origin_name,OutputWorkspace=origin_name + '_clone')
return ws
#
def del_cashed_sum(self):
""" """
if not self._partial_sum_ws_name:
return
if self._partial_sum_ws_name in mtd:
DeleteWorkspace(self._partial_sum_ws_name)
mon_ws = self._partial_sum_ws_name + '_monitors'
if mon_ws in mtd:
DeleteWorkspace(mon_ws)
#--------------------------------------------------------------------------------------------------
#
def _set_fnames(self,fnames,fext):
""" sets filenames lists and file extension lists
of length correspondent to run number length
if length of the list provided differs from the length
of the run list, expands fnames list and fext list
to the whole runnumber list using last for fext and
first for fnames members of the
"""
if isinstance(fnames,list):
self._file_path = fnames
else:
self._file_path = [fnames]
if not(self._file_path):
self._file_path = [''] * len(self._run_numbers)
else:
if len(self._file_path) != len(self._run_numbers):
self._file_path = [self._file_path[0]] * len(self._run_numbers)
if fext:
if isinstance(fext,list):
self._fext = fext
else:
self._fext = [fext]
self._fext = [''] * len(self._run_numbers)
else:
if len(self._fext) != len(self._run_numbers):
base_fext = self._fext[-1]
self._fext = [base_fext] * len(self._run_numbers)
def get_file_guess(self,inst_name,run_num,default_fext=None):
""" return the name of run file for run number provided
note, that internally set file extension overwrites
default_fext if not empty
"""
index = self._run_numbers.index(run_num)
guess = self._get_file_guess(inst_name,run_num,index,default_fext)
return (guess,index)
#
def _get_file_guess(self,inst_name,run_num,index,def_fext=None):
""" get file guess given index of the run in the list of runs """
path_guess = self._file_path[index]
fext = self._fext[index]
if def_fext and len(fext) == 0:
fext = def_fext
guess = os.path.join(path_guess,'{0}{1}{2}'.\
format(inst_name,run_num,fext))
def add_or_replace_run(self,run_number,fpath='',fext=None,default_fext=False):
""" add run number to list of existing runs
Let's prohibit adding the same run numbers using this method.
Equivalent run numbers can still be added using list assignment
file path and file extension are added/modified if present
regardless of run being added or replaced
"""
if not(run_number in self._run_numbers):
self._run_numbers.append(run_number)
if not fpath:
fpath = self._file_path[-1]
self._file_path.append(fpath)
if not fext:
fext = self._fext[-1]
self._fext.append(fext)
self._last_ind2sum=len(self._run_numbers)-1
return self._last_ind2sum
ext_ind = self._run_numbers.index(run_number)
if len(fpath)>0:
self._file_path[ext_ind]=fpath
if fext:
if not(default_fext and len(self._fext[ext_ind])>0): #not keep existing
self._fext[ext_ind]=fext
self._last_ind2sum=ext_ind
return ext_ind
#
def check_runs_equal(self,run_list,fpath=None,fext=None):
""" returns true if all run numbers in existing list are
in the comparison list and vice versa.
if lists numbers coincide,
sets new file_path and fext list if such are provided
"""
if len(run_list) != len(self._run_numbers):
return False
for run in run_list:
if not(run in self._run_numbers):
return False
self._set_fnames(fpath,fext)
def get_current_run_info(self,sum_runs,ind=None):
""" return last run info for file to sum"""
if ind:
if not(ind > -1 and ind < len(self._run_numbers)):
raise RuntimeError("Index {0} is outside of the run list of {1} runs".format(ind,len(self._run_numbers)))
ind = self.get_last_ind2sum(sum_runs)
return self._run_numbers[ind],self._file_path[ind],self._fext[ind],ind
def set_last_ind2sum(self,run_number):
"""Check and set last number, contributing to summation
if this number is out of summation range, clear the summation
"""
run_number = int(run_number)
if run_number in self._run_numbers:
self._last_ind2sum = self._run_numbers.index(run_number)
def get_run_list2sum(self,num_to_sum=None):
"""Get run numbers of the files to be summed together
from the list of defined run numbers
n_runs = len(self._run_numbers)
if num_to_sum:
if num_to_sum<=0:
num_to_sum = 1
if num_to_sum>n_runs:
num_to_sum = n_runs
else:
num_to_sum=n_runs
if self._last_ind2sum >= 0 and self._last_ind2sum < num_to_sum:
num_to_sum = self._last_ind2sum + 1
return self._run_numbers[:num_to_sum]
#
def get_last_ind2sum(self,sum_runs):
"""Get last run number contributing to sum"""
if self._last_ind2sum >= 0 and self._last_ind2sum < len(self._run_numbers):
ind = self._last_ind2sum
else:
if sum_runs:
ind = len(self._run_numbers) - 1
else:
ind = 0
return ind
#
def sum_ext(self,sum_runs):
if sum_runs:
last = self.get_last_ind2sum(sum_runs)
sum_ext = "SumOf{0}".format(len(self._run_numbers[:last + 1]))
else:
sum_ext = ''
return sum_ext
#
def get_runs(self):
return self._run_numbers
#
def find_run_files(self,inst_name,run_list=None,default_fext=None):
""" find run files correspondent to the run list provided
and set path to these files as new internal parameters
for the files in list
Return the list of the runs, which files were
not found and found
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
Run list have to coincide or be part of self._run_numbers
No special check for correctness is performed, so may fail
miserably
"""
if not run_list:
run_list = self._run_numbers
not_found=[]
found = []
for run in run_list:
file_hint,index = self.get_file_guess(inst_name,run,default_fext)
try:
file = FileFinder.findRuns(file_hint)[0]
fpath,fname = os.path.split(file)
fname,fex = os.path.splitext(fname)
self._fext[index] = fex
self._file_path[index] = fpath
#self._last_ind2sum = index
found.append(run)
except RuntimeError:
not_found.append(run)
return not_found,found
#--------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------
class RunDescriptor(PropDescriptor):
""" descriptor to work with a run or list of runs specified
either as run number (run file) or as
this run loaded in memory as a workspace
Used to help
"""
# the host class referencing contained all instantiated descriptors.
# Descriptors methods rely on it to work (e.g. to extract file loader
# preferences)
# so it has to be set up manually by PropertyManager __init__ method
_sum_log_name = 'SumRuns'
#--------------------------------------------------------------------------------------------------------------------
def __init__(self,prop_name,DocString=None):
""" """
self._prop_name = prop_name
if not DocString is None:
self.__doc__ = DocString
self._ws_name = None
# pointer to workspace used to mask this workspace obtained at diag for
# this ws
def __len__(self):
""" overloaded len function, which
return length of the run-files list
to work with
"""
if not(self._run_number):
return 0
if self._run_list:
return len(self._run_list._run_numbers)
else:
return 1
#--------------------------------------------------------------------------------------------------------------------
def _clear_all(self):
""" clear all internal properties, workspaces and caches,
associated with this run
"""
# Run number
self._run_number = None
# Extension of the file to load data from
self._run_file_path = ''
if self._ws_name:
mon_ws = self._ws_name + '_monitors'
# Workspace name which corresponds to the run
if self._ws_name in mtd:
DeleteWorkspace(self._ws_name)
if mon_ws in mtd:
DeleteWorkspace(mon_ws)
self._ws_name = None # none if not loaded
# String used to identify the workspace related to this property
self._ws_cname = ''
self._ws_suffix = ''
# property contains run lists
self._run_list = None
# clear masking workspace if any available
if self._mask_ws_name:
if self._mask_ws_name in mtd:
DeleteWorkspace(self._mask_ws_name)
self._mask_ws_name = None
#--------------------------------------------------------------------------------------------------------------------
def __get__(self,instance,owner):
""" return current run number or workspace if it is loaded"""
if instance is None:
if self._ws_name and self._ws_name in mtd:
return mtd[self._ws_name]
else:
#--------------------------------------------------------------------------------------------------------------------
def __set__(self,instance,value):
""" Set up Run number and define workspace name from any source """
if value == None: # clear current run number
self._clear_all()
return
if isinstance(value, api.Workspace):
if self._ws_name:
if self._ws_name != value.name():
self._clear_all()
self._set_ws_as_source(value)
else:
return # do nothing
# it is just reassigning the same workspace to itself
else: # first assignment of workspace to property
self._set_ws_as_source(value)
return
if isinstance(value,str): # it may be run number as string or it may be a workspace name
if value in mtd: # workspace name
self.__set__(instance,ws)
return
else: # split string into run indexes and auxiliary file parameters
file_path,run_num,fext = prop_helpers.parse_run_file_name(value)
if isinstance(run_num,list):
self._set_run_list(instance,run_num,file_path,fext)
self._set_single_run(instance,run_num,file_path,fext,False)
self._set_run_list(instance,value,"",instance.data_file_ext)
self._set_single_run(instance,value,"",instance.data_file_ext,True)
#--------------------------------------------------------------------------------------------------------------------
def _set_single_run(self,instance,run_number,file_path='',fext=None,default_fext=False):
""" """
self._run_number = int(run_number)
# build workspace name for current run number
new_ws_name = self._build_ws_name()
if self._run_list and instance.sum_runs:
ind = self._run_list.add_or_replace_run(self._run_number,file_path,fext,default_fext)
self._run_file_path = self._run_list._file_path[ind]
self._fext= self._run_list._fext[ind]
self._ws_name = new_ws_name
if self._ws_name != new_ws_name:
self._clear_all()
# clear all would invalidate run number and workspace number
self._run_number = int(run_number)
self._run_file_path = file_path
self._ws_name = new_ws_name
else: # nothing to do, there is workspace, which corresponds to this run number
pass # and it may be already loaded (may be not)
#--------------------------------------------------------------------------------------------------------------------
def _set_run_list(self,instance,run_list,file_path=None,fext=None):
if self._run_list and self._run_list.check_runs_equal(run_list,file_path,fext):
return
self._clear_all()
self._run_list = RunList(run_list,file_path,fext)
run_num,file_path,main_fext,ind = self._run_list.get_current_run_info(instance.sum_runs)
self._run_list.set_last_ind2sum(ind)
self._run_file_path = file_path
self._fext= main_fext
self._ws_name = self._build_ws_name()
def run_number(self):
""" Return run number regardless of workspace is loaded or not"""
if self._ws_name and self._ws_name in mtd:
ws = mtd[self._ws_name]
return ws.getRunNumber()
else:
#--------------------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------------------
def get_masking(self):
""" return masking workspace specific to this particular workspace
together with number of masked spectra
"""
if self._mask_ws_name:
mask_ws = mtd[self._mask_ws_name]
num_masked = mask_ws.getRun().getLogData('NUM_SPECTRA_Masked').value
return (mask_ws,num_masked)
else:
return (None,0)
#--------------------------------------------------------------------------------------------------------------------
def add_masked_ws(self,masked_ws):
""" extract masking from the workspace provided and store masks
to use with this run workspace
if self._mask_ws_name:
mask_ws = mtd[self._mask_ws_name]
num_masked = mask_ws.getRun().getLogData('NUM_SPECTRA_Masked').value
add_mask_name = self._prop_name+'_tmp_masking'
else:
num_masked = 0
add_mask_name = self._prop_name+'CurrentMasking'
masks,spectra=ExtractMask(InputWorkspace=masked_ws,OutputWorkspace=add_mask_name)
num_masked+=len(spectra)
if self._mask_ws_name:
mask_ws +=masks
else:
self._mask_ws_name=add_mask_name
AddSampleLog(Workspace=self._mask_ws_name,LogName = 'NUM_SPECTRA_Masked',
LogText=str(num_masked),LogType='Number')
#--------------------------------------------------------------------------------------------------------------------
def is_monws_separate(self):
""" """
mon_ws = self.get_monitors_ws()
if mon_ws:
name = mon_ws.name()
else:
return False
if name.endswith('_monitors'):
return True
else:
return False
#--------------------------------------------------------------------------------------------------------------------
def get_run_list(self):
""" Returns list of the files, assigned to current property """
current_run = self.run_number()
if self._run_list:
runs = self._run_list.get_runs()
if current_run in runs:
return runs
else:
return [current_run]
else:
return [current_run]
#--------------------------------------------------------------------------------------------------------------------
@staticmethod
def get_sum_run_list(ws):
"""retrieve list of contributed run numbers from the sum workspace log"""
summed_runs=[]
if RunDescriptor._sum_log_name in ws.getRun():
summed_str = ws.getRun().getLogData(RunDescriptor._sum_log_name).value
run_nums = summed_str.split(',')
for run_str in run_nums:
summed_runs.append(int(run_str))
else:
raise RuntimeError("Presumably sum workspace {0} does not have sum log attached to it".format(ws.name()))
return summed_runs
#--------------------------------------------------------------------------------------------------------------------
def get_runs_to_sum(self,existing_sum_ws=None,num_files=None):
""" return list of runs, expected to be summed together
excluding the runs, already summed and added to cached sum workspace
"""
if not RunDescriptor._holder.sum_runs:
return ([],None,0)
if not self._run_list:
return ([],None,0)
#
if not existing_sum_ws:
existing_sum_ws = self._run_list.get_cashed_sum_ws()
if existing_sum_ws:
summed_runs = RunDescriptor.get_sum_run_list(existing_sum_ws)
n_existing_sums = len(summed_runs)
runs2_sum = self._run_list.get_run_list2sum(num_files)
for run in summed_runs:
if run in runs2_sum:
del runs2_sum[runs2_sum.index(run)]
return (runs2_sum,existing_sum_ws,n_existing_sums)
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
#--------------------------------------------------------------------------------------------------------------------
def find_run_files(self,run_list=None):
""" find run files correspondent to the run list provided
and set path to these files as new internal parameters
for the files in the list
Returns True and empty list or False and
the list of the runs, which files were not found
or not belong to the existing run list.
"""
if not self._run_list:
if not run_list:
return (True,[],[])
else:
return (False,run_list,[])
if run_list:
existing = self._run_list.get_runs()
non_existing=[]
for run in run_list:
if not(run in existing):
raise RuntimeError('run {0} is not in the existing run list'.format(run))
inst = RunDescriptor._holder.short_instr_name
default_fext= RunDescriptor._holder.data_file_ext
not_found,found=self._run_list.find_run_files(inst,run_list,default_fext)
if len(not_found) == 0:
return (True,[],found)
else:
return (False,not_found,found)
#--------------------------------------------------------------------------------------------------------------------
def set_action_suffix(self,suffix=None):
""" method to set part of the workspace name, which indicate some action performed over this workspace
e.g.: default suffix of a loaded workspace is 'RAW' but we can set it to SPE to show that conversion to
energy will be performed for this workspace.
method returns the name of the workspace is will have with this suffix.
Algorithms would later
work on the initial workspace and modify it in-place or to produce workspace with new name (depending if one
wants to keep initial workspace)
synchronize_ws(ws_pointer) then should synchronize workspace and its name.
TODO: This method should be automatically invoked by an algorithm decorator
Until implemented, one have to ensure that it is correctly used together with synchronize_ws
to ensue one can always get workspace from its name
"""
if suffix:
self._ws_suffix = suffix
else: # return to default
return self._build_ws_name()
#--------------------------------------------------------------------------------------------------------------------
def synchronize_ws(self,workspace=None):
""" Synchronize workspace name (after workspace may have changed due to algorithm)
with internal run holder name. Accounts for the situation when
TODO: This method should be automatically invoked by an algorithm decorator
Until implemented, one have to ensure that it is correctly used together with
set_action_suffix to ensue one can always get expected workspace from its name
outside of a method visibility
"""
if not workspace:
new_name = self._build_ws_name()
old_name = workspace.name()
if new_name != old_name:
RenameWorkspace(InputWorkspace=old_name,OutputWorkspace=new_name)
old_mon_name = old_name + '_monitors'
new_mon_name = new_name + '_monitors'
if old_mon_name in mtd:
RenameWorkspace(InputWorkspace=old_mon_name,OutputWorkspace=new_mon_name)
self._ws_name = new_name
#--------------------------------------------------------------------------------------------------------------------
def get_file_ext(self):
""" Method returns current file extension for file to load workspace from
e.g. .raw or .nxs extension
if self._fext and len(self._fext)>0:
return self._fext
else: # return IDF default
return RunDescriptor._holder.data_file_ext
#--------------------------------------------------------------------------------------------------------------------
def set_file_ext(self,val):
""" set non-default file extension """
if isinstance(val,str):
if val[0] != '.':
value = '.' + val
else:
value = val
else:
raise AttributeError('Source file extension can be only a string')
#--------------------------------------------------------------------------------------------------------------------
def _check_calibration_source():
""" if user have not specified calibration as input to the script,
try to retrieve calibration stored in file with run properties"""
changed_prop = RunDescriptor._holder.getChangedProperties()
if 'det_cal_file' in changed_prop:
use_workspace_calibration = False
else:
use_workspace_calibration = True
return use_workspace_calibration
#--------------------------------------------------------------------------------------------------------------------
""" Method returns workspace correspondent to current run number(s)
and loads this workspace if it has not been loaded
Returns Mantid pointer to the workspace, corresponding to this run number
if not self._ws_name:
if self._ws_name in mtd:
ws = mtd[self._ws_name]
if ws.run().hasProperty("calibrated"):
return ws # already calibrated
else:
prefer_ws_calibration = self._check_calibration_source()
self.apply_calibration(ws,RunDescriptor._holder.det_cal_file,prefer_ws_calibration)
return ws
prefer_ws_calibration = self._check_calibration_source()
inst_name = RunDescriptor._holder.short_inst_name
calibration = RunDescriptor._holder.det_cal_file
if self._run_list and RunDescriptor._holder.sum_runs : # Sum runs
ws = self._load_and_sum_runs(inst_name,RunDescriptor._holder.load_monitors_with_workspace)
ws = self.load_run(inst_name, calibration,False, RunDescriptor._holder.load_monitors_with_workspace,prefer_ws_calibration)
self.synchronize_ws(ws)
self.apply_calibration(ws,calibration,prefer_ws_calibration)
#--------------------------------------------------------------------------------------------------------------------
def get_ws_clone(self,clone_name='ws_clone'):
""" Get unbounded clone of existing Run workspace """
ws = self.get_workspace()
CloneWorkspace(InputWorkspace=ws,OutputWorkspace=clone_name)
mon_ws_name = ws.name() + '_monitors'
cl_mon_name = clone_name + '_monitors'
CloneWorkspace(InputWorkspace=mon_ws_name,OutputWorkspace=cl_mon_name)
return mtd[clone_name]
#--------------------------------------------------------------------------------------------------------------------
def _set_ws_as_source(self,value):
""" assign all parts of the run if input value is workspace """
self._run_number = value.getRunNumber()
ws_name = value.name()
self._split_ws_name(ws_name)
self.synchronize_ws(value)
#--------------------------------------------------------------------------------------------------------------------
def chop_ws_part(self,origin,tof_range,rebin,chunk_num,n_chunks):
""" chop part of the original workspace and sets it up to this run as new original
Return the pointer to workspace being chopped """
if not origin:
mon_ws = mtd[origin_name + '_monitors']
target_name = '#{0}/{1}#'.format(chunk_num,n_chunks) + origin_name
if chunk_num == n_chunks:
RenameWorkspace(InputWorkspace=origin_name,OutputWorkspace=target_name)
RenameWorkspace(InputWorkspace=mon_ws,OutputWorkspace=target_name + '_monitors')
origin_name = target_name
CloneWorkspace(InputWorkspace=mon_ws,OutputWorkspace=target_name + '_monitors')
origin_invalidated = False
if rebin: # debug and compatibility mode with old reduction
Rebin(origin_name,OutputWorkspace=target_name,Params=[tof_range[0],tof_range[1],tof_range[2]],PreserveEvents=False)
else:
CropWorkspace(origin_name,OutputWorkspace=target_name,XMin=tof_range[0],XMax=tof_range[2])
self._set_ws_as_source(mtd[target_name])
if origin_invalidated:
return self.get_workspace()
else:
return origin
#--------------------------------------------------------------------------------------------------------------------
def get_monitors_ws(self,monitor_ID=None):
""" get pointer to a workspace containing monitors.
Explores different ways of finding monitor workspace in Mantid and returns the python pointer to the
workspace which contains monitors.
"""
data_ws = self.get_workspace()
if not data_ws:
return None
monWS_name = data_ws.name() + '_monitors'
if monWS_name in mtd:
mon_ws = mtd[monWS_name]
monitors_separate = True
else:
mon_ws = data_ws
monitors_separate = False
spec_to_mon = RunDescriptor._holder.spectra_to_monitors_list
if monitors_separate and spec_to_mon :
for specID in spec_to_mon:
mon_ws = self.copy_spectrum2monitors(data_ws,mon_ws,specID)
if monitor_ID:
try:
ws_index = mon_ws.getIndexFromSpectrumNumber(monitor_ID)
mon_list = self._holder.get_used_monitors_list()
for monID in mon_list:
try:
ws_ind = mon_ws.getIndexFromSpectrumNumber(int(monID))
except:
mon_ws = None
break
#--------------------------------------------------------------------------------------------------------------------
def is_existing_ws(self):
""" method verifies if property value relates to workspace, present in ADS """
if self._ws_name:
if self._ws_name in mtd:
return True
else:
return False
else:
return False
#--------------------------------------------------------------------------------------------------------------------
def file_hint(self,run_num_str=None,filePath=None,fileExt=None,**kwargs):
""" procedure to provide run file guess name from run properties
main purpose -- to support customized order of file extensions
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
if not run_num_str:
run_num_str = str(self.run_number())
inst_name = RunDescriptor._holder.short_inst_name
if 'file_hint' in kwargs:
hint = kwargs['file_hint']
fname,old_ext = os.path.splitext(hint)
if len(old_ext) == 0:
old_ext = self.get_file_ext()
else:
if fileExt:
old_ext = fileExt
else:
old_ext = self.get_file_ext()
hint = inst_name + run_num_str + old_ext
if not filePath:
filePath = self._run_file_path
if os.path.exists(filePath):
hint = os.path.join(filePath,hint)
if os.path.exists(hint):
return hint,old_ext
else:
fp,hint = os.path.split(hint)
return hint,old_ext
#--------------------------------------------------------------------------------------------------------------------
def find_file(self,inst_name=None,run_num=None,filePath=None,fileExt=None,**kwargs):
"""Use Mantid to search for the given run. """
if not inst_name:
inst_name = RunDescriptor._holder.short_inst_name
if run_num:
run_num_str = str(run_num)
else:
run_num_str = str(self.run_number())
#
file_hint,old_ext = self.file_hint(run_num_str,filePath,fileExt,**kwargs)
try:
file = FileFinder.findRuns(file_hint)[0]
fname,fex = os.path.splitext(file)
message = '*** Cannot find run-file with extension {0}.\n'\
' Found file {1} instead'.format(old_ext,file)
RunDescriptor._logger(message,'notice')
self._run_file_path = os.path.dirname(fname)
message = '*** Cannot find file matching hint {0} on Mantid search paths '.\
if not 'be_quet' in kwargs:
RunDescriptor._logger(message,'warning')
return (False,message)
#--------------------------------------------------------------------------------------------------------------------
def load_file(self,inst_name,ws_name,run_number=None,load_mon_with_workspace=False,filePath=None,fileExt=None,**kwargs):
""" load run for the instrument name provided. If run_numner is None, look for the current run"""
ok,data_file = self.find_file(None,filePath,fileExt,**kwargs)
if not ok:
self._ws_name = None
raise IOError(data_file)
if load_mon_with_workspace:
mon_load_option = 'Include'
else:
mon_load_option = 'Separate'
#
try: # Hack: LoadEventNexus does not understand Separate at the moment and throws.
# And event loader always loads monitors separately
Load(Filename=data_file, OutputWorkspace=ws_name,LoadMonitors = mon_load_option)
except ValueError:
#mon_load_option =str(int(load_mon_with_workspace))
Load(Filename=data_file, OutputWorkspace=ws_name,LoadMonitors = '1',MonitorsAsEvents='0')
RunDescriptor._logger("Loaded {0}".format(data_file),'information')
loaded_ws = mtd[ws_name]
return loaded_ws
#--------------------------------------------------------------------------------------------------------------------
def load_run(self,inst_name, calibration=None, force=False, mon_load_option=False,use_ws_calibration=True,\
"""Loads run into workspace with name provided.
If force is true then the file is loaded regardless of whether this workspace already exists
# If a workspace with this name exists, then assume it is to be used in
# place of a file
if 'ws_name' in kwargs:
ws_name = kwargs['ws_name']
ws_name = self._build_ws_name()
if ws_name in mtd and not force:
RunDescriptor._logger("{0} already loaded as workspace.".format(ws_name),'information')
# If it doesn't exists as a workspace assume we have to try and
# load a file
loaded_ws = self.load_file(inst_name,ws_name,None,mon_load_option,filePath,fileExt,**kwargs)
######## Now we have the workspace
self.apply_calibration(loaded_ws,calibration,use_ws_calibration)
return loaded_ws
#--------------------------------------------------------------------------------------------------------------------
def apply_calibration(self,loaded_ws,calibration=None,use_ws_calibration=True):
""" If calibration is present, apply it to the workspace
use_ws_calibration -- if true, retrieve workspace property, which defines
calibration option (e.g. det_cal_file used a while ago) and try to use it
if not (calibration) or use_ws_calibration:
if not isinstance(loaded_ws, api.Workspace):
raise RuntimeError(' Calibration can be applied to a workspace only and got object of type {0}'.format(type(loaded_ws)))
if loaded_ws.run().hasProperty("calibrated"):
return # already calibrated
ws_calibration = calibration
if use_ws_calibration:
try:
ws_calibration = prop_helpers.get_default_parameter(loaded_ws.getInstrument(),'det_cal_file')
if ws_calibration is None:
ws_calibration = calibration
if isinstance(ws_calibration,str) and ws_calibration.lower() == 'none':
ws_calibration = calibration
if ws_calibration :
test_name = ws_calibration
ws_calibration = FileFinder.getFullPath(ws_calibration)
if len(ws_calibration) == 0:
raise RuntimeError('Can not find defined in run {0} calibration file {1}\n'\
'Define det_cal_file reduction parameter properly'.format(loaded_ws.name(),test_name))
RunDescriptor._logger('*** load_data: Calibrating data using workspace defined calibration file: {0}'.format(ws_calibration),'notice')
except KeyError: # no det_cal_file defined in workspace
if calibration:
ws_calibration = calibration
else:
return
if type(ws_calibration) == str : # It can be only a file (got it from calibration property)
RunDescriptor._logger('load_data: Moving detectors to positions specified in cal file {0}'.format(ws_calibration),'debug')
# Pull in pressures, thicknesses & update from cal file
LoadDetectorInfo(Workspace=loaded_ws, DataFilename=ws_calibration, RelocateDets=True)
AddSampleLog(Workspace=loaded_ws,LogName="calibrated",LogText=str(ws_calibration))
elif isinstance(ws_calibration, api.Workspace):
RunDescriptor._logger('load_data: Copying detectors positions from workspace {0}: '.format(ws_calibration.name()),'debug')
CopyInstrumentParameters(InputWorkspace=ws_calibration,OutputWorkspace=loaded_ws)
AddSampleLog(Workspace=loaded_ws,LogName="calibrated",LogText=str(ws_calibration))
#--------------------------------------------------------------------------------------------------------------------
@staticmethod
def copy_spectrum2monitors(data_ws,mon_ws,spectraID):
"""
this routine copies a spectrum form workspace to monitor workspace and rebins it according to monitor workspace binning
@param data_ws -- the event workspace which detector is considered as monitor or Mantid pointer to this workspace
@param mon_ws -- the histogram workspace with monitors where one needs to place the detector's spectra
@param spectraID-- the ID of the spectra to copy.
"""
# ----------------------------
try:
ws_index = mon_ws.getIndexFromSpectrumNumber(spectraID)
# Spectra is already in the monitor workspace
return mon_ws
except:
ws_index = data_ws.getIndexFromSpectrumNumber(spectraID)
#
x_param = mon_ws.readX(0)
bins = [x_param[0],x_param[1] - x_param[0],x_param[-1]]
ExtractSingleSpectrum(InputWorkspace=data_ws,OutputWorkspace='tmp_mon',WorkspaceIndex=ws_index)
Rebin(InputWorkspace='tmp_mon',OutputWorkspace='tmp_mon',Params=bins,PreserveEvents='0')
# should be vice versa but Conjoin invalidate ws pointers and hopefully
# nothing could happen with workspace during conjoining
#AddSampleLog(Workspace=monWS,LogName=done_log_name,LogText=str(ws_index),LogType='Number')
mon_ws_name = mon_ws.getName()
ConjoinWorkspaces(InputWorkspace1=mon_ws,InputWorkspace2='tmp_mon')
if 'tmp_mon' in mtd:
DeleteWorkspace(WorkspaceName='tmp_mon')
return mon_ws
#--------------------------------------------------------------------------------------------------------------------
def clear_monitors(self):
""" method removes monitor workspace form analysis data service if it is there
(assuming it is not needed any more)
"""
monWS_name = self._ws_name + '_monitors'
if monWS_name in mtd:
DeleteWorkspace(monWS_name)
#--------------------------------------------------------------------------------------------------------------------
def clear_resulting_ws(self):
""" remove workspace from memory as if it has not been processed
and clear all operations indicators except cashes and run lists.