Skip to content
Snippets Groups Projects
Commit 173e44c7 authored by Zhou, Wenduo's avatar Zhou, Wenduo
Browse files

Refs #12122. Added python classes.

parent 1c82c652
No related branches found
No related tags found
No related merge requests found
#pylint: disable=C0103,R0904
# N(DAV)TableWidget
#
from PyQt4 import QtGui, QtCore
try:
_fromUtf8 = QtCore.QString.fromUtf8
except AttributeError:
_fromUtf8 = lambda s: s
class NTableWidget(QtGui.QTableWidget):
"""
NdavTableWidget inherits from QTableWidget by extending the features
for easy application.
"""
def __init__(self, parent):
"""
:param parent:
:return:
"""
QtGui.QTableWidget.__init__(self, parent)
self._myParent = parent
self._myHeaderList = None
self._myColumnTypeList = None
return
def append_row(self, row_value_list, type_list=None):
"""
:param row_value_list:
:return:
"""
# Check input
assert isinstance(row_value_list, list)
if type_list is not None:
assert isinstance(type_list, list)
assert len(row_value_list) == len(type_list)
else:
type_list = self._myColumnTypeList
if len(row_value_list) != self.columnCount():
ret_msg = 'Input number of values (%d) is different from ' \
'column number (%d).' % (len(row_value_list), self.columnCount())
return False, ret_msg
else:
ret_msg = ''
# Insert new row
row_number = self.rowCount()
self.insertRow(row_number)
# Set values
for i_col in xrange(min(len(row_value_list), self.columnCount())):
item = QtGui.QTableWidgetItem()
item.setText(_fromUtf8(str(row_value_list[i_col])))
item.setFlags(item.flags() & ~QtCore.Qt.ItemIsEditable)
if type_list[i_col] == 'checkbox':
self.set_check_box(row_number, i_col, False)
else:
self.setItem(row_number, i_col, item)
# END-FOR(i_col)
return True, ret_msg
def get_selected_rows(self):
"""
:return:
"""
rows_list = list()
index_status = self._myColumnTypeList.index('checkbox')
for i_row in xrange(self.rowCount()):
is_checked = self.get_row_value(i_row)[index_status]
if is_checked:
rows_list.append(i_row)
return rows_list
def get_cell_value(self, row_index, col_index):
"""
:param row_index:
:param col_index:
:return:
"""
c_type = self._myColumnTypeList[col_index]
return_value = None
if c_type == 'checkbox':
# Check box
cell_i_j = self.cellWidget(row_index, col_index)
assert isinstance(cell_i_j, QtGui.QCheckBox)
return_value = cell_i_j.isChecked()
else:
# Regular cell
item_i_j = self.item(row_index, col_index)
assert isinstance(item_i_j, QtGui.QTableWidgetItem)
value = str(item_i_j.text())
if c_type == 'int':
return_value = int(value)
elif c_type == 'float':
return_value = float(value)
return return_value
def get_row_value(self, row_index):
"""
:param row_index:
:return: list of objects
"""
if row_index < 0 or row_index >= self.rowCount():
raise IndexError('Index of row (%d) is out of range.' % row_index)
ret_list = list()
for i_col in xrange(len(self._myColumnTypeList)):
c_type = self._myColumnTypeList[i_col]
if c_type == 'checkbox':
# Check box
cell_i_j = self.cellWidget(row_index, i_col)
assert isinstance(cell_i_j, QtGui.QCheckBox)
is_checked = cell_i_j.isChecked()
ret_list.append(is_checked)
else:
# Regular cell
item_i_j = self.item(row_index, i_col)
assert isinstance(item_i_j, QtGui.QTableWidgetItem)
value = str(item_i_j.text())
if c_type == 'int':
value = int(value)
elif c_type == 'float':
value = float(value)
ret_list.append(value)
# END-IF-ELSE
# END-FOR
return ret_list
def init_setup(self, column_tup_list):
""" Initial setup
:param column_tup_list: list of 2-tuple as string (column name) and string (data type)
:return:
"""
print '[DB] Init set up table with %d columns!' % len(column_tup_list)
# Define column headings
num_cols = len(column_tup_list)
# Class variables
self._myHeaderList = list()
self._myColumnTypeList = list()
for c_tup in column_tup_list:
c_name = c_tup[0]
c_type = c_tup[1]
self._myHeaderList.append(c_name)
self._myColumnTypeList.append(c_type)
self.setColumnCount(num_cols)
self.setHorizontalHeaderLabels(self._myHeaderList)
return
def init_size(self, num_rows, num_cols):
"""
:return:
"""
self.setColumnCount(num_cols)
self.setRowCount(num_rows)
return
def set_check_box(self, row, col, state):
""" function to add a new select checkbox to a cell in a table row
won't add a new checkbox if one already exists
"""
# Check input
assert isinstance(state, bool)
# Check if cellWidget exists
if self.cellWidget(row,col):
# existing: just set the value
self.cellWidget(row, col).setChecked(state)
else:
# case to add checkbox
checkbox = QtGui.QCheckBox()
checkbox.setText('')
checkbox.setChecked(state)
# Adding a widget which will be inserted into the table cell
# then centering the checkbox within this widget which in turn,
# centers it within the table column :-)
self.setCellWidget(row, col, checkbox)
# END-IF-ELSE
return
def set_value_cell(self, row, col, value=''):
"""
Set value to a cell with integer, float or string
:param row:
:param col:
:param value:
:return:
"""
# Check
if row < 0 or row >= self.rowCount() or col < 0 or col >= self.columnCount():
raise IndexError('Input row number or column number is out of range.')
# Init cell
cell_item = QtGui.QTableWidgetItem()
cell_item.setText(_fromUtf8(str(value)))
cell_item.setFlags(cell_item.flags() & ~QtCore.Qt.ItemIsEditable)
self.setItem(row, col, cell_item)
return
def update_cell_value(self, row, col, value):
"""
:param row:
:param col:
:param value:
:return:
"""
cell_item = self.item(row, col)
cell_widget = self.cellWidget(row, col)
if cell_item is not None and cell_widget is None:
# TableWidgetItem
assert isinstance(cell_item, QtGui.QTableWidgetItem)
if isinstance(value, float):
cell_item.setText(_fromUtf8('%.7f' % value))
else:
cell_item.setText(_fromUtf8(str(value)))
elif cell_item is None and cell_widget is not None:
# TableCellWidget
if isinstance(cell_item, QtGui.QCheckBox) is True:
cell_item.setChecked(value)
else:
raise TypeError('Cell of type %s is not supported.' % str(type(cell_item)))
else:
raise TypeError('Table cell (%d, %d) is in an unsupported situation!' % (row, col))
return
#pylint: disable=W0633,too-many-branches
__author__ = 'wzz'
import os
import urllib2
import socket
def check_url(url, read_lines=False):
""" Check whether a URL is valid
:param url:
:return: boolean, error message
"""
lines = None
try:
# Access URL
url_stream = urllib2.urlopen(url, timeout=2)
# Read lines
if read_lines is True:
lines = url_stream.readlines()
except urllib2.URLError as url_error:
url_stream = url_error
except socket.timeout:
return False, 'Time out. Try again!'
# Return result
if url_stream.code in (200, 401):
url_good = True
else:
url_good = False
# Close connect
url_stream.close()
# Return
if read_lines is True:
return url_good, lines
if url_good is False:
error_message = 'Unable to access %s. Check internet access. Code %d' % (url, url_stream.code)
else:
error_message = ''
return url_good, error_message
def get_scans_list(server_url, exp_no, return_list=False):
""" Get list of scans under one experiment
:param server_url:
:param exp_no:
:return: message
"""
if server_url.endswith('/') is False:
server_url = '%s/' % server_url
data_dir_url = '%sexp%d/Datafiles' % (server_url, exp_no)
does_exist, raw_lines = check_url(data_dir_url, read_lines=True)
if does_exist is False:
return "Experiment %d's URL %s cannot be found." % (exp_no, data_dir_url)
# Scan through the index page
scan_list = []
header = 'HB3A_exp%04d_scan' % exp_no
for line in raw_lines:
if line.count(header) > 0:
# try to find file HB3A_exp0123_scan6789.dat
term = line.split(header)[1].split('.dat')[0]
scan = int(term)
# check
if '%04d' % scan == term:
scan_list.append(scan)
# END_FOR
scan_list = sorted(scan_list)
if return_list is True:
return scan_list
message = 'Experiment %d: Scan from %d to %d' % (exp_no, scan_list[0], scan_list[-1])
return message
def get_scans_list_local_disk(local_dir, exp_no):
""" Get scans from a specified directory on local disk
:param local_dir:
:param exp_no:
:return:
"""
scan_list = []
file_names = os.listdir(local_dir)
header = 'HB3A_exp%04d_scan' % exp_no
for name in file_names:
if name.count(header) > 0:
scan = int(name.split(header)[1].split('.dat')[0])
scan_list.append(scan)
scan_list = sorted(scan_list)
if len(scan_list) == 0:
message = 'Experiment %d: No scan can be found.' % exp_no
else:
message = 'Experiment %d: Scan from %d to %d ' % (exp_no, scan_list[0], scan_list[-1])
num_skip_scans = scan_list[-1] - scan_list[0] + 1 - len(scan_list)
if num_skip_scans > 0:
message += 'with %d ' % num_skip_scans
else:
message += 'without '
message += 'missing scans.'
return message
def parse_int_array(int_array_str):
""" Validate whether the string can be divided into integer strings.
Allowed: a, b, c-d, e, f
:param int_array_str:
:return:
"""
int_array_str = str(int_array_str)
if int_array_str == "":
return True, []
# Split by ","
term_level_0 = int_array_str.split(",")
integer_list = []
# For each term
err_msg = ""
ret_status = True
for level0_term in term_level_0:
level0_term = level0_term.strip()
# split upon dash -
num_dashes = level0_term.count("-")
if num_dashes == 0:
# one integer
value_str = level0_term
try:
int_value = int(value_str)
if str(int_value) != value_str:
ret_status = False
err_msg = "Contains non-integer string %s." % value_str
except ValueError:
ret_status = False
err_msg = "String %s is not an integer." % (value_str)
else:
integer_list.append(int_value)
elif num_dashes == 1:
# Integer range
two_terms = level0_term.split("-")
temp_list = []
for i in xrange(2):
value_str = two_terms[i]
try:
int_value = int(value_str)
if str(int_value) != value_str:
ret_status = False
err_msg = "Contains non-integer string %s." % (value_str)
except ValueError:
ret_status = False
err_msg = "String %s is not an integer." % (value_str)
else:
temp_list.append(int_value)
# break loop
if ret_status is False:
break
# END_FOR(i)
integer_list.extend(range(temp_list[0], temp_list[1]+1))
else:
# Undefined situation
ret_status = False
err_msg = "Term %s contains more than 1 dash." % level0_term
# END-IF-ELSE
# break loop if something is wrong
if ret_status is False:
break
# END-FOR(level0_term)
# Return with false
if ret_status is False:
return False, err_msg
return True, integer_list
#
# GUI Utility Methods
#
from PyQt4 import QtGui
def parse_float_array(array_str):
""" Parse a string to an array of float
:param array_str:
:return: boolean, list of floats/error message
"""
print array_str
assert isinstance(array_str, str)
array_str = array_str.replace(',', ' ')
array_str = array_str.replace('\n', ' ')
array_str = array_str.replace('\t ', ' ')
array_str = array_str.strip()
print '[DB] After processing: ', array_str
float_str_list = array_str.split()
float_list = list()
for float_str in float_str_list:
try:
value = float(float_str)
except ValueError as value_error:
return False, 'Unable to parse %s due to %s.' % (float_str, str(value_error))
else:
float_list.append(value)
# END-FOR
return True, float_list
def parse_integer_list(array_str):
""" Parse a string to an array of integer separated by ','
also, the format as 'a-b' is supported too
:param array_str:
:return: boolean, list of floats/error message
"""
assert isinstance(array_str, str)
array_str = array_str.replace(' ', '')
array_str = array_str.replace('\n', '')
array_str = array_str.replace('\t ', '')
int_str_list = array_str.split(',')
int_list = list()
for int_str in int_str_list:
try:
int_value = int(int_str)
int_list.append(int_value)
except ValueError:
num_dash = int_str.count('-')
if num_dash == 1:
terms = int_str.split('-')
try:
start_value = int(terms[0])
end_value = int(terms[1])
except ValueError:
raise 'Unable to parse %s due to value error' % int_str
elif num_dash == 2 and int_str.startswith('-'):
terms = int_str[1:].split('-')
try:
start_value = int(terms[0])*-1
end_value = int(terms[1])
except ValueError:
raise 'Unable to parse %s due to value error' % int_str
elif num_dash == 3:
terms = int_str.split('-')
try:
start_value = -1*int(terms[1])
end_value = -1*int(terms[3])
except ValueError:
raise 'Unable to parse %s due to value error' % int_str
except IndexError:
raise 'Unable to parse %s due to value error' % int_str
else:
raise 'Unable to parse %s due to value error' % int_str
int_list.extend(xrange(start_value, end_value+1))
# END-FOR
return int_list
def parse_float_editors(line_edits):
"""
:param line_edit_list:
:return: (True, list of floats); (False, error message)
"""
# Set flag
return_single_value = False
if isinstance(line_edits, QtGui.QLineEdit) is True:
line_edit_list = [line_edits]
return_single_value = True
elif isinstance(line_edits, list) is True:
line_edit_list = line_edits
else:
raise RuntimeError('Input is not LineEdit or list of LineEdit.')
error_message = ''
float_list = []
for line_edit in line_edit_list:
assert isinstance(line_edit, QtGui.QLineEdit)
try:
str_value = str(line_edit.text()).strip()
float_value = float(str_value)
except ValueError as value_err:
error_message += 'Unable to parse to integer. %s\n' % (str(value_err))
else:
float_list.append(float_value)
# END-TRY
# END-FOR
if len(error_message) > 0:
return False, error_message
elif return_single_value is True:
return True, float_list[0]
return True, float_list
def parse_integers_editors(line_edits):
"""
:param line_edit_list:
:return: (True, list of integers); (False, error message)
"""
# Set flag
return_single_value = False
if isinstance(line_edits, QtGui.QLineEdit) is True:
line_edit_list = [line_edits]
return_single_value = True
elif isinstance(line_edits, list) is True:
line_edit_list = line_edits
else:
raise RuntimeError('Input is not LineEdit or list of LineEdit.')
error_message = ''
integer_list = []
for line_edit in line_edit_list:
assert isinstance(line_edit, QtGui.QLineEdit)
try:
str_value = str(line_edit.text()).strip()
int_value = int(str_value)
except ValueError as value_err:
error_message += 'Unable to parse to integer. %s\n' % (str(value_err))
else:
if str_value != '%d' % int_value:
error_message += 'Value %s is not a proper integer.\n' % str_value
else:
integer_list.append(int_value)
# END-TRY
# END-FOR
if len(error_message) > 0:
return False, error_message
elif return_single_value is True:
return True, integer_list[0]
return True, integer_list
if __name__ == '__main__':
int_list = parse_integer_list('123, -234, 330-350, 400, -2-4')
print int_list
#pylint: disable=W0403,C0103,R0901,R0904
import numpy
import NTableWidget as tableBase
# UB peak information table
Peak_Integration_Table_Setup = [('Scan', 'int'),
('Pt', 'int'),
('H', 'float'),
('K', 'float'),
('L', 'float'),
('Q_x', 'float'),
('Q_y', 'float'),
('Q_z', 'float'),
('Intensity', 'float')]
class IntegratePeaksTableWidget(tableBase.NTableWidget):
"""
Extended table widget for peak integration
"""
def __init__(self, parent):
"""
:param parent:
"""
tableBase.NTableWidget.__init__(self, parent)
return
def setup(self):
"""
Init setup
:return:
"""
self.init_setup(Peak_Integration_Table_Setup)
return
class UBMatrixTable(tableBase.NTableWidget):
"""
Extended table for UB matrix
"""
def __init__(self, parent):
"""
:param parent:
:return:
"""
tableBase.NTableWidget.__init__(self, parent)
# Matrix
self._matrix = numpy.ndarray((3, 3), float)
for i in xrange(3):
for j in xrange(3):
self._matrix[i][j] = 0.
return
def _set_to_table(self):
"""
TODO/DOC
:return:
"""
for i_row in xrange(3):
for j_col in xrange(3):
self.update_cell_value(i_row, j_col, self._matrix[i_row][j_col])
return
def get_matrix(self):
"""
Get the copy of the matrix
:return:
"""
print '[DB] MatrixTable: _Matrix = ', self._matrix
return self._matrix.copy()
def set_from_list(self, element_array):
"""
TODO/DOC
:param element_array:
:return:
"""
# Check
assert isinstance(element_array, list)
assert len(element_array) == 9
# Set value
i_array = 0
for i in xrange(3):
for j in xrange(3):
self._matrix[i][j] = element_array[i_array]
i_array += 1
# Set to table
self._set_to_table()
return
def set_from_matrix(self, matrix):
"""
TODO - DOC
:param matrix:
:return:
"""
# Check
assert isinstance(matrix, numpy.ndarray)
assert matrix.shape == (3, 3)
for i in xrange(3):
for j in xrange(3):
self._matrix[i][j] = matrix[i][j]
self._set_to_table()
return
def setup(self):
"""
Init setup
:return:
"""
# self.init_size(3, 3)
for i in xrange(3):
for j in xrange(3):
self.set_value_cell(i, j)
self._set_to_table()
return
# UB peak information table
UB_Peak_Table_Setup = [('Scan', 'int'),
('Pt', 'int'),
('H', 'float'),
('K', 'float'),
('L', 'float'),
('Q_x', 'float'),
('Q_y', 'float'),
('Q_z', 'float'),
('Use', 'checkbox'),
('m1', 'float'),
('Error', 'float')]
class UBMatrixPeakTable(tableBase.NTableWidget):
"""
Extended table for peaks used to calculate UB matrix
"""
def __init__(self, parent):
"""
:param parent:
:return:
"""
tableBase.NTableWidget.__init__(self, parent)
return
def get_exp_info(self, row_index):
"""
Get experiment information from a row
:return: scan number, pt number
"""
assert isinstance(row_index, int)
scan_number = self.get_cell_value(row_index, 0)
assert isinstance(scan_number, int)
pt_number = self.get_cell_value(row_index, 1)
assert isinstance(pt_number, int)
return scan_number, pt_number
def get_hkl(self, row_index):
"""
Get reflection's miller index
:param row_index:
:return:
"""
assert isinstance(row_index, int)
m_h = self.get_cell_value(row_index, 2)
m_k = self.get_cell_value(row_index, 3)
m_l = self.get_cell_value(row_index, 4)
assert isinstance(m_h, float)
assert isinstance(m_k, float)
assert isinstance(m_l, float)
return m_h, m_k, m_l
def is_selected(self, row_index):
"""
:return:
"""
if row_index < 0 or row_index >= self.rowCount():
raise IndexError('Input row number %d is out of range [0, %d)' % (row_index, self.rowCount()))
col_index = UB_Peak_Table_Setup.index(('Use', 'checkbox'))
return self.get_cell_value(row_index, col_index)
def setup(self):
"""
Init setup
:return:
"""
self.init_setup(UB_Peak_Table_Setup)
return
def set_hkl(self, i_row, hkl, error=None):
"""
Set HKL to table
:param irow:
:param hkl:
"""
# Check
assert isinstance(i_row, int)
assert isinstance(hkl, list)
i_col_h = UB_Peak_Table_Setup.index(('H', 'float'))
i_col_k = UB_Peak_Table_Setup.index(('K', 'float'))
i_col_l = UB_Peak_Table_Setup.index(('L', 'float'))
self.update_cell_value(i_row, i_col_h, hkl[0])
self.update_cell_value(i_row, i_col_k, hkl[1])
self.update_cell_value(i_row, i_col_l, hkl[2])
if error is not None:
i_col_error = UB_Peak_Table_Setup.index(('Error', 'float'))
self.update_cell_value(i_row, i_col_error, error)
return
# Processing status table
Process_Table_Setup = [('Scan', 'int'),
('Number Pt', 'int'),
('Status', 'str'),
('Merged Workspace', 'str'),
('Group Name', 'str'),
('Select', 'checkbox')]
class ProcessTableWidget(tableBase.NTableWidget):
"""
Extended table for peaks used to calculate UB matrix
"""
def __init__(self, parent):
"""
:param parent:
:return:
"""
tableBase.NTableWidget.__init__(self, parent)
return
def append_scans(self, scans):
""" Append rows
:param scans:
:return:
"""
# Check
assert isinstance(scans, list)
# Append rows
for scan in scans:
row_value_list = [scan, 0, 'In Queue', '', '', False]
status, err = self.append_row(row_value_list)
if status is False:
raise RuntimeError(err)
return
def setup(self):
"""
Init setup
:return:
"""
self.init_setup(Process_Table_Setup)
return
def set_scan_pt(self, scan_no, pt_list):
"""
:param scan_no:
:param pt_list:
:return:
"""
# Check
assert isinstance(scan_no, int)
num_rows = self.rowCount()
set_done = False
for i_row in xrange(num_rows):
tmp_scan_no = self.get_cell_value(i_row, 0)
if scan_no == tmp_scan_no:
self.update_cell_value(i_row, 1, len(pt_list))
set_done = True
break
# END-FOR
if set_done is False:
return 'Unable to find scan %d in table.' % scan_no
return ''
def set_status(self, scan_no, status):
"""
TODO/Doc
:param status:
:return:
"""
# Check
assert isinstance(scan_no, int)
num_rows = self.rowCount()
set_done = False
for i_row in xrange(num_rows):
tmp_scan_no = self.get_cell_value(i_row, 0)
if scan_no == tmp_scan_no:
self.update_cell_value(i_row, 2, status)
set_done = True
break
# END-FOR
if set_done is False:
return 'Unable to find scan %d in table.' % scan_no
return ''
def set_ws_names(self, scan_num, merged_md_name, ws_group_name):
"""
TODO/DOC
:param merged_md_name:
:param ws_group_name:
:return:
"""
# Check
assert isinstance(scan_num, int)
assert isinstance(merged_md_name, str) or merged_md_name is None
assert isinstance(ws_group_name, str) or ws_group_name is None
num_rows = self.rowCount()
set_done = False
for i_row in xrange(num_rows):
tmp_scan_no = self.get_cell_value(i_row, 0)
if scan_num == tmp_scan_no:
if merged_md_name is not None:
self.update_cell_value(i_row, 3, merged_md_name)
if ws_group_name is not None:
self.update_cell_value(i_row, 4, ws_group_name)
set_done = True
break
# END-FOR
if set_done is False:
return 'Unable to find scan %d in table.' % scan_num
return
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment