"""Load and validate JSON equation specifications from Mathematica/xAct export.
This module provides the data structures and parsing logic for loading
field equations that were derived symbolically from Lagrangians.
"""
from __future__ import annotations
import json
import logging
import math
import re
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from collections.abc import Mapping
from tidal.solver.operators import SideBCSpec
# --- Constants & operator validation ---
#: Canonical spatial axis letters, ordered by dimension index.
#: Supports up to 6 spatial dimensions (sufficient for all foreseeable physics).
AXIS_LETTERS: tuple[str, ...] = ("x", "y", "z", "w", "v", "u")
#: Character class matching all known axis letters (for regex construction).
_AXIS_RE_CLASS = "[" + "".join(AXIS_LETTERS) + "]"
#: Pattern matching a coordinate call like ``x[]``, ``y[]`` in symbolic expressions.
#: Used to auto-detect position-dependent coefficients in hamiltonian_terms that
#: were exported before the ``coordinate_dependent`` field was added.
_COORD_CALL_RE = re.compile(r"\b[xyzwvut]\s*\[\s*\]")
logger = logging.getLogger(__name__)
#: Set of static operators supported by the pipeline.
#: Validated at JSON load time to catch typos early.
_STATIC_OPERATORS: frozenset[str] = frozenset(
{
"identity",
"laplacian",
"laplacian_x",
"laplacian_y",
"laplacian_z",
"gradient_x",
"gradient_y",
"gradient_z",
"cross_derivative_xy",
"cross_derivative_xz",
"cross_derivative_yz",
"first_derivative_t",
"biharmonic",
}
)
#: Pattern for generic single-axis Nth-order derivatives: derivative_3_x, derivative_5_y, etc.
_GENERIC_SINGLE_AXIS_RE = re.compile(r"^derivative_(\d+)_(" + _AXIS_RE_CLASS + r")$")
#: Pattern for generic multi-axis derivatives: derivative_2x_1y, derivative_3x_2z, etc.
_GENERIC_MULTI_AXIS_RE = re.compile(
r"^derivative_(\d+" + _AXIS_RE_CLASS + r"(?:_\d+" + _AXIS_RE_CLASS + r")*)$"
)
#: Mutable set of user-registered custom operators.
#: Currently unused; reserved as an extensibility point for future
#: operator registration by downstream code.
_CUSTOM_OPERATORS: set[str] = set()
[docs]
def is_known_operator(name: str) -> bool:
"""Check whether an operator name is recognized.
Accepts static operators (identity, laplacian, gradient_x, ...),
user-registered custom operators (via ``register_operator``),
dynamic patterns for generic Nth-order derivatives
(derivative_3_x, derivative_5_y, derivative_2x_1y, ...),
mixed time-space derivative operators (mixed_T2_S2x, mixed_T_S1x, ...),
and pure higher-order time operators on RHS (d2_t, d3_t, d4_t).
"""
if name in _STATIC_OPERATORS or name in _CUSTOM_OPERATORS:
return True
if _GENERIC_SINGLE_AXIS_RE.match(name) or _GENERIC_MULTI_AXIS_RE.match(name):
return True
# Mixed time-space operators:
# Wolfram format: mixed_T[n]_S[n][axis]_... (e.g., mixed_T2_S2x)
# Numeric format: mixed_[time]_[s1]_[s2]_... (e.g., mixed_1_0_0_1)
if name.startswith("mixed_T"):
return True
if re.match(r"^mixed_\d+(_\d+)+$", name):
return True
# Pure time derivative operators on RHS: d2_t, d3_t, d4_t
return bool(re.match(r"^d\d+_t$", name))
# --- LHS structure ---
@dataclass(frozen=True)
class LHSStructure:
    """Structure describing the left-hand side of a PDE.

    Supports different PDE types:
    - Elliptic (time_order=0): ∇²φ = f (Poisson, Laplace)
    - Parabolic (time_order=1): ∂_t φ = ... (heat, diffusion)
    - Hyperbolic (time_order=2): ∂²_t φ = ... (wave)
    - Higher order: ∂^n_t φ = ...

    Attributes
    ----------
    expression : str
        String representation (e.g., "d2_t(phi_0)", "d_t(phi)", "phi").
    time_order : int
        Order of time derivative on LHS (0, 1, 2, or higher).
    space_order : int
        Order of space derivative on LHS (usually 0).
    """

    expression: str
    time_order: int
    space_order: int = 0

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> LHSStructure:
        """Create LHSStructure from structured JSON data.

        Expected format: ``{"expression": "...", "order": {"time": N, "space": 0}}``

        Parameters
        ----------
        data : Mapping[str, Any]
            The structured LHS data from JSON.

        Returns
        -------
        LHSStructure
            Parsed LHS structure.

        Raises
        ------
        ValueError
            If ``order`` dict does not contain a ``'time'`` key.
        """
        order_info = data.get("order", {})
        # The time order is mandatory: it decides the PDE class downstream.
        if "time" not in order_info:
            msg = (
                "lhs.order must specify 'time' "
                "(0 for constraint, 1 for parabolic, >=2 for hyperbolic)"
            )
            raise ValueError(msg)
        return cls(
            expression=str(data.get("expression", "")),
            time_order=int(order_info["time"]),
            space_order=int(order_info.get("space", 0)),
        )
# --- RHS operator terms ---
@dataclass(frozen=True)
class OperatorTerm:
    """A single term in the RHS of a field equation.

    Represents: coefficient * operator(field)

    Attributes
    ----------
    coefficient : float
        Numeric coefficient for this term.
    operator : str
        Name of the differential operator ("laplacian", "identity",
        "gradient_x", etc.)
    field : str
        Name of the field this operator acts on.
    coefficient_symbolic : str | None
        Optional symbolic name for the coefficient (e.g., "m2", "-kappa").
        When present, the coefficient can be overridden at runtime by passing
        a parameters dict to the PDE constructor.
    time_dependent : bool
        Whether the coefficient depends on time. For curved spacetime, terms
        like -2H∂_t φ (Hubble friction) have time-dependent coefficients when
        the conformal factor Ω(t) varies with time. Default False for flat
        spacetime.
    coordinate_dependent : tuple[str, ...]
        Coordinate names the coefficient depends on (e.g., ("x", "y") for
        position-dependent coefficients on curved spatial surfaces, or ("t",)
        for time-dependent). Empty tuple for constant coefficients.
    """

    coefficient: float
    operator: str
    field: str
    coefficient_symbolic: str | None = None
    time_dependent: bool = False
    coordinate_dependent: tuple[str, ...] = ()

    @property
    def position_dependent(self) -> bool:
        """Whether the coefficient depends on spatial coordinates.

        True when ``coordinate_dependent`` names any non-time coordinate
        (explicit declaration), or when ``coefficient_symbolic`` contains a
        spatial coordinate call such as ``x[]`` or ``y[]`` (auto-detection
        for JSON exports that predate the ``coordinate_dependent`` field).
        Pure time dependence (``("t",)`` or ``t[]``) yields ``False``.
        """
        if self.coordinate_dependent:
            spatial_axes = set(self.coordinate_dependent) - {"t"}
            return bool(spatial_axes)
        sym = self.coefficient_symbolic
        if sym is None:
            return False
        # findall yields full match strings like "x[]" or "t []"; the first
        # character is the coordinate letter, so exclude the time coordinate.
        return any(call[0] != "t" for call in _COORD_CALL_RE.findall(sym))

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> OperatorTerm:
        """Create an OperatorTerm from a dictionary.

        Raises
        ------
        ValueError
            If required keys are missing or operator is unknown.
        """
        missing = {"coefficient", "operator", "field"} - set(data.keys())
        if missing:
            msg = (
                f"RHS term missing required keys: {sorted(missing)}. Got: {dict(data)}"
            )
            raise ValueError(msg)
        operator = str(data["operator"])
        if not is_known_operator(operator):
            msg = (
                f"Unknown operator '{operator}'. "
                f"Known static operators: {sorted(_STATIC_OPERATORS)}. "
                f"Dynamic patterns: derivative_N_x, derivative_Nx_My (N,M=integers, x,y,z=axes)."
            )
            raise ValueError(msg)
        # Check for nonlinear coefficient warning from Wolfram export
        if data.get("warning") == "nonlinear_coefficient":
            import warnings  # noqa: PLC0415

            warnings.warn(
                f"Term '{operator}' on field '{data['field']}' has a "
                f"field-dependent (nonlinear) coefficient. The linear PDE "
                f"solver will treat it as a constant, producing wrong physics.",
                stacklevel=2,
            )
        return cls(
            coefficient=float(data["coefficient"]),
            operator=operator,
            field=str(data["field"]),
            coefficient_symbolic=data.get("coefficient_symbolic"),
            time_dependent=bool(data.get("time_dependent", False)),
            coordinate_dependent=tuple(data.get("coordinate_dependent", ())),
        )
# --- Boundary conditions ---
#: Boundary-condition type names accepted in JSON specs.
_VALID_BC_TYPES: frozenset[str] = frozenset(
    {"periodic", "dirichlet", "neumann", "robin"}
)


@dataclass(frozen=True)
class BoundaryCondition:
    """Boundary condition for one spatial axis.

    Attributes
    ----------
    type : str
        One of "periodic", "dirichlet", "neumann", or "robin".
    value : float | None
        Fixed value for Dirichlet BCs, or Robin beta.
    derivative : float | None
        Fixed normal derivative for Neumann BCs.
    gamma : float | None
        Robin coefficient gamma in d_n f + gamma*f = beta.
    """

    type: str
    value: float | None = None
    derivative: float | None = None
    gamma: float | None = None

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> BoundaryCondition:
        """Create a BoundaryCondition from a dictionary.

        Logs a warning (but does not fail) when the dict carries fields
        that the given BC type never reads, so spec typos surface early.

        Raises
        ------
        ValueError
            If the BC type is not recognized.
        """
        bc_type = str(data["type"])
        if bc_type not in _VALID_BC_TYPES:
            msg = (
                f"Unknown BC type: {bc_type!r}. Valid types: {sorted(_VALID_BC_TYPES)}"
            )
            raise ValueError(msg)
        # Warn on irrelevant fields. Per the attribute docs, robin uses
        # value (beta) and gamma but never the Neumann-style derivative,
        # so that field is flagged as ignored too.
        irrelevant_fields: dict[str, set[str]] = {
            "periodic": {"value", "derivative", "gamma"},
            "dirichlet": {"derivative", "gamma"},
            "neumann": {"gamma"},
            "robin": {"derivative"},
        }
        extra = {
            k for k in irrelevant_fields.get(bc_type, set()) if data.get(k) is not None
        }
        if extra:
            logger.warning(
                "%s BC ignores field(s): %s", bc_type, ", ".join(sorted(extra))
            )
        return cls(
            type=bc_type,
            value=data.get("value"),
            derivative=data.get("derivative"),
            gamma=data.get("gamma"),
        )

    def to_side_bc(self) -> SideBCSpec:
        """Convert to a ``SideBCSpec`` for the operator layer.

        Unset numeric fields default to 0.0 in the resulting spec.

        Raises
        ------
        ValueError
            If the BC type is "periodic" (not representable as a side BC).
        """
        # Reject periodic BEFORE importing the operator layer, so the
        # documented ValueError is raised even when tidal.solver.operators
        # is not importable (previously the import ran first and masked it).
        if self.type == "periodic":
            msg = "Cannot convert periodic BC to SideBCSpec"
            raise ValueError(msg)
        from tidal.solver.operators import SideBCSpec  # noqa: PLC0415

        return SideBCSpec(
            kind=self.type,
            value=self.value if self.value is not None else 0.0,
            derivative=self.derivative if self.derivative is not None else 0.0,
            gamma=self.gamma if self.gamma is not None else 0.0,
        )
# --- Constraint solver configuration ---
#: Recognized values for ``ConstraintSolverConfig.method``.
_VALID_CONSTRAINT_METHODS = frozenset({"auto", "fft", "matrix", "poisson"})


@dataclass(frozen=True)
class ConstraintSolverConfig:
    """Configuration for elliptic constraint solving.

    When ``enabled`` is True, the constraint equation is solved at each
    timestep rather than remaining frozen at its initial value.

    Solver Methods
    --------------
    - **"auto"** (default): Automatically selects the best solver.
      Uses FFT for fully periodic grids (O(N log N)), sparse matrix
      for non-periodic grids (O(N) via LU). Handles all constraint
      types: Poisson, Helmholtz, algebraic, anisotropic, etc.
    - **"fft"**: Force FFT solver. Requires fully periodic grid.
    - **"matrix"**: Force sparse matrix solver. Works with any BCs.
    - **"poisson"**: Original py-pde Poisson solver. Requires exactly
      one ``laplacian(self_field)`` term with no other self-referencing
      operators. Will warn if non-laplacian self-terms are present.
      Backward compatible.

    Coupled Constraint Parameters
    -----------------------------
    When multiple constraints reference each other's fields, the solver
    iterates using Gauss-Seidel until convergence or ``max_iterations``.
    For fully periodic grids, coupled constraints are solved exactly
    via Fourier-space block solve (no iteration needed).

    Attributes
    ----------
    enabled : bool
        Whether to solve the constraint elliptically. Default False
        preserves existing frozen-constraint behavior.
    method : str
        Solver method: "auto", "fft", "matrix", or "poisson".
    boundary_conditions : dict[str, BoundaryCondition]
        Per-axis boundary conditions (e.g., ``{"x": ..., "y": ...}``).
    max_iterations : int
        Maximum Gauss-Seidel iterations for coupled constraints.
        Must be >= 1.
    tolerance : float
        Convergence threshold for coupled constraint iteration.
        Iteration stops when max|field_new - field_old| < tolerance
        (scaled by field magnitude for robustness). Must be > 0.
    """

    enabled: bool = False
    method: str = "auto"
    # ``dict`` itself is the idiomatic default_factory; the previous
    # ``lambda: {}`` added an extra call frame for no benefit (ruff PIE807).
    boundary_conditions: dict[str, BoundaryCondition] = dataclass_field(
        default_factory=dict
    )
    max_iterations: int = 20
    tolerance: float = 1e-8

    @classmethod
    def from_dict(cls, data: Mapping[str, Any] | None) -> ConstraintSolverConfig:
        """Create from a dictionary or return default (disabled).

        Parameters
        ----------
        data : Mapping[str, Any] | None
            Parsed ``constraint_solver`` block from JSON, or None.

        Returns
        -------
        ConstraintSolverConfig
            Configuration instance.

        Raises
        ------
        ValueError
            If ``method`` is not one of the recognized solver methods,
            or ``max_iterations``/``tolerance`` are out of range.
        """
        if data is None:
            # Absent block means constraint solving stays disabled.
            return cls()
        enabled = bool(data.get("enabled", False))
        method = str(data.get("method", "auto"))
        if method not in _VALID_CONSTRAINT_METHODS:
            msg = (
                f"Unknown constraint solver method '{method}'. "
                f"Must be one of: {', '.join(sorted(_VALID_CONSTRAINT_METHODS))}."
            )
            raise ValueError(msg)
        bc_data = data.get("boundary_conditions", {})
        boundary_conditions = {
            axis: BoundaryCondition.from_dict(bc_dict)
            for axis, bc_dict in bc_data.items()
        }
        max_iterations = int(data.get("max_iterations", 20))
        if max_iterations < 1:
            msg = f"max_iterations must be >= 1, got {max_iterations}"
            raise ValueError(msg)
        tolerance = float(data.get("tolerance", 1e-8))
        if tolerance <= 0:
            msg = f"tolerance must be > 0, got {tolerance}"
            raise ValueError(msg)
        return cls(
            enabled=enabled,
            method=method,
            boundary_conditions=boundary_conditions,
            max_iterations=max_iterations,
            tolerance=tolerance,
        )
# === Phase K: Canonical Momentum Structures ===
@dataclass(frozen=True)
class HamiltonianFactor:
    """One factor in a quadratic Hamiltonian term.

    Represents a field (or its time/spatial derivative) that appears as a
    multiplicative factor in a term of the component-form Hamiltonian
    density.

    Attributes
    ----------
    field : str
        Component field name (e.g., "A_1", "phi_0").
    operator : str
        Differential operator: "identity" (bare field), "time_derivative"
        (∂_t field, maps to canonical momentum at evaluation), or a spatial
        operator ("gradient_x", "laplacian", etc.).
    """

    field: str
    operator: str

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> HamiltonianFactor:
        """Build a factor from its JSON representation."""
        field_name = str(data["field"])
        op_name = str(data["operator"])
        return cls(field=field_name, operator=op_name)
def _base_field(field_name: str) -> str:
"""Strip velocity prefix ``v_`` to get the base field name.
E-L velocity form uses ``v_{field}`` for velocities. This extracts the
underlying field name: ``"v_phi_0"`` → ``"phi_0"``, ``"phi_0"`` → ``"phi_0"``.
"""
if field_name.startswith("v_"):
return field_name[2:]
return field_name
@dataclass(frozen=True)
class HamiltonianTerm:
    """A single quadratic term in the Hamiltonian density.

    H = Σ coefficient * factor_a * factor_b

    Attributes
    ----------
    coefficient : float
        Numeric coefficient.
    factor_a : HamiltonianFactor
        First field factor.
    factor_b : HamiltonianFactor
        Second field factor (may equal factor_a for squared terms).
    coefficient_symbolic : str | None
        Symbolic coefficient expression (for parameter override).
    coordinate_dependent : tuple[str, ...]
        Spatial axes the coefficient depends on (e.g. ``("x", "y")``).
        When non-empty, the coefficient must be evaluated on the grid.
        Older JSON exports omit this field; auto-detection via
        ``position_dependent`` covers those cases.
    term_class : str
        Classification: ``"self"`` (both factors reference the same base
        field) or ``"interaction"`` (cross-field coupling). Defaults to
        ``"unknown"`` for older JSONs; use ``is_self_energy`` property
        which auto-classifies by comparing factor field names.
    """

    coefficient: float
    factor_a: HamiltonianFactor
    factor_b: HamiltonianFactor
    coefficient_symbolic: str | None = None
    coordinate_dependent: tuple[str, ...] = ()
    term_class: str = "unknown"  # "self" or "interaction"

    @property
    def position_dependent(self) -> bool:
        """True if the coefficient is a function of spatial coordinates.

        Returns ``True`` when ``coordinate_dependent`` is non-empty
        (per its contract it lists spatial axes only), or when
        ``coefficient_symbolic`` contains a coordinate call pattern such as
        ``x[]`` or ``y[]`` (auto-detection for JSON exports that predate
        the ``coordinate_dependent`` field).
        """
        if self.coordinate_dependent:
            return True
        sym = self.coefficient_symbolic
        # NOTE(review): unlike OperatorTerm.position_dependent, this
        # auto-detection does not special-case t[] — presumed intentional
        # since hamiltonian_terms only declare spatial axes; confirm.
        return sym is not None and _COORD_CALL_RE.search(sym) is not None

    @property
    def is_self_energy(self) -> bool:
        """True if both factors reference the same base field (self-energy).

        Uses ``term_class`` when available (from Wolfram export), otherwise
        classifies by comparing base field names (stripping ``v_`` prefix).
        """
        if self.term_class == "unknown":
            return _base_field(self.factor_a.field) == _base_field(self.factor_b.field)
        return self.term_class == "self"

    @property
    def base_field_a(self) -> str:
        """Base field name for factor_a (strips ``v_`` velocity prefix)."""
        return _base_field(self.factor_a.field)

    @property
    def base_field_b(self) -> str:
        """Base field name for factor_b (strips ``v_`` velocity prefix)."""
        return _base_field(self.factor_b.field)

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> HamiltonianTerm:
        """Parse one quadratic term from its JSON dict."""
        return cls(
            coefficient=float(data["coefficient"]),
            factor_a=HamiltonianFactor.from_dict(data["factor_a"]),
            factor_b=HamiltonianFactor.from_dict(data["factor_b"]),
            coefficient_symbolic=data.get("coefficient_symbolic"),
            coordinate_dependent=tuple(data.get("coordinate_dependent", [])),
            term_class=str(data.get("term_class", "unknown")),
        )
@dataclass(frozen=True)
class CanonicalStructure:
    """Canonical structure: Hamiltonian terms for energy measurement.

    The Hamiltonian's bilinear terms are used by energy measurement to
    compute H(q, v). The E-L equations are stored in the ``equations``
    array directly.

    Attributes
    ----------
    hamiltonian_terms : tuple[HamiltonianTerm, ...]
        Quadratic terms in the component-form Hamiltonian density.
        Used by energy measurement to compute H(q, v).
    volume_element : str or None
        Symbolic expression for ``sqrt|det(g_spatial)|``, the spatial
        volume element. ``None`` for flat (Minkowski) spacetimes where
        the volume element is 1. Used by energy measurement to weight
        the Hamiltonian density before spatial integration.
    """

    hamiltonian_terms: tuple[HamiltonianTerm, ...]
    volume_element: str | None = None

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> CanonicalStructure:
        """Parse from JSON ``canonical`` section."""
        terms = tuple(
            HamiltonianTerm.from_dict(raw) for raw in data["hamiltonian_terms"]
        )
        # volume_element is simply absent (None) for flat spacetimes
        return cls(
            hamiltonian_terms=terms,
            volume_element=data.get("volume_element"),
        )
# --- Component equations ---
@dataclass(frozen=True)
class ComponentEquation:
    """Equation of motion for a single field component.

    For a wave-type equation:
        d^2/dt^2 field = sum of OperatorTerms

    Attributes
    ----------
    field_name : str
        Name of the field component (e.g., "A_0", "phi").
    field_index : int
        Index of this component in the field array.
    time_derivative_order : int
        Order of the time derivative on the LHS (2 for wave equations).
    rhs_terms : tuple[OperatorTerm, ...]
        Terms on the RHS of the equation.
    constraint_solver : ConstraintSolverConfig
        Configuration for elliptic constraint solving. Only meaningful
        when ``time_derivative_order == 0``.
    """

    field_name: str
    field_index: int
    time_derivative_order: int
    rhs_terms: tuple[OperatorTerm, ...]
    constraint_solver: ConstraintSolverConfig = dataclass_field(
        default_factory=ConstraintSolverConfig
    )

    def __post_init__(self) -> None:
        """Validate constraint_solver is only enabled for time_order=0.

        Raises
        ------
        ValueError
            If constraint_solver.enabled is True but time_derivative_order
            is not 0.
        """
        if not self.constraint_solver.enabled:
            return
        if self.time_derivative_order != 0:
            msg = (
                f"constraint_solver.enabled=true is only valid for time_order=0 "
                f"(constraint equations), but {self.field_name} has "
                f"time_order={self.time_derivative_order}"
            )
            raise ValueError(msg)

    @classmethod
    def from_dict(
        cls, data: Mapping[str, Any], fields_lookup: dict[str, int]
    ) -> ComponentEquation:
        """Create a ComponentEquation from a dictionary.

        Raises
        ------
        ValueError
            If the RHS type is not "linear_combination", or if
            constraint_solver is enabled for a non-constraint equation.
        """
        field_name = str(data["field"])
        # The structured LHS block is mandatory:
        # {"expression": "...", "order": {"time": N, "space": 0}}
        lhs_structure = LHSStructure.from_dict(data["lhs"])
        # RHS must be a linear combination of operator terms.
        rhs_data = data["rhs"]
        if rhs_data["type"] != "linear_combination":
            msg = f"Unsupported RHS type: {rhs_data['type']}"
            raise ValueError(msg)
        rhs_terms = tuple(OperatorTerm.from_dict(raw) for raw in rhs_data["terms"])
        # Reject unknown field names loudly — no silent fallback to index 0.
        if field_name not in fields_lookup:
            valid_fields = list(fields_lookup.keys())
            msg = (
                f"Unknown field '{field_name}' in equation. "
                f"Valid fields are: {valid_fields}"
            )
            raise ValueError(msg)
        constraint_solver = ConstraintSolverConfig.from_dict(
            data.get("constraint_solver")
        )
        return cls(
            field_name=field_name,
            field_index=fields_lookup[field_name],
            time_derivative_order=lhs_structure.time_order,
            rhs_terms=rhs_terms,
            constraint_solver=constraint_solver,
        )
# --- Symbolic coefficient resolution ---
def _resolve_symbolic_coeff(sym: str, parameters: Mapping[str, float]) -> float | None:
"""Resolve a symbolic coefficient string with parameter values.
Handles simple names (``"m2"``), negated names (``"-m2"``), and
compound expressions (``"-2*m2"``). Returns *None* if the expression
cannot be evaluated with the given parameters, so the caller can fall
back to the raw numeric coefficient.
"""
# Fast path: negated parameter name
if sym.startswith("-") and sym[1:] in parameters:
return -parameters[sym[1:]]
# Fast path: direct parameter name
if sym in parameters:
return parameters[sym]
# Compound expression: e.g., "-2*m2", "3*lambda"
try:
py_expr = sym.replace("^", "**")
result = eval(py_expr, {"__builtins__": {}}, dict(parameters)) # noqa: S307
value = float(result)
except (NameError, SyntaxError, TypeError, ValueError, ZeroDivisionError):
logger.debug(
"Could not resolve symbolic coefficient '%s' with parameters %s; "
"matrix entry will use raw numeric coefficient",
sym,
sorted(parameters.keys()),
)
return None
# Mass/coupling entries must be finite real numbers
if not math.isfinite(value):
return None
return value
# --- Equation system ---
@dataclass(frozen=True)
class EquationSystem:
"""Complete system of field equations derived from a Lagrangian.
Attributes
----------
n_components : int
Number of field components.
dimension : int
Spacetime dimension (e.g., 2 for 1+1D).
spatial_dimension : int
Number of spatial dimensions (dimension - 1).
component_names : tuple[str, ...]
Names of field components in order.
equations : tuple[ComponentEquation, ...]
Equations for each component.
mass_matrix : tuple[tuple[float, ...], ...]
Mass matrix M^2_ij for coupled systems.
coupling_matrix : tuple[tuple[float, ...], ...]
Coupling matrix for field interactions.
metadata : dict[str, Any]
Additional metadata (source, gauge, etc.)
coordinates : tuple[str, ...]
Coordinate names from JSON spacetime.coordinates (e.g., ("t", "x", "y")).
Defaults to empty tuple; use ``effective_coordinates`` for a guaranteed
non-empty result that infers names from dimension when not set.
canonical : CanonicalStructure | None
Canonical momentum and Hamiltonian structure from Legendre transform.
Present when the JSON spec includes a ``"canonical"`` section (generated
by ``tidal derive`` for non-linearization theories). None for legacy
specs or linearization theories.
"""
n_components: int
dimension: int
spatial_dimension: int
component_names: tuple[str, ...]
equations: tuple[ComponentEquation, ...]
mass_matrix: tuple[tuple[float, ...], ...]
coupling_matrix: tuple[tuple[float, ...], ...]
metadata: dict[str, Any]
coordinates: tuple[str, ...] = ()
mass_matrix_symbolic: tuple[tuple[str | None, ...], ...] = ()
coupling_matrix_symbolic: tuple[tuple[str | None, ...], ...] = ()
canonical: CanonicalStructure | None = None
def __post_init__(self) -> None:
    """Validate the equation system.

    Checks run in order: positive ``n_components``; ``component_names``
    and ``equations`` lengths match; matrix dimensions; field references
    in RHS terms; and finally a non-fatal consistency check of the
    provided matrices against the identity-operator terms.

    Raises
    ------
    ValueError
        If validation checks fail (invalid n_components, mismatched lengths,
        or invalid matrix dimensions).
    """
    if self.n_components < 1:
        msg = "n_components must be at least 1"
        raise ValueError(msg)
    if len(self.component_names) != self.n_components:
        msg = f"component_names length {len(self.component_names)} != n_components {self.n_components}"
        raise ValueError(msg)
    if len(self.equations) != self.n_components:
        msg = f"equations length {len(self.equations)} != n_components {self.n_components}"
        raise ValueError(msg)
    self._validate_matrix_dimensions()
    # Validate field references in equation terms
    self._validate_field_references()
    # Verify mass/coupling matrix consistency with identity operator terms.
    # Warns (does not error) if manually constructed EquationSystem has
    # matrices that don't match the convention: matrix[i][j] = -(identity coeff).
    raw_params = self.metadata.get("parameters", {})
    # Keep only numeric metadata parameters for resolving symbolic coefficients.
    check_params: dict[str, float] = {
        k: float(v) for k, v in raw_params.items() if isinstance(v, (int, float))
    }
    # `check_params or None`: an empty dict becomes None so symbolic
    # resolution is skipped entirely inside _compute_matrices_from_terms.
    expected_mass, expected_coupling, _, _ = self._compute_matrices_from_terms(
        self.equations, self.component_names, parameters=check_params or None
    )
    if (
        self.mass_matrix != expected_mass
        or self.coupling_matrix != expected_coupling
    ):
        # Deferred import keeps module import light; warning (not error)
        # preserves manually-constructed systems that diverge on purpose.
        import warnings  # noqa: PLC0415

        warnings.warn(
            "mass_matrix/coupling_matrix inconsistent with identity operator terms. "
            f"Expected mass={expected_mass}, coupling={expected_coupling}; "
            f"got mass={self.mass_matrix}, coupling={self.coupling_matrix}. "
            "Use EquationSystem.from_dict() for auto-computation.",
            stacklevel=2,
        )
def _validate_matrix_dimensions(self) -> None:
    """Validate mass and coupling matrix dimensions match n_components.

    Raises
    ------
    ValueError
        If any matrix has wrong number of rows or columns.
    """
    n = self.n_components
    checked = (
        ("mass_matrix", self.mass_matrix),
        ("coupling_matrix", self.coupling_matrix),
    )
    for name, matrix in checked:
        if len(matrix) != n:
            msg = f"{name} rows {len(matrix)} != n_components {n}"
            raise ValueError(msg)
        for i, row in enumerate(matrix):
            if len(row) != n:
                msg = f"{name} row {i} length {len(row)} != n_components {n}"
                raise ValueError(msg)
def _validate_field_references(self) -> None:
    """Validate that all field references in equation terms are valid.

    Field references can be:
    - Regular field names (e.g., ``"A_0"``, ``"A_1"``, ``"phi"``)
    - Velocity names: ``"v_field_name"`` (e.g., ``"v_A_1"``)

    Raises
    ------
    ValueError
        If a field reference is invalid.
    """
    valid_fields = set(self.component_names)
    # Build valid velocity references: v_field_name
    valid_velocities = {f"v_{name}" for name in self.component_names}
    for eq in self.equations:
        for term in eq.rhs_terms:
            field_ref = term.field
            # A literal field name wins over a velocity interpretation:
            # a component actually named "v_0" is a field, not a velocity.
            if field_ref in valid_fields or field_ref in valid_velocities:
                continue
            if field_ref.startswith("v_"):
                msg = (
                    f"Invalid velocity reference '{field_ref}' "
                    f"in equation for {eq.field_name}. "
                    f"Valid: {sorted(valid_velocities)}."
                )
            else:
                msg = (
                    f"Unknown field reference '{field_ref}' "
                    f"in equation for {eq.field_name}. "
                    f"Valid fields: {sorted(valid_fields)}."
                )
            raise ValueError(msg)
@staticmethod
def _compute_matrices_from_terms(
    equations: tuple[ComponentEquation, ...],
    component_names: tuple[str, ...],
    parameters: Mapping[str, float] | None = None,
) -> tuple[
    tuple[tuple[float, ...], ...],
    tuple[tuple[float, ...], ...],
    tuple[tuple[str | None, ...], ...],
    tuple[tuple[str | None, ...], ...],
]:
    """Extract mass and coupling matrices from identity operator terms.

    Scans each equation's RHS terms for ``identity`` operators acting on
    known field names (not velocity references like ``v_N``).

    Convention: ``matrix[i][j] = -(coefficient)`` where ``coefficient``
    is the numeric coefficient of the ``identity(field_j)`` term in
    equation *i*. This makes mass² positive for the standard Lagrangian
    sign convention ``∂²_t φ = … - m² φ``.

    When *parameters* are provided and a term has ``coefficient_symbolic``,
    the symbolic expression is resolved with the parameter values to
    produce the correct numeric matrix entry. Without parameters, the
    raw numeric coefficient (a shape-factor like ±1.0) is used.

    Symbolic expressions are always preserved as-is from the term (not
    evaluated) so that they can be resolved at runtime with actual
    parameter values.

    Parameters
    ----------
    equations : tuple[ComponentEquation, ...]
        Parsed component equations.
    component_names : tuple[str, ...]
        Names of all field components.
    parameters : Mapping[str, float] | None
        Default parameter values for resolving symbolic coefficients.

    Returns
    -------
    tuple
        ``(mass_matrix, coupling_matrix, mass_symbolic, coupling_symbolic)``
        Numeric matrices as nested float tuples; symbolic matrices as
        nested ``str | None`` tuples (empty tuple if no symbolic entries).
    """
    n = len(component_names)
    # Accumulators; nested list-of-lists converted to tuples on return.
    mass: list[list[float]] = [[0.0] * n for _ in range(n)]
    coupling: list[list[float]] = [[0.0] * n for _ in range(n)]
    mass_sym: list[list[str | None]] = [[None] * n for _ in range(n)]
    coupling_sym: list[list[str | None]] = [[None] * n for _ in range(n)]
    name_to_idx = {name: i for i, name in enumerate(component_names)}
    has_symbolic = False
    for i, eq in enumerate(equations):
        for term in eq.rhs_terms:
            # Velocity references ("v_...") fail the name_to_idx lookup
            # and are skipped by design — only bare fields count.
            if term.operator == "identity" and term.field in name_to_idx:
                j = name_to_idx[term.field]
                # Prefer symbolic + params for numeric value; fall back
                # to the raw numeric coefficient (shape factor).
                effective_coeff = term.coefficient
                if term.coefficient_symbolic is not None and parameters:
                    resolved = _resolve_symbolic_coeff(
                        term.coefficient_symbolic, parameters
                    )
                    if resolved is not None:
                        effective_coeff = resolved
                # Sign flip per the matrix[i][j] = -(identity coeff) convention.
                neg_coeff = -effective_coeff
                if i == j:
                    # Diagonal entries accumulate into the mass matrix.
                    mass[i][j] += neg_coeff
                    if term.coefficient_symbolic is not None:
                        mass_sym[i][j] = term.coefficient_symbolic
                        has_symbolic = True
                else:
                    # Off-diagonal entries are cross-field couplings.
                    coupling[i][j] += neg_coeff
                    if term.coefficient_symbolic is not None:
                        coupling_sym[i][j] = term.coefficient_symbolic
                        has_symbolic = True
    # Symbolic matrices stay () when no term carried a symbolic coefficient.
    mass_sym_t: tuple[tuple[str | None, ...], ...] = ()
    coupling_sym_t: tuple[tuple[str | None, ...], ...] = ()
    if has_symbolic:
        mass_sym_t = tuple(tuple(row) for row in mass_sym)
        coupling_sym_t = tuple(tuple(row) for row in coupling_sym)
    return (
        tuple(tuple(row) for row in mass),
        tuple(tuple(row) for row in coupling),
        mass_sym_t,
        coupling_sym_t,
    )
@property
def time_orders(self) -> tuple[int, ...]:
    """Per-component time derivative orders, in equation order."""
    return tuple(equation.time_derivative_order for equation in self.equations)
@property
def state_size(self) -> int:
    """Total number of state fields.

    Second-order components contribute 2 slots (field + momentum).
    First-order and constraint components contribute 1 slot (field only).
    """
    slots = 0
    for order in self.time_orders:
        slots += 2 if order >= 2 else 1  # noqa: PLR2004
    return slots
@property
def state_layout(self) -> tuple[tuple[str, str], ...]:
    """State vector layout as (field_name, slot_type) tuples.

    slot_type is "field" or "momentum". Second-order components produce
    two entries (field, momentum); first-order/constraint produce one
    (field).
    """
    entries: list[tuple[str, str]] = []
    for eq in self.equations:
        slots = [(eq.field_name, "field")]
        if eq.time_derivative_order >= 2:  # noqa: PLR2004
            slots.append((eq.field_name, "momentum"))
        entries.extend(slots)
    return tuple(entries)
@property
def effective_coordinates(self) -> tuple[str, ...]:
    """Coordinate names, inferred from dimension if not set explicitly.

    Falls back to "t" plus the canonical axis letters from AXIS_LETTERS
    (x, y, z, w, v, u). The previous hardcoded ("x", "y", "z") silently
    truncated the tuple for spatial_dimension > 3 even though up to 6
    spatial dimensions are supported.
    """
    if self.coordinates:
        return self.coordinates
    return ("t", *AXIS_LETTERS[: self.spatial_dimension])
@property
def spatial_coordinates(self) -> tuple[str, ...]:
    """Spatial coordinate names (all except first, which is time)."""
    _time, *spatial = self.effective_coordinates
    return tuple(spatial)
@cached_property
def has_constraint_velocity_terms(self) -> bool:
    """True if the Hamiltonian has kinetic coupling between constraint fields.

    Detects ``time_derivative(C_i) x time_derivative(C_j)`` terms where
    both C_i and C_j are constraint fields (time_derivative_order == 0).
    These indicate the naive Hamiltonian H = sum(pi * v) - L is not the
    correct conserved quantity for this theory (Dirac-Bergmann theory).
    See GitHub issue #178 for details.
    """
    if self.canonical is None:
        return False
    constraints = {
        eq.field_name for eq in self.equations if eq.time_derivative_order == 0
    }
    if not constraints:
        return False
    # Kinetic constraint term: time_derivative(C_i) x time_derivative(C_j)
    # where both fields are constraints. These arise from π_c · v_c
    # in the Legendre transform (spurious). Cross-terms like
    # time_derivative(C) x identity(D) come from -L and are valid.
    return any(
        term.factor_a.operator == "time_derivative"
        and term.factor_b.operator == "time_derivative"
        and term.factor_a.field in constraints
        and term.factor_b.field in constraints
        for term in self.canonical.hamiltonian_terms
    )
@cached_property
def equation_map(self) -> dict[str, int]:
    """Lookup table from field name to equation index (cached; dataclass is frozen)."""
    mapping: dict[str, int] = {}
    for index, equation in enumerate(self.equations):
        mapping[equation.field_name] = index
    return mapping
# (stray "[docs]" Sphinx HTML anchor removed — not valid Python)
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> EquationSystem:  # noqa: PLR0914, PLR0912, C901
    """Create an EquationSystem from a dictionary (parsed JSON).

    The expected top-level keys are ``spacetime``, ``fields`` and
    ``equations`` (see ``validate_json_schema``); ``metadata`` and
    ``canonical`` are optional. Numeric mass/coupling matrices are always
    recomputed from the equation terms here — any matrices present in the
    JSON are ignored.

    Raises
    ------
    ValueError
        If the JSON data is invalid or component references are inconsistent.
    TypeError
        If a metadata parameter has an unsupported type.
    """
    # Extract spacetime info.
    spacetime = data["spacetime"]
    dimension = int(spacetime["dimension"])
    spatial_dimension = dimension - 1  # Assuming 1 time dimension
    # Extract fields.
    fields_data = data["fields"]
    component_names = tuple(f["name"] for f in fields_data)
    n_components = len(component_names)
    # Validate field name uniqueness. The second pass only runs on the
    # (rare) failure path, so the common case stays a single set build.
    if len(component_names) != len(set(component_names)):
        seen: set[str] = set()
        duplicates: list[str] = []
        for name in component_names:
            if name in seen:
                # NOTE(review): a name occurring 3+ times is appended once
                # per extra occurrence — message may repeat it; harmless.
                duplicates.append(name)
            seen.add(name)
        msg = f"Duplicate field names: {duplicates}"
        raise ValueError(msg)
    # Build field name -> index lookup for equation parsing.
    fields_lookup = {f["name"]: f["index"] for f in fields_data}
    # Extract tensor metadata (optional, from enriched JSON export).
    tensor_metadata: dict[str, dict[str, Any]] = {}
    for f in fields_data:
        if "tensor_head" in f:
            tensor_metadata[f["name"]] = {
                "tensor_head": f["tensor_head"],
                "tensor_rank": f.get("tensor_rank", 0),
                "tensor_indices": f.get("tensor_indices", []),
            }
    # Parse equations (one ComponentEquation per JSON entry).
    equations = tuple(
        ComponentEquation.from_dict(eq_data, fields_lookup)
        for eq_data in data["equations"]
    )
    # Extract metadata (needed early for parameter-aware matrix computation).
    metadata = dict(data.get("metadata", {}))
    # Inject tensor metadata into metadata dict for LaTeX rendering.
    if tensor_metadata:
        metadata["tensor_metadata"] = tensor_metadata
    # Extract default parameters from metadata (if available) so that
    # numeric matrices reflect actual parameter values, not just ±1.0
    # shape factors from the Wolfram pipeline.
    raw_params = metadata.get("parameters", {})
    default_params: dict[str, float] = {}
    for k, v in raw_params.items():
        # NOTE(review): bool is a subclass of int, so JSON true/false
        # would be silently coerced to 1.0/0.0 here — confirm intended.
        if isinstance(v, (int, float)):
            default_params[k] = float(v)
        elif isinstance(v, str):
            # Numeric strings (e.g. "0.5") are accepted for convenience.
            try:
                default_params[k] = float(v)
            except ValueError:
                msg = f"Parameter '{k}': cannot convert '{v}' to float"
                raise ValueError(msg) from None
        else:
            msg = f"Parameter '{k}': unsupported type {type(v).__name__}, expected number or numeric string"
            raise TypeError(msg)
    # Auto-compute mass/coupling matrices from identity operator terms.
    # This is the authoritative source — JSON values are ignored in favour
    # of values derived from the equation terms themselves.
    (
        mass_matrix,
        coupling_matrix,
        mass_matrix_symbolic,
        coupling_matrix_symbolic,
    ) = cls._compute_matrices_from_terms(
        equations, component_names, parameters=default_params or None
    )
    # Extract coordinate names (may be empty; effective_coordinates infers).
    coordinates = tuple(str(c) for c in spacetime.get("coordinates", []))
    # Parse canonical structure (Phase K) — optional for backward compat.
    canonical_data = data.get("canonical")
    canonical: CanonicalStructure | None = None
    if canonical_data is not None:
        canonical = CanonicalStructure.from_dict(canonical_data)
    spec = cls(
        n_components=n_components,
        dimension=dimension,
        spatial_dimension=spatial_dimension,
        component_names=component_names,
        equations=equations,
        mass_matrix=mass_matrix,
        coupling_matrix=coupling_matrix,
        metadata=metadata,
        coordinates=coordinates,
        mass_matrix_symbolic=mass_matrix_symbolic,
        coupling_matrix_symbolic=coupling_matrix_symbolic,
        canonical=canonical,
    )
    # Ostrogradsky reduction: convert 4th-order-in-time equations to
    # 2nd-order via auxiliary fields. Ref: Ostrogradsky (1850),
    # Woodard (2015, arXiv:1506.02210). Applied in-memory only — the
    # JSON on disk is never rewritten. Import is deferred to avoid a
    # circular dependency with the symbolic package.
    if any(eq.time_derivative_order > 2 for eq in spec.equations):  # noqa: PLR2004
        from tidal.symbolic.ostrogradsky import (  # noqa: PLC0415
            apply_ostrogradsky_reduction,
        )

        logger.info(
            "Applying Ostrogradsky reduction for higher-derivative equations"
        )
        spec = apply_ostrogradsky_reduction(spec)
        logger.info("Ostrogradsky: %d fields after reduction", spec.n_components)
    return spec
# --- JSON schema validation ---
def _validate_spacetime(spacetime: dict[str, Any]) -> None:
"""Validate spacetime configuration.
Raises
------
TypeError
If spacetime is not a dictionary or spacetime.dimension is not an integer.
ValueError
If spacetime.dimension is missing.
"""
if "dimension" not in spacetime:
msg = "spacetime.dimension is required"
raise ValueError(msg)
if not isinstance(spacetime["dimension"], int):
msg = "spacetime.dimension must be an integer"
raise TypeError(msg)
# Validate signature contains only ±1 and matches dimension
if "signature" in spacetime:
sig: list[int] = spacetime["signature"]
if not isinstance(sig, list) or not all(s in {-1, 1} for s in sig): # type: ignore[reportUnnecessaryIsInstance]
msg = "spacetime.signature must be a list of +1 or -1 values"
raise ValueError(msg)
if len(sig) != spacetime["dimension"]:
msg = (
f"spacetime.signature length ({len(sig)}) "
f"must match dimension ({spacetime['dimension']})"
)
raise ValueError(msg)
def _validate_fields(fields: list[Any]) -> None:
"""Validate fields list.
Raises
------
ValueError
If the fields list is empty or a field is missing the 'name' key.
"""
if len(fields) == 0:
msg = "fields must be non-empty"
raise ValueError(msg)
for i, field in enumerate(fields):
if not isinstance(field, dict) or "name" not in field:
msg = f"fields[{i}] must be a dict with 'name' key"
raise ValueError(msg)
# Validate field indices are unique (when present)
indices = [f["index"] for f in fields if "index" in f]
if len(indices) != len(set(indices)):
msg = f"Field indices must be unique, got: {indices}"
raise ValueError(msg)
def _validate_equations(equations: list[Any]) -> None:
"""Validate equations list.
Raises
------
ValueError
If the equations list is empty or required fields are missing.
"""
if len(equations) == 0:
msg = "equations must be a non-empty list"
raise ValueError(msg)
for i, eq in enumerate(equations):
if "field" not in eq:
msg = f"equations[{i}].field is required"
raise ValueError(msg)
if "rhs" not in eq:
msg = f"equations[{i}].rhs is required"
raise ValueError(msg)
# (stray "[docs]" Sphinx HTML anchor removed — not valid Python)
def validate_json_schema(data: Mapping[str, Any]) -> None:
    """Validate that the JSON data matches the expected schema.

    Checks the three required top-level sections, delegates section-level
    validation to the private ``_validate_*`` helpers, then cross-checks
    that every equation references a declared field.

    Raises
    ------
    ValueError
        If required fields are missing or have invalid types.
    """
    for required in ("spacetime", "fields", "equations"):
        if required not in data:
            raise ValueError(f"Missing required top-level field: {required}")
    _validate_spacetime(data["spacetime"])
    _validate_fields(data["fields"])
    _validate_equations(data["equations"])
    # Cross-validate: equation field references must exist in fields list.
    declared_names = {entry["name"] for entry in data["fields"]}
    for position, equation in enumerate(data["equations"]):
        referenced = equation["field"]
        if referenced not in declared_names:
            raise ValueError(
                f"equations[{position}].field '{referenced}' "
                f"not found in fields list: {sorted(declared_names)}"
            )
# --- Public loader ---
# (stray "[docs]" Sphinx HTML anchor removed — not valid Python)
def load_equation_system(json_path: Path | str) -> EquationSystem:
    """Load an equation system from a JSON file.

    Parameters
    ----------
    json_path : Path | str
        Path to the JSON file exported from Mathematica.

    Returns
    -------
    EquationSystem
        The parsed equation system.

    Raises
    ------
    FileNotFoundError
        If the JSON file does not exist.
    ValueError
        Propagated from ``validate_json_schema`` / ``EquationSystem.from_dict``
        when the file content fails schema validation.
    """
    source = Path(json_path)
    # Explicit existence check gives a clearer message than the bare
    # OSError from open().
    if not source.exists():
        raise FileNotFoundError(f"JSON file not found: {source}")
    data = json.loads(source.read_text(encoding="utf-8"))
    validate_json_schema(data)
    return EquationSystem.from_dict(data)