# Created: 2024-12-01
# Last Modified: 2026-03-19
# (c) Copyright 2024 ETH Zurich, Milos Katanic
# https://doi.org/10.5905/ethz-1007-842
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
#
# The code is based on the publication: Katanic, M., Lygeros, J., Hug, G.: Recursive
# dynamic state estimation for power systems with an incomplete nonlinear DAE model.
# IET Gener. Transm. Distrib. 18, 3657–3668 (2024). https://doi.org/10.1049/gtd2.13308
# The full paper version is available at: https://arxiv.org/abs/2305.10065v2
# See full metadata at: README.md
# For inquiries, contact: mkatanic@ethz.ch
from __future__ import annotations # Postponed type evaluation
from scipy.linalg import cho_solve, cholesky, block_diag, null_space
from typing import Literal, Optional, Sequence, Tuple
from pydynamicestimator.devices.device import Line, BusInit, Disturbance, BusUnknown
from pydynamicestimator.config import config
import casadi as ca
import numpy as np
import pandas as pd
from tabulate import tabulate
import logging
import math
import shutil
import time
from tqdm import tqdm
np.set_printoptions(threshold=np.inf)
np.random.seed(30)
[docs]
class Grid:
def __init__(self) -> None:
self.y_adm_matrix: Optional[ca.SX] = None # Admittance matrix
self.y_series_r: Optional[ca.SX] = None # Series admittance real
self.y_series_i: Optional[ca.SX] = None # Series admittance complex
self.G_dyn: Optional[ca.SX] = (
None # Conductance with dynamic omega (with faults)
)
self.B_i_dyn: Optional[ca.SX] = None # Susceptance on i side (with faults)
self.B_j_dyn: Optional[ca.SX] = None # Susceptance on j side (with faults)
self.X_dyn: Optional[ca.SX] = None # Reactance with dynamic omega (with faults)
self.y_series: Optional[np.ndarray] = None # Series admittance
self.z_imp_matrix: Optional[np.ndarray] = None # Impedance matrix
self.tau: Optional[np.ndarray] = None # Transformer tap ratios
self.tau_r: Optional[np.ndarray] = None # Transformer tap ratios real
self.tau_i: Optional[np.ndarray] = None # Transformer tap ratios complex
self.incident_matrix: Optional[np.ndarray] = None
self.Bsum: Optional[np.ndarray] = None # sum of shunt susceptance at each node
self.Gsum: Optional[np.ndarray] = None # sum of shunt conductance at each node
self.nb: int = 0 # Number of branches
self.nn: int = 0 # Number of nodes
self.buses: list = [] # list of all system buses
self.Bsum_trafo: Optional[np.ndarray] = None
self.Gsum_trafo: Optional[np.ndarray] = None
self.Sb: float = 100
# Indices corresponding to branches
self.idx_i: list = []
self.idx_j: list = []
self.idx_i_re: list = [] # node i, real voltage index
self.idx_j_re: list = [] # node j, real voltage index
self.idx_i_im: list = []
self.idx_j_im: list = []
self.yinit: dict = {} # Init voltages
self.yf: dict = {} # Output voltages
self.sf: dict = {} # Output power
self.line: Optional[Line] = None
self.line_is_faulted: list[bool] = []
self.line_is_open: list[bool] = []
self.line_fault_adm: list[float] = []
self.bus_is_faulted: list[bool] = []
self.bus_fault_adm: list[float] = []
# Dictionary indices for fast look up
self.idx_branch: dict[Tuple[str, str], int] = {}
self.idx_bus: dict[str, int] = {}
self.idx_bus_re: dict[str, int] = {}
self.idx_bus_im: dict[str, int] = {}
# Matrices to calculate all branch currents
self.C_branches_forward: Optional[np.ndarray] = None
self.C_branches_reverse: Optional[np.ndarray] = None
self.C_branches: Optional[np.ndarray] = None # stacked together
# Mapping from line current states (xl) to nodal current balance entries (g)
# Shape: (2*nn, 2*nb)
self.C_linecurrents_to_nodes: Optional[np.ndarray] = None
[docs]
def save_data(self, dae: Dae) -> None:
for idx, bus in enumerate(self.buses):
self.yf[str(bus)] = dae.y_full[2 * idx : 2 * idx + 2, :]
for idx, bus in enumerate(self.buses):
self.sf[bus] = np.zeros([2, dae.nts]) # allocate
I_bus_re = np.zeros((self.nn, dae.nts))
I_bus_im = np.zeros((self.nn, dae.nts))
for k in range(self.nb):
I_bus_re[self.idx_i[k], :] += dae.i_full[4 * k + 0, :]
I_bus_im[self.idx_i[k], :] += dae.i_full[4 * k + 1, :]
I_bus_re[self.idx_j[k], :] += dae.i_full[4 * k + 2, :]
I_bus_im[self.idx_j[k], :] += dae.i_full[4 * k + 3, :]
# Restrict to the 2*nn voltage block before the even/odd split; device-
# private algebraics live in y_full rows [2*nn:] and must not be read as
# bus voltages.
nv = 2 * self.nn
Vr = dae.y_full[0:nv:2, :]
Vi = dae.y_full[1:nv:2, :]
P = Vr * I_bus_re + Vi * I_bus_im
Q = Vi * I_bus_re - Vr * I_bus_im
for idx, bus in enumerate(self.buses):
self.sf[bus] = np.vstack((P[idx, :], Q[idx, :]))
# u_power = stack_volt_power(dae.y_full[2 * idx, t], dae.y_full[2 * idx + 1, t])
# self.sf[bus][:, t] = u_power.dot(self.y_adm_matrix[2 * idx : 2 * idx + 2, :]).dot(dae.y_full[:, t])
# # Allow for dynamic admittance matrix by using evaluated one
# #self.sf[bus][:, t] = u_power.dot(Yk[2 * idx : 2 * idx + 2, :]).dot(dae.y_full[:, t])
[docs]
def init_symbolic(self, dae: Dae) -> None:
# Algebraic vector layout: [ bus voltages (2*nn) | device privates (n_priv) ].
# dae.n_priv is accumulated in device.xy_index, which runs before this.
dae.nv = self.nn * 2 # size of the bus-voltage block
dae.ny = dae.nv + dae.n_priv
dae.nl = self.nb * 2
dae.y = ca.SX.sym("y", dae.ny)
dae.g = ca.SX(np.zeros(dae.ny))
dae.xl = ca.SX.sym("xl", 2 * self.nb)
dae.fl = ca.SX(np.zeros(2 * self.nb))
dae.fnode = ca.SX(np.zeros(2 * self.nn))
dae.grid = self
# Line switches (1=active, 0=opened) for dynamic line mode
if dae.line_dyn:
dae.sl = ca.SX.sym("sl", 2 * self.nb)
if not hasattr(dae, "slinit") or dae.slinit is None:
dae.slinit = np.ones(2 * self.nb)
[docs]
def gcall(self, dae: Dae) -> None:
# KCL at every bus: device injections + network admittance term = 0.
# Devices contribute -I_gen (sources) or +I_load (sinks). The
# network term must enter with the SAME sign as on the simulator
# side (GridSim.gcall non-line_dyn branch) — otherwise only the Yv
# part flips, the device terms don't, and the resulting equation
# is not a global sign-flip of the sim equation but a different,
# non-physical KCL. That mistake is what made DSE results bad
# without crashing the filter (see CHANGELOG_dse_parity.md).
T = self.build_bus_rotation_T(dae)
# The rotation T and y_adm_matrix are 2*nn x 2*nn, so the admittance
# term applies to the bus-voltage block only. Device-private algebraic
# variables occupy dae.y[2*nn:] and have their own equations in
# dae.g[2*nn:] (written by device fgcall); leave those untouched.
nv = 2 * self.nn
dae.g[:nv] += T.T @ (self.y_adm_matrix @ (T @ dae.y[:nv]))
# Distributed-reference-frame delta_ref differential equation:
# dδ_ref / dt = ω_b · (ω_ref(bus) − 1)
# GridSim.gcall does the same in its non-line_dyn branch; replicating
# here so that DaeEst with omega_mode='dist' also gets these
# equations populated (otherwise the f vector has free symbols at
# the delta_ref state positions and CasADi rejects the function).
if getattr(dae, "has_delta_ref", False):
omega_b = 2 * np.pi * dae.fn
omega_ref_vec = ca.SX.ones(self.nn, 1) * dae.omega_net
if getattr(dae, "omega_ref_buses", None) is not None:
omega_ref_vec = dae.omega_ref_buses
for bus in self.buses:
idx_delta = dae.idx_delta_ref_by_bus[bus]
idx_bus = self.idx_bus[bus]
dae.f[idx_delta] = omega_b * (omega_ref_vec[idx_bus] - 1.0)
[docs]
def build_bus_rotation_T(self, dae: Dae) -> ca.SX:
"""
Builds rotation matrix in order to translate bus voltages.
returns: Rotation matrix, SX (2*nn x 2*nn)
"""
if not dae.has_delta_ref:
return ca.SX.eye(2 * self.nn)
T = ca.SX.zeros(2 * self.nn, 2 * self.nn)
for bus_idx, bus_name in enumerate(self.buses):
idx_delta = dae.idx_delta_ref_by_bus[bus_name]
theta = dae.x[idx_delta]
c = ca.cos(theta)
s = ca.sin(theta)
i = 2 * bus_idx
T[i, i] = c
T[i, i + 1] = -s
T[i + 1, i] = s
T[i + 1, i + 1] = c
return T
[docs]
def guncall(self, dae: Dae) -> None:
T = self.build_bus_rotation_T(dae)
nv = 2 * self.nn # bus-voltage block; privates live in dae.y[2*nn:]
dae.g[:nv] -= T.T @ (self.y_adm_matrix @ (T @ dae.y[:nv]))
[docs]
def add_lines(self, line: Line) -> None:
self.line = line
for bus_i, bus_j in zip(line.bus_i, line.bus_j):
self.add_bus(bus_i, self.idx_i, self.idx_i_re, self.idx_i_im)
self.add_bus(bus_j, self.idx_j, self.idx_j_re, self.idx_j_im)
self.idx_branch[(bus_i, bus_j)] = self.nb
self.nb += 1
self.line_is_faulted = [False] * self.nb
self.line_fault_adm = [0.0] * self.nb
self.line_is_open = [False] * self.nb
self.bus_is_faulted = [False] * self.nn
self.bus_fault_adm = [0.0] * self.nn
[docs]
def add_bus(self, bus: str, idx: list, idx_re: list, idx_im: list) -> None:
if bus not in self.buses:
self.buses.append(bus)
idx.append(self.nn)
idx_re.append(2 * self.nn)
idx_im.append(2 * self.nn + 1)
self.idx_bus[bus] = self.nn
self.nn += 1
else:
idx.append(self.buses.index(bus))
idx_re.append(2 * self.buses.index(bus))
idx_im.append(1 + 2 * self.buses.index(bus))
[docs]
def build_Bsum_Gsum_trafo(self) -> None:
r = self.line.r.copy()
x = self.line.x.copy()
g = self.line.g.copy()
b = self.line.b.copy()
trafo = self.line.trafo.copy()
z_inv = 1 / (r**2 + x**2)
gs = r * z_inv * 0
bs = -x * z_inv * 0
# on the from side
self.Gsum_trafo = np.zeros(self.nn)
self.Bsum_trafo = np.zeros(self.nn)
np.add.at(
self.Gsum_trafo,
self.idx_i,
+gs / trafo**2 + g / 2 / (trafo**2 - 1) - gs / trafo,
)
np.add.at(self.Gsum_trafo, self.idx_j, +gs + g / 2 - gs / trafo)
np.add.at(
self.Bsum_trafo,
self.idx_i,
+bs / trafo**2 + b / 2 / (trafo**2 - 1) - bs / trafo,
)
np.add.at(self.Bsum_trafo, self.idx_j, +bs + b / 2 - bs / trafo)
2 == 2
[docs]
def build_y(self) -> None:
# NOTE: w is 1.0 pu hardcoded here
self.y_adm_matrix = np.zeros([2 * self.nn, 2 * self.nn])
self.C_branches_forward = np.zeros([2 * self.nb, 2 * self.nn])
self.C_branches_reverse = np.zeros([2 * self.nb, 2 * self.nn])
r = self.line.r.copy()
x = self.line.x.copy()
g = self.line.g.copy()
b = self.line.b.copy()
trafo = self.line.trafo.copy()
self.tau_r = np.real(trafo)
self.tau_i = np.imag(trafo)
for faulted_line, faulted in enumerate(self.line_is_faulted):
if faulted:
rtemp = complex(r[faulted_line])
xtemp = complex(x[faulted_line])
gtemp = complex(g[faulted_line])
btemp = complex(b[faulted_line])
zt = complex(rtemp, xtemp)
yt = self.line_fault_adm[faulted_line]
zp = zt * (1 + zt * yt / 4)
yp = zt * yt / zp + complex(gtemp, btemp)
r[faulted_line] = zp.real
x[faulted_line] = zp.imag
g[faulted_line] = yp.real
b[faulted_line] = yp.imag
for open_line, opened in enumerate(self.line_is_open):
if opened:
# For static lines: set impedance very high so admittance → 0
# For dynamic lines: sl switch deactivates the branch, but we
# still zero out shunt (g, b) so Bsum/Gsum are correct, and use
# a moderate impedance to avoid float64 overflow in r² + x².
r[open_line] = 1e15
x[open_line] = 1e15
g[open_line] = 0
b[open_line] = 0
for bus_id, faulted in enumerate(self.bus_is_faulted):
if faulted:
np.add.at(
self.y_adm_matrix,
(2 * bus_id, 2 * bus_id),
self.bus_fault_adm[bus_id],
)
np.add.at(
self.y_adm_matrix,
(2 * bus_id + 1, 2 * bus_id + 1),
self.bus_fault_adm[bus_id],
)
# Calculate Y matrix values
z_inv = 1 / (r**2 + x**2)
y_off_diag_real = -r * z_inv / trafo
y_off_diag_imag = -x * z_inv / trafo
y_diag_real = (g / 2 + r * z_inv) / trafo**2
y_diag_imag = (-b / 2 + x * z_inv) / trafo**2
self.y_series = 1 / (r + 1j * x)
self.tau = trafo.copy()
self.g_eff = g.copy()
self.b_eff = b.copy()
# Update Y matrix with vectorized operations
np.add.at(self.y_adm_matrix, (self.idx_i_re, self.idx_j_re), y_off_diag_real)
np.add.at(self.y_adm_matrix, (self.idx_i_re, self.idx_j_im), y_off_diag_imag)
np.add.at(self.y_adm_matrix, (self.idx_i_im, self.idx_j_re), -y_off_diag_imag)
np.add.at(self.y_adm_matrix, (self.idx_i_im, self.idx_j_im), y_off_diag_real)
np.add.at(self.y_adm_matrix, (self.idx_j_re, self.idx_i_re), y_off_diag_real)
np.add.at(self.y_adm_matrix, (self.idx_j_re, self.idx_i_im), y_off_diag_imag)
np.add.at(self.y_adm_matrix, (self.idx_j_im, self.idx_i_re), -y_off_diag_imag)
np.add.at(self.y_adm_matrix, (self.idx_j_im, self.idx_i_im), y_off_diag_real)
# Update diagonal elements
np.add.at(self.y_adm_matrix, (self.idx_i_re, self.idx_i_re), y_diag_real)
np.add.at(self.y_adm_matrix, (self.idx_i_re, self.idx_i_im), y_diag_imag)
np.add.at(self.y_adm_matrix, (self.idx_i_im, self.idx_i_re), -y_diag_imag)
np.add.at(self.y_adm_matrix, (self.idx_i_im, self.idx_i_im), y_diag_real)
np.add.at(self.y_adm_matrix, (self.idx_j_re, self.idx_j_re), g / 2 + r * z_inv)
np.add.at(self.y_adm_matrix, (self.idx_j_re, self.idx_j_im), -b / 2 + x * z_inv)
np.add.at(self.y_adm_matrix, (self.idx_j_im, self.idx_j_re), b / 2 - x * z_inv)
np.add.at(self.y_adm_matrix, (self.idx_j_im, self.idx_j_im), g / 2 + r * z_inv)
even_rows = np.arange(0, 2 * self.nb, 2)
odd_rows = np.arange(1, 2 * self.nb, 2)
# Create a branch impedance matrix to calculate branch currents
np.add.at(
self.C_branches_forward,
(even_rows, self.idx_i_re),
-y_off_diag_real / trafo + g / 2 / trafo**2,
)
np.add.at(
self.C_branches_forward,
(even_rows, self.idx_i_im),
-y_off_diag_imag / trafo - b / 2 / trafo**2,
)
np.add.at(
self.C_branches_forward,
(odd_rows, self.idx_i_re),
y_off_diag_imag / trafo + b / 2 / trafo**2,
)
np.add.at(
self.C_branches_forward,
(odd_rows, self.idx_i_im),
-y_off_diag_real / trafo + g / 2 / trafo**2,
)
np.add.at(self.C_branches_forward, (even_rows, self.idx_j_re), y_off_diag_real)
np.add.at(self.C_branches_forward, (even_rows, self.idx_j_im), y_off_diag_imag)
np.add.at(self.C_branches_forward, (odd_rows, self.idx_j_re), -y_off_diag_imag)
np.add.at(self.C_branches_forward, (odd_rows, self.idx_j_im), y_off_diag_real)
np.add.at(
self.C_branches_reverse,
(even_rows, self.idx_j_re),
-y_off_diag_real * trafo + g / 2,
)
np.add.at(
self.C_branches_reverse,
(even_rows, self.idx_j_im),
-y_off_diag_imag * trafo - b / 2,
)
np.add.at(
self.C_branches_reverse,
(odd_rows, self.idx_j_re),
y_off_diag_imag * trafo + b / 2,
)
np.add.at(
self.C_branches_reverse,
(odd_rows, self.idx_j_im),
-y_off_diag_real * trafo + g / 2,
)
np.add.at(self.C_branches_reverse, (even_rows, self.idx_i_re), y_off_diag_real)
np.add.at(self.C_branches_reverse, (even_rows, self.idx_i_im), y_off_diag_imag)
np.add.at(self.C_branches_reverse, (odd_rows, self.idx_i_re), -y_off_diag_imag)
np.add.at(self.C_branches_reverse, (odd_rows, self.idx_i_im), y_off_diag_real)
self.C_branches = np.vstack((self.C_branches_forward, self.C_branches_reverse))
try:
self.z_imp_matrix = np.linalg.inv(self.y_adm_matrix)
except np.linalg.LinAlgError:
logging.WARNING(
"Admittance matrix not invertible - small shunt capacitance added"
)
self.y_adm_matrix[0, 0] += 1e-8
self.y_adm_matrix[1, 1] += 1e-8
self.z_imp_matrix = np.linalg.inv(self.y_adm_matrix)
[docs]
def build_y_sym(
self,
omega_buses: ca.SX = ca.SX(1.0),
omega_lines: float | ca.SX = 1.0,
dyn_update: bool = False,
) -> None:
self.y_adm_matrix = ca.SX.zeros(2 * self.nn, 2 * self.nn)
self.C_branches_forward = ca.SX.zeros(2 * self.nb, 2 * self.nn)
self.C_branches_reverse = ca.SX.zeros(2 * self.nb, 2 * self.nn)
L = self.line.x.copy()
C = self.line.b.copy()
r = self.line.r.copy()
g = self.line.g.copy()
# For dynamic lines we use x and b, to be L and C, respectively,
# As they are evaluated at 1.0 p.u. and thus equal.
# For static lines x, and b are updated after fault calculation
x = 1.0 * L
b = 1.0 * C
# trafo assumed to be real
# NOTE: formulation needs to be extended for imaginary taps
trafo = self.line.trafo.copy()
self.tau = trafo
self.tau_r = trafo.real
self.tau_i = trafo.imag
for faulted_line, faulted in enumerate(self.line_is_faulted):
if faulted:
# NOTE: took nondyn b (susceptance) & x (reactance) for fault admittance
zt_r = r[faulted_line]
zt_i = L[faulted_line] # non dynamic
gk = g[faulted_line]
bk = C[faulted_line] # non dynamic
yt = self.line_fault_adm[faulted_line]
# wz = 1 + zt*yt/4
wz_r = 1 + (zt_r * yt / 4)
wz_i = zt_i * yt / 4
# zp = zt * wz
zp_r = zt_r * wz_r - zt_i * wz_i
zp_i = zt_r * wz_i + zt_i * wz_r
# yp = (zt*yt)/zp + (g + jb)
num_r = zt_r * yt
num_i = zt_i * yt
yp_r = (num_r * zp_r + num_i * zp_i) / (zp_r**2 + zp_i**2) + gk
yp_i = (num_i * zp_r - num_r * zp_i) / (zp_r**2 + zp_i**2) + bk
# Overwrite branch parameters
r[faulted_line] = zp_r
x[faulted_line] = zp_i
g[faulted_line] = yp_r
b[faulted_line] = yp_i
for open_line, opened in enumerate(self.line_is_open):
if opened:
r[open_line] = 1e308
x[open_line] = 1e308
g[open_line] = 0
b[open_line] = 0
for bus_id, faulted in enumerate(self.bus_is_faulted):
if faulted:
adm = ca.SX(self.bus_fault_adm[bus_id])
self.y_adm_matrix[2 * bus_id, 2 * bus_id] += adm
self.y_adm_matrix[2 * bus_id + 1, 2 * bus_id + 1] += adm
# For static lines, we use x and b, evaluated at frequencies
# Define shunt susceptances on each bus side
# NOTE: defined here in order to include fault
b_i = b
b_j = b
# For static case, evaluate parameters at frequencies
# different from 1.0 pu, given by frequency reference framework.
if not dae_sim.line_dyn:
if dyn_update:
# Series reactance evaluated at line frequency
x = omega_lines * L
# Shunt susceptance evaluated at respective bus frequency
omega_buses_i = omega_buses[self.idx_i]
omega_buses_j = omega_buses[self.idx_j]
b_i = omega_buses_i * b
b_j = omega_buses_j * b
# Storing with fault applied
self.G_dyn = g
self.B_i_dyn = b_i
self.B_j_dyn = b_j
self.X_dyn = x
# Calculate Y matrix values
tau_abs = self.tau**2
# tau_inv_r = self.tau_r / tau_abs
# tau_inv_i = -self.tau_i / tau_abs
z_inv = 1 / (r**2 + x**2)
# Off-diagonal admittance: y_ij = -y_series / T
# y_off_diag_real = -((r * z_inv) * tau_inv_r - (-x * z_inv) * tau_inv_i)
# y_off_diag_imag = -((r * z_inv) * tau_inv_i + (-x * z_inv) * tau_inv_r)
y_off_diag_real = -r * z_inv / self.tau
y_off_diag_imag = -x * z_inv / self.tau
# Diagonal at "i" (from side): (y_sh_i + y_series) / |T|^2
y_diag_real_i = (g / 2 + r * z_inv) / tau_abs
y_diag_imag_i = (-b_i / 2 + x * z_inv) / tau_abs
# Diagonal at "j" (to side): (y_sh_j + y_series)
y_diag_real_j = g / 2 + r * z_inv
y_diag_imag_j = -b_j / 2 + x * z_inv
# Series admittance: 1 / (r + jx)
self.y_series_r = r * z_inv
self.y_series_i = -x * z_inv
for k in range(self.nb):
ii = self.idx_i_re[k]
ki = self.idx_i_im[k]
jj = self.idx_j_re[k]
kj = self.idx_j_im[k]
# Off-diagonal (i->j)
self.y_adm_matrix[ii, jj] += y_off_diag_real[k]
self.y_adm_matrix[ii, kj] += y_off_diag_imag[k]
self.y_adm_matrix[ki, jj] += -y_off_diag_imag[k]
self.y_adm_matrix[ki, kj] += y_off_diag_real[k]
# Off-diagonal (j->i)
self.y_adm_matrix[jj, ii] += y_off_diag_real[k]
self.y_adm_matrix[jj, ki] += y_off_diag_imag[k]
self.y_adm_matrix[kj, ii] += -y_off_diag_imag[k]
self.y_adm_matrix[kj, ki] += y_off_diag_real[k]
# Diagonal at i
self.y_adm_matrix[ii, ii] += y_diag_real_i[k]
self.y_adm_matrix[ii, ki] += y_diag_imag_i[k]
self.y_adm_matrix[ki, ii] += -y_diag_imag_i[k]
self.y_adm_matrix[ki, ki] += y_diag_real_i[k]
# Diagonal at j
self.y_adm_matrix[jj, jj] += y_diag_real_j[k]
self.y_adm_matrix[jj, kj] += y_diag_imag_j[k]
self.y_adm_matrix[kj, jj] += -y_diag_imag_j[k]
self.y_adm_matrix[kj, kj] += y_diag_real_j[k]
# Forward mapping
self.C_branches_forward[2 * k, ii] += -y_off_diag_real[k] / self.tau[k] + g[
k
] / (2 * tau_abs[k])
self.C_branches_forward[2 * k, ki] += -y_off_diag_imag[k] / self.tau[
k
] - b_i[k] / (2 * tau_abs[k])
self.C_branches_forward[2 * k + 1, ii] += y_off_diag_imag[k] / self.tau[
k
] + b_i[k] / (2 * tau_abs[k])
self.C_branches_forward[2 * k + 1, ki] += -y_off_diag_real[k] / self.tau[
k
] + g[k] / (2 * tau_abs[k])
# tau_inv2_r = tau_inv_r**2 - tau_inv_i**2
# tau_inv2_i = 2 * tau_inv_r * tau_inv_i
# self.C_branches_forward[2*k, ii] += (-(self.y_series_r[k] * tau_inv_r[k] - self.y_series_i[k] * tau_inv_i[k]) + g[k]/2 * tau_inv2_r[k] + b[k]/2 * tau_inv2_i[k])
# self.C_branches_forward[2*k, ki] += - (-(self.y_series_r[k] * tau_inv_i[k] + self.y_series_i[k] * tau_inv_r[k]) + g[k]/2 * tau_inv2_i[k] - b[k]/2 * tau_inv2_r[k])
# self.C_branches_forward[2*k + 1, ii] += (-(self.y_series_r[k] * tau_inv_i[k] + self.y_series_i[k] * tau_inv_r[k]) + g[k]/2 * tau_inv2_i[k] - b[k]/2 * tau_inv2_r[k])
# self.C_branches_forward[2*k + 1, ki] += (-(self.y_series_r[k] * tau_inv_r[k] - self.y_series_i[k] * tau_inv_i[k]) + g[k]/2 * tau_inv2_r[k] + b[k]/2 * tau_inv2_i[k])
self.C_branches_forward[2 * k, jj] += y_off_diag_real[k]
self.C_branches_forward[2 * k, kj] += y_off_diag_imag[k]
self.C_branches_forward[2 * k + 1, jj] += -y_off_diag_imag[k]
self.C_branches_forward[2 * k + 1, kj] += y_off_diag_real[k]
# Backward mapping
self.C_branches_reverse[2 * k, jj] += (
-y_off_diag_real[k] * self.tau[k] + g[k] / 2
)
self.C_branches_reverse[2 * k, kj] += (
-y_off_diag_imag[k] * self.tau[k] - b_j[k] / 2
)
self.C_branches_reverse[2 * k + 1, jj] += (
y_off_diag_imag[k] * self.tau[k] + b_j[k] / 2
)
self.C_branches_reverse[2 * k + 1, kj] += (
-y_off_diag_real[k] * self.tau[k] + g[k] / 2
)
# self.C_branches_reverse[2*k, jj] += (-(self.y_series_r[k] * self.tau_r[k] - self.y_series_i[k] * self.tau_i[k]) + g[k]/2)
# self.C_branches_reverse[2*k, kj] += (-(self.y_series_r[k] * self.tau_i[k] + self.y_series_i[k] * self.tau_r[k]) - b[k]/2)
# self.C_branches_reverse[2*k + 1, jj] += ( (self.y_series_r[k] * self.tau_i[k] + self.y_series_i[k] * self.tau_r[k]) + b[k]/2)
# self.C_branches_reverse[2*k + 1, kj] += (-(self.y_series_r[k] * self.tau_r[k] - self.y_series_i[k] * self.tau_i[k]) + g[k]/2)
self.C_branches_reverse[2 * k, ii] += y_off_diag_real[k]
self.C_branches_reverse[2 * k, ki] += y_off_diag_imag[k]
self.C_branches_reverse[2 * k + 1, ii] += -y_off_diag_imag[k]
self.C_branches_reverse[2 * k + 1, ki] += y_off_diag_real[k]
self.C_branches = ca.vertcat(self.C_branches_forward, self.C_branches_reverse)
# NOTE: not used elsewhere thus commented out, need to check invertibility with values
# self.z_imp_matrix = ca.inv(self.y_adm_matrix)
# except np.linalg.LinAlgError:
# # Matrix not invertible — add a little capacitance to the ground at node 0
# logging.WARNING(
# "Admittance matrix not invertible - small shunt capacitance added"
# )
# self.y_adm_matrix[0, 0] += 1e-8
# self.y_adm_matrix[1, 1] += 1e-8
# self.z_imp_matrix = np.linalg.inv(self.y_adm_matrix)
[docs]
def get_branch_index(
self, node1: list[str], node2: list[str]
) -> tuple[np.ndarray, np.ndarray]:
"""
Args:
node1 (): List of starting nodes
node2 (): List of receiving nodes
Returns: The order of the given branch and the order of the opposite direction branch
"""
# Sort the node pair and look it up in the dictionary
# the first one sequence doesn't matter
# the second one matters
ids1 = []
ids2 = []
if isinstance(node1, str):
node1 = [node1]
if isinstance(node2, str):
node2 = [node2]
for n1, n2 in zip(node1, node2):
key = (n1, n2)
key_r = (n2, n1)
if key in self.idx_branch:
idx = self.idx_branch[key]
ids1.append(idx)
ids2.append(idx)
elif key_r in self.idx_branch:
idx = self.idx_branch[key_r]
ids1.append(idx)
ids2.append(idx + self.nb)
else:
ids1.append(None)
ids2.append(None)
return np.array(ids1), np.array(ids2)
[docs]
def get_node_index(self, buses: list) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
# Calculate the real and imaginary indices in one step
if not buses:
return (
np.array([]),
np.array([]),
np.array([]),
) # Return empty arrays safely, if list of buses empty
var_indices = [
(self.idx_bus[bus], 2 * self.idx_bus[bus], 2 * self.idx_bus[bus] + 1)
for bus in buses
]
idx, idx_re, idx_im = zip(*var_indices)
return np.array(idx), np.array(idx_re), np.array(idx_im)
[docs]
class GridSim(Grid):
[docs]
def gcall(self, dae: DaeSim, **kwargs):
if dae.line_dyn:
line = kwargs["line"]
# First call line dynamics equations
line.fcall(self, dae)
omega_b = [2 * np.pi * dae.fn] * self.nn
omega_ref_vec = (
ca.SX.ones(self.nn, 1) * dae.omega_net
) # initialization and fallback
# Set reference frequency
if getattr(dae, "omega_ref_buses", None) is not None:
# Vector of reference frequencies at all buses
omega_ref_vec = dae.omega_ref_buses
# d δ_ref / dt
if dae.has_delta_ref:
for bus in self.buses:
# Derivative d δ_ref/dt = ω_b * ω_ref
idx_delta = dae.idx_delta_ref_by_bus[bus]
idx_bus = self.idx_bus[bus]
dae.f[idx_delta] = omega_b[0] * (omega_ref_vec[idx_bus] - 1.0)
idx_bus_re = [i for i in range(2 * self.nn) if i % 2 == 0]
idx_bus_im = [i for i in range(2 * self.nn) if i % 2 != 0]
i_shG_re = dae.y[idx_bus_re] * self.Gsum
i_shG_im = dae.y[idx_bus_im] * self.Gsum
# d vre / dt
dae.fnode[idx_bus_re] = (
omega_b / self.Bsum * -dae.g[idx_bus_re]
+ omega_b * omega_ref_vec * dae.y[idx_bus_im]
- omega_b / self.Bsum * i_shG_re
)
# d vim / dt
dae.fnode[idx_bus_im] = (
omega_b / self.Bsum * -dae.g[idx_bus_im]
- omega_b * omega_ref_vec * dae.y[idx_bus_re]
- omega_b / self.Bsum * i_shG_im
)
else:
T = self.build_bus_rotation_T(dae)
# dae.g += ca.mtimes(self.y_adm_matrix, dae.y)
# 2*nn x 2*nn admittance term applies to the bus-voltage block only;
# device-private algebraics occupy dae.y[2*nn:] / dae.g[2*nn:].
nv = 2 * self.nn
dae.g[:nv] += T.T @ (self.y_adm_matrix @ (T @ dae.y[:nv]))
omega_b = [2 * np.pi * dae.fn] * self.nn
omega_ref_vec = (
ca.SX.ones(self.nn, 1) * dae.omega_net
) # initialization and fallback
# Set reference frequency
if getattr(dae, "omega_ref_buses", None) is not None:
# Vector of reference frequencies at all buses
omega_ref_vec = dae.omega_ref_buses
# d δ_ref / dt
if dae.has_delta_ref:
for bus in self.buses:
# Derivative d δ_ref/dt = ω_b * ω_ref
idx_delta = dae.idx_delta_ref_by_bus[bus]
idx_bus = self.idx_bus[bus]
dae.f[idx_delta] = omega_b[0] * (omega_ref_vec[idx_bus] - 1.0)
[docs]
def init_from_power_flow(self, dae: DaeSim, static: BusInit) -> None:
y = ca.SX.sym("y", self.nn * 2)
y_init = []
g = ca.SX(np.zeros(self.nn * 2))
for idx, bus in enumerate(self.buses):
y_init.extend([1, 0])
static_idx = static.bus.index(bus)
y_real = y[2 * idx]
y_imag = y[2 * idx + 1]
if static.type[static_idx] == "PQ":
u_power = ca.horzcat(
ca.vertcat(y_real, y_imag), ca.vertcat(+y_imag, -y_real)
)
g[2 * idx : 2 * idx + 2] = (
u_power @ self.y_adm_matrix[2 * idx : 2 * idx + 2, :] @ y
+ np.array([static.p[static_idx], static.q[static_idx]]) / self.Sb
)
elif static.type[static_idx] == "PV":
u_power = ca.horzcat(y_real, +y_imag)
g[2 * idx] = (
u_power @ self.y_adm_matrix[2 * idx : 2 * idx + 2, :] @ y
+ static.p[static_idx] / self.Sb
)
g[2 * idx + 1] = (
np.sqrt(y[2 * idx] ** 2 + y[2 * idx + 1] ** 2)
- static.v[static_idx]
)
elif static.type[static_idx] == "slack":
g[2 * idx] = np.sqrt(y_real**2 + y_imag**2) - static.v[static_idx]
g[2 * idx + 1] = y_imag
z = ca.Function("z", [y], [g])
G = ca.rootfinder("G", "newton", z)
try:
solution = G(y_init)
except Exception as e:
logging.error(f"An exception occurred: {e}")
logging.error("Init power flow failed.")
raise Exception(
"Power flow cannot be solved. Check if the grid parameters (pu) and initial power consumptions (MW and MVar) are realistic. Generated power should be negative. Only one bus can be slack."
)
if (solution == y_init).is_one():
logging.error(
"Init power flow aborted. Error unknown. Try different operating point"
)
raise Exception("Power flow not solved")
# save the initial data into yinit dictionary
for idx, bus in enumerate(self.buses):
self.yinit[str(bus)] = np.array(solution[2 * idx : 2 * idx + 2].T)[0]
dae.yinit = np.array(list(self.yinit.values())).reshape(self.nn * 2)
dae.iinit = np.array(ca.DM(self.y_adm_matrix @ dae.yinit)).squeeze()
# Extend yinit with device-private algebraic init guesses, appended after
# the 2*nn voltage block (iinit above is computed from the voltage block
# only, so it stays 2*nn). These guesses seed the finit Newton solve and
# are overwritten there with the converged consistent values.
if dae.n_priv:
dae.yinit = np.concatenate(
[dae.yinit, np.array(dae.yinit_priv, dtype=float)]
)
# calculate the voltage across each branch (v_from - v_to)
# V_branch = self.incident_matrix @ dae.yinit
# find the admittance at each branch
y_adm_branch = np.zeros((2 * self.nb, 2 * self.nb))
y_adm_re = -self.y_adm_matrix[self.idx_i_re, self.idx_j_re]
y_adm_im = -self.y_adm_matrix[self.idx_i_im, self.idx_j_re]
for k in range(self.nb):
y_adm_branch[2 * k, 2 * k] = y_adm_re[k]
y_adm_branch[2 * k, 2 * k + 1] = -y_adm_im[k]
y_adm_branch[2 * k + 1, 2 * k] = y_adm_im[k]
y_adm_branch[2 * k + 1, 2 * k + 1] = y_adm_re[k]
# yinit → complex bus voltages (nn,)
yinit_reshape = np.array(list(self.yinit.values()), dtype=float).reshape(
self.nn * 2
)
r = self.line.r.copy()
x = self.line.x.copy()
y_series = 1 / (r + 1j * x)
Vbus = yinit_reshape[0::2] + 1j * yinit_reshape[1::2]
# Vectorized branch computations
Vi = Vbus[self.idx_i] # (nb,)
Vj = Vbus[self.idx_j] # (nb,)
Va = Vi / self.tau # (nb,) tap can be complex
# NOTE: not self.y_series to avoid mixing Casadi and Numpy
I_series = y_series * (Va - Vj) # (nb,) complex branch currents
# Pack Re/Im interleaved into xlinit: [Re0, Im0, Re1, Im1, ...]
xlinit = np.empty(2 * self.nb, dtype=float)
xlinit[0::2] = I_series.real
xlinit[1::2] = I_series.imag
dae.xlinit = xlinit
self.build_Bsum()
self.build_Gsum()
# self.print_init_power_flow(dae)
[docs]
def print_init_power_flow(self, dae: DaeSim) -> None:
# Print results of initialization
print("\nPower flow for initialization successfully solved")
# ---- BUS RESULTS ----
idx_bus_re = [i for i in range(2 * self.nn) if i % 2 == 0]
idx_bus_im = [i for i in range(2 * self.nn) if i % 2 != 0]
vinit_re = np.array(dae.yinit)[idx_bus_re]
vinit_im = np.array(dae.yinit)[idx_bus_im]
vinit_mag = np.sqrt(vinit_re**2 + vinit_im**2) # p.u.
vinit_phase = np.arctan2(vinit_im, vinit_re) # radians
iinit_re = np.array(dae.iinit)[idx_bus_re]
iinit_im = np.array(dae.iinit)[idx_bus_im]
Pinit = (vinit_re * iinit_re + vinit_im * iinit_im) * self.Sb
Qinit = (vinit_im * iinit_re - vinit_re * iinit_im) * self.Sb
# calculate power (P & Q) loss due to shunts
# self.build_Gsum()
# self.build_Bsum()
# self.build_Bsum_Gsum_trafo()
iinit_shunt_re = self.Gsum * vinit_re - self.Bsum * vinit_im
iinit_shunt_im = self.Bsum * vinit_re + self.Gsum * vinit_im
iinit_shunt_re_2 = self.Gsum * vinit_re - self.Bsum * vinit_im
iinit_shunt_im_2 = self.Bsum * vinit_re + self.Gsum * vinit_im
Ploss_shunt = (vinit_re * iinit_shunt_re + vinit_im * iinit_shunt_im) * self.Sb
Qloss_shunt = (vinit_im * iinit_shunt_re - vinit_re * iinit_shunt_im) * self.Sb
Ploss_shunt2 = (
vinit_re * iinit_shunt_re_2 + vinit_im * iinit_shunt_im_2
) * self.Sb
Qloss_shunt2 = (
vinit_im * iinit_shunt_re_2 - vinit_re * iinit_shunt_im_2
) * self.Sb
power_flow_bus_table = pd.DataFrame(
{
"Bus": pd.Series(np.array(self.buses), dtype="string"),
"V Magnitude (pu)": pd.Series(vinit_mag, dtype=float),
"V Phase (deg)": pd.Series(vinit_phase * 180 / np.pi, dtype=float),
"P Gen (MW)": pd.Series(Pinit, dtype=float),
"Q Gen (MVAr)": pd.Series(Qinit, dtype=float),
"P Shunt (MW)": pd.Series(Ploss_shunt, dtype=float),
"Q Shunt (MVAr)": pd.Series(Qloss_shunt, dtype=float),
"P Shunt Trafo (MW)": pd.Series(Ploss_shunt2, dtype=float),
"Q Shunt Trafo (MVAr)": pd.Series(Qloss_shunt2, dtype=float),
}
)
power_flow_bus_table = tabulate(power_flow_bus_table, headers="keys")
# ---- BRANCH RESULTS ----
# calculate the voltage across each branch (v_from - v_to)
self.build_incident_matrix()
# incident_matrix is (2*nb x 2*nn); dae.yinit may carry device-private
# algebraic guesses appended after the voltage block, so use the
# voltage block only.
V_branch = self.incident_matrix @ dae.yinit[: 2 * self.nn]
# find the admittance at each branch
# Note: self.y_adm_matrix is a CasADi SX here (built by build_y_sym in
# setup, with ω placeholders defaulted to 1.0 → numerically evaluable).
# SX fancy indexing Y[arr_i, arr_j] does an outer/cross slice, not
# numpy's element-wise pairing — so we evaluate to a numpy array first.
Y_num = np.array(ca.DM(self.y_adm_matrix))
y_adm_branch = np.zeros((2 * self.nb, 2 * self.nb))
y_adm_re = -Y_num[self.idx_i_re, self.idx_j_re]
y_adm_im = -Y_num[self.idx_i_im, self.idx_j_re]
for k in range(self.nb):
y_adm_branch[2 * k, 2 * k] = y_adm_re[k]
y_adm_branch[2 * k, 2 * k + 1] = -y_adm_im[k]
y_adm_branch[2 * k + 1, 2 * k] = y_adm_im[k]
y_adm_branch[2 * k + 1, 2 * k + 1] = y_adm_re[k]
# calculate the initial line currents through each branch
ilinit = y_adm_branch @ V_branch
idx_branch_re = [i for i in range(2 * self.nb) if i % 2 == 0]
idx_branch_im = [i for i in range(2 * self.nb) if i % 2 != 0]
# calculate the from_bus power injection
Pinit_ij = (
dae.yinit[self.idx_i_re] * ilinit[idx_branch_re]
+ dae.yinit[self.idx_i_im] * ilinit[idx_branch_im]
) * self.Sb
Qinit_ij = (
dae.yinit[self.idx_i_im] * ilinit[idx_branch_re]
- dae.yinit[self.idx_i_re] * ilinit[idx_branch_im]
) * self.Sb
# calculate the to_bus power injection
Pinit_ji = (
-(
dae.yinit[self.idx_j_re] * ilinit[idx_branch_re]
+ dae.yinit[self.idx_j_im] * ilinit[idx_branch_im]
)
* self.Sb
)
Qinit_ji = (
-(
dae.yinit[self.idx_j_im] * ilinit[idx_branch_re]
- dae.yinit[self.idx_j_re] * ilinit[idx_branch_im]
)
* self.Sb
)
Ploss = Pinit_ij + Pinit_ji
Qloss = Qinit_ij + Qinit_ji
power_flow_branch_table = pd.DataFrame(
{
"From Bus": [self.buses[i] for i in self.idx_i],
"To Bus": [self.buses[j] for j in self.idx_j],
"From Bus P (MW)": Pinit_ij,
"From Bus Q (MVAr)": Qinit_ij,
"To Bus P (MW)": Pinit_ji,
"To Bus Q (MVAr)": Qinit_ji,
"P Loss (MW)": Ploss,
"Q Loss (MVAr)": Qloss,
}
)
power_flow_branch_table = tabulate(power_flow_branch_table, headers="keys")
print(
"======================================================================================================="
)
print("Power Flow: Bus Results")
print(
"======================================================================================================="
)
print(power_flow_bus_table)
print(
"-------------------------------------------------------------------------------------------------------"
)
# print(f"Excess P Generation: {np.sum(Pinit)} MW")
# print(f"Excess Q Generation: {np.sum(Qinit)} MVAr")
# print(f"\nTotal P Loss from shunts: {np.sum(Ploss_shunt)} MW")
# print(f"Total Q Loss from shunts: {np.sum(Qloss_shunt)} MVAr")
print(
"======================================================================================================="
)
print("Power Flow: Branch Results")
print(
"======================================================================================================="
)
print(power_flow_branch_table)
print(
"-------------------------------------------------------------------------------------------------------"
)
# print(f"Total P Loss from line impedances: {np.sum(Ploss)} MW")
# print(f"Total Q Loss from line impedances: {np.sum(Qloss)} MVAr")
print(
"-------------------------------------------------------------------------------------------------------"
)
[docs]
def build_Gsum(self) -> None:
# finds the sum of the shunt conductances (g) at each node due to line parameters
g = self.line.g
self.Gsum = np.zeros(self.nn)
np.add.at(self.Gsum, self.idx_i, g / 2 / self.line.trafo**2)
np.add.at(self.Gsum, self.idx_j, g / 2)
# Add bus fault shunt admittances (real-valued conductance)
for bus_id, faulted in enumerate(self.bus_is_faulted):
if faulted:
self.Gsum[bus_id] += self.bus_fault_adm[bus_id]
[docs]
def build_Bsum(self) -> None:
# finds the sum of the shunt susceptances (b) at each node due to line parameters
b = self.line.b
self.Bsum = np.zeros(self.nn)
np.add.at(self.Bsum, self.idx_i, b / 2 / self.line.trafo**2)
np.add.at(self.Bsum, self.idx_j, b / 2)
[docs]
def build_incident_matrix(self) -> None:
# build incident matrix for network (+1 indicates start node; -1 indicates end node)
self.incident_matrix = np.zeros([self.nb * 2, self.nn * 2])
for k in range(self.nb):
self.incident_matrix[2 * k, self.idx_i_re[k]] = 1
self.incident_matrix[2 * k + 1, self.idx_i_im[k]] = 1
self.incident_matrix[2 * k, self.idx_j_re[k]] = -1
self.incident_matrix[2 * k + 1, self.idx_j_im[k]] = -1
[docs]
def build_linecurrents_to_nodes(self) -> None:
"""Builds a constant mapping matrix that accumulates branch current
states (xl) into nodal current balance entries (g).
For each branch k with from-bus i and to-bus j:
- Real current i_ijr contributes +1 at (vre_i) and -1 at (vre_j)
- Imag current i_iji contributes +1 at (vim_i) and -1 at (vim_j)
"""
self.C_linecurrents_to_nodes = np.zeros((2 * self.nn, 2 * self.nb))
# rows for nodes (vre/vim), columns for line currents (i_ijr/i_iji)
for k in range(self.nb):
# real current mapping
self.C_linecurrents_to_nodes[self.idx_i_re[k], 2 * k] = (
1.0 / self.line.trafo[k]
)
self.C_linecurrents_to_nodes[self.idx_j_re[k], 2 * k] = -1.0
# imag current mapping
self.C_linecurrents_to_nodes[self.idx_i_im[k], 2 * k + 1] = (
1.0 / self.line.trafo[k]
)
self.C_linecurrents_to_nodes[self.idx_j_im[k], 2 * k + 1] = -1.0
[docs]
def build_branch_current_fun(self, dae: "DaeSim") -> None:
"""Builds a CasADi function that computes terminal branch currents from (x, y)
consistent with distributed reference-frame formulation.
For each branch k with from-bus i and to-bus j:
- Takes terminal bus voltages (y) in the respective bus reference frames
- Rotates Vi and Vj into the line reference frame of branch k
- Applies the (real) tap ratio on the from-side (i)
- Computes series and shunt currents (from transformed Vi,Vj) in the line frame
- Forms terminal currents (If at i-side, It at j-side) in the line frame
- Rotates If back to the i-bus frame and It back to the j-bus frame
Stores:
self._branch_current_fun(x, y) -> I_out, with length 4·nb
ordered as [If_re, If_im, It_re, It_im] (in corresponding bus frames).
"""
# Get bus reference frame angles at terminals
if dae.has_delta_ref:
idx_i_bus = [self.buses[i] for i in self.idx_i]
idx_i_delta = [dae.idx_delta_ref_by_bus[i] for i in idx_i_bus]
delta_i = dae.x[idx_i_delta]
idx_j_bus = [self.buses[j] for j in self.idx_j]
idx_j_delta = [dae.idx_delta_ref_by_bus[j] for j in idx_j_bus]
delta_j = dae.x[idx_j_delta]
# Line reference frame angles SS
delta_line = 0.5 * (delta_i + delta_j)
# Rotation angles
theta_i = delta_i - delta_line
theta_j = delta_j - delta_line
else:
theta_i = 0.0
theta_j = 0.0
# Rotate voltages into line reference frame
Vi_re_L = (
ca.cos(theta_i) * dae.y[self.idx_i_re]
- ca.sin(theta_i) * dae.y[self.idx_i_im]
)
Vi_im_L = (
ca.sin(theta_i) * dae.y[self.idx_i_re]
+ ca.cos(theta_i) * dae.y[self.idx_i_im]
)
Vj_re_L = (
ca.cos(theta_j) * dae.y[self.idx_j_re]
- ca.sin(theta_j) * dae.y[self.idx_j_im]
)
Vj_im_L = (
ca.sin(theta_j) * dae.y[self.idx_j_re]
+ ca.cos(theta_j) * dae.y[self.idx_j_im]
)
# Apply tap ratio on i-side
Vi_re_Lt = Vi_re_L / self.line.trafo
Vi_im_Lt = Vi_im_L / self.line.trafo
# Voltage drop across series element (line frame)
dV_re = Vi_re_Lt - Vj_re_L
dV_im = Vi_im_Lt - Vj_im_L
# y_series = 1/(r + jx) = (r - jx)/(r^2 + x^2)
# denom = self.line.r * self.line.r + self.line.x * self.line.x
# y_series_re = self.line.r / denom
# y_series_im = - self.line.x / denom
# I_series = y_series * dV (line frame)
I_series_re = self.y_series_r * dV_re - self.y_series_i * dV_im
I_series_im = self.y_series_r * dV_im + self.y_series_i * dV_re
# Shunt currents (line frame)
Ish_i_re = 0.5 * self.G_dyn * Vi_re_Lt - 0.5 * self.B_i_dyn * Vi_im_Lt
Ish_i_im = 0.5 * self.G_dyn * Vi_im_Lt + 0.5 * self.B_i_dyn * Vi_re_Lt
Ish_j_re = 0.5 * self.G_dyn * Vj_re_L - 0.5 * self.B_j_dyn * Vj_im_L
Ish_j_im = 0.5 * self.G_dyn * Vj_im_L + 0.5 * self.B_j_dyn * Vj_re_L
# Terminal currents (line frame)
# Forward: from i into the branch
If_re_L = I_series_re + Ish_i_re
If_im_L = I_series_im + Ish_i_im
# Reverse: from j into the branch
It_re_L = -I_series_re + Ish_j_re
It_im_L = -I_series_im + Ish_j_im
# Inverse rotation (-theta) to bus frame (LINE to BUS)
If_re_B = ca.cos(theta_i) * If_re_L + ca.sin(theta_i) * If_im_L
If_im_B = -ca.sin(theta_i) * If_re_L + ca.cos(theta_i) * If_im_L
It_re_B = ca.cos(theta_j) * It_re_L + ca.sin(theta_j) * It_im_L
It_im_B = -ca.sin(theta_j) * It_re_L + ca.cos(theta_j) * It_im_L
I_out = ca.SX.zeros((4 * self.nb, 1))
for k in range(self.nb):
I_out[4 * k + 0] = If_re_B[k]
I_out[4 * k + 1] = If_im_B[k]
I_out[4 * k + 2] = It_re_B[k]
I_out[4 * k + 3] = It_im_B[k]
# Helper for Reference frequency substitution
W_sym = ca.vertcat(
dae.omega_ref, dae.omega_ref_buses, dae.omega_ref_lines
) # symbolic placeholder
W_expr = ca.vertcat(
dae.omega_ref_expr, dae.omega_ref_buses_expr, dae.omega_ref_lines_expr
) # function of states
if I_out is not None:
I_out = ca.substitute(I_out, W_sym, W_expr)
self.branch_current_fun = ca.Function(
"branch_current_fun",
[dae.x, dae.y],
[I_out],
)
[docs]
def setup(self, dae: DaeSim, bus_init: BusInit) -> None:
# Save original line parameters for fault restoration
self.line_r_orig = self.line.r.copy()
self.line_x_orig = self.line.x.copy()
self.line_g_orig = self.line.g.copy()
self.line_b_orig = self.line.b.copy()
self.build_y_sym()
self.build_incident_matrix()
self.build_linecurrents_to_nodes()
self.init_from_power_flow(dae, bus_init)
self.init_symbolic(dae)
[docs]
def update_effective_line_params(self) -> None:
"""Update Line object parameters to reflect current fault/open state.
Uses saved originals so clearing a fault restores the original values."""
r = self.line_r_orig.copy()
x = self.line_x_orig.copy()
g = self.line_g_orig.copy()
b = self.line_b_orig.copy()
# For opened lines, zero out shunt admittance so Bsum/Gsum exclude them.
# The series branch is already deactivated by the sl switch in Line.fcall,
# but shunts enter the voltage dynamics via Bsum and Gsum directly.
for idx, opened in enumerate(self.line_is_open):
if opened:
g[idx] = 0.0
b[idx] = 0.0
self.line.r = r
self.line.x = x
self.line.g = g
self.line.b = b
[docs]
class GridEst(Grid):
def __init__(self) -> None:
Grid.__init__(self)
self.y_simulation = (
[]
) # Store voltage results from the simulation in their full time resolution
[docs]
def _init_from_simulation(self, other: GridSim, dae: Dae) -> None:
for node in self.buses:
self.yinit[str(node)] = other.yf[str(node)][:, round(dae.T_start / dae.t)]
dae.yinit = np.array(list(self.yinit.values())).reshape(self.nn * 2)
dae.iinit = self.y_adm_matrix @ dae.yinit
# Extend yinit with device-private algebraic init guesses (appended after
# the 2*nn voltage block). iinit above is computed from the voltage block
# only. Device.init_from_simulation then overwrites the private entries
# with the simulated values at T_start.
if dae.n_priv:
dae.yinit = np.concatenate(
[dae.yinit, np.array(dae.yinit_priv, dtype=float)]
)
[docs]
def _get_results(self, other: GridSim) -> None:
y_simulation_list = []
for bus in self.buses:
y_simulation_list.append(other.yf[bus])
self.y_simulation = np.vstack(y_simulation_list)
[docs]
def setup(self, dae: DaeEst, other: GridSim) -> None:
self.build_y()
self._init_from_simulation(other, dae)
self._get_results(other)
self.init_symbolic(dae)
[docs]
def init_symbolic(self, dae: DaeEst) -> None:
super().init_symbolic(dae)
# Prepare measurement matrices/vectors dimensions such that real measurements can be added below
dae.y = ca.SX.sym("y", dae.ny)
dae.cy_meas_alg_matrix = np.empty((0, dae.ny))
[docs]
class Dae:
def __init__(self) -> None:
# Counters
self.nx: int = 0 # Number of differential states
self.ny: int = 0 # Number of differential/algebraic states for voltages
self.nv: int = 0 # Size of the bus-voltage block (2*nn); ny = nv + n_priv
self.ng: int = 0 # Number of algebraic equations
self.nl: int = 0 # Number of differential states for line currents
self.np: int = 0 # Number of parameters/inputs (not used)
self.nts: int = 0 # Number of time steps
self.n_priv: int = 0 # Number of device-private algebraic variables
# Device-private algebraic bookkeeping (populated during device.xy_index).
# Privates occupy y/g indices [2*nn, ny). See
# docs/algebraic_equations_design.md.
self.algebs_int_names: list[str] = [] # human-readable name per private
self.yinit_priv: list[float] = [] # init guess per private (for dae.yinit)
# Symbolic variables
self.x: Optional[ca.SX] = None # Symbolic differential states
self.y: Optional[ca.SX] = (
None # Symbolic states voltages (differential or algebraic)
)
self.f: Optional[ca.SX] = None # Symbolic first derivatives
self.g: Optional[ca.SX] = (
None # Symbolic equations current balance (algebraic) or capacitor current (differential)
)
self.fnode: Optional[ca.SX] = None # Symbolic nodal voltage time derivatives
self.xl: Optional[ca.SX] = None # Symbolic differential states of line currents
self.fl: Optional[ca.SX] = (
None # Symbolic vector for derivatives of line currents
)
self.p: Optional[ca.SX] = None # Will be used for parameters/inputs
self.p0: Optional[ca.SX] = None # Will be used for parameters/inputs
self.s: Optional[ca.SX] = None # Switches
# Simulation/estimation outputs
self.x_full: Optional[np.ndarray] = None # Differential states output
self.y_full: Optional[np.ndarray] = None # Algebraic states output
self.i_full: Optional[np.ndarray] = None # Branch currents output
# Simulation/estimation parameters
self.T_start: float = 0.0
self.T_end: float = 10.0
self.time_steps: Optional[np.ndarray] = None # Time steps of the est/sim
self.states: list[str] = [] # List of all states used in the model
self.Sb: float = 100
self.fn: Literal[50, 60] = 50
self.t: float = 0.02
# Reference frame parameters
self.omega_net: float = 1.0 # System synchronous frequency in pu; fallback
self.omega_coi: Optional[ca.SX] = ca.SX(1.0) # COI frequency (States)
self.omega_ref: Optional[ca.SX] = None # Scalar reference frequency (Symbolic)
self.omega_ref_buses: Optional[ca.SX] = (
None # Reference frequencies at buses (Symbolic)
)
self.omega_ref_lines: Optional[ca.SX] = (
None # Reference frequencies at lines (Symbolic)
)
self.omega_ref_expr: Optional[ca.SX] = (
None # Scalar reference frequency (States)
)
self.omega_ref_buses_expr: Optional[ca.SX] = (
None # Reference frequencies at buses (States)
)
self.omega_ref_lines_expr: Optional[ca.SX] = (
None # Reference frequencies at lines (States)
)
self.omega_mode: Literal["coi", "single", "nom", "dist"] = (
config.omega_mode
) # Chosen reference frame
self.has_delta_ref: bool = False # Set True only when omega_mode == "dist"
self.omega_single_idx: Optional[str] = getattr(
config, "omega_single_idx", None
) # index to chosen device for single mode
# TODO: eliminate equation (for single implementation)
# Initial values
self.xinit: list = []
self.yinit: list = []
self.iinit: list = []
self.xlinit: list = []
self.sinit = np.ndarray([], dtype=float)
self.xmin: list = [] # minimal state limiter values
self.xmax: list = [] # maximal state limiter values
# Store the grid as an attribute of the class
self.grid: Optional[Grid] = None
# Per-instance device list + bus init. Subclasses assign these to the
# appropriate sim or est module globals so that reference-frame methods
# (update_omega, compute_coi_expr, set_omega_single_idx_from_slack)
# operate on the right set of devices without reaching for sim globals.
self.device_list: list = []
self.bus_init: Optional[BusInit] = None
self.incl_lim: bool # Include state limiters or not
self.FG: Optional[ca.Function] = None # Casadi function for the DAE model
[docs]
def __reduce__(self):
# Filter the attributes based on their types
picklable_attrs = {}
for key, value in self.__dict__.items():
if isinstance(value, (float, int, list, np.ndarray)):
# Only serialize float, int, list, or numpy arrays
picklable_attrs[key] = value
# Return the class constructor and the picklable attributes as a tuple
return (self.__class__, (), picklable_attrs)
[docs]
def __setstate__(self, state):
# Restore the state from the pickled attributes
self.__dict__.update(state)
[docs]
def setup(self, **kwargs) -> None:
# Overwrite default values
self.__dict__.update(kwargs)
# Number of time steps
self.nts = round((self.T_end - self.T_start) / self.t)
self.time_steps = np.arange(self.T_start, self.T_end, self.t)
self.set_omega_single_idx_from_slack()
# Initialize reference frame angle states
self.init_reference_frame_angle_states()
# Initialize symbolic reference frequency formulations.
# Sizes must come from self.grid so that DaeEst (which may use a grid
# smaller than grid_sim, e.g. with unknown injectors) gets dimensions
# matching its own grid. update_omega() builds the corresponding
# *_expr vectors using self.grid.nn / self.grid.nb; the two must align
# for the ca.substitute call in fgcall() to succeed.
if self.grid is None:
raise RuntimeError(
"Dae.setup() requires self.grid to be set before being called. "
"Assign dae.grid = <appropriate Grid instance> first."
)
self.omega_ref = ca.SX.sym("omega_ref") # Scalar reference frequency
self.omega_ref_buses = ca.SX.sym(
"omega_ref_b", self.grid.nn
) # Reference frequencies at buses
self.omega_ref_lines = ca.SX.sym(
"omega_ref_l", self.grid.nb
) # Reference frequencies at lines
self.init_symbolic()
# State initialization and setting of limits
self.xinit = np.array(self.xinit)
self.xlinit = np.array(self.xlinit)
self.xmin = np.array(self.xmin)
self.xmax = np.array(self.xmax)
[docs]
def fgcall(self) -> None:
pass
[docs]
def init_symbolic(self) -> None:
pass
[docs]
def compute_coi_expr(self, device_list=None) -> None:
"""Compute the symbolic Centre-of-Inertia expression from devices.
``device_list`` defaults to ``self.device_list`` so that DaeEst gets
its own device set rather than reaching for the sim module global.
The parameter is preserved as an override for explicit callers.
"""
if device_list is None:
device_list = self.device_list
Mtot = ca.SX(0)
num = ca.SX(0)
for dev in device_list:
for k in range(getattr(dev, "n", 0)):
# Synchronous machines
if "synchronous" in (getattr(dev, "_type", "") or "").lower():
if (
not hasattr(dev, "omega")
or not hasattr(dev, "H")
or not hasattr(dev, "Sn")
):
continue
idx_omega = int(dev.omega[k])
omega_k = self.x[idx_omega]
Sn_k = float(dev.Sn[k])
H_eq = float(dev.H[k])
if H_eq <= 0.0 or Sn_k <= 0.0:
continue
# Grid-forming inverter
elif "gridforming" in dev.__class__.__name__.lower():
if not (
hasattr(dev, "Kp")
and hasattr(dev, "omega_f")
and hasattr(dev, "Sn")
):
continue
if dev.omega_c is None:
omega_k = self.omega_net
else:
omega_k = dev.omega_c[k]
Kp_k = dev.Kp[k]
omega_f_k = dev.omega_f[k]
Sn_k = dev.Sn[k]
if Kp_k <= 0.0 or omega_f_k <= 0.0 or Sn_k <= 0.0:
continue
# Equivalent virtual inertia
H_eq = 1.0 / (2.0 * Kp_k * omega_f_k)
else:
continue
num += H_eq * (Sn_k / self.Sb) * omega_k
Mtot += H_eq * (Sn_k / self.Sb)
self.omega_coi = (num / Mtot) if Mtot > 0.0 else ca.SX(self.omega_net)
[docs]
def update_omega(self) -> None:
"""
Update system reference frequency to frequency of selected mode.
- If omega_mode == 'coi': compute inertia-weighted COI of synchronous machines.
- If omega_mode == 'single' get frequency of chosen synchronous machine or inverter.
- If omega_mode == 'dist' get frequencies at all buses from frequency divider.
- Otherwise: set ω_ref = ω_net (fallback behavior).
"""
# Default (fallback): nominal synchronous frequency
omega_ref = ca.SX(self.omega_net)
omega_ref_buses = omega_ref * ca.SX.ones(self.grid.nn, 1)
omega_ref_lines = omega_ref * ca.SX.ones(self.grid.nb, 1)
mode = getattr(self, "omega_mode", "nom")
if mode not in ("coi", "single", "dist"):
# Keep fallback expressions
pass
# Calculate Centre of Inertia (COI) reference frequency
elif mode == "coi":
self.compute_coi_expr(self.device_list)
# Set Centre of Inertia (COI) reference frequency
omega_ref = self.omega_coi
omega_ref_buses = omega_ref * ca.SX.ones(self.grid.nn, 1)
omega_ref_lines = omega_ref * ca.SX.ones(self.grid.nb, 1)
# print("Expression of omega COI", self.omega_ref)
# Calculate single machine or inverter reference frequency
elif mode == "single":
if getattr(self, "omega_single_idx", None) is None:
omega_ref = ca.SX(self.omega_net)
omega_ref_buses = omega_ref * ca.SX.ones(self.grid.nn, 1)
omega_ref_lines = omega_ref * ca.SX.ones(self.grid.nb, 1)
return
for dev in self.device_list:
k = getattr(dev, "int", {}).get(self.omega_single_idx)
omega_k = None
if k is None:
continue
# Set Synchronous machine reference frequency
if hasattr(dev, "omega"):
if dev.omega is None:
omega_k = self.omega_net
idx_omega = int(getattr(dev, "omega")[k])
omega_k = self.x[idx_omega]
# print(f"Single reference frame of {self.omega_single_idx}, from {dev._name}[{k}] idx={idx_omega} ω={omega_k}")
break
# Set GFL inverter reference frequency
elif hasattr(dev, "omega_pll"):
if dev.omega_pll is None:
omega_k = self.omega_net
else:
omega_k = dev.omega_pll[k]
# print(f"Single reference frame of {self.omega_single_idx}, PLL from {dev._name}[{k}] ω={omega_k}")
break
# Set GFM inverter reference frequency
elif hasattr(dev, "omega_c"):
if dev.omega_c is None:
omega_k = ca.SX(self.omega_net)
else:
omega_k = dev.omega_c[k]
# print(f"Single reference frame of {self.omega_single_idx}, from {dev._name}[{k}] ω={omega_k}")
break
# Fallback
if omega_k is None:
omega_ref = ca.SX(self.omega_net)
# Set single reference frequency of chosen SM, GFM- or GFL inverter
else:
omega_ref = omega_k
# Create vector formulation of reference frequency
# Buses and lines have same reference frequency for nom, coi and single mode
omega_ref_buses = omega_ref * ca.SX.ones(self.grid.nn, 1)
omega_ref_lines = omega_ref * ca.SX.ones(self.grid.nb, 1)
elif mode == "dist":
grid = getattr(self, "grid", None)
omega_G_list, gen_bus_idx, gen_Xs_list = [], [], []
omega_M_list, gfm_bus_idx, gfm_Bm_list = [], [], []
omega_L_list, gfl_bus_idx, gfl_Bl_list = [], [], []
for dev in self.device_list:
dev_type = (getattr(dev, "_type", "") or "").lower()
dev_name = (getattr(dev, "_name", "") or "").lower()
if "synchronous" in dev_type:
for k in range(getattr(dev, "n", 0)):
if not hasattr(dev, "omega"):
continue
idx_omega = int(getattr(dev, "omega")[k])
omega_k = self.x[idx_omega]
omega_G_list.append(omega_k) # generator Frequencies
gen_bus_idx.append(grid.idx_bus[dev.bus[k]])
gen_Xs_list.append(
float(dev.x_d[k])
) # internal series reactance
# Get GFL inverter parameters
elif "gridfollowing" in dev_name:
for k in range(getattr(dev, "n", 0)):
omega_k = dev.omega_pll[k]
omega_L_list.append(omega_k)
gfl_bus_idx.append(grid.idx_bus[dev.bus[k]])
omega_b = 2 * np.pi * self.fn
Bl_k = dev.Kp[k] * dev.omega_f[k] / omega_b
gfl_Bl_list.append(Bl_k) # 0.002
# Get GFM inverter parameters
elif "gridforming" in dev_name:
for k in range(getattr(dev, "n", 0)):
omega_k = dev.omega_c[k]
omega_M_list.append(omega_k)
gfm_bus_idx.append(grid.idx_bus[dev.bus[k]])
gfm_Bm_list.append(dev.Lt[k]) # 0.02
mG, mM, mL = len(omega_G_list), len(omega_M_list), len(omega_L_list)
mT = mG + mM + mL
if mT == 0:
omega_ref = ca.SX(self.omega_net)
omega_ref_buses = omega_ref * ca.SX.ones(self.grid.nn, 1)
omega_ref_lines = omega_ref * ca.SX.ones(self.grid.nb, 1)
else:
omega_tilde = ca.vertcat(*(omega_G_list + omega_M_list + omega_L_list))
# Susceptance matrix for the buses
Y = self.grid.y_adm_matrix
B_BB = Y[1::2, 0::2]
# Internal series susceptances at buses (diagonal matrix)
B_tilde_BS = np.zeros((grid.nn, grid.nn))
# Generator buses
b_s = 1.0 / np.asarray(gen_Xs_list, dtype=float)
np.add.at(B_tilde_BS, (gen_bus_idx, gen_bus_idx), b_s)
# GFM buses
b_m = np.asarray(gfm_Bm_list, dtype=float) if mM > 0 else np.zeros(0)
np.add.at(B_tilde_BS, (gfm_bus_idx, gfm_bus_idx), b_m)
# GFL buses
b_l = np.asarray(gfl_Bl_list, dtype=float) if mL > 0 else np.zeros(0)
np.add.at(B_tilde_BS, (gfl_bus_idx, gfl_bus_idx), b_l)
# Susceptance matrix using stator and step-up transformer impedances
# Extended to include GFM and GFL
B_tilde_BG = ca.SX.zeros(grid.nn, mT)
# Generator buses
for k, b_i in enumerate(gen_bus_idx):
B_tilde_BG[b_i, k] += -b_s[k]
# GFM buses
for k, b_i in enumerate(gfm_bus_idx):
B_tilde_BG[b_i, mG + k] += -b_m[k]
# GFL buses
for k, b_i in enumerate(gfl_bus_idx):
B_tilde_BG[b_i, k + mG + mM] += -b_l[k]
# Distributed frequencies at buses
D = -ca.solve(B_BB + B_tilde_BS, B_tilde_BG)
# Calculate COI expression
self.compute_coi_expr(self.device_list)
# omega_B = D @ (omega_tilde - ca.SX.ones(mT, 1)) + ca.SX.ones(grid.nn, 1)
# Extended COI-FDF centred to COI frequency
omega_B = D @ (
omega_tilde - self.omega_coi * ca.SX.ones(mT, 1)
) + self.omega_coi * ca.SX.ones(grid.nn, 1)
# Set scalar reference frequency to be mean value
omega_ref = ca.sum1(omega_B) / omega_B.shape[0] # Mean value
# Set bus reference frequencies to be estimate obtained by FDF
omega_ref_buses = omega_B
# Obtain line frequency: average of frequencies at terminal buses
omega_ref_lines = [None] * grid.nb
for (fb, tb), k in grid.idx_branch.items():
i = grid.idx_bus[fb]
j = grid.idx_bus[tb]
omega_ref_lines[k] = 0.5 * (omega_B[i] + omega_B[j])
# Set line frequency to average of estimated FDF frequencies at terminal buses
omega_ref_lines = ca.vertcat(*omega_ref_lines)
# Store reference frequencies as function of states
self.omega_ref_expr = omega_ref
self.omega_ref_buses_expr = omega_ref_buses
self.omega_ref_lines_expr = omega_ref_lines
[docs]
def set_omega_single_idx_from_slack(self) -> None:
"""
If omega_mode == 'single' and omega_single_idx is None,
choose the device connected to the slack bus as reference.
Uses ``self.bus_init`` and ``self.device_list`` so that DaeEst
resolves against its own device set. If the est-side bus_init has no
entries (typical: est_param.txt does not declare BusInit), falls back
to the sim-side ``bus_init_sim`` for the slack lookup — the slack bus
is a property of the physical grid and is the same for sim and est.
"""
if self.omega_mode != "single" or self.omega_single_idx is not None:
return
bus_init_for_slack = self.bus_init
# Fallback for est: if no BusInit declared on est side, use sim's.
if bus_init_for_slack is None or not getattr(bus_init_for_slack, "bus", []):
bus_init_for_slack = bus_init_sim
slack_buses = [
b
for b, t in zip(bus_init_for_slack.bus, bus_init_for_slack.type)
if t is not None and str(t).lower() == "slack"
]
if not slack_buses:
raise ValueError(
"Cannot infer omega_single_idx: no slack bus declared in "
"BusInit on either the calling Dae's side or the simulation."
)
slack_bus = slack_buses[0] # In case of multiple, pick first
for dev in self.device_list:
if not hasattr(dev, "bus"):
continue
for local_pos, bus in enumerate(dev.bus):
if bus == slack_bus:
for idx_name, pos in dev.int.items():
if pos == local_pos:
self.omega_single_idx = idx_name
return
raise ValueError(
f"No generator or converter found at slack bus {slack_bus} "
f"to use as omega_single_idx."
)
[docs]
def init_reference_frame_angle_states(self) -> None:
"""Initialize reference-frame angle states (δ_ref) for all network buses.
- This function allocates one differential state per bus representing the
local reference-frame angle δ_ref at that bus.
- These angles are only needed for the distributed frequency reference
framework (omega_mode == "dist"), specifically for transformations
between the multiple local reference frames.
"""
if self.omega_mode != "dist":
self.has_delta_ref = False
return
if hasattr(self, "idx_delta_ref"):
return
# Use the buses known to the grid this Dae operates on (set before
# Dae.setup runs). For the sim path that's the sim grid; for est it's
# grid_est. Fallback to bus_init_sim for safety if grid not yet bound.
if self.grid is not None and getattr(self.grid, "buses", []):
buses: Sequence[str] = list(self.grid.buses)
else:
buses = getattr(bus_init_sim, "bus", [])
nbus = len(buses)
if nbus > 0:
start = self.nx
stop = self.nx + nbus
self.idx_delta_ref = np.arange(start, stop, dtype=int)
# Indexing is important as buses from bus_init_sim may have different order than grid.buses
self.idx_delta_ref_by_bus = {
bus: int(idx) for bus, idx in zip(buses, self.idx_delta_ref)
}
for bus in buses:
self.states.append(f"Reference_frame_model_at_{bus}_delta_ref")
self.xinit.append(0.0)
self.xmin.append(-1e9)
self.xmax.append(1e9)
self.nx += nbus
self.has_delta_ref = True
# Load device class names recognised by the ZIP-aware LOAD disturbance
# handler. Each entry maps the device's `_name` to a function that returns
# the (z_share, i_share, p_share) tuple for the k-th entry of that device.
_LOAD_DEVICE_SHARES = {
"Static_load_ZIP":
lambda dev, k: (float(dev.z_share[k]), float(dev.i_share[k]), float(dev.p_share[k])),
"Static_load_impedance":
lambda dev, k: (1.0, 0.0, 0.0),
"Static_load_power":
lambda dev, k: (0.0, 0.0, 1.0),
}
[docs]
def _find_bus_load(self, bus_name: str):
"""Return (device, k) for the load attached to *bus_name*, or raise.
Iterates `self.device_list` and matches against any of the registered
load device classes. The first matching (device, local-index) pair is
returned. Single-load-per-bus is enforced upstream by the framework
(one device per bus per type), so the first match is unambiguous.
"""
for dev in self.device_list:
if getattr(dev, "_name", None) in self._LOAD_DEVICE_SHARES:
if bus_name in dev.bus:
return dev, dev.bus.index(bus_name)
raise ValueError(
f"LOAD disturbance at bus '{bus_name}': no load device "
f"(StaticZIP / StaticLoadImpedance / StaticLoadPower) declared "
f"at this bus. Declare one in sim_param.txt before applying a "
f"LOAD disturbance here."
)
[docs]
def dist_load(self, p: float, q: float, bus: list) -> None:
"""ZIP-aware LOAD disturbance.
Increments the rated demand at the load attached to ``bus`` by
(p, q) MW/MVAr. The increment is apportioned across the device's
configured Z, I, P shares so the load preserves its
voltage-dependence shape:
* Z share contributes a linear admittance step (no 1/|V|² term),
* I share contributes a rotated constant-current step,
* P share contributes a constant-power step (the legacy
behaviour of this function).
For ``StaticLoadImpedance`` (z=1) and ``StaticLoadPower`` (p=1)
the shares are inferred. With ``line_dyn=True`` the contributions
feed ``dae.fnode`` via the same ω_b / B_sum capacitor coupling as
before; with ``line_dyn=False`` they feed ``dae.g`` directly.
"""
target_bus = bus[0]
load_dev, k = self._find_bus_load(target_bus)
# P-side shares from the dispatch table (existing behaviour).
a_z_P, a_i_P, a_p_P = self._LOAD_DEVICE_SHARES[load_dev._name](load_dev, k)
# Q-side shares: StaticZIP has the optional q_share() resolver,
# which falls back to the P-side when no Q-specific value was
# declared. Legacy load classes (StaticLoadImpedance,
# StaticLoadPower) have no Q-share concept; their Q-side equals
# their P-side by definition.
if hasattr(load_dev, "q_share"):
a_z_Q = load_dev.q_share("z", k)
a_i_Q = load_dev.q_share("i", k)
a_p_Q = load_dev.q_share("p", k)
else:
a_z_Q, a_i_Q, a_p_Q = a_z_P, a_i_P, a_p_P
idx_re = int(load_dev.vre[k])
idx_im = int(load_dev.vim[k])
idx_node = self.grid.get_node_index([target_bus])[0][0]
# V_nom = |V_init| at this bus. Matches the convention used by
# StaticZIP.finit_sub to calibrate the rated device constants.
v_re_init = float(self.yinit[idx_re])
v_im_init = float(self.yinit[idx_im])
V_nom_sq = v_re_init ** 2 + v_im_init ** 2
V_nom = V_nom_sq ** 0.5
dp_pu = p / self.Sb
dq_pu = q / self.Sb
vre = self.y[idx_re]
vim = self.y[idx_im]
# --- Z share: linear admittance step ----------------------------
# ΔG from the P-side share, ΔB from the Q-side share, independently.
dG = a_z_P * dp_pu / V_nom_sq
dB = -a_z_Q * dq_pu / V_nom_sq
ire_z = dG * vre - dB * vim
iim_z = dB * vre + dG * vim
# --- I share: constant current in the bus frame -----------------
theta = ca.atan2(vim, vre)
dId = a_i_P * dp_pu / V_nom
dIq = a_i_Q * dq_pu / V_nom
ire_i = ca.cos(theta) * dId + ca.sin(theta) * dIq
iim_i = ca.sin(theta) * dId - ca.cos(theta) * dIq
# --- P share: constant power (legacy behaviour) -----------------
dP = a_p_P * dp_pu
dQ = a_p_Q * dq_pu
v_sq = vre ** 2 + vim ** 2
ire_p = (dP * vre + dQ * vim) / v_sq
iim_p = (dP * vim - dQ * vre) / v_sq
ire_total = ire_z + ire_i + ire_p
iim_total = iim_z + iim_i + iim_p
if self.line_dyn:
omega_b = 2 * np.pi * self.fn
B_node = self.grid.Bsum[idx_node]
# Same sign as the original constant-P implementation: the load
# current is a sink, so it feeds dae.fnode with a minus.
self.fnode[idx_re] -= omega_b / B_node * ire_total
self.fnode[idx_im] -= omega_b / B_node * iim_total
else:
self.g[idx_re] += ire_total
self.g[idx_im] += iim_total
self.fgcall()
[docs]
def check_disturbance(self, dist: Disturbance, iter_forward: int) -> bool:
params_changed = False
active = dist.time <= iter_forward * self.t
if active.any():
for idx, state in enumerate(active):
if state:
match dist.type[0]:
case "FAULT_LINE":
if self.line_dyn:
logging.error(
"FAULT_LINE is not supported with line_dyn=True. Skipping."
)
for key in dist._params:
dist.__dict__[key] = np.array(
dist.__dict__[key][1:]
)
break
short_index = self.grid.get_branch_index(
dist.bus_i[0], dist.bus_j[0]
)[0][0]
self.grid.line_is_faulted[short_index] = True
self.grid.line_fault_adm[short_index] += dist.y[0]
self.exec_dist()
params_changed = True
case "CLEAR_FAULT_LINE":
if self.line_dyn:
logging.error(
"CLEAR_FAULT_LINE is not supported with line_dyn=True. Skipping."
)
for key in dist._params:
dist.__dict__[key] = np.array(
dist.__dict__[key][1:]
)
break
short_index = self.grid.get_branch_index(
dist.bus_i[0], dist.bus_j[0]
)[0][0]
self.grid.line_is_faulted[short_index] = False
self.exec_dist()
params_changed = True
case "OPEN_LINE":
short_index = self.grid.get_branch_index(
dist.bus_i[0], dist.bus_j[0]
)[0][0]
self.grid.line_is_open[short_index] = True
self.exec_dist()
params_changed = True
case "LOAD":
if not dae_sim.line_dyn:
# NOTE: need Y(w_ref) to be built with dependency on omega
# and integrated into equations
self.grid.guncall(self)
self.grid.build_y_sym(
omega_buses=self.omega_ref_buses,
omega_lines=self.omega_ref_lines,
dyn_update=True,
)
self.grid.gcall(self)
self.dist_load(
p=dist.p_delta[0],
q=dist.q_delta[0],
bus=[dist.bus[0]],
)
case "FAULT_BUS":
short_index = self.grid.get_node_index([dist.bus[0]])[0][0]
self.grid.bus_is_faulted[short_index] = True
self.grid.bus_fault_adm[short_index] += dist.y[0]
self.exec_dist()
params_changed = True
case "CLEAR_FAULT_BUS":
short_index = self.grid.get_node_index([dist.bus[0]])[0][0]
self.grid.bus_is_faulted[short_index] = False
self.exec_dist()
params_changed = True
case _:
logging.ERROR(
f"Disturbance type {dist.type[0]} not found - skipped."
)
for key, value in dist._params.items():
dist.__dict__[key] = np.array(dist.__dict__[key][1:])
break
logging.info(
f"Disturbance {dist.type[0]} at {dist.time[0]} successfully executed!"
)
# Remove the executed disturbance
for key, value in dist._params.items():
dist.__dict__[key] = np.array(dist.__dict__[key][1:])
return params_changed
[docs]
def exec_dist(self):
if self.line_dyn:
# Update slinit for opened lines
for idx, opened in enumerate(self.grid.line_is_open):
self.slinit[2 * idx] = 0.0 if opened else 1.0
self.slinit[2 * idx + 1] = 0.0 if opened else 1.0
# Restore line params from originals (clears any stale modifications)
self.grid.update_effective_line_params()
# Rebuild derived quantities (Gsum now includes bus fault admittances)
self.grid.build_y_sym(
omega_buses=self.omega_ref_buses,
omega_lines=self.omega_ref_lines,
dyn_update=True,
)
self.grid.build_Gsum()
self.grid.build_Bsum()
# Full symbolic rebuild — reset symbolic expressions
saved_sinit = self.sinit.copy()
saved_slinit = self.slinit.copy()
self.init_symbolic()
self.sinit = saved_sinit
self.grid.init_symbolic(self)
self.slinit = saved_slinit
# Rebuild all device equations (use self.device_list so that
# both sim and est paths iterate the right set; in practice
# disturbance execution is sim-only today, but keeping this on
# self.device_list avoids reaching for sim globals.)
for dev in self.device_list:
if dev.properties["fgcall"]:
dev.fgcall(self)
# Rebuild grid equations (Line.fcall + voltage dynamics)
self.grid.gcall(self, line=self.grid.line)
# Refresh ω_ref expressions against the freshly rebuilt self.x,
# device omega symbols, and post-fault y_adm_matrix. Without this,
# fgcall() would substitute placeholders with expressions still
# referencing the pre-rebuild SX symbols.
self.update_omega()
# Rebuild CasADi integrator
self.fgcall()
else:
self.grid.guncall(self)
# Build a plain (no-ω-substitution) Y so dist-mode's FDF D matrix
# can be computed against the post-fault topology without picking
# up the symbolic ω_ref placeholders that the next build bakes in.
self.grid.build_y_sym()
self.update_omega()
self.grid.build_y_sym(
omega_buses=self.omega_ref_buses,
omega_lines=self.omega_ref_lines,
dyn_update=True,
)
self.grid.gcall(self)
# Re-numerify y_adm_matrix and C_branches so measurement device
# call()s (BranchCurrentPMU, BusCurrentPMU) can slice C_branches
# rows into the numpy c_meas_matrix during the next fgcall().
self.grid.build_y()
self.fgcall()
[docs]
def debug_check_initialization(self) -> None:
if self.line_dyn:
f = ca.Function(
"f",
[self.x, self.y, self.xl, self.s, self.sl],
[self.f, self.fnode, self.fl],
)
init_test = f(
i0=self.xinit,
i1=self.yinit,
i2=self.xlinit,
i3=self.sinit,
i4=self.slinit,
)
else:
f = ca.Function("f", [self.x, self.y, self.s], [self.f, self.g])
init_test = f(i0=self.xinit, i1=self.yinit, i2=self.sinit)
print("Results of initialization:")
for o in init_test.keys():
non_zero_indices = [
i
for i in range(init_test[o].size()[0])
if abs(float(init_test[o][i])) > 1e-6
]
non_zero_values = [float(init_test[o][i]) for i in non_zero_indices]
if self.line_dyn:
if o == "o0":
print(
f"Device dynamic equations non zero indices: {non_zero_indices}"
)
print(
f"Device dynamic equations non zero values: {non_zero_values}"
)
elif o == "o1":
print(
f"Nodal voltage dynamic equation non zero indices: {non_zero_indices}"
)
print(
f"Nodal voltage dynamic equation non zero values: {non_zero_values}"
)
elif o == "o2":
print(
f"Line current dynamic equation non zero indices: {non_zero_indices}"
)
print(
f"Line current dynamic equation non zero values: {non_zero_values}"
)
else:
if o == "o0":
print(f"Differential states non zero indices: {non_zero_indices}")
print(f"Differential states non zero values: {non_zero_values}")
elif o == "o1":
print(f"Algebraic states non zero indices: {non_zero_indices}")
print(f"Algebraic states non zero values: {non_zero_values}")
[docs]
def check_initialization(self) -> None:
W_sym = ca.vertcat(self.omega_ref, self.omega_ref_buses, self.omega_ref_lines)
W_one = ca.SX.ones(1 + self.grid.nn + self.grid.nb, 1)
if self.line_dyn:
# Initialize all symbolic reference frequencies to nominal 1.0 p.u.
f_init = ca.substitute(self.f, W_sym, W_one)
fnode_init = ca.substitute(self.fnode, W_sym, W_one)
fl_init = ca.substitute(self.fl, W_sym, W_one)
f = ca.Function(
"f",
[self.x, self.y, self.xl, self.s, self.sl],
[f_init, fnode_init, fl_init],
)
init_test = f(
i0=self.xinit,
i1=self.yinit,
i2=self.xlinit,
i3=self.sinit,
i4=self.slinit,
)
else:
# Initialize all symbolic reference frequencies to nominal 1.0 p.u.
f_init = ca.substitute(self.f, W_sym, W_one)
g_init = ca.substitute(self.g, W_sym, W_one)
f = ca.Function("f", [self.x, self.y, self.s], [f_init, g_init])
# NOTE: fixed here, was init_test = f(i0=self.xinit, i1=self.yinit, i3=self.sinit)
init_test = f(
i0=self.xinit, i1=self.yinit, i2=self.sinit
) # TODO: check i2 and i3 what it should be here
tol = 1e-6
non_zero_indices_final = []
for o in init_test.keys():
non_zero_indices = [
i
for i in range(init_test[o].size()[0])
if abs(float(init_test[o][i])) > tol
]
non_zero_indices_final += non_zero_indices
if non_zero_indices_final:
logging.critical(
"Initialization check failed. Non-zero entries found in the dynamic equations at initialization. Dynamic Simulation will either start with a transient or fail to simulate."
)
else:
logging.info(
"Initialization check passed. All dynamic equations are zero at initialization."
)
[docs]
class DaeSim(Dae):
def __init__(self) -> None:
super().__init__()
self.int_scheme_sim = None
self.int_scheme_sim_options: dict = {}
self.eigenvalues = None
self.participation_factors = None
self.participation_factors_normalized = None
self.state_names: list = []
self.modes: Optional[list] = None
# Gated by config; when True the small-signal eigenvalue analysis is run
# and its figures shown at the operating point before the simulation.
self.small_signal_analysis: bool = False
self.t0: float # Start of the next DAE call
self.tf: float # End of the next DAE call
self.line_dyn: bool # Simulate with dynamic lines or not
[docs]
def init_symbolic(self) -> None:
self.x = ca.SX.sym("x", self.nx)
self.f = ca.SX.sym("f", self.nx)
self.s = ca.SX.sym("s", self.nx)
self.p = ca.SX.sym("p", self.np)
self.p0 = np.ones(self.np)
self.sinit = np.ones(self.nx) # Initial values for limiters (nothing limited)
[docs]
def fgcall(self) -> None:
# A model has algebraic constraints whenever the network voltages are
# solved algebraically (line_dyn=False) OR a device declares private
# algebraic variables (n_priv > 0). Either case requires a DAE solver,
# regardless of line_dyn. ODE-only schemes (cvodes/rk) are valid only
# when there are no algebraic constraints at all.
has_alg = (not self.line_dyn) or (self.n_priv > 0)
if self.int_scheme_sim in ("cvodes", "rk") and has_alg:
raise ValueError(
f"Integration scheme '{self.int_scheme_sim}' only supports ODE systems, "
f"but the model is a DAE ("
+ (
"network voltages are algebraic"
if not self.line_dyn
else f"{self.n_priv} device-private algebraic constraint(s)"
)
+ "). Use 'idas'/'collocation', or remove the algebraic constraints."
)
# Helper for Reference frequency substitution
W_sym = ca.vertcat(
self.omega_ref, self.omega_ref_buses, self.omega_ref_lines
) # symbolic placeholder
W_expr = ca.vertcat(
self.omega_ref_expr, self.omega_ref_buses_expr, self.omega_ref_lines_expr
) # function of states
# Substitute reference frequency expression in all equations
if self.f is not None:
f = ca.substitute(self.f, W_sym, W_expr)
if self.g is not None:
g = ca.substitute(self.g, W_sym, W_expr)
if self.fnode is not None:
fnode = ca.substitute(self.fnode, W_sym, W_expr)
if self.fl is not None:
fl = ca.substitute(self.fl, W_sym, W_expr)
if self.line_dyn:
if self.n_priv > 0:
# Mixed DAE: the network (voltages + line currents) is
# differential, but device-private algebraics remain algebraic.
# Voltages occupy y[:nv]; privates occupy y[nv:] with their
# defining equations in g[nv:].
y_volt = self.y[: self.nv]
y_priv = self.y[self.nv :]
g_priv = g[self.nv :]
dae_dict = {
"x": ca.vertcat(self.x, y_volt, self.xl),
"z": y_priv,
"p": ca.vertcat(self.s, self.sl),
"ode": ca.vertcat(f, fnode, fl),
"alg": g_priv,
}
else:
dae_dict = {
"x": ca.vertcat(self.x, self.y, self.xl),
"p": ca.vertcat(self.s, self.sl),
"ode": ca.vertcat(f, fnode, fl),
}
else:
dae_dict = {
"x": self.x,
"z": self.y,
"p": self.s,
"ode": f,
"alg": g,
}
# JIT-compiling the integrator (int_scheme_sim_options["jit"]) shells out to
# a C compiler. Environments without one -- e.g. the jupyter/base-notebook CI
# image, which has no gcc -- would otherwise fail integrator construction with
# "Compilation failed ... gcc: not found". Degrade gracefully: drop JIT when
# gcc is absent (identical results, just not compiled to native code).
sim_options = self.int_scheme_sim_options
if sim_options.get("jit") and shutil.which("gcc") is None:
sim_options = {**sim_options, "jit": False}
logging.warning(
"No C compiler (gcc) found on PATH; disabling integrator JIT for this run."
)
self.FG = ca.integrator(
"FG",
self.int_scheme_sim,
dae_dict,
self.t0,
self.tf,
sim_options,
)
[docs]
def _line_dyn_integrate(self, x0, y0, i0, s0, sl0):
"""Run the line_dyn integrator for one (set of) step(s) and return
(x, y_full, i_line) as 2-D arrays (columns = time steps).
Under line_dyn the differential block is [x | y_volt | xl]. With device
private algebraics the voltages stay differential (driven by fnode) while
the privates are algebraic: they are passed as z0 and returned in zf, and
recombined here so y_full has the standard layout [voltages | privates].
"""
p = np.concatenate((s0, sl0))
if self.n_priv > 0:
nv = self.nv
res = self.FG(
x0=np.concatenate((x0, y0[:nv], i0)), z0=y0[nv:], p=p
)
else:
res = self.FG(x0=np.concatenate((x0, y0, i0)), p=p)
xf = np.array(res["xf"])
if xf.ndim == 1:
xf = xf.reshape(-1, 1)
x = xf[: self.nx, :]
if self.n_priv > 0:
nv = self.nv
y_volt = xf[self.nx : self.nx + nv, :]
i_line = xf[self.nx + nv : self.nx + nv + self.nl, :]
zf = np.array(res["zf"])
if zf.ndim == 1:
zf = zf.reshape(-1, 1)
y = np.vstack((y_volt, zf))
else:
y = xf[self.nx : self.nx + self.ny, :]
i_line = xf[self.nx + self.ny :, :]
return x, y, i_line
[docs]
def simulate(self, dist: Disturbance) -> None:
# Prepare for successive calls
self.t0 = self.T_start
self.tf = self.t
self.update_omega()
self.fgcall()
iter_forward = 0
self.x_full = np.zeros([self.nx, self.nts])
self.y_full = np.zeros([self.ny, self.nts])
# Set initial values
x0 = self.xinit
y0 = self.yinit
i0 = self.xlinit
s0 = self.sinit
if self.line_dyn:
sl0 = self.slinit
self.x_full[:, iter_forward] = x0
self.y_full[:, iter_forward] = y0
previous_value = None
# Build functions for evaluation
if not self.line_dyn:
# Rebuild Y to include static
self.grid.build_y_sym(
omega_buses=self.omega_ref_buses,
omega_lines=self.omega_ref_lines,
dyn_update=True,
)
# Small-signal analysis is gated by config (small_signal_analysis) and is
# done HERE — at the operating point, before the time-stepping loop — so
# its results are computed and shown to the user even if the simulation
# (or the later estimation) subsequently fails. The blocking show means
# the figures are guaranteed seen before the run proceeds; under a
# non-interactive backend (headless/CI) plt.show is a harmless no-op.
if getattr(self, "small_signal_analysis", False):
self.eigenvalue_analysis()
self.print_modal_report()
self.plot_eigenvalues()
self.plot_participation_bands()
import matplotlib
import matplotlib.pyplot as plt
# Block only on an interactive backend (so the user sees the figures
# before the run proceeds); on a non-interactive backend (headless/CI)
# the figures are built but plt.show would only warn, so skip it.
_non_interactive = {"agg", "pdf", "svg", "ps", "template", "cairo", "pgf"}
if matplotlib.get_backend().lower() not in _non_interactive:
plt.show(block=True)
# Snapshot initial admittance state for post-processing branch currents
self.grid.build_y()
self._fault_intervals = [(0, self._snapshot_branch_params())]
if self.incl_lim:
for time_step in tqdm(range(self.nts - 1), desc="Simulation", unit="step"):
iter_forward += 1
try:
if self.line_dyn:
xrec, yrec, irec = self._line_dyn_integrate(
x0, y0, i0, s0, sl0
)
x0 = np.clip(xrec[:, 0], self.xmin, self.xmax)
y0 = yrec[:, 0]
i0 = irec[:, 0]
else:
res = self.FG(x0=x0, z0=y0, p=s0)
x0 = np.array(res["xf"]).flatten()
x0 = np.clip(x0, self.xmin, self.xmax)
y0 = np.array(res["zf"]).flatten()
except RuntimeError:
logging.critical(
f"Simulation failed numerically at or before {iter_forward * self.t} [s]. "
f"Check the log to see if the system was stable at the initial point. "
f"Try reducing the disturbance, changing the operating point (reduce the loading), "
f"reducing the time step, changing the simulation solver, tuning model parameters, "
f"decrease the constant power loads... Good luck!"
)
raise
self.x_full[:, iter_forward] = x0
self.y_full[:, iter_forward] = y0
current_value = round(iter_forward * self.t, 0)
# Log only when the first digit after the decimal changes
(
logging.info(f"Simulation time is {current_value} [s]")
if previous_value != current_value
else None
)
previous_value = current_value
params_changed = self.check_disturbance(dist, iter_forward)
if self.line_dyn:
sl0 = self.slinit # pick up updated line switches after disturbance
if params_changed:
self.grid.build_y()
self._fault_intervals.append(
(iter_forward, self._snapshot_branch_params())
)
else:
self.x_full = x0.reshape(-1, 1)
self.y_full = y0.reshape(-1, 1)
total_duration = self.T_end - self.T_start
pbar = tqdm(
total=100,
desc="Simulation",
unit="%",
bar_format="{l_bar}{bar}| {n:.1f}/{total:.0f}% [{elapsed}<{remaining}]",
)
for i in range(len(dist.time)):
if not dist.time.any():
break
tf = dist.time[0] # Take the next disturbance event
if tf > self.T_end:
continue
self.tf = np.arange(self.t0 + self.t, tf + self.t - 1e-10, self.t)
self.fgcall()
try:
if self.line_dyn:
xf, yf, ifinal = self._line_dyn_integrate(
x0, y0, i0, s0, sl0
)
else:
res_grid = self.FG(x0=x0, z0=y0, p=s0)
except RuntimeError:
logging.critical(
f"Simulation failed numerically at or before time {tf}. Check the log for details. "
"Try reducing the disturbance, changing the operating point, "
"reducing the time step, or tuning model parameters."
)
raise
logging.info(f"Simulation time is {tf} [s]")
iter_forward = math.ceil(tf / self.t)
if self.line_dyn:
i0 = ifinal[:, -1]
else:
xf = res_grid["xf"].full()
yf = res_grid["zf"].full()
self.x_full = np.hstack((self.x_full, xf))
self.y_full = np.hstack((self.y_full, yf))
params_changed = self.check_disturbance(dist, iter_forward)
if self.line_dyn:
sl0 = self.slinit # pick up updated line switches after disturbance
if params_changed:
self.grid.build_y()
self._fault_intervals.append(
(self.x_full.shape[1], self._snapshot_branch_params())
)
self.t0 = self.tf[
-1
] # set the beginning of the next interval as the end of the current interval
x0 = xf[:, -1] # initial value for the next interval
y0 = yf[:, -1] # initial value for the next interval
progress = (self.t0 - self.T_start) / total_duration * 100
pbar.n = progress
pbar.refresh()
# Now do the last interval
self.tf = np.arange(self.t0 + self.t, self.T_end - 1e-10, self.t)
self.fgcall()
try:
if self.line_dyn:
xf, yf, _ifinal = self._line_dyn_integrate(x0, y0, i0, s0, sl0)
else:
res_grid = self.FG(x0=x0, z0=y0, p=s0)
except RuntimeError:
logging.critical(
f"Simulation failed numerically at or before time {self.T_end}. Check the log for details. \n "
"Try reducing the disturbance, changing the operating point, "
"reducing the time step, or tuning model parameters."
)
raise
if not self.line_dyn:
xf = res_grid["xf"].full()
yf = res_grid["zf"].full()
self.x_full = np.hstack((self.x_full, xf))
self.y_full = np.hstack((self.y_full, yf))
pbar.n = 100
pbar.refresh()
pbar.close()
# Reconcile nts with actual data size (np.arange rounding can cause off-by-one)
self.nts = self.x_full.shape[1]
self.time_steps = np.linspace(self.T_start, self.T_end, self.nts)
# Compute branch currents in post-processing (vectorized NumPy)
self.compute_i_full()
[docs]
def _snapshot_branch_params(self) -> dict:
"""Snapshot the current numeric branch admittance parameters from Grid.build_y()."""
grid = self.grid
return {
"C_branches": grid.C_branches.copy(),
"y_series_r": np.real(grid.y_series).copy(),
"y_series_i": np.imag(grid.y_series).copy(),
"g_eff": grid.g_eff.copy(),
"b_eff": grid.b_eff.copy(),
"trafo": grid.tau.copy(),
}
[docs]
def compute_i_full(self) -> None:
"""Compute branch currents from x_full and y_full in post-processing.
Uses vectorized NumPy instead of per-step CasADi evaluation.
For has_delta_ref=False: simple matrix multiply C_branches @ y_full.
For has_delta_ref=True: includes reference-frame rotation using delta_ref from x_full.
"""
nb = self.grid.nb
nts = self.nts
self.i_full = np.zeros((4 * nb, nts))
# Build reordering index: C_branches layout [fwd; rev] → i_full layout [interleaved]
# C_branches rows: fwd[2k]=If_re_k, fwd[2k+1]=If_im_k, rev[2k]=It_re_k, rev[2k+1]=It_im_k
# i_full rows: [4k]=If_re_k, [4k+1]=If_im_k, [4k+2]=It_re_k, [4k+3]=It_im_k
k = np.arange(nb)
reorder_idx = np.empty(4 * nb, dtype=int)
reorder_idx[0::4] = 2 * k # If_re from forward row 2k
reorder_idx[1::4] = 2 * k + 1 # If_im from forward row 2k+1
reorder_idx[2::4] = 2 * nb + 2 * k # It_re from reverse row 2k
reorder_idx[3::4] = 2 * nb + 2 * k + 1 # It_im from reverse row 2k+1
# Determine interval boundaries from fault snapshots
boundaries = [snap[0] for snap in self._fault_intervals] + [nts]
for idx in range(len(self._fault_intervals)):
start = boundaries[idx]
end = boundaries[idx + 1]
if start >= end:
continue
params = self._fault_intervals[idx][1]
if not self.has_delta_ref:
# No rotation needed — constant matrix multiply. C_branches is
# built on the 2*nn voltage block; device-private algebraics
# occupy y_full rows [2*nn:], so use the voltage block only.
nv = 2 * self.grid.nn
self.i_full[:, start:end] = (
params["C_branches"] @ self.y_full[:nv, start:end]
)[reorder_idx, :]
else:
# Distributed reference frames: vectorized rotation + admittance
self._compute_i_full_with_rotation(params, start, end)
[docs]
def _compute_i_full_with_rotation(self, params: dict, start: int, end: int) -> None:
"""Compute branch currents with reference-frame rotation (has_delta_ref=True)."""
grid = self.grid
nb = grid.nb
y_sr = params["y_series_r"][:, None] # (nb, 1) for broadcasting
y_si = params["y_series_i"][:, None]
G = params["g_eff"][:, None]
B = params["b_eff"][:, None]
trafo = params["trafo"][:, None]
# Get delta_ref indices for from/to buses of each branch
idx_i_bus = [grid.buses[i] for i in grid.idx_i]
idx_j_bus = [grid.buses[j] for j in grid.idx_j]
idx_i_delta = [self.idx_delta_ref_by_bus[b] for b in idx_i_bus]
idx_j_delta = [self.idx_delta_ref_by_bus[b] for b in idx_j_bus]
# Extract delta_ref angles from x_full (nb, nts_interval)
delta_i = self.x_full[idx_i_delta, start:end]
delta_j = self.x_full[idx_j_delta, start:end]
delta_line = 0.5 * (delta_i + delta_j)
theta_i = delta_i - delta_line
theta_j = delta_j - delta_line
cos_ti = np.cos(theta_i)
sin_ti = np.sin(theta_i)
cos_tj = np.cos(theta_j)
sin_tj = np.sin(theta_j)
# Extract bus voltages (nb, nts_interval)
Vi_re = self.y_full[grid.idx_i_re, start:end]
Vi_im = self.y_full[grid.idx_i_im, start:end]
Vj_re = self.y_full[grid.idx_j_re, start:end]
Vj_im = self.y_full[grid.idx_j_im, start:end]
# Rotate voltages to line frame
Vi_re_L = cos_ti * Vi_re - sin_ti * Vi_im
Vi_im_L = sin_ti * Vi_re + cos_ti * Vi_im
Vj_re_L = cos_tj * Vj_re - sin_tj * Vj_im
Vj_im_L = sin_tj * Vj_re + cos_tj * Vj_im
# Apply tap ratio on i-side
Vi_re_Lt = Vi_re_L / trafo
Vi_im_Lt = Vi_im_L / trafo
# Series current: I_series = y_series * (Vi_tapped - Vj)
dV_re = Vi_re_Lt - Vj_re_L
dV_im = Vi_im_Lt - Vj_im_L
I_s_re = y_sr * dV_re - y_si * dV_im
I_s_im = y_sr * dV_im + y_si * dV_re
# Shunt currents
Ish_i_re = 0.5 * G * Vi_re_Lt - 0.5 * B * Vi_im_Lt
Ish_i_im = 0.5 * G * Vi_im_Lt + 0.5 * B * Vi_re_Lt
Ish_j_re = 0.5 * G * Vj_re_L - 0.5 * B * Vj_im_L
Ish_j_im = 0.5 * G * Vj_im_L + 0.5 * B * Vj_re_L
# Terminal currents in line frame
If_re_L = I_s_re + Ish_i_re
If_im_L = I_s_im + Ish_i_im
It_re_L = -I_s_re + Ish_j_re
It_im_L = -I_s_im + Ish_j_im
# Rotate back to bus frames (inverse rotation: -theta)
If_re_B = cos_ti * If_re_L + sin_ti * If_im_L
If_im_B = -sin_ti * If_re_L + cos_ti * If_im_L
It_re_B = cos_tj * It_re_L + sin_tj * It_im_L
It_im_B = -sin_tj * It_re_L + cos_tj * It_im_L
# Write to i_full (interleaved layout)
self.i_full[0::4, start:end] = If_re_B
self.i_full[1::4, start:end] = If_im_B
self.i_full[2::4, start:end] = It_re_B
self.i_full[3::4, start:end] = It_im_B
[docs]
def eigenvalue_analysis(self) -> None:
# Helper for Reference frequency substitution
W_sym = ca.vertcat(
self.omega_ref, self.omega_ref_buses, self.omega_ref_lines
) # symbolic placeholder
W_expr = ca.vertcat(
self.omega_ref_expr, self.omega_ref_buses_expr, self.omega_ref_lines_expr
) # function of states
# Substitute reference frequency expression in all equations
if self.f is not None:
f = ca.substitute(self.f, W_sym, W_expr)
if self.g is not None:
g = ca.substitute(self.g, W_sym, W_expr)
if self.fnode is not None:
fnode = ca.substitute(self.fnode, W_sym, W_expr)
if self.fl is not None:
fl = ca.substitute(self.fl, W_sym, W_expr)
if self.line_dyn and self.n_priv > 0:
# Mixed DAE: the network (voltages y_volt + line currents xl) is
# differential, the device-private algebraics y_priv are algebraic.
# Reduce by Schur-complementing out the privates (constrained by
# g_priv = g[nv:]) from the differential block [x, y_volt, xl]. Note
# g[:nv] are the device injections that feed fnode, not independent
# algebraic equations here.
y_volt = self.y[: self.nv]
y_priv = self.y[self.nv :]
g_priv = g[self.nv :]
nd = self.nx + self.nv + self.nl # differential variable count
J = ca.jacobian(
ca.vertcat(f, fnode, fl, g_priv),
ca.vertcat(self.x, y_volt, self.xl, y_priv),
)
jacobian_func = ca.Function(
"jacobian_func", [self.x, self.y, self.xl, self.s, self.sl], [J]
)
Ac = jacobian_func(
self.xinit, self.yinit, self.xlinit, self.sinit, self.slinit
)
fd_d = Ac[:nd, :nd]
fd_a = Ac[:nd, nd:]
ga_d = Ac[nd:, :nd]
ga_a = Ac[nd:, nd:]
As = fd_d - fd_a @ ca.inv(ga_a) @ ga_d
As = np.array(As)
elif self.line_dyn:
J = ca.jacobian(
ca.vertcat(f, fnode, fl),
ca.vertcat(self.x, self.y, self.xl),
)
jacobian_func = ca.Function(
"jacobian_func", [self.x, self.y, self.xl, self.s, self.sl], [J]
)
Ac = jacobian_func(
self.xinit, self.yinit, self.xlinit, self.sinit, self.slinit
)
As = Ac[: self.nx + self.ny + self.nl, : self.nx + self.ny + self.nl]
else:
J = ca.jacobian(ca.vertcat(f, g), ca.vertcat(self.x, self.y))
jacobian_func = ca.Function("jacobian_func", [self.x, self.y, self.s], [J])
Ac = jacobian_func(self.xinit, self.yinit, self.sinit)
fx = Ac[: self.nx, : self.nx]
fy = Ac[: self.nx, self.nx :]
gx = Ac[self.nx :, : self.nx]
gy = Ac[self.nx :, self.nx :]
As = fx - fy @ ca.inv(gy) @ gx
As = np.array(As) # convert As casadi matrix into numpy array
# The small-signal eigenvalue analysis is a diagnostic; never let it
# abort the simulation. A non-finite reduced state matrix (e.g. a
# near-singular algebraic Jacobian gy at the operating point) is logged
# and the analysis is skipped.
if not np.isfinite(np.array(As)).all():
logging.warning(
"Eigenvalue analysis skipped: reduced state matrix is not finite "
"(algebraic Jacobian may be singular at the operating point)."
)
self.eigenvalues = np.array([])
return
# Compute eigenvalues and right eigenvectors of the reduced state matrix.
self.eigenvalues, right_eigenvectors = np.linalg.eig(As)
# Left eigenvectors are the rows of the inverse modal matrix. For a
# defective/ill-conditioned modal matrix fall back to the pseudo-inverse
# so the diagnostic still yields (approximate) participation factors
# instead of aborting the run.
try:
left_eigenvectors = np.linalg.inv(right_eigenvectors)
except np.linalg.LinAlgError:
logging.warning(
"Eigenvector matrix is singular; using pseudo-inverse for "
"participation factors (eigenvalues may be defective/repeated)."
)
left_eigenvectors = np.linalg.pinv(right_eigenvectors)
# Participation factor of state k in mode j: |phi_kj * psi_jk|.
participation_factors = np.abs(right_eigenvectors * left_eigenvectors.T)
# Normalize each mode (column) so its participations sum to 1, making them
# comparable across modes; guard against an all-zero column.
col_sums = participation_factors.sum(axis=0)
col_sums[col_sums == 0] = 1.0
participation_normalized = participation_factors / col_sums
# Build state name list (always, not just for unstable modes)
_device_abbrev = {
"Synchronous_machine_transient_model": "SM",
"Synchronous_machine_subtransient_model": "SM",
"Synchronous_machine_subtransient_model_Sauer_Pai": "SM_SP",
"Synchronous_machine_subtransient_model_Sauer_Pai_6th_order": "SM_SP6",
"GridForming_inverter_model": "GFM",
"GridFollowing_inverter_model": "GFL",
"Static_load_power": "SLP",
"Static_load_impedance": "SLZ",
"Static_load_ZIP": "ZIP",
"Infinite_bus": "INF",
}
state_names = []
for item in device_list_sim:
short = _device_abbrev.get(item._name, item._name[:6])
for i in range(item.n):
for state in item.states:
state_names.append(f"{short}@{item.bus[i]}:{state}")
if self.line_dyn:
# Dynamic-network states: branch currents (LINE_i-j) and the
# algebraic-turned-differential node voltages (BUS_n).
for i in range(grid_sim.nb):
for state in line_sim.states:
state_names.append(
f"LINE_{line_sim.bus_i[i]}-{line_sim.bus_j[i]}:{state}"
)
for i in range(grid_sim.nn):
for state in ["vre", "vim"]:
state_names.append(f"BUS_{grid_sim.buses[i]}:{state}")
self.participation_factors = participation_factors
self.participation_factors_normalized = participation_normalized
self.state_names = state_names
self.modes = self._build_modes(participation_normalized)
unstable_modes = [e for e in self.eigenvalues if np.real(e) > 1e-4]
if unstable_modes:
logging.error(
f"The operating point seems unstable. It has unstable modes: {np.array(unstable_modes)}.\n "
f"The simulation will potentially fail. Known causes: too much fix power loads, model parameter issues..."
)
[docs]
def _build_modes(self, participation_normalized: np.ndarray) -> list:
"""Collapse the raw eigenvalues into physical modes (complex-conjugate
pairs merged) and attach the modal metrics used by the reports.
Returns a list of mode dicts sorted by damping ratio (most critical
first). Each carries the representative eigenvalue, natural frequency
[Hz], damping ratio, and that mode's normalized participation column.
"""
eigs = np.asarray(self.eigenvalues)
names = self.state_names
n = eigs.size
used = np.zeros(n, dtype=bool)
modes = []
for i in range(n):
if used[i]:
continue
used[i] = True
lam = eigs[i]
members = [i]
if abs(lam.imag) > 1e-9:
# Pair with the closest still-unused conjugate, if present.
best_j, best_err = None, np.inf
for j in range(n):
if used[j]:
continue
err = abs(eigs[j] - np.conj(lam))
if err < best_err:
best_err, best_j = err, j
if best_j is not None and best_err < 1e-6 * (abs(lam) + 1.0):
used[best_j] = True
members.append(best_j)
# Representative = the member with non-negative imaginary part.
rep = members[int(np.argmax([eigs[m].imag for m in members]))]
lam = eigs[rep]
sigma, omega = float(lam.real), abs(float(lam.imag))
denom = math.hypot(sigma, omega)
zeta = (-sigma / denom) if denom > 1e-12 else 0.0
pf = participation_normalized[:, rep]
order = np.argsort(pf)[::-1]
dominant = [(names[k], float(pf[k])) for k in order if pf[k] > 0]
modes.append(
{
"members": members,
"rep_idx": rep,
"eig": complex(lam),
"sigma": sigma,
"omega": omega,
"freq_hz": omega / (2.0 * math.pi),
"zeta": zeta,
"is_complex": len(members) == 2,
"participation": pf,
"dominant": dominant,
}
)
# Most critical (least damped) modes first.
modes.sort(key=lambda m: m["zeta"])
for idx, m in enumerate(modes):
m["id"] = idx + 1
return modes
[docs]
def print_modal_report(self, top_k: int = 4) -> str:
"""Print a per-mode small-signal summary and return it as a string.
One row per physical mode (complex-conjugate pairs collapsed), with the
eigenvalue, natural frequency, damping ratio and the dominant states.
Modes are sorted by damping ratio so the most critical appear first.
"""
if self.eigenvalues is None or len(self.eigenvalues) == 0 or not self.modes:
logging.info("No eigenvalue data available; skipping modal report.")
return ""
rows = []
for m in self.modes:
lam = m["eig"]
if m["is_complex"]:
eig_str = f"{lam.real:+.3f} ± {m['omega']:.3f}j"
freq_str = f"{m['freq_hz']:.3f}"
else:
eig_str = f"{lam.real:+.3f}"
freq_str = "—"
if abs(m["eig"]) < 1e-4:
flag = "zero"
elif m["sigma"] > 1e-4:
flag = "UNSTABLE"
elif m["zeta"] < 0.05:
flag = "low ζ"
else:
flag = ""
dom = ", ".join(
f"{name} ({frac * 100:.0f}%)" for name, frac in m["dominant"][:top_k]
)
rows.append(
[m["id"], eig_str, freq_str, f"{m['zeta'] * 100:+.1f}", flag, dom]
)
headers = [
"#",
"Eigenvalue (σ±jω)",
"f [Hz]",
"ζ [%]",
"Flag",
f"Dominant states (top {top_k}, participation %)",
]
table = tabulate(
rows, headers=headers, tablefmt="github", disable_numparse=True
)
n_unstable = sum(1 for m in self.modes if m["sigma"] > 1e-4)
header_line = (
f"Small-signal modal analysis: {len(self.modes)} modes "
f"({n_unstable} unstable) over {len(self.state_names)} states, "
"sorted by damping ratio (most critical first)."
)
report = header_line + "\n" + table
print("\n" + report + "\n")
return report
[docs]
def plot_eigenvalues(self, damping_ref: float = 0.05) -> None:
"""Simple eigenvalue (s-plane) scatter of the reduced state matrix.
Every eigenvalue is plotted in the complex plane with the imaginary axis
as the stability boundary. Points are colored red = unstable (Re>0),
orange = lightly damped (ζ < ``damping_ref``), blue = well damped, grey =
marginal (near the origin). A dashed wedge marks the constant-damping
locus ζ = ``damping_ref``. When the spectrum is very wide (fast real
modes far in the left half-plane) a second panel zooms in near the
imaginary axis, where the slow/critical modes live. Unstable eigenvalues
are annotated when there are only a few.
"""
import matplotlib.pyplot as plt
if self.eigenvalues is None or len(self.eigenvalues) == 0:
logging.info("No eigenvalue data available; skipping eigenvalue plot.")
return
eigs = np.asarray(self.eigenvalues)
re, im = eigs.real, eigs.imag
mag = np.abs(eigs)
with np.errstate(divide="ignore", invalid="ignore"):
zeta = np.where(mag > 1e-12, -re / mag, 0.0)
marginal = mag < 1e-4
unstable = (re > 1e-4) & ~marginal
lightly = (~unstable) & (~marginal) & (zeta < damping_ref)
stable = ~(marginal | unstable | lightly)
osc = np.abs(im) > 1e-6
if osc.any():
x_focus = max(5.0, 1.5 * float(np.abs(re[osc]).max()))
y_focus = 1.1 * float(np.abs(im).max())
else:
x_focus = max(5.0, 1.5 * float(np.median(np.abs(re)) or 1.0))
y_focus = 1.0
# A zoom panel only helps when fast real modes push the full range far
# past the slow/oscillatory cluster.
need_zoom = re.size > 1 and float(re.min()) < -1.5 * x_focus
def draw(ax, annotate):
ax.axhline(0.0, color="0.7", lw=0.6, zorder=0)
ax.axvline(0.0, color="k", lw=1.2, zorder=1) # stability boundary
if 0.0 < damping_ref < 1.0 and osc.any():
w = float(np.abs(im).max()) * 1.05
slope = damping_ref / np.sqrt(1.0 - damping_ref**2)
ax.plot([0, -slope * w], [0, w], ls="--", color="#E07000", lw=0.9)
ax.plot(
[0, -slope * w],
[0, -w],
ls="--",
color="#E07000",
lw=0.9,
label=f"ζ = {damping_ref * 100:.0f}%",
)
for mask, color, lbl, mk in (
(stable, "#1f77b4", "stable", "o"),
(lightly, "#E07000", f"lightly damped (ζ<{damping_ref * 100:.0f}%)", "o"),
(unstable, "#CC0000", "unstable (Re>0)", "o"),
(marginal, "#999999", "marginal (≈0)", "s"),
):
if np.any(mask):
ax.scatter(
re[mask], im[mask], s=42, c=color, marker=mk,
edgecolors="black", linewidths=0.4, label=lbl, zorder=3,
)
if annotate:
idx = np.where(unstable)[0]
if 0 < idx.size <= 10:
for k in idx:
sign = "+" if im[k] >= 0 else "−"
ax.annotate(
f"{re[k]:.2g}{sign}{abs(im[k]):.2g}j",
(re[k], im[k]), textcoords="offset points",
xytext=(6, 4), fontsize=7, color="#CC0000",
)
ax.set_xlabel("Real part σ [1/s]")
ax.grid(True, alpha=0.25, zorder=0)
ncols = 2 if need_zoom else 1
fig, axes = plt.subplots(
1, ncols, figsize=(7.7 * ncols, 7.0), squeeze=False
)
axes = axes[0]
draw(axes[0], annotate=not need_zoom)
axes[0].set_ylabel("Imaginary part ω [rad/s]")
axes[0].legend(loc="upper left", fontsize=8, framealpha=0.9)
if need_zoom:
axes[0].set_title("Full spectrum")
draw(axes[1], annotate=True)
axes[1].set_xlim(-x_focus, 0.06 * x_focus)
axes[1].set_ylim(-y_focus, y_focus)
axes[1].set_title(f"Zoom near imaginary axis (σ > −{x_focus:.0f})")
fig.suptitle(
f"Eigenvalue spectrum (s-plane) — {eigs.size} eigenvalues, "
f"{int(unstable.sum())} unstable",
fontsize=12,
)
else:
axes[0].set_title(
f"Eigenvalue spectrum (s-plane) — {eigs.size} eigenvalues, "
f"{int(unstable.sum())} unstable"
)
fig.tight_layout()
[docs]
def _render_participation_heatmap(
self,
modes: list,
title: str,
max_states: Optional[int] = 35,
min_participation: float = 0.02,
annotate: bool = True,
) -> None:
"""Render one participation-factor heatmap for a pre-selected, already
ordered list of modes. Shared by the damping overview and the banded
frequency views. Cells below ``min_participation`` are greyed out and
states that never reach it (in the shown modes) are dropped; the row axis
is capped at ``max_states`` strongest participants.
"""
import matplotlib.pyplot as plt
if not modes:
return
pf = np.column_stack([m["participation"] for m in modes])
state_names = list(self.state_names)
n_states_total = len(state_names)
# Keep only states that participate meaningfully, then cap the row count.
peak = pf.max(axis=1)
relevant = np.where(peak >= min_participation)[0]
if relevant.size == 0:
relevant = np.argsort(peak)[::-1][: (max_states or n_states_total)]
if max_states and relevant.size > max_states:
relevant = relevant[np.argsort(peak[relevant])[::-1][:max_states]]
relevant = np.sort(relevant)
pf = pf[relevant, :]
state_names = [state_names[k] for k in relevant]
n_states, n_modes = pf.shape
if n_states < n_states_total:
print(
f" [{title}] showing {n_modes} modes and "
f"{n_states}/{n_states_total} states (participation >= "
f"{min_participation:g})."
)
vmax = max(float(pf.max()), 1e-9)
pf_masked = np.ma.masked_less(pf, min_participation)
cmap = plt.get_cmap("viridis").copy()
cmap.set_bad("#ECECEC")
def mode_label(m):
if m["is_complex"]:
return f"#{m['id']} {m['freq_hz']:.2f} Hz ζ{m['zeta'] * 100:.0f}%"
return f"#{m['id']} real σ{m['sigma']:+.2g}"
col_labels = [mode_label(m) for m in modes]
fig_w = min(max(0.55 * n_modes + 4.0, 7.0), 32.0)
fig_h = min(max(0.5 * n_states + 2.0, 4.0), 26.0)
fig, ax = plt.subplots(figsize=(fig_w, fig_h))
im = ax.imshow(pf_masked, aspect="auto", cmap=cmap, vmin=0.0, vmax=vmax)
fs = 9 if max(n_states, n_modes) <= 35 else 7
ax.set_xticks(range(n_modes))
ax.set_xticklabels(col_labels, fontsize=fs, rotation=90)
ax.set_yticks(range(n_states))
ax.set_yticklabels(state_names, fontsize=fs)
ax.tick_params(length=0)
ax.set_xlabel("Mode", fontsize=10)
# Flag critical modes via x-tick label colors.
for j, m in enumerate(modes):
lbl = ax.get_xticklabels()[j]
if m["sigma"] > 1e-4:
lbl.set_color("#CC0000")
lbl.set_fontweight("bold")
elif m["zeta"] < 0.05 and abs(m["eig"]) >= 1e-4:
lbl.set_color("#E07000")
lbl.set_fontweight("bold")
# Annotate only when the grid is small enough to stay readable.
if annotate and n_states * n_modes <= 400:
for i in range(n_states):
for j in range(n_modes):
v = pf[i, j]
if v >= min_participation:
ax.text(
j,
i,
f"{v:.2f}",
ha="center",
va="center",
fontsize=max(fs - 2, 6),
color="white" if v < 0.55 * vmax else "black",
)
cbar = fig.colorbar(im, ax=ax, fraction=0.03, pad=0.02)
cbar.set_label("Normalized participation", fontsize=9)
cbar.ax.tick_params(labelsize=8)
ax.set_title(
f"{title}\n"
"red label = unstable · orange = lightly damped (ζ<5%) · "
"grey cell = negligible",
fontsize=11,
)
fig.tight_layout()
[docs]
def plot_participation_factors(
self,
max_modes: Optional[int] = 25,
max_states: Optional[int] = 35,
min_participation: float = 0.02,
annotate: bool = True,
) -> None:
"""Single participation-factor heatmap of the most critical modes.
Modes are sorted by damping ratio (most critical first) and the view is
focused for legibility on large systems: only the first ``max_modes`` and
the states that participate meaningfully in them are shown. Pass
``max_modes=None`` / ``max_states=None`` to show everything. For the
frequency-banded views (one figure per band) use
:meth:`plot_participation_bands`.
"""
if self.eigenvalues is None or len(self.eigenvalues) == 0 or not self.modes:
logging.info(
"No eigenvalue data available; skipping participation plot."
)
return
modes = self.modes[:max_modes] if max_modes else list(self.modes)
self._render_participation_heatmap(
modes,
"Participation factors — states × modes (most critical first)",
max_states=max_states,
min_participation=min_participation,
annotate=annotate,
)
[docs]
def plot_participation_bands(
self,
max_states: Optional[int] = 35,
min_participation: float = 0.02,
annotate: bool = True,
) -> None:
"""Frequency-banded participation heatmaps — one figure per band.
The modes are split into non-oscillatory (real) modes and four frequency
bands; a figure is produced only for a band that actually contains modes.
Within a band the modes are frequency-sorted (real modes are ordered by
proximity to instability). Bands: real · 0–0.1 Hz · 0.1–2 Hz (electro-
mechanical) · 2–50 Hz · >50 Hz.
"""
if self.eigenvalues is None or len(self.eigenvalues) == 0 or not self.modes:
logging.info(
"No eigenvalue data available; skipping participation plots."
)
return
bands = [
(
"Non-oscillatory (real) modes",
lambda m: not m["is_complex"],
lambda m: -m["sigma"], # closest to instability first
),
(
"Oscillatory modes 0–0.1 Hz",
lambda m: m["is_complex"] and m["freq_hz"] <= 0.1,
lambda m: m["freq_hz"],
),
(
"Oscillatory modes 0.1–2 Hz (electromechanical)",
lambda m: m["is_complex"] and 0.1 < m["freq_hz"] <= 2.0,
lambda m: m["freq_hz"],
),
(
"Oscillatory modes 2–50 Hz",
lambda m: m["is_complex"] and 2.0 < m["freq_hz"] <= 50.0,
lambda m: m["freq_hz"],
),
(
"Oscillatory modes > 50 Hz",
lambda m: m["is_complex"] and m["freq_hz"] > 50.0,
lambda m: m["freq_hz"],
),
]
any_shown = False
for title, predicate, sort_key in bands:
subset = sorted(
(m for m in self.modes if predicate(m)), key=sort_key
)
if not subset:
continue
any_shown = True
self._render_participation_heatmap(
subset,
title,
max_states=max_states,
min_participation=min_participation,
annotate=annotate,
)
if not any_shown:
logging.info("No modes to plot in any frequency band.")
[docs]
class DaeEst(Dae):
err_msg_est = (
"Estimation failed \n"
"Possible reasons: \n"
" - Not enough measurements specified \n"
" - Initialization point very bad \n"
" - Estimator diverged from true state \n"
" - Check if the disturbance rendered system unestimable \n"
"Possible solutions: \n"
"More measurements, less noise, different disturbance, better initialization..."
)
def __init__(self) -> None:
Dae.__init__(self)
self.nm: int = 0 # Number of measurements
# Integration scheme
self._schemes = {
"trapezoidal": {"kf": 0.5, "kb": 0.5},
"forward": {"kf": 1.0, "kb": 0.0},
"backward": {"kf": 0.0, "kb": 1.0},
}
# Set backward Euler as default
self.int_scheme: str = "backward"
self.kf: float = 0.0
self.kb: float = 1.0
self.line_dyn = False # So far only static lines are suppoorted for estimation
self.filter = None
self.unknown: list[str] # List of unknown buses
self.proc_noise_alg: float = 0.0001 # default value
self.proc_noise_diff: float = 0.0001 # default value
self.init_error_diff: float = 1.0 # default value
self.init_error_alg: bool = False # default value
self.unknown_indices: list = []
self.known_indices: list = []
self.err_init: float = 0.001 # initial covariance matrix - default value
# Matrices needed for calculation
self.r_meas_noise_cov_matrix: Optional[np.ndarray] = (
None # Measurement noise covariance matrix
)
self.r_meas_noise__inv_cov_matrix: Optional[np.ndarray] = (
None # Measurement noise covariance matrix
)
self.q_proc_noise_diff_cov_matrix: Optional[np.ndarray] = (
None # Process noise covariance matrix
)
self.q_proc_noise_alg_cov_matrix: Optional[np.ndarray] = None
self.q_proc_noise_cov_matrix: Optional[np.ndarray] = None
self.c_meas_matrix: Optional[np.ndarray] = None
self.z_meas_points_matrix: Optional[np.ndarray] = None
self.p_est_init_cov_matrix: Optional[np.ndarray] = None
self.x0: Optional[np.ndarray] = (
None # actual initial vector of differential states
)
self.y0: Optional[np.ndarray] = (
None # actual vector of initial algebraic states
)
self.s0: Optional[np.ndarray] = None # actual vector of initial switch states
self.f_func: Optional[ca.Function] = (
None # ca.Function of differential equations
)
self.g_func: Optional[ca.Function] = None # ca.Function of algebraic equations
self.df_dxy_jac: Optional[ca.Function] = (
None # Jacobian of differential equations
)
self.dg_dxy_jac: Optional[ca.Function] = None # Jacobian of algebraic equations
self.inner_tol: float = (
1e-6 # default value for the inner estimation loop tolerance
)
self.cov_tol: float = 1e-10 # minimal covariance matrix
self.iter_ful: Optional[np.ndarray] = None # Number of internal iterations
self.time_full: Optional[np.ndarray] = None # Time for each iteration
@property
def te(self):
return self._te
@te.setter
def te(self, value):
self._te = value
self.t = value
[docs]
def find_unknown_indices(self, grid: Grid) -> None:
unknowns_re = grid.get_node_index(self.unknown)[1]
unknowns_im = grid.get_node_index(self.unknown)[2]
# Combine unknown indices into one array. Cast to int explicitly:
# grid.get_node_index returns dtype=float64 empty arrays when
# self.unknown is empty (no BusUnknown declarations), and np.delete
# in estimate() rejects float-typed indices.
self.unknown_indices = np.concatenate((unknowns_re, unknowns_im)).astype(int)
# Create an array of all indices
all_indices = np.arange(self.ny)
# Use numpy.isin with invert=True to get known indices (those NOT in unknown_indices)
self.known_indices = all_indices[~np.isin(all_indices, self.unknown_indices)]
logging.info(
f"All buses with unknown components are: {self.unknown}. Other buses are assumed known and if nothing is connected to them, they will be assumed zero injection buses. The estimator will have an estimation bias if there is an un-modeled power injection coming in one of them."
)
[docs]
def setup(self, **kwargs) -> None:
# The Config object is shared between sim and est; if the user set
# line_dyn=True for the simulator, **vars(config) would propagate
# that here. The estimator does not support dynamic lines (see the
# NotImplementedError in estimate() for the reasoning). Force
# line_dyn=False on the est side regardless of what's in the config.
kwargs.pop("line_dyn", None)
super().setup(**kwargs)
self.line_dyn = False
[docs]
def init_symbolic(self) -> None:
self.x = ca.SX.sym("x", self.nx)
self.f = ca.SX.sym("f", self.nx)
self.s = ca.SX.sym("s", self.nx)
self.q_proc_noise_diff_cov_matrix = np.zeros([self.nx, self.nx])
self.r_meas_noise_cov_matrix = np.zeros([self.nm, self.nm])
self.z_meas_points_matrix = np.zeros([self.nm, self.nts])
self.c_meas_matrix = np.zeros([self.nm, self.nx + self.ny])
self.sinit = np.ones(self.nx) # Initial values for limiters (nothing limited)
# Per-algebraic-equation relative process-noise weight (squared). Bus
# KCL equations keep weight 1; device qcall() overlays private weights.
# estimate() turns this into q_proc_noise_alg_cov_matrix.
self.q_proc_noise_alg_relative = np.ones(self.ny)
[docs]
def fgcall(self) -> None:
for dev in device_list_est:
if dev.properties["call"]:
dev.call(dae_est, dae_sim)
# branch_voltage_p_m_u_est.call(dae_est, dae_sim)
# branch_current_p_m_u_est.call(dae_est, dae_sim)
dae_est.r_meas_noise__inv_cov_matrix = np.linalg.inv(
dae_est.r_meas_noise_cov_matrix
)
self.update_omega()
# Helper for Reference frequency substitution
W_sym = ca.vertcat(
self.omega_ref, self.omega_ref_buses, self.omega_ref_lines
) # symbolic placeholder
W_expr = ca.vertcat(
self.omega_ref_expr, self.omega_ref_buses_expr, self.omega_ref_lines_expr
) # function of states
# Substitute reference frequency expression in all equations
if self.f is not None:
self.f = ca.substitute(self.f, W_sym, W_expr)
if self.g is not None:
self.g = ca.substitute(self.g, W_sym, W_expr)
self.f_func = ca.Function("f", [self.x, self.y, self.s], [self.f])
self.df_dxy_jac = self.f_func.jacobian()
self.g_func = ca.Function(
"g", [self.x, self.y, self.s], [self.g[self.known_indices]]
)
self.dg_dxy_jac = self.g_func.jacobian()
[docs]
def _init_estimate(self) -> None:
self.p_est_init_cov_matrix = np.eye(self.nx + self.ny) * self.err_init ** (-1)
# set initial values
# err = lambda: (np.random.uniform() - 0.5) * 0.2 * config.init_error_diff
self.x0 = self.xinit
self.y0 = self.yinit
self.s0 = self.sinit
self.x_full = np.zeros([self.nx, self.nts])
self.y_full = np.zeros([self.ny, self.nts])
self.i_full = np.zeros([4 * self.grid.nb, self.nts])
self.iter_full = np.zeros(
[self.nts - 1]
) # 0-th state estimate is assumed known
self.time_full = np.zeros([self.nts - 1])
if self.init_error_alg:
# Flat start for the bus-voltage block only (1+0j per node); keep the
# device-private algebraics at their seeded init (yinit), since [1,0]
# striping is meaningless for them and would also mis-size when
# n_priv is odd.
nv = self.nv if self.nv else self.grid.nn * 2
y0 = [1, 0] * (nv // 2)
y0.extend(list(np.array(self.yinit)[nv:]))
self.y0 = y0
A_jac = self.df_dxy_jac(self.x0, self.y0, self.s0, 0)
A12x = np.array(A_jac[0])
A12y = np.array(A_jac[1])
A12 = np.hstack((A12x, A12y))
ones = np.eye(self.nx, self.nx + self.ny)
A34_jac = self.dg_dxy_jac(self.x0, self.y0, self.s0, 0)
A34 = np.hstack((A34_jac[0], A34_jac[1]))
obs = np.vstack((A12 + ones, A34, self.c_meas_matrix))
if np.linalg.matrix_rank(obs) < self.nx + self.ny:
logging.error(
"It seems that the system is un-estimable. Check input data! Try placing more measurements!"
)
null = null_space(obs) # nullspace
un_est_nodes = self.grid.buses.copy()
for node_idx, node in enumerate(self.grid.buses):
if np.allclose(null[self.nx + 2 * node_idx], 0):
un_est_nodes.remove(node)
logging.error(f"Un-estimable nodes are: {un_est_nodes}")
logging.warning(
"A good idea would be to place some PMUs around these nodes."
)
[docs]
def estimate(self, dist: Disturbance, **kwargs) -> None:
# Reject dynamic-line mode on the estimator. Two reasons:
# 1. PMU sampling rates (~50/60 Hz) are slower than the
# EM-transient timescale of line dynamics; the line current
# states are unobservable from typical synchrophasor streams.
# 2. The unknown-injector framework cannot supply bus reference
# angles needed by the line equations when a bus has no device
# model. There is no consistent way to write the rotation
# transforms across such buses.
# The DaeEst constructor sets line_dyn=False, but a config-driven
# setup(**vars(config)) could overwrite that — fail fast here so the
# design decision is enforced rather than silently mis-handled.
if getattr(self, "line_dyn", False):
raise NotImplementedError(
"Dynamic line modeling (line_dyn=True) is not supported on "
"the estimation side. EM-transient line dynamics are below "
"the typical PMU sampling rate (50/60 Hz) and cannot be "
"observed; furthermore the unknown-injector framework "
"cannot supply the bus reference angles needed by line "
"equations when a bus has no device model. "
"Set line_dyn=False for estimation."
)
self.find_unknown_indices(self.grid)
# Algebraic-equation process noise over the full ny = [voltages | privates]:
# diagonal of relative weights (1 for bus KCL, per-private from qcall())
# scaled by proc_noise_alg**2. A private weight of 0 -> hard constraint
# (zero slack). With n_priv == 0 this reduces to the previous
# eye(nn*2) * proc_noise_alg**2 behaviour exactly.
self.q_proc_noise_alg_cov_matrix = np.diag(self.q_proc_noise_alg_relative) * (
max(self.proc_noise_alg**2, self.cov_tol)
) # Noise for the algebraic equations
self.q_proc_noise_diff_cov_matrix *= max(self.proc_noise_diff**2, self.cov_tol)
self.q_proc_noise_alg_cov_matrix = np.delete(
self.q_proc_noise_alg_cov_matrix, self.unknown_indices, 0
)
self.q_proc_noise_alg_cov_matrix = np.delete(
self.q_proc_noise_alg_cov_matrix, self.unknown_indices, 1
)
self.q_proc_noise_cov_matrix = block_diag(
self.q_proc_noise_diff_cov_matrix, self.q_proc_noise_alg_cov_matrix
)
self.ng = self.ny - 2 * (len(self.unknown)) # number of algebraic equations
self.fgcall()
self._init_estimate()
self.kf = self._schemes[self.int_scheme]["kf"]
self.kb = self._schemes[self.int_scheme]["kb"]
x0 = self.x0
y0 = self.y0
s0 = self.s0
self.x_full[:, 0] = x0
self.y_full[:, 0] = y0
# Create shorter variable names
P_cov_inv = self.p_est_init_cov_matrix
C = self.c_meas_matrix
Rinv = self.r_meas_noise__inv_cov_matrix
Q = self.q_proc_noise_cov_matrix
CRC = C.T.dot(Rinv).dot(C)
CR = C.T.dot(Rinv)
A34 = np.zeros([self.ng, self.nx + self.ny])
ones = np.eye(self.nx, self.nx + self.ny)
iter_forward = 0
previous_value = None
for time_step in range(self.nts - 1):
start = time.time()
iter_forward += 1
x1 = x0
y1 = y0
s1 = s0
A_jac = self.df_dxy_jac(x0, y0, s0, 0)
A12x = np.array(A_jac[0] * self.t * self.kf)
A12y = np.array(A_jac[1] * self.t * self.kf)
A12 = np.hstack((A12x, A12y))
A = np.vstack((A12 + ones, A34))
Cov_L = cholesky(
Q + A.dot(cho_solve((P_cov_inv, True), A.T, check_finite=False)),
check_finite=False,
lower=True,
)
f_d = np.zeros(self.nx)
f_d_0 = np.zeros(self.nx)
if self.kf != 0: # for forward Euler and trapezoidal
f_d_0 = np.array(self.f_func(x0, y0, s0) * self.t * self.kf)[:, 0]
if self.kb == 0: # for forward Euler
E12 = np.zeros([self.nx, self.nx + self.ny])
f_d = f_d_0
y = self.z_meas_points_matrix[:, iter_forward]
current_value = round(iter_forward * self.t, 0)
# Log only when the first digit after the decimal changes
(
logging.info(f"Estimation time is {current_value} [s]")
if previous_value != current_value
else None
)
previous_value = current_value
# if the value is zero, add no noise
p_nd = (
np.sqrt(self.q_proc_noise_diff_cov_matrix) @ np.random.randn(self.nx)
) * (self.proc_noise_diff != 0)
p_na = (
np.sqrt(self.q_proc_noise_alg_cov_matrix) @ np.random.randn(self.ng)
) * (self.proc_noise_alg != 0)
p_n = np.hstack((p_nd, p_na))
if self.filter == "iekf" or self.filter == "IEKF":
max_inner_iter = 5
else:
max_inner_iter = 1
for iter_kf in range(max_inner_iter):
if self.kb != 0: # for trapezoidal and backward Euler
E12_jac = self.df_dxy_jac(x1, y1, s1, 0)
E12 = np.hstack((E12_jac[0], E12_jac[1])) * self.t * self.kb
f_d = (
f_d_0
+ np.array(self.f_func(x1, y1, s1) * self.t * self.kb)[:, 0]
)
E34_jac = self.dg_dxy_jac(x1, y1, s1, 0)
E34 = np.hstack((E34_jac[0], E34_jac[1]))
g_d = np.array(self.g_func(x1, y1, s1))[:, 0]
E = np.vstack((E12 - ones, E34))
xy1 = np.hstack((x1, y1))
Big_ = E.T.dot(cho_solve((Cov_L, True), E, check_finite=False)) + CRC
delta_k = np.hstack((E12.dot(xy1) - x0 - f_d, E34.dot(xy1) - g_d)) + p_n
small_ = E.T.dot(
cho_solve((Cov_L, True), delta_k, check_finite=False)
) + CR.dot(y)
try:
Big_chol = cholesky(Big_, lower=True)
except np.linalg.LinAlgError:
raise Exception(DaeEst.err_msg_est)
xy1_new = cho_solve((Big_chol, True), small_, check_finite=False)
x1_raw = xy1_new[: self.nx]
y1 = xy1_new[self.nx : self.nx + self.ny]
if self.incl_lim:
x1 = np.clip(x1_raw, self.xmin, self.xmax)
s1 = (x1 == x1_raw).astype(int)
else:
x1 = x1_raw
if np.max(np.abs(xy1_new - xy1)) < self.inner_tol:
break
self.x_full[:, iter_forward], self.y_full[:, iter_forward] = x0, y0 = x1, y1
P_cov_inv = Big_chol
self.check_disturbance(dist, iter_forward)
end = time.time()
self.time_full[iter_forward - 1] = end - start
self.iter_full[iter_forward - 1] = iter_kf + 1
# create the estimation grid
grid_est = GridEst()
# create the simulation grid
grid_sim = GridSim()
# initialize the DAE classes
dae_est = DaeEst()
dae_sim = DaeSim()
bus_init_sim = BusInit()
bus_init_est = BusInit()
bus_unknown_est = BusUnknown()
line_sim = Line()
line_est = Line()
disturbance_sim = Disturbance()
disturbance_est = Disturbance()
device_list_sim = []
device_list_est = []
[docs]
def stack_volt_power(vre, vim) -> np.ndarray:
u_power = np.hstack((np.vstack((vre, vim)), np.vstack((vim, -vre))))
return u_power