# Created: 2024-12-01
# Last Modified: 2025-04-28
# (c) Copyright 2024 ETH Zurich, Milos Katanic
# https://doi.org/10.5905/ethz-1007-842
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
#
# The code is based on the publication: Katanic, M., Lygeros, J., Hug, G.: Recursive dynamic state estimation for power systems with an incomplete nonlinear DAE model.
# IET Gener. Transm. Distrib. 18, 3657–3668 (2024). https://doi.org/10.1049/gtd2.13308
# The full paper version is available at: https://arxiv.org/abs/2305.10065v2
# See full metadata at: README.md
# For inquiries, contact: mkatanic@ethz.ch
from matplotlib import cm
from tabulate import tabulate
from pydynamicestimator.utils import data_loader
from pydynamicestimator.config import Config
import matplotlib.pyplot as plt
import importlib
from pydynamicestimator import system
import numpy as np
import logging
import sys
import pandas as pd
# import matplotlib
#
# matplotlib.use("TkAgg")
[docs]
def run(config: Config) -> tuple[system.DaeEst, system.DaeSim]:
"""Initialize function and run appropriate routines"""
clear_module("pydynamicestimator.system")
importlib.reload(system)
# Set up logging
logging.basicConfig(level=config.get_log_level())
simfile = config.testsystemfile / "sim_param.txt"
simdistfile = config.testsystemfile / "sim_dist.txt"
estfile = config.testsystemfile / "est_param.txt"
estdistfile = config.testsystemfile / "est_dist.txt"
with open(simfile, "rt") as fid:
data_loader.read(fid, "sim")
with open(simdistfile, "rt") as fid:
data_loader.read(fid, "sim")
with open(estfile, "rt") as fid:
data_loader.read(fid, "est")
with open(estdistfile, "rt") as fid:
data_loader.read(fid, "est")
system.grid_sim.add_lines(system.line_sim)
system.grid_est.add_lines(system.line_est)
# Simulation
for item in system.device_list_sim:
if item.properties["xy_index"]:
item.xy_index(system.dae_sim, system.grid_sim)
system.dae_sim.t = config.ts
system.dae_sim.setup(**vars(config))
system.grid_sim.setup(dae=system.dae_sim, bus_init=system.bus_init_sim)
for item in system.device_list_sim:
if item.properties["finit"]:
item.finit(system.dae_sim)
for item in system.device_list_sim:
if item.properties["fgcall"]:
item.fgcall(system.dae_sim)
system.grid_sim.gcall(system.dae_sim)
system.disturbance_sim.sort_chrono()
system.dae_sim.simulate(system.disturbance_sim)
system.grid_sim.save_data(system.dae_sim)
for item in system.device_list_sim:
if item.properties["save_data"]:
item.save_data(system.dae_sim)
# Estimation
for item in system.device_list_est:
if item.properties["xy_index"]:
item.xy_index(system.dae_est, system.grid_est)
system.grid_est.setup(system.dae_est, system.grid_sim)
system.dae_est.t = config.te
system.dae_est.setup(**vars(config))
system.dae_est.unknown = system.bus_unknown_est.bus
for device_est in system.device_list_est:
if device_est.properties["finit"]:
for idx_est in device_est.int.keys():
device_sim = find_device_sim(idx_est)[0]
if device_sim is not None:
device_est.init_from_simulation(
device_sim, idx_est, system.dae_est, system.dae_sim
)
else:
logging.warning(
f"Estimation device index {idx_est} not found simulation data. It will be ignored and the estimation will start from default initial value"
)
for item in system.device_list_est:
if item.properties["fgcall"]:
item.fgcall(system.dae_est)
if item.properties["qcall"]:
item.qcall(system.dae_est)
system.grid_est.gcall(system.dae_est)
system.disturbance_est.sort_chrono()
system.dae_est.estimate(dist=system.disturbance_est)
for item in system.device_list_est:
if item.properties["save_data"]:
item.save_data(system.dae_est)
system.grid_est.save_data(system.dae_est)
if config.plot:
fplot(config)
print(
"======================================================================================================="
)
print("The statistics of the estimation:")
print(
"======================================================================================================="
)
print(f"Sampling time step: {(1000*system.dae_est.t)} [ms]")
print(f"Filter: {system.dae_est.filter}")
print(f"Discretization scheme: {system.dae_est.int_scheme}")
print(
f"Average computation time (per sampling step): {(1000*np.mean(system.dae_est.time_full)).round(2)} [ms]"
)
print(
f"Maximal computation time (per sampling step): {(1000*np.max(system.dae_est.time_full)).round(2)} [ms]"
)
print(
f"Average number of iterations (per sampling step): {np.mean(system.dae_est.iter_full).round(2)}"
)
print(
f"Maximal number of iterations (over all sampling steps): {round(np.max(system.dae_est.iter_full))}"
)
print(
"-------------------------------------------------------------------------------------------------------"
)
return system.dae_est, system.dae_sim
[docs]
def fplot(config: Config):
"""Plot voltage and differential states based on configuration settings."""
logging.basicConfig(level=logging.WARNING) # Set logging level
# Estimation time steps
est_time = system.dae_est.time_steps
# Simulation time steps
sim_time = system.dae_sim.time_steps
# Find closest simulation indices for each estimation time step
closest_sim_indices = [np.argmin(np.abs(sim_time - t)) for t in est_time]
# Plot voltage profiles if enabled
if config.plot_voltage:
viridis = cm.get_cmap("viridis", system.dae_est.grid.nn)
for i, node in enumerate(system.dae_est.grid.buses):
try:
# Plot estimation data
est_voltage = np.sqrt(
system.grid_est.yf[node][0, :] ** 2
+ system.grid_est.yf[node][1, :] ** 2
)
plt.plot(
system.dae_est.time_steps,
est_voltage,
color=viridis(i),
linestyle=":",
)
except KeyError:
logging.warning(f"Node {node} not estimated.")
try:
# Plot simulation data
sim_voltage = np.sqrt(
system.grid_sim.yf[node][0, :] ** 2
+ system.grid_sim.yf[node][1, :] ** 2
)
plt.plot(
system.dae_sim.time_steps,
sim_voltage,
color=viridis(i),
label=f"{node}",
)
except KeyError:
logging.warning(f"Node {node} does not exist in simulation.")
plt.legend()
plt.title("Voltage Profiles")
plt.xlabel("Time")
plt.ylabel("Voltage Magnitude")
# plt.savefig('voltage.png')
# Dataframe for showing the error of algebraic state estimation
error_summary = []
for node in system.dae_est.grid.buses:
try:
# Estimation data
est_real = system.grid_est.yf[node][0, :]
est_imag = system.grid_est.yf[node][1, :]
est_mag = np.sqrt(est_real**2 + est_imag**2)
est_angle = np.arctan2(est_imag, est_real)
# Simulation data at closest matching time steps
sim_real = system.grid_sim.yf[node][0, closest_sim_indices]
sim_imag = system.grid_sim.yf[node][1, closest_sim_indices]
sim_mag = np.sqrt(sim_real**2 + sim_imag**2)
sim_angle = np.arctan2(sim_imag, sim_real)
# Errors
mag_error = sim_mag - est_mag
angle_error = np.unwrap(sim_angle - est_angle)
# Metrics
mse_mag = np.sqrt(np.mean(mag_error**2))
mse_ang = np.sqrt(np.mean(angle_error**2))
mae_mag = np.mean(np.abs(mag_error))
mae_ang = np.mean(np.abs(angle_error))
# Add to summary list
error_summary.append(
{
"Node": node,
"RMSE magnitude": mse_mag,
"RMSE angle (rad)": mse_ang,
"MAE magnitude": mae_mag,
"MAE angle (rad)": mae_ang,
}
)
except KeyError:
logging.warning(f"Node {node} missing in estimation or simulation.")
# Create DataFrame from the error summary
summary_df = pd.DataFrame(error_summary)
summary_df.set_index("Node", inplace=True)
summary_df = tabulate(summary_df, headers="keys")
print(
"======================================================================================================="
)
print("The errors of algebraic state estimation:")
print(
"======================================================================================================="
)
print(summary_df)
print(
"-------------------------------------------------------------------------------------------------------"
)
# Plot differential states if enabled
if config.plot_diff:
for device_est in system.device_list_est:
if device_est.properties["fplot"]:
# config.plot_machines.reverse()
num_units = device_est.n
num_states = device_est.ns
# Create subplots with shared x-axis
figure, axis = plt.subplots(
num_units, num_states, sharex=True, figsize=(25, 10)
)
axis = np.atleast_2d(axis)
figure.supxlabel("Time (s)", fontsize=12)
if "delta" in device_est.states:
# Align delta angles with the reference unit
n_est_ref = 0 # First device taken as reference angle
reference_est = device_est.xf["delta"][n_est_ref].copy()
device_sim, n_sim_ref = find_device_sim(next(iter(device_est.int)))
reference_sim = device_sim.xf["delta"][n_sim_ref].copy()
for idx, n_est in device_est.int.items():
try:
device_sim, n_sim = find_device_sim(idx)
except ValueError as e:
logging.warning(
f"Machine {idx} not found in simulation or estimation: {e}"
)
continue
if "delta" in device_est.states:
device_sim.xf["delta"][n_sim] -= reference_sim
device_est.xf["delta"][n_est] -= reference_est
# Initialize the error summary list for states
state_error_summary = []
for col, state in enumerate(device_est.states):
unit = device_est.units[col]
t_sim = np.arange(
system.dae_sim.T_start,
system.dae_sim.T_end,
system.dae_sim.t,
)
t_est = np.arange(
system.dae_est.T_start,
system.dae_est.T_end,
system.dae_est.t,
)
try:
# Plot simulation data
axis[n_est, col].plot(
t_sim,
device_sim.xf[state][n_sim],
)
# Plot estimation data
axis[n_est, col].plot(
t_est, device_est.xf[state][n_est], linestyle=":"
)
# Label the y-axis only in the first column
if col == 0:
axis[n_est, col].set_ylabel(f"Device {idx}")
# Add a title to each column indicating the state
if n_est == 0:
axis[n_est, col].set_title(state)
# Now cfill the dataframe
# Matching the simulation data with the estimation time steps
sim_state = device_sim.xf[state][n_sim][closest_sim_indices]
est_state = device_est.xf[state][n_est]
# Compute errors: (difference between simulation and estimation)
state_error = sim_state - est_state
# Compute MSE and MAE for the state
mse_state = np.sqrt(np.mean(state_error**2))
mae_state = np.mean(np.abs(state_error))
# Add the results to the summary
state_error_summary.append(
{
"State": state,
"Unit": unit,
"RMSE": mse_state,
"MAE": mae_state,
}
)
except KeyError:
logging.warning(
f"State '{state}' not found for unit {device_est}."
)
state_error_df = pd.DataFrame(state_error_summary)
state_error_df.set_index("State", inplace=True)
state_error_df = tabulate(state_error_df, headers="keys")
# Display the DataFrame
print(
"======================================================================================================="
)
print(
f"The errors of differential states estimates of device {idx}:"
)
print(
"======================================================================================================="
)
print(state_error_df)
print(
"-------------------------------------------------------------------------------------------------------"
)
# Adjust layout for clarity
figure.suptitle(
f"Differential States of {device_est._name}", fontsize=14
)
plt.tight_layout(rect=[0, 0, 1, 0.95])
# plt.savefig(f'{device_est._name}_diffstates.png')
# Show all plots
plt.show()
[docs]
def clear_module(module_name):
"""Remove all attributes from a module, preparing it for a clean reload."""
if module_name in sys.modules:
module = sys.modules[module_name]
module_dict = module.__dict__
to_delete = [
name
for name in module_dict
if not name.startswith("__") # Avoid special and built-in attributes
]
for name in to_delete:
del module_dict[name]
[docs]
def find_device_sim(idx_est):
device_sim_found = next(
(
device_sim
for device_sim in system.device_list_sim
if any(idx_est == idx_sim for idx_sim in device_sim.int.keys())
),
None,
)
n_sim = device_sim_found.int[idx_est] if device_sim_found else None
return device_sim_found, n_sim