pydynamicestimator.system

Attributes

grid_est

grid_sim

dae_est

dae_sim

bus_init_sim

bus_init_est

bus_unknown_est

line_sim

line_est

disturbance_sim

disturbance_est

device_list_sim

device_list_est

Classes

Grid

GridSim

GridEst

Dae

DaeSim

DaeEst

Functions

stack_volt_power(→ numpy.ndarray)

Module Contents

class pydynamicestimator.system.Grid[source]
y_adm_matrix: casadi.SX | None = None
y_series_r: casadi.SX | None = None
y_series_i: casadi.SX | None = None
G_dyn: casadi.SX | None = None
B_i_dyn: casadi.SX | None = None
B_j_dyn: casadi.SX | None = None
X_dyn: casadi.SX | None = None
y_series: numpy.ndarray | None = None
z_imp_matrix: numpy.ndarray | None = None
tau: numpy.ndarray | None = None
tau_r: numpy.ndarray | None = None
tau_i: numpy.ndarray | None = None
incident_matrix: numpy.ndarray | None = None
Bsum: numpy.ndarray | None = None
Gsum: numpy.ndarray | None = None
nb: int = 0
nn: int = 0
buses: list = []
Bsum_trafo: numpy.ndarray | None = None
Gsum_trafo: numpy.ndarray | None = None
Sb: float = 100
idx_i: list = []
idx_j: list = []
idx_i_re: list = []
idx_j_re: list = []
idx_i_im: list = []
idx_j_im: list = []
yinit: dict
yf: dict
sf: dict
line: pydynamicestimator.devices.device.Line | None = None
line_is_faulted: list[bool] = []
line_is_open: list[bool] = []
line_fault_adm: list[float] = []
bus_is_faulted: list[bool] = []
bus_fault_adm: list[float] = []
idx_branch: dict[Tuple[str, str], int]
idx_bus: dict[str, int]
idx_bus_re: dict[str, int]
idx_bus_im: dict[str, int]
C_branches_forward: numpy.ndarray | None = None
C_branches_reverse: numpy.ndarray | None = None
C_branches: numpy.ndarray | None = None
C_linecurrents_to_nodes: numpy.ndarray | None = None
save_data(dae: Dae) None[source]
Parameters:

dae (Dae)

Return type:

None

init_symbolic(dae: Dae) None[source]
Parameters:

dae (Dae)

Return type:

None

gcall(dae: Dae) None[source]
Parameters:

dae (Dae)

Return type:

None

build_bus_rotation_T(dae: Dae) casadi.SX[source]

Builds rotation matrix in order to translate bus voltages. returns: Rotation matrix, SX (2*nn x 2*nn)

Parameters:

dae (Dae)

Return type:

casadi.SX

guncall(dae: Dae) None[source]
Parameters:

dae (Dae)

Return type:

None

add_lines(line: pydynamicestimator.devices.device.Line) None[source]
Parameters:

line (pydynamicestimator.devices.device.Line)

Return type:

None

add_bus(bus: str, idx: list, idx_re: list, idx_im: list) None[source]
Parameters:
  • bus (str)

  • idx (list)

  • idx_re (list)

  • idx_im (list)

Return type:

None

build_Bsum_Gsum_trafo() None[source]
Return type:

None

build_y() None[source]
Return type:

None

build_y_sym(omega_buses: casadi.SX = ca.SX(1.0), omega_lines: float | casadi.SX = 1.0, dyn_update: bool = False) None[source]
Parameters:
  • omega_buses (casadi.SX)

  • omega_lines (float | casadi.SX)

  • dyn_update (bool)

Return type:

None

get_branch_index(node1: list[str], node2: list[str]) tuple[numpy.ndarray, numpy.ndarray][source]
Parameters:
  • () (node2) – List of starting nodes

  • () – List of receiving nodes

  • node1 (list[str])

  • node2 (list[str])

Return type:

tuple[numpy.ndarray, numpy.ndarray]

Returns: The order of the given branch and the order of the opposite direction branch

get_node_index(buses: list) tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray][source]
Parameters:

buses (list)

Return type:

tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]

class pydynamicestimator.system.GridSim[source]

Bases: Grid

gcall(dae: DaeSim, **kwargs)[source]
Parameters:

dae (DaeSim)

init_from_power_flow(dae: DaeSim, static: pydynamicestimator.devices.device.BusInit) None[source]
Parameters:
Return type:

None

print_init_power_flow(dae: DaeSim) None[source]
Parameters:

dae (DaeSim)

Return type:

None

build_Gsum() None[source]
Return type:

None

build_Bsum() None[source]
Return type:

None

build_incident_matrix() None[source]
Return type:

None

build_linecurrents_to_nodes() None[source]

Builds a constant mapping matrix that accumulates branch current states (xl) into nodal current balance entries (g).

For each branch k with from-bus i and to-bus j:
  • Real current i_ijr contributes +1 at (vre_i) and -1 at (vre_j)

  • Imag current i_iji contributes +1 at (vim_i) and -1 at (vim_j)

Return type:

None

build_linecurrents_transformed_to_bus_frame(dae: DaeSim) casadi.SX[source]

Builds the nodal current balance from line current states (xl) after rotating them from the line reference frame into each corresponding bus reference frame.

For each branch k with from-bus i and to-bus j:
  • Real current i_ijr, transformed to reference frame of bus i, contributes +1 at (node i)

  • Real current i_ijr, transformed to reference frame of bus j, contributes -1 at (node j)

  • Imag current i_iji, transformed to reference frame of bus i, contributes +1 at (node i)

  • Imag current i_iji, transformed to reference frame of bus j, contributes -1 at (node j)

  • And applies (real) tap ratio at from side (i)

Returns:

(g_line) nodal current injection vector of length 2·nn

Parameters:

dae (DaeSim)

Return type:

casadi.SX

build_voltage_transformed_to_line_frame(dae: DaeSim) casadi.SX[source]

Builds the from- and to-terminal bus voltages expressed in the corresponding line reference frame.

For each branch k with from-bus i and to-bus j:
  • Takes (V_i, V_j) given in the local bus frames of the from-bus i and to-bus j, respectively

  • Rotates both into branch k’s line frame whose angle is defined by δ_line = 0.5(δ_i + δ_j)

Returns:

(Vi_re_L, Vi_im_L, Vj_re_L, Vj_im_L) transformed voltages as CasADi vectors of length nb

Parameters:

dae (DaeSim)

Return type:

casadi.SX

build_branch_current_fun(dae: DaeSim) None[source]

Builds a CasADi function that computes terminal branch currents from (x, y) consistent with distributed reference-frame formulation.

For each branch k with from-bus i and to-bus j:
  • Takes terminal bus voltages (y) in the respective bus reference frames

  • Rotates Vi and Vj into the line reference frame of branch k

  • Applies the (real) tap ratio on the from-side (i)

  • Computes series and shunt currents (from transformed Vi,Vj) in the line frame

  • Forms terminal currents (If at i-side, It at j-side) in the line frame

  • Rotates If back to the i-bus frame and It back to the j-bus frame

Stores:

self._branch_current_fun(x, y) -> I_out, with length 4·nb ordered as [If_re, If_im, It_re, It_im] (in corresponding bus frames).

Parameters:

dae (DaeSim)

Return type:

None

setup(dae: DaeSim, bus_init: pydynamicestimator.devices.device.BusInit) None[source]
Parameters:
Return type:

None

update_effective_line_params() None[source]

Update Line object parameters to reflect current fault/open state. Uses saved originals so clearing a fault restores the original values.

Return type:

None

class pydynamicestimator.system.GridEst[source]

Bases: Grid

y_simulation = []
_init_from_simulation(other: GridSim, dae: Dae) None[source]
Parameters:
Return type:

None

_get_results(other: GridSim) None[source]
Parameters:

other (GridSim)

Return type:

None

setup(dae: DaeEst, other: GridSim) None[source]
Parameters:
Return type:

None

init_symbolic(dae: DaeEst) None[source]
Parameters:

dae (DaeEst)

Return type:

None

class pydynamicestimator.system.Dae[source]
nx: int = 0
ny: int = 0
nv: int = 0
ng: int = 0
nl: int = 0
np: int = 0
nts: int = 0
n_priv: int = 0
algebs_int_names: list[str] = []
yinit_priv: list[float] = []
x: casadi.SX | None = None
y: casadi.SX | None = None
f: casadi.SX | None = None
g: casadi.SX | None = None
fnode: casadi.SX | None = None
xl: casadi.SX | None = None
fl: casadi.SX | None = None
p: casadi.SX | None = None
p0: casadi.SX | None = None
s: casadi.SX | None = None
x_full: numpy.ndarray | None = None
y_full: numpy.ndarray | None = None
i_full: numpy.ndarray | None = None
T_start: float = 0.0
T_end: float = 10.0
time_steps: numpy.ndarray | None = None
states: list[str] = []
Sb: float = 100
fn: Literal[50, 60] = 50
t: float = 0.02
omega_net: float = 1.0
omega_coi: casadi.SX | None
omega_ref: casadi.SX | None = None
omega_ref_buses: casadi.SX | None = None
omega_ref_lines: casadi.SX | None = None
omega_ref_expr: casadi.SX | None = None
omega_ref_buses_expr: casadi.SX | None = None
omega_ref_lines_expr: casadi.SX | None = None
omega_mode: Literal['coi', 'single', 'nom', 'dist']
has_delta_ref: bool = False
omega_single_idx: str | None = None
xinit: list = []
yinit: list = []
iinit: list = []
xlinit: list = []
sinit
xmin: list = []
xmax: list = []
grid: Grid | None = None
device_list: list = []
bus_init: pydynamicestimator.devices.device.BusInit | None = None
incl_lim: bool
FG: casadi.Function | None = None
__reduce__()[source]
__setstate__(state)[source]
setup(**kwargs) None[source]
Return type:

None

fgcall() None[source]
Return type:

None

init_symbolic() None[source]
Return type:

None

compute_coi_expr(device_list=None) None[source]

Compute the symbolic Centre-of-Inertia expression from devices.

device_list defaults to self.device_list so that DaeEst gets its own device set rather than reaching for the sim module global. The parameter is preserved as an override for explicit callers.

Return type:

None

update_omega() None[source]

Update system reference frequency to frequency of selected mode.

  • If omega_mode == ‘coi’: compute inertia-weighted COI of synchronous machines.

  • If omega_mode == ‘single’ get frequency of chosen synchronous machine or inverter.

  • If omega_mode == ‘dist’ get frequencies at all buses from frequency divider.

  • Otherwise: set ω_ref = ω_net (fallback behavior).

Return type:

None

set_omega_single_idx_from_slack() None[source]

If omega_mode == ‘single’ and omega_single_idx is None, choose the device connected to the slack bus as reference.

Uses self.bus_init and self.device_list so that DaeEst resolves against its own device set. If the est-side bus_init has no entries (typical: est_param.txt does not declare BusInit), falls back to the sim-side bus_init_sim for the slack lookup — the slack bus is a property of the physical grid and is the same for sim and est.

Return type:

None

init_reference_frame_angle_states() None[source]

Initialize reference-frame angle states (δ_ref) for all network buses.

  • This function allocates one differential state per bus representing the local reference-frame angle δ_ref at that bus.

  • These angles are only needed for the distributed frequency reference framework (omega_mode == “dist”), specifically for transformations between the multiple local reference frames.

Return type:

None

_LOAD_DEVICE_SHARES
_find_bus_load(bus_name: str)[source]

Return (device, k) for the load attached to bus_name, or raise.

Iterates self.device_list and matches against any of the registered load device classes. The first matching (device, local-index) pair is returned. Single-load-per-bus is enforced upstream by the framework (one device per bus per type), so the first match is unambiguous.

Parameters:

bus_name (str)

dist_load(p: float, q: float, bus: list) None[source]

ZIP-aware LOAD disturbance.

Increments the rated demand at the load attached to bus by (p, q) MW/MVAr. The increment is apportioned across the device’s configured Z, I, P shares so the load preserves its voltage-dependence shape:

  • Z share contributes a linear admittance step (no 1/|V|² term),

  • I share contributes a rotated constant-current step,

  • P share contributes a constant-power step (the legacy behaviour of this function).

For StaticLoadImpedance (z=1) and StaticLoadPower (p=1) the shares are inferred. With line_dyn=True the contributions feed dae.fnode via the same ω_b / B_sum capacitor coupling as before; with line_dyn=False they feed dae.g directly.

Parameters:
  • p (float)

  • q (float)

  • bus (list)

Return type:

None

check_disturbance(dist: pydynamicestimator.devices.device.Disturbance, iter_forward: int) bool[source]
Parameters:
Return type:

bool

exec_dist()[source]
debug_check_initialization() None[source]
Return type:

None

check_initialization() None[source]
Return type:

None

class pydynamicestimator.system.DaeSim[source]

Bases: Dae

int_scheme_sim = None
int_scheme_sim_options: dict
eigenvalues = None
participation_factors = None
participation_factors_normalized = None
state_names: list = []
modes: list | None = None
small_signal_analysis: bool = False
t0: float
tf: float
line_dyn: bool
init_symbolic() None[source]
Return type:

None

fgcall() None[source]
Return type:

None

_line_dyn_integrate(x0, y0, i0, s0, sl0)[source]

Run the line_dyn integrator for one (set of) step(s) and return (x, y_full, i_line) as 2-D arrays (columns = time steps).

Under line_dyn the differential block is [x | y_volt | xl]. With device private algebraics the voltages stay differential (driven by fnode) while the privates are algebraic: they are passed as z0 and returned in zf, and recombined here so y_full has the standard layout [voltages | privates].

simulate(dist: pydynamicestimator.devices.device.Disturbance) None[source]
Parameters:

dist (pydynamicestimator.devices.device.Disturbance)

Return type:

None

_snapshot_branch_params() dict[source]

Snapshot the current numeric branch admittance parameters from Grid.build_y().

Return type:

dict

compute_i_full() None[source]

Compute branch currents from x_full and y_full in post-processing.

Uses vectorized NumPy instead of per-step CasADi evaluation. For has_delta_ref=False: simple matrix multiply C_branches @ y_full. For has_delta_ref=True: includes reference-frame rotation using delta_ref from x_full.

Return type:

None

_compute_i_full_with_rotation(params: dict, start: int, end: int) None[source]

Compute branch currents with reference-frame rotation (has_delta_ref=True).

Parameters:
  • params (dict)

  • start (int)

  • end (int)

Return type:

None

eigenvalue_analysis() None[source]
Return type:

None

_build_modes(participation_normalized: numpy.ndarray) list[source]

Collapse the raw eigenvalues into physical modes (complex-conjugate pairs merged) and attach the modal metrics used by the reports.

Returns a list of mode dicts sorted by damping ratio (most critical first). Each carries the representative eigenvalue, natural frequency [Hz], damping ratio, and that mode’s normalized participation column.

Parameters:

participation_normalized (numpy.ndarray)

Return type:

list

print_modal_report(top_k: int = 4) str[source]

Print a per-mode small-signal summary and return it as a string.

One row per physical mode (complex-conjugate pairs collapsed), with the eigenvalue, natural frequency, damping ratio and the dominant states. Modes are sorted by damping ratio so the most critical appear first.

Parameters:

top_k (int)

Return type:

str

plot_eigenvalues(damping_ref: float = 0.05) None[source]

Simple eigenvalue (s-plane) scatter of the reduced state matrix.

Every eigenvalue is plotted in the complex plane with the imaginary axis as the stability boundary. Points are colored red = unstable (Re>0), orange = lightly damped (ζ < damping_ref), blue = well damped, grey = marginal (near the origin). A dashed wedge marks the constant-damping locus ζ = damping_ref. When the spectrum is very wide (fast real modes far in the left half-plane) a second panel zooms in near the imaginary axis, where the slow/critical modes live. Unstable eigenvalues are annotated when there are only a few.

Parameters:

damping_ref (float)

Return type:

None

_render_participation_heatmap(modes: list, title: str, max_states: int | None = 35, min_participation: float = 0.02, annotate: bool = True) None[source]

Render one participation-factor heatmap for a pre-selected, already ordered list of modes. Shared by the damping overview and the banded frequency views. Cells below min_participation are greyed out and states that never reach it (in the shown modes) are dropped; the row axis is capped at max_states strongest participants.

Parameters:
  • modes (list)

  • title (str)

  • max_states (Optional[int])

  • min_participation (float)

  • annotate (bool)

Return type:

None

plot_participation_factors(max_modes: int | None = 25, max_states: int | None = 35, min_participation: float = 0.02, annotate: bool = True) None[source]

Single participation-factor heatmap of the most critical modes.

Modes are sorted by damping ratio (most critical first) and the view is focused for legibility on large systems: only the first max_modes and the states that participate meaningfully in them are shown. Pass max_modes=None / max_states=None to show everything. For the frequency-banded views (one figure per band) use plot_participation_bands().

Parameters:
  • max_modes (Optional[int])

  • max_states (Optional[int])

  • min_participation (float)

  • annotate (bool)

Return type:

None

plot_participation_bands(max_states: int | None = 35, min_participation: float = 0.02, annotate: bool = True) None[source]

Frequency-banded participation heatmaps — one figure per band.

The modes are split into non-oscillatory (real) modes and four frequency bands; a figure is produced only for a band that actually contains modes. Within a band the modes are frequency-sorted (real modes are ordered by proximity to instability). Bands: real · 0–0.1 Hz · 0.1–2 Hz (electro- mechanical) · 2–50 Hz · >50 Hz.

Parameters:
  • max_states (Optional[int])

  • min_participation (float)

  • annotate (bool)

Return type:

None

class pydynamicestimator.system.DaeEst[source]

Bases: Dae

err_msg_est = Multiline-String
Show Value
"""Estimation failed
Possible reasons:
 - Not enough measurements specified
 - Initialization point very bad
 - Estimator diverged from true state
 - Check if the disturbance rendered system unestimable
Possible solutions:
More measurements, less noise, different disturbance, better initialization..."""
nm: int = 0
_schemes
int_scheme: str = 'backward'
kf: float = 0.0
kb: float = 1.0
line_dyn = False
filter = None
unknown: list[str]
proc_noise_alg: float = 0.0001
proc_noise_diff: float = 0.0001
init_error_diff: float = 1.0
init_error_alg: bool = False
unknown_indices: list = []
known_indices: list = []
err_init: float = 0.001
r_meas_noise_cov_matrix: numpy.ndarray | None = None
r_meas_noise__inv_cov_matrix: numpy.ndarray | None = None
q_proc_noise_diff_cov_matrix: numpy.ndarray | None = None
q_proc_noise_alg_cov_matrix: numpy.ndarray | None = None
q_proc_noise_cov_matrix: numpy.ndarray | None = None
c_meas_matrix: numpy.ndarray | None = None
z_meas_points_matrix: numpy.ndarray | None = None
p_est_init_cov_matrix: numpy.ndarray | None = None
x0: numpy.ndarray | None = None
y0: numpy.ndarray | None = None
s0: numpy.ndarray | None = None
f_func: casadi.Function | None = None
g_func: casadi.Function | None = None
df_dxy_jac: casadi.Function | None = None
dg_dxy_jac: casadi.Function | None = None
inner_tol: float = 1e-06
cov_tol: float = 1e-10
iter_ful: numpy.ndarray | None = None
time_full: numpy.ndarray | None = None
property te
find_unknown_indices(grid: Grid) None[source]
Parameters:

grid (Grid)

Return type:

None

setup(**kwargs) None[source]
Return type:

None

init_symbolic() None[source]
Return type:

None

fgcall() None[source]
Return type:

None

_init_estimate() None[source]
Return type:

None

estimate(dist: pydynamicestimator.devices.device.Disturbance, **kwargs) None[source]
Parameters:

dist (pydynamicestimator.devices.device.Disturbance)

Return type:

None

pydynamicestimator.system.grid_est
pydynamicestimator.system.grid_sim
pydynamicestimator.system.dae_est
pydynamicestimator.system.dae_sim
pydynamicestimator.system.bus_init_sim
pydynamicestimator.system.bus_init_est
pydynamicestimator.system.bus_unknown_est
pydynamicestimator.system.line_sim
pydynamicestimator.system.line_est
pydynamicestimator.system.disturbance_sim
pydynamicestimator.system.disturbance_est
pydynamicestimator.system.device_list_sim = []
pydynamicestimator.system.device_list_est = []
pydynamicestimator.system.stack_volt_power(vre, vim) numpy.ndarray[source]
Return type:

numpy.ndarray