Source code for pydynamicestimator.measurements.measurements

# Created: 2024-12-01
# Last Modified: 2025-05-12
# (c) Copyright 2024 ETH Zurich, Milos Katanic
# https://doi.org/10.5905/ethz-1007-842
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
#

# The code is based on the publication: Katanic, M., Lygeros, J., Hug, G.: Recursive
# dynamic state estimation for power systems with an incomplete nonlinear DAE model.
# IET Gener. Transm. Distrib. 18, 3657–3668 (2024). https://doi.org/10.1049/gtd2.13308
# The full paper version is available at: https://arxiv.org/abs/2305.10065v2
# See full metadata at: README.md
# For inquiries, contact: mkatanic@ethz.ch


from __future__ import annotations  # Postponed type evaluation
from typing import TYPE_CHECKING
import numpy as np
from typing import Optional

from pydynamicestimator.devices.device import Element

if TYPE_CHECKING:
    from pydynamicestimator.system import DaeEst, DaeSim

np.random.seed(30)


[docs] class Measurement(Element): def __init__(self) -> None: super().__init__() self._params.update( {"acc": 0.001, "seed": 30, "distr": "normal"} ) # Default entries for np.arrays self._descr.update( { "prec": "measurement weight", "acc": "standard deviation of measurement error", "seed": "random number ceed", "u": "device status", } ) self._meas: list[str] = [] self._mand: list[str] = [] # mandatory attributes self._noise_mapping = { name: getattr(np.random, name) for name in dir(np.random) if callable(getattr(np.random, name)) and not name.startswith("_") } self.acc = np.array([], dtype=float) # Measurement accuracy self.distr = np.array([], dtype=str) self.seed = np.array([], dtype=int) self.properties.update({"call": True, "xy_index": True})
[docs] def add(self, idx=None, name=None, **kwargs) -> None: """Add a measurement device""" super().add(idx=idx, name=name, **kwargs) distr = kwargs.get("distr") if distr not in self._noise_mapping.keys(): raise Exception("The given noise distribution %s is not defined" % distr)
[docs] def xy_index(self, dae: DaeEst, dae_sim: DaeSim) -> None: """Assign indices for measured quantities""" for var in range(self.n): for item in self._meas: self.__dict__[item][var] = dae.nm dae.nm += 1
[docs] class BusVoltagePMU(Measurement): def __init__(self) -> None: super().__init__() self._type = "pmuu" self._name = "PMU_voltage_measurement" self._data.update({"bus": "0"}) self._params.update({"vre": 0, "vim": 0}) self._meas.extend(["vre", "vim"]) self._mand.extend(["bus"]) self.bus: list[Optional[str]] = [] # which buses are measured self.vre = np.array([], dtype=int) # index of the measured quantity self.vim = np.array([], dtype=int)
[docs] def call(self, dae: DaeEst, dae_sim: DaeSim) -> None: est_idx_re, est_idx_im = dae.grid.get_node_index(self.bus)[1:3] sim_idx_re, sim_idx_im = dae_sim.grid.get_node_index(self.bus)[1:3] noise = np.zeros([self.n * 2, dae.nts]) # TODO: Implement the function to calculate the noise term properly for different noise distributions for var in range(self.n): selected_function = self._noise_mapping[self.distr[var]] noise[2 * var : 2 * var + 2] = ( selected_function(size=[2, dae.nts]) * self.acc[var] ) dae.c_meas_matrix[self.vre, dae.nx + est_idx_re] = 1 dae.c_meas_matrix[self.vim, dae.nx + est_idx_im] = 1 dae.z_meas_points_matrix[self.vre] = ( dae_sim.y_full[ sim_idx_re, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(0, 2, 2 * self.n)] ) dae.z_meas_points_matrix[self.vim] = ( dae_sim.y_full[ sim_idx_im, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(1, 2, 2 * self.n)] ) dae.r_meas_noise_cov_matrix[self.vre, self.vre] = np.clip( self.acc**2, a_min=dae.cov_tol, a_max=1 / (dae.cov_tol) ) dae.r_meas_noise_cov_matrix[self.vim, self.vim] = np.clip( self.acc**2, a_min=dae.cov_tol, a_max=1 / (dae.cov_tol) )
[docs] class BranchCurrentPMU(Measurement): def __init__(self) -> None: super().__init__() self._type = "pmui" self._name = "PMU_current_measurement" self._data.update({"bus_i": "0", "bus_j": "0"}) self._params.update({"ire": 0, "iim": 0}) self._mand.extend(["bus_i", "bus_j"]) self._meas.extend(["ire", "iim"]) self.bus_i: list[Optional[str]] = [] self.bus_j: list[Optional[str]] = [] self.ire = np.array([], dtype=int) self.iim = np.array([], dtype=int)
[docs] def call(self, dae: DaeEst, dae_sim: DaeSim) -> None: for idx_meas in range(self.n): # now include the choice of the distribution of noise selected_function = self._noise_mapping[self.distr[idx_meas]] noise = selected_function(size=[2, dae.nts]) * self.acc[idx_meas] idx_branch_dir = dae_sim.grid.get_branch_index(self.bus_i, self.bus_j)[1] # dae_sim.i_full layout changed (compute_i_full at system.py:2440) from # vstack(forward_block, reverse_block) — 2-stride within each block — # to interleaved [If_re_k, If_im_k, It_re_k, It_im_k] — 4-stride. # Translate idx_branch_dir (which is `k` for forward, `k + nb` for # reverse) into the new layout: forward uses rows 4k, 4k+1; reverse # uses rows 4k+2, 4k+3. nb_sim = dae_sim.grid.nb is_fwd_sim = idx_branch_dir < nb_sim k_sim = np.where(is_fwd_sim, idx_branch_dir, idx_branch_dir - nb_sim) off_sim = np.where(is_fwd_sim, 0, 2) idx_branch_dir_re = 4 * k_sim + off_sim idx_branch_dir_im = 4 * k_sim + off_sim + 1 idx_branch_dir_est = dae.grid.get_branch_index(self.bus_i, self.bus_j)[1] # C_branches layout is still vstack(forward, reverse) — 2-stride — so # the original `2 * idx_branch_dir_est` indexing remains correct here. idx_branch_dir_re_est = 2 * idx_branch_dir_est idx_branch_dir_im_est = 2 * idx_branch_dir_est + 1 # C_branches spans the 2*nn bus-voltage block. Map the branch-current # measurement onto the voltage columns only; device-private algebraic # columns (dae.nx + 2*nn :) stay zero (a branch current does not depend # on any device's internal algebraic variable). nv = dae.nv if dae.nv else 2 * dae.grid.nn dae.c_meas_matrix[self.ire, dae.nx : dae.nx + nv] = dae.grid.C_branches[ idx_branch_dir_re_est, : ] dae.c_meas_matrix[self.iim, dae.nx : dae.nx + nv] = dae.grid.C_branches[ idx_branch_dir_im_est, : ] dae.r_meas_noise_cov_matrix[self.ire, self.ire] = np.clip( self.acc**2, a_min=1e-10, a_max=1e10 ) dae.r_meas_noise_cov_matrix[self.iim, self.iim] = np.clip( self.acc**2, a_min=1e-10, a_max=1e10 ) dae.z_meas_points_matrix[self.ire] = ( dae_sim.i_full[ idx_branch_dir_re, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(0, 2, 2 * self.n)] ) dae.z_meas_points_matrix[self.iim] = ( dae_sim.i_full[ idx_branch_dir_im, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(1, 2, 2 * self.n)] )
# TODO: Implement injection measurement and confirm that it gives the same as branch measurement if there is only one branch
[docs] class BusCurrentPMU(Measurement): def __init__(self) -> None: super().__init__() self._type = "pmui" self._name = "PMU_current_measurement" self._data.update({"bus_i": "0"}) self._params.update({"ire": 0, "iim": 0}) self._mand.extend(["bus_i"]) self._meas.extend(["ire", "iim"]) self.bus_i: list[Optional[str]] = [] self.ire = np.array([], dtype=int) self.iim = np.array([], dtype=int)
[docs] def call(self, dae: DaeEst, dae_sim: DaeSim) -> None: for idx_meas in range(self.n): # now include the choice of the distribution of noise selected_function = self._noise_mapping[self.distr[idx_meas]] noise = selected_function(size=[2, dae.nts]) * self.acc[idx_meas] idx_i_re, idx_i_im = dae.grid.get_node_index(self.bus_i)[1:3] dae.c_meas_matrix[self.ire, dae.nx :] = dae.grid.y_adm_matrix[idx_i_re, :] dae.c_meas_matrix[self.iim, dae.nx :] = dae.grid.y_adm_matrix[idx_i_im, :] dae.r_meas_noise_cov_matrix[self.ire, self.ire] = np.clip( self.acc**2, a_min=1e-10, a_max=1e10 ) dae.r_meas_noise_cov_matrix[self.iim, self.iim] = np.clip( self.acc**2, a_min=1e-10, a_max=1e10 ) # Indices of estimation nodes in the simulation vector nodes = np.array(dae_sim.grid.get_node_index(dae.grid.buses)[1:3]).flatten("F") dae.z_meas_points_matrix[self.ire] = ( dae.grid.y_adm_matrix[idx_i_re, :] @ dae_sim.y_full[ nodes, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(0, 2, 2 * self.n)] ) dae.z_meas_points_matrix[self.iim] = ( dae.grid.y_adm_matrix[idx_i_im, :] @ dae_sim.y_full[ nodes, round(dae.T_start / dae_sim.t) : round(dae.T_end / dae_sim.t) : round( dae.t / dae_sim.t ), ] + noise[np.arange(1, 2, 2 * self.n)] )