Loading numerical_labelling/labelling.py +129 −112 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ def int_to_bool_list(num, nSensorCandidate): bin_string = format(num, form) return [x == "1" for x in bin_string[::-1]], bin_string, form def bool_list_to_int(layout, nSensorCandidate): """ takey a boolean layout (True=measured, False=unmeasured) and converts it Loading @@ -24,7 +25,9 @@ def bool_list_to_int(layout, nSensorCandidate): if isinstance(layout, list): layout = np.array(layout) rev_layout = np.concatenate([layout[nSensorCandidate:],layout[:nSensorCandidate]]) # Assuming layout is a list rev_layout = np.concatenate( [layout[nSensorCandidate:], layout[:nSensorCandidate]] ) # Assuming layout is a list tlayout_np = np.array(rev_layout) rev_bin = [str(x) for x in tlayout_np[::-1].astype(int)] rev_bin_str = str.join("", rev_bin) Loading Loading @@ -56,7 +59,6 @@ def jacnum(x0, fx, dx=1e-6): def jacsym(constr, varbs, debug=False): """ generate jacobian symbolically constr: list of sympy constraint equations Loading @@ -75,9 +77,11 @@ def jacsym(constr, varbs, debug=False): print(f"∂({con})/∂({var}) = {dmat[row, col]}") return dmat def symbolic_is_zero(x): return sy.Symbol("?") if (e := x.equals(0)) is None else int(e) def find_zero_elements(M): # rows = np.where(np.all(A_uo==0, axis=1))[0] sym_indicator = sy.Matrix( Loading @@ -93,6 +97,7 @@ def find_nonzero_columns(M): return nonzero def get_observability_rref(Ju, bool_measured): bool_observable = bool_measured.copy() Loading @@ -110,6 +115,7 @@ def get_observability_rref(Ju, bool_measured): return bool_observable, Ju_rref, Zero def get_red_rref(J, bool_measured): """ iterates over the measurement columns Loading @@ -132,13 +138,22 @@ def get_red_rref(J, bool_measured): return bool_redundant def observability_redundancy_labelling_rref(J, bool_measured,x0={}, debug=False, le_red_method=False, enforce_integer =True): def observability_redundancy_labelling_rref( J, bool_measured, x0={}, debug=False, le_red_method=False, enforce_integer=True, ): if len(x0) > 0: J = J.subs(x0) if enforce_integer and len(x0) > 0: # convert to int matrix J = sy.Matrix(np.matrix(J).astype(int)) pass # ensure jacobian columns are all accounted for if len(bool_measured) != J.shape[1]: Loading @@ -151,6 +166,7 @@ def observability_redundancy_labelling_rref(J, bool_measured,x0={}, debug=False, bool_observable, Ju_rref, Zero = get_observability_rref(Ju, bool_measured) if le_red_method: bool_redundant = bool_measured.copy() include_rows = np.any(~Zero, axis=1) Loading @@ -159,7 +175,6 @@ def observability_redundancy_labelling_rref(J, bool_measured,x0={}, debug=False, rref_end = time.time() print(f"rref elapsed: {rref_end - rref_start}") VV = V.T @ V # leftinv_V = VV.inv() @ V.T leftinv_V_start = time.time() Loading Loading @@ -203,29 +218,29 @@ def observability_redundancy_labelling_rref(J, bool_measured,x0={}, debug=False, else: return bool_observable, bool_redundant def sort_zero_rows_to_bottom(matrix, additional_mats=[]): # Create a boolean mask where True means the row is not all zeros non_zero_mask = np.any(matrix != 0, axis=1) # Use the mask to sort rows - non-zero rows first, then zero rows sorted_matrix = np.vstack([ sorted_matrix = np.vstack( [ matrix[non_zero_mask], # All non-zero rows matrix[~non_zero_mask] # All zero rows ]) matrix[~non_zero_mask], # All zero rows ] ) sorted_additional_mats = [] if len(additional_mats) > 0: for imat in additional_mats: iadditional_mat = np.vstack([ imat[non_zero_mask], imat[~non_zero_mask] ]) iadditional_mat = np.vstack([imat[non_zero_mask], imat[~non_zero_mask]]) sorted_additional_mats.append(iadditional_mat) return (sorted_matrix, *sorted_additional_mats) # get U1, U2 def get_us(U, debug=False): """ Loading Loading @@ -288,6 +303,7 @@ def get_us(U, debug=False): return U1, U2 def get_obs_lu(U, Jm, debug=False, return_all=False): """ get observability from U matrix Loading Loading @@ -325,6 +341,7 @@ def get_obs_lu(U, Jm, debug=False, return_all=False): else: return obs, obs_cols def get_red_lu(L, Pi, U, A_m, debug=False, return_all=False): """ simply get the number of redundant transmitters. Loading Loading @@ -384,6 +401,7 @@ def get_red_lu(L, Pi, U, A_m, debug=False, return_all = False): else: return red, red_cols def observability_redundancy_labelling_lu(J, bool_measured, x0={}, debug=False): bool_measured = np.array(bool_measured) Loading @@ -396,7 +414,9 @@ def observability_redundancy_labelling_lu(J, bool_measured, x0 = {}, debug=False try: J = np.matrix(J).astype(np.float64) except TypeError: raise TypeError("Matrix contains symbolic elements. Please supply operating point x0") raise TypeError( "Matrix contains symbolic elements. Please supply operating point x0" ) Jm = J[:, list(np.where(bool_measured)[0])] Ju = J[:, list(np.where(~bool_measured)[0])] Loading Loading @@ -464,7 +484,6 @@ def observability_redundancy_labelling_lu(J, bool_measured, x0 = {}, debug=False return bool_observable, bool_redundant def observability_redundancy_labelling_qr(J, bool_measured, x0={}, debug=False): bool_measured = np.array(bool_measured) Loading @@ -477,7 +496,9 @@ def observability_redundancy_labelling_qr(J, bool_measured, x0 = {}, debug=False try: J = np.matrix(J).astype(np.float64) except TypeError: raise TypeError("Matrix contains symbolic elements. Please supply operating point x0") raise TypeError( "Matrix contains symbolic elements. Please supply operating point x0" ) Jm = J[:, list(np.where(bool_measured)[0])] Ju = J[:, list(np.where(~bool_measured)[0])] Loading Loading @@ -511,7 +532,3 @@ def observability_redundancy_labelling_qr(J, bool_measured, x0 = {}, debug=False bool_redundant = np.full(len(bool_observable), np.nan) return bool_observable, None requirements_minimal.txt +1 −1 Original line number Diff line number Diff line python==3.12 # python==3.12 scipy jupyter numpy Loading testing/generate_pgraph.py +111 −86 Original line number Diff line number Diff line Loading @@ -7,7 +7,14 @@ import argparse # soar_integration tools from tools.soar_utilities import check_incidence, define_system def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter=None, arc_equal_composition = None,): def build_pgraph( sys_name="ccro", overwrite=False, n_components=3, arc_splitter=None, arc_equal_composition=None, ): """ Build soar process graph object for the CCRO example """ Loading @@ -15,12 +22,17 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= sys_name = str(sys_name).lower() # check if pgraph already exists pgraph_path = os.path.join(module_path, "testing", "process_graphs", f"{sys_name}_pgraph.pkl") # pgraph_path = os.path.join(module_path, "testing", "process_graphs", f"{sys_name}_pgraph.pkl") pgraph_path = os.path.join(os.getcwd(), f"{sys_name}_pgraph.pkl") if os.path.exists(pgraph_path): if overwrite: logging.warning(f"WARNING: file {pgraph_path} already exists, overwriting file") logging.warning( f"WARNING: file {pgraph_path} already exists, overwriting file" ) else: raise Exception(f"ERROR: file {pgraph_path} already exists and overwrite not specified. Aborting.") raise Exception( f"ERROR: file {pgraph_path} already exists and overwrite not specified. Aborting." ) if sys_name == "minimal": # almost the exact same layout as CCRO, but combining streams Loading @@ -35,11 +47,13 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= ] ) coordinates = np.array([ coordinates = np.array( [ [2, 1], # env [0.75, 0], # mx [1.5, 0], # me1 [2, -1],] # sp1 [2, -1], ] # sp1 ) if arc_splitter is None: arc_splitter = [[4 - 1, 5 - 1, 6 - 1]] Loading @@ -47,6 +61,7 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= arc_equal_composition = [[4 - 1, 5 - 1, 6 - 1]] if sys_name == "ccro": strSystemID = "NAWI305" incidence = np.array( [ # e P1 Mx Me1 Sp1 P2 [-1, +1, 0, 0, 0, 0], # 1 Loading @@ -60,13 +75,15 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= ] ) coordinates = np.array([ coordinates = np.array( [ [2, 1], # env [0, 0], # p1 [0.75, 0], # mx [1.5, 0], # me1 [2, -1], # sp1 [1.5,-1]] # p2 [1.5, -1], ] # p2 ) if arc_splitter is None: arc_splitter = [[5 - 1, 6 - 1, 7 - 1]] Loading Loading @@ -96,7 +113,8 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= ] ) # define coordinates for plotting process graph coordinates = np.array([ coordinates = np.array( [ [0, 0], # a [1, 0], # b [2, 0], # c Loading @@ -119,14 +137,17 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= check_incidence(incidence) pgraph = define_system( iIncidence=incidence, coordinates = coordinates, verbose=5, iIncidence=incidence, coordinates=coordinates, verbose=5, arcSplitter=arc_splitter, arcEqualComposition=arc_equal_composition, numberOfComponents=n_components, strSystemID=strSystemID, ) # this is the longest step, generates the cutsets. pgraph.add_graph_info(verbose=5) pgraph.add_graph_info(verbose=5, use_networkx=True) # save pgraph with open(pgraph_path, "wb") as f: Loading @@ -135,9 +156,13 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= if __name__ == "__main__": parser = argparse.ArgumentParser(description="Generates a pickle file for a soar processGraph") parser.add_argument('--sysname', type=str, help='Name of the system', required=True) parser.add_argument('--overwrite', action='store_true', help='overwrite existing pickle file') parser = argparse.ArgumentParser( description="Generates a pickle file for a soar processGraph" ) parser.add_argument("--sysname", type=str, help="Name of the system", required=True) parser.add_argument( "--overwrite", action="store_true", help="overwrite existing pickle file" ) # 3. Parse the arguments from the command line args = parser.parse_args() Loading testing/generate_test_cases.py +45 −45 Original line number Diff line number Diff line import os import sys import pickle Loading @@ -24,6 +23,7 @@ from numerical_labelling.constraints import labelConstraint from numerical_labelling.labelling import jacsym from testing.generate_numerical_system import constr_minimal, constr_ccro, constr_pilot def build_test_case(ptype=1, system="ccro", custom_system=None, debug=False): """ ptype: problem type per test matrix Loading Loading @@ -85,7 +85,9 @@ def build_test_case(ptype=1, system = "ccro", custom_system = None, debug=False) cindx = 0 # exclude any nonlinear constraints for iconstr, constr in constr_sys.items(): if not constr.type.startswith("nl_") and not constr.type.startswith("bl_"): if not constr.type.startswith("nl_") and not constr.type.startswith( "bl_" ): constr_active[cindx] = constr cindx += 1 Loading @@ -94,7 +96,9 @@ def build_test_case(ptype=1, system = "ccro", custom_system = None, debug=False) # use only m and x variables varbs_active = {} for varb_name, varb in varbs_sys.items(): if varb_name.lower().startswith("m") or varb_name.lower().startswith("x"): if varb_name.lower().startswith("m") or varb_name.lower().startswith( "x" ): varbs_active[varb_name] = varb # only select constraints that are linear or bilinear constr_active = {} Loading Loading @@ -172,6 +176,7 @@ def build_test_case(ptype=1, system = "ccro", custom_system = None, debug=False) else: return constr_sys, varbs_sys, coeff_sys, jac_sys def make_layout(soar_layout, varbs_sys, ptype=1): """ soar_layout: bilinear layout Loading @@ -179,9 +184,9 @@ def make_layout(soar_layout, varbs_sys, ptype=1): type: test to be performed """ # get number of pressure variables nmass = sum(1 for varb in varbs_sys if varb.lower().startswith('m')) nconc = sum(1 for varb in varbs_sys if varb.lower().startswith('x')) npress = sum(1 for varb in varbs_sys if varb.lower().startswith('p')) nmass = sum(1 for varb in varbs_sys if varb.lower().startswith("m")) nconc = sum(1 for varb in varbs_sys if varb.lower().startswith("x")) npress = sum(1 for varb in varbs_sys if varb.lower().startswith("p")) ncoeff = len(varbs_sys) - nmass - nconc - npress match ptype: case 100: Loading @@ -202,8 +207,3 @@ def make_layout(soar_layout, varbs_sys, ptype=1): layout = soar_layout + [False] * npress + [False] * ncoeff return layout tools/soar_utilities.py +51 −75 Original line number Diff line number Diff line Loading @@ -6,6 +6,10 @@ import matplotlib.pyplot as plt import matplotlib # soar_libraries SOARPY = r"C:\Users\2kv\OneDrive - Oak Ridge National Laboratory\C_code\ORNL\NAWI\nawi802\soar\python" sys.path.insert(0, SOARPY) import soar.optimization.fEvaluate as feval from soar.graph_labeling.fDefineSystem import ProcessGraph from soar.graph_labeling.fPlotGraph import plot_graph Loading @@ -22,16 +26,19 @@ def check_incidence(iIncidence): assert sum(row) == 0, f"Incidence matrix row sums to {sum(row)}" def define_system(iIncidence=[], coordinates = [], verbose=5, def define_system( iIncidence=[], coordinates=[], verbose=5, arcSplitter=[[5 - 1, 6 - 1, 7 - 1]], arcEqualComposition=[[1 - 1, 2 - 1], [5 - 1, 6 - 1, 7 - 1, 8 - 1]], numberOfComponents=2, strSystemID="membrane1", ): if verbose >= 0: print("******* DEFINE SYSTEM *******") pass strSystemID = "membrane1" print(strSystemID) # ============================================================================================== if verbose >= 1: Loading Loading @@ -63,8 +70,7 @@ def define_system(iIncidence=[], coordinates = [], verbose=5, ] ) # 7 if len(coordinates) == 0: coordinates = np.array([[2, 1], [0, 0], [0.75, 0], [1.5, 0], [2, -1], [3,0]]) coordinates = np.array([[2, 1], [0, 0], [0.75, 0], [1.5, 0], [2, -1], [3, 0]]) iIncidence = iIncidence.T Loading Loading @@ -110,7 +116,6 @@ def define_system(iIncidence=[], coordinates = [], verbose=5, xKnown = np.zeros([iNumberOfArc, m + 1]) xKnown[Present == 0] = np.nan one_component = True S = [[]] Loading @@ -123,11 +128,9 @@ def define_system(iIncidence=[], coordinates = [], verbose=5, print(" GROUND TRUTH ") pass xRefObservable = [] xRefRedundant = [] # ======================================================================== # SPECIFY POSSIBLE SENSOR LOCATIONS AND WEIGHTS FOR OBSERVABILITY # ======================================================================== Loading Loading @@ -164,7 +167,7 @@ def define_system(iIncidence=[], coordinates = [], verbose=5, iArcReaction=iArcReaction, iArcEnergy=iArcEnergy, iArcSplitter=iArcSplitter, iArcEqualComposition=iArcEqualComposition, # iArcEqualComposition=iArcEqualComposition, xKnown=xKnown, xMeasured=xMeasured, xSensorCandidate=xSensorCandidate, Loading @@ -179,6 +182,7 @@ def define_system(iIncidence=[], coordinates = [], verbose=5, return process_graph def get_pareto_front( process_graph, plot_all=False, Loading @@ -188,7 +192,7 @@ def get_pareto_front( exhaustive=True, save_results=False, return_all=False, verbose=5 verbose=5, ): """ :param process_graph: Soar process graph object Loading Loading @@ -251,14 +255,7 @@ def get_pareto_front( # ncriteria is if we are looking at red and obs, ncrit=2 is just obs, nCriteria 3 is all 3 nCriteria = n_criteria strProblem = ( "Front_" + str(iSystemID) + "_" + str(iObj) + "_" + str(nCriteria) ) strProblem = "Front_" + str(iSystemID) + "_" + str(iObj) + "_" + str(nCriteria) print(strProblem) if plot_all: Loading @@ -285,9 +282,7 @@ def get_pareto_front( # nObservableMax=np.sum(process_graph.rWeightObservability) [I, J] = np.where(process_graph.xSensorCandidate) process_graph.ijSensorCandidate = np.where( process_graph.xSensorCandidate.T )[::-1] process_graph.ijSensorCandidate = np.where(process_graph.xSensorCandidate.T)[::-1] process_graph.nSensorCandidate = nSensor if xExhaustive: Loading Loading @@ -325,10 +320,7 @@ def get_pareto_front( critfront = crituniq[xIncludeUniq, :] critfront[:, 1:] = -critfront[:, 1:] xLayoutOnFront = np.array( [ np.any(np.all(x[None, :] == critfront, axis=1)) for x in feval.criteria ] [np.any(np.all(x[None, :] == critfront, axis=1)) for x in feval.criteria] ) Solutions = np.where(xLayoutOnFront)[0] iObjectives = feval.criteria[Solutions, :] Loading Loading @@ -359,15 +351,12 @@ def get_pareto_front( if nCriteria >= 3: iObjectives[:, 2] = -iObjectives[:, 2] if feval.RedundancyDeficit: iObjectives[:, 2] = ( iObjectives[:, 2] + iObjectives[:, 0] ) iObjectives[:, 2] = iObjectives[:, 2] + iObjectives[:, 0] pass pass Solutions = iTree[:, 0] pass if plot_all: [_, _, _, _, _, fig, _] = figure_pareto( iObjectives, Loading Loading @@ -427,20 +416,16 @@ def get_pareto_front( "criteria": criteria, "solutions": Solutions, "process_graph": process_graph, "observableconfig": observableconfig "observableconfig": observableconfig, } else: ans = Solutions return ans def get_one_result( process_graph, layout, objective="linear", n_criteria = 3, red_deficit=True, verbose=5 process_graph, layout, objective="linear", n_criteria=3, red_deficit=True, verbose=5 ): """ :param process_graph: Soar process graph object Loading Loading @@ -477,14 +462,7 @@ def get_one_result( # ncriteria is if we are looking at red and obs, ncrit=2 is just obs nCriteria = n_criteria strProblem = ( "Front_" + str(iSystemID) + "_" + str(iObj) + "_" + str(nCriteria) ) strProblem = "Front_" + str(iSystemID) + "_" + str(iObj) + "_" + str(nCriteria) print(strProblem) match strObjective: Loading @@ -505,9 +483,7 @@ def get_one_result( # nObservableMax=np.sum(process_graph.rWeightObservability) [I, J] = np.where(process_graph.xSensorCandidate) process_graph.ijSensorCandidate = np.where( process_graph.xSensorCandidate.T )[::-1] process_graph.ijSensorCandidate = np.where(process_graph.xSensorCandidate.T)[::-1] process_graph.nSensorCandidate = nSensor if xExhaustive: feval.criteria = np.ones([nLayout, nCriteria]) * np.nan Loading Loading
numerical_labelling/labelling.py +129 −112 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ def int_to_bool_list(num, nSensorCandidate): bin_string = format(num, form) return [x == "1" for x in bin_string[::-1]], bin_string, form def bool_list_to_int(layout, nSensorCandidate): """ takey a boolean layout (True=measured, False=unmeasured) and converts it Loading @@ -24,7 +25,9 @@ def bool_list_to_int(layout, nSensorCandidate): if isinstance(layout, list): layout = np.array(layout) rev_layout = np.concatenate([layout[nSensorCandidate:],layout[:nSensorCandidate]]) # Assuming layout is a list rev_layout = np.concatenate( [layout[nSensorCandidate:], layout[:nSensorCandidate]] ) # Assuming layout is a list tlayout_np = np.array(rev_layout) rev_bin = [str(x) for x in tlayout_np[::-1].astype(int)] rev_bin_str = str.join("", rev_bin) Loading Loading @@ -56,7 +59,6 @@ def jacnum(x0, fx, dx=1e-6): def jacsym(constr, varbs, debug=False): """ generate jacobian symbolically constr: list of sympy constraint equations Loading @@ -75,9 +77,11 @@ def jacsym(constr, varbs, debug=False): print(f"∂({con})/∂({var}) = {dmat[row, col]}") return dmat def symbolic_is_zero(x): return sy.Symbol("?") if (e := x.equals(0)) is None else int(e) def find_zero_elements(M): # rows = np.where(np.all(A_uo==0, axis=1))[0] sym_indicator = sy.Matrix( Loading @@ -93,6 +97,7 @@ def find_nonzero_columns(M): return nonzero def get_observability_rref(Ju, bool_measured): bool_observable = bool_measured.copy() Loading @@ -110,6 +115,7 @@ def get_observability_rref(Ju, bool_measured): return bool_observable, Ju_rref, Zero def get_red_rref(J, bool_measured): """ iterates over the measurement columns Loading @@ -132,13 +138,22 @@ def get_red_rref(J, bool_measured): return bool_redundant def observability_redundancy_labelling_rref(J, bool_measured,x0={}, debug=False, le_red_method=False, enforce_integer =True): def observability_redundancy_labelling_rref( J, bool_measured, x0={}, debug=False, le_red_method=False, enforce_integer=True, ): if len(x0) > 0: J = J.subs(x0) if enforce_integer and len(x0) > 0: # convert to int matrix J = sy.Matrix(np.matrix(J).astype(int)) pass # ensure jacobian columns are all accounted for if len(bool_measured) != J.shape[1]: Loading @@ -151,6 +166,7 @@ def observability_redundancy_labelling_rref(J, bool_measured,x0={}, debug=False, bool_observable, Ju_rref, Zero = get_observability_rref(Ju, bool_measured) if le_red_method: bool_redundant = bool_measured.copy() include_rows = np.any(~Zero, axis=1) Loading @@ -159,7 +175,6 @@ def observability_redundancy_labelling_rref(J, bool_measured,x0={}, debug=False, rref_end = time.time() print(f"rref elapsed: {rref_end - rref_start}") VV = V.T @ V # leftinv_V = VV.inv() @ V.T leftinv_V_start = time.time() Loading Loading @@ -203,29 +218,29 @@ def observability_redundancy_labelling_rref(J, bool_measured,x0={}, debug=False, else: return bool_observable, bool_redundant def sort_zero_rows_to_bottom(matrix, additional_mats=[]): # Create a boolean mask where True means the row is not all zeros non_zero_mask = np.any(matrix != 0, axis=1) # Use the mask to sort rows - non-zero rows first, then zero rows sorted_matrix = np.vstack([ sorted_matrix = np.vstack( [ matrix[non_zero_mask], # All non-zero rows matrix[~non_zero_mask] # All zero rows ]) matrix[~non_zero_mask], # All zero rows ] ) sorted_additional_mats = [] if len(additional_mats) > 0: for imat in additional_mats: iadditional_mat = np.vstack([ imat[non_zero_mask], imat[~non_zero_mask] ]) iadditional_mat = np.vstack([imat[non_zero_mask], imat[~non_zero_mask]]) sorted_additional_mats.append(iadditional_mat) return (sorted_matrix, *sorted_additional_mats) # get U1, U2 def get_us(U, debug=False): """ Loading Loading @@ -288,6 +303,7 @@ def get_us(U, debug=False): return U1, U2 def get_obs_lu(U, Jm, debug=False, return_all=False): """ get observability from U matrix Loading Loading @@ -325,6 +341,7 @@ def get_obs_lu(U, Jm, debug=False, return_all=False): else: return obs, obs_cols def get_red_lu(L, Pi, U, A_m, debug=False, return_all=False): """ simply get the number of redundant transmitters. Loading Loading @@ -384,6 +401,7 @@ def get_red_lu(L, Pi, U, A_m, debug=False, return_all = False): else: return red, red_cols def observability_redundancy_labelling_lu(J, bool_measured, x0={}, debug=False): bool_measured = np.array(bool_measured) Loading @@ -396,7 +414,9 @@ def observability_redundancy_labelling_lu(J, bool_measured, x0 = {}, debug=False try: J = np.matrix(J).astype(np.float64) except TypeError: raise TypeError("Matrix contains symbolic elements. Please supply operating point x0") raise TypeError( "Matrix contains symbolic elements. Please supply operating point x0" ) Jm = J[:, list(np.where(bool_measured)[0])] Ju = J[:, list(np.where(~bool_measured)[0])] Loading Loading @@ -464,7 +484,6 @@ def observability_redundancy_labelling_lu(J, bool_measured, x0 = {}, debug=False return bool_observable, bool_redundant def observability_redundancy_labelling_qr(J, bool_measured, x0={}, debug=False): bool_measured = np.array(bool_measured) Loading @@ -477,7 +496,9 @@ def observability_redundancy_labelling_qr(J, bool_measured, x0 = {}, debug=False try: J = np.matrix(J).astype(np.float64) except TypeError: raise TypeError("Matrix contains symbolic elements. Please supply operating point x0") raise TypeError( "Matrix contains symbolic elements. Please supply operating point x0" ) Jm = J[:, list(np.where(bool_measured)[0])] Ju = J[:, list(np.where(~bool_measured)[0])] Loading Loading @@ -511,7 +532,3 @@ def observability_redundancy_labelling_qr(J, bool_measured, x0 = {}, debug=False bool_redundant = np.full(len(bool_observable), np.nan) return bool_observable, None
requirements_minimal.txt +1 −1 Original line number Diff line number Diff line python==3.12 # python==3.12 scipy jupyter numpy Loading
testing/generate_pgraph.py +111 −86 Original line number Diff line number Diff line Loading @@ -7,7 +7,14 @@ import argparse # soar_integration tools from tools.soar_utilities import check_incidence, define_system def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter=None, arc_equal_composition = None,): def build_pgraph( sys_name="ccro", overwrite=False, n_components=3, arc_splitter=None, arc_equal_composition=None, ): """ Build soar process graph object for the CCRO example """ Loading @@ -15,12 +22,17 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= sys_name = str(sys_name).lower() # check if pgraph already exists pgraph_path = os.path.join(module_path, "testing", "process_graphs", f"{sys_name}_pgraph.pkl") # pgraph_path = os.path.join(module_path, "testing", "process_graphs", f"{sys_name}_pgraph.pkl") pgraph_path = os.path.join(os.getcwd(), f"{sys_name}_pgraph.pkl") if os.path.exists(pgraph_path): if overwrite: logging.warning(f"WARNING: file {pgraph_path} already exists, overwriting file") logging.warning( f"WARNING: file {pgraph_path} already exists, overwriting file" ) else: raise Exception(f"ERROR: file {pgraph_path} already exists and overwrite not specified. Aborting.") raise Exception( f"ERROR: file {pgraph_path} already exists and overwrite not specified. Aborting." ) if sys_name == "minimal": # almost the exact same layout as CCRO, but combining streams Loading @@ -35,11 +47,13 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= ] ) coordinates = np.array([ coordinates = np.array( [ [2, 1], # env [0.75, 0], # mx [1.5, 0], # me1 [2, -1],] # sp1 [2, -1], ] # sp1 ) if arc_splitter is None: arc_splitter = [[4 - 1, 5 - 1, 6 - 1]] Loading @@ -47,6 +61,7 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= arc_equal_composition = [[4 - 1, 5 - 1, 6 - 1]] if sys_name == "ccro": strSystemID = "NAWI305" incidence = np.array( [ # e P1 Mx Me1 Sp1 P2 [-1, +1, 0, 0, 0, 0], # 1 Loading @@ -60,13 +75,15 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= ] ) coordinates = np.array([ coordinates = np.array( [ [2, 1], # env [0, 0], # p1 [0.75, 0], # mx [1.5, 0], # me1 [2, -1], # sp1 [1.5,-1]] # p2 [1.5, -1], ] # p2 ) if arc_splitter is None: arc_splitter = [[5 - 1, 6 - 1, 7 - 1]] Loading Loading @@ -96,7 +113,8 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= ] ) # define coordinates for plotting process graph coordinates = np.array([ coordinates = np.array( [ [0, 0], # a [1, 0], # b [2, 0], # c Loading @@ -119,14 +137,17 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= check_incidence(incidence) pgraph = define_system( iIncidence=incidence, coordinates = coordinates, verbose=5, iIncidence=incidence, coordinates=coordinates, verbose=5, arcSplitter=arc_splitter, arcEqualComposition=arc_equal_composition, numberOfComponents=n_components, strSystemID=strSystemID, ) # this is the longest step, generates the cutsets. pgraph.add_graph_info(verbose=5) pgraph.add_graph_info(verbose=5, use_networkx=True) # save pgraph with open(pgraph_path, "wb") as f: Loading @@ -135,9 +156,13 @@ def build_pgraph(sys_name="ccro", overwrite=False, n_components=3, arc_splitter= if __name__ == "__main__": parser = argparse.ArgumentParser(description="Generates a pickle file for a soar processGraph") parser.add_argument('--sysname', type=str, help='Name of the system', required=True) parser.add_argument('--overwrite', action='store_true', help='overwrite existing pickle file') parser = argparse.ArgumentParser( description="Generates a pickle file for a soar processGraph" ) parser.add_argument("--sysname", type=str, help="Name of the system", required=True) parser.add_argument( "--overwrite", action="store_true", help="overwrite existing pickle file" ) # 3. Parse the arguments from the command line args = parser.parse_args() Loading
testing/generate_test_cases.py +45 −45 Original line number Diff line number Diff line import os import sys import pickle Loading @@ -24,6 +23,7 @@ from numerical_labelling.constraints import labelConstraint from numerical_labelling.labelling import jacsym from testing.generate_numerical_system import constr_minimal, constr_ccro, constr_pilot def build_test_case(ptype=1, system="ccro", custom_system=None, debug=False): """ ptype: problem type per test matrix Loading Loading @@ -85,7 +85,9 @@ def build_test_case(ptype=1, system = "ccro", custom_system = None, debug=False) cindx = 0 # exclude any nonlinear constraints for iconstr, constr in constr_sys.items(): if not constr.type.startswith("nl_") and not constr.type.startswith("bl_"): if not constr.type.startswith("nl_") and not constr.type.startswith( "bl_" ): constr_active[cindx] = constr cindx += 1 Loading @@ -94,7 +96,9 @@ def build_test_case(ptype=1, system = "ccro", custom_system = None, debug=False) # use only m and x variables varbs_active = {} for varb_name, varb in varbs_sys.items(): if varb_name.lower().startswith("m") or varb_name.lower().startswith("x"): if varb_name.lower().startswith("m") or varb_name.lower().startswith( "x" ): varbs_active[varb_name] = varb # only select constraints that are linear or bilinear constr_active = {} Loading Loading @@ -172,6 +176,7 @@ def build_test_case(ptype=1, system = "ccro", custom_system = None, debug=False) else: return constr_sys, varbs_sys, coeff_sys, jac_sys def make_layout(soar_layout, varbs_sys, ptype=1): """ soar_layout: bilinear layout Loading @@ -179,9 +184,9 @@ def make_layout(soar_layout, varbs_sys, ptype=1): type: test to be performed """ # get number of pressure variables nmass = sum(1 for varb in varbs_sys if varb.lower().startswith('m')) nconc = sum(1 for varb in varbs_sys if varb.lower().startswith('x')) npress = sum(1 for varb in varbs_sys if varb.lower().startswith('p')) nmass = sum(1 for varb in varbs_sys if varb.lower().startswith("m")) nconc = sum(1 for varb in varbs_sys if varb.lower().startswith("x")) npress = sum(1 for varb in varbs_sys if varb.lower().startswith("p")) ncoeff = len(varbs_sys) - nmass - nconc - npress match ptype: case 100: Loading @@ -202,8 +207,3 @@ def make_layout(soar_layout, varbs_sys, ptype=1): layout = soar_layout + [False] * npress + [False] * ncoeff return layout
tools/soar_utilities.py +51 −75 Original line number Diff line number Diff line Loading @@ -6,6 +6,10 @@ import matplotlib.pyplot as plt import matplotlib # soar_libraries SOARPY = r"C:\Users\2kv\OneDrive - Oak Ridge National Laboratory\C_code\ORNL\NAWI\nawi802\soar\python" sys.path.insert(0, SOARPY) import soar.optimization.fEvaluate as feval from soar.graph_labeling.fDefineSystem import ProcessGraph from soar.graph_labeling.fPlotGraph import plot_graph Loading @@ -22,16 +26,19 @@ def check_incidence(iIncidence): assert sum(row) == 0, f"Incidence matrix row sums to {sum(row)}" def define_system(iIncidence=[], coordinates = [], verbose=5, def define_system( iIncidence=[], coordinates=[], verbose=5, arcSplitter=[[5 - 1, 6 - 1, 7 - 1]], arcEqualComposition=[[1 - 1, 2 - 1], [5 - 1, 6 - 1, 7 - 1, 8 - 1]], numberOfComponents=2, strSystemID="membrane1", ): if verbose >= 0: print("******* DEFINE SYSTEM *******") pass strSystemID = "membrane1" print(strSystemID) # ============================================================================================== if verbose >= 1: Loading Loading @@ -63,8 +70,7 @@ def define_system(iIncidence=[], coordinates = [], verbose=5, ] ) # 7 if len(coordinates) == 0: coordinates = np.array([[2, 1], [0, 0], [0.75, 0], [1.5, 0], [2, -1], [3,0]]) coordinates = np.array([[2, 1], [0, 0], [0.75, 0], [1.5, 0], [2, -1], [3, 0]]) iIncidence = iIncidence.T Loading Loading @@ -110,7 +116,6 @@ def define_system(iIncidence=[], coordinates = [], verbose=5, xKnown = np.zeros([iNumberOfArc, m + 1]) xKnown[Present == 0] = np.nan one_component = True S = [[]] Loading @@ -123,11 +128,9 @@ def define_system(iIncidence=[], coordinates = [], verbose=5, print(" GROUND TRUTH ") pass xRefObservable = [] xRefRedundant = [] # ======================================================================== # SPECIFY POSSIBLE SENSOR LOCATIONS AND WEIGHTS FOR OBSERVABILITY # ======================================================================== Loading Loading @@ -164,7 +167,7 @@ def define_system(iIncidence=[], coordinates = [], verbose=5, iArcReaction=iArcReaction, iArcEnergy=iArcEnergy, iArcSplitter=iArcSplitter, iArcEqualComposition=iArcEqualComposition, # iArcEqualComposition=iArcEqualComposition, xKnown=xKnown, xMeasured=xMeasured, xSensorCandidate=xSensorCandidate, Loading @@ -179,6 +182,7 @@ def define_system(iIncidence=[], coordinates = [], verbose=5, return process_graph def get_pareto_front( process_graph, plot_all=False, Loading @@ -188,7 +192,7 @@ def get_pareto_front( exhaustive=True, save_results=False, return_all=False, verbose=5 verbose=5, ): """ :param process_graph: Soar process graph object Loading Loading @@ -251,14 +255,7 @@ def get_pareto_front( # ncriteria is if we are looking at red and obs, ncrit=2 is just obs, nCriteria 3 is all 3 nCriteria = n_criteria strProblem = ( "Front_" + str(iSystemID) + "_" + str(iObj) + "_" + str(nCriteria) ) strProblem = "Front_" + str(iSystemID) + "_" + str(iObj) + "_" + str(nCriteria) print(strProblem) if plot_all: Loading @@ -285,9 +282,7 @@ def get_pareto_front( # nObservableMax=np.sum(process_graph.rWeightObservability) [I, J] = np.where(process_graph.xSensorCandidate) process_graph.ijSensorCandidate = np.where( process_graph.xSensorCandidate.T )[::-1] process_graph.ijSensorCandidate = np.where(process_graph.xSensorCandidate.T)[::-1] process_graph.nSensorCandidate = nSensor if xExhaustive: Loading Loading @@ -325,10 +320,7 @@ def get_pareto_front( critfront = crituniq[xIncludeUniq, :] critfront[:, 1:] = -critfront[:, 1:] xLayoutOnFront = np.array( [ np.any(np.all(x[None, :] == critfront, axis=1)) for x in feval.criteria ] [np.any(np.all(x[None, :] == critfront, axis=1)) for x in feval.criteria] ) Solutions = np.where(xLayoutOnFront)[0] iObjectives = feval.criteria[Solutions, :] Loading Loading @@ -359,15 +351,12 @@ def get_pareto_front( if nCriteria >= 3: iObjectives[:, 2] = -iObjectives[:, 2] if feval.RedundancyDeficit: iObjectives[:, 2] = ( iObjectives[:, 2] + iObjectives[:, 0] ) iObjectives[:, 2] = iObjectives[:, 2] + iObjectives[:, 0] pass pass Solutions = iTree[:, 0] pass if plot_all: [_, _, _, _, _, fig, _] = figure_pareto( iObjectives, Loading Loading @@ -427,20 +416,16 @@ def get_pareto_front( "criteria": criteria, "solutions": Solutions, "process_graph": process_graph, "observableconfig": observableconfig "observableconfig": observableconfig, } else: ans = Solutions return ans def get_one_result( process_graph, layout, objective="linear", n_criteria = 3, red_deficit=True, verbose=5 process_graph, layout, objective="linear", n_criteria=3, red_deficit=True, verbose=5 ): """ :param process_graph: Soar process graph object Loading Loading @@ -477,14 +462,7 @@ def get_one_result( # ncriteria is if we are looking at red and obs, ncrit=2 is just obs nCriteria = n_criteria strProblem = ( "Front_" + str(iSystemID) + "_" + str(iObj) + "_" + str(nCriteria) ) strProblem = "Front_" + str(iSystemID) + "_" + str(iObj) + "_" + str(nCriteria) print(strProblem) match strObjective: Loading @@ -505,9 +483,7 @@ def get_one_result( # nObservableMax=np.sum(process_graph.rWeightObservability) [I, J] = np.where(process_graph.xSensorCandidate) process_graph.ijSensorCandidate = np.where( process_graph.xSensorCandidate.T )[::-1] process_graph.ijSensorCandidate = np.where(process_graph.xSensorCandidate.T)[::-1] process_graph.nSensorCandidate = nSensor if xExhaustive: feval.criteria = np.ones([nLayout, nCriteria]) * np.nan Loading