# Created: 2024-12-01
# Last Modified: 2025-05-27
# (c) Copyright 2024 ETH Zurich, Milos Katanic
# https://doi.org/10.5905/ethz-1007-842
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
#
# The code is based on the publication: Katanic, M., Lygeros, J., Hug, G.: Recursive
# dynamic state estimation for power systems with an incomplete nonlinear DAE model.
# IET Gener. Transm. Distrib. 18, 3657–3668 (2024). https://doi.org/10.1049/gtd2.13308
# The full paper version is available at: https://arxiv.org/abs/2305.10065v2
# See full metadata at: README.md
# For inquiries, contact: mkatanic@ethz.ch
from __future__ import annotations # Postponed type evaluation
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from system import Dae
from pydynamicestimator.devices.device import DeviceRect
import numpy as np
import casadi as ca
import logging
[docs]
class StaticLoadPower(DeviceRect): # Not finished
def __init__(self) -> None:
super().__init__()
self._type = "Static_load_power"
self._name = "Static_load_power"
self._setpoints.update({"p": 0.0, "q": 0.0})
self._descr.update(
{
"p": "Active power value",
"q": "Reactive power value",
}
)
self.p = np.array([], dtype=float)
self.q = np.array([], dtype=float)
self.properties.update(
{
"fgcall": True,
"finit": True,
"init_data": True,
"xy_index": True,
"save_data": False,
}
)
[docs]
def gcall(self, dae: Dae) -> None:
dae.g[self.vre] += (
self.p / dae.Sb * dae.y[self.vre] + self.q / dae.Sb * dae.y[self.vim]
) / (dae.y[self.vre] ** 2 + dae.y[self.vim] ** 2)
dae.g[self.vim] += (
self.p / dae.Sb * dae.y[self.vim] - self.q / dae.Sb * dae.y[self.vre]
) / (dae.y[self.vre] ** 2 + dae.y[self.vim] ** 2)
[docs]
def fgcall(self, dae: Dae) -> None:
self.gcall(dae)
[docs]
class StaticLoadImpedance(DeviceRect):
def __init__(self) -> None:
super().__init__()
self._type = "Static_load_impedance"
self._name = "Static_load_impedance"
self._setpoints.update({"g": 1.0, "b": 1.0})
self._descr.update(
{
"g": "Conductance",
"b": "Susceptance",
}
)
self.g = np.array([], dtype=float)
self.b = np.array([], dtype=float)
self.properties.update(
{
"fgcall": True,
"finit": True,
"init_data": True,
"xy_index": True,
"save_data": False,
}
)
[docs]
def gcall(self, dae: Dae):
dae.g[self.vre] += self.g * dae.y[self.vre] - self.b * dae.y[self.vim]
dae.g[self.vim] += self.b * dae.y[self.vre] + self.g * dae.y[self.vim]
[docs]
def fgcall(self, dae: Dae) -> None:
self.gcall(dae)
[docs]
def finit(self, dae: Dae) -> None:
super().finit(dae)
2 == 2
[docs]
class StaticInfiniteBus(DeviceRect):
r"""
This model implements the infinite bus with current balance equations. Resistance
and reactance need to be set and the voltage values get calculated during
initialization.
Attributes:
r (np.ndarray): Resistance value
x (np.ndarray): Reactance value
vre_int (np.ndarray): Internal voltage value
vim_int (np.ndarray): Internal voltage value
.. math::
:nowrap:
\begin{aligned}
i_{re} &= \frac{1}{r^2 + x^2} \left( (v_{re} - v_{re}^{\text{int}}) r + (v_{im} - v_{im}^{\text{int}}) x \right) \\
i_{im} &= \frac{1}{r^2 + x^2} \left( -\left(v_{re} - v_{re}^{\text{int}}\right) x + \left(v_{im} - v_{im}^{\text{int}}\right) r \right)
\end{aligned}
"""
def __init__(self) -> None:
super().__init__()
self._type = "Infinite_bus"
self._name = "Infinite_bus"
self._setpoints.update({"vre_int": 1.0, "vim_int": 0.0})
self._descr.update(
{
"vre_int": "Voltage value in real axis",
"vim_int": "Voltage value in imaginary axis",
}
)
self.vre_int = np.array([], dtype=float)
self.vim_int = np.array([], dtype=float)
self._params.update({"r": 0.001, "x": 0.001})
self.r = np.array([], dtype=float) # internal resistance
self.x = np.array([], dtype=float) # internal reactance
self.properties.update(
{
"fgcall": True,
"finit": True,
"init_data": True,
"xy_index": True,
"save_data": False,
}
)
[docs]
def gcall(self, dae: Dae):
dae.g[self.vre] += (
1
/ (self.r**2 + self.x**2)
* (
(dae.y[self.vre] - self.vre_int) * self.r
+ (dae.y[self.vim] - self.vim_int) * self.x
)
)
dae.g[self.vim] += (
1
/ (self.r**2 + self.x**2)
* (
(dae.y[self.vre] - self.vre_int) * -self.x
+ (dae.y[self.vim] - self.vim_int) * self.r
)
)
[docs]
def fgcall(self, dae: Dae) -> None:
self.gcall(dae)
[docs]
class StaticZIP(DeviceRect):
r"""
ZIP static load. The shares of Z, I, and P (z_share, i_share, p_share) in the overall load need to be specified,
and then the inner parameters of the load will be set during the initialization
to satisfy the initial power flow and the share of contributions of each load tape. The shares need to sum up to one for the
initialization to work perfectly. Positive reactive current i_q means consumption.
.. math::
:nowrap:
\begin{aligned}
i_{re} &= \left(\frac{{p}v_{re} + {q} v_{im}}{v_{re}^2 + v_{im}^2}\right) + (g v_{re} - b v_{im}) + (\cos{\theta} i_d + \sin{\theta} i_q)\\
i_{re} &=\left(\frac{{p}v_{im} - {q} v_{re}}{v_{re}^2 + v_{im}^2} \right)+ (b v_{re} + g v_{im}) + (\sin{\theta} i_q - \sin{\theta} i_d)
\end{aligned}
Attributes:
g (np.ndarray: Conductance value
b (np.ndarray): Susceptance value
p (np.ndarray): Active power value
q (np.ndarray): Reactive power value
id (np.ndarray): Active current value
iq (np.ndarray): Reactive current value
p_share (np.ndarray): Share of power load (default: 0.0)
i_share (np.ndarray): Share of current load (default: 0.0)
z_share (np.ndarray): Share of impedance load (default: 1.0)
"""
def __init__(self) -> None:
super().__init__()
self._type = "Static_load_ZIP"
self._name = "Static_load_ZIP"
self._setpoints_z = {"g": 1.0, "b": 1.0}
self._setpoints_i = {"id": 1.0, "iq": 1.0}
self._setpoints_p = {"p": 1.0, "q": 1.0}
self._setpoints.update(
{"g": 1.0, "b": 1.0, "p": 1.0, "q": 1.0, "id": 1.0, "iq": 1.0}
)
self._params.update(
{
"p_share": 0.0, "i_share": 0.0, "z_share": 1.0,
# Q-side shares; default NaN means "fall back to the matching
# P-side share at use time", which preserves the historical
# single-share-per-branch behaviour bit-for-bit.
"p_share_q": float("nan"),
"i_share_q": float("nan"),
"z_share_q": float("nan"),
}
)
self._descr.update(
{
"p_share": "Fraction of the load constant power (P side)",
"i_share": "Fraction of the load constant current (P side)",
"z_share": "Fraction of the load constant impedance (P side)",
"p_share_q": "Fraction of the load constant power for Q (default: matches p_share)",
"i_share_q": "Fraction of the load constant current for Q (default: matches i_share)",
"z_share_q": "Fraction of the load constant impedance for Q (default: matches z_share)",
"g": "Conductance",
"b": "Susceptance",
"id": "Active current value",
"iq": "Reactive current value",
"p": "Active power value",
"q": "Reactive power value",
}
)
self.g = np.array([], dtype=float)
self.b = np.array([], dtype=float)
self.p = np.array([], dtype=float)
self.q = np.array([], dtype=float)
self.id = np.array([], dtype=float)
self.iq = np.array([], dtype=float)
self.p_share = np.array([], dtype=float)
self.i_share = np.array([], dtype=float)
self.z_share = np.array([], dtype=float)
self.p_share_q = np.array([], dtype=float)
self.i_share_q = np.array([], dtype=float)
self.z_share_q = np.array([], dtype=float)
self.properties.update(
{
"fgcall": True,
"finit": True,
"init_data": True,
"xy_index": True,
"save_data": False,
}
)
[docs]
def gcall_i(self, dae: Dae):
theta = np.arctan2(dae.y[self.vim], dae.y[self.vre])
i_re = np.cos(theta) * self.id + np.sin(theta) * self.iq
i_im = np.sin(theta) * self.id - np.cos(theta) * self.iq
dae.g[self.vre] += i_re
dae.g[self.vim] += i_im
[docs]
def gcall_p(self, dae: Dae):
dae.g[self.vre] += (self.p * dae.y[self.vre] + self.q * dae.y[self.vim]) / (
dae.y[self.vre] ** 2 + dae.y[self.vim] ** 2
)
dae.g[self.vim] += (self.p * dae.y[self.vim] - self.q * dae.y[self.vre]) / (
dae.y[self.vre] ** 2 + dae.y[self.vim] ** 2
)
[docs]
def gcall_z(self, dae: Dae):
dae.g[self.vre] += self.g * dae.y[self.vre] - self.b * dae.y[self.vim]
dae.g[self.vim] += self.b * dae.y[self.vre] + self.g * dae.y[self.vim]
[docs]
def fgcall(self, dae: Dae) -> None:
self.gcall_i(dae)
self.gcall_z(dae)
self.gcall_p(dae)
[docs]
def q_share(self, branch: str, k: int) -> float:
"""Q-side share for ``branch`` ∈ {'z','i','p'} at entry ``k``.
Falls back to the matching P-side share when the Q-side is unset
(NaN sentinel). This is the single source of truth for the
P-side / Q-side fallback used by both ``finit_sub`` (vectorised
via :func:`numpy.where`) and ``Dae.dist_load`` (scalar per
entry).
"""
q_val = float(getattr(self, f"{branch}_share_q")[k])
if np.isnan(q_val):
return float(getattr(self, f"{branch}_share")[k])
return q_val
[docs]
def finit_sub(self, dae: Dae, sub: str) -> None:
_setpoints = self.__getattribute__(f"_setpoints_{sub}")
u = ca.SX.sym("", 0)
u0 = []
for item in _setpoints:
# Set the initial guess for the setpoint
u0.append(self.__dict__[item])
# Reset it to be a variable
self.__dict__[item] = ca.SX.sym(item, self.n)
# Stack the variable to a single vector
u = ca.vertcat(u, self.__dict__[item])
u0 = [item for sublist in u0 for item in sublist]
# --- Decompose the per-bus init current into a P-only and a Q-only
# piece, then weight each by its independent share. With
# share_P == share_Q (every entry that doesn't declare a *_share_q
# field) this collapses to dae.iinit * share — bit-identical to the
# historical single-share calibration. See
# docs/load_disturbance_design.html §7 for the derivation.
V_re = dae.yinit[self.vre]
V_im = dae.yinit[self.vim]
V_sq = V_re ** 2 + V_im ** 2
I_re = dae.iinit[self.vre]
I_im = dae.iinit[self.vim]
# Back out (P, Q) per entry from S = V·I*
P_entry = V_re * I_re + V_im * I_im
Q_entry = V_im * I_re - V_re * I_im
# Per-axis P-only and Q-only currents
iinit_P_re = P_entry * V_re / V_sq
iinit_P_im = P_entry * V_im / V_sq
iinit_Q_re = Q_entry * V_im / V_sq
iinit_Q_im = -Q_entry * V_re / V_sq
share_P = self.__dict__[f"{sub}_share"]
share_Q_raw = self.__dict__[f"{sub}_share_q"]
share_Q = np.where(np.isnan(share_Q_raw), share_P, share_Q_raw)
dae.g[self.vre] += iinit_P_re * share_P + iinit_Q_re * share_Q
dae.g[self.vim] += iinit_P_im * share_P + iinit_Q_im * share_Q
# Algebraic variables are now not symbolic but their init values
dae.y = dae.yinit.copy()
dae.s = np.ones(dae.nx)
dae.s = np.ones(dae.nx)
gcall = self.__getattribute__(f"gcall_{sub}")
gcall(dae)
# self.fgcall(dae)
inputs = [ca.vertcat(u)]
outputs = [
ca.vertcat(
dae.g[self.__dict__["vre"]],
dae.g[self.__dict__["vim"]],
)
]
power_flow_init = ca.Function("h", inputs, outputs)
newton_init = ca.rootfinder("G", "newton", power_flow_init)
solution = newton_init(ca.vertcat(u0))
solution = np.array(solution).flatten()
for idx, s in enumerate(_setpoints):
setpoint_range_start = (len(self.states) + idx) * self.n
self.__dict__[s] = solution[
setpoint_range_start : setpoint_range_start + self.n
]
changed_setpoints = (
u0[idx * self.n : (idx + 1) * self.n]
!= solution[setpoint_range_start : setpoint_range_start + self.n]
)
for i in range(self.n):
if changed_setpoints[i]:
logging.info(
f"Setpoint '{s}' - {self._descr[s]} - updated in device {self._name} at node {self.bus[i]} from {u0[idx * self.n + i]} to {solution[setpoint_range_start + i]} to match the initial power flow!"
)
# Reset the algebraic equations such that they can be used "erneut" from scratch once the "fgcall" is called
dae.g *= 0
# Reset the voltages to being again symbolic variables
dae.y = ca.SX.sym("y", dae.ny)
dae.s = ca.SX.sym("s", dae.nx)
[docs]
def finit(self, dae: Dae) -> None:
self.finit_sub(dae, "p")
self.finit_sub(dae, "z")
self.finit_sub(dae, "i")
# Non-fatal sanity check: each side's shares should sum to 1.0
# per entry. A violation means the user's declared shares don't
# account for the full BusInit demand on that axis — surfacing
# this loudly is much better than silently under-delivering.
for k in range(self.n):
sP = float(self.z_share[k] + self.i_share[k] + self.p_share[k])
sQ = (self.q_share('z', k)
+ self.q_share('i', k)
+ self.q_share('p', k))
if not np.isclose(sP, 1.0, atol=1e-6):
logging.warning(
f"StaticZIP at bus '{self.bus[k]}': P-side shares "
f"sum to {sP:.6f} ≠ 1.0 — load will under/over-deliver "
f"active power."
)
if not np.isclose(sQ, 1.0, atol=1e-6):
logging.warning(
f"StaticZIP at bus '{self.bus[k]}': Q-side shares "
f"sum to {sQ:.6f} ≠ 1.0 — load will under/over-deliver "
f"reactive power."
)