Commit a9c9fe86 authored by Blais, Chris's avatar Blais, Chris
Browse files

update plotting scripts for paper

parent b6caa712
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -54,7 +54,7 @@ def plot_layout_any(layout_plt, layout_name, varbs,
    """
    layout_name: the name of the layout, 'pilot', 'minimal'
    """
    img_folder = os.path.join(module_path, "fig")
    img_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "fig")
    # print(img_folder)
    mem_img_path = os.path.join(img_folder, f"{layout_name}_layout.png")
    mem_img = np.asarray(Image.open(mem_img_path))
+363 −0
Original line number Diff line number Diff line
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D


def figure_pareto(iObjectives, strName, verbose=0):
    # -------------------------------------------------------------------------
    # SOAR toolbox - fFigurePareto.m
    #
    # This function makes a figure showing the Pareto front.
    #
    # Syntax: 	[bubble_position,xSurplus,iBubbleSize,NB,NF]=
    # fFigurePareto(iObjectives,strName)
    #
    # Inputs:
    # iObjectives     Objective function values for Pareto optimal
    #                       solutions. Row dimension = number of solutions;
    #                       Column dimension:  umber of optimization criteria
    #       strName         String for figure title
    #
    # Outputs:
    # bubble_position     unique positions of the Pareto optimal
    #                           solutions on the Pareto front
    #       xSurplus            Boolean vector indicating whether Pareto point
    #                           is for solutions with (true) or without (false)
    #                           surplus redundancy, with length matching row
    #                           dimension of bubble_position
    #       iBubbleSize         Number of Pareto optimal solutions in Pareto
    #                           front position, with length matching row
    #                           dimension of bubble_position
    #       NB                  Number of unique positions on the Pareto front
    #       NF                  Number of solutions on the Pareto front
    #
    # References:
    # [1] Villez, K., Vanrolleghem, P. A., Corominas, Ll. (in preparation).
    # Optimal placement of flow rate and concentration sensors on wastewater
    # treatment plants.
    # [2] Kretsovalis, A. and Mah, R. S. H. (1988b). Observability and
    # redundancy classification in generalized process networks - II.
    # Algorithms. Comp. Chem. Eng., 12(7), 689-703.
    # [3] Villez, K. (2019). GENOBS* and GENRED* algorithms for observability
    # and redundancy labeling.  (TR-008-01-0). Technical Report, Eawag,
    # Duebendorf, Switzerland.
    #
    # -------------------------------------------------------------------------
    # Latest version: Kris Villez, 2019-12-27
    # -------------------------------------------------------------------------

    # -------------------------------------------------------------------------
    # Copyright 2019-2019 Kris Villez
    #
    # This file is part of the SOAR Toolbox for Matlab/Octave.
    #
    # The SOAR Toolbox is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or (at
    # your option) any later version.
    #
    # The SOAR Toolbox is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
    # Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the SOAR Toolbox. If not, see <http://www.gnu.org/licenses/>.
    # -------------------------------------------------------------------------

    # make custom legend
    mpl_legend_lines = {}
    leg_pt_size = 10

    nCriterion = iObjectives.shape[1]

    # iObjectives = np.sort(iObjectives[:, ::-1], axis=0)[:, ::-1]

    # [bubble_position,ix,iy]=unique(iObjectives,'rows','stable');
    [bubble_position, iBubbleSize] = np.unique(
        iObjectives,
        axis=0,
        return_counts=True,
    )
    if nCriterion >= 3:
        arg = np.lexsort(bubble_position[:, [-1, 0]].T)[::-1]
        bubble_position = bubble_position[arg, :]
        iBubbleSize = iBubbleSize[arg]
        pass

    nBubble = bubble_position.shape[0]
    nObservableMax = np.nanmax(bubble_position[:, 1])
    nCostMax = np.nanmax(bubble_position[:, 0])

    xSurplus = np.zeros(nBubble).astype(bool)
    if nCriterion >= 3:
        nRedundantMax = np.nanmax(bubble_position[:, 2])
        ObsUnique = np.unique(bubble_position[:, 1])
        for iObs in ObsUnique:
            xInclude = bubble_position[:, 1] == iObs
            iDiff = bubble_position[:, 2] - bubble_position[:, 0]
            if np.any(xInclude):
                iDiffMax = np.max(iDiff[xInclude])
                xInclude = np.logical_and(xInclude, iDiff == iDiffMax)
                xCostMin = np.min(bubble_position[xInclude, 0])
                xInclude = np.logical_and(
                    xInclude,
                    bubble_position[:, 0] > xCostMin,
                )
                xSurplus[xInclude] = True
                pass
            pass

        # iAllSensorsRedundant = find(and(bubble_position(:,3)==
        # bubble_position(:,1),bubble_position(:,
        # 2)==nObservableMax));
        # [~,iMin]=min(bubble_position(iAllSensorsRedundant,1));
        # iMinLayoutAllRedundant =
        # iAllSensorsRedundant(iMin) ;
        pass
    else:
        # iMinLayoutAllRedundant = 0
        pass

    if nCriterion >= 3:
        subplot_kw = {"projection": "3d"}
        pass
    else:
        subplot_kw = {}
        pass
    [fig, ax] = plt.subplots(1, 1, subplot_kw=subplot_kw)

    for iBubble in range(nBubble):
        MarkerSize = 5 + 20 * np.sqrt(iBubbleSize[iBubble]) / np.sqrt(
            np.max(iBubbleSize)
        )
        # surplus redundancy
        if xSurplus[iBubble]:
            colour = "red"
            bubble_label = "Surplus Redundancy"
            pass
        else:
            # no sensors
            if bubble_position[iBubble, 0] == 0:
                colour = "k"
                bubble_label = "No Sensors"
                pass
            elif bubble_position[iBubble, 1] == nObservableMax:
                # nsensors = nredundant, all observable
                if (
                    nCriterion >= 3
                    and bubble_position[
                        iBubble,
                        2,
                    ]
                    == bubble_position[
                        iBubble,
                        0,
                    ]
                ):
                    colour = "pink"
                    bubble_label = "Full Obs., Full Red."
                    pass
                # at least 1 redundant sensor, all observable
                elif nCriterion >= 3 and bubble_position[iBubble, 2] > 0:
                    colour = "y"
                    bubble_label = "Full Obs., Partial Red."
                    pass
                # no redundant sensors, all observable 
                else:
                    colour = "b"
                    bubble_label = "Full Obs., No Red."
                    pass
                pass
            else:
                # some redundant, some observable
                if nCriterion >= 3 and bubble_position[iBubble, 2] > 0:
                    colour = "g"
                    bubble_label = "Partial Obs., Partial Red."
                    pass
                # no redundant, some observable
                else:
                    colour = "w"
                    bubble_label = "Partial Obs., No Red."
                    pass
                pass
            pass

        match nCriterion:
            case 2:
                ax.plot(
                    bubble_position[iBubble, 0],
                    bubble_position[iBubble, 1],
                    "o",
                    markersize=MarkerSize,
                    markerfacecolor=colour,
                    color="k",
                )
                pass
            case 3:
                ax.plot(
                    bubble_position[iBubble, 0] * np.ones(2),
                    bubble_position[iBubble, 1] * np.ones(2),
                    np.array([bubble_position[iBubble, 2], 0]),
                    "k-",
                )
                ax.plot(
                    bubble_position[iBubble, 0],
                    bubble_position[iBubble, 1],
                    bubble_position[iBubble, 2],
                    "o",
                    markersize=MarkerSize,
                    markerfacecolor=colour,
                    color="k",
                )
                pass
            case 4:
                ax.plot(
                    bubble_position[iBubble, 0] * np.ones(2),
                    bubble_position[iBubble, 1] * np.ones(2),
                    np.array(
                        [
                            bubble_position[iBubble, 3]
                            / (np.max(bubble_position[iBubble, 0]) + 1e-12),
                            0,
                        ]
                    ),
                    "k-",
                    label=bubble_label,
                )
                ax.plot(
                    bubble_position[iBubble, 0],
                    bubble_position[iBubble, 1],
                    bubble_position[iBubble, 3]
                    / (np.max(bubble_position[iBubble, 0]) + 1e-12),
                    "o",
                    markersize=MarkerSize,
                    markerfacecolor=colour,
                    color="k",
                )
                pass

            case _:
                print("unknown case")
                pass

        pass

        # regardless of 2d/3d, make legend 
        if bubble_label not in mpl_legend_lines.keys():
            mpl_line = Line2D(
                [0],
                [0],
                linestyle='none',
                marker="o",
                markersize=leg_pt_size,
                markerfacecolor=colour,
                markeredgecolor="k",
                label=bubble_label,
            )
            mpl_legend_lines[bubble_label] = mpl_line

    match nCriterion:
        case 2:
            ax.set_xlabel("$f^C$")
            ax.set_ylabel("$f^O$")
            pass
        case item if item in [3, 4]:
            ax.set_xlabel("# Sensors")
            ax.set_ylabel("# Observable variables")

            ax.set_xlim([0, nCostMax + 0.5])
            ax.set_ylim([0, nObservableMax + 0.5])

            if nCriterion == 3:
                ax.set_zlabel("# Redundant sensors")
                ax.set_zlim([0, nRedundantMax])
                ax.view_init(elev=15.0, azim=-220, roll=0)
                ax.plot(
                    [0, 0],
                    [0, nObservableMax],
                    [nRedundantMax, nRedundantMax],
                    "k--",
                )
                ax.plot(
                    [0, 0],
                    [nObservableMax, nObservableMax],
                    [0, nRedundantMax],
                    "k--",
                )
                ax.plot(
                    [0, nCostMax],
                    [nObservableMax, nObservableMax],
                    [nRedundantMax, nRedundantMax],
                    "k--",
                )
                ax.plot(
                    0,
                    nObservableMax,
                    nRedundantMax,
                    "v",
                    color="k",
                    markerfacecolor="w",
                )
                # ax.set_xticks(np.arange(0, nCostMax, 1))
                # yticks = np.arange(0, nObservableMax, 1)
                # ax.set_yticks(yticks)
            else:
                ax.set_zlabel("$f^{C,q}/f^C$")
                ax.set_zlim([0, 1])
                pass
            pass
    

    # setup legend
    # for lab, line in mpl_legend_lines.items():
    #     line.set_label(lab)
    
    handles = list(mpl_legend_lines.values())
    leg_labels = list(mpl_legend_lines.keys())
    ax.legend(
        handles, leg_labels, loc="center right", 
        bbox_to_anchor=(1.8, 0.5)
        )


    xFront = np.ones(nBubble).astype(bool)
    NB = np.sum(xFront)
    NF = np.sum(iBubbleSize[xFront])
    if verbose > 0:
        print("Bubbles without surplus redundancy: " + str(np.sum((~xSurplus))))
        print(
            "Number of layouts without surplus redundancy: "
            + str(np.sum(iBubbleSize[~xSurplus]))
        )
    M = np.hstack(
        [
            bubble_position[~xSurplus, :],
            iBubbleSize[~xSurplus][:, None],
        ]
    )
    M1 = M[:, :3]
    index = np.lexsort(M1.T)
    M = M[index, :]
    if verbose > 0:
        print(M.astype(int))
        print("Bubbles with surplus redundancy: " + str(np.sum(xSurplus)))
        print(
            "Number of layouts with surplus redundancy: "
            + str(np.sum(iBubbleSize[xSurplus]))
        )
    M = np.hstack(
        [
            bubble_position[xSurplus, :],
            iBubbleSize[xSurplus][:, None],
        ]
    )
    index = np.lexsort(M[:, :3].T)
    M = M[index, :]

    if nCriterion > 3:
        ax.view_init(elev=30, azim=-225)
        pass
    ax.set_title(strName)

    return [bubble_position, xSurplus, iBubbleSize, NB, NF, fig, ax]
 No newline at end of file
+46.2 KiB
Loading image diff...
+0 −11
Original line number Diff line number Diff line
%% Cell type:code id:7c8b21a9 tags:

``` python
import math
import numpy as np
import os
import pickle
```

%% Cell type:code id:2487f03f tags:

``` python
def apply_rotation(xy, angle):
    theta = np.radians(angle)
    # Calculate cosine and sine of the angle
    cos_theta = np.cos(theta)
    sin_theta = np.sin(theta)
    x,y = xy

    # Apply the rotation matrix formula
    rx = np.round(x * cos_theta - y * sin_theta)
    ry = np.round(x * sin_theta + y * cos_theta)
    return np.array([rx,ry], dtype=int)
```

%% Cell type:code id:503b4eef tags:

``` python
results_filename = "minimal_bilinear_deficit_red_200_2026_02_10_19_12_08.pkl"
results_path = os.path.join("./test_results", results_filename)
with open(results_path, "rb") as f:
    results = pickle.load(f)
```

%% Cell type:code id:79e51e5c tags:

``` python
analyze_res = {}
for ilayout, res in results.items():
    if not res['rref']['match_obs']:
        analyze_res[ilayout] = res
print(len(analyze_res), len(results))

```

%% Output

    7 4096

%% Cell type:code id:c908c30a tags:

``` python
from PIL import Image
import copy

def plot_layout(result, img_name):
    nsens = 6
    mass_meas = result['soar']['layout'][0:nsens]
    conc_meas = result['soar']['layout'][nsens:]

    mass_soar_obs = result['soar']['obs'][0:nsens]
    conc_soar_obs = result['soar']['obs'][nsens:]

    mass_mrror_obs = result['rref']['obs'][0:nsens]
    conc_mrror_obs = result['rref']['obs'][nsens:]

    # set sensor rotation
    rot = [
        0,0,0,90,0,0,
    ]
    xy_mass = [
        (490,25),
        (1000,25),
        (2250,25),
        (1580,400),
        (2250,765),
        (1000,765),
    ]
    xy_offset = (130,0)

    # Open the main (background) image
    # layout_img_meas = Image.open("./diagrams/minimal_layout_nosensors.png").convert("RGBA")
    layout_img_sensors = Image.open("./diagrams/minimal_layout_nosensors.png").convert("RGBA")
    layout_img_soar = Image.open("./diagrams/minimal_layout_nosensors.png").convert("RGBA")
    layout_img_mrror = Image.open("./diagrams/minimal_layout_nosensors.png").convert("RGBA")

    # Open the images to overlay
    mass_hs = Image.open("./diagrams/massflow_sensor.png").convert("RGBA")
    conc_hs = Image.open("./diagrams/concentration_sensor.png").convert("RGBA")
    mass_ss = Image.open("./diagrams/massflow_sensor_soft.png").convert("RGBA")
    conc_ss = Image.open("./diagrams/concentration_sensor_soft.png").convert("RGBA")

    # Define the coordinates where each overlay will be pasted (top-left corner)
    # Coordinates are a tuple: (x, y)
    for sid, (xym, ro) in enumerate(zip(xy_mass, rot)):
        rot_offset = apply_rotation(xy_offset, ro)
        xyc = (xym[0] + rot_offset[0], xym[1] + rot_offset[1])

        if mass_meas[sid]:
            layout_img_sensors.paste(mass_hs.rotate(ro,expand=True), xym, mass_hs.rotate(ro,expand=True))
            layout_img_soar.paste(mass_hs.rotate(ro,expand=True), xym, mass_hs.rotate(ro,expand=True))
            layout_img_mrror.paste(mass_hs.rotate(ro,expand=True), xym, mass_hs.rotate(ro,expand=True))

        if conc_meas[sid]:
            layout_img_sensors.paste(conc_hs.rotate(ro,expand=True), xyc, conc_hs.rotate(ro,expand=True))
            layout_img_soar.paste(conc_hs.rotate(ro,expand=True), xyc, conc_hs.rotate(ro,expand=True))
            layout_img_mrror.paste(conc_hs.rotate(ro,expand=True), xyc, conc_hs.rotate(ro,expand=True))

        if mass_soar_obs[sid] and not mass_meas[sid]:
            layout_img_soar.paste(mass_ss.rotate(ro,expand=True), xym, mass_ss.rotate(ro,expand=True))

        if conc_soar_obs[sid] and not conc_meas[sid]:
            layout_img_soar.paste(conc_ss.rotate(ro,expand=True), xyc, conc_ss.rotate(ro,expand=True))

        if mass_mrror_obs[sid] and not mass_meas[sid]:
            layout_img_mrror.paste(mass_ss.rotate(ro,expand=True), xym, mass_ss.rotate(ro,expand=True))

        if conc_mrror_obs[sid] and not conc_meas[sid]:
            layout_img_mrror.paste(conc_ss.rotate(ro,expand=True), xyc, conc_ss.rotate(ro,expand=True))

    # Save the final image
    layout_img_sensors.save(f"./plots/layout_{img_name}.png", "PNG")
    layout_img_soar.save(f"./plots/SOAR_obs_{img_name}.png", "PNG")
    layout_img_mrror.save(f"./plots/MRROR_obs_{img_name}.png", "PNG")


for ires, res in analyze_res.items():
    plot_layout(result=res, img_name = f"{str(ires)}_paper")
    break
```

%% Cell type:code id:e55afbc1 tags:

``` python
```

%% Cell type:code id:f696e21f tags:

``` python
```
+581 −80

File changed.

Preview size limit exceeded, changes collapsed.

Loading