# Created: 2024-12-01
# Last Modified: 2026-03-19
# (c) Copyright 2024 ETH Zurich, Milos Katanic
# https://doi.org/10.5905/ethz-1007-842
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
#
# The code is based on the publication: Katanic, M., Lygeros, J., Hug, G.: Recursive
# dynamic state estimation for power systems with an incomplete nonlinear DAE model.
# IET Gener. Transm. Distrib. 18, 3657–3668 (2024). https://doi.org/10.1049/gtd2.13308
# The full paper version is available at: https://arxiv.org/abs/2305.10065v2
# See full metadata at: README.md
# For inquiries, contact: mkatanic@ethz.ch
from __future__ import annotations # Postponed type evaluation
from typing import TYPE_CHECKING, Union, Any, Optional
if TYPE_CHECKING:
from pydynamicestimator.system import Dae, Grid, DaeEst, DaeSim, GridSim
import casadi as ca
import numpy as np
import logging
np.random.seed(30)
sin = np.sin
cos = np.cos
sqrt = np.sqrt
[docs]
class Element:
"""Metaclass to be used for all elements to be added"""
def __init__(self) -> None:
self.n: int = 0 # number of devices
self.u: list[bool] = [] # device status
self.name: list[str] = [] # name of the device
self._type: Optional[str] = None # Device type
self._name: Optional[str] = None # Device name
self.int: dict[
str, Union[str, int]
] = (
{}
) # Dictionary of unique identifiers for each device based on the variable "idx"
self._data: dict[str, Any] = {"u": True} # Default entries for lists
self._params: dict[str, float] = {} # Default entries for np.arrays params
self._setpoints: dict[
str, float
] = {} # Default entries for np.arrays set points
self._descr: dict[str, str] = {} # Parameter and data explanation
self._mand: list[str] = [] # mandatory attributes
self.properties: dict[str, bool] = {
"gcall": False,
"fcall": False,
"xinit": False,
"fgcall": False,
"qcall": False,
"call": False,
"xy_index": False,
"finit": False,
"save_data": False,
"fplot": False,
}
[docs]
def add(
self, idx: Optional[str] = None, name: Optional[str] = None, **kwargs
) -> None:
"""
Add an element device
Args:
idx (str, optional): Unique identifier for the device. Generated if not provided.
name (str, optional): Name of the device. Generated if not provided.
**kwargs: Custom parameters to overwrite defaults.
"""
# Generate unique identifiers if not provided
idx = idx or f"{self._type}_{self.n + 1}"
name = name or f"{self._name}_{self.n + 1}"
self.int[idx] = self.n
self.name.append(name)
# check whether mandatory parameters have been set up
for key in self._mand:
if key not in kwargs:
raise Exception("Mandatory parameter <%s> has not been set up" % key)
self.n += 1
# Set default values
for key, default in {**self._params, **self._setpoints}.items():
if key not in self.__dict__:
logging.warning(
f"Attribute {key} not found in element - initializing as an empty array."
)
self.__dict__[key] = np.array([], dtype=type(default))
self.__dict__[key] = np.append(self.__dict__[key], default)
for key, default in self._data.items():
if key not in self.__dict__:
logging.warning(
f"Attribute {key} not found in element - initializing as an empty list."
)
self.__dict__[key] = []
self.__dict__[key].append(default)
# Overwrite custom values
for key, value in kwargs.items():
if key not in self.__dict__:
logging.info(
"Element %s does not have parameter %s - ignoring"
% (self._name, key)
)
continue
try:
self.__dict__[key][-1] = value # Attempt to update the last element
except IndexError:
raise IndexError(
f"Parameter/setpoint '{key}' not properly specified in the model definition. Check if it's properly listed and initialized when defining the class"
)
logging.info(f"Element {name} (ID: {idx}) added successfully.")
[docs]
class BusInit(Element):
def __init__(self) -> None:
super().__init__()
self._type = "Bus_init_or_unknwon" # Element type
self._name = "Bus_init_or_unknown" # Element name
self._data.update({"bus": None, "p": 0, "q": 0, "v": 1.0, "type": None})
self.bus: list[Optional[str]] = []
self.p: list[float] = []
self.q: list[float] = []
self.v: list[float] = []
self.type: list[Optional[str]] = []
BusUnknown = BusInit # Alias class name
[docs]
class Disturbance(Element):
def __init__(self) -> None:
super().__init__()
self._type = "Disturbance" # Element type
self._name = "Disturbance" # Element name
# Default parameter values
self._params.update(
{
"bus_i": None,
"bus_j": None,
"time": None,
"type": None,
"y": 10,
"bus": None,
"p_delta": 0,
"q_delta": 0,
}
)
self.type = np.array([], dtype=str)
self.time = np.array([], dtype=float)
self.bus_i = np.array([], dtype=str)
self.bus_j = np.array([], dtype=str)
self.y: np.ndarray = np.array([], dtype=float)
self.bus = np.array([], dtype=str)
self.p_delta = np.array([], dtype=float)
self.q_delta = np.array([], dtype=float)
[docs]
def sort_chrono(self):
sorted_indices = np.argsort(self.time)
for key in self._params:
setattr(self, key, getattr(self, key)[sorted_indices])
[docs]
class Line(Element):
r"""
Parameters
----------
r : ndarray[float]
Series resistance value in per unit (p.u.).
x : ndarray[float]
Series reactance value in per unit (p.u.).
g : ndarray[float]
Total shunt conductance value in per unit (p.u.).
b : ndarray[float]
Total shunt susceptance value in per unit (p.u.).
trafo : ndarray[float]
Off-nominal line transformer ratio.
bus_i : ndarray[str]
Name of the sending-end bus.
bus_j : ndarray[str]
Name of the receiving-end bus.
"""
def __init__(self) -> None:
super().__init__()
self._type = "Transmission_line" # Element type
self._name = "Transmission_line" # Element name
self._params.update(
{
"r": 0.001,
"x": 0.001,
"g": 0,
"b": 0,
"bus_i": None,
"bus_j": None,
"trafo": 1,
}
)
self.states = ["i_ijr", "i_iji"]
self.r = np.array([], dtype=float)
self.x = np.array([], dtype=float)
self.g = np.array([], dtype=float)
self.b = np.array([], dtype=float)
self.trafo = np.array([], dtype=float)
self.bus_i = np.array([], dtype=object)
self.bus_j = np.array([], dtype=object)
[docs]
def fcall(self, grid: GridSim, dae: DaeSim) -> None:
r"""Call all differential equations for all lines. This function should be used
only in case lines are modeled dynamically. If they are modeled statically they
are automatically incorporated within grid class"""
# TODO create separate multiplication factors for re and im because of phase shifters
self.i_ijr = [i for i in range(2 * grid.nb) if i % 2 == 0]
self.i_iji = [i for i in range(2 * grid.nb) if i % 2 != 0]
omega_b = [2 * np.pi * dae.fn] * self.n
# Update Reference Frequencies
omega_ref_vec = (
ca.SX.ones(self.n, 1) * dae.omega_net
) # initialization and fallback
# Set reference frequency
if getattr(dae, "omega_ref_lines", None) is not None:
# Vector of reference frequencies at all lines
omega_ref_vec = dae.omega_ref_lines
# # d i_ijr / dt
# dae.fl[self.i_ijr] = (omega_b / (self.x * 1)) * (
# ((dae.y[grid.idx_i_re] / self.trafo) - dae.y[grid.idx_j_re])
# - ((self.r * 1) * dae.xl[self.i_ijr])) + omega_b * omega_ref_vec * dae.xl[self.i_iji]
# # d i_iji / dt
# dae.fl[self.i_iji] = (omega_b / (self.x * 1)) * (
# ((dae.y[grid.idx_i_im] / self.trafo) - dae.y[grid.idx_j_im])
# - ((self.r * 1) * dae.xl[self.i_iji])) - omega_b * omega_ref_vec * dae.xl[self.i_ijr]
# NOTE: implementation with transformed voltages
# Transforming nodal voltages to reference frame of line properties
# Quantities in same equation are thus consistently in same reference frame.
Vi_re, Vi_im, Vj_re, Vj_im = grid.build_voltage_transformed_to_line_frame(dae)
# d i_ijr / dt
dae.fl[self.i_ijr] = (omega_b / (self.x * 1)) * (
((Vi_re / self.trafo) - Vj_re) - ((self.r * 1) * dae.xl[self.i_ijr])
) + omega_b * omega_ref_vec * dae.xl[self.i_iji]
# d i_iji / dt
dae.fl[self.i_iji] = (omega_b / (self.x * 1)) * (
((Vi_im / self.trafo) - Vj_im) - ((self.r * 1) * dae.xl[self.i_iji])
) - omega_b * omega_ref_vec * dae.xl[self.i_ijr]
# Apply line switches: sl=0 deactivates opened lines (derivative → 0)
dae.fl[self.i_ijr] *= dae.sl[self.i_ijr]
dae.fl[self.i_iji] *= dae.sl[self.i_iji]
# Accumulate branch currents into nodal current balance using a prebuilt mapping matrix.
# Gate by sl so opened lines don't inject current into nodes.
# dae.g += grid.C_linecurrents_to_nodes @ (dae.xl * dae.sl)
# TODO add the rotation of current through phase shifts
# NOTE: implementation with transformed linecurrents
# Accumulates branch currents into nodal current balance.
# Quantities are consistently transformed into the same reference frame.
# This robustly handles multiple lines incident to the same bus without index-overwrite issues.
# The returned vector spans the 2*nn bus-voltage block; device-private
# algebraics occupy dae.g[2*nn:] and are untouched here.
nv = 2 * grid.nn
dae.g[:nv] += grid.build_linecurrents_transformed_to_bus_frame(dae)
[docs]
class DeviceRect(Element):
"""
Dynamic or static device modeled in rectangular coordinates. Used as a parent class
for all devices modeled in rectangular coordinates.
Attributes:
properties (dict[str, bool]): Flags for method calls (e.g., 'gcall', 'fcall').
xf (dict[str, np.ndarray]): Final state results for simulation/estimation.
xinit (dict[State, list[float]]): Initial state values for all devices of the instance.
xmin (list[float]): Minimum limited state value.
xmax (list[float]): Maximum limited state value.
_params (dict[str, float]): Device parameters, such as rated voltage, power, and frequency and internal parameters.
_data (dict[str, str]): Additional data which will be used in a list, one entry for each device of this object instance.
bus (list[Optional[str]]): Buses where the device is connected. Each device has an entry in the list.
states (list[States]): State variables.
units (list[States]): Units of state variables.
ns (float): Total number of states in the model.
_states_noise (dict[States, float]): Noise for each state variable.
_states_init_error (dict[States, float]): Initial error for each state variable.
vre (list[float]): Order of the real voltage value for each device in the overall DAE model.
vim (list[float]): Order of the real voltage value for each device in the overall DAE model.
_algebs (list[str]): Algebraic variables ('vre', 'vim').
_descr (dict[str, str]): Descriptions for key parameters.
"""
#: Initialization method this device class declares as its default. ``"joint"``
#: -> the one-shot Newton+LM solve in :meth:`finit` (the framework default,
#: e.g. the synchronous machine); ``"sequential"`` -> a staged init the device
#: implements in :meth:`_finit_sequential` (e.g. the inverter). Declared per
#: device class.
_init_method: str = "joint"
def __init__(self) -> None:
super().__init__()
self.xf: dict[str, np.ndarray] = {} # final state results or sim/est
self.yf_int: dict[str, np.ndarray] = {} # private algebraic trajectories
self.xinit: dict[str, list[float]] = {}
self.xmin: list[float] = []
self.xmax: list[float] = []
self._params.update({"Vn": 220, "fn": 50.0, "Sn": 100})
self.Vn = np.array([], dtype=float)
self.fn = np.array([], dtype=float)
self.Sn = np.array([], dtype=float)
self._data.update({"bus": None})
self.bus: list[Optional[str]] = [] # at which bus
self.states: list[str] = [] # list of state variables
self.units: list[str] = [] # List of states' units
self.ns: int = 0 # number of states
self._states_noise: dict[
str, float
] = {} # list of noises for every state variable (only for estimation)
self._states_init_error: dict[
str, float
] = {} # list of initial errors for every state variable (only for estimation)
self._algebs: list[str] = ["vre", "vim"] # list of algebraic variables
self.vre = np.array([], dtype=float)
self.vim = np.array([], dtype=float)
# Device-private (internal) algebraic variables. Unlike vre/vim (which
# are shared, grid-mapped node voltages), each private algebraic is
# owned by this device, gets a fresh dae.y/dae.g slot in xy_index, and
# is defined by its own equation 0 = g written in fgcall. Used for
# device-local algebraic constraints that cannot be symbolically
# eliminated. See docs/algebraic_equations_design.md.
self._algebs_int: list[str] = [] # ordered names of private algebraics
self._algebs_int_x0: dict[str, float] = {} # init guess (Newton guess in finit)
self._algebs_int_noise: dict[str, float] = {} # relative process-noise weight
self._algebs_int_units: dict[str, str] = {} # units, for labels/plotting
self._x0: dict[
str, float
] = (
{}
) # default initial states, will be used as initial guess for the simulation initialization
self._mand.extend(["bus"])
self._descr = {
"Sn": "rated power",
"Vn": "rated voltage",
"u": "connection status",
"fn": "nominal frequency",
}
[docs]
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
# Merge class-level _descrs if subclass adds its own
full_descr = dict(getattr(cls, "_descr", {}))
for base in cls.__bases__:
full_descr.update(getattr(base, "_descr", {}))
# Format as docstring section
doc_lines = ["\nAttributes", "----------"]
for key, val in full_descr.items():
doc_lines.append(f"{key} : np.ndarray")
doc_lines.append(f" {val}")
# Inject into class docstring
cls.__doc__ = (cls.__doc__ or "") + "\n" + "\n".join(doc_lines)
[docs]
def _init_data(self) -> None:
self.xinit = {state: [] for state in self.states}
[docs]
def xy_index(self, dae: Dae, grid: Grid) -> None:
"""Initializes indices for states, algebraic variables, unknown inputs, and switches.
Args:
dae (Dae): Object managing differential-algebraic equations.
grid (Grid): Object managing the electrical grid and node indices.
"""
zeros = [0] * self.n
for item in self.states:
self.__dict__[item] = zeros[:]
for item in self._algebs:
self.__dict__[item] = zeros[:]
for item in self._algebs_int:
self.__dict__[item] = zeros[:]
for var in range(self.n):
for item in self.states:
self.__dict__[item][var] = dae.nx
# Init with current state init value
dae.xinit.append(self.xinit[item][var])
# Add the corresponding state to the DAE
dae.states.append(f"{self._name}_at_{self.bus[var]}_{item}")
dae.nx += 1
# Retrieve min and max state limits
item_min = getattr(self, item + "_min", None)
item_max = getattr(self, item + "_max", None)
self.xmin.append(item_min[var] if item_min is not None else -np.inf)
self.xmax.append(item_max[var] if item_max is not None else np.inf)
dae.xmin.append(item_min[var] if item_min is not None else -np.inf)
dae.xmax.append(item_max[var] if item_max is not None else np.inf)
if self.n:
# assign indices to real and imaginary voltage algebraic variables; first real value
self.__dict__["vre"] = grid.get_node_index(self.bus)[1]
self.__dict__["vim"] = grid.get_node_index(self.bus)[2]
# Device-private algebraic variables: append fresh slots AFTER the
# 2*nn voltage block. grid.nn is final here, so the global index is
# grid.nn*2 + dae.n_priv (a running counter across all devices). The
# symbolic dae.y/dae.g vectors are sized to 2*nn + dae.n_priv later, in
# Grid.init_symbolic (which runs after every device's xy_index).
if self._algebs_int:
base = grid.nn * 2
for var in range(self.n):
for item in self._algebs_int:
self.__dict__[item][var] = base + dae.n_priv
dae.algebs_int_names.append(
f"{self._name}_at_{self.bus[var]}_{item}"
)
dae.yinit_priv.append(self._algebs_int_x0.get(item, 0.0))
dae.n_priv += 1
[docs]
def add(self, idx=None, name=None, **kwargs) -> None:
super().add(idx, name, **kwargs)
# initialize initial states with some default values
for item in self.states:
self.xinit[item].append(self._x0[item])
[docs]
def init_from_simulation(
self, device_sim: DeviceRect, idx: str, dae: DaeEst, dae_sim: DaeSim
) -> None:
"""Initialize the device state estimation based on simulation results.
Args:
device_sim (DeviceRect): The device simulation object containing simulation results.
idx (str): unique index of the device
dae (DaeEst): The DAE object responsible for managing the estimation of the system.
dae_sim (DaeSim): The simulation object providing timing information.
"""
var_sim = device_sim.int.get(idx)
var_est = self.int.get(idx)
# Initial states of estimation as true states obtained through simulation
for item in self.states:
# Init with simulated value
try:
self.xinit[item][var_est] = device_sim.xf[item][
var_sim, round(dae.T_start / dae_sim.t)
]
except KeyError:
logging.error(
f"Failed to initialize state {item}. State not found in simulation model."
)
continue
# Add noise for the init state
noise = (
self._states_init_error[item]
* (np.random.uniform() - 0.5)
* dae.init_error_diff
)
dae.xinit[self.__dict__[item][var_est]] = self.xinit[item][var_est] + noise
# Set setpoint values based on simulation
for item, value in self._setpoints.items():
if item in device_sim.__dict__:
self.__dict__[item][var_est] = device_sim.__dict__[item][var_sim]
else:
logging.warning(
f"Setpoint {item} not found in simulation test cases. Skipping. It will be ignored and the estimation will start from default initial value"
)
# Seed device-private algebraic variables from the simulated trajectory
# at T_start, written into the (already ny-sized) dae.yinit.
for item in self._algebs_int:
if item in device_sim.yf_int:
dae.yinit[self.__dict__[item][var_est]] = device_sim.yf_int[item][
var_sim, round(dae.T_start / dae_sim.t)
]
[docs]
def save_data(self, dae: Dae) -> None:
for item in self.states:
self.xf[item] = np.zeros([self.n, dae.nts])
self.xf[item][:, :] = dae.x_full[self.__dict__[item][:], :]
# Persist device-private algebraic trajectories (from dae.y_full) so the
# estimation twin can be initialised from the simulated values.
for item in self._algebs_int:
self.yf_int[item] = dae.y_full[self.__dict__[item][:], :]
[docs]
def finit_anchor_residuals(self, dae: Dae) -> list:
"""Extra steady-state anchor residuals (each an n-vector that must equal 0)
a device declares to keep its initialization Newton system square.
The generic :meth:`finit` pins each device's state ODEs (``f = 0``) plus
two quantities through the bus-current balance (``g[vre]``/``g[vim]``), so
the square-system invariant is ``n_setpoints == 2`` (the SG: Pref + Vref).
A device with more setpoints returns ``n_setpoints - 2`` anchor residuals
to balance them. Empty by default -> existing devices are unaffected.
Example -- the inverter has 3 setpoints (Pref/Qref/Vref) but the bus
equations pin only 2; it returns one anchor (Qref = Qc) here. See
``docs/inverter_modernization_design.md`` §7.
"""
return []
[docs]
def finit(self, dae: Dae) -> None:
"""Initialize the device, dispatching to the method this device class
declares via :attr:`_init_method` (``"joint"`` -> :meth:`_finit_joint`,
``"sequential"`` -> :meth:`_finit_sequential`)."""
if self._init_method == "sequential":
self._finit_sequential(dae)
elif self._init_method == "joint":
self._finit_joint(dae)
else:
raise ValueError(
f"Unknown _init_method {self._init_method!r} on {self._name}; "
"expected 'joint' or 'sequential'."
)
[docs]
def _finit_sequential(self, dae: Dae) -> None:
"""Staged, device-specific initialization. Devices that declare
``_init_method = 'sequential'`` implement this; the default has none."""
raise NotImplementedError(
f"{self._name} declares _init_method='sequential' but provides no "
"_finit_sequential."
)
[docs]
def _finit_joint(self, dae: Dae) -> None:
"""One-shot initialization: solve the device's steady state (states +
setpoints + private algebraics, balanced by the bus equations and any
device-declared anchor residuals) as a single Newton system, with a
Levenberg-Marquardt fallback.
Initialize the device by setting up setpoints, initial states based on the power flow solution.
Args:
dae (Dae): The DAE object used to simulate the system.
"""
u = ca.SX.sym("", 0)
u0 = []
for item in self._setpoints:
# Set the initial guess for the setpoint
u0.append(self.__dict__[item])
# Reset it to be a variable
self.__dict__[item] = ca.SX.sym(item, self.n)
# Stack the variable to a single vector
u = ca.vertcat(u, self.__dict__[item])
u0 = [item for sublist in u0 for item in sublist] # flatten it
# Now subtract the initial network currents from algebraic equations
for alg in self._algebs:
dae.g[self.__dict__[alg]] += dae.iinit[self.__dict__[alg]]
# Algebraic variables are now fixed to their init values -- EXCEPT this
# device's own private algebraics, which stay symbolic so finit can
# solve for their consistent values together with the states. We build a
# mixed dae.y: numeric voltages, symbolic privates. priv_global_idx /
# priv_syms / priv_guess are kept item-major, instance-minor so the
# solution tail can be unpacked in the same order below.
priv_syms: list = []
priv_global_idx: list = []
priv_guess: list = []
if self._algebs_int:
y_mixed = ca.SX(dae.yinit) # numeric voltages as SX constants
for item in self._algebs_int:
sym = ca.SX.sym(item, self.n)
for var in range(self.n):
gidx = self.__dict__[item][var]
y_mixed[gidx] = sym[var]
priv_global_idx.append(gidx)
priv_guess.append(self._algebs_int_x0.get(item, 0.0))
priv_syms.append(sym)
dae.y = y_mixed
else:
dae.y = dae.yinit.copy()
dae.s = np.ones(dae.nx)
self.fgcall(dae)
# Find the indices of differential equations for this type of generator
diff = [self.__dict__[arg] for arg in self.states]
diff_index = [item for sublist in np.transpose(diff) for item in sublist]
# Stack private symbols as additional unknowns (after the setpoints) and
# their defining equations dae.g[private] as additional residuals. This
# keeps the Newton system square: +n*n_priv unknowns, +n*n_priv residuals.
priv_stack = ca.SX.sym("", 0)
for sym in priv_syms:
priv_stack = ca.vertcat(priv_stack, sym)
residuals = ca.vertcat(
dae.f[diff_index],
dae.g[self.__dict__["vre"]],
dae.g[self.__dict__["vim"]],
)
if priv_global_idx:
residuals = ca.vertcat(residuals, dae.g[priv_global_idx])
# Device-declared anchor residuals balance setpoints not pinned by the two
# bus-current equations (e.g. the inverter's Qref=Qc gauge fix). Empty by
# default, so the square-system invariant is unchanged for existing devices.
for anchor in self.finit_anchor_residuals(dae):
residuals = ca.vertcat(residuals, anchor)
inputs = [ca.vertcat(dae.x[diff_index], u, priv_stack)]
outputs = [residuals]
W = [ca.vertcat(dae.omega_ref, dae.omega_ref_buses, dae.omega_ref_lines)]
W_one = [ca.SX.ones(1 + dae.grid.nn + dae.grid.nb, 1)]
outputs = ca.substitute(outputs, W, W_one)
device_init = ca.Function("h", inputs, outputs)
newton_init = ca.rootfinder("G", "newton", device_init)
x0 = np.array([self._x0[s] for s in self.states] * self.n)
guess = ca.vertcat(x0, u0)
if priv_guess:
guess = ca.vertcat(guess, priv_guess)
try:
solution = np.array(newton_init(guess)).flatten()
except RuntimeError:
solution = np.full(int(inputs[0].numel()), np.nan)
if not np.isfinite(solution).all():
# Plain Newton can diverge when the per-device init Jacobian is
# rank-deficient at the starting guess (a benign gauge in the d-axis
# excitation/flux chain). This is harmless for the stock models, but
# the larger DAE init systems (devices declaring private algebraics)
# need a globalised solve. Fall back to a damped Gauss-Newton
# (Levenberg-Marquardt) iteration, which regularises the
# rank-deficient Jacobian and converges to a consistent solution.
h = ca.Function("h", inputs, outputs)
Jh = ca.Function("Jh", inputs, [ca.jacobian(outputs[0], inputs[0])])
z = np.array(guess).flatten().astype(float)
lam = 1e-3
r = np.array(h(z)).flatten()
for _ in range(500):
rn = np.linalg.norm(r)
if rn < 1e-10:
break
J = np.array(Jh(z))
JtJ = J.T @ J
step = np.linalg.solve(
JtJ + lam * np.eye(JtJ.shape[0]), -J.T @ r
)
z_new = z + step
r_new = np.array(h(z_new)).flatten()
if np.linalg.norm(r_new) < rn:
z, r = z_new, r_new
lam = max(lam * 0.5, 1e-12)
else:
lam = min(lam * 2.0, 1e8)
solution = z
# # Init only these states
# for s in self.states:
# self.xinit[s] = list(solution[self.__dict__[s]])
index = 0 # emma: rewrote 2 lines above so that more than just synchronous generators could be included in the simulation (otherwise the indexing is not done properly)
for s in self.states:
locat = []
for n in range(self.n):
locat.append(n * len(self.states) + index)
self.xinit[s] = solution[locat]
index += 1
for idx, s in enumerate(self._setpoints):
setpoint_range_start = (len(self.states) + idx) * self.n
changed_setpoints = (
u0[idx * self.n : (idx + 1) * self.n]
!= solution[setpoint_range_start : setpoint_range_start + self.n]
)
for i in range(self.n):
if changed_setpoints[i]:
logging.info(
f"Setpoint '{s}' - {self._descr[s]} - updated in device {self._name} at node {self.bus[i]} from {u0[idx * self.n + i]} to {solution[setpoint_range_start + i]} to match the initial power flow!"
)
self.__dict__[s] = solution[
setpoint_range_start : setpoint_range_start + self.n
]
# Now load the initial states into DAE class such that simulation/estimation actually starts from those values
dae.xinit[diff_index] = solution[: len(self.states) * self.n]
# Write converged private algebraic values back into dae.yinit so the
# time-domain start sits on the constraint manifold. The private block of
# the solution begins after the states and setpoints, item-major
# instance-minor (the order in which priv_global_idx was built).
if self._algebs_int:
k = (len(self.states) + len(self._setpoints)) * self.n
for gidx in priv_global_idx:
dae.yinit[gidx] = solution[k]
k += 1
# Reset the algebraic equations such that they can be used "erneut" from scratch once the "fgcall" is called
dae.g *= 0
# Reset the voltages to being again symbolic variables
dae.y = ca.SX.sym("y", dae.ny)
dae.s = ca.SX.sym("s", dae.nx)
2 == 2
[docs]
def fgcall(self, dae: Dae) -> None:
"""
A method that executes the differential and algebraic equations of the model
and adds them to the appropriate places in the Dae class
:param dae: an instance of a class Dae
:type dae:
:return: None
:rtype: None
"""
pass
[docs]
def qcall(self, dae: DaeEst) -> None:
for item in self.states:
dae.q_proc_noise_diff_cov_matrix[
self.__dict__[item], self.__dict__[item]
] = (self._states_noise[item]) ** 2
# Per-private algebraic-equation process-noise weight (squared relative
# weight, mirroring _states_noise for states). Default 1 -> the private
# constraint gets the same slack as a bus KCL equation; 0 -> hard
# constraint. estimate() scales these by proc_noise_alg**2.
for item in self._algebs_int:
weight = self._algebs_int_noise.get(item, 1.0)
dae.q_proc_noise_alg_relative[self.__dict__[item]] = weight**2