diff --git a/src/layup/bk_orbitfit.py b/src/layup/bk_orbitfit.py new file mode 100644 index 00000000..18e34dbf --- /dev/null +++ b/src/layup/bk_orbitfit.py @@ -0,0 +1,192 @@ +"""Adapter from layup's Observation/FitResult types to the liborbfit +Python wrapper (https://github.com/matthewholman/liborbfit). + +The heavy lifting — ctypes bindings, the αβγ→Cartesian transform, +and the do_bk_fit entry point — lives in liborbfit's own repo at +`python/liborbfit/bk_orbitfit.py`. This file is a thin adapter: + + * `_build_obs(...)` converts a sequence of layup Observation + objects into liborbfit's Observation dataclass. + * `_to_layup_result(...)` packs a liborbfit BKFitResult into a + layup FitResult (constructing the Cartesian state from BK via + `BKFitResult.to_cartesian()`). + * `set_ephem`, `do_bk_fit`, `compute_r_au` are the public entry + points consumed by orbitfit.py; their surface is unchanged. + +To use, clone and build liborbfit (`./setup_deps.sh && make` in the +liborbfit repo), then export two env vars so layup can load both the +C library (via ctypes) and the Python wrapper: + + export LIBORBFIT_PATH=/path/to/liborbfit/liborbfit.so + export PYTHONPATH=/path/to/liborbfit/python:$PYTHONPATH + +When the env vars are not set the BK engine is unavailable; layup's +auto-dispatch falls back to the Cartesian engine and explicit +`engine="bk"` calls return a FitResult with flag=3. + +Failure modes are surfaced through FitResult.flag: + 0 = success + 1 = liborbfit.do_bk_fit returned a non-zero rc (singular, too few + obs, etc. — see orbfit_lib.h) + 2 = ephemeris not loaded + 3 = liborbfit module unavailable (not on PYTHONPATH, library + missing, ABI symbol mismatch, …) +""" + +from __future__ import annotations + +import os +from typing import Sequence, Any + +import numpy as np + +# liborbfit (the standalone Python wrapper around liborbfit.so). +# Imported lazily so this module remains importable even on +# machines without the BK build; callers see the failure when they +# try to use the API. +try: + from liborbfit import ( + Observation as _LiborbfitObs, + set_ephem as _set_ephem, + do_bk_fit as _do_bk_fit, + ) + + _LIBORBFIT_OK = True +except ImportError: # pragma: no cover — let callers see the failure + _LiborbfitObs = None # type: ignore + _set_ephem = None # type: ignore + _do_bk_fit = None # type: ignore + _LIBORBFIT_OK = False + +# layup's pybind11-bound FitResult, the same type returned by the +# Cartesian-LM path so orbitfit.do_fit can hand it to its caller +# without knowing which engine produced it. +try: + from _layup_cpp._core import FitResult # type: ignore +except ImportError: # pragma: no cover + FitResult = None # type: ignore + + +ARCSEC = 1.0 / 206265.0 + + +# --------------------------------------------------------------------------- # +# Public entry points # +# --------------------------------------------------------------------------- # + + +def set_ephem(planets_path: str | os.PathLike, asteroids_path: str | os.PathLike) -> None: + """Load the ASSIST kernels once per process; pass-through to + liborbfit.set_ephem.""" + if not _LIBORBFIT_OK: + raise RuntimeError( + "liborbfit Python package not importable; install it and " + "ensure both LIBORBFIT_PATH and PYTHONPATH point at the build." + ) + _set_ephem(planets_path, asteroids_path) + + +def do_bk_fit(observations: Sequence[Any]) -> "FitResult": + """Fit a sequence of layup Observations with BK and return a + layup FitResult. See module docstring for the FitResult.flag + convention.""" + if FitResult is None: + raise RuntimeError("_layup_cpp._core.FitResult is not importable; build layup first.") + if not _LIBORBFIT_OK: + return _failed_result(flag=3) + obs = _build_obs(observations) + r = _do_bk_fit(obs) + return _to_layup_result(r) + + +def compute_r_au(observations: Sequence[Any]) -> float | None: + """Cheap heliocentric-distance estimate via Gauss IOD on the + first / middle / last obs. Used by orbitfit.do_fit's auto + dispatch (route to BK when r ≥ threshold).""" + try: + from _layup_cpp._core import gauss as _gauss # type: ignore + except ImportError: + return None + if len(observations) < 3: + return None + o0, o1, o2 = (observations[0], observations[len(observations) // 2], observations[-1]) + try: + solns = _gauss(o0, o1, o2) + except Exception: + return None + if not solns: + return None + state = np.array(solns[0].state) + return float(np.linalg.norm(state[:3])) + + +# --------------------------------------------------------------------------- # +# Internal helpers # +# --------------------------------------------------------------------------- # + + +def _radec_from_rho(rho: np.ndarray) -> tuple[float, float]: + """Unit direction vector (ICRS-equatorial) -> (ra, dec) radians.""" + rx, ry, rz = rho[0], rho[1], rho[2] + ra = np.arctan2(ry, rx) % (2 * np.pi) + dec = np.arctan2(rz, np.hypot(rx, ry)) + return float(ra), float(dec) + + +def _build_obs(observations: Sequence[Any]) -> list: + """Convert layup Observation list to liborbfit Observation list.""" + out = [] + for o in observations: + rho = o.rho_hat if isinstance(o.rho_hat, np.ndarray) else np.asarray(o.rho_hat) + ra, dec = _radec_from_rho(rho.reshape(-1)) + out.append( + _LiborbfitObs( + jd_tdb=float(o.epoch), + ra=ra, + dec=dec, + sigma_ra=float(o.ra_unc if o.ra_unc is not None else ARCSEC), + sigma_dec=float(o.dec_unc if o.dec_unc is not None else ARCSEC), + xe=float(o.observer_position[0]), + ye=float(o.observer_position[1]), + ze=float(o.observer_position[2]), + ) + ) + return out + + +def _failed_result(flag: int) -> "FitResult": + res = FitResult() + res.method = "BK" + res.niter = 0 + res.flag = flag + res.csq = float("nan") + res.ndof = 0 + res.epoch = 0.0 + res.state = [0.0] * 6 + res.cov = [0.0] * 36 + return res + + +def _to_layup_result(r) -> "FitResult": + """Pack a liborbfit BKFitResult into a layup FitResult.""" + res = FitResult() + res.method = "BK" + res.niter = 0 # liborbfit doesn't currently report iteration count + + if r.flag != 0: + res.flag = 1 + res.csq = float("nan") + res.ndof = 0 + res.epoch = 0.0 + res.state = [0.0] * 6 + res.cov = [0.0] * 36 + return res + + state, _J, cov_cart = r.to_cartesian() + res.flag = 0 + res.csq = float(r.chisq) + res.ndof = int(r.dof) + res.epoch = float(r.jd0) + res.state = [float(x) for x in state] + res.cov = [float(x) for x in cov_cart.reshape(-1)] + return res diff --git a/src/layup/orbitfit.py b/src/layup/orbitfit.py index afab1dbe..ebc09b7b 100644 --- a/src/layup/orbitfit.py +++ b/src/layup/orbitfit.py @@ -349,7 +349,52 @@ def do_gauss_iod(observations, seq): return solns -def do_fit(observations, seq, cache_dir, iod="gauss"): +def _bk_route_r_AU(solns): + """Estimate r from Gauss IOD's first solution, for the BK auto-dispatch. + Returns 0.0 if no solution is available.""" + if not solns: + return 0.0 + s0 = solns[0] + sx, sy, sz = s0.state[0], s0.state[1], s0.state[2] + return float((sx * sx + sy * sy + sz * sz) ** 0.5) + + +# Module-level cache: whether bk_orbitfit's ephem has been loaded for +# the given cache_dir. set_ephem itself is idempotent on liborbfit's +# side but reopens the kernel files; this cache avoids the cost. +_bk_ephem_cache_dir = None + + +def _ensure_bk_ephem(cache_dir) -> bool: + """Lazily load BK's ASSIST kernels. Returns True if ready, False on error.""" + global _bk_ephem_cache_dir + if _bk_ephem_cache_dir == cache_dir: + return True + try: + from layup import bk_orbitfit + except Exception as e: + logger.warning(f"bk_orbitfit not importable: {e}") + return False + try: + bk_orbitfit.set_ephem( + os.path.join(str(cache_dir), "linux_p1550p2650.440"), + os.path.join(str(cache_dir), "sb441-n16.bsp"), + ) + except Exception as e: + logger.warning(f"BK ephem load failed for cache_dir={cache_dir}: {e}") + return False + _bk_ephem_cache_dir = cache_dir + return True + + +def do_fit( + observations, + seq, + cache_dir, + iod="gauss", + engine: Literal["auto", "cartesian", "bk"] = "cartesian", + bk_threshold_AU: float = 10.0, +): """Carry out an orbit fit to the observations in a series of steps. A list of lists of observation indices specifies the order in which the fit proceeds. @@ -397,6 +442,37 @@ def do_fit(observations, seq, cache_dir, iod="gauss"): x.flag = 5 return x + # Engine dispatch. BK (Bernstein-Khushalani) is well-conditioned + # for distant short-arc objects (it parameterizes in tangent-plane + # coords and uses a variational Jacobian); Cartesian-LM is better + # for inner-SS / NEO-like targets. Route based on the Gauss IOD's + # first-solution heliocentric distance. + chosen_engine = engine + if chosen_engine == "auto": + r_iod = _bk_route_r_AU(solns) + chosen_engine = "bk" if r_iod >= bk_threshold_AU else "cartesian" + logger.debug( + f"do_fit auto-dispatch: Gauss r={r_iod:.3f} AU, " + f"threshold={bk_threshold_AU} AU -> {chosen_engine}" + ) + + if chosen_engine == "bk": + if _ensure_bk_ephem(cache_dir): + from layup import bk_orbitfit + + try: + x = bk_orbitfit.do_bk_fit(observations) + except Exception as e: + logger.warning(f"BK fit raised {e}; falling back to Cartesian") + chosen_engine = "cartesian" + else: + if x.flag == 0: + return x + logger.debug(f"BK fit returned flag={x.flag}; " f"falling back to Cartesian path") + chosen_engine = "cartesian" + else: + chosen_engine = "cartesian" + assist_ephem = get_ephem(cache_dir) #! I think this can be a `for/else loop...` diff --git a/src/lib/detection.cpp b/src/lib/detection.cpp index 80e5f2af..df0c0484 100644 --- a/src/lib/detection.cpp +++ b/src/lib/detection.cpp @@ -5,6 +5,8 @@ #include #include +#include +#include namespace py = pybind11; // --- Observation Variant Types --- diff --git a/tests/layup/test_bk_dispatch.py b/tests/layup/test_bk_dispatch.py new file mode 100644 index 00000000..e1255f57 --- /dev/null +++ b/tests/layup/test_bk_dispatch.py @@ -0,0 +1,131 @@ +"""Tests for orbitfit.do_fit's BK engine dispatch. + +These tests are guarded by an importability check for `liborbfit.so`: +if the library isn't available, they skip rather than fail, so CI on +machines without the BK build is unaffected. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import numpy as np +import pytest + +from layup import orbitfit +from layup.routines import Observation + + +# The BK path needs the `liborbfit` Python package importable AND +# liborbfit.so loadable (via LIBORBFIT_PATH or the wrapper's default +# search). We force the load here by calling set_jacobian_mode, +# which exercises the ctypes binding without doing any real work. +# If anything fails, skip rather than error so CI on machines without +# the BK build is unaffected. +def _liborbfit_available() -> bool: + try: + import liborbfit + + liborbfit.set_jacobian_mode(liborbfit.JACOBIAN_VARIATIONAL) + return True + except (ImportError, OSError, RuntimeError): + return False + + +pytestmark = pytest.mark.skipif( + not _liborbfit_available(), + reason="liborbfit not available (clone+build it and set " "LIBORBFIT_PATH + PYTHONPATH)", +) + + +# Truth-data fixtures live in claude_layup/diagnostic/scan/. The +# pre-bundled tests/data tree doesn't carry them. Tests skip gracefully +# if the diagnostic harness isn't present. +DIAGNOSTIC_SCAN = Path(__file__).resolve().parent.parent.parent.parent / "diagnostic" / "scan" / "truth" + + +def _have_truth(name: str) -> bool: + return (DIAGNOSTIC_SCAN / f"{name}.json").exists() + + +def _build_obs(name: str): + truth = json.loads((DIAGNOSTIC_SCAN / f"{name}.json").read_text()) + sigma_rad = float(truth["sigma_arcsec"]) / 206265.0 + obs = [] + for r in truth["observations"]: + o = Observation.from_astrometry( + float(r["ra"]) * np.pi / 180.0, + float(r["dec"]) * np.pi / 180.0, + float(r["jd_tdb"]), + list(r["observer_state_AU"]), + [0.0, 0.0, 0.0], + ) + o.ra_unc = sigma_rad + o.dec_unc = sigma_rad + obs.append(o) + return obs + + +CACHE = os.path.expanduser("~/Library/Caches/layup") + + +@pytest.mark.skipif( + not _have_truth("classical_42AU_arc_010.00d"), reason="diagnostic/scan dataset not present" +) +def test_auto_routes_to_bk_for_distant(): + """A 42 AU classical KBO should auto-dispatch to BK.""" + obs = _build_obs("classical_42AU_arc_010.00d") + seq = [list(range(len(obs)))] + fit = orbitfit.do_fit(obs, seq, CACHE, iod="gauss", engine="auto") + assert fit.flag == 0 + assert fit.method == "BK" + + +@pytest.mark.skipif( + not _have_truth("mainbelt_2.5AU_arc_007.00d"), reason="diagnostic/scan dataset not present" +) +def test_auto_routes_to_cartesian_for_mainbelt(): + """A 2.5 AU mainbelt object should auto-dispatch to Cartesian-LM.""" + obs = _build_obs("mainbelt_2.5AU_arc_007.00d") + seq = [list(range(len(obs)))] + fit = orbitfit.do_fit(obs, seq, CACHE, iod="gauss", engine="auto") + assert fit.flag == 0 + assert fit.method != "BK" + + +@pytest.mark.skipif( + not _have_truth("classical_42AU_arc_010.00d"), reason="diagnostic/scan dataset not present" +) +def test_explicit_engine_bk(): + """Forcing engine='bk' produces a BK result regardless of r.""" + obs = _build_obs("classical_42AU_arc_010.00d") + seq = [list(range(len(obs)))] + fit = orbitfit.do_fit(obs, seq, CACHE, iod="gauss", engine="bk") + assert fit.flag == 0 + assert fit.method == "BK" + + +@pytest.mark.skipif( + not _have_truth("classical_42AU_arc_010.00d"), reason="diagnostic/scan dataset not present" +) +def test_explicit_engine_cartesian(): + """engine='cartesian' bypasses the dispatch and uses the original LM path.""" + obs = _build_obs("classical_42AU_arc_010.00d") + seq = [list(range(len(obs)))] + fit = orbitfit.do_fit(obs, seq, CACHE, iod="gauss", engine="cartesian") + assert fit.flag == 0 + assert fit.method != "BK" + + +@pytest.mark.skipif( + not _have_truth("classical_42AU_arc_010.00d"), reason="diagnostic/scan dataset not present" +) +def test_threshold_pushes_to_cartesian(): + """A very high threshold forces auto to pick Cartesian even for distant targets.""" + obs = _build_obs("classical_42AU_arc_010.00d") + seq = [list(range(len(obs)))] + fit = orbitfit.do_fit(obs, seq, CACHE, iod="gauss", engine="auto", bk_threshold_AU=1000.0) + assert fit.flag == 0 + assert fit.method != "BK"