Source code for tidal.measurement._asymptotic

"""Asymptotic conversion measurement — frame-independent scattering observables.

Computes:

- **P_final**: ``E_target(t_final) / E_source(t=0)`` — total conversion at end
  of simulation.  Uses field-group energies (summed over all components),
  making this a Lorentz scalar for any field type.

- **P_transmitted / P_reflected**: Directional decomposition of the
  target-field energy using **flux-based mode classification**.  For each
  Fourier mode k, the directional flux
  ``(k · k̂_source) · Im[v̂*(k) · φ̂(k)]`` determines propagation direction.
  The mode's spectral energy is attributed accordingly.

  This correctly handles real-valued fields where |φ̂(k)|² = |φ̂(-k)|²:
  the cross-spectrum ``Im[v̂* · φ̂]`` is antisymmetric, so for a rightward
  wave both the +k₀ and -k₀ modes have positive directional flux and are
  both attributed to forward.

  Properties:
  - **Coordinate-independent**: reference direction from source physics.
  - **Dimension-independent**: full wavevector dot product, works in 1-3D.
  - **Rotation-invariant**: ``k · k̂`` is a scalar under spatial rotations.
  - **Correctly handles real fields**: flux breaks the power-spectrum symmetry.

For scalar fields, φ² is a Lorentz scalar so P_final is automatically
frame-independent.  For vector fields, the group sum over all components
preserves rotational covariance.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

import numpy as np

from tidal.measurement._energy import (
    _ENERGY_FLOOR,  # pyright: ignore[reportPrivateUsage]
    _compute_hamiltonian_per_field,  # pyright: ignore[reportPrivateUsage]
)
from tidal.measurement._utils import (
    _normalize_group,  # pyright: ignore[reportPrivateUsage]
)

if TYPE_CHECKING:
    from collections.abc import Sequence

    from numpy.typing import NDArray

    from tidal.measurement._io import SimulationData

# Floor for wavevector magnitude (detect standing waves)
_WAVEVECTOR_FLOOR = 1e-12


[docs] @dataclass(frozen=True) class AsymptoticConversionResult: """Result of asymptotic conversion measurement. Attributes ---------- P_final : float Total conversion at final snapshot: ``E_target(t_final) / E_source(0)``. P_reflected : float Fraction of total conversion in backward-propagating target modes. P_transmitted : float Fraction of total conversion in forward-propagating target modes. E_source_initial : float Source group energy at ``t=0``. E_target_final : float Target group energy at final snapshot. source_field : str Source field group name. target_field : str Target field group name. source_wavevector : tuple[float, ...] Spectral centroid wavevector of the source at ``t=0`` (defines the forward direction). """ P_final: float P_reflected: float P_transmitted: float E_source_initial: float E_target_final: float source_field: str target_field: str source_wavevector: tuple[float, ...]
def _group_energy_at_snapshot( data: SimulationData, field_names: Sequence[str], t_idx: int, ) -> float: """Compute total energy for a group of fields at snapshot *t_idx*. Uses the Hamiltonian per-field self-energy decomposition for coordinate-invariant results. """ per_field, _interaction = _compute_hamiltonian_per_field(data, t_idx) total = 0.0 for fname in field_names: total += per_field.get(fname, 0.0) return total def _build_k_grids( field_shape: tuple[int, ...], grid_spacing: tuple[float, ...], ) -> list[NDArray[np.float64]]: """Build per-axis wavenumber grids (full FFT, not rFFT).""" ndim = len(grid_spacing) k_arrays: list[NDArray[np.float64]] = [] for ax in range(ndim): n = field_shape[ax] k_ax: NDArray[np.float64] = np.asarray( np.fft.fftfreq(n, d=grid_spacing[ax]) * (2.0 * np.pi), dtype=np.float64 ) k_arrays.append(k_ax) return [ np.asarray(g, dtype=np.float64) for g in np.meshgrid(*k_arrays, indexing="ij") ] def _source_wavevector( data: SimulationData, source_fields: Sequence[str], ) -> NDArray[np.float64]: """Compute the propagation direction of the source at t=0. Uses the **energy flux** (momentum density) in Fourier space to determine the net propagation direction. For real fields, the power spectrum ``|φ̂(k)|²`` is symmetric (``|φ̂(k)| = |φ̂(-k)|``), so a power-weighted centroid is always zero. The energy flux breaks this symmetry: ⟨k⟩_flux = Σ_k k · Im[v̂*(k) · φ̂(k)] This is proportional to the momentum density ``T^{0i} = -v · ∂_i φ`` and correctly identifies the propagation direction for travelling waves while returning zero for standing waves. Summed over all source fields for group covariance. For a plane wave with wavevector k₀, this returns a vector parallel to k₀. The magnitude is proportional to energy flux, not |k₀| itself. """ ndim = len(data.grid_spacing) k_flux = np.zeros(ndim) total_flux = 0.0 for fname in source_fields: field_0 = np.asarray(data.fields[fname][0], dtype=np.float64) fhat = np.fft.fftn(field_0) vel_all = data.velocities.get(fname) if vel_all is None: continue vel_0 = np.asarray(vel_all[0], dtype=np.float64) vhat = np.fft.fftn(vel_0) # Cross-spectrum: Im[v̂*(k) · φ̂(k)] gives directional weight cross = np.imag(np.conj(vhat) * fhat) k_grids = _build_k_grids(field_0.shape, data.grid_spacing) for ax in range(ndim): flux_ax = float(np.sum(k_grids[ax] * cross)) k_flux[ax] += flux_ax total_flux += float(np.sum(np.abs(cross))) # Normalize to get a unit-like direction vector scaled by net flux if total_flux > 0: k_flux /= total_flux return k_flux def _directional_split( # noqa: PLR0914 data: SimulationData, target_fields: Sequence[str], t_idx: int, k_hat: NDArray[np.float64], ) -> tuple[float, float]: """Split target field energy into forward (transmitted) and reflected. Uses **flux-based mode classification**: for each Fourier mode k, the directional energy flux ``(k · k̂_source) · Im[v̂*(k) · φ̂(k)]`` determines whether the mode propagates forward or backward. The mode's spectral energy ``|φ̂(k)|² + |v̂(k)|²`` is then attributed to the corresponding direction. For real-valued fields, a naive spectral power split by sign of k always gives ~50/50 because |φ̂(k)|² = |φ̂(-k)|². The flux-based approach correctly handles real fields because the directional flux uses the cross-spectrum ``Im[v̂* · φ̂]`` which is antisymmetric: for a rightward travelling wave, both the +k and -k modes have positive directional flux and are both attributed to the forward direction. Parameters ---------- data : SimulationData Simulation output. target_fields : list[str] Target field names. t_idx : int Snapshot index. k_hat : ndarray, shape ``(ndim,)`` Unit vector in the source propagation direction. Returns ------- forward_frac, reflected_frac : float Fractions of total spectral energy (each in [0, 1], sum ≈ 1). """ total_forward = 0.0 total_reflected = 0.0 total_zero = 0.0 # Build k-grids once (invariant across fields — depends only on grid geometry) first_field = np.asarray(data.fields[target_fields[0]][t_idx], dtype=np.float64) k_grids = _build_k_grids(first_field.shape, data.grid_spacing) for fname in target_fields: field_arr = np.asarray(data.fields[fname][t_idx], dtype=np.float64) fhat = np.fft.fftn(field_arr) spectral_energy = np.abs(fhat) ** 2 vel_all = data.velocities.get(fname) if vel_all is not None: vel_arr = np.asarray(vel_all[t_idx], dtype=np.float64) vhat = np.fft.fftn(vel_arr) spectral_energy += np.abs(vhat) ** 2 # Directional flux: (k · k̂_source) · Im[v̂*(k) · φ̂(k)] k_dot_source = sum( k_grids[ax] * k_hat[ax] for ax in range(len(data.grid_spacing)) ) cross = np.imag(np.conj(vhat) * fhat) flux = k_dot_source * cross flux_scale = np.max(np.abs(flux)) eps = 1e-12 * flux_scale if flux_scale > 0 else 0.0 fwd_mask = flux > eps ref_mask = flux < -eps zero_mask = ~fwd_mask & ~ref_mask total_forward += float(np.sum(spectral_energy[fwd_mask])) total_reflected += float(np.sum(spectral_energy[ref_mask])) total_zero += float(np.sum(spectral_energy[zero_mask])) else: # No velocity → cannot determine direction; split equally total_zero += float(np.sum(spectral_energy)) # Split zero-flux modes equally between forward and reflected total_forward += total_zero / 2.0 total_reflected += total_zero / 2.0 total = total_forward + total_reflected if total < _ENERGY_FLOOR: return 0.0, 0.0 return total_forward / total, total_reflected / total
[docs] def compute_asymptotic_conversion( data: SimulationData, source_fields: str | Sequence[str], target_fields: str | Sequence[str] | None = None, ) -> AsymptoticConversionResult: """Compute asymptotic scattering observables. The forward/backward directional split is defined by the source field's initial propagation direction (spectral centroid wavevector at ``t=0``). See module docstring for details on frame independence. Parameters ---------- data : SimulationData Simulation output. source_fields : str or sequence of str Source field name(s). target_fields : str, sequence of str, or None Target field name(s). ``None`` → all dynamical fields not in source. Returns ------- AsymptoticConversionResult Raises ------ ValueError If source/target fields are invalid or overlap, or if the source has zero initial energy. """ source_list = _normalize_group(source_fields) if target_fields is None: all_dyn = list(data.dynamical_fields) target_list = [f for f in all_dyn if f not in source_list] if not target_list: msg = "No target fields: all dynamical fields are in the source set" raise ValueError(msg) else: target_list = _normalize_group(target_fields) overlap = set(source_list) & set(target_list) if overlap: msg = f"Source and target overlap: {overlap}" raise ValueError(msg) # Energies e_source_0 = _group_energy_at_snapshot(data, source_list, 0) e_target_final = _group_energy_at_snapshot(data, target_list, -1) if e_source_0 < _ENERGY_FLOOR: msg = "Source initial energy is zero — cannot compute conversion" raise ValueError(msg) p_final = e_target_final / e_source_0 # Source propagation direction (spectral centroid at t=0) k_source = _source_wavevector(data, source_list) k_mag = float(np.linalg.norm(k_source)) if k_mag < _WAVEVECTOR_FLOOR: # Source has no net propagation direction (e.g., standing wave). # Directional decomposition is undefined — report equal split. p_transmitted = p_final / 2.0 p_reflected = p_final / 2.0 else: k_hat = k_source / k_mag fwd_frac, ref_frac = _directional_split(data, target_list, -1, k_hat) p_transmitted = p_final * fwd_frac p_reflected = p_final * ref_frac return AsymptoticConversionResult( P_final=p_final, P_reflected=p_reflected, P_transmitted=p_transmitted, E_source_initial=e_source_0, E_target_final=e_target_final, source_field=",".join(source_list), target_field=",".join(target_list), source_wavevector=tuple(float(x) for x in k_source), )