"""Analytical Jacobian sparsity pattern for TIDAL IDA solver.
Builds the sparsity structure of the IDA Jacobian ``J = dF/dy + cj * dF/dyp``
directly from the equation spec and operator stencils — no numerical probing.
This avoids the O(N^2) cost of ``sksundae.jacband.j_pattern()`` (which
allocates a dense N x N array) and gives exact structure in milliseconds.
The sparsity pattern is returned as a ``scipy.sparse.csc_matrix`` suitable
for passing to ``sksundae.ida.IDA(linsolver='sparse', sparsity=pattern)``,
which uses SuperLU_MT for direct factorisation.
Reference: SuperLU_MT, Li & Demmel, ACM TOMS 29(2), 2003.
"""
from __future__ import annotations
import itertools
from typing import TYPE_CHECKING
import numpy as np
from tidal.solver._defaults import SECOND_ORDER
from tidal.solver._scipy_types import SparseMatrix, coo_matrix, csc_matrix
if TYPE_CHECKING:
from tidal.solver.grid import GridInfo
from tidal.solver.operators import BCSpec
from tidal.solver.state import StateLayout
from tidal.symbolic.json_loader import EquationSystem, OperatorTerm
# ---------------------------------------------------------------------------
# Operator -> stencil offsets
# ---------------------------------------------------------------------------
# Each operator maps to a set of relative grid-coordinate offsets that
# describe which neighbor grid points contribute to the output at a given
# point. These are used to populate the Jacobian sparsity blocks.
#
# The stencil width adapts to the current FD order (from operators.py):
# Order 2: [-1, 0, +1] (3-point)
# Order 4: [-2, -1, 0, +1, +2] (5-point)
# Order 6: [-3, -2, -1, 0, +1, +2, +3] (7-point)
#
# Reference: Fornberg (1988), Mathematics of Computation 51(184), Table 1.
#
# Convention: offset tuple length = spatial_dimension (ndim).
# For operators that work per-axis, ``_axis_stencil`` builds the full
# offset tuple from the axis index.
_AXIS_MAP = {"x": 0, "y": 1, "z": 2}
def _fd_deltas() -> list[int]:
"""Return the stencil offsets for the current FD order.
- Order 2: ``[-1, 0, 1]``
- Order 4: ``[-2, -1, 0, 1, 2]``
- Order 6: ``[-3, -2, -1, 0, 1, 2, 3]``
"""
from tidal.solver.operators import get_n_ghost # noqa: PLC0415
ng = get_n_ghost()
return list(range(-ng, ng + 1))
def _axis_offsets(axis: int, ndim: int, deltas: list[int]) -> list[tuple[int, ...]]:
"""Build N-D offset tuples for *deltas* along a single *axis*."""
offsets: list[tuple[int, ...]] = []
for d in deltas:
o = [0] * ndim
o[axis] = d
offsets.append(tuple(o))
return offsets
def _cross_offsets(
ax1: int,
ax2: int,
ndim: int,
) -> list[tuple[int, ...]]:
"""Build offsets for cross-derivative ∂²f/(∂x_i ∂x_j).
For higher-order FD, the cross derivative uses the product of 1D gradient
stencils along each axis. The sparsity pattern is the Cartesian product
of the per-axis deltas (conservative superset of the actual stencil).
Plus self at origin (covers the cj diagonal contribution).
"""
deltas = _fd_deltas()
offsets_set: set[tuple[int, ...]] = set()
for d1 in deltas:
for d2 in deltas:
o = [0] * ndim
o[ax1] = d1
o[ax2] = d2
offsets_set.add(tuple(o))
# Include self (origin) for robustness
offsets_set.add((0,) * ndim)
return list(offsets_set)
def _biharmonic_offsets(ndim: int) -> list[tuple[int, ...]]:
r"""Build offsets for biharmonic ∇⁴f = ∇²(∇²f).
The biharmonic stencil is the convolution of the Laplacian stencil with
itself. The effective radius doubles relative to the Laplacian:
- Order 2: L1-radius 2 (offsets within \|Δ\| ≤ 2)
- Order 4: L1-radius 4 (offsets within \|Δ\| ≤ 4)
- Order 6: L1-radius 6 (offsets within \|Δ\| ≤ 6)
Reference: composition of Fornberg stencils — the convolved stencil
radius is the sum of the inner and outer stencil radii.
"""
from tidal.solver.operators import get_n_ghost # noqa: PLC0415
ng = get_n_ghost()
radius = 2 * ng # biharmonic = laplacian(laplacian), so radius doubles
offsets_set: set[tuple[int, ...]] = set()
for combo in itertools.product(range(-radius, radius + 1), repeat=ndim):
if sum(abs(c) for c in combo) <= radius:
offsets_set.add(combo)
return list(offsets_set)
def _gradient_or_laplacian_offsets(
name: str, ndim: int
) -> list[tuple[int, ...]] | None:
"""Try to resolve gradient_X or laplacian_X operators."""
deltas = _fd_deltas()
if name.startswith("gradient_"):
ax = _AXIS_MAP.get(name[-1])
if ax is not None:
return _axis_offsets(ax, ndim, deltas)
if name.startswith("laplacian_"):
parts = name.split("_")
ax = _AXIS_MAP.get(parts[1]) if len(parts) > 1 else None
if ax is not None:
return _axis_offsets(ax, ndim, deltas)
return None
def _full_laplacian_offsets(ndim: int) -> list[tuple[int, ...]]:
"""Full Laplacian = union of per-axis stencils."""
deltas = _fd_deltas()
zero = (0,) * ndim
offsets_set: set[tuple[int, ...]] = {zero}
for ax in range(ndim):
offsets_set.update(_axis_offsets(ax, ndim, deltas))
return list(offsets_set)
def _cross_derivative_offsets(name: str, ndim: int) -> list[tuple[int, ...]] | None:
"""Try to resolve cross_derivative_XY operators."""
if not name.startswith("cross_derivative_"):
return None
suffix = name.rsplit("_", maxsplit=1)[-1] # "xy", "xz", "yz"
ax1 = _AXIS_MAP.get(suffix[0])
ax2 = _AXIS_MAP.get(suffix[1]) if len(suffix) > 1 else None
if ax1 is not None and ax2 is not None:
return _cross_offsets(ax1, ax2, ndim)
return None
[docs]
def operator_stencil_offsets(name: str, ndim: int) -> list[tuple[int, ...]]: # noqa: PLR0911
"""Return the grid-coordinate offsets for operator *name*.
When spectral mode is active (``get_spectral() == True``), all spatial
operators return dense coupling (all possible offsets) because FFT-based
differentiation couples every grid point to every other. Identity and
first_derivative_t remain local.
Parameters
----------
name : str
Operator name (e.g. ``"laplacian"``, ``"gradient_x"``).
ndim : int
Number of spatial dimensions.
Returns
-------
list[tuple[int, ...]]
Offset tuples, each of length *ndim*.
"""
zero = (0,) * ndim
# Identity and first_derivative_t: self-coupling only (same in both modes)
if name in {"identity", "first_derivative_t"}:
return [zero]
# Spectral mode: all spatial operators are dense (all-to-all coupling).
# This makes IDA's sparse Jacobian infrastructure ineffective — the CLI
# guards against this combination (--spectral with --scheme ida).
from tidal.solver.operators import get_spectral # noqa: PLC0415
if get_spectral():
# Return "all offsets" — conservative superset. In practice this
# means the Jacobian is fully dense for spatial coupling blocks.
# For IDA this would be O(N^2) — but the CLI prevents IDA+spectral.
# For explicit solvers (leapfrog, scipy, cvode) no Jacobian is needed.
#
# We don't actually need to enumerate all N^ndim offsets here because
# this function is only called during Jacobian sparsity construction,
# which is an IDA-only path. Return a sentinel that _stencil_block
# can handle, or a small superset. Since the CLI blocks IDA+spectral,
# this path should not be reached in practice.
import warnings # noqa: PLC0415
warnings.warn(
"Spectral mode with Jacobian sparsity construction: "
"returning dense coupling pattern. This is expected only "
"for non-IDA solvers.",
stacklevel=2,
)
# Return all offsets within a generous radius to approximate dense
ng_approx = 32 # large radius to approximate full coupling
return list(itertools.product(range(-ng_approx, ng_approx + 1), repeat=ndim))
# Directional gradient or Laplacian
result = _gradient_or_laplacian_offsets(name, ndim)
if result is not None:
return result
# Full Laplacian
if name == "laplacian":
return _full_laplacian_offsets(ndim)
# Cross derivative
result = _cross_derivative_offsets(name, ndim)
if result is not None:
return result
# Biharmonic
if name == "biharmonic":
return _biharmonic_offsets(ndim)
# Unknown operator -- conservative: assume full local coupling (FD radius)
from tidal.solver.operators import get_n_ghost # noqa: PLC0415
ng = get_n_ghost()
return list(itertools.product(range(-ng, ng + 1), repeat=ndim))
# ---------------------------------------------------------------------------
# Stencil -> sparse block construction (vectorized)
# ---------------------------------------------------------------------------
def _stencil_block(
n_points: int,
grid_shape: tuple[int, ...],
offsets: list[tuple[int, ...]],
periodic: tuple[bool, ...],
) -> tuple[np.ndarray, np.ndarray]:
"""Build (row, col) index arrays for a single operator stencil block.
Parameters
----------
n_points : int
Total grid points (product of grid_shape).
grid_shape : tuple[int, ...]
Shape of the spatial grid.
offsets : list[tuple[int, ...]]
Grid-coordinate offsets from :func:`operator_stencil_offsets`.
periodic : tuple[bool, ...]
Per-axis periodic flags.
Returns
-------
tuple[np.ndarray, np.ndarray]
(rows, cols) -- flat grid indices (0-based, within a single slot).
"""
ndim = len(grid_shape)
flat = np.arange(n_points)
# Unravel to multi-index: shape (ndim, n_points)
multi = np.array(np.unravel_index(flat, grid_shape))
all_rows: list[np.ndarray] = []
all_cols: list[np.ndarray] = []
for offset in offsets:
neighbor = multi.copy()
valid = np.ones(n_points, dtype=bool)
for ax in range(ndim):
shifted = neighbor[ax] + offset[ax]
if periodic[ax]:
shifted %= grid_shape[ax]
else:
# Non-periodic: clip and mark out-of-bounds
oob = (shifted < 0) | (shifted >= grid_shape[ax])
valid &= ~oob
shifted = np.clip(shifted, 0, grid_shape[ax] - 1)
neighbor[ax] = shifted
col_flat = np.asarray(np.ravel_multi_index(tuple(neighbor), grid_shape))
if not valid.all():
all_rows.append(flat[valid])
all_cols.append(col_flat[valid])
else:
all_rows.append(flat)
all_cols.append(col_flat)
if not all_rows:
return np.array([], dtype=np.intp), np.array([], dtype=np.intp)
return np.concatenate(all_rows), np.concatenate(all_cols)
# ---------------------------------------------------------------------------
# Slot-coupling analysis
# ---------------------------------------------------------------------------
def _find_slot_for_field(layout: StateLayout, field_ref: str) -> int | None:
"""Map a field reference (e.g. ``"A_0"`` or ``"v_A_1"``) to a slot index.
The field_ref may be:
- A field name (``"A_0"``) -> field slot
- A velocity name (``"v_A_1"``) -> velocity slot
"""
# Direct match on slot name
for i, slot in enumerate(layout.slots):
if slot.name == field_ref:
return i
# Try as field name in field_slot_map
if field_ref in layout.field_slot_map:
return layout.field_slot_map[field_ref]
# Try as velocity reference: "v_X" -> velocity slot for field X
if field_ref.startswith("v_"):
base_field = field_ref[2:] # Strip "v_" prefix
if base_field in layout.velocity_slot_map:
return layout.velocity_slot_map[base_field]
# Also try finding a field whose velocity slot name matches
for slot_idx in layout.velocity_slot_map.values():
vel_slot = layout.slots[slot_idx]
if vel_slot.name == field_ref:
return slot_idx
return None
def _rhs_couplings(
terms: tuple[OperatorTerm, ...],
layout: StateLayout,
ndim: int,
) -> list[tuple[int, list[tuple[int, ...]]]]:
"""Extract (col_slot_idx, stencil_offsets) for each RHS term.
For ``first_derivative_t(field)``, the coupling target is the velocity
slot ``v_{field}`` with identity stencil.
Constraint velocity references (``v_X`` where ``X`` has time_order=0)
couple through ``yp[X_field_slot]``. In the combined Jacobian
``J = dF/dy + cj * dF/dyp``, these contribute to the column of
``X``'s field slot with the operator's stencil.
"""
couplings: list[tuple[int, list[tuple[int, ...]]]] = []
for term in terms:
if term.operator == "first_derivative_t":
# Couples to velocity of the referenced field.
# For constraint fields (time_order=0), the velocity is
# injected from yp[field_slot] — couple to the field slot.
vel_name = f"v_{term.field}"
col_slot = _find_slot_for_field(layout, vel_name)
if col_slot is not None:
couplings.append((col_slot, [((0,) * ndim)]))
else:
# Constraint field: no velocity slot, couples via yp
field_slot = layout.field_slot_map.get(term.field)
if field_slot is not None:
couplings.append((field_slot, [((0,) * ndim)]))
else:
col_slot = _find_slot_for_field(layout, term.field)
if col_slot is not None:
offsets = operator_stencil_offsets(term.operator, ndim)
couplings.append((col_slot, offsets))
elif term.field.startswith("v_"):
# Constraint velocity v_X: X has time_order=0, no
# velocity slot. Coupling goes through yp[X_field_slot].
base_field = term.field[2:]
field_slot = layout.field_slot_map.get(base_field)
if field_slot is not None:
offsets = operator_stencil_offsets(term.operator, ndim)
couplings.append((field_slot, offsets))
return couplings
# ---------------------------------------------------------------------------
# Per-slot-type sparsity helpers
# ---------------------------------------------------------------------------
class _SparsityBuilder:
"""Accumulates (row, col) pairs for the full Jacobian sparsity pattern."""
def __init__(
self,
spec: EquationSystem,
layout: StateLayout,
grid: GridInfo,
) -> None:
self.spec = spec
self.layout = layout
self.grid = grid
self.n = grid.num_points
self.ndim = grid.ndim
self.eq_map: dict[str, int] = spec.equation_map
self.all_rows: list[np.ndarray] = []
self.all_cols: list[np.ndarray] = []
def add_block(
self,
row_slot: int,
col_slot: int,
offsets: list[tuple[int, ...]],
) -> None:
"""Add a stencil coupling block between two slots."""
r, c = _stencil_block(self.n, self.grid.shape, offsets, self.grid.periodic)
self.all_rows.append(row_slot * self.n + r)
self.all_cols.append(col_slot * self.n + c)
def add_diagonal(self, slot_idx: int) -> None:
"""Add identity (diagonal) block for a slot."""
idx = np.arange(slot_idx * self.n, (slot_idx + 1) * self.n)
self.all_rows.append(idx)
self.all_cols.append(idx)
def add_rhs_couplings(self, row_slot: int, eq_idx: int) -> None:
"""Add coupling from all RHS terms of equation *eq_idx*."""
eq = self.spec.equations[eq_idx]
for col_slot, offsets in _rhs_couplings(eq.rhs_terms, self.layout, self.ndim):
self.add_block(row_slot, col_slot, offsets)
def handle_constraint(self, slot_idx: int, eq_idx: int | None) -> None:
"""Constraint: res = RHS(y) -- no yp coupling, no cj*I diagonal.
If the equation has no self-referencing terms, the IDA residual
is ``res = y[field]`` (freeze at zero) → diagonal Jacobian block.
"""
if eq_idx is not None:
eq = self.spec.equations[eq_idx]
has_self = any(t.field == eq.field_name for t in eq.rhs_terms)
if not has_self:
# No self-terms: residual is y[field] → identity coupling
self.add_diagonal(slot_idx)
return
self.add_rhs_couplings(slot_idx, eq_idx)
def handle_velocity(self, slot_idx: int, eq_idx: int | None) -> None:
"""Velocity: res = yp - RHS(y) -- diagonal from cj*I + RHS coupling."""
self.add_diagonal(slot_idx)
if eq_idx is not None:
self.add_rhs_couplings(slot_idx, eq_idx)
def handle_dynamical_field(self, slot_idx: int, field_name: str) -> None:
"""Dynamical field: res = yp - v (trivial kinematic)."""
# yp coupling (diagonal)
self.add_diagonal(slot_idx)
# y coupling: identity coupling to velocity slot
if field_name in self.layout.velocity_slot_map:
zero = (0,) * self.ndim
vel_slot = self.layout.velocity_slot_map[field_name]
self.add_block(slot_idx, vel_slot, [zero])
def handle_first_order(self, slot_idx: int, eq_idx: int | None) -> None:
"""First-order: res = yp - RHS(y) -- diagonal + RHS."""
self.add_diagonal(slot_idx)
if eq_idx is not None:
self.add_rhs_couplings(slot_idx, eq_idx)
def assemble(self) -> SparseMatrix:
"""Assemble accumulated entries into a CSC sparse matrix."""
n_total = self.layout.total_size
if not self.all_rows:
return csc_matrix((n_total, n_total))
rows = np.concatenate(self.all_rows)
cols = np.concatenate(self.all_cols)
data = np.ones(len(rows), dtype=np.float64)
# Deduplicate via COO -> CSC conversion (scipy handles duplicates)
pattern = coo_matrix((data, (rows, cols)), shape=(n_total, n_total))
return csc_matrix(pattern)
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
[docs]
def build_jacobian_sparsity(
spec: EquationSystem,
layout: StateLayout,
grid: GridInfo,
_bc: BCSpec | None,
) -> SparseMatrix:
"""Build the analytical Jacobian sparsity pattern for IDA.
Constructs the nonzero structure of ``J = dF/dy + cj * dF/dyp`` from
the equation system's operator stencils and slot-coupling structure.
No numerical probing -- the pattern is exact and built in O(nnz) time.
Parameters
----------
spec : EquationSystem
Parsed equation specification.
layout : StateLayout
State vector layout.
grid : GridInfo
Spatial grid (periodic flags used for wrapping).
_bc : str or tuple of str, optional
Boundary conditions string (kept for API compatibility with
``ida.py`` call site; periodicity is read from ``grid.periodic``).
Returns
-------
SparseMatrix
Binary CSC sparsity pattern of shape ``(N_total, N_total)``.
"""
builder = _SparsityBuilder(spec, layout, grid)
for slot_idx, slot in enumerate(layout.slots):
eq_idx = builder.eq_map.get(slot.field_name)
if slot.time_order == 0:
builder.handle_constraint(slot_idx, eq_idx)
elif slot.kind == "velocity":
builder.handle_velocity(slot_idx, eq_idx)
elif slot.time_order >= SECOND_ORDER and slot.kind == "field":
builder.handle_dynamical_field(slot_idx, slot.field_name)
elif slot.time_order == 1:
builder.handle_first_order(slot_idx, eq_idx)
return builder.assemble()