"""Symplectic integrators for TIDAL systems (leapfrog / Yoshida).
Symplectic integrators for second-order E-L equations using velocity form:
dq/dt = v (trivial kinematic)
dv/dt = E-L RHS (from equations[] array)
Two integration orders are available via ``--leapfrog-order``:
- **Order 2** (default): Stormer-Verlet KDK scheme. Shadow Hamiltonian
error O(dt^2), one force evaluation per step (with caching).
- **Order 4**: Yoshida triple-composition S4(dt) = S2(w1*dt) o S2(w2*dt)
o S2(w3*dt). Shadow Hamiltonian error O(dt^4), three force evaluations
per step (fused-kick scheme: adjacent half-kicks merge between sub-steps).
**Why Yoshida is faster at equal accuracy**: each individual Yoshida step
is ~3x more expensive (3 vs 1 force evaluations via fused-kick caching).
However, the O(dt^4) accuracy means far fewer steps are needed to reach
a given error target. For example, to reach ~4e-6 accuracy on coupled
scalars (N=256):
- Verlet requires dt=0.005 (4000 steps, 0.21s)
- Yoshida requires dt=0.040 (500 steps, 0.07s) -- **3x faster**
The speedup comes from both the accuracy-to-cost ratio and the fused-kick
optimization (Forest & Ruth, 1990; Hairer et al., 2006 §II.4).
**CFL note**: the effective CFL limit is reduced by max(|wi|) ~ 1.70
because the middle sub-step w2 < 0 has |w2| > 1. The CLI auto-adjusts
dt by this factor when ``--leapfrog-order 4`` is specified.
**Time-dependent forces**: sub-step time is tracked correctly through
all three Yoshida sub-steps, so time-dependent coefficients (background
fields, time-varying sources) are handled correctly for general systems.
Both orders preserve a shadow Hamiltonian to machine precision, giving
zero secular energy drift for conservative systems.
**Not appropriate for**: dissipative systems, absorbing BCs, constraint
damping, or energy outflow -- use IDA instead.
References
----------
Hairer, Lubich, Wanner, "Geometric Numerical Integration",
Springer, 2006. Chapter VI: Symplectic Integration of Hamiltonian Systems.
Yoshida, H. (1990). "Construction of higher order symplectic integrators".
Physics Letters A, 150(5-7), pp. 262-268.
Forest, E. & Ruth, R.D. (1990). "Fourth-order symplectic integration".
Physica D, 43(1), pp. 105-117. (Fused-kick composition method.)
"""
from __future__ import annotations
import math
import warnings
from typing import TYPE_CHECKING
import numpy as np
from tidal.solver._setup import build_rhs_evaluator
from tidal.solver.fields import FieldSet
from tidal.solver.operators import BCSpec, apply_operator
from tidal.solver.state import StateLayout
if TYPE_CHECKING:
from collections.abc import Callable
from tidal.solver._types import SolverResult
from tidal.solver.grid import GridInfo
from tidal.solver.progress import SimulationProgress
from tidal.solver.rhs import RHSEvaluator
from tidal.symbolic.json_loader import EquationSystem
# ---------------------------------------------------------------------------
# Yoshida 4th-order coefficients
# ---------------------------------------------------------------------------
# Triple composition S₄(dt) = S₂(w₁·dt) ∘ S₂(w₂·dt) ∘ S₂(w₃·dt)
# where w₁ = w₃ = 1/(2 - 2^(1/3)), w₂ = -2^(1/3)/(2 - 2^(1/3)).
# The negative middle weight w₂ causes a backward sub-step that cancels
# the leading O(dt²) error term of the Störmer-Verlet integrator.
#
# Ref: Yoshida (1990), Physics Letters A 150(5-7), pp. 262-268.
_CBRT2 = 2.0 ** (1.0 / 3.0)
_W1 = 1.0 / (2.0 - _CBRT2) # ≈ 1.3512071919596578
_W2 = -_CBRT2 / (2.0 - _CBRT2) # ≈ -1.7024143839193155
YOSHIDA_WEIGHTS: tuple[float, float, float] = (_W1, _W2, _W1)
# Pre-computed fused-kick weights for the Forest & Ruth (1990) scheme.
# Adjacent half-kicks between sub-steps merge:
# K(w₁/2) D(w₁) K((w₁+w₂)/2) D(w₂) K((w₂+w₁)/2) D(w₁) K(w₁/2)
# Between outer steps, K(w₁/2) + K(w₁/2) = K(w₁) (since w₃ = w₁).
# This reduces 6 → 3 force evaluations per step (plus 1 initial).
_YOSHIDA_KICK_INIT = _W1 / 2.0 # initial/final half-kick weight
_YOSHIDA_KICK_FUSED = (_W1 + _W2) / 2.0 # merged kick between sub-steps 1<->2 and 2<->3
# ---------------------------------------------------------------------------
# Force / velocity kernels (shared with CVODE and scipy)
# ---------------------------------------------------------------------------
[docs]
def compute_force( # noqa: PLR0913, PLR0917
spec: EquationSystem,
layout: StateLayout,
grid: GridInfo,
bc: BCSpec | None,
y: np.ndarray,
t: float = 0.0,
rhs_eval: RHSEvaluator | None = None,
out: np.ndarray | None = None,
fieldset: FieldSet | None = None,
) -> np.ndarray:
"""Compute dv/dt = E-L RHS for all velocity slots.
Parameters
----------
out : np.ndarray, optional
Pre-allocated output buffer of length ``layout.total_size``.
If provided, filled in-place (avoids allocation). If ``None``,
a fresh array is allocated.
fieldset : FieldSet, optional
Reusable FieldSet. If provided, rebound to *y* in-place
(avoids per-call object allocation).
"""
shape = grid.shape
if fieldset is not None:
fieldset.rebind(y)
else:
fieldset = FieldSet.from_flat(layout, shape, y)
eq_map = spec.equation_map
if out is not None:
force = out
force.fill(0.0)
else:
force = np.zeros(layout.total_size)
for _slot_idx, s, field_name in layout.velocity_slot_groups:
eq_idx = eq_map.get(field_name)
if eq_idx is None:
continue
if rhs_eval is not None:
result = rhs_eval.evaluate(eq_idx, fieldset, t)
else:
# Legacy path: constant coefficients only
eq = spec.equations[eq_idx]
result = np.zeros(shape)
fields = fieldset.as_dict()
for term in eq.rhs_terms:
target_data = fields.get(term.field, np.zeros(shape))
operated = apply_operator(term.operator, target_data, grid, bc)
result += term.coefficient * operated
force[s] = result.ravel()
return force
[docs]
def compute_velocity(
layout: StateLayout,
y: np.ndarray,
out: np.ndarray | None = None,
) -> np.ndarray:
"""Read dq/dt = v directly from velocity slots in the state vector.
Parameters
----------
out : np.ndarray, optional
Pre-allocated output buffer. If provided, filled in-place.
"""
if out is not None:
velocity = out
velocity.fill(0.0)
else:
velocity = np.zeros(layout.total_size)
for _slot_idx, s, vel_slot in layout.dynamical_field_slot_groups:
velocity[s] = y[layout.slot_slice(vel_slot)]
return velocity
# ---------------------------------------------------------------------------
# KDK integration primitives
# ---------------------------------------------------------------------------
def _half_kick(
y: np.ndarray,
force: np.ndarray,
dt: float,
layout: StateLayout,
) -> None:
"""Apply half-kick: v += (dt/2) F(q), in-place."""
for _slot_idx, s, _field_name in layout.velocity_slot_groups:
y[s] += 0.5 * dt * force[s]
def _weighted_kick(
y: np.ndarray,
force: np.ndarray,
weight_dt: float,
layout: StateLayout,
) -> None:
"""Apply weighted kick: v += weight_dt * F(q), in-place.
Used by the Yoshida fused-kick scheme where adjacent half-kicks merge
into a single kick with combined weight.
"""
for _slot_idx, s, _field_name in layout.velocity_slot_groups:
y[s] += weight_dt * force[s]
# ---------------------------------------------------------------------------
# Solver entry point
# ---------------------------------------------------------------------------
[docs]
def solve_leapfrog( # noqa: C901, PLR0912, PLR0913, PLR0914, PLR0915
spec: EquationSystem,
grid: GridInfo,
y0: np.ndarray,
t_span: tuple[float, float],
dt: float,
*,
bc: BCSpec | None = None,
parameters: dict[str, float] | None = None,
snapshot_interval: float | None = None,
snapshot_callback: Callable[[float, np.ndarray], None] | None = None,
progress: SimulationProgress | None = None,
order: int = 2,
) -> SolverResult:
"""Solve a TIDAL Hamiltonian system using symplectic integration.
Works for second-order (wave) equations with optional constraint fields.
Constraint fields (time_order=0) are frozen at their initial values —
correct for gauge-fixed systems (e.g. Coulomb gauge A_0 = 0).
Parameters
----------
spec : EquationSystem
Parsed equation specification.
grid : GridInfo
Spatial grid.
y0 : np.ndarray
Initial state vector (flat: [q_0, v_0, q_1, v_1, ...]).
t_span : tuple[float, float]
(t_start, t_end).
dt : float
Fixed time step.
bc : str or tuple, optional
Boundary conditions.
parameters : dict[str, float], optional
Runtime parameter overrides for symbolic coefficients.
snapshot_interval : float, optional
Time between snapshots. If None, only initial and final states saved.
snapshot_callback : callable, optional
Called as ``callback(t, y)`` at each snapshot time.
order : int, optional
Integration order: 2 (Störmer-Verlet, default) or 4 (Yoshida).
Order 4 uses triple composition of order 2 sub-steps with weights
from Yoshida (1990), achieving O(dt^4) accuracy at 3x force cost
per step. The fused-kick scheme (Forest & Ruth, 1990) merges
adjacent half-kicks to reduce 6→3 force evaluations per step.
Returns
-------
dict
Result dictionary with keys: ``t``, ``y``, ``success``, ``message``.
Raises
------
ValueError
If the system contains first-order (diffusion/transport) equations,
or if *order* is not 2 or 4.
Warns
-----
UserWarning
If constraint fields (time_order=0) are present — they remain frozen
at initial values.
"""
if order not in {2, 4}:
msg = f"Leapfrog order must be 2 or 4, got {order}"
raise ValueError(msg)
layout = StateLayout.from_spec(spec, grid.num_points)
# Validate: leapfrog supports second-order (wave) + frozen constraints.
# First-order (diffusion/transport) equations require IDA.
constraint_fields: list[str] = []
for slot in layout.slots:
if slot.kind == "velocity":
continue
if slot.time_order == 0:
constraint_fields.append(slot.field_name)
elif slot.time_order == 1:
msg = (
f"Leapfrog does not support first-order equations. "
f"Field '{slot.field_name}' has time_order={slot.time_order}. "
f"Use --scheme ida for first-order (diffusion/transport) systems."
)
raise ValueError(msg)
if constraint_fields:
warnings.warn(
f"Leapfrog: constraint fields {constraint_fields} frozen at initial "
f"values (not evolved). Correct for gauge-fixed systems (e.g. A_0=0).",
stacklevel=2,
)
# Build RHSEvaluator if parameters provided
rhs_eval: RHSEvaluator | None = None
if parameters is not None:
rhs_eval = build_rhs_evaluator(spec, grid, parameters, bc)
# Initialize state
y = y0.copy()
t = t_span[0]
t_end = t_span[1]
# Snapshot collection — use integer-based indexing to avoid FP drift.
# Each snapshot trigger recomputes the target time from the integer
# index, preventing accumulated floating-point error in long runs.
if snapshot_interval is None:
snapshot_interval = t_end - t
snapshot_idx = 0
times: list[float] = []
snapshots: list[np.ndarray] = []
def _save(t_now: float) -> None:
times.append(t_now)
snapshots.append(y.copy())
if snapshot_callback is not None:
snapshot_callback(t_now, y)
_save(t)
# Leapfrog loop — use ceil to guarantee reaching t_end.
# int() truncates (e.g. int(153.8) = 153), causing t_final < t_end and
# missing the last snapshot. ceil() ensures we always reach or slightly
# overshoot t_end. The -1e-10 handles exact division (10.0/0.1 = 100.0)
# without adding an unnecessary extra step.
n_steps = max(1, math.ceil((t_end - t) / dt - 1e-10))
force_buf = np.zeros(layout.total_size)
fieldset_buf = FieldSet.zeros(layout, grid.shape)
drift_pairs = layout.drift_slot_pairs
if order == 2: # noqa: PLR2004
# ----- Störmer-Verlet (order 2) with KDK force caching -----
# KDK force caching: compute F(q_0) once before the loop. Between
# steps only velocity changes (half-kicks), but force depends only on
# position, so F(q_{n+1}) from step n is reused at the start of step
# n+1. This halves force evaluations (2N → N+1).
compute_force(
spec,
layout,
grid,
bc,
y,
t,
rhs_eval,
out=force_buf,
fieldset=fieldset_buf,
)
for _step in range(n_steps):
# Half-kick with cached F(q_n)
_half_kick(y, force_buf, dt, layout)
# Drift: q += dt * v (zero-copy, reads velocity directly from y)
for field_slice, vel_slice in drift_pairs:
y[field_slice] += dt * y[vel_slice]
# Force at new position F(q_{n+1}) — cached for next step
compute_force(
spec,
layout,
grid,
bc,
y,
t + dt,
rhs_eval,
out=force_buf,
fieldset=fieldset_buf,
)
# Half-kick with F(q_{n+1})
_half_kick(y, force_buf, dt, layout)
t += dt
if progress is not None:
progress.update(t)
# Snapshot check (integer-based to avoid FP accumulation)
if t >= (snapshot_idx + 1) * snapshot_interval - dt * 0.01:
_save(t)
snapshot_idx += 1
else:
# ----- Yoshida 4th-order with fused-kick caching (order 4) -----
# Standard Yoshida S₄(dt) = S₂(w₁·dt) ∘ S₂(w₂·dt) ∘ S₂(w₃·dt)
# with the Forest & Ruth (1990) fused-kick optimization: adjacent
# half-kicks between sub-steps merge into a single kick.
#
# Unfused: K(w₁/2) D(w₁) K(w₁/2) K(w₂/2) D(w₂) K(w₂/2) K(w₁/2) D(w₁) K(w₁/2)
# Fused: K(w₁/2) D(w₁) K((w₁+w₂)/2) D(w₂) K((w₂+w₁)/2) D(w₁) K(w₁/2)
#
# Between outer steps, the final K(w₁/2) of step n and initial
# K(w₁/2) of step n+1 merge into K(w₁) — same pattern as Verlet
# KDK force caching. This reduces 6N → 3N+1 force evaluations.
#
# Time tracking: t_sub tracks sub-step time for time-dependent
# coefficients. The middle sub-step w₂ < 0 steps backward in time.
#
# CFL note: effective CFL limit is dt_max / max(|wᵢ|) ≈ dt_max / 1.70.
#
# Ref: Yoshida (1990), Physics Letters A 150(5-7), pp. 262-268.
# Forest & Ruth (1990), Physica D 43, pp. 105-117.
# Hairer, Lubich, Wanner (2006), §II.4 "Composition Methods".
# Pre-compute kick weights * dt for the 3 drifts + 4 kicks per step
d1 = _W1 * dt # drift 1 and 3
d2 = _W2 * dt # drift 2 (negative: backward sub-step)
k_init = _YOSHIDA_KICK_INIT * dt # initial/final half-kick
k_fused = _YOSHIDA_KICK_FUSED * dt # merged kick between sub-steps
# Initial force + opening half-kick (cached across integration)
compute_force(
spec,
layout,
grid,
bc,
y,
t,
rhs_eval,
out=force_buf,
fieldset=fieldset_buf,
)
_weighted_kick(y, force_buf, k_init, layout)
for _step in range(n_steps):
is_last = _step == n_steps - 1
t_sub = t
# --- Sub-step 1: drift w₁, force, fused kick (w₁+w₂)/2 ---
for field_slice, vel_slice in drift_pairs:
y[field_slice] += d1 * y[vel_slice]
t_sub += d1
compute_force(
spec,
layout,
grid,
bc,
y,
t_sub,
rhs_eval,
out=force_buf,
fieldset=fieldset_buf,
)
_weighted_kick(y, force_buf, k_fused, layout)
# --- Sub-step 2: drift w₂, force, fused kick (w₂+w₁)/2 ---
for field_slice, vel_slice in drift_pairs:
y[field_slice] += d2 * y[vel_slice]
t_sub += d2
compute_force(
spec,
layout,
grid,
bc,
y,
t_sub,
rhs_eval,
out=force_buf,
fieldset=fieldset_buf,
)
_weighted_kick(y, force_buf, k_fused, layout)
# --- Sub-step 3: drift w₁, force, closing + opening kick ---
for field_slice, vel_slice in drift_pairs:
y[field_slice] += d1 * y[vel_slice]
t_sub += d1
compute_force(
spec,
layout,
grid,
bc,
y,
t_sub,
rhs_eval,
out=force_buf,
fieldset=fieldset_buf,
)
# Closing half-kick w₁/2 — state is now synchronized
_weighted_kick(y, force_buf, k_init, layout)
t += dt
if progress is not None:
progress.update(t)
# Snapshot check (integer-based to avoid FP accumulation)
if t >= (snapshot_idx + 1) * snapshot_interval - dt * 0.01:
_save(t)
snapshot_idx += 1
if not is_last:
# Opening half-kick for next step (same force, no re-eval)
_weighted_kick(y, force_buf, k_init, layout)
# Ensure final state is saved
if not times or abs(times[-1] - t) > dt * 0.01:
_save(t)
if progress is not None:
progress.finish()
label = "Störmer-Verlet" if order == 2 else "Yoshida" # noqa: PLR2004
return {
"t": np.asarray(times, dtype=np.float64),
"y": np.asarray(snapshots, dtype=np.float64),
"success": True,
"message": f"Leapfrog integration completed ({label} order {order})",
}