Commit e35f0ea5 authored by Blais, Chris's avatar Blais, Chris
Browse files

clean up redundant script

parent 89ef0add
Loading
Loading
Loading
Loading
+0 −227
Original line number Diff line number Diff line

import os
import sys
import pickle
import numpy as np

# soar_path = os.path.join(r"C:\Users\tjf\Documents\01_gitlab_repos\soar\python")
# if soar_path not in sys.path:
#     sys.path.insert(0, soar_path)
# else:
#     print("soar toolbox already in path")

# add 
module_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if module_path not in sys.path:
    sys.path.insert(0, module_path)
else:
    print("path already in sys.path")
# from soar.graph_labeling.genobs.fGenObs import general_observability
# from soar.graph_labeling.genred.fGenRed import genred
# from soar.graph_labeling.fDefineSystem import ProcessGraph
# from setup_example_soar import define_system, get_pareto_front, get_one_result
# from soar.optimization.fEvaluate import evaluate_obs_red

# for plotting
# from soar.graph_labeling.fPlotGraph import plot_graph
import matplotlib.pyplot as plt
from numerical_labelling.labelling import *

# compare to soar
from documenting_failures.test_case_nonlinear import *
# import soar.optimization.fEvaluate as feval
import matplotlib.pyplot as plt


def int_to_bool_list(num, nSensorCandidate):
    """
    from Soar toolbox, creates unique id for each possible sensor layout. 
    takes num (sensor layout number, from 0 to 2**nSensorCandidate)
    converts to binary, and uses that for measured/unmeasured determination. 
    """
    form = format(int(nSensorCandidate), "d") + "b"
    bin_string = format(num, form)
    return [x == "1" for x in bin_string[::-1]], bin_string, form

def test_nonlinear_soar(plot=False):
    # establish simple membrane system for testing
    iIncidence = np.array(
        [   # e   P1  Mx Me1 Sp1  P2
            [-1, +1,  0,  0,  0,  0],  # 1
            [ 0, -1, +1,  0,  0,  0],  # 2
            [ 0,  0, -1, +1,  0,  0],  # 3
            [+1,  0,  0, -1,  0,  0],  # 4
            [ 0,  0,  0, -1, +1,  0],  # 5
            [+1,  0,  0,  0, -1,  0],  # 6
            [ 0,  0,  0,  0, -1, +1],  # 7
            [ 0,  0, +1,  0,  0, -1],  # 8
        ]
    )
    check_incidence(iIncidence)
    # define coordinates for plotting process graph
    coordinates = np.array([
        [2, 1], #env
        [0, 0], #p1
        [0.75, 0], #mx
        [1.5, 0], #me1
        [2, -1], # sp1
        [1.5,-1]] # p2
    )
    pgraph = define_system(iIncidence=iIncidence, coordinates=coordinates, verbose=5)
    pgraph.add_graph_info(verbose=5)
    xMeasured_default = pgraph.xMeasured
    results = get_pareto_front(pgraph, objective="bilinear", verbose=5)

    # use the bilinear pareto front for comparison

    # generate cases where we can tell what is observable in the nonlinear system
    # picking cases where the pressures are known.  


    # plot
    if plot:
        fig = plot_graph(pgraph)
        fig.canvas.draw()
        fig.canvas.flush_events()
        plt.show(block=False)

    constr_nl, varbs_nl = constr_mem_lin()
    jac_nl = jacsym(constr_nl.values(), varbs_nl.values())

    failures = {}
    error_log = {}
    for result in results:
        pgraph.xMeasured = xMeasured_default.copy()
        # # sens, obs, red = evaluate_obs_red(int(result), pgraph)
        # sens, obs, red = get_one_result(pgraph, result, objective="bilinear", verbose=5)

        # # convert to list. in soar -1 means undetermined, right? 
        # obs_soar = np.array([True if s==1 else False for s in obs[:,-1]])
        # red_soar = np.array([True if s==1 else False for s in red[:,-1]])

        # layout = list(np.array(sens).astype(bool))

        error_log[result] = {}
        try: 
            obs_rref, red_rref = observability_redundancy_labelling_rref(jac_lin, layout)
        except Exception as e:
            if result not in error_log.keys():
                error_log[result] = {}
            error_log[result]['rref'] = {
                "error":e, "type":"obs", "method":"rref",
                }
            obs_rref = np.full(len(obs_soar), fill_value=np.nan) 
            red_rref = np.full(len(red_soar), fill_value=np.nan) 

        try: 
            obs_lu, red_lu = observability_redundancy_labelling_lu(jac_lin, layout)
        except Exception as e:
            if result not in error_log.keys():
                error_log[result] = {}
            error_log[result]['lu'] = {
                "error":e, "type":"obs", "method":"lu",
                } 
            # add in a nan array so we can run the other sections
            obs_lu = np.full(len(obs_soar), fill_value=np.nan)  
            red_lu = np.full(len(red_soar), fill_value=np.nan)        


        try: 
            obs_qr, red_qr = observability_redundancy_labelling_qr(jac_lin, layout)
        except Exception as e:
            if result not in error_log.keys():
                error_log[result] = {}
            error_log[result]['qr'] = {
                "error":e, "type":"obs", "method":"qr",
                }     
            obs_qr = np.full(len(obs_soar), fill_value=np.nan) 
            red_qr = np.full(len(red_soar), fill_value=np.nan) 
   

        obs_rref_equ = np.array_equal(obs_rref, obs_soar)
        obs_lu_equ = np.array_equal(obs_lu, obs_soar)
        obs_qr_equ = np.array_equal(obs_qr, obs_soar)

        red_rref_equ = np.array_equal(red_rref, red_soar)
        red_lu_equ = np.array_equal(red_lu, red_soar)
        red_qr_equ = np.array_equal(red_qr, red_soar)

        failures[result] = {}
        if not obs_rref_equ:
            if result not in failures.keys():
                failures[result] = {}
            if "rref" not in error_log[result].keys():
                failures[result]['rref'] = {
                    "obs_method":obs_rref, "obs_soar":obs_soar, "type":"obs", "method":"rref",
                    }
            else: 
                failures[result]["rref"] = "error"
            
        if not obs_lu_equ:
            if result not in failures.keys():
                failures[result] = {}
            if "lu" not in error_log[result].keys():
                failures[result]['lu'] = {
                    "obs_method":obs_lu, "obs_soar":obs_soar, "type":"obs", "method":"lu",
                }
            else: 
                failures[result]['lu'] = "error"
        
        if not obs_qr_equ:
            if result not in failures.keys():
                failures[result] = {}
            if "qr" not in error_log[result].keys():
                failures[result]['qr'] = {
                    "obs_method":obs_qr, "obs_soar":obs_soar, "type":"obs", "method":"qr",
                }
            else:
                failures[result]['qr'] = "error"
        # try:
        #     assert np.array_equal(red_rref,red_soar), "redundancies don't match"
        # except AssertionError as e:
        #     failures[result] = {
        #         "red_method":red_rref, "red_soar":red_soar, "type":"red", "method":"rref"
        #         }

    if len(error_log) > 0:
        err_log = os.path.join(r".\documenting_failures\errors\nonlinear_errors.pkl")
        if not os.path.exists(os.path.dirname(err_log)):
            os.mkdir(os.path.dirname(err_log))
        
        with open(err_log, "wb") as f: 
            pickle.dump(obj=error_log, file=f)
        print(f"errors in {len(error_log)} layouts out of {len(results)}")
    else: 
        print("no errors")

    if len(failures) > 0:
        fail_log = os.path.join(r".\documenting_failures\failures\nonlinear_failures.pkl")
        if not os.path.exists(os.path.dirname(fail_log)):
            os.mkdir(os.path.dirname(fail_log))
        
        with open(fail_log, "wb") as f: 
            pickle.dump(obj=failures, file=f)
        print(f"failures in {len(fail_log)} layouts out of {len(results)}")

    
    else: 
        print("all layouts pass for linear system")


if __name__ == "__main__":
    test_nonlinear_soar()