Source code for tidal.solver.operators

"""Spatial operators on plain numpy arrays (finite-difference and spectral).

All operators accept an ``np.ndarray`` (field data) and a ``GridInfo``
descriptor, returning an ``np.ndarray`` of the same shape.  No py-pde
ScalarField / VectorField wrappers — pure numpy with explicit ghost-cell
boundary handling for FD, or FFT-based spectral differentiation for
periodic domains.

Operator modes
--------------
Two modes are available, selectable at runtime:

1. **Finite-difference** (default): Ghost-cell stencils with 2nd, 4th, or
   6th-order accuracy (``set_fd_order``).  Works with all BC types.

2. **Spectral** (``set_spectral(True)``): FFT-based pseudo-spectral
   operators via ``np.fft.rfft``/``np.fft.irfft``.  Requires all-periodic
   BCs.  Achieves exponential convergence for smooth problems — typically
   N=64-128 instead of N=512-1024 for equivalent accuracy.

   The pseudo-spectral approach computes derivatives in Fourier space
   (multiply by ``ik`` for gradient, ``-k^2`` for Laplacian) and returns
   to physical space.  Position-dependent coefficients are applied in
   physical space after the operator, following the standard Dedalus/py-pde
   pseudo-spectral pattern.

   Ref: Burns et al. (2020), "Dedalus: A flexible framework for numerical
   simulations with spectral methods", Physical Review Research 2:023068.

Finite-difference accuracy orders
----------------------------------
Supports 2nd, 4th, and 6th-order central-difference stencils, selectable
at runtime via :func:`set_fd_order`.  Higher orders use wider stencils
(more ghost cells) for dramatically improved spatial accuracy:

- **Order 2** (default): 3-point stencil, 1 ghost cell/side.
- **Order 4**: 5-point stencil, 2 ghost cells/side.
- **Order 6**: 7-point stencil, 3 ghost cells/side.

Stencil coefficients from Fornberg (1988), "Generation of Finite Difference
Formulas on Arbitrarily Spaced Grids", Mathematics of Computation 51(184),
pp. 699-706, Table 1 (central differences).

Boundary condition types
------------------------
- ``"periodic"`` — wraparound via ``np.roll``
- ``"neumann"``  — zero normal derivative (mirror ghost cell)
- ``"dirichlet"``— zero value (antisymmetric ghost cell)
- ``"robin"``    -- mixed: d_n f + gamma*f = beta

All non-periodic BCs are unified via the ghost-cell formula
``ghost = const + factor * interior_cell`` where ``(const, factor)``
are determined by BC type, value, and grid spacing.  See ``SideBCSpec``
for the derivation.  For higher-order stencils (order > 2), ghost cells
are filled layer-by-layer from interior outward (recursive ghost fill;
LeVeque, "Finite Difference Methods for ODEs and PDEs", SIAM, 2007, ch. 2).

The ``bc`` parameter may be:
- A single string applied to all axes (e.g. ``"periodic"``)
- A tuple of strings, one per axis (e.g. ``("neumann", "periodic")``)
- A tuple of ``AxisBCSpec`` objects for per-side / non-zero / Robin BCs

References
----------
- Burns et al. (2020), Phys. Rev. Research 2:023068 — pseudo-spectral methods.
- Fornberg (1988), Math. Comp. 51(184), pp. 699-706 — stencil coefficients.
- LeVeque (2007), "Finite Difference Methods for ODEs and PDEs", SIAM — convergence theory.
- Zwicker (2020), J. Open Source Software 5(48) — ghost-cell virtual-point approach.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

import numpy as np

if TYPE_CHECKING:
    from tidal.solver.grid import GridInfo

# ---------------------------------------------------------------------------
# Finite-difference order (module-level state)
# ---------------------------------------------------------------------------
# Set once at simulation startup via set_fd_order().  Determines stencil
# width and ghost-cell count for all spatial operators.
#
# Reference: Fornberg, "Generation of Finite Difference Formulas on
# Arbitrarily Spaced Grids", Mathematics of Computation 51(184), 1988.
# Stencil coefficients from Table 1 (central differences).

_fd_order: int = 2
"""Current FD accuracy order: 2 (default), 4, or 6."""

_n_ghost: int = 1
"""Number of ghost cells per side: fd_order // 2."""


[docs] def set_fd_order(order: int) -> None: """Set the finite-difference accuracy order for all spatial operators. Must be called before any operator evaluation (typically at simulation startup). Clears the ghost-cell padding cache since buffer shapes change. Parameters ---------- order : int Accuracy order: 2 (3-point stencil), 4 (5-point), or 6 (7-point). Raises ------ ValueError If *order* is not 2, 4, or 6. """ global _fd_order, _n_ghost # noqa: PLW0603 if order not in {2, 4, 6}: msg = f"FD order must be 2, 4, or 6; got {order}" raise ValueError(msg) _fd_order = order _n_ghost = order // 2 # Invalidate cached padding buffers — shapes depend on ghost count _pad_cache.clear()
[docs] def get_fd_order() -> int: """Return the current FD accuracy order.""" return _fd_order
[docs] def get_n_ghost() -> int: """Return the current number of ghost cells per side.""" return _n_ghost
# --------------------------------------------------------------------------- # Spectral mode (module-level state) # --------------------------------------------------------------------------- # When enabled, all spatial operators use FFT-based pseudo-spectral # differentiation instead of finite-difference stencils. Requires all # boundary conditions to be periodic. # # Spectral operators achieve exponential convergence for smooth problems # (band-limited functions are differentiated exactly), enabling N=64-128 # instead of N=512-1024 for equivalent accuracy. # # Reference: Burns et al. (2020), "Dedalus: A flexible framework for # numerical simulations with spectral methods", Phys. Rev. Research 2:023068. _use_spectral: bool = False """Whether spectral (FFT) operators are active."""
[docs] def set_spectral(enable: bool) -> None: # noqa: FBT001 """Enable or disable FFT-based spectral operators. When enabled, all spatial operators (gradient, Laplacian, cross-derivative, biharmonic) use ``np.fft.rfft``/``np.fft.irfft`` instead of FD stencils. Requires all boundary conditions to be periodic. Must be called before any operator evaluation (typically at simulation startup). Clears the wavenumber cache when toggled. Parameters ---------- enable : bool ``True`` to use spectral operators, ``False`` for finite-difference. """ global _use_spectral # noqa: PLW0603 if _use_spectral != enable: _wavenum_cache.clear() _use_spectral = enable
[docs] def get_spectral() -> bool: """Return whether spectral (FFT) operators are active.""" return _use_spectral
# Wavenumber cache: (n_points, dx) -> wavenumber array. # Pre-computed once per (N, dx) pair and reused for all subsequent calls. # Uses ``np.fft.rfftfreq`` for real-valued fields (half-complex storage). _wavenum_cache: dict[tuple[int, float], np.ndarray] = {}
[docs] def get_wavenumbers(n: int, dx: float) -> np.ndarray: """Return cached wavenumber array for ``rfft`` of length *n*. Wavenumbers: ``k = 2*pi*rfftfreq(n, d=dx)`` — the angular frequencies corresponding to the real-FFT output bins. Parameters ---------- n : int Number of grid points along the axis. dx : float Grid spacing. Returns ------- np.ndarray Wavenumber array of length ``n // 2 + 1``. """ key = (n, dx) cached = _wavenum_cache.get(key) if cached is not None: return cached k = 2.0 * np.pi * np.fft.rfftfreq(n, d=dx) _wavenum_cache[key] = k return k
# --------------------------------------------------------------------------- # BC data model # --------------------------------------------------------------------------- _VALID_BC_KINDS = frozenset({"dirichlet", "neumann", "robin"}) _VALID_BC = frozenset({"periodic", "neumann", "dirichlet", "robin"})
[docs] @dataclass(frozen=True) class SideBCSpec: """Boundary condition for one side (low or high) of one axis. All first-order BCs map to a ghost-cell formula:: ghost = const + factor * interior_cell where ``(const, factor)`` depend on the BC kind: - **Neumann** (df/dn = D): const = dx*D, factor = +1 - **Dirichlet** (f = V): const = 2*V, factor = -1 - **Robin** (d_n f + gamma*f = beta): const = 2*dx*beta / (gamma*dx + 2), factor = (2 - gamma*dx) / (gamma*dx + 2) Setting D=0, V=0 recovers the homogeneous cases. """ kind: str """BC type: ``"dirichlet"``, ``"neumann"``, or ``"robin"``.""" value: float = 0.0 """Dirichlet boundary value V, or Robin inhomogeneity β.""" derivative: float = 0.0 """Neumann ghost-cell offset D. Ghost cell is placed at ``interior + dx * D``. For homogeneous Neumann (zero normal flux), set D=0. The sign is the same at both low and high boundaries (py-pde convention; see Zwicker, JOSS 2020). """ gamma: float = 0.0 """Robin coefficient gamma in d_n f + gamma*f = beta. Must be >= 0.""" def __post_init__(self) -> None: if self.kind not in _VALID_BC_KINDS: msg = ( f"Invalid BC kind: {self.kind!r}. " f"Must be one of {sorted(_VALID_BC_KINDS)}" ) raise ValueError(msg) if self.kind == "robin" and self.gamma < 0: msg = f"Robin gamma must be ≥ 0, got {self.gamma}" raise ValueError(msg)
[docs] def ghost_params(self, dx: float) -> tuple[float, float]: """Return ``(const, factor)`` for ``ghost = const + factor * interior``. Parameters ---------- dx : float Grid spacing in the normal direction. """ if self.kind == "neumann": return (dx * self.derivative, 1.0) if self.kind == "dirichlet": return (2.0 * self.value, -1.0) # Robin: d_n f + gamma*f = beta # When gamma*dx >= 2, factor <= 0 (unstable); see check_robin_stability(). denom = self.gamma * dx + 2.0 return ( 2.0 * dx * self.value / denom, (2.0 - self.gamma * dx) / denom, )
[docs] @dataclass(frozen=True) class AxisBCSpec: """Boundary condition for one axis: periodic or distinct low/high sides. For periodic axes, ``low`` and ``high`` must be ``None``. For non-periodic axes, both must be ``SideBCSpec`` instances. """ periodic: bool = False low: SideBCSpec | None = None high: SideBCSpec | None = None def __post_init__(self) -> None: if self.periodic: if self.low is not None or self.high is not None: msg = "Periodic axis cannot have low/high BC specs" raise ValueError(msg) elif self.low is None or self.high is None: msg = "Non-periodic axis requires both low and high BC specs" raise ValueError(msg)
# Singleton cache for _str_to_axis_bc — avoids repeated AxisBCSpec/SideBCSpec # allocation for the same BC string. Safe because AxisBCSpec is frozen. _AXIS_BC_CACHE: dict[str, AxisBCSpec] = {} def _str_to_axis_bc(bc_str: str) -> AxisBCSpec: """Convert a legacy BC string to an ``AxisBCSpec``. ``"periodic"`` maps to a periodic axis. ``"neumann"``/``"dirichlet"`` map to symmetric homogeneous BC on both sides. ``"robin"`` maps to zero Robin on both sides (gamma=0, beta=0, equivalent to Neumann). Results are cached (singleton pattern) since ``AxisBCSpec`` is frozen. Raises ------ ValueError If *bc_str* is not a recognized BC type. """ cached = _AXIS_BC_CACHE.get(bc_str) if cached is not None: return cached if bc_str == "periodic": result = AxisBCSpec(periodic=True) elif bc_str not in _VALID_BC: msg = f"Unknown BC type: {bc_str!r}; valid: {sorted(_VALID_BC)}" raise ValueError(msg) else: side = SideBCSpec(kind=bc_str) result = AxisBCSpec(periodic=False, low=side, high=side) _AXIS_BC_CACHE[bc_str] = result return result
[docs] def is_periodic_bc(bc_entry: str | AxisBCSpec) -> bool: """Check whether a single axis BC entry is periodic. Handles both legacy string BCs and structured ``AxisBCSpec`` objects, so callers don't need to type-check manually. """ if isinstance(bc_entry, str): return bc_entry == "periodic" return bc_entry.periodic
# Legacy type alias — widened to accept structured specs BCSpec = str | tuple[str, ...] | tuple[AxisBCSpec, ...] """Single BC string, per-axis strings, or per-axis ``AxisBCSpec`` objects.""" def _normalize_bc(bc: BCSpec, grid: GridInfo) -> tuple[str | AxisBCSpec, ...]: """Expand a BC spec to a per-axis tuple, validating each entry. Returns a tuple of either strings or ``AxisBCSpec`` objects (never mixed within one call — the type depends on what was passed in). Pre-normalized tuples (from a previous ``_normalize_bc`` call) pass through with only a fast length check, avoiding per-element validation. Raises ------ ValueError If the number of BC entries doesn't match ``grid.ndim`` or an unknown BC type is encountered. TypeError If a BC entry is neither a string nor an ``AxisBCSpec``. """ # Fast path: already a tuple of correct length (pre-normalized) if isinstance(bc, tuple) and len(bc) == grid.ndim: return bc if isinstance(bc, str): bcs: tuple[str | AxisBCSpec, ...] = (bc,) * grid.ndim else: bcs = tuple(bc) if len(bcs) != grid.ndim: msg = f"Expected {grid.ndim} BC entries, got {len(bcs)}" raise ValueError(msg) for i, b in enumerate(bcs): if isinstance(b, str): if b not in _VALID_BC: msg = f"Unknown BC type {b!r} for axis {i}; valid: {sorted(_VALID_BC)}" raise ValueError(msg) elif not isinstance(b, AxisBCSpec): # pyright: ignore[reportUnnecessaryIsInstance] msg = f"BC entry {i} must be str or AxisBCSpec, got {type(b).__name__}" raise TypeError(msg) return bcs def _bc_from_grid(grid: GridInfo) -> tuple[str, ...]: """Infer BCs from grid periodicity (periodic or neumann).""" return tuple("periodic" if p else "neumann" for p in grid.periodic) def _resolve_axis_bc(bc_entry: str | AxisBCSpec) -> AxisBCSpec: # pyright: ignore[reportUnusedFunction] # used by rhs.py """Convert a per-axis BC entry to an ``AxisBCSpec``. Strings are converted via ``_str_to_axis_bc``. ``AxisBCSpec`` objects pass through unchanged. """ if isinstance(bc_entry, AxisBCSpec): return bc_entry return _str_to_axis_bc(bc_entry) # --------------------------------------------------------------------------- # Cached slice-tuple helpers (avoid per-call list + tuple allocation) # --------------------------------------------------------------------------- # Module-level constants for common slices _SLC_LO = slice(0, -2) # ghost-cell frame: left neighbor _SLC_MID = slice(1, -1) # ghost-cell frame: center _SLC_HI = slice(2, None) # ghost-cell frame: right neighbor _SLC_GHOST_L = slice(0, 1) # left ghost cell _SLC_INTERIOR = slice(1, -1) # interior (for writing) def _slice_tuple(ndim: int, axis: int, axis_slice: slice) -> tuple[slice, ...]: """Build a tuple of slices with *axis_slice* at position *axis*.""" s = [slice(None)] * ndim s[axis] = axis_slice return tuple(s) # Pre-computed cache for the most common (ndim, axis, slice) combos. # Populated lazily on first use per combo. Thread-safe for reads (dict). _SLICE_CACHE: dict[tuple[int, int, int], tuple[slice, ...]] = {} # Map slice objects to small ints for cache keys _SLC_ID = {id(_SLC_LO): 0, id(_SLC_MID): 1, id(_SLC_HI): 2} def _cached_slice(ndim: int, axis: int, slc: slice) -> tuple[slice, ...]: """Return cached slice tuple for (ndim, axis, slc). Falls back to ``_slice_tuple`` for non-standard slices. """ slc_id = _SLC_ID.get(id(slc)) if slc_id is None: return _slice_tuple(ndim, axis, slc) key = (ndim, axis, slc_id) cached = _SLICE_CACHE.get(key) if cached is not None: return cached result = _slice_tuple(ndim, axis, slc) _SLICE_CACHE[key] = result return result # --------------------------------------------------------------------------- # Ghost-cell padding helpers # --------------------------------------------------------------------------- class _PadEntry: """Pre-allocated buffer and cached slices for one (shape, axis, n_ghost) combo. ALL slice tuples are pre-computed at construction time so the hot-path ``_pad_axis()`` does zero per-call tuple/slice construction. Supports variable ghost-cell width for higher-order FD stencils: - Order 2: 1 ghost cell per side (3-point stencil) - Order 4: 2 ghost cells per side (5-point stencil) - Order 6: 3 ghost cells per side (7-point stencil) """ __slots__ = ( "buf", "interior_slc", "left_slcs", "ng", "periodic_left_src", "periodic_right_src", "recursive_left_src", "recursive_right_src", "right_slcs", "src_hi_slcs", "src_lo_slcs", ) def __init__(self, data_shape: tuple[int, ...], axis: int, ng: int) -> None: n = data_shape[axis] ndim = len(data_shape) self.ng = ng padded_shape = list(data_shape) padded_shape[axis] += 2 * ng self.buf = np.empty(tuple(padded_shape)) # Interior: data sits at [ng .. n+ng) self.interior_slc = _slice_tuple(ndim, axis, slice(ng, n + ng)) # Per-layer ghost destination slices in padded buffer self.left_slcs: list[tuple[slice, ...]] = [] self.right_slcs: list[tuple[slice, ...]] = [] # Per-layer source slices from original data (for non-periodic k=0) self.src_lo_slcs: list[tuple[slice, ...]] = [] self.src_hi_slcs: list[tuple[slice, ...]] = [] # Per-layer source slices from data for periodic wrapping self.periodic_left_src: list[tuple[slice, ...]] = [] self.periodic_right_src: list[tuple[slice, ...]] = [] # Per-layer source slices from padded buffer for recursive ghost fill (k>0) self.recursive_left_src: list[tuple[slice, ...]] = [] self.recursive_right_src: list[tuple[slice, ...]] = [] for k in range(ng): # Ghost cell destination: left_slcs[k] = layer k (0=outermost) self.left_slcs.append(_slice_tuple(ndim, axis, slice(k, k + 1))) self.right_slcs.append( _slice_tuple(ndim, axis, slice(n + 2 * ng - 1 - k, n + 2 * ng - k)) ) # Non-periodic source from data boundary (innermost=index 0) self.src_lo_slcs.append(_slice_tuple(ndim, axis, slice(ng - 1 - k, ng - k))) self.src_hi_slcs.append(_slice_tuple(ndim, axis, slice(n + k, n + k + 1))) # Periodic source from data: left ghost copies from right end self.periodic_left_src.append( _slice_tuple(ndim, axis, slice(n - ng + k, n - ng + k + 1)) ) # Periodic source from data: right ghost copies from left end self.periodic_right_src.append( _slice_tuple(ndim, axis, slice(ng - 1 - k, ng - k)) ) # Recursive ghost fill source (from padded buffer, for k>0) if k > 0: # Left: source from previously-filled ghost at index (ng - k) src_l = ng - k self.recursive_left_src.append( _slice_tuple(ndim, axis, slice(src_l, src_l + 1)) ) # Right: source from previously-filled ghost src_r = n + ng + k - 1 self.recursive_right_src.append( _slice_tuple(ndim, axis, slice(src_r, src_r + 1)) ) else: # Placeholders for k=0 (use data directly, not buffer) self.recursive_left_src.append(_slice_tuple(ndim, axis, slice(0, 1))) self.recursive_right_src.append( _slice_tuple(ndim, axis, slice(n - 1, n)) ) class _PadBufferCache: """Lazily allocate and reuse padded buffers for ghost-cell operations. Keyed by ``(data.shape, axis, n_ghost)`` — since TIDAL operates on a fixed grid with a fixed FD order per run, each key is allocated only once. Single-threaded assumption (no locking). """ __slots__ = ("_cache",) def __init__(self) -> None: self._cache: dict[tuple[tuple[int, ...], int, int], _PadEntry] = {} def clear(self) -> None: """Invalidate all cached padding buffers.""" self._cache.clear() def get( self, data_shape: tuple[int, ...], axis: int, ng: int | None = None ) -> _PadEntry: if ng is None: ng = _n_ghost key = (data_shape, axis, ng) entry = self._cache.get(key) if entry is None: entry = _PadEntry(data_shape, axis, ng) self._cache[key] = entry return entry _pad_cache = _PadBufferCache() def _pad_axis( data: np.ndarray, axis: int, bc: str | AxisBCSpec, dx: float = 1.0, ng: int | None = None, ) -> np.ndarray: """Pad *data* with ghost cells on each side along *axis*. Uses pre-allocated buffers and cached slice tuples (via ``_pad_cache``) for zero-alloc ghost-cell writes in the hot path. The number of ghost cells is determined by the current FD order (``_n_ghost``), or overridden by *ng*. Parameters ---------- data : np.ndarray Field data to pad. axis : int Axis along which to pad. bc : str or AxisBCSpec Boundary condition for this axis. dx : float Grid spacing along this axis (needed for Neumann derivative and Robin formula; default 1.0 for backward compat). ng : int, optional Override ghost-cell count (default: ``_n_ghost`` from FD order). """ if isinstance(bc, str): bc = _str_to_axis_bc(bc) if ng is None: ng = _n_ghost entry = _pad_cache.get(data.shape, axis, ng) buf = entry.buf # Copy interior into buffer buf[entry.interior_slc] = data if bc.periodic: # All source slices pre-cached in _PadEntry — zero per-call construction. for k in range(ng): buf[entry.left_slcs[k]] = data[entry.periodic_left_src[k]] buf[entry.right_slcs[k]] = data[entry.periodic_right_src[k]] else: assert bc.low is not None # guaranteed by AxisBCSpec.__post_init__ assert bc.high is not None c_lo, f_lo = bc.low.ghost_params(dx) c_hi, f_hi = bc.high.ghost_params(dx) # Fill ghost cells layer-by-layer from interior outward. # Layer 0 (innermost, adjacent to data boundary) uses data edge. # Layer k>0 uses the previously-filled ghost layer as source, # implementing the recursive ghost-cell fill standard in # computational physics (LeVeque, "Finite Difference Methods for # Ordinary and Partial Differential Equations", SIAM, 2007, §2.12). # # All slice tuples pre-cached in _PadEntry for zero per-call overhead. for k in range(ng): left_ghost_idx = ng - 1 - k # innermost first left_ghost_slc = entry.left_slcs[left_ghost_idx] if k == 0: np.multiply( f_lo, data[entry.recursive_left_src[0]], out=buf[left_ghost_slc] ) else: np.multiply( f_lo, buf[entry.recursive_left_src[k]], out=buf[left_ghost_slc] ) if c_lo != 0.0: buf[left_ghost_slc] += c_lo right_ghost_idx = ng - 1 - k right_ghost_slc = entry.right_slcs[right_ghost_idx] if k == 0: np.multiply( f_hi, data[entry.recursive_right_src[0]], out=buf[right_ghost_slc] ) else: np.multiply( f_hi, buf[entry.recursive_right_src[k]], out=buf[right_ghost_slc] ) if c_hi != 0.0: buf[right_ghost_slc] += c_hi return buf # --------------------------------------------------------------------------- # Core stencil operations # ---------------------------------------------------------------------------
[docs] def gradient( data: np.ndarray, axis: int, grid: GridInfo, bc: BCSpec | None = None, ) -> np.ndarray: """Central-difference gradient ∂f/∂x_i. Stencil width adapts to the current FD order (set via ``set_fd_order``): - **Order 2** (3-point): ``(f[i+1] - f[i-1]) / (2·dx)`` - **Order 4** (5-point): ``(f[i-2]/12 - 2f[i-1]/3 + 2f[i+1]/3 - f[i+2]/12) / dx`` - **Order 6** (7-point): ``(-f[i-3]/60 + 3f[i-2]/20 - 3f[i-1]/4 + 3f[i+1]/4 - 3f[i+2]/20 + f[i+3]/60) / dx`` Reference: Fornberg (1988), Mathematics of Computation 51(184), Table 1. Parameters ---------- data : np.ndarray Field values, shape must match ``grid.shape``. axis : int Spatial axis along which to differentiate. grid : GridInfo Grid descriptor. bc : str or tuple of str, optional Boundary conditions. Defaults to grid periodicity inference. Returns ------- np.ndarray Gradient array, same shape as *data*. """ # Spectral fast path: FFT-based differentiation (periodic only) if _use_spectral: return _spectral_gradient(data, axis, grid, bc) bcs = _normalize_bc(bc, grid) if bc is not None else _bc_from_grid(grid) dx = grid.dx[axis] bc_axis = bcs[axis] # Inline _resolve_axis_bc to avoid function call overhead axis_bc = bc_axis if isinstance(bc_axis, AxisBCSpec) else _str_to_axis_bc(bc_axis) ng = _n_ghost padded = _pad_axis(data, axis, axis_bc, dx, ng=ng) ndim = data.ndim inv_dx = 1.0 / dx if _fd_order == 2: # noqa: PLR2004 slc_left = _cached_slice(ndim, axis, _SLC_LO) slc_right = _cached_slice(ndim, axis, _SLC_HI) result = np.subtract(padded[slc_right], padded[slc_left]) result *= 0.5 * inv_dx elif _fd_order == 4: # noqa: PLR2004 result = _gradient_o4(padded, axis, ndim, ng, inv_dx) else: # order 6 result = _gradient_o6(padded, axis, ndim, ng, inv_dx) return result
def _gradient_o4( padded: np.ndarray, axis: int, ndim: int, ng: int, inv_dx: float, ) -> np.ndarray: """4th-order central gradient on a padded array (5-point stencil). Coefficients: {1/12, -2/3, 0, 2/3, -1/12} / dx Reference: Fornberg (1988), Table 1, d=1, N=2. """ n = padded.shape[axis] - 2 * ng def _slc(offset: int) -> tuple[slice, ...]: start = ng + offset return _slice_tuple(ndim, axis, slice(start, start + n)) # Fornberg (1988) Table 1, d=1, N=2 coefficients result = np.multiply(1.0 / 12.0, padded[_slc(-2)]) result -= np.multiply(2.0 / 3.0, padded[_slc(-1)]) result += np.multiply(2.0 / 3.0, padded[_slc(1)]) result -= np.multiply(1.0 / 12.0, padded[_slc(2)]) result *= inv_dx return result def _gradient_o6( padded: np.ndarray, axis: int, ndim: int, ng: int, inv_dx: float, ) -> np.ndarray: """6th-order central gradient on a padded array (7-point stencil). Coefficients: {-1/60, 3/20, -3/4, 0, 3/4, -3/20, 1/60} / dx Reference: Fornberg (1988), Table 1, d=1, N=3. """ n = padded.shape[axis] - 2 * ng def _slc(offset: int) -> tuple[slice, ...]: start = ng + offset return _slice_tuple(ndim, axis, slice(start, start + n)) result = np.multiply(-1.0 / 60.0, padded[_slc(-3)]) result += np.multiply(3.0 / 20.0, padded[_slc(-2)]) result -= np.multiply(3.0 / 4.0, padded[_slc(-1)]) result += np.multiply(3.0 / 4.0, padded[_slc(1)]) result -= np.multiply(3.0 / 20.0, padded[_slc(2)]) result += np.multiply(1.0 / 60.0, padded[_slc(3)]) result *= inv_dx return result def _directional_laplacian_raw( data: np.ndarray, axis: int, grid: GridInfo, bc_axis: str | AxisBCSpec, ) -> np.ndarray: """Central second derivative ∂²f/∂x_i² with pre-resolved per-axis BC. Stencil width adapts to the current FD order: - **Order 2** (3-point): ``(f[i+1] - 2f[i] + f[i-1]) / dx²`` - **Order 4** (5-point): ``(-f[i-2]/12 + 4f[i-1]/3 - 5f[i]/2 + 4f[i+1]/3 - f[i+2]/12) / dx²`` - **Order 6** (7-point): ``(f[i-3]/90 - 3f[i-2]/20 + 3f[i-1]/2 - 49f[i]/18 + 3f[i+1]/2 - 3f[i+2]/20 + f[i+3]/90) / dx²`` Reference: Fornberg (1988), Mathematics of Computation 51(184), Table 1. Used by ``directional_laplacian`` and ``laplacian`` to avoid redundant BC normalization when computing multiple axes. """ dx = grid.dx[axis] inv_dx2 = 1.0 / (dx * dx) # Inline _resolve_axis_bc to avoid function call overhead axis_bc = bc_axis if isinstance(bc_axis, AxisBCSpec) else _str_to_axis_bc(bc_axis) ng = _n_ghost padded = _pad_axis(data, axis, axis_bc, dx, ng=ng) ndim = data.ndim if _fd_order == 2: # noqa: PLR2004 slc_left = _cached_slice(ndim, axis, _SLC_LO) slc_center = _cached_slice(ndim, axis, _SLC_MID) slc_right = _cached_slice(ndim, axis, _SLC_HI) # In-place arithmetic: result = (right - 2*center + left) * inv_dx2 result = np.subtract(padded[slc_right], padded[slc_center]) result -= padded[slc_center] result += padded[slc_left] result *= inv_dx2 elif _fd_order == 4: # noqa: PLR2004 result = _laplacian_raw_o4(padded, axis, ndim, ng, inv_dx2) else: # order 6 result = _laplacian_raw_o6(padded, axis, ndim, ng, inv_dx2) return result def _laplacian_raw_o4( padded: np.ndarray, axis: int, ndim: int, ng: int, inv_dx2: float, ) -> np.ndarray: """4th-order central Laplacian on a padded array (5-point stencil). Coefficients: {-1/12, 4/3, -5/2, 4/3, -1/12} / dx² Reference: Fornberg (1988), Table 1, d=2, N=2. """ n = padded.shape[axis] - 2 * ng def _slc(offset: int) -> tuple[slice, ...]: start = ng + offset return _slice_tuple(ndim, axis, slice(start, start + n)) result = np.multiply(-1.0 / 12.0, padded[_slc(-2)]) result += np.multiply(4.0 / 3.0, padded[_slc(-1)]) result -= np.multiply(5.0 / 2.0, padded[_slc(0)]) result += np.multiply(4.0 / 3.0, padded[_slc(1)]) result -= np.multiply(1.0 / 12.0, padded[_slc(2)]) result *= inv_dx2 return result def _laplacian_raw_o6( padded: np.ndarray, axis: int, ndim: int, ng: int, inv_dx2: float, ) -> np.ndarray: """6th-order central Laplacian on a padded array (7-point stencil). Coefficients: {1/90, -3/20, 3/2, -49/18, 3/2, -3/20, 1/90} / dx² Reference: Fornberg (1988), Table 1, d=2, N=3. """ n = padded.shape[axis] - 2 * ng def _slc(offset: int) -> tuple[slice, ...]: start = ng + offset return _slice_tuple(ndim, axis, slice(start, start + n)) result = np.multiply(1.0 / 90.0, padded[_slc(-3)]) result -= np.multiply(3.0 / 20.0, padded[_slc(-2)]) result += np.multiply(3.0 / 2.0, padded[_slc(-1)]) result -= np.multiply(49.0 / 18.0, padded[_slc(0)]) result += np.multiply(3.0 / 2.0, padded[_slc(1)]) result -= np.multiply(3.0 / 20.0, padded[_slc(2)]) result += np.multiply(1.0 / 90.0, padded[_slc(3)]) result *= inv_dx2 return result
[docs] def directional_laplacian( data: np.ndarray, axis: int, grid: GridInfo, bc: BCSpec | None = None, ) -> np.ndarray: """Second derivative ∂²f/∂x_i² (FD stencil or spectral FFT).""" if _use_spectral: return _spectral_directional_laplacian(data, axis, grid, bc) bcs = _normalize_bc(bc, grid) if bc is not None else _bc_from_grid(grid) return _directional_laplacian_raw(data, axis, grid, bcs[axis])
[docs] def laplacian( data: np.ndarray, grid: GridInfo, bc: BCSpec | None = None, ) -> np.ndarray: """Full Laplacian nabla^2 f = sum_i d^2f/dx_i^2 (FD or spectral).""" if _use_spectral: return _spectral_laplacian(data, grid, bc) bcs = _normalize_bc(bc, grid) if bc is not None else _bc_from_grid(grid) result = _directional_laplacian_raw(data, 0, grid, bcs[0]) for ax in range(1, grid.ndim): result += _directional_laplacian_raw(data, ax, grid, bcs[ax]) return result
[docs] def cross_derivative( data: np.ndarray, axis1: int, axis2: int, grid: GridInfo, bc: BCSpec | None = None, ) -> np.ndarray: """Mixed second derivative d^2f/(dx_i dx_j) (FD or spectral). FD mode: successive application of the 1D gradient operator along each axis, inheriting the current FD order from :func:`set_fd_order`. Spectral mode: successive FFT-based gradients (exact for band-limited functions). Reference: LeVeque (2007), "Finite Difference Methods", SIAM, S1.4. """ if _use_spectral: return _spectral_cross_derivative(data, axis1, axis2, grid, bc) tmp = gradient(data, axis1, grid, bc) return gradient(tmp, axis2, grid, bc)
[docs] def biharmonic( data: np.ndarray, grid: GridInfo, bc: BCSpec | None = None, ) -> np.ndarray: r"""Biharmonic operator nabla^4 f = nabla^2(nabla^2 f) (FD or spectral).""" if _use_spectral: return _spectral_biharmonic(data, grid, bc) return laplacian(laplacian(data, grid, bc), grid, bc)
[docs] def identity( data: np.ndarray, grid: GridInfo, # noqa: ARG001 bc: BCSpec | None = None, # noqa: ARG001 ) -> np.ndarray: """Identity operator — returns data unchanged (no copy).""" return data
# --------------------------------------------------------------------------- # Spectral (FFT) operators # --------------------------------------------------------------------------- # Pseudo-spectral differentiation: rfft -> multiply by spectral symbol -> irfft. # Exponential convergence for smooth (band-limited) problems on periodic domains. # # The pseudo-spectral approach computes the derivative in Fourier space: # gradient: f_hat *= i*k (first derivative) # laplacian: f_hat *= -k^2 (second derivative) # cross: f_hat *= -k_i * k_j (mixed second derivative) # biharmonic: f_hat *= k^4 (fourth derivative) # # Position-dependent coefficients are applied in physical space AFTER the # operator returns, following the standard pseudo-spectral pattern in the # RHS evaluator (rhs.py): result = coeff(x) * operator(field). # # Reference: Burns et al. (2020), Phys. Rev. Research 2:023068. def _spectral_gradient( data: np.ndarray, axis: int, grid: GridInfo, bc: BCSpec | None = None, # noqa: ARG001 ) -> np.ndarray: """Spectral gradient ∂f/∂x_i via FFT. Computes ``irfft(i*k * rfft(f))`` along the specified axis. Exact for band-limited functions (up to Nyquist frequency). """ n = grid.shape[axis] dx = grid.dx[axis] k = get_wavenumbers(n, dx) # Reshape k for broadcasting along the target axis shape = [1] * data.ndim shape[axis] = len(k) k_shaped = k.reshape(shape) f_hat = np.fft.rfft(data, axis=axis) f_hat *= 1j * k_shaped return np.fft.irfft(f_hat, n=n, axis=axis) def _spectral_directional_laplacian( data: np.ndarray, axis: int, grid: GridInfo, bc: BCSpec | None = None, # noqa: ARG001 ) -> np.ndarray: """Spectral second derivative ∂²f/∂x_i² via FFT. Computes ``irfft(-k^2 * rfft(f))`` along the specified axis. """ n = grid.shape[axis] dx = grid.dx[axis] k = get_wavenumbers(n, dx) shape = [1] * data.ndim shape[axis] = len(k) k2_shaped = (-(k**2)).reshape(shape) f_hat = np.fft.rfft(data, axis=axis) f_hat *= k2_shaped return np.fft.irfft(f_hat, n=n, axis=axis) def _spectral_laplacian( data: np.ndarray, grid: GridInfo, bc: BCSpec | None = None, ) -> np.ndarray: """Spectral full Laplacian sum_i d^2f/dx_i^2 via FFT.""" result = _spectral_directional_laplacian(data, 0, grid, bc) for ax in range(1, grid.ndim): result += _spectral_directional_laplacian(data, ax, grid, bc) return result def _spectral_cross_derivative( data: np.ndarray, axis1: int, axis2: int, grid: GridInfo, bc: BCSpec | None = None, # noqa: ARG001 ) -> np.ndarray: """Spectral mixed derivative d^2f/(dx_i dx_j) via successive FFTs. Applies spectral gradient along axis1 then axis2. """ tmp = _spectral_gradient(data, axis1, grid) return _spectral_gradient(tmp, axis2, grid) def _spectral_biharmonic( data: np.ndarray, grid: GridInfo, bc: BCSpec | None = None, ) -> np.ndarray: r"""Spectral biharmonic operator nabla^4 f = nabla^2(nabla^2 f).""" return _spectral_laplacian(_spectral_laplacian(data, grid, bc), grid, bc) # --------------------------------------------------------------------------- # Operator registry # --------------------------------------------------------------------------- # Each entry maps operator name → callable(data, grid, bc) → ndarray. # Partials bind axis arguments for directional operators. def _make_directional_laplacian(ax: int): # noqa: ANN202 def _op(data: np.ndarray, grid: GridInfo, bc: BCSpec | None = None) -> np.ndarray: # Must go through directional_laplacian() for spectral dispatch. # In FD mode this normalizes BCs and calls _directional_laplacian_raw. # In spectral mode it dispatches to _spectral_directional_laplacian. return directional_laplacian(data, ax, grid, bc) return _op def _make_gradient(ax: int): # noqa: ANN202 def _op(data: np.ndarray, grid: GridInfo, bc: BCSpec | None = None) -> np.ndarray: return gradient(data, ax, grid, bc) return _op def _make_cross_derivative(ax1: int, ax2: int): # noqa: ANN202 def _op(data: np.ndarray, grid: GridInfo, bc: BCSpec | None = None) -> np.ndarray: return cross_derivative(data, ax1, ax2, grid, bc) return _op def _make_third_derivative(ax: int): # noqa: ANN202 """Third-order spatial derivative ∂³f/∂x³ = ∂_x(∂²_x f). FD: applies gradient to directional laplacian (composition). Spectral: uses ik³_x multiplier (odd order → imaginary). """ def _op(data: np.ndarray, grid: GridInfo, bc: BCSpec | None = None) -> np.ndarray: return gradient(directional_laplacian(data, ax, grid, bc), ax, grid, bc) return _op OPERATOR_REGISTRY: dict[str, Any] = { "identity": identity, "laplacian": laplacian, "laplacian_x": _make_directional_laplacian(0), "laplacian_y": _make_directional_laplacian(1), "laplacian_z": _make_directional_laplacian(2), "gradient_x": _make_gradient(0), "gradient_y": _make_gradient(1), "gradient_z": _make_gradient(2), "cross_derivative_xy": _make_cross_derivative(0, 1), "cross_derivative_xz": _make_cross_derivative(0, 2), "cross_derivative_yz": _make_cross_derivative(1, 2), "biharmonic": biharmonic, "derivative_3_x": _make_third_derivative(0), "derivative_3_y": _make_third_derivative(1), "derivative_3_z": _make_third_derivative(2), } # Minimum spatial dimension required by each operator _OPERATOR_MIN_DIM: dict[str, int] = { "identity": 1, "laplacian": 1, "laplacian_x": 1, "laplacian_y": 2, "laplacian_z": 3, "gradient_x": 1, "gradient_y": 2, "gradient_z": 3, "cross_derivative_xy": 2, "cross_derivative_xz": 3, "cross_derivative_yz": 3, "biharmonic": 1, "derivative_3_x": 1, "derivative_3_y": 2, "derivative_3_z": 3, "first_derivative_t": 1, }
[docs] def operator_min_dim(name: str) -> int: """Return the minimum spatial dimension required by *name*. Raises ------ ValueError If *name* is not a recognized operator. """ dim = _OPERATOR_MIN_DIM.get(name) if dim is not None: return dim msg = f"Unknown operator {name!r}; known: {sorted(_OPERATOR_MIN_DIM)}" raise ValueError(msg)
[docs] def apply_operator( name: str, data: np.ndarray, grid: GridInfo, bc: BCSpec | None = None, ) -> np.ndarray: """Look up *name* in the registry and apply it to *data*. Raises ------ ValueError If *name* is not a known operator. """ fn = OPERATOR_REGISTRY.get(name) if fn is None: msg = f"Unknown operator {name!r}; known: {sorted(OPERATOR_REGISTRY)}" raise ValueError(msg) result: np.ndarray = fn(data, grid, bc) return result