Source code for pydynamicestimator.run

# Created: 2024-12-01
# Last Modified: 2025-05-16
# (c) Copyright 2024 ETH Zurich, Milos Katanic
# https://doi.org/10.5905/ethz-1007-842
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
#

# The code is based on the publication: Katanic, M., Lygeros, J., Hug, G.: Recursive dynamic state estimation for power systems with an incomplete nonlinear DAE model.
# IET Gener. Transm. Distrib. 18, 3657–3668 (2024). https://doi.org/10.1049/gtd2.13308
# The full paper version is available at: https://arxiv.org/abs/2305.10065v2
# See full metadata at: README.md
# For inquiries, contact: mkatanic@ethz.ch


from matplotlib import cm
from tabulate import tabulate

from pydynamicestimator.utils import data_loader
from pydynamicestimator.config import Config
import matplotlib.pyplot as plt
import importlib
from pydynamicestimator import system
from pathlib import Path
import numpy as np
import logging
import sys
import pandas as pd
import os

# import matplotlib

# matplotlib.use("TkAgg")


[docs] def run(config: Config) -> tuple[system.DaeEst, system.DaeSim]: """Initialize function and run appropriate routines""" clear_module("pydynamicestimator.system") importlib.reload(system) # Set up logging logging.basicConfig(level=config.get_log_level()) base_dir = Path(os.path.dirname(os.path.abspath(__file__))) simfile = base_dir / "testcases" / config.testsystemfile / "sim_param.txt" simdistfile = base_dir / "testcases" / config.testsystemfile / "sim_dist.txt" estfile = base_dir / "testcases" / config.testsystemfile / "est_param.txt" estdistfile = base_dir / "testcases" / config.testsystemfile / "est_dist.txt" with open(simfile, "rt") as fid: data_loader.read(fid, "sim") with open(simdistfile, "rt") as fid: data_loader.read(fid, "sim") with open(estfile, "rt") as fid: data_loader.read(fid, "est") with open(estdistfile, "rt") as fid: data_loader.read(fid, "est") system.grid_sim.add_lines(system.line_sim) system.grid_est.add_lines(system.line_est) # Simulation for item in system.device_list_sim: if item.properties["xy_index"]: item.xy_index(system.dae_sim, system.grid_sim) system.dae_sim.t = config.ts system.dae_sim.setup(**vars(config)) system.grid_sim.setup(dae=system.dae_sim, bus_init=system.bus_init_sim) for item in system.device_list_sim: if item.properties["finit"]: item.finit(system.dae_sim) for item in system.device_list_sim: if item.properties["fgcall"]: item.fgcall(system.dae_sim) system.grid_sim.gcall(system.dae_sim) system.disturbance_sim.sort_chrono() system.dae_sim.simulate(system.disturbance_sim) system.grid_sim.save_data(system.dae_sim) for item in system.device_list_sim: if item.properties["save_data"]: item.save_data(system.dae_sim) # Estimation for item in system.device_list_est: if item.properties["xy_index"]: item.xy_index(system.dae_est, system.grid_est) system.grid_est.setup(system.dae_est, system.grid_sim) system.dae_est.t = config.te system.dae_est.setup(**vars(config)) system.dae_est.unknown = system.bus_unknown_est.bus for device_est in system.device_list_est: if device_est.properties["finit"]: for idx_est in device_est.int.keys(): device_sim = find_device_sim(idx_est)[0] if device_sim is not None: device_est.init_from_simulation( device_sim, idx_est, system.dae_est, system.dae_sim ) else: logging.warning( f"Estimation device index {idx_est} not found simulation data. It will be ignored and the estimation will start from default initial value" ) for item in system.device_list_est: if item.properties["fgcall"]: item.fgcall(system.dae_est) if item.properties["qcall"]: item.qcall(system.dae_est) system.grid_est.gcall(system.dae_est) system.disturbance_est.sort_chrono() system.dae_est.estimate(dist=system.disturbance_est) for item in system.device_list_est: if item.properties["save_data"]: item.save_data(system.dae_est) system.grid_est.save_data(system.dae_est) if config.plot: fplot(config) print( "=======================================================================================================" ) print("The statistics of the estimation:") print( "=======================================================================================================" ) print(f"Sampling time step: {(1000*system.dae_est.t)} [ms]") print(f"Filter: {system.dae_est.filter}") print(f"Discretization scheme: {system.dae_est.int_scheme}") print( f"Average computation time (per sampling step): {(1000*np.mean(system.dae_est.time_full)).round(2)} [ms]" ) print( f"Maximal computation time (per sampling step): {(1000*np.max(system.dae_est.time_full)).round(2)} [ms]" ) print( f"Average number of iterations (per sampling step): {np.mean(system.dae_est.iter_full).round(2)}" ) print( f"Maximal number of iterations (over all sampling steps): {round(np.max(system.dae_est.iter_full))}" ) print( "-------------------------------------------------------------------------------------------------------" ) return system.dae_est, system.dae_sim
[docs] def fplot(config: Config): """Plot voltage and differential states based on configuration settings.""" logging.basicConfig(level=logging.WARNING) # Set logging level # Estimation time steps est_time = system.dae_est.time_steps # Simulation time steps sim_time = system.dae_sim.time_steps # Find closest simulation indices for each estimation time step closest_sim_indices = [np.argmin(np.abs(sim_time - t)) for t in est_time] # Plot voltage profiles if enabled if config.plot_voltage: plt.figure(figsize=(15, 11)) viridis = cm.get_cmap("viridis", system.dae_est.grid.nn) for i, node in enumerate(system.dae_est.grid.buses): try: # Plot estimation data est_voltage = np.sqrt( system.grid_est.yf[node][0, :] ** 2 + system.grid_est.yf[node][1, :] ** 2 ) plt.plot( system.dae_est.time_steps, est_voltage, color=viridis(i), linestyle=":", ) except KeyError: logging.warning(f"Node {node} not estimated.") try: # Plot simulation data sim_voltage = np.sqrt( system.grid_sim.yf[node][0, :] ** 2 + system.grid_sim.yf[node][1, :] ** 2 ) plt.plot( system.dae_sim.time_steps, sim_voltage, color=viridis(i), label=f"{node}", ) except KeyError: logging.warning(f"Node {node} does not exist in simulation.") plt.legend() plt.title("Voltage Profiles\n(Estimated: Dashed; True: Solid)") plt.xlabel("Time [s]") plt.ylabel("Voltage Magnitude") # plt.savefig('voltage.png') # Dataframe for showing the error of algebraic state estimation error_summary = [] for node in system.dae_est.grid.buses: try: # Estimation data est_real = system.grid_est.yf[node][0, :] est_imag = system.grid_est.yf[node][1, :] est_mag = np.sqrt(est_real**2 + est_imag**2) est_angle = np.arctan2(est_imag, est_real) # Simulation data at closest matching time steps sim_real = system.grid_sim.yf[node][0, closest_sim_indices] sim_imag = system.grid_sim.yf[node][1, closest_sim_indices] sim_mag = np.sqrt(sim_real**2 + sim_imag**2) sim_angle = np.arctan2(sim_imag, sim_real) # Errors mag_error = sim_mag - est_mag angle_error = np.unwrap(sim_angle - est_angle) # Metrics mse_mag = np.sqrt(np.mean(mag_error**2)) mse_ang = np.sqrt(np.mean(angle_error**2)) mae_mag = np.mean(np.abs(mag_error)) mae_ang = np.mean(np.abs(angle_error)) # Add to summary list error_summary.append( { "Node": node, "RMSE magnitude": mse_mag, "RMSE angle (rad)": mse_ang, "MAE magnitude": mae_mag, "MAE angle (rad)": mae_ang, } ) except KeyError: logging.warning(f"Node {node} missing in estimation or simulation.") # Create DataFrame from the error summary summary_df = pd.DataFrame(error_summary) summary_df.set_index("Node", inplace=True) summary_df = tabulate(summary_df, headers="keys") print( "=======================================================================================================" ) print("The errors of algebraic state estimation:") print( "=======================================================================================================" ) print(summary_df) print( "-------------------------------------------------------------------------------------------------------" ) # Plot differential states if enabled if config.plot_diff: for device_est in system.device_list_est: if device_est.properties["fplot"]: # config.plot_machines.reverse() num_units = device_est.n num_states = device_est.ns # Create subplots with shared x-axis figure, axis = plt.subplots( num_units, num_states, sharex=True, figsize=(num_states * 5, num_units * 5), ) axis = np.atleast_2d(axis) figure.supxlabel("Time [s]", fontsize=15) if "delta" in device_est.states: # Align delta angles with the reference unit n_est_ref = 0 # First device taken as reference angle reference_est = device_est.xf["delta"][n_est_ref].copy() device_sim, n_sim_ref = find_device_sim(next(iter(device_est.int))) reference_sim = device_sim.xf["delta"][n_sim_ref].copy() for idx, n_est in device_est.int.items(): try: device_sim, n_sim = find_device_sim(idx) except ValueError as e: logging.warning( f"Machine {idx} not found in simulation or estimation: {e}" ) continue if "delta" in device_est.states: device_sim.xf["delta"][n_sim] -= reference_sim device_est.xf["delta"][n_est] -= reference_est # Initialize the error summary list for states state_error_summary = [] for col, state in enumerate(device_est.states): unit = device_est.units[col] t_sim = np.arange( system.dae_sim.T_start, system.dae_sim.T_end, system.dae_sim.t, ) t_est = np.arange( system.dae_est.T_start, system.dae_est.T_end, system.dae_est.t, ) try: # Plot simulation data axis[n_est, col].plot( t_sim, device_sim.xf[state][n_sim], color="black" ) # Plot estimation data axis[n_est, col].plot( t_est, device_est.xf[state][n_est], color="black", linestyle=":", ) axis[n_est, col].plot( t_est, device_est.xf[state][n_est], color="black", linestyle=":", ) axis[n_est, col].tick_params(labelsize=15) # Label the y-axis only in the first column if col == 0: axis[n_est, col].set_ylabel( f"Device {idx}", fontsize=15 ) # Add a title to each column indicating the state if n_est == 0: axis[n_est, col].set_title(state, fontsize=15) # Now cfill the dataframe # Matching the simulation data with the estimation time steps sim_state = device_sim.xf[state][n_sim][closest_sim_indices] est_state = device_est.xf[state][n_est] # Compute errors: (difference between simulation and estimation) state_error = sim_state - est_state # Compute MSE and MAE for the state mse_state = np.sqrt(np.mean(state_error**2)) mae_state = np.mean(np.abs(state_error)) # Add the results to the summary state_error_summary.append( { "State": state, "Unit": unit, "RMSE": mse_state, "MAE": mae_state, } ) except KeyError: logging.warning( f"State '{state}' not found for unit {device_est}." ) state_error_df = pd.DataFrame(state_error_summary) state_error_df.set_index("State", inplace=True) state_error_df = tabulate(state_error_df, headers="keys") # Display the DataFrame print( "=======================================================================================================" ) print( f"The errors of differential states estimates of device {idx}:" ) print( "=======================================================================================================" ) print(state_error_df) print( "-------------------------------------------------------------------------------------------------------" ) # Adjust layout for clarity figure.suptitle( f"Differential States of {device_est._name}\n(Dashed: Estimated, Solid: True)", fontsize=16, ) plt.tight_layout(rect=[0, 0, 1, 0.95]) # plt.savefig(f'{device_est._name}_diffstates.png') logging.info("Simulation and estimation successful!") plt.show()
[docs] def clear_module(module_name): """Remove all attributes from a module, preparing it for a clean reload.""" if module_name in sys.modules: module = sys.modules[module_name] module_dict = module.__dict__ to_delete = [ name for name in module_dict if not name.startswith("__") # Avoid special and built-in attributes ] for name in to_delete: del module_dict[name]
[docs] def find_device_sim(idx_est): device_sim_found = next( ( device_sim for device_sim in system.device_list_sim if any(idx_est == idx_sim for idx_sim in device_sim.int.keys()) ), None, ) n_sim = device_sim_found.int[idx_est] if device_sim_found else None return device_sim_found, n_sim