# Created: 2024-12-01
# Last Modified: 2025-05-07
# (c) Copyright 2024 ETH Zurich, Milos Katanic
# https://doi.org/10.5905/ethz-1007-842
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
#
# The code is based on the publication: Katanic, M., Lygeros, J., Hug, G.: Recursive
# dynamic state estimation for power systems with an incomplete nonlinear DAE model.
# IET Gener. Transm. Distrib. 18, 3657–3668 (2024). https://doi.org/10.1049/gtd2.13308
# The full paper version is available at: https://arxiv.org/abs/2305.10065v2
# See full metadata at: README.md
# For inquiries, contact: mkatanic@ethz.ch
from __future__ import annotations # Postponed type evaluation
from typing import TYPE_CHECKING, Union, Any, Optional
if TYPE_CHECKING:
from pydynamicestimator.system import Dae, Grid, DaeEst, DaeSim
import casadi as ca
import numpy as np
import logging
np.random.seed(30)
sin = np.sin
cos = np.cos
sqrt = np.sqrt
[docs]
class Element:
"""Metaclass to be used for all elements to be added"""
def __init__(self) -> None:
self.n: int = 0 # number of devices
self.u: list[bool] = [] # device status
self.name: list[str] = [] # name of the device
self._type: Optional[str] = None # Device type
self._name: Optional[str] = None # Device name
self.int: dict[
str, Union[str, int]
] = (
{}
) # Dictionary of unique identifiers for each device based on the variable "idx"
self._data: dict[str, Any] = {"u": True} # Default entries for lists
self._params: dict[str, float] = {} # Default entries for np.arrays params
self._setpoints: dict[
str, float
] = {} # Default entries for np.arrays set points
self._descr: dict[str, str] = {} # Parameter and data explanation
self._mand: list[str] = [] # mandatory attributes
self.properties: dict[str, bool] = {
"gcall": False,
"fcall": False,
"xinit": False,
"fgcall": False,
"qcall": False,
"call": False,
"xy_index": False,
"finit": False,
"save_data": False,
"fplot": False,
}
[docs]
def add(
self, idx: Optional[str] = None, name: Optional[str] = None, **kwargs
) -> None:
"""
Add an element device
Args:
idx (str, optional): Unique identifier for the device. Generated if not provided.
name (str, optional): Name of the device. Generated if not provided.
**kwargs: Custom parameters to overwrite defaults.
"""
# Generate unique identifiers if not provided
idx = idx or f"{self._type}_{self.n + 1}"
name = name or f"{self._name}_{self.n + 1}"
self.int[idx] = self.n
self.name.append(name)
# check whether mandatory parameters have been set up
for key in self._mand:
if key not in kwargs:
raise Exception("Mandatory parameter <%s> has not been set up" % key)
self.n += 1
# Set default values
for key, default in {**self._params, **self._setpoints}.items():
if key not in self.__dict__:
logging.warning(
f"Attribute {key} not found in element - initializing as an empty array."
)
self.__dict__[key] = np.array([], dtype=type(default))
self.__dict__[key] = np.append(self.__dict__[key], default)
for key, default in self._data.items():
if key not in self.__dict__:
logging.warning(
f"Attribute {key} not found in element - initializing as an empty list."
)
self.__dict__[key] = []
self.__dict__[key].append(default)
# Overwrite custom values
for key, value in kwargs.items():
if key not in self.__dict__:
logging.warning(
"Element %s does not have parameter %s - ignoring"
% (self._name, key)
)
continue
try:
self.__dict__[key][-1] = value # Attempt to update the last element
except IndexError:
raise IndexError(
f"Parameter/setpoint '{key}' not properly specified in the model definition. Check if it's properly listed and initialized when defining the class"
)
logging.info(f"Element {name} (ID: {idx}) added successfully.")
[docs]
class BusInit(Element):
def __init__(self) -> None:
super().__init__()
self._type = "Bus_init_or_unknwon" # Element type
self._name = "Bus_init_or_unknown" # Element name
self._data.update({"bus": None, "p": 0, "q": 0, "v": 1.0, "type": None})
self.bus: list[Optional[str]] = []
self.p: list[float] = []
self.q: list[float] = []
self.v: list[float] = []
self.type: list[Optional[str]] = []
BusUnknown = BusInit # Alias class name
[docs]
class Disturbance(Element):
def __init__(self) -> None:
super().__init__()
self._type = "Disturbance" # Element type
self._name = "Disturbance" # Element name
# Default parameter values
self._params.update(
{
"bus_i": None,
"bus_j": None,
"time": None,
"type": None,
"y": 10,
"bus": None,
"p_delta": 0,
"q_delta": 0,
}
)
self.type = np.array([], dtype=str)
self.time = np.array([], dtype=float)
self.bus_i = np.array([], dtype=str)
self.bus_j = np.array([], dtype=str)
self.y: np.ndarray = np.array([], dtype=float)
self.bus = np.array([], dtype=str)
self.p_delta = np.array([], dtype=float)
self.q_delta = np.array([], dtype=float)
[docs]
def sort_chrono(self):
sorted_indices = np.argsort(self.time)
for key in self._params:
setattr(self, key, getattr(self, key)[sorted_indices])
[docs]
class Line(Element):
r"""
Parameters
----------
r : ndarray[float]
Series resistance value in per unit (p.u.).
x : ndarray[float]
Series reactance value in per unit (p.u.).
g : ndarray[float]
Total shunt conductance value in per unit (p.u.).
b : ndarray[float]
Total shunt susceptance value in per unit (p.u.).
trafo : ndarray[float]
Off-nominal line transformer ratio.
bus_i : ndarray[str]
Name of the sending-end bus.
bus_j : ndarray[str]
Name of the receiving-end bus.
"""
def __init__(self) -> None:
super().__init__()
self._type = "Transmission_line" # Element type
self._name = "Transmission_line" # Element name
self._params.update(
{
"r": 0.001,
"x": 0.001,
"g": 0,
"b": 0,
"bus_i": None,
"bus_j": None,
"trafo": 1,
}
)
self.r = np.array([], dtype=float)
self.x = np.array([], dtype=float)
self.g = np.array([], dtype=float)
self.b = np.array([], dtype=float)
self.trafo = np.array([], dtype=float)
self.bus_i = np.array([], dtype=object)
self.bus_j = np.array([], dtype=object)
[docs]
class DeviceRect(Element):
"""
Dynamic or static device modeled in rectangular coordinates. Used as a parent class
for all devices modeled in rectangular coordinates.
Attributes:
properties (dict[str, bool]): Flags for method calls (e.g., 'gcall', 'fcall').
xf (dict[str, np.ndarray]): Final state results for simulation/estimation.
xinit (dict[State, list[float]]): Initial state values for all devices of the instance.
xmin (list[float]): Minimum limited state value.
xmax (list[float]): Maximum limited state value.
_params (dict[str, float]): Device parameters, such as rated voltage, power, and frequency and internal parameters.
_data (dict[str, str]): Additional data which will be used in a list, one entry for each device of this object instance.
bus (list[Optional[str]]): Buses where the device is connected. Each device has an entry in the list.
states (list[States]): State variables.
units (list[States]): Units of state variables.
ns (float): Total number of states in the model.
_states_noise (dict[States, float]): Noise for each state variable.
_states_init_error (dict[States, float]): Initial error for each state variable.
vre (list[float]): Order of the real voltage value for each device in the overall DAE model.
vim (list[float]): Order of the real voltage value for each device in the overall DAE model.
_algebs (list[str]): Algebraic variables ('vre', 'vim').
_descr (dict[str, str]): Descriptions for key parameters.
"""
def __init__(self) -> None:
super().__init__()
self.xf: dict[str, np.ndarray] = {} # final state results or sim/est
self.xinit: dict[str, list[float]] = {}
self.xmin: list[float] = []
self.xmax: list[float] = []
self._params.update({"Vn": 220, "fn": 50.0, "Sn": 100})
self.Vn = np.array([], dtype=float)
self.fn = np.array([], dtype=float)
self.Sn = np.array([], dtype=float)
self._data.update({"bus": None})
self.bus: list[Optional[str]] = [] # at which bus
self.states: list[str] = [] # list of state variables
self.units: list[str] = [] # List of states' units
self.ns: int = 0 # number of states
self._states_noise: dict[
str, float
] = {} # list of noises for every state variable (only for estimation)
self._states_init_error: dict[
str, float
] = {} # list of initial errors for every state variable (only for estimation)
self._algebs: list[str] = ["vre", "vim"] # list of algebraic variables
self.vre = np.array([], dtype=float)
self.vim = np.array([], dtype=float)
self._x0: dict[
str, float
] = (
{}
) # default initial states, will be used as initial guess for the simulation initialization
self._mand.extend(["bus"])
self._descr = {
"Sn": "rated power",
"Vn": "rated voltage",
"u": "connection status",
"fn": "nominal frequency",
}
[docs]
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
# Merge class-level _descrs if subclass adds its own
full_descr = dict(getattr(cls, "_descr", {}))
for base in cls.__bases__:
full_descr.update(getattr(base, "_descr", {}))
# Format as docstring section
doc_lines = ["\nAttributes", "----------"]
for key, val in full_descr.items():
doc_lines.append(f"{key} : np.ndarray")
doc_lines.append(f" {val}")
# Inject into class docstring
cls.__doc__ = (cls.__doc__ or "") + "\n" + "\n".join(doc_lines)
[docs]
def _init_data(self) -> None:
self.xinit = {state: [] for state in self.states}
[docs]
def xy_index(self, dae: Dae, grid: Grid) -> None:
"""Initializes indices for states, algebraic variables, unknown inputs, and switches.
Args:
dae (Dae): Object managing differential-algebraic equations.
grid (Grid): Object managing the electrical grid and node indices.
"""
zeros = [0] * self.n
for item in self.states:
self.__dict__[item] = zeros[:]
for item in self._algebs:
self.__dict__[item] = zeros[:]
for var in range(self.n):
for item in self.states:
self.__dict__[item][var] = dae.nx
# Init with current state init value
dae.xinit.append(self.xinit[item][var])
dae.nx += 1
# Retrieve min and max state limits
item_min = getattr(self, item + "_min", None)
item_max = getattr(self, item + "_max", None)
self.xmin.append(item_min[var] if item_min is not None else -np.inf)
self.xmax.append(item_max[var] if item_max is not None else np.inf)
dae.xmin.append(item_min[var] if item_min is not None else -np.inf)
dae.xmax.append(item_max[var] if item_max is not None else np.inf)
if self.n:
# assign indices to real and imaginary voltage algebraic variables; first real value
self.__dict__["vre"] = grid.get_node_index(self.bus)[1]
self.__dict__["vim"] = grid.get_node_index(self.bus)[2]
[docs]
def add(self, idx=None, name=None, **kwargs) -> None:
super().add(idx, name, **kwargs)
# initialize initial states with some default values
for item in self.states:
self.xinit[item].append(self._x0[item])
[docs]
def init_from_simulation(
self, device_sim: DeviceRect, idx: str, dae: DaeEst, dae_sim: DaeSim
) -> None:
"""Initialize the device state estimation based on simulation results.
Args:
device_sim (DeviceRect): The device simulation object containing simulation results.
idx (str): unique index of the device
dae (DaeEst): The DAE object responsible for managing the estimation of the system.
dae_sim (DaeSim): The simulation object providing timing information.
"""
var_sim = device_sim.int.get(idx)
var_est = self.int.get(idx)
# Initial states of estimation as true states obtained through simulation
for item in self.states:
# Init with simulated value
try:
self.xinit[item][var_est] = device_sim.xf[item][
var_sim, round(dae.T_start / dae_sim.t)
]
except KeyError:
logging.error(
f"Failed to initialize state {item}. State not found in simulation model."
)
continue
# Add noise for the init state
noise = (
self._states_init_error[item]
* (np.random.uniform() - 0.5)
* dae.init_error_diff
)
dae.xinit[self.__dict__[item][var_est]] = self.xinit[item][var_est] + noise
# Set setpoint values based on simulation
for item, value in self._setpoints.items():
if item in device_sim.__dict__:
self.__dict__[item][var_est] = device_sim.__dict__[item][var_sim]
else:
logging.warning(
f"Setpoint {item} not found in simulation test cases. Skipping. It will be ignored and the estimation will start from default initial value"
)
[docs]
def save_data(self, dae: Dae) -> None:
for item in self.states:
self.xf[item] = np.zeros([self.n, dae.nts])
self.xf[item][:, :] = dae.x_full[self.__dict__[item][:], :]
[docs]
def finit(self, dae: Dae) -> None:
"""Initialize the device by setting up setpoints, initial states based on the power flow solution.
Args:
dae (Dae): The DAE object used to simulate the system.
"""
u = ca.SX.sym("", 0)
u0 = []
for item in self._setpoints:
# Set the initial guess for the setpoint
u0.append(self.__dict__[item])
# Reset it to be a variable
self.__dict__[item] = ca.SX.sym(item, self.n)
# Stack the variable to a single vector
u = ca.vertcat(u, self.__dict__[item])
u0 = [item for sublist in u0 for item in sublist] # flatten it
# Now subtract the initial network currents from algebraic equations
for alg in self._algebs:
dae.g[self.__dict__[alg]] += dae.iinit[self.__dict__[alg]]
# Algebraic variables are now not symbolic but their init values
dae.y = dae.yinit.copy()
dae.s = np.ones(dae.nx)
self.fgcall(dae)
# Find the indices of differential equations for this type of generator
diff = [self.__dict__[arg] for arg in self.states]
diff_index = [item for sublist in np.transpose(diff) for item in sublist]
inputs = [ca.vertcat(dae.x[diff_index], u)]
outputs = [
ca.vertcat(
dae.f[diff_index],
dae.g[self.__dict__["vre"]],
dae.g[self.__dict__["vim"]],
)
]
device_init = ca.Function("h", inputs, outputs)
newton_init = ca.rootfinder("G", "newton", device_init)
x0 = np.array(list(self._x0.values()) * self.n)
solution = newton_init(ca.vertcat(x0, u0))
solution = np.array(solution).flatten()
# # Init only these states
# for s in self.states:
# self.xinit[s] = list(solution[self.__dict__[s]])
index = 0 # emma: rewrote 2 lines above so that more than just synchronous generators could be included in the simulation (otherwise the indexing is not done properly)
for s in self.states:
locat = []
for n in range(self.n):
locat.append(n * len(self.states) + index)
self.xinit[s] = solution[locat]
index += 1
for idx, s in enumerate(self._setpoints):
setpoint_range_start = (len(self.states) + idx) * self.n
changed_setpoints = (
u0[idx * self.n : (idx + 1) * self.n]
!= solution[setpoint_range_start : setpoint_range_start + self.n]
)
for i in range(self.n):
if changed_setpoints[i]:
logging.warning(
f"Setpoint {s} updated in device {self._name} at node {self.bus[i]} from {u0[idx*self.n + i]} to {solution[setpoint_range_start+i]} to match the initial power flow!"
)
self.__dict__[s] = solution[
setpoint_range_start : setpoint_range_start + self.n
]
# Now load the initial states into DAE class such that simulation/estimation actually starts from those values
dae.xinit[diff_index] = solution[: len(self.states) * self.n]
# Reset the algebraic equations such that they can be used "erneut" from scratch once the "fgcall" is called
dae.g *= 0
# Reset the voltages to being again symbolic variables
dae.y = ca.SX.sym("y", dae.ny)
dae.s = ca.SX.sym("s", dae.nx)
2 == 2
[docs]
def fgcall(self, dae: Dae) -> None:
"""
A method that executes the differential and algebraic equations of the model
and adds them to the appropriate places in the Dae class
:param dae: an instance of a class Dae
:type dae:
:return: None
:rtype: None
"""
pass
[docs]
def qcall(self, dae: DaeEst) -> None:
for item in self.states:
dae.q_proc_noise_diff_cov_matrix[
self.__dict__[item], self.__dict__[item]
] = (self._states_noise[item]) ** 2