Loading documenting_failures/build_test_cases_nl.py +7 −3 Original line number Diff line number Diff line Loading @@ -27,14 +27,18 @@ from documenting_failures.setup_example_soar import define_system, get_pareto_fr import matplotlib.pyplot as plt # compare to soar from documenting_failures.test_case_nonlinear import constr_mem_nl, labelConstraint from documenting_failures.test_case_nonlinear import constr_ccro_nl, constr_pilot_nl, labelConstraint from numerical_labelling.labelling import jacsym def build_test_case_nl(ptype=1): def build_test_case_nl(ptype=1, system = "ccro"): """ ptype: problem type """ constr_nl, varbs_nl, coeff_nl = constr_mem_nl() match system: case "ccro": constr_nl, varbs_nl, coeff_nl = constr_ccro_nl() case "pilot": constr_nl, varbs_nl, coeff_nl = constr_pilot_nl() # by default, we have all of the columns for the jacobian. match ptype: # all columns included, all constraints Loading documenting_failures/setup_example_soar.py +2 −2 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ from sympy import symbols if sys.platform == "win32": soar_path = os.path.join(r"C:\Users\tjf\Documents\01_gitlab_repos\soar\python") elif sys.platform == "linux": soar_path = os.path.join("/home/tjf/01_wwtp/soar/python/soar") soar_path = os.path.join("/home/tjf/01_nawi/soar/python") if soar_path not in sys.path: sys.path.insert(0, soar_path) Loading Loading @@ -238,7 +238,7 @@ def get_pareto_front(process_graph, plot_all=False, objective="linear", pass # ncriteria is if we are looking at red and obs, ncrit=2 is just obs nCriteria = 2 nCriteria = 3 strProblem = ( "Front_" + str(iSystemID) Loading documenting_failures/soar_compare_nonlinear.py +6 −8 Original line number Diff line number Diff line Loading @@ -6,7 +6,7 @@ import random import traceback import yaml import numpy as np from time import time import time soar_path = os.path.join(r"C:\Users\tjf\Documents\01_gitlab_repos\soar\python") if soar_path not in sys.path: Loading Loading @@ -132,18 +132,18 @@ def test_nonlinear_soar(plot=False, random_sampling=False, ptype=1, verbose=1): xMeasured_default = pgraph.xMeasured # load pareto optimal layouts if they exist, run and save if they don't results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front.pkl") results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front_red.pkl") if os.path.exists(results_path): with open(results_path, "rb") as f: results = pickle.load(f) else: results = get_pareto_front(pgraph, objective="nonlinear", verbose=5) results = get_pareto_front(pgraph, objective="bilinear", verbose=5) results = [int(res) for res in results] with open(results_path, "wb") as f: pickle.dump(results, f) # file the results for the pareto front. soar_or_results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front_labels.pkl") soar_or_results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front_labels_red.pkl") if os.path.exists(soar_or_results_path): with open(soar_or_results_path, "rb") as f: soar_or_results = pickle.load(f) Loading Loading @@ -174,8 +174,6 @@ def test_nonlinear_soar(plot=False, random_sampling=False, ptype=1, verbose=1): constr_nl, varbs_nl, coeff_nl, jac_nl = build_test_case_nl(ptype=ptype) all_varbs = {**varbs_nl, **coeff_nl} # constr_nl, varbs_nl, consts_nl = constr_mem_nl() # jac_nl = jacsym(constr_nl.values(), varbs_nl.values()) # needed for removing values from obs, red for each method (soar doesn't have pressure) nstreams = iIncidence.shape[0] Loading Loading @@ -366,13 +364,13 @@ def test_nonlinear_soar(plot=False, random_sampling=False, ptype=1, verbose=1): if __name__ == "__main__": t1 = time() t1 = time.time() test_nonlinear_soar(random_sampling = False, ptype=1, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=2, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=3, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=4, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=5, verbose=0) t2 = time() t2 = time.time() print(f"total test time: {t2 - t1} seconds") Loading documenting_failures/soar_compare_nonlinear_pilot.py 0 → 100644 +406 −0 Original line number Diff line number Diff line import os import sys import pickle import random import traceback import yaml import numpy as np from time import time soar_path = os.path.join(r"C:\Users\tjf\Documents\01_gitlab_repos\soar\python") if soar_path not in sys.path: sys.path.insert(0, soar_path) else: print("soar toolbox already in path") # add module_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if module_path not in sys.path: sys.path.insert(0, module_path) else: print("path already in sys.path") from soar.graph_labeling.genobs.fGenObs import general_observability from soar.graph_labeling.genred.fGenRed import genred from soar.graph_labeling.fDefineSystem import ProcessGraph from setup_example_soar import define_system, get_pareto_front, get_one_result from soar.optimization.fEvaluate import evaluate_obs_red # for plotting from soar.graph_labeling.fPlotGraph import plot_graph import matplotlib.pyplot as plt from numerical_labelling.labelling import * # compare to soar from build_test_cases_nl import build_test_case_nl, make_layout import soar.optimization.fEvaluate as feval import matplotlib.pyplot as plt def int_to_bool_list(num, nSensorCandidate): """ from Soar toolbox, creates unique id for each possible sensor layout. takes num (sensor layout number, from 0 to 2**nSensorCandidate) converts to binary, and uses that for measured/unmeasured determination. """ form = format(int(nSensorCandidate), "d") + "b" bin_string = format(num, form) return [x == "1" for x in bin_string[::-1]], bin_string, form def check_incidence(iIncidence): """ simple check that in nodes equal to out nodes (graph is closed) """ for row in iIncidence: assert sum(row) == 0, f"Incidence matrix row sums to {sum(row)}" def log_err(error_log, e, result, method=""): err_trace = traceback.format_exc() if result not in error_log.keys(): error_log[result] = {} error_log[result][method] = { "error":e, "method":method, "trace":err_trace, } return error_log def log_fail(result, failures, error_log, method, or_method, or_soar, layout): """ method: the value we've gotten, and the method used (eg obs_rref, red_lu) or_method: observability or redundancy for method or_soar: obs or red from soar """ if result not in failures.keys(): failures[result] = {} if method not in error_log[result].keys(): failures[result][method] = { "method":or_method, "soar":or_soar, "layout":layout, } else: failures[result][method] = "error" return failures def log_success(result, successes, method, soar, lu, layout): """ method: results for method (could be tuple of obs and red) soar: results for soar lu: results for lu layout: bool array of measured vars """ successes[result]= { "method":method, "soar":soar, "lu":lu, "layout":layout, } return successes def test_nonlinear_soar(plot=False, random_sampling=False, ptype=1, verbose=1): # establish pilot graph: # 12 nodes: # mix (a), pump (b), mem 1 (c), # mem 2 (d), mix 2 (e), # mem 3 (f), mix 3 (g), # mem 4 (h), mix 4 (i), # split 1 (j), recyc valve (k) # environment (l) iIncidence = np.array( [ # a b c d e f g h i j k l [+1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1], # 1 [-1, +1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], # 2 [ 0, -1, +1, 0, 0, 0, 0, 0, 0, 0, 0, 0], # 3 [ 0, 0, -1, 0, +1, 0, 0, 0, 0, 0, 0, 0], # 4 [ 0, 0, -1, +1, 0, 0, 0, 0, 0, 0, 0, 0], # 5 [ 0, 0, 0, -1, +1, 0, 0, 0, 0, 0, 0, 0], # 6 [ 0, 0, 0, 0, -1, 0, +1, 0, 0, 0, 0, 0], # 7 [ 0, 0, 0, -1, 0, +1, 0, 0, 0, 0, 0, 0], # 8 [ 0, 0, 0, 0, 0, -1, +1, 0, 0, 0, 0, 0], # 9 [ 0, 0, 0, 0, 0, 0, -1, 0, +1, 0, 0, 0], # 10 [ 0, 0, 0, 0, 0, -1, 0, +1, 0, 0, 0, 0], # 11 [ 0, 0, 0, 0, 0, 0, 0, -1, +1, 0, 0, 0], # 12 [ 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, +1], # 13 [ 0, 0, 0, 0, 0, 0, 0, -1, 0, +1, 0, 0], # 14 [ 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, +1], # 15 [ 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, +1, 0], # 16 [+1, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0], # 17 ] ) check_incidence(iIncidence) # define coordinates for plotting process graph coordinates = np.array([ [2, 1], #env [0, 0], #p1 [0.75, 0], #mx [1.5, 0], #me1 [2, -1], # sp1 [1.5,-1]] # p2 ) pgraph = define_system( iIncidence=iIncidence, coordinates = coordinates, verbose=verbose, arcSplitter =[[5 - 1, 6 - 1, 7 - 1]], arcEqualComposition = [[1-1, 2-1], [5 - 1, 6 - 1, 7 - 1, 8 - 1]], numberOfComponents = 3, ) pgraph.add_graph_info(verbose=5) xMeasured_default = pgraph.xMeasured # load pareto optimal layouts if they exist, run and save if they don't results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front.pkl") if os.path.exists(results_path): with open(results_path, "rb") as f: results = pickle.load(f) else: results = get_pareto_front(pgraph, objective="nonlinear", verbose=5) results = [int(res) for res in results] with open(results_path, "wb") as f: pickle.dump(results, f) # file the results for the pareto front. soar_or_results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front_labels.pkl") if os.path.exists(soar_or_results_path): with open(soar_or_results_path, "rb") as f: soar_or_results = pickle.load(f) else: soar_or_results = {} # use the nonlinear pareto front for comparison # generate cases where we can tell what is observable in the nonlinear system # picking cases where the pressures are known. # plot if plot: fig = plot_graph(pgraph) fig.canvas.draw() fig.canvas.flush_events() plt.show(block=False) # x0 = get_x0_nl() # # x0_file = os.path.join(os.path.dirname(__file__), "x0_solved.pkl") # with open(x0_file, "rb") as f: # x0 = pickle.load(f) x0_file = os.path.join(module_path, "operating_point_search", "saved_x0", "x0_ccro_final_ans_rev2.yaml") with open(x0_file, "r") as f: x0 = yaml.safe_load(f) # typecheck for key, val in x0.items(): x0[key] = int(val) constr_nl, varbs_nl, coeff_nl, jac_nl = build_test_case_nl(ptype=ptype) all_varbs = {**varbs_nl, **coeff_nl} # needed for removing values from obs, red for each method (soar doesn't have pressure) nstreams = iIncidence.shape[0] ncoeffs = len(coeff_nl) failures = {} error_log = {} success_log = {} # random.seed = 42 # if random_sampling: # results = random.sample(results, 20) # add 100 random layouts with higher number of measured sensors, so we have # redundant sensor layouts tested random.seed(42) results += random.choices(range(30000, 65536), k=200) # results = [ # # 8512, # # 4169, # # 17538, # # 16646, # # 3104, # 21800, # adding redundant layouts # 65535, # 31679, # 56134, # ] # for message at end errs_rref = 0 errs_lu = 0 errs_qr = 0 fails_rref_obs = 0 fails_lu_obs = 0 fails_qr_obs = 0 fails_rref_red = 0 fails_lu_red = 0 fails_qr_red = 0 for result in results: pgraph.xMeasured = xMeasured_default.copy() # get the result if we have saved it if result in soar_or_results.keys(): sens, obs, red = soar_or_results[result] else: sens, obs, red = get_one_result( pgraph, result, objective="bilinear", verbose=verbose ) soar_or_results[result] = sens, obs, red # for concentrations we have it backwards. rev_sens = np.concatenate([sens[8:],sens[0:8]]) # convert to list. in soar -1 means undetermined, right? obs_soar_mass = np.array([True if s==1 else False for s in obs[:,-1]]) red_soar_mass = np.array([True if s==1 else False for s in red[:,-1]]) # add the concentration sensors obs_soar_conc = np.array([True if s==1 else False for s in obs[:,0]]) red_soar_conc = np.array([True if s==1 else False for s in red[:,0]]) obs_soar = np.concatenate([obs_soar_mass, obs_soar_conc]) red_soar = np.concatenate([red_soar_mass, red_soar_conc]) bl_layout = list(np.array(rev_sens).astype(bool)) # make the layout including pressure, other variables # layout = layout+[False]*8 layout = make_layout(bl_layout, all_varbs, ptype=ptype) error_log[result] = {} try: obs_rref_p, red_rref_p = observability_redundancy_labelling_rref(jac_nl, layout, x0=x0) except Exception as e: err_log = log_err(error_log, e, result, method="rref") obs_rref_p = np.full(len(obs_soar)+8, fill_value=np.nan) red_rref_p = np.full(len(red_soar)+8, fill_value=np.nan) errs_rref+=1 try: obs_lu_p, red_lu_p = observability_redundancy_labelling_lu(jac_nl, layout, x0=x0) except Exception as e: err_log = log_err(error_log, e, result, method="lu") # add in a nan array so we can run the other sections obs_lu_p = np.full(len(obs_soar)+8, fill_value=np.nan) red_lu_p = np.full(len(red_soar)+8, fill_value=np.nan) errs_lu+=1 try: obs_qr_p, red_qr_p = observability_redundancy_labelling_qr(jac_nl, layout, x0=x0) except Exception as e: err_log = log_err(error_log, e, result, method="qr") obs_qr_p = np.full(len(obs_soar)+8, fill_value=np.nan) red_qr_p = np.full(len(red_soar)+8, fill_value=np.nan) errs_qr+=1 # for comparison, remove the values for pressure, coeff # TODO: structure this like the SOAR matrix, so we can refer to column ncut = nstreams + ncoeffs obs_rref = obs_rref_p[:-ncut] red_rref = red_rref_p[:-ncut] obs_lu = obs_lu_p[:-ncut] red_lu = red_lu_p[:-ncut] obs_qr = obs_qr_p[:-ncut] # red_qr = red_qr_p[:-8] red_qr = np.full(len(red_soar), fill_value=np.nan) obs_rref_equ = np.array_equal(obs_rref, obs_soar) obs_lu_equ = np.array_equal(obs_lu, obs_soar) obs_qr_equ = np.array_equal(obs_qr, obs_soar) red_rref_equ = np.array_equal(red_rref, red_soar) red_lu_equ = np.array_equal(red_lu, red_soar) red_qr_equ = np.array_equal(red_qr, red_soar) failures[result] = {} if not obs_rref_equ: failures = log_fail(result, failures, error_log, "obs_rref", obs_rref, obs_soar, layout) fails_rref_obs += 1 if not red_rref_equ: failures = log_fail(result, failures, error_log, "red_rref", red_rref, red_soar, layout) fails_rref_red += 1 if not obs_lu_equ: failures = log_fail(result, failures, error_log, "obs_lu", obs_lu, obs_soar, layout) fails_lu_obs += 1 if not red_lu_equ: failures = log_fail(result, failures, error_log, "red_lu", red_lu, red_soar, layout) fails_lu_red += 1 if not obs_qr_equ: failures = log_fail(result, failures, error_log, "obs_qr", obs_qr, obs_soar, layout) fails_qr_obs += 1 if not red_qr_equ: failures = log_fail(result, failures, error_log, "red_qr", red_qr, red_soar, layout) fails_qr_red += 1 if (obs_rref_equ and red_rref_equ) and (not obs_lu_equ or not red_lu_equ): success_log = log_success( result=result, successes=success_log, method=(obs_rref, red_rref), soar=(obs_soar, red_soar), lu=(obs_lu, red_lu), layout = layout, ) # save the soar results to save time: with open(soar_or_results_path, "wb") as f: pickle.dump(soar_or_results, f) print("!"*80) print(f"!!! Method {str(ptype)} results:") print("!"*80) if len(error_log) > 0: err_log = os.path.join(rf".\documenting_failures\errors\nonlinear_errors_type{str(ptype)}.pkl") if not os.path.exists(os.path.dirname(err_log)): os.mkdir(os.path.dirname(err_log)) with open(err_log, "wb") as f: pickle.dump(obj=error_log, file=f) print(f"errors out of {len(results)} layouts:\nrref: {errs_rref}\nLU: {errs_lu}\nQR: {errs_qr}") else: print("no errors") if len(failures) > 0: fail_log = os.path.join(rf".\documenting_failures\failures\nonlinear_failures_type{str(ptype)}.pkl") if not os.path.exists(os.path.dirname(fail_log)): os.mkdir(os.path.dirname(fail_log)) with open(fail_log, "wb") as f: pickle.dump(obj=failures, file=f) print(f"failures out of {len(results)} layouts obs:\nrref: {fails_rref_obs}\nLU: {fails_lu_obs}\nQR: {fails_qr_obs}") print(f"failures out of {len(results)} layouts red:\nrref: {fails_rref_red}\nLU: {fails_lu_red}\nQR: {fails_qr_red}") else: print("all layouts pass for linear system") if len(success_log) > 0: success_path = os.path.join(rf".\documenting_failures\successes\nonlinear_successes_type{str(ptype)}.pkl") if not os.path.exists(os.path.dirname(success_path)): os.mkdir(os.path.dirname(success_path)) with open(success_path, "wb") as f: pickle.dump(obj=success_log, file=f) print(f"RREF agree SOAR, LU disagree in {len(success_log)} layouts out of {len(results)}") if __name__ == "__main__": t1 = time() test_nonlinear_soar(random_sampling = False, ptype=1, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=2, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=3, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=4, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=5, verbose=0) t2 = time() print(f"total test time: {t2 - t1} seconds") documenting_failures/soar_pgraphs/generate_pgraph.py +1 −3 Original line number Diff line number Diff line Loading @@ -17,8 +17,6 @@ else: print("soar toolbox already in path") # add print(soar_path) assert os.path.exists(soar_path) module_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) print(module_path) if module_path not in sys.path: Loading Loading
documenting_failures/build_test_cases_nl.py +7 −3 Original line number Diff line number Diff line Loading @@ -27,14 +27,18 @@ from documenting_failures.setup_example_soar import define_system, get_pareto_fr import matplotlib.pyplot as plt # compare to soar from documenting_failures.test_case_nonlinear import constr_mem_nl, labelConstraint from documenting_failures.test_case_nonlinear import constr_ccro_nl, constr_pilot_nl, labelConstraint from numerical_labelling.labelling import jacsym def build_test_case_nl(ptype=1): def build_test_case_nl(ptype=1, system = "ccro"): """ ptype: problem type """ constr_nl, varbs_nl, coeff_nl = constr_mem_nl() match system: case "ccro": constr_nl, varbs_nl, coeff_nl = constr_ccro_nl() case "pilot": constr_nl, varbs_nl, coeff_nl = constr_pilot_nl() # by default, we have all of the columns for the jacobian. match ptype: # all columns included, all constraints Loading
documenting_failures/setup_example_soar.py +2 −2 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ from sympy import symbols if sys.platform == "win32": soar_path = os.path.join(r"C:\Users\tjf\Documents\01_gitlab_repos\soar\python") elif sys.platform == "linux": soar_path = os.path.join("/home/tjf/01_wwtp/soar/python/soar") soar_path = os.path.join("/home/tjf/01_nawi/soar/python") if soar_path not in sys.path: sys.path.insert(0, soar_path) Loading Loading @@ -238,7 +238,7 @@ def get_pareto_front(process_graph, plot_all=False, objective="linear", pass # ncriteria is if we are looking at red and obs, ncrit=2 is just obs nCriteria = 2 nCriteria = 3 strProblem = ( "Front_" + str(iSystemID) Loading
documenting_failures/soar_compare_nonlinear.py +6 −8 Original line number Diff line number Diff line Loading @@ -6,7 +6,7 @@ import random import traceback import yaml import numpy as np from time import time import time soar_path = os.path.join(r"C:\Users\tjf\Documents\01_gitlab_repos\soar\python") if soar_path not in sys.path: Loading Loading @@ -132,18 +132,18 @@ def test_nonlinear_soar(plot=False, random_sampling=False, ptype=1, verbose=1): xMeasured_default = pgraph.xMeasured # load pareto optimal layouts if they exist, run and save if they don't results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front.pkl") results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front_red.pkl") if os.path.exists(results_path): with open(results_path, "rb") as f: results = pickle.load(f) else: results = get_pareto_front(pgraph, objective="nonlinear", verbose=5) results = get_pareto_front(pgraph, objective="bilinear", verbose=5) results = [int(res) for res in results] with open(results_path, "wb") as f: pickle.dump(results, f) # file the results for the pareto front. soar_or_results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front_labels.pkl") soar_or_results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front_labels_red.pkl") if os.path.exists(soar_or_results_path): with open(soar_or_results_path, "rb") as f: soar_or_results = pickle.load(f) Loading Loading @@ -174,8 +174,6 @@ def test_nonlinear_soar(plot=False, random_sampling=False, ptype=1, verbose=1): constr_nl, varbs_nl, coeff_nl, jac_nl = build_test_case_nl(ptype=ptype) all_varbs = {**varbs_nl, **coeff_nl} # constr_nl, varbs_nl, consts_nl = constr_mem_nl() # jac_nl = jacsym(constr_nl.values(), varbs_nl.values()) # needed for removing values from obs, red for each method (soar doesn't have pressure) nstreams = iIncidence.shape[0] Loading Loading @@ -366,13 +364,13 @@ def test_nonlinear_soar(plot=False, random_sampling=False, ptype=1, verbose=1): if __name__ == "__main__": t1 = time() t1 = time.time() test_nonlinear_soar(random_sampling = False, ptype=1, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=2, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=3, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=4, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=5, verbose=0) t2 = time() t2 = time.time() print(f"total test time: {t2 - t1} seconds") Loading
documenting_failures/soar_compare_nonlinear_pilot.py 0 → 100644 +406 −0 Original line number Diff line number Diff line import os import sys import pickle import random import traceback import yaml import numpy as np from time import time soar_path = os.path.join(r"C:\Users\tjf\Documents\01_gitlab_repos\soar\python") if soar_path not in sys.path: sys.path.insert(0, soar_path) else: print("soar toolbox already in path") # add module_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if module_path not in sys.path: sys.path.insert(0, module_path) else: print("path already in sys.path") from soar.graph_labeling.genobs.fGenObs import general_observability from soar.graph_labeling.genred.fGenRed import genred from soar.graph_labeling.fDefineSystem import ProcessGraph from setup_example_soar import define_system, get_pareto_front, get_one_result from soar.optimization.fEvaluate import evaluate_obs_red # for plotting from soar.graph_labeling.fPlotGraph import plot_graph import matplotlib.pyplot as plt from numerical_labelling.labelling import * # compare to soar from build_test_cases_nl import build_test_case_nl, make_layout import soar.optimization.fEvaluate as feval import matplotlib.pyplot as plt def int_to_bool_list(num, nSensorCandidate): """ from Soar toolbox, creates unique id for each possible sensor layout. takes num (sensor layout number, from 0 to 2**nSensorCandidate) converts to binary, and uses that for measured/unmeasured determination. """ form = format(int(nSensorCandidate), "d") + "b" bin_string = format(num, form) return [x == "1" for x in bin_string[::-1]], bin_string, form def check_incidence(iIncidence): """ simple check that in nodes equal to out nodes (graph is closed) """ for row in iIncidence: assert sum(row) == 0, f"Incidence matrix row sums to {sum(row)}" def log_err(error_log, e, result, method=""): err_trace = traceback.format_exc() if result not in error_log.keys(): error_log[result] = {} error_log[result][method] = { "error":e, "method":method, "trace":err_trace, } return error_log def log_fail(result, failures, error_log, method, or_method, or_soar, layout): """ method: the value we've gotten, and the method used (eg obs_rref, red_lu) or_method: observability or redundancy for method or_soar: obs or red from soar """ if result not in failures.keys(): failures[result] = {} if method not in error_log[result].keys(): failures[result][method] = { "method":or_method, "soar":or_soar, "layout":layout, } else: failures[result][method] = "error" return failures def log_success(result, successes, method, soar, lu, layout): """ method: results for method (could be tuple of obs and red) soar: results for soar lu: results for lu layout: bool array of measured vars """ successes[result]= { "method":method, "soar":soar, "lu":lu, "layout":layout, } return successes def test_nonlinear_soar(plot=False, random_sampling=False, ptype=1, verbose=1): # establish pilot graph: # 12 nodes: # mix (a), pump (b), mem 1 (c), # mem 2 (d), mix 2 (e), # mem 3 (f), mix 3 (g), # mem 4 (h), mix 4 (i), # split 1 (j), recyc valve (k) # environment (l) iIncidence = np.array( [ # a b c d e f g h i j k l [+1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1], # 1 [-1, +1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], # 2 [ 0, -1, +1, 0, 0, 0, 0, 0, 0, 0, 0, 0], # 3 [ 0, 0, -1, 0, +1, 0, 0, 0, 0, 0, 0, 0], # 4 [ 0, 0, -1, +1, 0, 0, 0, 0, 0, 0, 0, 0], # 5 [ 0, 0, 0, -1, +1, 0, 0, 0, 0, 0, 0, 0], # 6 [ 0, 0, 0, 0, -1, 0, +1, 0, 0, 0, 0, 0], # 7 [ 0, 0, 0, -1, 0, +1, 0, 0, 0, 0, 0, 0], # 8 [ 0, 0, 0, 0, 0, -1, +1, 0, 0, 0, 0, 0], # 9 [ 0, 0, 0, 0, 0, 0, -1, 0, +1, 0, 0, 0], # 10 [ 0, 0, 0, 0, 0, -1, 0, +1, 0, 0, 0, 0], # 11 [ 0, 0, 0, 0, 0, 0, 0, -1, +1, 0, 0, 0], # 12 [ 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, +1], # 13 [ 0, 0, 0, 0, 0, 0, 0, -1, 0, +1, 0, 0], # 14 [ 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, +1], # 15 [ 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, +1, 0], # 16 [+1, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0], # 17 ] ) check_incidence(iIncidence) # define coordinates for plotting process graph coordinates = np.array([ [2, 1], #env [0, 0], #p1 [0.75, 0], #mx [1.5, 0], #me1 [2, -1], # sp1 [1.5,-1]] # p2 ) pgraph = define_system( iIncidence=iIncidence, coordinates = coordinates, verbose=verbose, arcSplitter =[[5 - 1, 6 - 1, 7 - 1]], arcEqualComposition = [[1-1, 2-1], [5 - 1, 6 - 1, 7 - 1, 8 - 1]], numberOfComponents = 3, ) pgraph.add_graph_info(verbose=5) xMeasured_default = pgraph.xMeasured # load pareto optimal layouts if they exist, run and save if they don't results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front.pkl") if os.path.exists(results_path): with open(results_path, "rb") as f: results = pickle.load(f) else: results = get_pareto_front(pgraph, objective="nonlinear", verbose=5) results = [int(res) for res in results] with open(results_path, "wb") as f: pickle.dump(results, f) # file the results for the pareto front. soar_or_results_path = os.path.join(module_path, "documenting_failures", "pareto_fronts", "bilinear_pareto_front_labels.pkl") if os.path.exists(soar_or_results_path): with open(soar_or_results_path, "rb") as f: soar_or_results = pickle.load(f) else: soar_or_results = {} # use the nonlinear pareto front for comparison # generate cases where we can tell what is observable in the nonlinear system # picking cases where the pressures are known. # plot if plot: fig = plot_graph(pgraph) fig.canvas.draw() fig.canvas.flush_events() plt.show(block=False) # x0 = get_x0_nl() # # x0_file = os.path.join(os.path.dirname(__file__), "x0_solved.pkl") # with open(x0_file, "rb") as f: # x0 = pickle.load(f) x0_file = os.path.join(module_path, "operating_point_search", "saved_x0", "x0_ccro_final_ans_rev2.yaml") with open(x0_file, "r") as f: x0 = yaml.safe_load(f) # typecheck for key, val in x0.items(): x0[key] = int(val) constr_nl, varbs_nl, coeff_nl, jac_nl = build_test_case_nl(ptype=ptype) all_varbs = {**varbs_nl, **coeff_nl} # needed for removing values from obs, red for each method (soar doesn't have pressure) nstreams = iIncidence.shape[0] ncoeffs = len(coeff_nl) failures = {} error_log = {} success_log = {} # random.seed = 42 # if random_sampling: # results = random.sample(results, 20) # add 100 random layouts with higher number of measured sensors, so we have # redundant sensor layouts tested random.seed(42) results += random.choices(range(30000, 65536), k=200) # results = [ # # 8512, # # 4169, # # 17538, # # 16646, # # 3104, # 21800, # adding redundant layouts # 65535, # 31679, # 56134, # ] # for message at end errs_rref = 0 errs_lu = 0 errs_qr = 0 fails_rref_obs = 0 fails_lu_obs = 0 fails_qr_obs = 0 fails_rref_red = 0 fails_lu_red = 0 fails_qr_red = 0 for result in results: pgraph.xMeasured = xMeasured_default.copy() # get the result if we have saved it if result in soar_or_results.keys(): sens, obs, red = soar_or_results[result] else: sens, obs, red = get_one_result( pgraph, result, objective="bilinear", verbose=verbose ) soar_or_results[result] = sens, obs, red # for concentrations we have it backwards. rev_sens = np.concatenate([sens[8:],sens[0:8]]) # convert to list. in soar -1 means undetermined, right? obs_soar_mass = np.array([True if s==1 else False for s in obs[:,-1]]) red_soar_mass = np.array([True if s==1 else False for s in red[:,-1]]) # add the concentration sensors obs_soar_conc = np.array([True if s==1 else False for s in obs[:,0]]) red_soar_conc = np.array([True if s==1 else False for s in red[:,0]]) obs_soar = np.concatenate([obs_soar_mass, obs_soar_conc]) red_soar = np.concatenate([red_soar_mass, red_soar_conc]) bl_layout = list(np.array(rev_sens).astype(bool)) # make the layout including pressure, other variables # layout = layout+[False]*8 layout = make_layout(bl_layout, all_varbs, ptype=ptype) error_log[result] = {} try: obs_rref_p, red_rref_p = observability_redundancy_labelling_rref(jac_nl, layout, x0=x0) except Exception as e: err_log = log_err(error_log, e, result, method="rref") obs_rref_p = np.full(len(obs_soar)+8, fill_value=np.nan) red_rref_p = np.full(len(red_soar)+8, fill_value=np.nan) errs_rref+=1 try: obs_lu_p, red_lu_p = observability_redundancy_labelling_lu(jac_nl, layout, x0=x0) except Exception as e: err_log = log_err(error_log, e, result, method="lu") # add in a nan array so we can run the other sections obs_lu_p = np.full(len(obs_soar)+8, fill_value=np.nan) red_lu_p = np.full(len(red_soar)+8, fill_value=np.nan) errs_lu+=1 try: obs_qr_p, red_qr_p = observability_redundancy_labelling_qr(jac_nl, layout, x0=x0) except Exception as e: err_log = log_err(error_log, e, result, method="qr") obs_qr_p = np.full(len(obs_soar)+8, fill_value=np.nan) red_qr_p = np.full(len(red_soar)+8, fill_value=np.nan) errs_qr+=1 # for comparison, remove the values for pressure, coeff # TODO: structure this like the SOAR matrix, so we can refer to column ncut = nstreams + ncoeffs obs_rref = obs_rref_p[:-ncut] red_rref = red_rref_p[:-ncut] obs_lu = obs_lu_p[:-ncut] red_lu = red_lu_p[:-ncut] obs_qr = obs_qr_p[:-ncut] # red_qr = red_qr_p[:-8] red_qr = np.full(len(red_soar), fill_value=np.nan) obs_rref_equ = np.array_equal(obs_rref, obs_soar) obs_lu_equ = np.array_equal(obs_lu, obs_soar) obs_qr_equ = np.array_equal(obs_qr, obs_soar) red_rref_equ = np.array_equal(red_rref, red_soar) red_lu_equ = np.array_equal(red_lu, red_soar) red_qr_equ = np.array_equal(red_qr, red_soar) failures[result] = {} if not obs_rref_equ: failures = log_fail(result, failures, error_log, "obs_rref", obs_rref, obs_soar, layout) fails_rref_obs += 1 if not red_rref_equ: failures = log_fail(result, failures, error_log, "red_rref", red_rref, red_soar, layout) fails_rref_red += 1 if not obs_lu_equ: failures = log_fail(result, failures, error_log, "obs_lu", obs_lu, obs_soar, layout) fails_lu_obs += 1 if not red_lu_equ: failures = log_fail(result, failures, error_log, "red_lu", red_lu, red_soar, layout) fails_lu_red += 1 if not obs_qr_equ: failures = log_fail(result, failures, error_log, "obs_qr", obs_qr, obs_soar, layout) fails_qr_obs += 1 if not red_qr_equ: failures = log_fail(result, failures, error_log, "red_qr", red_qr, red_soar, layout) fails_qr_red += 1 if (obs_rref_equ and red_rref_equ) and (not obs_lu_equ or not red_lu_equ): success_log = log_success( result=result, successes=success_log, method=(obs_rref, red_rref), soar=(obs_soar, red_soar), lu=(obs_lu, red_lu), layout = layout, ) # save the soar results to save time: with open(soar_or_results_path, "wb") as f: pickle.dump(soar_or_results, f) print("!"*80) print(f"!!! Method {str(ptype)} results:") print("!"*80) if len(error_log) > 0: err_log = os.path.join(rf".\documenting_failures\errors\nonlinear_errors_type{str(ptype)}.pkl") if not os.path.exists(os.path.dirname(err_log)): os.mkdir(os.path.dirname(err_log)) with open(err_log, "wb") as f: pickle.dump(obj=error_log, file=f) print(f"errors out of {len(results)} layouts:\nrref: {errs_rref}\nLU: {errs_lu}\nQR: {errs_qr}") else: print("no errors") if len(failures) > 0: fail_log = os.path.join(rf".\documenting_failures\failures\nonlinear_failures_type{str(ptype)}.pkl") if not os.path.exists(os.path.dirname(fail_log)): os.mkdir(os.path.dirname(fail_log)) with open(fail_log, "wb") as f: pickle.dump(obj=failures, file=f) print(f"failures out of {len(results)} layouts obs:\nrref: {fails_rref_obs}\nLU: {fails_lu_obs}\nQR: {fails_qr_obs}") print(f"failures out of {len(results)} layouts red:\nrref: {fails_rref_red}\nLU: {fails_lu_red}\nQR: {fails_qr_red}") else: print("all layouts pass for linear system") if len(success_log) > 0: success_path = os.path.join(rf".\documenting_failures\successes\nonlinear_successes_type{str(ptype)}.pkl") if not os.path.exists(os.path.dirname(success_path)): os.mkdir(os.path.dirname(success_path)) with open(success_path, "wb") as f: pickle.dump(obj=success_log, file=f) print(f"RREF agree SOAR, LU disagree in {len(success_log)} layouts out of {len(results)}") if __name__ == "__main__": t1 = time() test_nonlinear_soar(random_sampling = False, ptype=1, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=2, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=3, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=4, verbose=0) test_nonlinear_soar(random_sampling = False, ptype=5, verbose=0) t2 = time() print(f"total test time: {t2 - t1} seconds")
documenting_failures/soar_pgraphs/generate_pgraph.py +1 −3 Original line number Diff line number Diff line Loading @@ -17,8 +17,6 @@ else: print("soar toolbox already in path") # add print(soar_path) assert os.path.exists(soar_path) module_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) print(module_path) if module_path not in sys.path: Loading