"""Hamiltonian energy density computation from Lagrangian-derived equations.
Computes the spatially-averaged Hamiltonian energy density ⟨ε⟩ = H / V_domain
for any quadratic Lagrangian by reconstructing it from the Euler-Lagrange
equations in the JSON spec:
⟨ε⟩ = ½ Σ_{dyn} ⟨v²⟩ (kinetic, using simulation velocities)
+ ⟨v_virial⟩ (from dynamical fields' spatial RHS terms)
+ ⟨v_constraint_self⟩ (constraint field gradient + mass, sign-flipped)
+ ⟨v_constraint_cross⟩ (cross-constraint identity coupling)
The virial potential density uses Euler's homogeneous function theorem for
degree-2 functionals: ⟨v⟩ = -½ Σ ⟨φ_i · RHS_i^{spatial}⟩.
Constraint fields (temporal gauge components) have NEGATIVE self-energy
due to the Minkowski metric g^{00} = -1. This sign flip is automatic.
All energy values are **average energy densities** (intensive quantities),
independent of the domain size. Ratios (conservation, conversion probability)
are invariant under this normalization.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from collections.abc import Callable
from numpy.typing import NDArray
from tidal.measurement._io import SimulationData
from tidal.symbolic.json_loader import (
ComponentEquation,
HamiltonianTerm,
OperatorTerm,
)
# Threshold below which energy is treated as zero. Prevents division by
# near-zero values from floating-point integration noise.
_ENERGY_FLOOR: float = 1e-12

# Axis letter → numpy axis index (spatial axes only).
_AXIS_MAP: dict[str, int] = {"x": 0, "y": 1, "z": 2}

# Gradient operator → axis index. Used by the integration-by-parts
# Hamiltonian evaluation to convert gradient-product terms into the
# equivalent second-order operators (laplacian / cross_derivative).
_GRADIENT_AXES: dict[str, int] = {
    "gradient_x": 0,
    "gradient_y": 1,
    "gradient_z": 2,
}
@dataclass(frozen=True)
class FieldEnergy:
    """Energy density decomposition for a single field at one snapshot.

    All values are spatially-averaged energy densities ⟨ε⟩ = E / V_domain.

    Attributes
    ----------
    kinetic : float
        ``0.5 * ⟨π²⟩``
    gradient : float
        ``0.5 * ⟨|∇_self φ|²⟩`` — gradient energy density over
        self-laplacian axes only. For scalar fields (full ``laplacian``),
        this equals ``0.5 * ⟨|∇φ|²⟩``. For vector components with
        directional laplacians (e.g. ``laplacian_y``), only the relevant axes.
    mass : float
        ``0.5 * m² * ⟨φ²⟩``
    total : float
        Sum of kinetic + gradient + mass.
    """

    kinetic: float
    gradient: float
    mass: float
    total: float
@dataclass(frozen=True)
class SystemEnergy:
    """Energy density for the full coupled system at one snapshot.

    All values are spatially-averaged energy densities ⟨ε⟩ = E / V_domain.

    Attributes
    ----------
    per_field : dict[str, FieldEnergy]
        Energy density breakdown per field (operator-aware gradient).
    interaction : float
        Cross-field coupling energy density: total potential density minus
        per-field self-potential densities. Uses operator-aware gradient
        axes, so this is zero when fields are uncoupled and only one field
        is excited.
    total : float
        Complete Hamiltonian density: kinetic + virial + constraint self-energy.
    """

    per_field: dict[str, FieldEnergy]
    interaction: float
    total: float
# ------------------------------------------------------------------
# Derivative helpers — delegated to operators.py
# ------------------------------------------------------------------
#
# Energy measurement MUST use the same finite-difference stencils as the
# PDE solver (operators.py) to track the discrete conserved Hamiltonian
# exactly. Previous versions reimplemented stencils here independently;
# this caused inconsistencies when the solver FD order changed.
#
# Now all spatial operators are called through the solver module, which
# automatically inherits the current FD order from set_fd_order().
#
# Reference: the stencil-matching requirement is discussed in
# Hairer, Lubich & Wanner, "Geometric Numerical Integration", Springer,
# 2006, Chapter VI — virial energy tracks the discrete (shadow)
# Hamiltonian only when the measurement stencils match the solver's.
def _make_grid_info(
    grid_spacing: tuple[float, ...],
    periodic: tuple[bool, ...],
    shape: tuple[int, ...],
    bc_types: tuple[str, ...] | None = None,
) -> object:
    """Build a minimal ``GridInfo`` from measurement parameters.

    ``GridInfo`` is imported lazily to avoid a circular import at module
    load time. Per-axis bounds are reconstructed as ``(0, n * dx)``.
    """
    from tidal.solver.grid import GridInfo  # noqa: PLC0415

    axis_bounds = tuple(
        (0.0, dx * float(n)) for n, dx in zip(shape, grid_spacing, strict=True)
    )
    return GridInfo(
        bounds=axis_bounds,
        shape=shape,
        periodic=periodic,
        bc=bc_types,
    )
def _resolve_bc_spec(
    bc_types: tuple[str, ...] | None,
    periodic: tuple[bool, ...],
) -> str | tuple[str, ...]:
    """Build a BC spec for operators.py from energy module parameters."""
    if bc_types is None:
        # Derive per-axis BCs from the periodicity flags.
        return tuple("periodic" if flag else "dirichlet" for flag in periodic)
    return bc_types
# ------------------------------------------------------------------
# Operator-aware gradient axes
# ------------------------------------------------------------------
def _self_gradient_axes(eq: ComponentEquation) -> list[int] | None:  # pyright: ignore[reportUnusedFunction]
    """Return spatial axes that have self-laplacian operators in *eq*.

    Scans ``eq.rhs_terms`` for laplacian-type operators applied to the
    equation's own field:

    - ``None`` when a full ``laplacian`` is present (all axes contribute).
    - Otherwise a sorted list of axis indices for directional laplacians
      (e.g. ``[1]`` for ``laplacian_y``).

    Used for operator-aware per-field gradient energy: only axes appearing
    as self-laplacian terms count toward per-field energy; the remaining
    axes contribute to interaction.
    """
    directional_prefix = "laplacian_"
    found: set[int] = set()
    for term in eq.rhs_terms:
        if term.field != eq.field_name:
            continue
        if term.operator == "laplacian":
            return None  # full laplacian → use all axes
        if term.operator.startswith(directional_prefix):
            suffix = term.operator[len(directional_prefix):]
            if suffix in _AXIS_MAP:
                found.add(_AXIS_MAP[suffix])
    return sorted(found)
# ------------------------------------------------------------------
# Spatial operator application (for Hamiltonian evaluation)
# ------------------------------------------------------------------
def _effective_bc(
    axis: int,
    periodic: tuple[bool, ...],
    bc_types: tuple[str, ...] | None,
) -> str:
    """Resolve effective BC type for a single axis."""
    if bc_types is None:
        # No explicit BC list: fall back to the periodicity flag.
        return "periodic" if periodic[axis] else "dirichlet"
    return bc_types[axis]
def _apply_spatial_operator(  # noqa: PLR0913, PLR0917
    operator: str,
    field: NDArray[np.float64],
    grid_spacing: tuple[float, ...],
    periodic: tuple[bool, ...],
    bc_types: tuple[str, ...] | None = None,
    grid: object | None = None,
) -> NDArray[np.float64]:
    """Apply a named spatial operator to a field array.

    All spatial derivatives are routed through the solver's
    ``operators.apply_operator()`` so that energy measurement and PDE
    integration share the exact same FD stencils — required for the
    virial energy to track the discrete (shadow) Hamiltonian.

    Parameters
    ----------
    bc_types : tuple[str, ...] | None
        Per-axis BC type. ``None`` falls back to the ``periodic`` booleans.
    grid : GridInfo | None
        Pre-built grid info; when supplied, ``_make_grid_info()``
        reconstruction (and its allocation) is skipped.
    """
    from tidal.solver.operators import apply_operator as _solver_apply  # noqa: PLC0415

    if operator == "identity":
        # Identity needs no solver dispatch.
        return field
    resolved_bc = _resolve_bc_spec(bc_types, periodic)
    effective_grid = grid
    if effective_grid is None:
        effective_grid = _make_grid_info(grid_spacing, periodic, field.shape, bc_types)
    return _solver_apply(operator, field, effective_grid, resolved_bc)  # type: ignore[arg-type]
# ------------------------------------------------------------------
# Term resolution helpers
# ------------------------------------------------------------------
def _is_velocity_field(field_name: str) -> bool:
    """Check if a field name is a velocity reference (v_field_name, e.g. v_A_1)."""
    prefix = "v_"
    # Require a non-empty suffix: bare "v_" is not a velocity reference.
    return len(field_name) > len(prefix) and field_name.startswith(prefix)
# ------------------------------------------------------------------
# Mixed time-space operator support
# ------------------------------------------------------------------
#
# Mixed operators arise from Lagrangians containing second-order time
# derivatives (e.g. linearized Einstein-Hilbert: L^(2) ~ h * d²_t h).
# The Legendre transform doesn't eliminate these, so the Hamiltonian
# retains operators like mixed_1_0_0_1 (d_t d_z) or mixed_2_0_0_0 (d²_t).
#
# Strategy: decompose into time part (velocity/EOM) + spatial part.
def _parse_mixed_operator(operator: str) -> tuple[int, tuple[int, ...]]:
    """Parse ``mixed_T_S1_S2_...`` → ``(time_order, spatial_orders)``.

    Examples
    --------
    >>> _parse_mixed_operator("mixed_1_0_0_1")
    (1, (0, 0, 1))
    >>> _parse_mixed_operator("mixed_2_0_0")
    (2, (0, 0))
    """
    # Drop the "mixed" token, then the first integer is the time order.
    orders = [int(tok) for tok in operator.split("_")[1:]]
    return orders[0], tuple(orders[1:])
def _compute_acceleration_from_eom(
    data: SimulationData,
    field: str,
    t_idx: int,
    params: dict[str, float],
) -> NDArray[np.float64] | None:
    """Compute d²_t(field) by evaluating the E-L equation RHS.

    For dynamical fields whose equation has the form ``d²_t f = Σ c_k op_k(g_k)``,
    evaluates the RHS using stored field/velocity snapshots and spatial operators.
    Returns ``None`` if no matching dynamical equation exists.

    Parameters
    ----------
    data : SimulationData
        Loaded simulation snapshots (fields, velocities, spec, grid info).
    field : str
        Name of the field whose acceleration is requested.
    t_idx : int
        Snapshot index.
    params : dict[str, float]
        Merged parameters for symbolic coefficient resolution.
    """
    for eq in data.spec.equations:
        # Only second-order (dynamical) equations define an acceleration.
        if eq.field_name != field or eq.time_derivative_order != 2:  # noqa: PLR2004
            continue
        # Grid shape taken from an arbitrary stored field snapshot.
        shape: tuple[int, ...] = next(iter(data.fields.values()))[t_idx].shape
        result: NDArray[np.float64] = np.zeros(shape)
        for term in eq.rhs_terms:
            if term.operator == "first_derivative_t":
                # d_t term on the RHS: read the stored velocity directly.
                vel = data.velocities.get(term.field)
                if vel is None:
                    continue  # no stored velocity → term contributes nothing
                operated: NDArray[np.float64] = vel[t_idx]
            else:
                target = _resolve_term_target(data, term.field, t_idx)
                if target is None:
                    continue  # zero-velocity constraint reference → skip
                operated = _apply_spatial_operator(
                    term.operator,
                    target,
                    data.grid_spacing,
                    data.periodic,
                    bc_types=data.bc_types,
                )
            coeff = _resolve_term_coefficient(term, params)
            result += coeff * operated
        return result
    return None
def _differentiate_constraint(
    data: SimulationData,
    field: str,
    t_idx: int,
    params: dict[str, float],
) -> NDArray[np.float64] | None:
    """Compute d_t of a constraint field by differentiating its equation.

    For a constraint ``f = Σ c_k op_k(source_k)``, the time derivative is
    ``d_t(f) = Σ c_k op_k(d_t(source_k))`` where ``d_t(source)`` is the
    velocity (for field sources) or acceleration from EOM (for velocity sources).

    Returns ``None`` if *field* has no zeroth-order (constraint) equation.
    """
    for eq in data.spec.equations:
        # Constraint equations have time_derivative_order == 0.
        if eq.field_name != field or eq.time_derivative_order != 0:
            continue
        shape: tuple[int, ...] = next(iter(data.fields.values()))[t_idx].shape
        result: NDArray[np.float64] = np.zeros(shape)
        for term in eq.rhs_terms:
            # Skip self-referential terms (e.g. trace constraint h_9 = h_4 + h_7 + h_9)
            if term.field == field:
                continue
            if _is_velocity_field(term.field):
                # Source is velocity v_X → d_t(v_X) = acceleration from EOM
                base_field = term.field[2:]  # strip "v_" prefix
                dt_source = _compute_acceleration_from_eom(
                    data, base_field, t_idx, params
                )
            else:
                # Source is field X → d_t(X) = velocity
                vel = data.velocities.get(term.field)
                dt_source = vel[t_idx] if vel is not None else None
            if dt_source is None:
                # Source derivative unresolvable — omit this term's contribution.
                continue
            operated: NDArray[np.float64] = _apply_spatial_operator(
                term.operator,
                dt_source,
                data.grid_spacing,
                data.periodic,
                bc_types=data.bc_types,
            )
            coeff = _resolve_term_coefficient(term, params)
            result += coeff * operated
        return result
    return None
def _differentiate_constraint_twice(
    data: SimulationData,
    field: str,
    t_idx: int,
    params: dict[str, float],
) -> NDArray[np.float64] | None:
    """Compute d²_t of a constraint field.

    Handles constraints whose sources are plain fields (not velocities):
    ``d²_t(field_source) = acceleration`` from EOM. Constraints with
    velocity sources would need the jerk (d³_t), which is not supported.

    Returns ``None`` when *field* has no constraint equation, when any
    source term is a velocity, or when a source's acceleration cannot be
    computed from its EOM.
    """
    for eq in data.spec.equations:
        # Constraint equations have time_derivative_order == 0.
        if eq.field_name != field or eq.time_derivative_order != 0:
            continue
        shape: tuple[int, ...] = next(iter(data.fields.values()))[t_idx].shape
        result: NDArray[np.float64] = np.zeros(shape)
        for term in eq.rhs_terms:
            # Skip self-referential terms (same convention as _differentiate_constraint).
            if term.field == field:
                continue
            if _is_velocity_field(term.field):
                # d²_t(velocity) = jerk — not supported
                return None
            accel = _compute_acceleration_from_eom(data, term.field, t_idx, params)
            if accel is None:
                # One unresolvable source makes the whole derivative unknown.
                return None
            operated: NDArray[np.float64] = _apply_spatial_operator(
                term.operator,
                accel,
                data.grid_spacing,
                data.periodic,
                bc_types=data.bc_types,
            )
            coeff = _resolve_term_coefficient(term, params)
            result += coeff * operated
        return result
    return None
def _resolve_time_derivative(
    data: SimulationData,
    field: str,
    t_idx: int,
    time_order: int,
    params: dict[str, float],
) -> NDArray[np.float64] | None:
    """Resolve the Nth time derivative of a field from stored data + EOM.

    ========== ===============================================================
    time_order Resolution
    ========== ===============================================================
    0          Field value (identity)
    1          Velocity (stored), or differentiated constraint equation
    2          Acceleration from EOM RHS, or double-differentiated constraint
    ≥ 3        Not supported (returns None)
    ========== ===============================================================
    """
    if time_order == 0:
        # Zeroth derivative: the field snapshot itself.
        return _resolve_term_target(data, field, t_idx)
    if time_order == 1:
        stored = data.velocities.get(field)
        return (
            stored[t_idx]
            if stored is not None
            else _differentiate_constraint(data, field, t_idx, params)
        )
    if time_order == 2:  # noqa: PLR2004
        from_eom = _compute_acceleration_from_eom(data, field, t_idx, params)
        if from_eom is None:
            return _differentiate_constraint_twice(data, field, t_idx, params)
        return from_eom
    # Jerk (d³_t) and higher are not currently supported.
    return None
# Spatial axis letters indexed by numpy axis (axis 0 = "x", 1 = "y", 2 = "z").
_SPATIAL_AXIS_LETTERS: tuple[str, ...] = ("x", "y", "z")


def _evaluate_mixed_factor(
    factor_field: str,
    factor_operator: str,
    data: SimulationData,
    t_idx: int,
    params: dict[str, float],
) -> NDArray[np.float64] | None:
    """Evaluate a ``mixed_T_S1_S2_...`` operator on a field.

    Decomposition: apply T time derivatives (velocity/EOM), then apply
    spatial derivatives sequentially.

    Returns ``None`` when the time-derivative part cannot be resolved.
    """
    time_order, spatial_orders = _parse_mixed_operator(factor_operator)
    # Time part
    base = _resolve_time_derivative(data, factor_field, t_idx, time_order, params)
    if base is None:
        return None
    # Spatial part: apply gradient operators for each nonzero spatial order.
    # Handle dimension mismatch from plane-wave reduction: when the mixed
    # operator has more spatial indices than grid dimensions, collapse all
    # derivative orders onto the available axes (axis 0 for 1D).
    n_spatial = len(data.grid_spacing)
    if len(spatial_orders) > n_spatial:
        total_order = sum(spatial_orders)
        effective_orders: tuple[int, ...] = (total_order,) + (0,) * (n_spatial - 1)
    else:
        effective_orders = spatial_orders
    # Cascade one first-order gradient per derivative order on each axis.
    for ax_idx, order in enumerate(effective_orders):
        for _ in range(order):
            ax_letter = _SPATIAL_AXIS_LETTERS[ax_idx]
            base = _apply_spatial_operator(
                f"gradient_{ax_letter}",
                base,
                data.grid_spacing,
                data.periodic,
                bc_types=data.bc_types,
            )
    return base
def _resolve_term_coefficient(
    term: OperatorTerm,
    parameters: dict[str, float],
) -> float:
    """Resolve a term's numeric coefficient.

    Symbolic resolution is attempted first; on failure the stored numeric
    value is used. No negation — RHS coefficients already carry the
    correct sign.
    """
    symbolic = term.coefficient_symbolic
    if symbolic is not None:
        from tidal.symbolic.json_loader import (  # noqa: PLC0415
            _resolve_symbolic_coeff,  # pyright: ignore[reportPrivateUsage]
        )

        value = _resolve_symbolic_coeff(symbolic, parameters)
        if value is not None:
            return float(value)
    return float(term.coefficient)
def _build_coord_arrays(
data: SimulationData,
) -> dict[str, NDArray[np.float64]]:
"""Build spatial coordinate arrays from ``SimulationData`` grid info.
Constructs cell-centered coordinate meshgrids matching the field
data shape (``*grid_shape``).
"""
spatial_coords = data.spec.spatial_coordinates
axes: list[NDArray[np.float64]] = []
for i, name in enumerate(spatial_coords):
lo, hi = data.grid_bounds[i]
dx = data.grid_spacing[i]
n = round((hi - lo) / dx)
# Cell-centered: offset by dx/2
axes.append(np.linspace(lo + dx / 2, hi - dx / 2, n))
del name # used only in the dict comprehension below
grids = np.meshgrid(*axes, indexing="ij")
return {
name: np.asarray(g, dtype=np.float64)
for name, g in zip(spatial_coords, grids, strict=True)
}
def _resolve_coefficient_on_grid(  # pyright: ignore[reportUnusedFunction]
    term: OperatorTerm,
    data: SimulationData,
    coord_arrays: dict[str, NDArray[np.float64]],
) -> float | NDArray[np.float64]:
    """Resolve a term's coefficient, returning a grid array if position-dependent.

    For constant coefficients, returns a scalar float (same as
    ``_resolve_term_coefficient``). For position-dependent coefficients,
    evaluates the symbolic expression on the grid using
    :func:`~tidal.symbolic._eval_utils.evaluate_coefficient`.
    """
    if not term.position_dependent:
        return _resolve_term_coefficient(term, data.parameters)
    sym = term.coefficient_symbolic
    if sym is None:
        # Position-dependent flag without a symbolic form: fall back to
        # the stored numeric value.
        return float(term.coefficient)
    from tidal.symbolic._eval_utils import evaluate_coefficient  # noqa: PLC0415

    # NOTE(review): t=0.0 assumes the coefficient has no time dependence
    # at measurement time — confirm against evaluate_coefficient's contract.
    return evaluate_coefficient(
        sym,
        data.parameters,
        data.spec.effective_coordinates,
        coord_arrays=coord_arrays,
        t=0.0,
    )
def _resolve_term_target(
    data: SimulationData,
    field_name: str,
    t_idx: int,
) -> NDArray[np.float64] | None:
    """Resolve the field data a term acts on.

    Returns
    -------
    NDArray or None
        The field/velocity snapshot, or ``None`` if the target is a
        zero-velocity constraint field (expected case).

    Raises
    ------
    ValueError
        If *field_name* cannot be resolved to any known field or velocity.
    """
    # Direct field reference wins immediately.
    if field_name in data.fields:
        return data.fields[field_name][t_idx]
    # Anything that is not a v_<field> velocity pattern is unresolvable.
    if not (field_name.startswith("v_") and len(field_name) > len("v_")):
        msg = (
            f"Unresolvable field reference '{field_name}' — "
            f"not a known field ({list(data.fields.keys())}) "
            f"or velocity pattern (v_field_name)"
        )
        raise ValueError(msg)
    suffix = field_name[2:]
    names = data.spec.component_names
    if suffix not in names:
        msg = (
            f"Velocity reference '{field_name}': suffix '{suffix}' "
            f"is not a known field ({names})"
        )
        raise ValueError(msg)
    target_name = suffix
    eq = data.spec.equations[names.index(target_name)]
    if eq.time_derivative_order == 0:
        # Constraint field → zero velocity (expected None).
        return None
    vel = data.velocities.get(target_name)
    if vel is None:
        msg = (
            f"Velocity reference '{field_name}' resolves to field "
            f"'{target_name}', but no velocity data found"
        )
        raise ValueError(msg)
    return vel[t_idx]
# ------------------------------------------------------------------
# Mass resolution
# ------------------------------------------------------------------
def _resolve_mass_squared(  # pyright: ignore[reportUnusedFunction]
    data: SimulationData,
    field_idx: int,
    coord_arrays: dict[str, NDArray[np.float64]] | None = None,
) -> float | NDArray[np.float64]:
    """Get the diagonal mass matrix entry for a field.

    Tries symbolic resolution first (using ``data.parameters``), then
    falls back to the pre-computed numeric matrix. For position-dependent
    mass terms, evaluates the symbolic expression on the grid and returns
    an ndarray (negated, per mass matrix convention).

    Parameters
    ----------
    data : SimulationData
        Simulation data.
    field_idx : int
        Field equation index.
    coord_arrays : dict[str, NDArray] | None
        Pre-built coordinate arrays (from ``_build_coord_arrays``).
        Required if the mass term is position-dependent.
    """
    from tidal.symbolic.json_loader import (  # noqa: PLC0415
        _resolve_symbolic_coeff,  # pyright: ignore[reportPrivateUsage]
    )

    # Check for position-dependent mass term → evaluate on grid
    eq = data.spec.equations[field_idx]
    field_name = eq.field_name
    for term in eq.rhs_terms:
        # A self-identity term in the field's own equation is its mass term.
        if (
            term.operator == "identity"
            and term.field == field_name
            and term.position_dependent
        ):
            sym = term.coefficient_symbolic
            if sym is None:
                # NOTE(review): unlike every other branch, this value is
                # returned WITHOUT negation — confirm intended.
                return float(term.coefficient)
            if coord_arrays is None:
                coord_arrays = _build_coord_arrays(data)
            from tidal.symbolic._eval_utils import evaluate_coefficient  # noqa: PLC0415

            coeff = evaluate_coefficient(
                sym,
                data.parameters,
                data.spec.effective_coordinates,
                coord_arrays=coord_arrays,
            )
            # Convention: mass_matrix[i][i] = -(coefficient of identity(field_i))
            if isinstance(coeff, np.ndarray):
                return -coeff
            return -float(coeff)
    # No position-dependent mass term — try the symbolic mass matrix.
    sym_row = (
        data.spec.mass_matrix_symbolic[field_idx]
        if data.spec.mass_matrix_symbolic
        else ()
    )
    if field_idx < len(sym_row):
        sym_val = sym_row[field_idx]
        if sym_val is not None:
            resolved = _resolve_symbolic_coeff(sym_val, data.parameters)
            if resolved is not None:
                # Negate: symbolic matrix stores raw coefficient_symbolic,
                # but convention is matrix[i][j] = -(coefficient).
                return float(-resolved)
    # Final fallback: the pre-computed numeric mass matrix.
    return float(data.spec.mass_matrix[field_idx][field_idx])
# ------------------------------------------------------------------
# Hamiltonian evaluation helpers
# ------------------------------------------------------------------
def _evaluate_hamiltonian_factor(
    factor_field: str,
    factor_operator: str,
    data: SimulationData,
    t_idx: int,
    grid: object | None = None,
) -> NDArray[np.float64] | None:
    """Evaluate a single Hamiltonian factor on the grid.

    For ``time_derivative`` operator, reads the velocity directly from
    ``data.velocities`` (which stores velocities v = dq/dt in the E-L form).
    For ``mixed_*`` operators (time-space cross derivatives), decomposes into
    time derivative resolution (velocity/EOM) + spatial operator application.
    For spatial operators, applies the operator to the field data.
    For ``identity``, returns the field data directly.

    Returns None if the factor cannot be evaluated (e.g., constraint
    field without stored velocity for time_derivative).

    Parameters
    ----------
    grid : GridInfo | None
        Pre-built grid info from context (avoids per-call reconstruction).
    """
    if factor_operator == "time_derivative":
        # Velocities are stored directly; a missing entry means the factor
        # is unresolvable (e.g. constraint field).
        vel = data.velocities.get(factor_field)
        if vel is not None:
            return vel[t_idx]
        return None
    # Mixed time-space operators (e.g. mixed_1_0_0_1 = d_t d_z)
    if factor_operator.startswith("mixed_"):
        params = _merge_parameters(data)
        return _evaluate_mixed_factor(
            factor_field, factor_operator, data, t_idx, params
        )
    # Get the field data
    field_arr = _resolve_term_target(data, factor_field, t_idx)
    if field_arr is None:
        return None
    if factor_operator == "identity":
        return field_arr
    # Apply spatial operator
    return _apply_spatial_operator(
        factor_operator,
        field_arr,
        data.grid_spacing,
        data.periodic,
        bc_types=data.bc_types,
        grid=grid,
    )
def _gradient_pair_to_second_order(op_a: str, op_b: str) -> str:
    """Map a pair of gradient operators to the equivalent 2nd-order operator.

    Integration-by-parts identity (exact for periodic BCs):
    ``⟨∂_a u, ∂_b v⟩ = -⟨u, ∂²_ab v⟩``, where ``∂²_ab`` is a laplacian when
    the axes match and a cross_derivative otherwise.
    """
    letters = "xyz"
    first = _GRADIENT_AXES[op_a]
    second = _GRADIENT_AXES[op_b]
    if first == second:
        return f"laplacian_{letters[first]}"
    # Cross derivatives are canonically named with axes in ascending order.
    lo, hi = min(first, second), max(first, second)
    return f"cross_derivative_{letters[lo]}{letters[hi]}"
def _gradient_product_density(  # noqa: PLR0913, PLR0917
    op_a: str,
    field_a: NDArray[np.float64],
    op_b: str,
    field_b: NDArray[np.float64],
    grid_spacing: tuple[float, ...],
    periodic: tuple[bool, ...],
    bc_types: tuple[str, ...] | None = None,
    grid: object | None = None,
) -> NDArray[np.float64]:
    """Pointwise density for the gradient inner product ⟨∂_a f, ∂_b g⟩.

    Returns a grid array whose spatial mean gives the gradient inner product.
    Returning an array (not a scalar) allows callers to weight by
    position-dependent coefficients before taking ``.mean()``.

    For **periodic** axes: uses integration by parts (IBP)
        ``density = -f · ∂²_ab(g)``
    matching the solver's 3-point laplacian stencil (exact discrete IBP).

    For **non-periodic** axes: uses direct central-difference
        ``density = ∂_a(f) · ∂_b(g)``
    because discrete IBP requires a constant integration weight and breaks
    down when the energy integral includes a position-dependent volume
    element (e.g. ``r²`` in spherical coordinates).

    This is the **single source of truth** for gradient-product evaluation.
    Both the standalone gradient x gradient Hamiltonian path and the kinetic
    bilinear expansion dispatch here, guaranteeing stencil consistency for
    terms that must cancel (e.g. ``½(∂_x A_0)²`` from kinetic ``- ½(∂_x A_0)²``
    standalone in Proca/CS theories).
    """
    # BC of factor a's axis decides the evaluation strategy.
    ax_a = _GRADIENT_AXES[op_a]
    bc_a = _effective_bc(ax_a, periodic, bc_types)
    from tidal.solver.operators import get_spectral as _get_spectral  # noqa: PLC0415

    if bc_a == "periodic" and not _get_spectral():
        # FD periodic: IBP is more accurate than two separate FD gradients
        # because it uses a single laplacian stencil (fewer FD applications).
        # IBP: ⟨∂_a f, ∂_b g⟩ = mean(-f · ∂²_ab g)
        second_op = _gradient_pair_to_second_order(op_a, op_b)
        operated = _apply_spatial_operator(
            second_op,
            field_b,
            grid_spacing,
            periodic,
            bc_types=bc_types,
            grid=grid,
        )
        return -(field_a * operated)
    # Spectral or non-periodic: direct gradient product.
    # For spectral operators, direct gradient*gradient is more accurate
    # than IBP (laplacian) because rfft Nyquist-mode handling introduces
    # O(1/N) discrepancy between mean(|∂f|²) and -mean(f·∂²f).
    grad_a = _apply_spatial_operator(
        op_a,
        field_a,
        grid_spacing,
        periodic,
        bc_types=bc_types,
        grid=grid,
    )
    grad_b = _apply_spatial_operator(
        op_b,
        field_b,
        grid_spacing,
        periodic,
        bc_types=bc_types,
        grid=grid,
    )
    return grad_a * grad_b
def _merge_parameters(data: SimulationData) -> dict[str, float]:
    """Merge spec metadata parameters with runtime parameters.

    Spec metadata provides defaults (from theory.toml); runtime parameters
    (from ``--param`` CLI flags) override them.
    """
    merged: dict[str, float] = {}
    meta = data.spec.metadata.get("parameters", {})
    if isinstance(meta, dict):
        for key, value in meta.items():  # type: ignore[union-attr]
            try:
                merged[str(key)] = float(value)  # type: ignore[arg-type]
            except (ValueError, TypeError):
                # Non-numeric metadata entry — skip silently, as before.
                continue
    merged.update(data.parameters)
    return merged
@dataclass
class _HamiltonianContext:
    """Pre-computed snapshot-invariant data for Hamiltonian evaluation.

    Hoisting these computations out of the per-snapshot loop avoids
    redundant parameter merging, coordinate array construction, volume
    element evaluation, per-term coefficient resolution, and GridInfo
    reconstruction.
    """

    # Merged spec-default + runtime parameters (see _merge_parameters).
    params: dict[str, float]
    # Cell-centered coordinate meshgrids; None when never needed.
    coord_arrays: dict[str, NDArray[np.float64]] | None
    # Volume element evaluated on the grid (1.0 when the spec has none).
    volume_weight: float | NDArray[np.float64]
    # Pre-resolved coefficient for each canonical Hamiltonian term, in order.
    term_coeffs: list[float | NDArray[np.float64]]
    # Symbolic-coefficient resolver, captured once to avoid re-import.
    resolve_symbolic: Callable[..., float | None]
    grid_info: object | None = None  # GridInfo, typed as object to avoid import
def _prepare_hamiltonian_context(data: SimulationData) -> _HamiltonianContext:
    """Build a reusable context for repeated Hamiltonian evaluations.

    Pre-resolves merged parameters, the volume element, every term
    coefficient, and the ``GridInfo`` so per-snapshot evaluation does no
    redundant work.
    """
    canonical = data.spec.canonical
    # Callers must ensure the canonical structure exists before calling.
    assert canonical is not None
    from tidal.symbolic.json_loader import (  # noqa: PLC0415
        _resolve_symbolic_coeff,  # pyright: ignore[reportPrivateUsage]
    )
    params = _merge_parameters(data)
    coord_arrays: dict[str, NDArray[np.float64]] | None = None
    # Volume element
    volume_weight: float | NDArray[np.float64] = 1.0
    if canonical.volume_element is not None:
        coord_arrays = _build_coord_arrays(data)
        from tidal.symbolic._eval_utils import evaluate_coefficient  # noqa: PLC0415
        volume_weight = evaluate_coefficient(
            canonical.volume_element,
            params,
            data.spec.effective_coordinates,
            coord_arrays=coord_arrays,
            t=0.0,
        )
    # Pre-resolve all term coefficients
    term_coeffs: list[float | NDArray[np.float64]] = []
    for term in canonical.hamiltonian_terms:
        if term.position_dependent:
            # Position-dependent coefficient: evaluate symbolically on the grid.
            if coord_arrays is None:
                coord_arrays = _build_coord_arrays(data)
            from tidal.symbolic._eval_utils import evaluate_coefficient  # noqa: PLC0415
            assert term.coefficient_symbolic is not None
            coeff: float | NDArray[np.float64] = evaluate_coefficient(
                term.coefficient_symbolic,
                params,
                data.spec.effective_coordinates,
                coord_arrays=coord_arrays,
                t=0.0,
            )
        else:
            coeff = float(term.coefficient)
            # Prefer symbolic resolution when parameters are available.
            if term.coefficient_symbolic is not None and params:
                resolved = _resolve_symbolic_coeff(
                    term.coefficient_symbolic,
                    params,
                )
                if resolved is not None:
                    coeff = float(resolved)
        term_coeffs.append(coeff)
    # Pre-build GridInfo once (avoids reconstruction per operator call)
    grid_info = None
    if data.fields:
        first_field = next(iter(data.fields.values()))
        field_shape = first_field[0].shape  # spatial shape from first snapshot
        grid_info = _make_grid_info(
            data.grid_spacing, data.periodic, field_shape, data.bc_types
        )
    return _HamiltonianContext(
        params=params,
        coord_arrays=coord_arrays,
        volume_weight=volume_weight,
        term_coeffs=term_coeffs,
        resolve_symbolic=_resolve_symbolic_coeff,
        grid_info=grid_info,
    )
def _compute_hamiltonian_from_canonical(  # pyright: ignore[reportUnusedFunction]
    data: SimulationData,
    t_idx: int,
    ctx: _HamiltonianContext | None = None,
) -> float:
    """Evaluate the symbolic Hamiltonian from canonical structure.

    For spatial gradient terms, uses :func:`_gradient_product_density` —
    the single source of truth for gradient inner products. BC-aware:
    periodic → IBP, non-periodic → central-difference.

    For kinetic (``time_derivative x time_derivative``) terms, reads
    velocities directly from ``data.velocities`` (which stores v = dq/dt
    in the E-L velocity form). No field_rates expansion needed.

    This ensures the measured Hamiltonian uses the **same** finite-difference
    stencils as the solver (3-point laplacian, cascaded-gradient cross
    derivative), making it exactly the conserved quantity of the discrete
    system. Without IBP, the central-difference gradient squared (5-point
    "wide" stencil) differs from the solver's 3-point laplacian by O(dx²),
    producing spurious time-varying energy drift.

    Parameters
    ----------
    data : SimulationData
    t_idx : int
        Snapshot index.
    ctx : _HamiltonianContext | None
        Pre-computed context; built inline when omitted.

    Returns
    -------
    float
        Spatially-averaged Hamiltonian energy density.

    Raises
    ------
    ValueError
        If ``data.spec.canonical`` is None.
    """
    canonical = data.spec.canonical
    if canonical is None:
        msg = "_compute_hamiltonian_from_canonical called without canonical structure"
        raise ValueError(msg)
    # Use pre-computed context when available (timeseries path),
    # otherwise compute inline (single-snapshot callers).
    if ctx is None:
        ctx = _prepare_hamiltonian_context(data)
    volume_weight = ctx.volume_weight
    total = 0.0
    grid = ctx.grid_info
    # Sum each term's spatially-averaged contribution.
    for term_idx, term in enumerate(canonical.hamiltonian_terms):
        contrib = _evaluate_single_hamiltonian_term(
            term,
            ctx.term_coeffs[term_idx],
            volume_weight,
            data,
            t_idx,
            grid=grid,
        )
        total += contrib
    return total
def _evaluate_single_hamiltonian_term(  # noqa: PLR0913, PLR0917
    term: HamiltonianTerm,
    coeff: float | NDArray[np.float64],
    volume_weight: float | NDArray[np.float64],
    data: SimulationData,
    t_idx: int,
    grid: object | None = None,
) -> float:
    """Evaluate a single Hamiltonian term's contribution to energy density.

    Returns the spatially-averaged contribution ``⟨coeff * f_a * f_b * √|g|⟩``.
    Unresolvable factors (constraint-field velocities, missing data)
    contribute 0.0.

    Parameters
    ----------
    grid : GridInfo | None
        Pre-built grid info from context (avoids per-call reconstruction).
    """
    op_a = term.factor_a.operator
    op_b = term.factor_b.operator
    # Gradient x gradient path: use _gradient_product_density (single source
    # of truth). BC-aware: periodic→IBP, non-periodic→CD.
    if op_a in _GRADIENT_AXES and op_b in _GRADIENT_AXES:
        field_a = _resolve_term_target(data, term.factor_a.field, t_idx)
        field_b = _resolve_term_target(data, term.factor_b.field, t_idx)
        if field_a is None or field_b is None:
            return 0.0
        density = _gradient_product_density(
            op_a,
            field_a,
            op_b,
            field_b,
            data.grid_spacing,
            data.periodic,
            bc_types=data.bc_types,
            grid=grid,
        )
        return float((coeff * density * volume_weight).mean())
    # Kinetic: time_derivative x time_derivative — direct velocity lookup.
    if op_a == "time_derivative" and op_b == "time_derivative":
        fname_a = term.factor_a.field
        fname_b = term.factor_b.field
        vel_a = data.velocities.get(fname_a)
        vel_b = data.velocities.get(fname_b)
        if vel_a is not None and vel_b is not None:
            return float((coeff * vel_a[t_idx] * vel_b[t_idx] * volume_weight).mean())
        return 0.0
    # All other terms: identity, mixed operator x identity, etc.
    fa = _evaluate_hamiltonian_factor(
        term.factor_a.field,
        term.factor_a.operator,
        data,
        t_idx,
        grid=grid,
    )
    fb = _evaluate_hamiltonian_factor(
        term.factor_b.field,
        term.factor_b.operator,
        data,
        t_idx,
        grid=grid,
    )
    if fa is None or fb is None:
        return 0.0
    return float((coeff * fa * fb * volume_weight).mean())
def _compute_hamiltonian_per_field(
    data: SimulationData,
    t_idx: int,
    ctx: _HamiltonianContext | None = None,
    fields: set[str] | None = None,
) -> tuple[dict[str, float], float]:
    """Split the Hamiltonian into per-field self-energy plus interaction.

    A term whose two factors reference the same base field is self-energy;
    a term coupling two distinct base fields is interaction.

    Parameters
    ----------
    data : SimulationData
    t_idx : int
        Snapshot index.
    ctx : _HamiltonianContext or None
        Pre-computed context (for timeseries path).
    fields : set[str] or None
        If given, only evaluate Hamiltonian terms involving these base field
        names. Self-energy terms are skipped if the base field is not in the
        set; interaction terms are skipped if *neither* base field is in the
        set. This avoids evaluating operators on irrelevant fields — critical
        for Ostrogradsky theories where some fields' EOM contain unresolvable
        ``d2_t``/``d3_t`` operators (#196).

    Returns
    -------
    per_field : dict[str, float]
        Per-field self-energy density (base field name → energy).
    interaction : float
        Total interaction energy density (cross-field terms).

    Raises
    ------
    ValueError
        If the spec lacks hamiltonian_terms.
    """
    canonical = data.spec.canonical
    if canonical is None or not canonical.hamiltonian_terms:
        msg = (
            "No hamiltonian_terms in JSON spec. Re-derive the theory to populate "
            "the canonical Hamiltonian (required for correct energy measurement). "
            "Run: uv run tidal derive <theory.toml>"
        )
        raise ValueError(msg)

    if ctx is None:
        ctx = _prepare_hamiltonian_context(data)

    per_field: dict[str, float] = {}
    interaction = 0.0
    for idx, term in enumerate(canonical.hamiltonian_terms):
        # Field filter: drop terms that do not touch a requested field.
        if fields is not None:
            if term.is_self_energy:
                relevant = term.base_field_a in fields
            else:
                relevant = (
                    term.base_field_a in fields or term.base_field_b in fields
                )
            if not relevant:
                continue

        contribution = _evaluate_single_hamiltonian_term(
            term,
            ctx.term_coeffs[idx],
            ctx.volume_weight,
            data,
            t_idx,
            grid=ctx.grid_info,
        )
        if term.is_self_energy:
            key = term.base_field_a
            per_field[key] = per_field.get(key, 0.0) + contribution
        else:
            interaction += contribution
    return per_field, interaction
def compute_system_energy(
    data: SimulationData,
    t_idx: int,
    _ctx: _HamiltonianContext | None = None,
    fields: set[str] | None = None,
) -> SystemEnergy:
    """Compute Hamiltonian energy density at snapshot *t_idx*.

    Uses the Legendre-transform Hamiltonian from canonical structure,
    decomposed into per-field self-energy and cross-field interaction.
    This is the only physically correct energy computation — the
    Hamiltonian coefficients include all metric/kinetic prefactors
    from the covariant Lagrangian, making it coordinate-invariant.

    Parameters
    ----------
    data : SimulationData
    t_idx : int
        Snapshot index.
    fields : set[str] or None
        If given, only evaluate Hamiltonian terms involving these base
        field names. Passed through to ``_compute_hamiltonian_per_field``.

    Raises
    ------
    ValueError
        If *t_idx* is out of range, or if the spec lacks hamiltonian_terms
        (re-derive with ``tidal derive`` to populate them).
    """
    if not 0 <= t_idx < data.n_snapshots:
        msg = f"t_idx={t_idx} out of range [0, {data.n_snapshots})"
        raise ValueError(msg)

    canonical = data.spec.canonical
    if canonical is None or not canonical.hamiltonian_terms:
        msg = (
            "No hamiltonian_terms in JSON spec. Re-derive the theory to "
            "populate the canonical Hamiltonian (required for correct energy "
            "measurement). Run: uv run tidal derive <theory.toml>"
        )
        raise ValueError(msg)

    if data.spec.has_constraint_velocity_terms:
        import warnings  # noqa: PLC0415

        # Constrained theories need the Dirac-Bergmann treatment; the naive
        # Legendre transform is not conserved there — warn, don't fail.
        warnings.warn(
            "Hamiltonian contains kinetic coupling between constraint fields "
            "(time_derivative_order=0). The naive Legendre transform "
            "H = sum(pi_i * v_i) - L is not the correct conserved quantity "
            "for theories with non-trivial constraints (Dirac-Bergmann "
            "theory). Energy measurements may show systematic drift. "
            "See: https://github.com/WilliamRoyce/torsion-gertsenshtein/issues/178",
            stacklevel=2,
        )

    self_energy, interaction = _compute_hamiltonian_per_field(
        data, t_idx, ctx=_ctx, fields=fields
    )
    # The canonical Hamiltonian gives only per-field totals; the
    # kinetic/gradient/mass split is not decomposed here.
    per_field = {
        name: FieldEnergy(kinetic=0.0, gradient=0.0, mass=0.0, total=value)
        for name, value in self_energy.items()
    }
    grand_total = sum(self_energy.values()) + interaction
    return SystemEnergy(
        per_field=per_field, interaction=interaction, total=grand_total
    )
def compute_energy_timeseries(
    data: SimulationData,
    fields: set[str] | None = None,
) -> tuple[
    NDArray[np.float64],
    dict[str, NDArray[np.float64]],
    NDArray[np.float64],
    NDArray[np.float64],
]:
    """Compute energy density for every snapshot in the simulation.

    Parameters
    ----------
    data : SimulationData
    fields : set[str] or None
        If given, only evaluate Hamiltonian terms involving these base field
        names. Passed through to ``compute_system_energy``.

    Returns
    -------
    times : ndarray, shape ``(n_snapshots,)``
    per_field : dict[str, ndarray]
        Each value is shape ``(n_snapshots,)`` — energy density of that field.
    interaction : ndarray, shape ``(n_snapshots,)``
    total : ndarray, shape ``(n_snapshots,)``
    """
    n_snapshots = data.n_snapshots

    # Build the Hamiltonian context exactly once: re-merging parameters,
    # rebuilding coord arrays, and re-resolving coefficients per snapshot
    # would dominate the runtime.
    ctx: _HamiltonianContext | None = None
    if data.spec.canonical is not None:
        ctx = _prepare_hamiltonian_context(data)

    # Snapshot 0 determines the field names, which sizes the output arrays.
    first = compute_system_energy(data, 0, _ctx=ctx, fields=fields)
    per_field_series: dict[str, NDArray[np.float64]] = {
        name: np.empty(n_snapshots, dtype=np.float64) for name in first.per_field
    }
    interaction = np.empty(n_snapshots, dtype=np.float64)
    total = np.empty(n_snapshots, dtype=np.float64)

    def _record(idx: int, snapshot: SystemEnergy) -> None:
        # Scatter one snapshot's decomposition into the output arrays.
        for name, energy in snapshot.per_field.items():
            per_field_series[name][idx] = energy.total
        interaction[idx] = snapshot.interaction
        total[idx] = snapshot.total

    _record(0, first)
    for idx in range(1, n_snapshots):
        _record(idx, compute_system_energy(data, idx, _ctx=ctx, fields=fields))

    return data.times.copy(), per_field_series, interaction, total
# ------------------------------------------------------------------
# Validation
# ------------------------------------------------------------------
def _validate_array(arr: NDArray[np.float64], label: str) -> None: # pyright: ignore[reportUnusedFunction]
"""Check array for non-finite values.
Raises
------
ValueError
If *arr* contains NaN or Inf.
"""
if not np.isfinite(arr).all():
msg = f"{label} contains NaN or Inf values"
raise ValueError(msg)