Source code for hermess.run

# © 2024-2026 ETH Zurich
# Original author: Milos Katanic
# Simulation-only fork & maintainer: Maitraya Avadhut Desai
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
#
# Simulation-only fork of PowerDynamicEstimator
# (https://doi.org/10.5905/ethz-1007-842); dynamic state estimation removed.
# For inquiries, contact: mdesai@ethz.ch

from matplotlib import cm

from hermess.utils import data_loader
from hermess.config import Config
import matplotlib.pyplot as plt
import importlib
from hermess import system
from pathlib import Path
import numpy as np
import logging
import sys
import os

import matplotlib

# Use the interactive TkAgg backend, falling back to the non-interactive Agg
# backend in headless environments (e.g. CI) where Tk and a display are absent.
try:
    matplotlib.use("TkAgg")
except ImportError:
    matplotlib.use("Agg")


[docs] def warn_filter_network_mismatch(device_list, line_dyn: bool) -> None: """Warn (but continue) when an inverter's output-filter realization is physically incoherent with the network model. A quasi-static filter (``LCL_static``, the six filter quantities as algebraic variables) zeroes the fast LCL dynamics and presumes a quasi-static network (``line_dyn=False``); a dynamic filter (``LCL``, those quantities as differential states) presumes a dynamic network (``line_dyn=True``). The crossed pairings are physically incoherent and flagged. The filter realization is read from the filter strategy's own ``algebs()``: a quasi-static realization declares its quantities as private algebraics, a dynamic one declares none. Non-inverter devices have no ``_filter`` and are skipped. """ for dev in device_list: filt = getattr(dev, "_filter", None) if filt is None: continue static_filter = len(filt.algebs()) > 0 if static_filter and line_dyn: logging.warning( "Inverter at bus(es) %s uses a quasi-static filter (%s) on a " "DYNAMIC network (line_dyn=True): the filter zeroes the fast LCL " "dynamics the network still carries -- physically incoherent. Use " "a dynamic filter (LCL) or set line_dyn=False.", list(dev.bus), type(filt).__name__, ) elif not static_filter and not line_dyn: logging.warning( "Inverter at bus(es) %s uses a dynamic filter (%s) on a " "QUASI-STATIC network (line_dyn=False): the filter carries fast " "LCL dynamics the network has dropped -- physically incoherent. " "Use a quasi-static filter (LCL_static) or set line_dyn=True.", list(dev.bus), type(filt).__name__, )
[docs] def run(config: Config) -> system.DaeSim: """Initialize and run the dynamic simulation.""" clear_module("hermess.system") importlib.reload(system) # Set up logging logging.basicConfig(level=config.get_log_level()) base_dir = Path(os.path.dirname(os.path.abspath(__file__))) system_root = ( Path(config.system_root) if config.system_root is not None else base_dir / "systems" ) simfile = system_root / config.testsystemfile / "sim_param.txt" if config.skip_disturance is False: simdistfile = system_root / config.testsystemfile / "sim_dist.txt" with open(simfile, "rt") as fid: data_loader.read(fid, "sim") if config.skip_disturance is False: with open(simdistfile, "rt") as fid: data_loader.read(fid, "sim") system.grid_sim.add_lines(system.line_sim) # Simulation for item in system.device_list_sim: if item.properties["xy_index"]: item.xy_index(system.dae_sim, system.grid_sim) system.dae_sim.t = config.ts # Attach grid, device list and bus_init before setup(), which sizes the # symbolic reference-frequency vectors from the grid and is also used by the # reference-frame methods that iterate the device list and bus_init. system.dae_sim.grid = system.grid_sim system.dae_sim.device_list = system.device_list_sim system.dae_sim.bus_init = system.bus_init_sim system.dae_sim.setup(**vars(config)) # Warn-only coherence check: inverter filter realization vs network model. warn_filter_network_mismatch(system.device_list_sim, config.line_dyn) system.grid_sim.setup(dae=system.dae_sim, bus_init=system.bus_init_sim) if config.print_power_flow: system.grid_sim.print_init_power_flow(system.dae_sim) for item in system.device_list_sim: if item.properties["finit"]: item.finit(system.dae_sim) for item in system.device_list_sim: if item.properties["fgcall"]: item.fgcall(system.dae_sim) system.grid_sim.gcall(system.dae_sim, line=system.line_sim) system.disturbance_sim.sort_chrono() system.dae_sim.check_initialization() if config.debug_check_init: system.dae_sim.debug_check_initialization() system.dae_sim.simulate(system.disturbance_sim) system.grid_sim.save_data(system.dae_sim) for item in system.device_list_sim: if item.properties["save_data"]: item.save_data(system.dae_sim) if config.plot: fplot(config) return system.dae_sim
[docs] def fplot(config: Config): """Plot voltage and differential states from the simulation.""" logging.basicConfig(level=logging.WARNING) if config.plot_voltage: plt.figure(figsize=(15, 11)) viridis = cm.get_cmap("viridis", system.dae_sim.grid.nn) for i, node in enumerate(system.dae_sim.grid.buses): try: sim_voltage = np.sqrt( system.grid_sim.yf[node][0, :] ** 2 + system.grid_sim.yf[node][1, :] ** 2 ) plt.plot( system.dae_sim.time_steps, sim_voltage, color=viridis(i), label=f"{node}", ) except KeyError: logging.warning(f"Node {node} cannot be plotted.") plt.legend() plt.title("Voltage Profiles", fontsize=16) plt.xlabel("Time [s]") plt.ylabel("Voltage Magnitude") if config.plot_diff: for device_plt in system.device_list_sim: if device_plt.properties["fplot"]: num_units = device_plt.n num_states = device_plt.ns figure, axis = plt.subplots( num_units, num_states, sharex=True, figsize=(num_states * 5, num_units * 5), ) axis = np.atleast_2d(axis) figure.supxlabel("Time [s]", fontsize=15) # Align rotor angles to the first unit (taken as the reference). if "delta" in device_plt.states: reference = device_plt.xf["delta"][0].copy() for n in device_plt.int.values(): device_plt.xf["delta"][n] -= reference t_sim = np.arange( system.dae_sim.T_start, system.dae_sim.T_end, system.dae_sim.t, ) for idx, n_sim in device_plt.int.items(): for col, state in enumerate(device_plt.states): try: axis[n_sim, col].plot( t_sim, device_plt.xf[state][n_sim], color="black" ) axis[n_sim, col].tick_params(labelsize=15) # Label the y-axis only in the first column if col == 0: axis[n_sim, col].set_ylabel( f"Device {idx}", fontsize=15 ) # Add a title to each column indicating the state if n_sim == 0: axis[n_sim, col].set_title(state, fontsize=15) except KeyError: logging.warning( f"State '{state}' not found for unit {device_plt}." ) # Adjust layout for clarity figure.suptitle( f"Differential States of {device_plt._name}", fontsize=16, ) plt.tight_layout(rect=[0, 0, 1, 0.95]) logging.info("Simulation successful!") # Small-signal analysis (modal report + frequency-banded participation # figures) runs inside dae_sim.simulate() at the operating point, before the # time-stepping loop, when config.small_signal_analysis is True; its figures # are displayed here. plt.show(block=True)
[docs] def clear_module(module_name): """Remove all attributes from a module, preparing it for a clean reload.""" if module_name in sys.modules: module = sys.modules[module_name] module_dict = module.__dict__ to_delete = [ name for name in module_dict if not name.startswith("__") # Avoid special and built-in attributes ] for name in to_delete: del module_dict[name]