# Created: 2026-06-08
# (c) Copyright 2025 ETH Zurich
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.
"""Quantify how far the CURRENT inverter trajectories drift from the committed
byte-identical baselines -- the diagnostic companion to
``test_recur_{sim,est}_inverter.py`` (which only give boolean pass/fail).
Run from the repo root:
.venv/bin/python -m pydynamicestimator.tests.baselines.report_inverter_drift
.venv/bin/python -m pydynamicestimator.tests.baselines.report_inverter_drift sim_ld
Purpose. The vectorization pass (A1 in docs/inverter_remaining_work.md) is the one
inverter change that is *not* byte-identical: re-associating floating-point
arithmetic shifts the trajectory at the ULP level, amplified by the nonlinear
integrator + init Newton solves to ~1e-9..1e-6. The byte-identical gates can only
say "diverged"; this tool says *by how much* and *in which states*, so you can
(a) confirm the drift is reassociation-scale rather than a transcription bug, and
(b) watch the actual margin against the 1e-6 sim / 1e-5 est gate tolerances.
It does NOT modify the baselines -- it loads them read-only and compares. Capture
a fresh reference only via ``create_baseline_inverter.py``, and only when an
intentional, validated numerics change makes the new trajectories the reference.
For each case it reports the array shape, the np.allclose gate verdict (same
semantics as the test: atol gate + numpy's default rtol=1e-5), the global
max-abs / max-rel / RMS difference with the location of the worst element, and a
per-state table of the largest-drifting rows -- named when the row->name
reconstruction matches the array (else bare indices, never a wrong label)."""
from __future__ import annotations
import pickle
import sys
from typing import List, Optional
import numpy as np
from pydynamicestimator import system
from pydynamicestimator.tests.baselines.inverter_baseline import (
BASELINES,
make_inverter_baseline_config,
run_inverter_case,
)
#: Gate tolerance per case, mirroring test_recur_{sim,est}_inverter.py exactly.
GATE_ATOL = {"sim_ld": 1e-6, "sim_alg": 1e-6, "est": 1e-5}
#: Floor added to the denominator of the relative error so near-zero baseline
#: entries (e.g. a current at rest) don't blow it up.
_REL_FLOOR = 1e-9
#: Device-name -> short tag, copied from system.py's participation-factor labeller
#: so the reconstructed names read the same as everywhere else in the codebase.
_DEVICE_ABBREV = {
"Synchronous_machine_transient_model": "SM",
"Synchronous_machine_subtransient_model": "SM",
"Synchronous_machine_subtransient_model_Sauer_Pai": "SM_SP",
"Synchronous_machine_subtransient_model_Sauer_Pai_6th_order": "SM_SP6",
"GridForming_inverter_model": "GFM",
"GridFollowing_inverter_model": "GFL",
"Static_load_power": "SLP",
"Static_load_impedance": "SLZ",
"Static_load_ZIP": "ZIP",
"Infinite_bus": "INF",
}
[docs]
def _reconstruct_state_names(case: str, n_rows: int) -> List[str]:
"""Rebuild the per-row state names of ``x_full`` from the module globals that
``run_inverter_case`` just populated, mirroring the exact device -> entry ->
state -> (dynamic-line tail) ordering system.py uses to build the state vector.
The reconstruction is *length-guarded*: if anything is off (attribute missing,
or the rebuilt list does not match ``n_rows``) it returns plain ``x{i}``
indices rather than risk pinning a drift number on the wrong state."""
fallback = [f"x{i}" for i in range(n_rows)]
try:
# The 'est' trajectory is the estimation model (device_list_est / grid_est);
# the sim cases are the simulation model.
if case == "est":
device_list, grid, line = (
system.device_list_est,
system.grid_est,
system.line_est,
)
else:
device_list, grid, line = (
system.device_list_sim,
system.grid_sim,
system.line_sim,
)
names: List[str] = []
for item in device_list:
short = _DEVICE_ABBREV.get(item._name, item._name[:6])
for i in range(item.n):
for state in item.states:
names.append(f"{short}@{item.bus[i]}:{state}")
if make_inverter_baseline_config(case).line_dyn:
for i in range(grid.nb):
for state in line.states:
names.append(f"DL_{line.bus_i[i]}-{line.bus_j[i]}:{state}")
for i in range(grid.nn):
for state in ["vre", "vim"]:
names.append(f"DL_{grid.buses[i]}:{state}")
return names if len(names) == n_rows else fallback
except Exception:
return fallback
[docs]
def report_case(case: str) -> Optional[dict]:
"""Run one baseline ``case`` on the current code, diff it against the committed
reference, print the report, and return the metrics dict (None if the baseline
is missing or the shapes differ)."""
path = BASELINES[case]
print(f"\n=== {case} ===")
if not path.exists():
print(f" baseline missing: {path.name} -- run create_baseline_inverter.py")
return None
with open(path, "rb") as file:
base = np.asarray(pickle.load(file))
arr = run_inverter_case(case) # populates the system module globals as a side effect
if arr.shape != base.shape:
# A shape change means the state count/ordering moved -- A1 must not do
# that. This is a structural regression, not reassociation drift.
print(
f" SHAPE MISMATCH current {arr.shape} vs baseline {base.shape} "
f"-- state count/ordering changed (structural regression, not drift)"
)
return None
atol = GATE_ATOL[case]
passes = bool(np.allclose(arr, base, atol=atol)) # default rtol=1e-5, as in the gate
diff = np.abs(arr - base)
rel = diff / (np.abs(base) + _REL_FLOOR)
max_abs = float(diff.max())
max_rel = float(rel.max())
rms = float(np.sqrt(np.mean(diff**2)))
r_abs, c_abs = np.unravel_index(int(diff.argmax()), diff.shape)
names = _reconstruct_state_names(case, arr.shape[0])
named = names[0] != "x0" or arr.shape[0] == 1 # whether reconstruction succeeded
print(f" shape {arr.shape} (rows=states, cols=time)")
print(f" gate np.allclose atol={atol:.0e}, rtol=1e-5 -> {'PASS' if passes else 'FAIL'}")
print(f" max |abs diff| {max_abs:.3e} at {names[r_abs]} (row {r_abs}, col {c_abs})")
print(f" max |rel diff| {max_rel:.3e} (floor {_REL_FLOOR:.0e})")
print(f" RMS abs diff {rms:.3e}")
if not named:
print(" (state names unavailable -- showing row indices)")
per_state = diff.max(axis=1) # worst drift per state over all time
order = np.argsort(per_state)[::-1]
top = min(15, arr.shape[0])
print(f" top {top} drifting states (max |abs diff| over time):")
for r in order[:top]:
if per_state[r] == 0.0:
break
print(f" {per_state[r]:.3e} {names[r]}")
return {
"case": case,
"shape": arr.shape,
"pass": passes,
"max_abs": max_abs,
"max_rel": max_rel,
"rms": rms,
"argmax_row": int(r_abs),
"argmax_col": int(c_abs),
"argmax_name": names[r_abs],
}
[docs]
def main(argv: List[str]) -> int:
cases = argv[1:] or list(BASELINES)
unknown = [c for c in cases if c not in BASELINES]
if unknown:
print(f"unknown case(s) {unknown}; valid: {list(BASELINES)}")
return 2
results = [report_case(c) for c in cases]
results = [r for r in results if r is not None]
print("\n--- summary ---")
for r in results:
print(
f" {r['case']:8s} {'PASS' if r['pass'] else 'FAIL'} "
f"max|abs|={r['max_abs']:.2e} max|rel|={r['max_rel']:.2e} "
f"rms={r['rms']:.2e}"
)
# Nonzero exit if any case failed its gate, so this is usable in a pipeline.
return 0 if all(r["pass"] for r in results) else 1
if __name__ == "__main__":
raise SystemExit(main(sys.argv))