Loading documenting_failures/test_case_nonlinear.py +307 −367 Original line number Diff line number Diff line import sympy as sy import pandas as pd import os # import scipy as sp import numpy as np import math import itertools import random import matplotlib.pyplot as plt import os import sys import scipy as sp import copy # local contex # from numerical_labelling.labelling import * from PIL import Image from constraint import Problem # nonlinear variables # constants and physical properties Mww = 18.01528 # Molecular weight of water [kg/kmol] Mwn = 58.44 # Molecular weight of NaCl [kg/kmol] from itertools import product # membrane properties am = 1 Dw = 0.001 Ds = 0.002 cbw = 1 Vw = 1 R = 8.3145 T = 298 Lm = 1 Ks = 1 from sympy.core.numbers import int_valued # partial molar volumes (e-an zan et al 1956) # at 25c (reported cc per mol, convert to m3 per kmol) dv_nacl = 16.8*1e-3 #1e-6 cc to m3, 1e3 mol to kmol dv_h2o = 18.07*1e-3 #just partial molar volume of water # overall constant for permeate (water) c_perm = Mww*am*Dw*cbw*Vw/(R*T*Lm) # overall constant for concentrate (salt) c_conc = Mwn*am*Ds*Ks/Lm gpm_2_m3s = 6.309e-5 # gallons/min to m^3/s module_path = os.path.join(os.path.dirname(os.path.abspath(''))) if module_path not in sys.path: sys.path.insert(0, module_path) else: print("path already in sys.path") in_2_meter = 0.0254 diam = 0.5 * in_2_meter import matplotlib.pyplot as plt from numerical_labelling.labelling import * Acs = math.pi*diam**2/4 Q = 5*gpm_2_m3s #M^3/s V_avg = Q/Acs # m/s mu_h2o = 1.002e-3 # pa*s from math import gcd rho_w = 1000 # kg/m^3 def lcm(a, b): return abs(a*b) // gcd(a, b) Re = rho_w * V_avg * diam / mu_h2o def get_consts(cfa, cfb): lcm_value = lcm(cfa, cfb) const_1 = lcm_value / cfa const_2 = lcm_value / cfb # membrane friction loss coefficient K_mem = 0.50467 K_split = 0.5 K_mix = 0.5 return const_1, const_2 # t-junction friction loss coefficient (assume they are all the same for now) K_tj = 1 data_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data") def get_varbs_in_eq(equation_list, all_constr): """ equation_list: list of ids in all constr you want to use """ varb_list = [] for iconstr in equation_list: constr = all_constr[iconstr] symbs = [symbol.name for symbol in list(constr.free_symbols)] for symb in symbs: if symb not in varb_list: varb_list.append(symb) varb_list = sorted(varb_list) return varb_list def create_constraint_function(equation, condition_func=None, debug=False): """ from gitlab duo, but it is pretty straightforward function factory Dynamically create a constraint function from a sympy equation pump_curve_path = os.path.join(data_path, "ro_pump_curve_data.csv") pump_curve = pd.read_csv(pump_curve_path) pump_curve.columns = ["flow_rate_gpm", "pressure_psi"] Args: equation: sympy expression condition_func: function that takes the equation result and returns bool defaults to checking if result == 0 """ # Get free variables and sort them for consistent ordering free_vars = sorted(equation.free_symbols, key=str) var_names = [str(var) for var in free_vars] if condition_func is None: condition_func = lambda result: abs(float(result)) <= 1e-10 # near zero def constraint_func(*values): if len(values) != len(free_vars): raise ValueError(f"Expected {len(free_vars)} values, got {len(values)}") substitution_dict = dict(zip(free_vars, values)) try: result = equation.subs(substitution_dict) if debug: print(substitution_dict, result, condition_func(result)) return condition_func(result) except Exception as e: print("Exception occurred:", e) return False # Store metadata for debugging constraint_func.equation = equation constraint_func.variables = var_names constraint_func.free_symbols = free_vars return constraint_func, var_names def add_sltn(sltn_dict, varb_name, constr): """add a solved equation to sltn dict example: varb_name = x3 constr = m1x1 + m2x2 - m3x3 sltn_dict.update({'x3': (m1x1 + m2x2)/m3}) """ sltn = sy.solve(constr, varb_name) if len(sltn) !=1: raise ValueError(f"multiple solutions when trying to solve for {varb_name}") # unit conversion pump_curve["flow_rate_m3_s"] = pump_curve["flow_rate_gpm"] * 6.30902e-5 # Convert GPM to m³/s pump_curve["pressure_pa"] = pump_curve["pressure_psi"] * 6894.76 # Convert PSI to Pa"] pump_curve["pressure_Mpa"] = pump_curve["pressure_pa"]*1e-6 # Convert Pa to MPa if varb_name in sltn_dict.keys(): print(f"variable {varb_name} in eq {constr} already solved for in eq {sltn_dict[varb_name]}") # raise KeyError(f"{varb_name} already solved for") sltn_dict.update({varb_name:sltn[0]}) m_pump, n_pump, b_pump = np.polyfit(pump_curve['flow_rate_m3_s'], pump_curve['pressure_pa'], 2) x_poly = np.linspace(min(pump_curve['flow_rate_m3_s']), max(pump_curve['flow_rate_m3_s']), 100) y_poly = m_pump * x_poly**2 + n_pump * x_poly + b_pump return sltn_dict def add_constraint(constr_dict, eqn): Loading @@ -86,341 +116,251 @@ def add_constraint(constr_dict, eqn): constr_dict[indx] = eqn return constr_dict def solve_constr(constr, var:str, subs = {}): def constr_split( min, mout1, mout2, xin, xout1, xout2, pin, pout1, pout2, constr, sltns=None, ): """ solve an equation for a single variable constr: the constraint equation (sympy equation) var: the variable to be solved for subs: dictionary of numeric values to be substituted for symbolic constraint for splitter unit """ if var in subs.keys(): subs_new = copy.deepcopy(subs) subs_new.pop(var) else: subs_new = subs constr_sub = constr.subs(subs_new) eq = sy.Eq(constr_sub, 0) ans = sy.solve(eq, var) if len(ans) == 1: return ans[0] else: raise ValueError(f"multiple answers detected: {ans}") constr_hydraulic = min - mout1 - mout2 constr_eq_conc1 = xin - xout1 constr_eq_conc2 = xin - xout2 constr_press1 = pin - pout1 constr_press2 = pin - pout2 constr = add_constraint(constr, constr_hydraulic) constr = add_constraint(constr, constr_eq_conc1) constr = add_constraint(constr, constr_eq_conc2) constr = add_constraint(constr, constr_press1) constr = add_constraint(constr, constr_press2) if sltns is not None: sltns = add_sltn(sltns, xout1.name, constr_eq_conc1) sltns = add_sltn(sltns, xout2.name, constr_eq_conc2) sltns = add_sltn(sltns, pout1.name, constr_press1) sltns = add_sltn(sltns, pout2.name, constr_press2) return constr, sltns return constr # def add_pump_constraint(constr_dict, ): def get_density(x_n, simple = False): def constr_mix( min1, min2, mout, xin1, xin2, xout, pin1, pin2, pout, constr, sltns=None ): """ get the density for a stream given the mass fraction of salt x_n: mass fraction of salt returns: density of stream constraint for 2 pipes coming together """ if simple: rho = rho_w else: rho = 1/((((1 - x_n)/Mww)*dv_h2o + (x_n/Mwn)*dv_nacl)) constr_hydraulic = min1 + min2 - mout constr_component = min1*xin1 + min2*xin2 - mout*xout constr_press1 = pin1 - pout constr_press2 = pin2 - pout constr = add_constraint(constr, constr_hydraulic) constr = add_constraint(constr, constr_component) constr = add_constraint(constr, constr_press1) constr = add_constraint(constr, constr_press2) if sltns is not None: sltns = add_sltn(sltns, xout.name, constr_component) sltns = add_sltn(sltns, pout.name, constr_press1) sltns = add_sltn(sltns, pin2.name, constr_press2) return constr, sltns return constr return rho def get_concentration(x_n, simple = False): def constr_mex( mfeed, mperm, mret, xfeed,xperm, xret, pfeed, pperm, pret, a, ap, app, b, bp, c, cp, constr, sltns = None ): """ get the concentration (moles/volume) for ro calculations x_n: mass fraction of salt M_all: overall mass flowrate a, ap, app are A, A', and A'' for the osmotic pressure constraint b, bp are B, B' for the diffusion conc constraint c is constant in major loss constraint. don't multiply by masses b/c they are already squared """ rho = get_density(x_n, simple=simple) c_n = (x_n*rho)/(Mwn) # this is not correct, check expression (M_all ) return c_n constr_hyd = mfeed - mperm - mret constr_component = xfeed*mfeed -xperm*mperm -xret*mret constr_osm_press = a*mperm - ap*((pfeed - pperm) - app*(xfeed-xperm)) constr_diff_conc = b*mperm*xperm - bp*(xfeed-xperm) # xperm may be very small, but for integer example maybe it's better to keep constr_major_loss = c*(pfeed - pret) - cp*(mfeed+mret)**2 # major loss from feed to concentrate side. constr = add_constraint(constr, constr_hyd) constr = add_constraint(constr, constr_component) constr = add_constraint(constr, constr_osm_press) constr = add_constraint(constr, constr_diff_conc) constr = add_constraint(constr, constr_major_loss) # return solutions for the membrane constraint equations if sltns is not None: sltns = add_sltn(sltns, xperm.name, constr_diff_conc) sltns = add_sltn(sltns, pperm.name, constr_osm_press) sltns = add_sltn(sltns, pret.name, constr_major_loss) return constr, sltns def constr_mem(M_in, M_out, x_in, x_out, p_in, p_out, simple=False): """ M_in: mass flow rate in M_out: mass flow rate out (permeate) x_in: mass fraction salt in x_out: mass fraction salt out (permeate) p_in: pressure at membrane feed p_out: pressure at membrane exit (permeate) returns: the two membrane constraints """ c_in = get_concentration(x_in, simple=simple) c_out = get_concentration(x_out, simple=simple) pi_in = c_in*R*T pi_out = c_out*R*T dPi = pi_in - pi_out dP = p_in - p_out return constr constr_1 = M_in - c_perm*(dP - dPi) constr_2 = x_in*M_in - c_conc*(c_in-c_out) return constr_1, constr_2 def constr_pump(m_in, x_in, m_out, p_in, p_out, simple=False, get_dp = False): # pump1 pressure increase (streams 1 and 2) # need volume flow rate rho_pump_1 = get_density(x_in, simple=simple) # rho_pump_1 = rho_w vf = m_in/rho_pump_1 pump_dp_1 = (p_out - p_in) poly_fit_1 = m_pump*vf**2 + n_pump*vf + b_pump def constr_pump( min, mout, xin, xout, pin, pout, E, H, G, J, constr, sltns=None, ): """ E: second order coefficient (Ex^2) H: first order coefficient (Hx) G: intercept for pump curve J: unit conversion for pressure (arbitrary) """ constr_mass = mout - min constr_conc = xout - xin constr_pump = J*(pout - pin) - (E*min**2 + H*min + G) constr = add_constraint(constr, constr_mass) constr = add_constraint(constr, constr_conc) constr = add_constraint(constr, constr_pump) if sltns is not None: sltns = add_sltn(sltns, xout.name, constr_conc) sltns = add_sltn(sltns, pout.name, constr_pump) return constr, sltns return constr constr = pump_dp_1 - poly_fit_1 def constr_valve( min, mout, xin, xout, pin, pout, K, L, constr, sltns= None, ): # # see if multiple a, b, c, and ds defined # A, B, C, D = sy.symbols("A, B, C, D") # varb_symbs = [A, B, C, D] # labels = [ "A", "B", "C", "D"] # varbs.update(dict(zip(labels, varb_symbs))) constr_mass = mout - min constr_conc = xout - xin constr_major_loss = L*(pin - pout) - K*(min)**2 # major loss from feed to concentrate side. constr = add_constraint(constr, constr_mass) constr = add_constraint(constr, constr_conc) constr = add_constraint(constr, constr_major_loss) if sltns is not None: sltns = add_sltn(sltns, xout.name, constr_conc) sltns = add_sltn(sltns, pout.name, constr_major_loss) return constr, sltns return constr def constr_fitting(m_in, m_out, x_in, p_in, p_out, K, simple=False): def constr_mem_nl(): # friction loss over pipe fitting mavg = ((m_in + m_out)/2) rho = get_density(x_in) constr = (p_in - p_out) - K*(mavg**2/2*rho**2) return constr m1, m2, m3, m4, m5, m6, m7, m8 = sy.symbols( "m1 m2 m3 m4 m5 m6 m7 m8" ) x1, x2, x3, x4, x5, x6, x7, x8 = sy.symbols( "x1 x2 x3 x4 x5 x6 x7 x8" ) p1, p2, p3, p4, p5, p6, p7, p8 = sy.symbols( "p1 p2 p3 p4 p5 p6 p7 p8" ) varbs_list_pil = [ m1, m2, m3, m4, m5, m6, m7, m8, x1, x2, x3, x4, x5, x6, x7, x8, p1, p2, p3, p4, p5, p6, p7, p8, ] varbs_pil = {varb.name:varb for varb in varbs_list_pil} # membrane constants a1, a1p, a1pp = sy.symbols( 'a1 a1p a1pp' ) b1, b1p = sy.symbols( 'b1 b1p' ) c1, c1p = sy.symbols( 'c1 c1p' ) def constr_fitting_mem(m_in, m_out, x_in, x_out, p_in, p_out, K_mem, simple=False): """ need to take average """ # friction loss over fitting mavg = ((m_in + m_out)/2) if simple: rhoavg = get_density(x_in, simple=simple) else: rho_in = get_density(x_in, simple=simple) rho_out = get_density(x_out, simple=simple) rhoavg = ((rho_in + rho_out)/2) # pump constants e1, h1, g1, j1, e2, h2, g2, j2 = sy.symbols("e1 h1 g1 j1 e2 h2 g2 j2") constr = (p_in - p_out) - K_mem*(mavg**2/2*rhoavg**2) coeff_pil_list = [ a1, a1p, a1pp, b1, b1p, c1, c1p, e1, h1, g1, j1, e2, h2, g2, j2, ] return constr coeff_pil = {coeff.name:coeff for coeff in coeff_pil_list} # generate constraints def constr_mem_nl(simple=False, subf = False): """ constraint equations for membrane example """ all_varbs_pil = {**varbs_pil, **coeff_pil} m1, m2, m3, m4, m5, m6, m7, m8 = sy.symbols('m1 m2 m3 m4 m5 m6 m7 m8') # overall mass flow rates f1, f2, f3, f4, f5, f6, f7, f8 = sy.symbols('f1 f2 f3 f4 f5 f6 f7 f8') # component flow rates x1, x2, x3, x4, x5, x6, x7, x8 = sy.symbols('x1 x2 x3 x4 x5 x6 x7 x8') # component mass fractions p1, p2, p3, p4, p5, p6, p7, p8 = sy.symbols('p1 p2 p3 p4 p5 p6 p7 p8') # pressures varbs = [ m1, m2, m3, m4, m5, m6, m7, m8, x1, x2, x3, x4, x5, x6, x7, x8, f1, f2, f3, f4, f5, f6, f7, f8, p1, p2, p3, p4, p5, p6, p7, p8, ] var_labels = [ 'M1', 'M2', 'M3', 'M4', 'M5', 'M6', 'M7', 'M8', 'X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8', all_label_list = [ 'm1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7', 'm8', 'x1', 'x2', 'x3', 'x4', 'x5', 'x6', 'x7', 'x8', 'f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8', 'p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7','p8', 'a1', 'a1p', 'a1pp', 'b1', 'b1p', 'c1', 'c1p', 'e1', 'h1', 'g1', 'j1', 'e2', 'f2', 'g2', 'j2' ] # Create a dictionary to store all variables and their values var_dict = dict(zip(var_labels, varbs)) constr = {} # mass balance constr = add_constraint(constr, m1 - m2) constr = add_constraint(constr, m2 - m3 + m8) constr = add_constraint(constr, m3 - m4 - m5) constr = add_constraint(constr, m5 - m6 - m7) constr = add_constraint(constr, m7 - m8) # component balance constr = add_constraint(constr, f1 - f2) constr = add_constraint(constr, f2 - f3 + f8) constr = add_constraint(constr, f3 - f4 - f5) constr = add_constraint(constr, f5 - f6 - f7) constr = add_constraint(constr, f7 - f8) # solve for f constr = add_constraint(constr, f1 - m1 * x1) constr = add_constraint(constr, f2 - m2 * x2) constr = add_constraint(constr, f3 - m3 * x3) constr = add_constraint(constr, f4 - m4 * x4) constr = add_constraint(constr, f5 - m5 * x5) constr = add_constraint(constr, f6 - m6 * x6) constr = add_constraint(constr, f7 - m7 * x7) constr = add_constraint(constr, f8 - m8 * x8) # equal concentration constr = add_constraint(constr, x5 - x6) constr = add_constraint(constr, x5 - x7) mem_constrs = constr_mem( M_in = m3, M_out = m4, x_in = x3, x_out = x4, p_in = p3, p_out = p4, simple = simple, # setup pilot constraints constr_pil = {} sltns_pil = {} constr_pil, sltns_pil = constr_pump( min = m1, mout=m2, xin=x1, xout = x2, pin=p1, pout=p2, E=e1, H = h1, G = g1, J= j1, constr = constr_pil, sltns=sltns_pil, ) for constr_eq in mem_constrs: constr = add_constraint(constr, constr_eq) # pump1 constraints (s1 in, s2 out) pump_constr_1 = constr_pump( m_in=m1, x_in=x1, m_out=m2, p_in=p1, p_out=p2, simple=simple constr_pil, sltns_pil = constr_mix( min1=m2, min2=m8, mout=m3, xin1=x2, xin2=x8, xout=x3, pin1=p2, pin2=p8, pout=p3, constr=constr_pil, sltns=sltns_pil, ) constr = add_constraint(constr, pump_constr_1) # pump2 constraints (s7 in, s8 out) pump_constr_2 = constr_pump( m_in=m7, x_in=x7, m_out=m8, p_in=p7, p_out=p8, simple=simple # membrane 1 constr_pil, sltns_pil = constr_mex( mfeed=m3, mperm=m4, mret=m5, xfeed=x3, xperm=x4, xret=x5, pfeed=p3, pperm=p4, pret=p5, a=a1, ap=a1p, app=a1pp, b=b1, bp=b1p, c=c1, cp=c1p, constr=constr_pil, sltns=sltns_pil, ) constr = add_constraint(constr, pump_constr_2) # friction loss over membrane (streams 3-5) mem_fric_constr = constr_fitting_mem( m_in=m3, m_out=m5, x_in=m3, x_out=m5, p_in=p3, p_out=p5, K_mem=K_mem, simple=simple, # split point constr_pil, sltns_pil = constr_split( min=m5, mout1=m6, mout2=m7, xin=x5, xout1=x6, xout2=x7, pin=p5, pout1=p6, pout2=p7, constr=constr_pil, sltns=sltns_pil ) constr_pil, sltns_pil = constr_pump( min = m7, mout=m8, xin=x7, xout = x8, pin=p7, pout=p8, E=e2, H = h2, G = g2, J= j2, constr = constr_pil, sltns=sltns_pil, ) constr = add_constraint(constr, mem_fric_constr) # remaining friction losses are equivalent (add friction loss coeff later) constr = add_constraint(constr, p2 - p3) constr = add_constraint(constr, p8 - p2) constr = add_constraint(constr, p5 - p6) constr = add_constraint(constr, p5 - p7) if subf: f_dict = { "f1":x1*m1, "f2":x2*m2, "f3":x3*m3, "f4":x4*m4, "f5":x5*m5, "f6":x6*m6, "f7":x7*m7, "f8":x8*m8, } constr_subf = constr.copy() for i, iconstr in constr.items(): constr[i] = iconstr.subs(f_dict) varbs = [ m1, m2, m3, m4, m5, m6, m7, m8, x1, x2, x3, x4, x5, x6, x7, x8, p1, p2, p3, p4, p5, p6, p7, p8, ] var_labels = [ 'M1', 'M2', 'M3', 'M4', 'M5', 'M6', 'M7', 'M8', 'X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8', ] # Create a dictionary to store all variables and their values var_dict = dict(zip(var_labels, varbs)) return constr, var_dict def get_x0_nl(m1 = 0.2, x1 = 0.2, p1 = 1e5, simple=False): # nonlinear operating point: # for failure case make recycle really large, other streams very small constr, _ = constr_mem_nl(simple=simple) M1 = m1 M8 = 1.1 M2 = M1 M3 = M2 + M8 M4 = 0.1 M5 = M3 - M4 M6 = 0.1 M7 = M8 X1 = x1 X2 = x1 X3 = 0.01 X4 = 0.01 X5 = 0.01 X6 = 0.01 X7 = 0.01 X8 = 0.01 F1 = M1*X1 F2 = M2*X2 F3 = M3*X3 F4 = M4*X4 F5 = M5*X5 F6 = M6*X6 F7 = M7*X7 F8 = M8*X8 x0_nl = { 'm1': M1, 'm2': M2, 'm3': M3, 'm4': M4, 'm5': M5, 'm6': M6, 'm7': M7, 'm8': M8, 'f1': F1, 'f2': F2, 'f3': F3, 'f4': F4, 'f5': F5, 'f6': F6, 'f7': F7, 'f8': F8, 'x1': X1, 'x2': X2, 'x3': X3, 'x4': X4, 'x5': X5, 'x6': X6, 'x7': X7, 'x8': X8, } # pressure constraints P1 = p1 x0_nl["p1"] = P1 # NOTE: constraint numbering is hardcoded. picking which constraint # corresponds to which is not addressed for the operating point # pump 1 constraint eq P2 = solve_constr(constr = constr[22], var="p2", subs=x0_nl ) P2 = float(P2) x0_nl["p2"] = P2 P3 = P2 x0_nl["p3"] = P3 P4 = solve_constr(constr = constr[20], var="p4", subs=x0_nl) P4 = float(P4) x0_nl["p4"] = P4 P5 = solve_constr(constr = constr[24], var="p5", subs=x0_nl) x0_nl["p5"] = P5 P6 = P5 x0_nl["p6"] = P6 P7 = P5 x0_nl["p7"] = P7 P8 = solve_constr(constr = constr[23], var="p8", subs=x0_nl) x0_nl["p8"] = P8 # # resolve the mass flow out, concentration out # x0_nl_x4 = copy.deepcopy(x0_nl) # x0_nl_x4.pop("x4") M4 = solve_constr(constr[21], var="m4", subs=x0_nl) # # resolve the mass flow out, concentration out # x0_nl_x4 = copy.deepcopy(x0_nl) # x0_nl_x4.pop("x4") X4 = solve_constr(constr[21], var="x4", subs=x0_nl) return x0_nl return constr_pil, all_varbs_pil Loading
documenting_failures/test_case_nonlinear.py +307 −367 Original line number Diff line number Diff line import sympy as sy import pandas as pd import os # import scipy as sp import numpy as np import math import itertools import random import matplotlib.pyplot as plt import os import sys import scipy as sp import copy # local contex # from numerical_labelling.labelling import * from PIL import Image from constraint import Problem # nonlinear variables # constants and physical properties Mww = 18.01528 # Molecular weight of water [kg/kmol] Mwn = 58.44 # Molecular weight of NaCl [kg/kmol] from itertools import product # membrane properties am = 1 Dw = 0.001 Ds = 0.002 cbw = 1 Vw = 1 R = 8.3145 T = 298 Lm = 1 Ks = 1 from sympy.core.numbers import int_valued # partial molar volumes (e-an zan et al 1956) # at 25c (reported cc per mol, convert to m3 per kmol) dv_nacl = 16.8*1e-3 #1e-6 cc to m3, 1e3 mol to kmol dv_h2o = 18.07*1e-3 #just partial molar volume of water # overall constant for permeate (water) c_perm = Mww*am*Dw*cbw*Vw/(R*T*Lm) # overall constant for concentrate (salt) c_conc = Mwn*am*Ds*Ks/Lm gpm_2_m3s = 6.309e-5 # gallons/min to m^3/s module_path = os.path.join(os.path.dirname(os.path.abspath(''))) if module_path not in sys.path: sys.path.insert(0, module_path) else: print("path already in sys.path") in_2_meter = 0.0254 diam = 0.5 * in_2_meter import matplotlib.pyplot as plt from numerical_labelling.labelling import * Acs = math.pi*diam**2/4 Q = 5*gpm_2_m3s #M^3/s V_avg = Q/Acs # m/s mu_h2o = 1.002e-3 # pa*s from math import gcd rho_w = 1000 # kg/m^3 def lcm(a, b): return abs(a*b) // gcd(a, b) Re = rho_w * V_avg * diam / mu_h2o def get_consts(cfa, cfb): lcm_value = lcm(cfa, cfb) const_1 = lcm_value / cfa const_2 = lcm_value / cfb # membrane friction loss coefficient K_mem = 0.50467 K_split = 0.5 K_mix = 0.5 return const_1, const_2 # t-junction friction loss coefficient (assume they are all the same for now) K_tj = 1 data_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data") def get_varbs_in_eq(equation_list, all_constr): """ equation_list: list of ids in all constr you want to use """ varb_list = [] for iconstr in equation_list: constr = all_constr[iconstr] symbs = [symbol.name for symbol in list(constr.free_symbols)] for symb in symbs: if symb not in varb_list: varb_list.append(symb) varb_list = sorted(varb_list) return varb_list def create_constraint_function(equation, condition_func=None, debug=False): """ from gitlab duo, but it is pretty straightforward function factory Dynamically create a constraint function from a sympy equation pump_curve_path = os.path.join(data_path, "ro_pump_curve_data.csv") pump_curve = pd.read_csv(pump_curve_path) pump_curve.columns = ["flow_rate_gpm", "pressure_psi"] Args: equation: sympy expression condition_func: function that takes the equation result and returns bool defaults to checking if result == 0 """ # Get free variables and sort them for consistent ordering free_vars = sorted(equation.free_symbols, key=str) var_names = [str(var) for var in free_vars] if condition_func is None: condition_func = lambda result: abs(float(result)) <= 1e-10 # near zero def constraint_func(*values): if len(values) != len(free_vars): raise ValueError(f"Expected {len(free_vars)} values, got {len(values)}") substitution_dict = dict(zip(free_vars, values)) try: result = equation.subs(substitution_dict) if debug: print(substitution_dict, result, condition_func(result)) return condition_func(result) except Exception as e: print("Exception occurred:", e) return False # Store metadata for debugging constraint_func.equation = equation constraint_func.variables = var_names constraint_func.free_symbols = free_vars return constraint_func, var_names def add_sltn(sltn_dict, varb_name, constr): """add a solved equation to sltn dict example: varb_name = x3 constr = m1x1 + m2x2 - m3x3 sltn_dict.update({'x3': (m1x1 + m2x2)/m3}) """ sltn = sy.solve(constr, varb_name) if len(sltn) !=1: raise ValueError(f"multiple solutions when trying to solve for {varb_name}") # unit conversion pump_curve["flow_rate_m3_s"] = pump_curve["flow_rate_gpm"] * 6.30902e-5 # Convert GPM to m³/s pump_curve["pressure_pa"] = pump_curve["pressure_psi"] * 6894.76 # Convert PSI to Pa"] pump_curve["pressure_Mpa"] = pump_curve["pressure_pa"]*1e-6 # Convert Pa to MPa if varb_name in sltn_dict.keys(): print(f"variable {varb_name} in eq {constr} already solved for in eq {sltn_dict[varb_name]}") # raise KeyError(f"{varb_name} already solved for") sltn_dict.update({varb_name:sltn[0]}) m_pump, n_pump, b_pump = np.polyfit(pump_curve['flow_rate_m3_s'], pump_curve['pressure_pa'], 2) x_poly = np.linspace(min(pump_curve['flow_rate_m3_s']), max(pump_curve['flow_rate_m3_s']), 100) y_poly = m_pump * x_poly**2 + n_pump * x_poly + b_pump return sltn_dict def add_constraint(constr_dict, eqn): Loading @@ -86,341 +116,251 @@ def add_constraint(constr_dict, eqn): constr_dict[indx] = eqn return constr_dict def solve_constr(constr, var:str, subs = {}): def constr_split( min, mout1, mout2, xin, xout1, xout2, pin, pout1, pout2, constr, sltns=None, ): """ solve an equation for a single variable constr: the constraint equation (sympy equation) var: the variable to be solved for subs: dictionary of numeric values to be substituted for symbolic constraint for splitter unit """ if var in subs.keys(): subs_new = copy.deepcopy(subs) subs_new.pop(var) else: subs_new = subs constr_sub = constr.subs(subs_new) eq = sy.Eq(constr_sub, 0) ans = sy.solve(eq, var) if len(ans) == 1: return ans[0] else: raise ValueError(f"multiple answers detected: {ans}") constr_hydraulic = min - mout1 - mout2 constr_eq_conc1 = xin - xout1 constr_eq_conc2 = xin - xout2 constr_press1 = pin - pout1 constr_press2 = pin - pout2 constr = add_constraint(constr, constr_hydraulic) constr = add_constraint(constr, constr_eq_conc1) constr = add_constraint(constr, constr_eq_conc2) constr = add_constraint(constr, constr_press1) constr = add_constraint(constr, constr_press2) if sltns is not None: sltns = add_sltn(sltns, xout1.name, constr_eq_conc1) sltns = add_sltn(sltns, xout2.name, constr_eq_conc2) sltns = add_sltn(sltns, pout1.name, constr_press1) sltns = add_sltn(sltns, pout2.name, constr_press2) return constr, sltns return constr # def add_pump_constraint(constr_dict, ): def get_density(x_n, simple = False): def constr_mix( min1, min2, mout, xin1, xin2, xout, pin1, pin2, pout, constr, sltns=None ): """ get the density for a stream given the mass fraction of salt x_n: mass fraction of salt returns: density of stream constraint for 2 pipes coming together """ if simple: rho = rho_w else: rho = 1/((((1 - x_n)/Mww)*dv_h2o + (x_n/Mwn)*dv_nacl)) constr_hydraulic = min1 + min2 - mout constr_component = min1*xin1 + min2*xin2 - mout*xout constr_press1 = pin1 - pout constr_press2 = pin2 - pout constr = add_constraint(constr, constr_hydraulic) constr = add_constraint(constr, constr_component) constr = add_constraint(constr, constr_press1) constr = add_constraint(constr, constr_press2) if sltns is not None: sltns = add_sltn(sltns, xout.name, constr_component) sltns = add_sltn(sltns, pout.name, constr_press1) sltns = add_sltn(sltns, pin2.name, constr_press2) return constr, sltns return constr return rho def get_concentration(x_n, simple = False): def constr_mex( mfeed, mperm, mret, xfeed,xperm, xret, pfeed, pperm, pret, a, ap, app, b, bp, c, cp, constr, sltns = None ): """ get the concentration (moles/volume) for ro calculations x_n: mass fraction of salt M_all: overall mass flowrate a, ap, app are A, A', and A'' for the osmotic pressure constraint b, bp are B, B' for the diffusion conc constraint c is constant in major loss constraint. don't multiply by masses b/c they are already squared """ rho = get_density(x_n, simple=simple) c_n = (x_n*rho)/(Mwn) # this is not correct, check expression (M_all ) return c_n constr_hyd = mfeed - mperm - mret constr_component = xfeed*mfeed -xperm*mperm -xret*mret constr_osm_press = a*mperm - ap*((pfeed - pperm) - app*(xfeed-xperm)) constr_diff_conc = b*mperm*xperm - bp*(xfeed-xperm) # xperm may be very small, but for integer example maybe it's better to keep constr_major_loss = c*(pfeed - pret) - cp*(mfeed+mret)**2 # major loss from feed to concentrate side. constr = add_constraint(constr, constr_hyd) constr = add_constraint(constr, constr_component) constr = add_constraint(constr, constr_osm_press) constr = add_constraint(constr, constr_diff_conc) constr = add_constraint(constr, constr_major_loss) # return solutions for the membrane constraint equations if sltns is not None: sltns = add_sltn(sltns, xperm.name, constr_diff_conc) sltns = add_sltn(sltns, pperm.name, constr_osm_press) sltns = add_sltn(sltns, pret.name, constr_major_loss) return constr, sltns def constr_mem(M_in, M_out, x_in, x_out, p_in, p_out, simple=False): """ M_in: mass flow rate in M_out: mass flow rate out (permeate) x_in: mass fraction salt in x_out: mass fraction salt out (permeate) p_in: pressure at membrane feed p_out: pressure at membrane exit (permeate) returns: the two membrane constraints """ c_in = get_concentration(x_in, simple=simple) c_out = get_concentration(x_out, simple=simple) pi_in = c_in*R*T pi_out = c_out*R*T dPi = pi_in - pi_out dP = p_in - p_out return constr constr_1 = M_in - c_perm*(dP - dPi) constr_2 = x_in*M_in - c_conc*(c_in-c_out) return constr_1, constr_2 def constr_pump(m_in, x_in, m_out, p_in, p_out, simple=False, get_dp = False): # pump1 pressure increase (streams 1 and 2) # need volume flow rate rho_pump_1 = get_density(x_in, simple=simple) # rho_pump_1 = rho_w vf = m_in/rho_pump_1 pump_dp_1 = (p_out - p_in) poly_fit_1 = m_pump*vf**2 + n_pump*vf + b_pump def constr_pump( min, mout, xin, xout, pin, pout, E, H, G, J, constr, sltns=None, ): """ E: second order coefficient (Ex^2) H: first order coefficient (Hx) G: intercept for pump curve J: unit conversion for pressure (arbitrary) """ constr_mass = mout - min constr_conc = xout - xin constr_pump = J*(pout - pin) - (E*min**2 + H*min + G) constr = add_constraint(constr, constr_mass) constr = add_constraint(constr, constr_conc) constr = add_constraint(constr, constr_pump) if sltns is not None: sltns = add_sltn(sltns, xout.name, constr_conc) sltns = add_sltn(sltns, pout.name, constr_pump) return constr, sltns return constr constr = pump_dp_1 - poly_fit_1 def constr_valve( min, mout, xin, xout, pin, pout, K, L, constr, sltns= None, ): # # see if multiple a, b, c, and ds defined # A, B, C, D = sy.symbols("A, B, C, D") # varb_symbs = [A, B, C, D] # labels = [ "A", "B", "C", "D"] # varbs.update(dict(zip(labels, varb_symbs))) constr_mass = mout - min constr_conc = xout - xin constr_major_loss = L*(pin - pout) - K*(min)**2 # major loss from feed to concentrate side. constr = add_constraint(constr, constr_mass) constr = add_constraint(constr, constr_conc) constr = add_constraint(constr, constr_major_loss) if sltns is not None: sltns = add_sltn(sltns, xout.name, constr_conc) sltns = add_sltn(sltns, pout.name, constr_major_loss) return constr, sltns return constr def constr_fitting(m_in, m_out, x_in, p_in, p_out, K, simple=False): def constr_mem_nl(): # friction loss over pipe fitting mavg = ((m_in + m_out)/2) rho = get_density(x_in) constr = (p_in - p_out) - K*(mavg**2/2*rho**2) return constr m1, m2, m3, m4, m5, m6, m7, m8 = sy.symbols( "m1 m2 m3 m4 m5 m6 m7 m8" ) x1, x2, x3, x4, x5, x6, x7, x8 = sy.symbols( "x1 x2 x3 x4 x5 x6 x7 x8" ) p1, p2, p3, p4, p5, p6, p7, p8 = sy.symbols( "p1 p2 p3 p4 p5 p6 p7 p8" ) varbs_list_pil = [ m1, m2, m3, m4, m5, m6, m7, m8, x1, x2, x3, x4, x5, x6, x7, x8, p1, p2, p3, p4, p5, p6, p7, p8, ] varbs_pil = {varb.name:varb for varb in varbs_list_pil} # membrane constants a1, a1p, a1pp = sy.symbols( 'a1 a1p a1pp' ) b1, b1p = sy.symbols( 'b1 b1p' ) c1, c1p = sy.symbols( 'c1 c1p' ) def constr_fitting_mem(m_in, m_out, x_in, x_out, p_in, p_out, K_mem, simple=False): """ need to take average """ # friction loss over fitting mavg = ((m_in + m_out)/2) if simple: rhoavg = get_density(x_in, simple=simple) else: rho_in = get_density(x_in, simple=simple) rho_out = get_density(x_out, simple=simple) rhoavg = ((rho_in + rho_out)/2) # pump constants e1, h1, g1, j1, e2, h2, g2, j2 = sy.symbols("e1 h1 g1 j1 e2 h2 g2 j2") constr = (p_in - p_out) - K_mem*(mavg**2/2*rhoavg**2) coeff_pil_list = [ a1, a1p, a1pp, b1, b1p, c1, c1p, e1, h1, g1, j1, e2, h2, g2, j2, ] return constr coeff_pil = {coeff.name:coeff for coeff in coeff_pil_list} # generate constraints def constr_mem_nl(simple=False, subf = False): """ constraint equations for membrane example """ all_varbs_pil = {**varbs_pil, **coeff_pil} m1, m2, m3, m4, m5, m6, m7, m8 = sy.symbols('m1 m2 m3 m4 m5 m6 m7 m8') # overall mass flow rates f1, f2, f3, f4, f5, f6, f7, f8 = sy.symbols('f1 f2 f3 f4 f5 f6 f7 f8') # component flow rates x1, x2, x3, x4, x5, x6, x7, x8 = sy.symbols('x1 x2 x3 x4 x5 x6 x7 x8') # component mass fractions p1, p2, p3, p4, p5, p6, p7, p8 = sy.symbols('p1 p2 p3 p4 p5 p6 p7 p8') # pressures varbs = [ m1, m2, m3, m4, m5, m6, m7, m8, x1, x2, x3, x4, x5, x6, x7, x8, f1, f2, f3, f4, f5, f6, f7, f8, p1, p2, p3, p4, p5, p6, p7, p8, ] var_labels = [ 'M1', 'M2', 'M3', 'M4', 'M5', 'M6', 'M7', 'M8', 'X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8', all_label_list = [ 'm1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7', 'm8', 'x1', 'x2', 'x3', 'x4', 'x5', 'x6', 'x7', 'x8', 'f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8', 'p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7','p8', 'a1', 'a1p', 'a1pp', 'b1', 'b1p', 'c1', 'c1p', 'e1', 'h1', 'g1', 'j1', 'e2', 'f2', 'g2', 'j2' ] # Create a dictionary to store all variables and their values var_dict = dict(zip(var_labels, varbs)) constr = {} # mass balance constr = add_constraint(constr, m1 - m2) constr = add_constraint(constr, m2 - m3 + m8) constr = add_constraint(constr, m3 - m4 - m5) constr = add_constraint(constr, m5 - m6 - m7) constr = add_constraint(constr, m7 - m8) # component balance constr = add_constraint(constr, f1 - f2) constr = add_constraint(constr, f2 - f3 + f8) constr = add_constraint(constr, f3 - f4 - f5) constr = add_constraint(constr, f5 - f6 - f7) constr = add_constraint(constr, f7 - f8) # solve for f constr = add_constraint(constr, f1 - m1 * x1) constr = add_constraint(constr, f2 - m2 * x2) constr = add_constraint(constr, f3 - m3 * x3) constr = add_constraint(constr, f4 - m4 * x4) constr = add_constraint(constr, f5 - m5 * x5) constr = add_constraint(constr, f6 - m6 * x6) constr = add_constraint(constr, f7 - m7 * x7) constr = add_constraint(constr, f8 - m8 * x8) # equal concentration constr = add_constraint(constr, x5 - x6) constr = add_constraint(constr, x5 - x7) mem_constrs = constr_mem( M_in = m3, M_out = m4, x_in = x3, x_out = x4, p_in = p3, p_out = p4, simple = simple, # setup pilot constraints constr_pil = {} sltns_pil = {} constr_pil, sltns_pil = constr_pump( min = m1, mout=m2, xin=x1, xout = x2, pin=p1, pout=p2, E=e1, H = h1, G = g1, J= j1, constr = constr_pil, sltns=sltns_pil, ) for constr_eq in mem_constrs: constr = add_constraint(constr, constr_eq) # pump1 constraints (s1 in, s2 out) pump_constr_1 = constr_pump( m_in=m1, x_in=x1, m_out=m2, p_in=p1, p_out=p2, simple=simple constr_pil, sltns_pil = constr_mix( min1=m2, min2=m8, mout=m3, xin1=x2, xin2=x8, xout=x3, pin1=p2, pin2=p8, pout=p3, constr=constr_pil, sltns=sltns_pil, ) constr = add_constraint(constr, pump_constr_1) # pump2 constraints (s7 in, s8 out) pump_constr_2 = constr_pump( m_in=m7, x_in=x7, m_out=m8, p_in=p7, p_out=p8, simple=simple # membrane 1 constr_pil, sltns_pil = constr_mex( mfeed=m3, mperm=m4, mret=m5, xfeed=x3, xperm=x4, xret=x5, pfeed=p3, pperm=p4, pret=p5, a=a1, ap=a1p, app=a1pp, b=b1, bp=b1p, c=c1, cp=c1p, constr=constr_pil, sltns=sltns_pil, ) constr = add_constraint(constr, pump_constr_2) # friction loss over membrane (streams 3-5) mem_fric_constr = constr_fitting_mem( m_in=m3, m_out=m5, x_in=m3, x_out=m5, p_in=p3, p_out=p5, K_mem=K_mem, simple=simple, # split point constr_pil, sltns_pil = constr_split( min=m5, mout1=m6, mout2=m7, xin=x5, xout1=x6, xout2=x7, pin=p5, pout1=p6, pout2=p7, constr=constr_pil, sltns=sltns_pil ) constr_pil, sltns_pil = constr_pump( min = m7, mout=m8, xin=x7, xout = x8, pin=p7, pout=p8, E=e2, H = h2, G = g2, J= j2, constr = constr_pil, sltns=sltns_pil, ) constr = add_constraint(constr, mem_fric_constr) # remaining friction losses are equivalent (add friction loss coeff later) constr = add_constraint(constr, p2 - p3) constr = add_constraint(constr, p8 - p2) constr = add_constraint(constr, p5 - p6) constr = add_constraint(constr, p5 - p7) if subf: f_dict = { "f1":x1*m1, "f2":x2*m2, "f3":x3*m3, "f4":x4*m4, "f5":x5*m5, "f6":x6*m6, "f7":x7*m7, "f8":x8*m8, } constr_subf = constr.copy() for i, iconstr in constr.items(): constr[i] = iconstr.subs(f_dict) varbs = [ m1, m2, m3, m4, m5, m6, m7, m8, x1, x2, x3, x4, x5, x6, x7, x8, p1, p2, p3, p4, p5, p6, p7, p8, ] var_labels = [ 'M1', 'M2', 'M3', 'M4', 'M5', 'M6', 'M7', 'M8', 'X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8', ] # Create a dictionary to store all variables and their values var_dict = dict(zip(var_labels, varbs)) return constr, var_dict def get_x0_nl(m1 = 0.2, x1 = 0.2, p1 = 1e5, simple=False): # nonlinear operating point: # for failure case make recycle really large, other streams very small constr, _ = constr_mem_nl(simple=simple) M1 = m1 M8 = 1.1 M2 = M1 M3 = M2 + M8 M4 = 0.1 M5 = M3 - M4 M6 = 0.1 M7 = M8 X1 = x1 X2 = x1 X3 = 0.01 X4 = 0.01 X5 = 0.01 X6 = 0.01 X7 = 0.01 X8 = 0.01 F1 = M1*X1 F2 = M2*X2 F3 = M3*X3 F4 = M4*X4 F5 = M5*X5 F6 = M6*X6 F7 = M7*X7 F8 = M8*X8 x0_nl = { 'm1': M1, 'm2': M2, 'm3': M3, 'm4': M4, 'm5': M5, 'm6': M6, 'm7': M7, 'm8': M8, 'f1': F1, 'f2': F2, 'f3': F3, 'f4': F4, 'f5': F5, 'f6': F6, 'f7': F7, 'f8': F8, 'x1': X1, 'x2': X2, 'x3': X3, 'x4': X4, 'x5': X5, 'x6': X6, 'x7': X7, 'x8': X8, } # pressure constraints P1 = p1 x0_nl["p1"] = P1 # NOTE: constraint numbering is hardcoded. picking which constraint # corresponds to which is not addressed for the operating point # pump 1 constraint eq P2 = solve_constr(constr = constr[22], var="p2", subs=x0_nl ) P2 = float(P2) x0_nl["p2"] = P2 P3 = P2 x0_nl["p3"] = P3 P4 = solve_constr(constr = constr[20], var="p4", subs=x0_nl) P4 = float(P4) x0_nl["p4"] = P4 P5 = solve_constr(constr = constr[24], var="p5", subs=x0_nl) x0_nl["p5"] = P5 P6 = P5 x0_nl["p6"] = P6 P7 = P5 x0_nl["p7"] = P7 P8 = solve_constr(constr = constr[23], var="p8", subs=x0_nl) x0_nl["p8"] = P8 # # resolve the mass flow out, concentration out # x0_nl_x4 = copy.deepcopy(x0_nl) # x0_nl_x4.pop("x4") M4 = solve_constr(constr[21], var="m4", subs=x0_nl) # # resolve the mass flow out, concentration out # x0_nl_x4 = copy.deepcopy(x0_nl) # x0_nl_x4.pop("x4") X4 = solve_constr(constr[21], var="x4", subs=x0_nl) return x0_nl return constr_pil, all_varbs_pil