Source code for pydynamicestimator.measurements.measurements

# Created: 2024-12-01
# Last Modified: 2025-05-07
# (c) Copyright 2024 ETH Zurich, Milos Katanic
# https://doi.org/10.5905/ethz-1007-842
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
#

# The code is based on the publication: Katanic, M., Lygeros, J., Hug, G.: Recursive
# dynamic state estimation for power systems with an incomplete nonlinear DAE model.
# IET Gener. Transm. Distrib. 18, 3657–3668 (2024). https://doi.org/10.1049/gtd2.13308
# The full paper version is available at: https://arxiv.org/abs/2305.10065v2
# See full metadata at: README.md
# For inquiries, contact: mkatanic@ethz.ch


from __future__ import annotations  # Postponed type evaluation
from typing import TYPE_CHECKING
import numpy as np
from typing import Optional

from pydynamicestimator.devices.device import Element

if TYPE_CHECKING:
    from pydynamicestimator.system import DaeEst, DaeSim

np.random.seed(30)


[docs] class Measurement(Element): def __init__(self) -> None: super().__init__() self._params.update( {"acc": 0.001, "seed": 30, "distr": "normal"} ) # Default entries for np.arrays self._descr.update( { "prec": "measurement weight", "acc": "standard deviation of measurement error", "seed": "random number ceed", "u": "device status", } ) self._meas: list[str] = [] self._mand: list[str] = [] # mandatory attributes self._noise_mapping = { name: getattr(np.random, name) for name in dir(np.random) if callable(getattr(np.random, name)) and not name.startswith("_") } self.acc = np.array([], dtype=float) # Measurement accuracy self.distr = np.array([], dtype=str) self.seed = np.array([], dtype=int) self.properties.update({"call": True, "xy_index": True})
[docs] def add(self, idx=None, name=None, **kwargs) -> None: """Add a measurement device""" super().add(idx=idx, name=name, **kwargs) distr = kwargs.get("distr") if distr not in self._noise_mapping.keys(): raise Exception("The given noise distribution %s is not defined" % distr)
[docs] def xy_index(self, dae: DaeEst, dae_sim: DaeSim) -> None: """Assign indices for measured quantities""" for var in range(self.n): for item in self._meas: self.__dict__[item][var] = dae.nm dae.nm += 1
[docs] class BusVoltagePMU(Measurement): def __init__(self) -> None: super().__init__() self._type = "pmuu" self._name = "PMU_voltage_measurement" self._data.update({"bus": "0"}) self._params.update({"vre": 0, "vim": 0}) self._meas.extend(["vre", "vim"]) self._mand.extend(["bus"]) self.bus: list[Optional[str]] = [] # which buses are measured self.vre = np.array([], dtype=int) # index of the measured quantity self.vim = np.array([], dtype=int)
[docs] def call(self, dae: DaeEst, dae_sim: DaeSim) -> None: est_idx_re, est_idx_im = dae.grid.get_node_index(self.bus)[1:3] sim_idx_re, sim_idx_im = dae_sim.grid.get_node_index(self.bus)[1:3] noise = np.zeros([self.n * 2, dae.nts]) # TODO: Implement the function to calculate the noise term properly for different noise distributions for var in range(self.n): selected_function = self._noise_mapping[self.distr[var]] noise[2 * var : 2 * var + 2] = ( selected_function(size=[2, dae.nts]) * self.acc[var] ) dae.c_meas_matrix[self.vre, dae.nx + est_idx_re] = 1 dae.c_meas_matrix[self.vim, dae.nx + est_idx_im] = 1 dae.z_meas_points_matrix[self.vre] = ( dae_sim.y_full[ sim_idx_re, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(0, 2, 2 * self.n)] ) dae.z_meas_points_matrix[self.vim] = ( dae_sim.y_full[ sim_idx_im, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(1, 2, 2 * self.n)] ) dae.r_meas_noise_cov_matrix[self.vre, self.vre] = np.clip( self.acc**2, a_min=dae.cov_tol, a_max=1 / (dae.cov_tol) ) dae.r_meas_noise_cov_matrix[self.vim, self.vim] = np.clip( self.acc**2, a_min=dae.cov_tol, a_max=1 / (dae.cov_tol) )
[docs] class BranchCurrentPMU(Measurement): def __init__(self) -> None: super().__init__() self._type = "pmui" self._name = "PMU_current_measurement" self._data.update({"bus_i": "0", "bus_j": "0"}) self._params.update({"ire": 0, "iim": 0}) self._mand.extend(["bus_i", "bus_j"]) self._meas.extend(["ire", "iim"]) self.bus_i: list[Optional[str]] = [] self.bus_j: list[Optional[str]] = [] self.ire = np.array([], dtype=int) self.iim = np.array([], dtype=int)
[docs] def call(self, dae: DaeEst, dae_sim: DaeSim) -> None: for idx_meas in range(self.n): # now include the choice of the distribution of noise selected_function = self._noise_mapping[self.distr[idx_meas]] noise = selected_function(size=[2, dae.nts]) * self.acc[idx_meas] idx_branch_dir = dae_sim.grid.get_branch_index(self.bus_i, self.bus_j)[1] idx_branch_dir_re = 2 * idx_branch_dir idx_branch_dir_im = 2 * idx_branch_dir + 1 idx_branch_dir_est = dae.grid.get_branch_index(self.bus_i, self.bus_j)[1] idx_branch_dir_re_est = 2 * idx_branch_dir_est idx_branch_dir_im_est = 2 * idx_branch_dir_est + 1 dae.c_meas_matrix[self.ire, dae.nx :] = dae.grid.C_branches[ idx_branch_dir_re_est, : ] dae.c_meas_matrix[self.iim, dae.nx :] = dae.grid.C_branches[ idx_branch_dir_im_est, : ] dae.r_meas_noise_cov_matrix[self.ire, self.ire] = np.clip( self.acc**2, a_min=1e-10, a_max=1e10 ) dae.r_meas_noise_cov_matrix[self.iim, self.iim] = np.clip( self.acc**2, a_min=1e-10, a_max=1e10 ) dae.z_meas_points_matrix[self.ire] = ( dae_sim.i_full[ idx_branch_dir_re, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(0, 2, 2 * self.n)] ) dae.z_meas_points_matrix[self.iim] = ( dae_sim.i_full[ idx_branch_dir_im, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(1, 2, 2 * self.n)] )
# TODO: Implement injection measurement and confirm that it gives the same as branch measurement if there is only one branch
[docs] class BusCurrentPMU(Measurement): def __init__(self) -> None: super().__init__() self._type = "pmui" self._name = "PMU_current_measurement" self._data.update({"bus_i": "0"}) self._params.update({"ire": 0, "iim": 0}) self._mand.extend(["bus_i"]) self._meas.extend(["ire", "iim"]) self.bus_i: list[Optional[str]] = [] self.ire = np.array([], dtype=int) self.iim = np.array([], dtype=int)
[docs] def call(self, dae: DaeEst, dae_sim: DaeSim) -> None: for idx_meas in range(self.n): # now include the choice of the distribution of noise selected_function = self._noise_mapping[self.distr[idx_meas]] noise = selected_function(size=[2, dae.nts]) * self.acc[idx_meas] idx_i_re, idx_i_im = dae.grid.get_node_index(self.bus_i)[1:3] dae.c_meas_matrix[self.ire, dae.nx :] = dae.grid.y_adm_matrix[idx_i_re, :] dae.c_meas_matrix[self.iim, dae.nx :] = dae.grid.y_adm_matrix[idx_i_im, :] dae.r_meas_noise_cov_matrix[self.ire, self.ire] = np.clip( self.acc**2, a_min=1e-10, a_max=1e10 ) dae.r_meas_noise_cov_matrix[self.iim, self.iim] = np.clip( self.acc**2, a_min=1e-10, a_max=1e10 ) # Indices of estimation nodes in the simulation vector nodes = np.array(dae_sim.grid.get_node_index(dae.grid.buses)[1:3]).flatten("F") dae.z_meas_points_matrix[self.ire] = ( dae.grid.y_adm_matrix[idx_i_re, :] @ dae_sim.y_full[ nodes, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(0, 2, 2 * self.n)] ) dae.z_meas_points_matrix[self.iim] = ( dae.grid.y_adm_matrix[idx_i_im, :] @ dae_sim.y_full[ nodes, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(1, 2, 2 * self.n)] )