Source code for pydynamicestimator.run

# Created: 2024-12-01
# Last Modified: 2026-03-19
# (c) Copyright 2024 ETH Zurich, Milos Katanic
# https://doi.org/10.5905/ethz-1007-842
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
#

# The code is based on the publication: Katanic, M., Lygeros, J., Hug, G.: Recursive dynamic state estimation for power systems with an incomplete nonlinear DAE model.
# IET Gener. Transm. Distrib. 18, 3657–3668 (2024). https://doi.org/10.1049/gtd2.13308
# The full paper version is available at: https://arxiv.org/abs/2305.10065v2
# See full metadata at: README.md
# For inquiries, contact: mkatanic@ethz.ch


from matplotlib import cm
from tabulate import tabulate

from pydynamicestimator.utils import data_loader
from pydynamicestimator.config import Config
import matplotlib.pyplot as plt
import importlib
from pydynamicestimator import system
from pathlib import Path
import numpy as np
import logging
import sys
import pandas as pd
import os

import matplotlib

# Prefer the interactive TkAgg backend for local use, but fall back to the
# non-interactive Agg backend in headless environments (e.g. CI) where Tk and
# a display are unavailable. Without this guard, importing this module aborts
# with "Cannot load backend 'TkAgg' ... as 'headless' is currently running".
try:
    matplotlib.use("TkAgg")
except ImportError:
    matplotlib.use("Agg")


[docs] def warn_filter_network_mismatch(device_list, line_dyn: bool) -> None: """Warn (but continue) when an inverter's output-filter realization is physically incoherent with the network model. A *quasi-static* filter (e.g. ``LCL_static`` -- the six filter quantities realized as algebraic variables) zeroes the fast LCL dynamics and so presumes a quasi-static network (``line_dyn=False``); a *dynamic* filter (``LCL`` -- those quantities as differential states) keeps them and presumes a dynamic network (``line_dyn=True``). The opposite pairings are physically incoherent (design doc Section 6) -- a deliberately odd pairing is allowed, but flagged. An inverter's filter realization is read from the filter strategy's own ``algebs()``: a quasi-static realization declares its quantities as private algebraics, a dynamic one declares none. (Asking the filter directly, rather than the host's aggregate ``_algebs_int``, keeps this correct once other strategies -- e.g. a current limiter -- start declaring their own algebraics.) Non-inverter devices have no ``_filter`` and are skipped. """ for dev in device_list: filt = getattr(dev, "_filter", None) if filt is None: continue static_filter = len(filt.algebs()) > 0 if static_filter and line_dyn: logging.warning( "Inverter at bus(es) %s uses a quasi-static filter (%s) on a " "DYNAMIC network (line_dyn=True): the filter zeroes the fast LCL " "dynamics the network still carries -- physically incoherent. Use " "a dynamic filter (LCL) or set line_dyn=False.", list(dev.bus), type(filt).__name__, ) elif not static_filter and not line_dyn: logging.warning( "Inverter at bus(es) %s uses a dynamic filter (%s) on a " "QUASI-STATIC network (line_dyn=False): the filter carries fast " "LCL dynamics the network has dropped -- physically incoherent. " "Use a quasi-static filter (LCL_static) or set line_dyn=True.", list(dev.bus), type(filt).__name__, )
[docs] def run(config: Config) -> tuple[system.DaeEst, system.DaeSim]: """Initialize function and run appropriate routines""" clear_module("pydynamicestimator.system") importlib.reload(system) # Set up logging logging.basicConfig(level=config.get_log_level()) base_dir = Path(os.path.dirname(os.path.abspath(__file__))) system_root = ( Path(config.system_root) if config.system_root is not None else base_dir / "systems" ) simfile = system_root / config.testsystemfile / "sim_param.txt" if config.skip_disturance is False: simdistfile = system_root / config.testsystemfile / "sim_dist.txt" if config.skip_est is False: estfile = system_root / config.testsystemfile / "est_param.txt" estdistfile = system_root / config.testsystemfile / "est_dist.txt" with open(simfile, "rt") as fid: data_loader.read(fid, "sim") if config.skip_disturance is False: with open(simdistfile, "rt") as fid: data_loader.read(fid, "sim") if config.skip_est is False: with open(estfile, "rt") as fid: data_loader.read(fid, "est") with open(estdistfile, "rt") as fid: data_loader.read(fid, "est") system.grid_sim.add_lines(system.line_sim) system.grid_est.add_lines(system.line_est) # Simulation for item in system.device_list_sim: if item.properties["xy_index"]: item.xy_index(system.dae_sim, system.grid_sim) system.dae_sim.t = config.ts # Wire grid + device list + bus_init onto the Dae instance BEFORE setup, # so that Dae.setup() (which sizes symbolic omega_ref vectors using # self.grid.nn/nb) and the reference-frame methods (update_omega, # compute_coi_expr, set_omega_single_idx_from_slack — which iterate # self.device_list and look up self.bus_init) use the correct objects. system.dae_sim.grid = system.grid_sim system.dae_sim.device_list = system.device_list_sim system.dae_sim.bus_init = system.bus_init_sim system.dae_sim.setup(**vars(config)) # Coherence check (warn-only): flag an inverter filter realization that does # not match the network model (dynamic LCL on a static network, or vice versa). warn_filter_network_mismatch(system.device_list_sim, config.line_dyn) system.grid_sim.setup(dae=system.dae_sim, bus_init=system.bus_init_sim) if config.print_power_flow: system.grid_sim.print_init_power_flow(system.dae_sim) for item in system.device_list_sim: if item.properties["finit"]: item.finit(system.dae_sim) for item in system.device_list_sim: if item.properties["fgcall"]: item.fgcall(system.dae_sim) system.grid_sim.gcall(system.dae_sim, line=system.line_sim) system.disturbance_sim.sort_chrono() system.dae_sim.check_initialization() if config.debug_check_init: system.dae_sim.debug_check_initialization() system.dae_sim.simulate(system.disturbance_sim) system.grid_sim.save_data(system.dae_sim) for item in system.device_list_sim: if item.properties["save_data"]: item.save_data(system.dae_sim) # Estimation if config.skip_est is False: for item in system.device_list_est: if item.properties["xy_index"]: item.xy_index(system.dae_est, system.grid_est) system.grid_est.setup(system.dae_est, system.grid_sim) system.dae_est.t = config.te # grid_est.setup() above already set dae_est.grid via init_symbolic. # Re-assign explicitly and wire the est-side device list / bus_init so # reference-frame methods on the Dae base class operate on the right # objects (avoids reaching for the sim module globals). system.dae_est.grid = system.grid_est system.dae_est.device_list = system.device_list_est system.dae_est.bus_init = system.bus_init_est system.dae_est.setup(**vars(config)) system.dae_est.unknown = system.bus_unknown_est.bus for device_est in system.device_list_est: if device_est.properties["finit"]: for idx_est in device_est.int.keys(): device_sim = find_device_sim(idx_est)[0] if device_sim is not None: device_est.init_from_simulation( device_sim, idx_est, system.dae_est, system.dae_sim ) else: logging.warning( f"Estimation device index {idx_est} not found simulation data. It will be ignored and the estimation will start from default initial value" ) for item in system.device_list_est: if item.properties["fgcall"]: item.fgcall(system.dae_est) if item.properties["qcall"]: item.qcall(system.dae_est) system.grid_est.gcall(system.dae_est) system.disturbance_est.sort_chrono() system.dae_est.estimate(dist=system.disturbance_est) for item in system.device_list_est: if item.properties["save_data"]: item.save_data(system.dae_est) system.grid_est.save_data(system.dae_est) if config.plot: fplot(config) if config.skip_est is False: print( "=======================================================================================================" ) print("The statistics of the estimation:") print( "=======================================================================================================" ) print(f"Sampling time step: {(1000*system.dae_est.t)} [ms]") print(f"Filter: {system.dae_est.filter}") print(f"Discretization scheme: {system.dae_est.int_scheme}") print( f"Average computation time (per sampling step): {(1000*np.mean(system.dae_est.time_full)).round(2)} [ms]" ) print( f"Maximal computation time (per sampling step): {(1000*np.max(system.dae_est.time_full)).round(2)} [ms]" ) print( f"Average number of iterations (per sampling step): {np.mean(system.dae_est.iter_full).round(2)}" ) print( f"Maximal number of iterations (over all sampling steps): {round(np.max(system.dae_est.iter_full))}" ) print( "-------------------------------------------------------------------------------------------------------" ) return system.dae_est, system.dae_sim
[docs] def fplot(config: Config): """Plot voltage and differential states based on configuration settings.""" logging.basicConfig(level=logging.WARNING) # Set logging level # Estimation time steps est_time = system.dae_est.time_steps # Simulation time steps sim_time = system.dae_sim.time_steps if config.skip_est is False: title_string = "Estimated: Dashed; True: Solid" else: title_string = " " # Plot voltage profiles if enabled if config.plot_voltage: plt.figure(figsize=(15, 11)) if config.skip_est is False: viridis = cm.get_cmap("viridis", system.dae_est.grid.nn) plotting_buses = list(enumerate(system.dae_est.grid.buses)) for i, node in plotting_buses: try: # Plot estimation data est_voltage = np.sqrt( system.grid_est.yf[node][0, :] ** 2 + system.grid_est.yf[node][1, :] ** 2 ) plt.plot( system.dae_est.time_steps, est_voltage, color=viridis(i), linestyle=":", ) except KeyError: logging.warning(f"Node {node} not estimated.") else: viridis = cm.get_cmap("viridis", system.dae_sim.grid.nn) plotting_buses = enumerate(system.dae_sim.grid.buses) for i, node in plotting_buses: try: # Plot simulation data sim_voltage = np.sqrt( system.grid_sim.yf[node][0, :] ** 2 + system.grid_sim.yf[node][1, :] ** 2 ) plt.plot( system.dae_sim.time_steps, sim_voltage, color=viridis(i), label=f"{node}", ) except KeyError: logging.warning(f"Node {node} cannot be plotted.") plt.legend() plt.title(f"Voltage Profiles\n{title_string}", fontsize=16) plt.xlabel("Time [s]") plt.ylabel("Voltage Magnitude") # plt.savefig('voltage.png') # Dataframe for showing the error of algebraic state estimation if config.skip_est is False: # Find closest simulation indices for each estimation time step closest_sim_indices = [np.argmin(np.abs(sim_time - t)) for t in est_time] error_summary = [] for node in system.dae_est.grid.buses: try: # Estimation data est_real = system.grid_est.yf[node][0, :] est_imag = system.grid_est.yf[node][1, :] est_mag = np.sqrt(est_real**2 + est_imag**2) est_angle = np.arctan2(est_imag, est_real) # Simulation data at closest matching time steps sim_real = system.grid_sim.yf[node][0, closest_sim_indices] sim_imag = system.grid_sim.yf[node][1, closest_sim_indices] sim_mag = np.sqrt(sim_real**2 + sim_imag**2) sim_angle = np.arctan2(sim_imag, sim_real) # Errors mag_error = sim_mag - est_mag angle_error = np.unwrap(sim_angle - est_angle) # Metrics mse_mag = np.sqrt(np.mean(mag_error**2)) mse_ang = np.sqrt(np.mean(angle_error**2)) mae_mag = np.mean(np.abs(mag_error)) mae_ang = np.mean(np.abs(angle_error)) # Add to summary list error_summary.append( { "Node": node, "RMSE magnitude": mse_mag, "RMSE angle (rad)": mse_ang, "MAE magnitude": mae_mag, "MAE angle (rad)": mae_ang, } ) except KeyError: logging.warning(f"Node {node} missing in estimation or simulation.") # Create DataFrame from the error summary summary_df = pd.DataFrame(error_summary) summary_df.set_index("Node", inplace=True) summary_df = tabulate(summary_df, headers="keys") print( "=======================================================================================================" ) print("The errors of algebraic state estimation:") print( "=======================================================================================================" ) print(summary_df) print( "-------------------------------------------------------------------------------------------------------" ) # Plot differential states if enabled if config.plot_diff: if config.skip_est is False: devices = system.device_list_est else: devices = system.device_list_sim for device_plt in devices: if device_plt.properties["fplot"]: # config.plot_machines.reverse() num_units = device_plt.n num_states = device_plt.ns # Create subplots with shared x-axis figure, axis = plt.subplots( num_units, num_states, sharex=True, figsize=(num_states * 5, num_units * 5), ) axis = np.atleast_2d(axis) figure.supxlabel("Time [s]", fontsize=15) if "delta" in device_plt.states: # Align delta angles with the reference unit n_est_ref = 0 # First device taken as reference angle reference_est = device_plt.xf["delta"][n_est_ref].copy() device_sim, n_sim_ref = find_device_sim(next(iter(device_plt.int))) reference_sim = device_sim.xf["delta"][n_sim_ref].copy() for idx, n_est in device_plt.int.items(): try: device_sim, n_sim = find_device_sim(idx) except ValueError as e: logging.warning( f"Machine {idx} not found in simulation or estimation: {e}" ) continue if "delta" in device_plt.states: device_sim.xf["delta"][n_sim] -= reference_sim device_plt.xf["delta"][n_est] -= reference_est # Initialize the error summary list for states state_error_summary = [] for col, state in enumerate(device_plt.states): unit = device_plt.units[col] t_sim = np.arange( system.dae_sim.T_start, system.dae_sim.T_end, system.dae_sim.t, ) t_est = np.arange( system.dae_est.T_start, system.dae_est.T_end, system.dae_est.t, ) try: # Plot simulation data axis[n_est, col].plot( t_sim, device_sim.xf[state][n_sim], color="black" ) # Plot estimation data if config.skip_est is False: axis[n_est, col].plot( t_est, device_plt.xf[state][n_est], color="black", linestyle=":", ) axis[n_est, col].tick_params(labelsize=15) # Label the y-axis only in the first column if col == 0: axis[n_est, col].set_ylabel( f"Device {idx}", fontsize=15 ) # Add a title to each column indicating the state if n_est == 0: axis[n_est, col].set_title(state, fontsize=15) # Now cfill the dataframe # Matching the simulation data with the estimation time steps if config.skip_est is False: sim_state = device_sim.xf[state][n_sim][ closest_sim_indices ] est_state = device_plt.xf[state][n_est] # Compute errors: (difference between simulation and estimation) state_error = sim_state - est_state # Compute MSE and MAE for the state mse_state = np.sqrt(np.mean(state_error**2)) mae_state = np.mean(np.abs(state_error)) # Add the results to the summary state_error_summary.append( { "State": state, "Unit": unit, "RMSE": mse_state, "MAE": mae_state, } ) except KeyError: logging.warning( f"State '{state}' not found for unit {device_plt}." ) if config.skip_est is False: state_error_df = pd.DataFrame(state_error_summary) state_error_df.set_index("State", inplace=True) state_error_df = tabulate(state_error_df, headers="keys") # Display the DataFrame print( "=======================================================================================================" ) print( f"The errors of differential states estimates of device {idx}:" ) print( "=======================================================================================================" ) print(state_error_df) print( "-------------------------------------------------------------------------------------------------------" ) # Adjust layout for clarity figure.suptitle( f"Differential States of {device_plt._name}\n{title_string}", fontsize=16, ) plt.tight_layout(rect=[0, 0, 1, 0.95]) # plt.savefig(f'{device_est._name}_diffstates.png') if config.skip_est is False: logging.info("Simulation and estimation successful!") else: logging.info( "Simulation successful, estimation skipped (please do it later)!" ) # The small-signal analysis (modal report + frequency-banded participation # figures) is run inside dae_sim.simulate(), at the operating point and # BEFORE the time-stepping loop, when config.small_signal_analysis is True — # so it is shown even if the simulation/estimation later fails. plt.show(block=True)
[docs] def clear_module(module_name): """Remove all attributes from a module, preparing it for a clean reload.""" if module_name in sys.modules: module = sys.modules[module_name] module_dict = module.__dict__ to_delete = [ name for name in module_dict if not name.startswith("__") # Avoid special and built-in attributes ] for name in to_delete: del module_dict[name]
[docs] def find_device_sim(idx_est): device_sim_found = next( ( device_sim for device_sim in system.device_list_sim if any(idx_est == idx_sim for idx_sim in device_sim.int.keys()) ), None, ) n_sim = device_sim_found.int[idx_est] if device_sim_found else None return device_sim_found, n_sim