Source code for pydynamicestimator.devices.inverter_filter

# Created: 2026-06-05
# (c) Copyright 2025 ETH Zurich
#
# Licensed under the GNU General Public License v3.0;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     https://www.gnu.org/licenses/gpl-3.0.en.html
#
# This software is distributed "AS IS", WITHOUT WARRANTY OF ANY KIND,
# express or implied. See the License for specific language governing
# permissions and limitations under the License.

"""Inverter output-filter (electrical-plant) strategies.

The filter owns the converter's network-interface states and writes their
dynamics, driven by the switching voltage ``Vsw`` from the inner controller and
the network voltage. It is the inverter analogue of the synchronous machine's
electromagnetic model -- the electrical plant the controllers act on -- but,
unlike the SG's is-a EM hook, it is a *pluggable strategy* (composition): the
filter combines freely with any angle / voltage / inner / pll choice, so by the
is-a/has-a rule it is a strategy, not an inheritance hook. See
``docs/inverter_modernization_design.md`` (§2, §6).

A filter exposes the terminal-current states (``itd_ext`` / ``itq_ext``, read by
the host's network injection ``gcall``) and the capacitor-voltage / filter-current
states that the control ladder reads through ``host.var_sym`` -- which lets a
future quasi-static realization (``LCL_static``) make them algebraic transparently.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Dict, List, Set

import casadi as ca
import numpy as np

if TYPE_CHECKING:
    from pydynamicestimator.system import Dae


[docs] class Filter(ABC): """Abstract base class for inverter output-filter models (pluggable strategy). Symmetric to the synchronous-machine strategies (AVR/governor/...): the filter does NOT own state arrays or DAE indices. It declares what states, private algebraics, parameters and estimation metadata it needs, and the host ``Inverter`` registers them on itself (filter first, so its states occupy the leading block of the state vector). It reads host parameters/states by attribute (``host.Lf``, ``host.var_sym(dae, "Vfd_ext")``, ...). """
[docs] @abstractmethod def states(self) -> List[str]: """Return ordered list of differential-state names.""" ...
[docs] def algebs(self) -> List[str]: """Return ordered list of device-private *algebraic* variable names. Default empty (dynamic filter: every quantity is a differential state). A quasi-static realization returns its quantities here instead and writes ``0 = RHS`` residuals into ``dae.g``.""" return []
[docs] def algebs_units(self) -> Dict[str, str]: return {}
[docs] def algebs_noise(self) -> Dict[str, float]: return {}
[docs] def algebs_x0(self) -> Dict[str, float]: return {}
[docs] @abstractmethod def units(self) -> List[str]: """Return units for each state, same length as states().""" ...
[docs] @abstractmethod def params(self) -> Dict[str, float]: """Return dict of parameter names -> default values.""" ...
[docs] @abstractmethod def states_noise(self) -> Dict[str, float]: """Return process-noise weight for each state.""" ...
[docs] @abstractmethod def states_init_error(self) -> Dict[str, float]: """Return initial error for each state.""" ...
[docs] @abstractmethod def x0(self) -> Dict[str, float]: """Return default initial guess for each state.""" ...
[docs] @abstractmethod def descriptions(self) -> Dict[str, str]: """Return descriptions for states and params.""" ...
[docs] @abstractmethod def fgcall(self, host, dae: Dae, omega_ref_vec, omega_b) -> None: """Write the filter dynamics into ``dae.f`` (and, for a quasi-static realization, the residuals into ``dae.g``). The driving switching voltage (converter output, network frame) is read host-mediated via ``host.switching_voltage(dae)``. Args: host: the Inverter instance (read params via host.Lf/..., states via host.var_sym(dae, name), the switching voltage via host.switching_voltage(dae), indices via host.Vfd_ext, host.vre, ...). dae: the DAE system object. omega_ref_vec: per-bus reference-frame frequency. omega_b: base speed (2*pi*fn) per device. """ ...
[docs] def provides(self) -> Set[str]: """Plant capability tags this filter exposes (e.g. ``"shunt_capacitor"``), checked against the inner controller's requirements at construction. Empty by default.""" return set()
[docs] def finit_sequential(self, host, dae: Dae) -> Dict[str, np.ndarray]: """Steady-state init of the filter from the power-flow terminal point. Returns a dict with the filter states plus the switching voltage ``Vswd/Vswq`` and terminal current ``itd_ext/itq_ext`` consumed downstream. The base raises -- a filter without a sequential init (e.g. an algebraic realization) is only usable with the joint init.""" raise NotImplementedError( f"{type(self).__name__} provides no sequential filter init; implement " f"finit_sequential() or set _init_method='joint' on the device." )
[docs] class LCL(Filter): """Dynamic LCL output filter -- the framework default (states realization). Capacitor voltage (Vfd_ext/Vfq_ext), converter-side filter current (ifd_ext/ifq_ext) and grid-side / terminal current (itd_ext/itq_ext), all in the network (external) dq frame, with the classic series-Rf-Lf / shunt-Cf / series-Rt-Lt topology. Equivalent to the equations previously hardcoded in ``Inverter.fgcall``; the trajectory is byte-identical. """
[docs] def states(self) -> List[str]: return [ "Vfd_ext", "Vfq_ext", "ifd_ext", "ifq_ext", "itd_ext", "itq_ext", ]
[docs] def units(self) -> List[str]: return ["p.u.", "p.u.", "p.u.", "p.u.", "p.u.", "p.u."]
[docs] def params(self) -> Dict[str, float]: return { "Rf": 0.0001, "Lf": 0.08, "Cf": 0.074, "Rt": 0.01, "Lt": 0.2, }
[docs] def states_noise(self) -> Dict[str, float]: return {s: 1 for s in self.states()}
[docs] def states_init_error(self) -> Dict[str, float]: return {s: 1e-1 for s in self.states()}
[docs] def x0(self) -> Dict[str, float]: return { "Vfd_ext": 1.0, "Vfq_ext": 0, "ifd_ext": 0.1, "ifq_ext": 0, "itd_ext": 0, "itq_ext": 0, }
[docs] def descriptions(self) -> Dict[str, str]: return { "Rf": "Filter resistance", "Lf": "Filter reactance", "Cf": "Filter capacitance", "Rt": "Resistance of the line connecting the external end of the filter to the terminal (i.e. grid)", "Lt": "Reactance of the line connecting the external end of the filter to the terminal (i.e. grid)", "Vfd_ext": "d-component of external filter voltage", "Vfq_ext": "q-component of external filter voltage", "ifd_ext": "d-component of external filter current", "ifq_ext": "q-component of external filter current", "itd_ext": "d-component of external terminal current", "itq_ext": "q-component of external terminal current", }
[docs] def fgcall(self, host, dae: Dae, omega_ref_vec, omega_b) -> None: vn = dae.y Vswd, Vswq = host.switching_voltage(dae) Vfd_ext_s = host.var_sym(dae, "Vfd_ext") Vfq_ext_s = host.var_sym(dae, "Vfq_ext") ifd_ext_s = host.var_sym(dae, "ifd_ext") ifq_ext_s = host.var_sym(dae, "ifq_ext") itd_ext_s = host.var_sym(dae, "itd_ext") itq_ext_s = host.var_sym(dae, "itq_ext") dae.f[host.Vfd_ext] = ( omega_b / host.Cf * (ifd_ext_s - itd_ext_s) + omega_ref_vec * omega_b * Vfq_ext_s ) dae.f[host.Vfq_ext] = ( omega_b / host.Cf * (ifq_ext_s - itq_ext_s) - omega_ref_vec * omega_b * Vfd_ext_s ) dae.f[host.ifd_ext] = ( omega_b / host.Lf * (Vswd - Vfd_ext_s) - omega_b * host.Rf / host.Lf * ifd_ext_s + omega_ref_vec * omega_b * ifq_ext_s ) dae.f[host.ifq_ext] = ( omega_b / host.Lf * (Vswq - Vfq_ext_s) - omega_b * host.Rf / host.Lf * ifq_ext_s - omega_ref_vec * omega_b * ifd_ext_s ) dae.f[host.itd_ext] = ( omega_b / host.Lt * (Vfd_ext_s - vn[host.vre]) - omega_b * host.Rt / host.Lt * itd_ext_s + omega_ref_vec * omega_b * itq_ext_s ) dae.f[host.itq_ext] = ( omega_b / host.Lt * (Vfq_ext_s - vn[host.vim]) - omega_b * host.Rt / host.Lt * itq_ext_s - omega_ref_vec * omega_b * itd_ext_s )
[docs] def provides(self) -> Set[str]: # The LC(L) topology has a shunt capacitor whose voltage the cascaded inner # controller regulates (and feeds forward Cf through). return {"shunt_capacitor"}
[docs] def finit_sequential(self, host, dae: Dae) -> Dict[str, np.ndarray]: """Steady-state init of the LCL filter (decoupled from the controls): given the power-flow terminal voltage (vre/vim) and current (itd_ext/itq_ext), solve the 6 filter ODEs = 0 for the capacitor voltage, filter current, and the switching voltage Vsw the converter must output.""" n = host.n n_unknowns = 6 Vswd = ca.SX.sym("Vswd", n) Vswq = ca.SX.sym("Vswq", n) ifd_ext = ca.SX.sym("ifd_ext", n) ifq_ext = ca.SX.sym("ifq_ext", n) Vfd_ext = ca.SX.sym("Vfd_ext", n) Vfq_ext = ca.SX.sym("Vfq_ext", n) inputs = [ca.vertcat(Vswd, Vswq, ifd_ext, ifq_ext, Vfd_ext, Vfq_ext)] outputs = ca.SX(np.zeros(n_unknowns * n)) omega_b = [2 * np.pi * dae.fn] * n omega_net_vec = dae.omega_net * np.ones(n) # nominal during init itd_ext = dae.Sb / host.Sn * dae.iinit[host.vre] itq_ext = dae.Sb / host.Sn * dae.iinit[host.vim] vre = dae.yinit[host.vre] vim = dae.yinit[host.vim] outputs[0:n] = ( omega_b / host.Lf * (Vswd - Vfd_ext) - omega_b * host.Rf / host.Lf * ifd_ext + omega_net_vec * omega_b * ifq_ext ) outputs[n : 2 * n] = ( omega_b / host.Lf * (Vswq - Vfq_ext) - omega_b * host.Rf / host.Lf * ifq_ext - omega_net_vec * omega_b * ifd_ext ) outputs[2 * n : 3 * n] = ( omega_b / host.Cf * (ifd_ext - itd_ext) + omega_net_vec * omega_b * Vfq_ext ) outputs[3 * n : 4 * n] = ( omega_b / host.Cf * (ifq_ext - itq_ext) - omega_net_vec * omega_b * Vfd_ext ) outputs[4 * n : 5 * n] = ( omega_b / host.Lt * (Vfd_ext - vre) - omega_b * host.Rt / host.Lt * itd_ext + omega_net_vec * omega_b * itq_ext ) outputs[5 * n : 6 * n] = ( omega_b / host.Lt * (Vfq_ext - vim) - omega_b * host.Rt / host.Lt * itq_ext - omega_net_vec * omega_b * itd_ext ) h = ca.Function("h", inputs, [ca.vertcat(outputs)]) G = ca.rootfinder("G", "newton", h) sol = np.array(G(ca.vertcat(np.zeros(n_unknowns * n)))).flatten() return { "Vswd": sol[0:n], "Vswq": sol[n : 2 * n], "ifd_ext": sol[2 * n : 3 * n], "ifq_ext": sol[3 * n : 4 * n], "Vfd_ext": sol[4 * n : 5 * n], "Vfq_ext": sol[5 * n : 6 * n], "itd_ext": itd_ext, "itq_ext": itq_ext, }
[docs] class LCL_static(LCL): """Quasi-static LCL filter: the SAME topology as :class:`LCL`, but the six filter quantities are device-private *algebraic* variables instead of differential states -- the singular-perturbation / quasi-static reduction that zeroes the fast LCL dynamics (``d/dt -> 0``). Appropriate when the network itself is quasi-static (``line_dyn=False``); a *dynamic* filter on a static network is physically incoherent (the host warns but allows it). Because the filter quantities are algebraic, this realization requires the one-shot joint init (the staged sequential init cannot initialize algebraic states) -- the host switches to ``_init_method="joint"`` automatically when a filter declares algebraics. Per docs/inverter_modernization_design.md §6/§7, the ``2*pi*f`` (``omega_b``) factor common to every term of the dynamic ODE is DROPPED on the ``0 = RHS`` algebraic constraint: it is mathematically redundant and would inflate the finit Jacobian condition number. """ # finit_sequential is INHERITED from LCL: the steady-state solve is identical # (LCL_static's algebraic constraints ARE the dynamic filter's d/dt=0 equations, # so the operating point is the same). The only difference is the destination -- # the host's _load_finit routes these quantities to dae.yinit (they are in # _algebs_int) instead of dae.xinit. So LCL_static supports BOTH init methods.
[docs] def states(self) -> List[str]: return []
[docs] def units(self) -> List[str]: return []
[docs] def states_noise(self) -> Dict[str, float]: return {}
[docs] def states_init_error(self) -> Dict[str, float]: return {}
[docs] def x0(self) -> Dict[str, float]: return {}
[docs] def algebs(self) -> List[str]: return ["Vfd_ext", "Vfq_ext", "ifd_ext", "ifq_ext", "itd_ext", "itq_ext"]
[docs] def algebs_units(self) -> Dict[str, str]: return {s: "p.u." for s in self.algebs()}
[docs] def algebs_noise(self) -> Dict[str, float]: return {s: 1.0 for s in self.algebs()}
[docs] def algebs_x0(self) -> Dict[str, float]: return { "Vfd_ext": 1.0, "Vfq_ext": 0, "ifd_ext": 0.1, "ifq_ext": 0, "itd_ext": 0, "itq_ext": 0, }
[docs] def fgcall(self, host, dae: Dae, omega_ref_vec, omega_b) -> None: # Steady-state (quasi-static) LCL constraints: 0 = RHS, i.e. the dynamic # equations with the d/dt term zeroed. The common omega_b factor is # dropped. var_sym resolves the filter quantities to dae.y (algebraic). vn = dae.y Vswd, Vswq = host.switching_voltage(dae) Vfd_ext_s = host.var_sym(dae, "Vfd_ext") Vfq_ext_s = host.var_sym(dae, "Vfq_ext") ifd_ext_s = host.var_sym(dae, "ifd_ext") ifq_ext_s = host.var_sym(dae, "ifq_ext") itd_ext_s = host.var_sym(dae, "itd_ext") itq_ext_s = host.var_sym(dae, "itq_ext") dae.g[host.Vfd_ext] = ( 1 / host.Cf * (ifd_ext_s - itd_ext_s) + omega_ref_vec * Vfq_ext_s ) dae.g[host.Vfq_ext] = ( 1 / host.Cf * (ifq_ext_s - itq_ext_s) - omega_ref_vec * Vfd_ext_s ) dae.g[host.ifd_ext] = ( 1 / host.Lf * (Vswd - Vfd_ext_s) - host.Rf / host.Lf * ifd_ext_s + omega_ref_vec * ifq_ext_s ) dae.g[host.ifq_ext] = ( 1 / host.Lf * (Vswq - Vfq_ext_s) - host.Rf / host.Lf * ifq_ext_s - omega_ref_vec * ifd_ext_s ) dae.g[host.itd_ext] = ( 1 / host.Lt * (Vfd_ext_s - vn[host.vre]) - host.Rt / host.Lt * itd_ext_s + omega_ref_vec * itq_ext_s ) dae.g[host.itq_ext] = ( 1 / host.Lt * (Vfq_ext_s - vn[host.vim]) - host.Rt / host.Lt * itq_ext_s - omega_ref_vec * itd_ext_s )
FILTER_REGISTRY: Dict[str, type] = { "LCL": LCL, "LCL_static": LCL_static, }