Source code for tidal.solver.sparsity

"""Analytical Jacobian sparsity pattern for TIDAL IDA solver.

Builds the sparsity structure of the IDA Jacobian ``J = dF/dy + cj * dF/dyp``
directly from the equation spec and operator stencils — no numerical probing.
This avoids the O(N^2) cost of ``sksundae.jacband.j_pattern()`` (which
allocates a dense N x N array) and gives exact structure in milliseconds.

The sparsity pattern is returned as a ``scipy.sparse.csc_matrix`` suitable
for passing to ``sksundae.ida.IDA(linsolver='sparse', sparsity=pattern)``,
which uses SuperLU_MT for direct factorisation.

Reference: SuperLU_MT, Li & Demmel, ACM TOMS 29(2), 2003.
"""

from __future__ import annotations

import itertools
from typing import TYPE_CHECKING

import numpy as np

from tidal.solver._defaults import SECOND_ORDER
from tidal.solver._scipy_types import SparseMatrix, coo_matrix, csc_matrix

if TYPE_CHECKING:
    from tidal.solver.grid import GridInfo
    from tidal.solver.operators import BCSpec
    from tidal.solver.state import StateLayout
    from tidal.symbolic.json_loader import EquationSystem, OperatorTerm

# ---------------------------------------------------------------------------
# Operator -> stencil offsets
# ---------------------------------------------------------------------------

# Each operator maps to a set of relative grid-coordinate offsets that
# describe which neighbor grid points contribute to the output at a given
# point.  These are used to populate the Jacobian sparsity blocks.
#
# The stencil width adapts to the current FD order (from operators.py):
#   Order 2: [-1, 0, +1] (3-point)
#   Order 4: [-2, -1, 0, +1, +2] (5-point)
#   Order 6: [-3, -2, -1, 0, +1, +2, +3] (7-point)
#
# Reference: Fornberg (1988), Mathematics of Computation 51(184), Table 1.
#
# Convention: offset tuple length = spatial_dimension (ndim).
# For operators that work per-axis, ``_axis_stencil`` builds the full
# offset tuple from the axis index.

_AXIS_MAP = {"x": 0, "y": 1, "z": 2}


def _fd_deltas() -> list[int]:
    """Return the stencil offsets for the current FD order.

    - Order 2: ``[-1, 0, 1]``
    - Order 4: ``[-2, -1, 0, 1, 2]``
    - Order 6: ``[-3, -2, -1, 0, 1, 2, 3]``
    """
    from tidal.solver.operators import get_n_ghost  # noqa: PLC0415

    ng = get_n_ghost()
    return list(range(-ng, ng + 1))


def _axis_offsets(axis: int, ndim: int, deltas: list[int]) -> list[tuple[int, ...]]:
    """Build N-D offset tuples for *deltas* along a single *axis*."""
    offsets: list[tuple[int, ...]] = []
    for d in deltas:
        o = [0] * ndim
        o[axis] = d
        offsets.append(tuple(o))
    return offsets


def _cross_offsets(
    ax1: int,
    ax2: int,
    ndim: int,
) -> list[tuple[int, ...]]:
    """Build offsets for cross-derivative ∂²f/(∂x_i ∂x_j).

    For higher-order FD, the cross derivative uses the product of 1D gradient
    stencils along each axis.  The sparsity pattern is the Cartesian product
    of the per-axis deltas (conservative superset of the actual stencil).
    Plus self at origin (covers the cj diagonal contribution).
    """
    deltas = _fd_deltas()
    offsets_set: set[tuple[int, ...]] = set()
    for d1 in deltas:
        for d2 in deltas:
            o = [0] * ndim
            o[ax1] = d1
            o[ax2] = d2
            offsets_set.add(tuple(o))
    # Include self (origin) for robustness
    offsets_set.add((0,) * ndim)
    return list(offsets_set)


def _biharmonic_offsets(ndim: int) -> list[tuple[int, ...]]:
    r"""Build offsets for biharmonic ∇⁴f = ∇²(∇²f).

    The biharmonic stencil is the convolution of the Laplacian stencil with
    itself.  The effective radius doubles relative to the Laplacian:

    - Order 2: L1-radius 2  (offsets within \|Δ\| ≤ 2)
    - Order 4: L1-radius 4  (offsets within \|Δ\| ≤ 4)
    - Order 6: L1-radius 6  (offsets within \|Δ\| ≤ 6)

    Reference: composition of Fornberg stencils — the convolved stencil
    radius is the sum of the inner and outer stencil radii.
    """
    from tidal.solver.operators import get_n_ghost  # noqa: PLC0415

    ng = get_n_ghost()
    radius = 2 * ng  # biharmonic = laplacian(laplacian), so radius doubles
    offsets_set: set[tuple[int, ...]] = set()
    for combo in itertools.product(range(-radius, radius + 1), repeat=ndim):
        if sum(abs(c) for c in combo) <= radius:
            offsets_set.add(combo)
    return list(offsets_set)


def _gradient_or_laplacian_offsets(
    name: str, ndim: int
) -> list[tuple[int, ...]] | None:
    """Try to resolve gradient_X or laplacian_X operators."""
    deltas = _fd_deltas()
    if name.startswith("gradient_"):
        ax = _AXIS_MAP.get(name[-1])
        if ax is not None:
            return _axis_offsets(ax, ndim, deltas)

    if name.startswith("laplacian_"):
        parts = name.split("_")
        ax = _AXIS_MAP.get(parts[1]) if len(parts) > 1 else None
        if ax is not None:
            return _axis_offsets(ax, ndim, deltas)

    return None


def _full_laplacian_offsets(ndim: int) -> list[tuple[int, ...]]:
    """Full Laplacian = union of per-axis stencils."""
    deltas = _fd_deltas()
    zero = (0,) * ndim
    offsets_set: set[tuple[int, ...]] = {zero}
    for ax in range(ndim):
        offsets_set.update(_axis_offsets(ax, ndim, deltas))
    return list(offsets_set)


def _cross_derivative_offsets(name: str, ndim: int) -> list[tuple[int, ...]] | None:
    """Try to resolve cross_derivative_XY operators."""
    if not name.startswith("cross_derivative_"):
        return None
    suffix = name.rsplit("_", maxsplit=1)[-1]  # "xy", "xz", "yz"
    ax1 = _AXIS_MAP.get(suffix[0])
    ax2 = _AXIS_MAP.get(suffix[1]) if len(suffix) > 1 else None
    if ax1 is not None and ax2 is not None:
        return _cross_offsets(ax1, ax2, ndim)
    return None


[docs] def operator_stencil_offsets(name: str, ndim: int) -> list[tuple[int, ...]]: # noqa: PLR0911 """Return the grid-coordinate offsets for operator *name*. When spectral mode is active (``get_spectral() == True``), all spatial operators return dense coupling (all possible offsets) because FFT-based differentiation couples every grid point to every other. Identity and first_derivative_t remain local. Parameters ---------- name : str Operator name (e.g. ``"laplacian"``, ``"gradient_x"``). ndim : int Number of spatial dimensions. Returns ------- list[tuple[int, ...]] Offset tuples, each of length *ndim*. """ zero = (0,) * ndim # Identity and first_derivative_t: self-coupling only (same in both modes) if name in {"identity", "first_derivative_t"}: return [zero] # Spectral mode: all spatial operators are dense (all-to-all coupling). # This makes IDA's sparse Jacobian infrastructure ineffective — the CLI # guards against this combination (--spectral with --scheme ida). from tidal.solver.operators import get_spectral # noqa: PLC0415 if get_spectral(): # Return "all offsets" — conservative superset. In practice this # means the Jacobian is fully dense for spatial coupling blocks. # For IDA this would be O(N^2) — but the CLI prevents IDA+spectral. # For explicit solvers (leapfrog, scipy, cvode) no Jacobian is needed. # # We don't actually need to enumerate all N^ndim offsets here because # this function is only called during Jacobian sparsity construction, # which is an IDA-only path. Return a sentinel that _stencil_block # can handle, or a small superset. Since the CLI blocks IDA+spectral, # this path should not be reached in practice. import warnings # noqa: PLC0415 warnings.warn( "Spectral mode with Jacobian sparsity construction: " "returning dense coupling pattern. This is expected only " "for non-IDA solvers.", stacklevel=2, ) # Return all offsets within a generous radius to approximate dense ng_approx = 32 # large radius to approximate full coupling return list(itertools.product(range(-ng_approx, ng_approx + 1), repeat=ndim)) # Directional gradient or Laplacian result = _gradient_or_laplacian_offsets(name, ndim) if result is not None: return result # Full Laplacian if name == "laplacian": return _full_laplacian_offsets(ndim) # Cross derivative result = _cross_derivative_offsets(name, ndim) if result is not None: return result # Biharmonic if name == "biharmonic": return _biharmonic_offsets(ndim) # Unknown operator -- conservative: assume full local coupling (FD radius) from tidal.solver.operators import get_n_ghost # noqa: PLC0415 ng = get_n_ghost() return list(itertools.product(range(-ng, ng + 1), repeat=ndim))
# --------------------------------------------------------------------------- # Stencil -> sparse block construction (vectorized) # --------------------------------------------------------------------------- def _stencil_block( n_points: int, grid_shape: tuple[int, ...], offsets: list[tuple[int, ...]], periodic: tuple[bool, ...], ) -> tuple[np.ndarray, np.ndarray]: """Build (row, col) index arrays for a single operator stencil block. Parameters ---------- n_points : int Total grid points (product of grid_shape). grid_shape : tuple[int, ...] Shape of the spatial grid. offsets : list[tuple[int, ...]] Grid-coordinate offsets from :func:`operator_stencil_offsets`. periodic : tuple[bool, ...] Per-axis periodic flags. Returns ------- tuple[np.ndarray, np.ndarray] (rows, cols) -- flat grid indices (0-based, within a single slot). """ ndim = len(grid_shape) flat = np.arange(n_points) # Unravel to multi-index: shape (ndim, n_points) multi = np.array(np.unravel_index(flat, grid_shape)) all_rows: list[np.ndarray] = [] all_cols: list[np.ndarray] = [] for offset in offsets: neighbor = multi.copy() valid = np.ones(n_points, dtype=bool) for ax in range(ndim): shifted = neighbor[ax] + offset[ax] if periodic[ax]: shifted %= grid_shape[ax] else: # Non-periodic: clip and mark out-of-bounds oob = (shifted < 0) | (shifted >= grid_shape[ax]) valid &= ~oob shifted = np.clip(shifted, 0, grid_shape[ax] - 1) neighbor[ax] = shifted col_flat = np.asarray(np.ravel_multi_index(tuple(neighbor), grid_shape)) if not valid.all(): all_rows.append(flat[valid]) all_cols.append(col_flat[valid]) else: all_rows.append(flat) all_cols.append(col_flat) if not all_rows: return np.array([], dtype=np.intp), np.array([], dtype=np.intp) return np.concatenate(all_rows), np.concatenate(all_cols) # --------------------------------------------------------------------------- # Slot-coupling analysis # --------------------------------------------------------------------------- def _find_slot_for_field(layout: StateLayout, field_ref: str) -> int | None: """Map a field reference (e.g. ``"A_0"`` or ``"v_A_1"``) to a slot index. The field_ref may be: - A field name (``"A_0"``) -> field slot - A velocity name (``"v_A_1"``) -> velocity slot """ # Direct match on slot name for i, slot in enumerate(layout.slots): if slot.name == field_ref: return i # Try as field name in field_slot_map if field_ref in layout.field_slot_map: return layout.field_slot_map[field_ref] # Try as velocity reference: "v_X" -> velocity slot for field X if field_ref.startswith("v_"): base_field = field_ref[2:] # Strip "v_" prefix if base_field in layout.velocity_slot_map: return layout.velocity_slot_map[base_field] # Also try finding a field whose velocity slot name matches for slot_idx in layout.velocity_slot_map.values(): vel_slot = layout.slots[slot_idx] if vel_slot.name == field_ref: return slot_idx return None def _rhs_couplings( terms: tuple[OperatorTerm, ...], layout: StateLayout, ndim: int, ) -> list[tuple[int, list[tuple[int, ...]]]]: """Extract (col_slot_idx, stencil_offsets) for each RHS term. For ``first_derivative_t(field)``, the coupling target is the velocity slot ``v_{field}`` with identity stencil. Constraint velocity references (``v_X`` where ``X`` has time_order=0) couple through ``yp[X_field_slot]``. In the combined Jacobian ``J = dF/dy + cj * dF/dyp``, these contribute to the column of ``X``'s field slot with the operator's stencil. """ couplings: list[tuple[int, list[tuple[int, ...]]]] = [] for term in terms: if term.operator == "first_derivative_t": # Couples to velocity of the referenced field. # For constraint fields (time_order=0), the velocity is # injected from yp[field_slot] — couple to the field slot. vel_name = f"v_{term.field}" col_slot = _find_slot_for_field(layout, vel_name) if col_slot is not None: couplings.append((col_slot, [((0,) * ndim)])) else: # Constraint field: no velocity slot, couples via yp field_slot = layout.field_slot_map.get(term.field) if field_slot is not None: couplings.append((field_slot, [((0,) * ndim)])) else: col_slot = _find_slot_for_field(layout, term.field) if col_slot is not None: offsets = operator_stencil_offsets(term.operator, ndim) couplings.append((col_slot, offsets)) elif term.field.startswith("v_"): # Constraint velocity v_X: X has time_order=0, no # velocity slot. Coupling goes through yp[X_field_slot]. base_field = term.field[2:] field_slot = layout.field_slot_map.get(base_field) if field_slot is not None: offsets = operator_stencil_offsets(term.operator, ndim) couplings.append((field_slot, offsets)) return couplings # --------------------------------------------------------------------------- # Per-slot-type sparsity helpers # --------------------------------------------------------------------------- class _SparsityBuilder: """Accumulates (row, col) pairs for the full Jacobian sparsity pattern.""" def __init__( self, spec: EquationSystem, layout: StateLayout, grid: GridInfo, ) -> None: self.spec = spec self.layout = layout self.grid = grid self.n = grid.num_points self.ndim = grid.ndim self.eq_map: dict[str, int] = spec.equation_map self.all_rows: list[np.ndarray] = [] self.all_cols: list[np.ndarray] = [] def add_block( self, row_slot: int, col_slot: int, offsets: list[tuple[int, ...]], ) -> None: """Add a stencil coupling block between two slots.""" r, c = _stencil_block(self.n, self.grid.shape, offsets, self.grid.periodic) self.all_rows.append(row_slot * self.n + r) self.all_cols.append(col_slot * self.n + c) def add_diagonal(self, slot_idx: int) -> None: """Add identity (diagonal) block for a slot.""" idx = np.arange(slot_idx * self.n, (slot_idx + 1) * self.n) self.all_rows.append(idx) self.all_cols.append(idx) def add_rhs_couplings(self, row_slot: int, eq_idx: int) -> None: """Add coupling from all RHS terms of equation *eq_idx*.""" eq = self.spec.equations[eq_idx] for col_slot, offsets in _rhs_couplings(eq.rhs_terms, self.layout, self.ndim): self.add_block(row_slot, col_slot, offsets) def handle_constraint(self, slot_idx: int, eq_idx: int | None) -> None: """Constraint: res = RHS(y) -- no yp coupling, no cj*I diagonal. If the equation has no self-referencing terms, the IDA residual is ``res = y[field]`` (freeze at zero) → diagonal Jacobian block. """ if eq_idx is not None: eq = self.spec.equations[eq_idx] has_self = any(t.field == eq.field_name for t in eq.rhs_terms) if not has_self: # No self-terms: residual is y[field] → identity coupling self.add_diagonal(slot_idx) return self.add_rhs_couplings(slot_idx, eq_idx) def handle_velocity(self, slot_idx: int, eq_idx: int | None) -> None: """Velocity: res = yp - RHS(y) -- diagonal from cj*I + RHS coupling.""" self.add_diagonal(slot_idx) if eq_idx is not None: self.add_rhs_couplings(slot_idx, eq_idx) def handle_dynamical_field(self, slot_idx: int, field_name: str) -> None: """Dynamical field: res = yp - v (trivial kinematic).""" # yp coupling (diagonal) self.add_diagonal(slot_idx) # y coupling: identity coupling to velocity slot if field_name in self.layout.velocity_slot_map: zero = (0,) * self.ndim vel_slot = self.layout.velocity_slot_map[field_name] self.add_block(slot_idx, vel_slot, [zero]) def handle_first_order(self, slot_idx: int, eq_idx: int | None) -> None: """First-order: res = yp - RHS(y) -- diagonal + RHS.""" self.add_diagonal(slot_idx) if eq_idx is not None: self.add_rhs_couplings(slot_idx, eq_idx) def assemble(self) -> SparseMatrix: """Assemble accumulated entries into a CSC sparse matrix.""" n_total = self.layout.total_size if not self.all_rows: return csc_matrix((n_total, n_total)) rows = np.concatenate(self.all_rows) cols = np.concatenate(self.all_cols) data = np.ones(len(rows), dtype=np.float64) # Deduplicate via COO -> CSC conversion (scipy handles duplicates) pattern = coo_matrix((data, (rows, cols)), shape=(n_total, n_total)) return csc_matrix(pattern) # --------------------------------------------------------------------------- # Main entry point # ---------------------------------------------------------------------------
[docs] def build_jacobian_sparsity( spec: EquationSystem, layout: StateLayout, grid: GridInfo, _bc: BCSpec | None, ) -> SparseMatrix: """Build the analytical Jacobian sparsity pattern for IDA. Constructs the nonzero structure of ``J = dF/dy + cj * dF/dyp`` from the equation system's operator stencils and slot-coupling structure. No numerical probing -- the pattern is exact and built in O(nnz) time. Parameters ---------- spec : EquationSystem Parsed equation specification. layout : StateLayout State vector layout. grid : GridInfo Spatial grid (periodic flags used for wrapping). _bc : str or tuple of str, optional Boundary conditions string (kept for API compatibility with ``ida.py`` call site; periodicity is read from ``grid.periodic``). Returns ------- SparseMatrix Binary CSC sparsity pattern of shape ``(N_total, N_total)``. """ builder = _SparsityBuilder(spec, layout, grid) for slot_idx, slot in enumerate(layout.slots): eq_idx = builder.eq_map.get(slot.field_name) if slot.time_order == 0: builder.handle_constraint(slot_idx, eq_idx) elif slot.kind == "velocity": builder.handle_velocity(slot_idx, eq_idx) elif slot.time_order >= SECOND_ORDER and slot.kind == "field": builder.handle_dynamical_field(slot_idx, slot.field_name) elif slot.time_order == 1: builder.handle_first_order(slot_idx, eq_idx) return builder.assemble()