Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 209 additions & 1 deletion lightning/src/routing/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,17 @@ pub struct ProbabilisticScorer<G: Deref<Target = NetworkGraph<L>>, L: Logger> {
#[derive(Clone)]
pub struct ChannelLiquidities(HashMap<u64, ChannelLiquidity>);

/// The action to take when two [`ChannelLiquidities`] contain the same short channel id.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChannelLiquidityMergeAction {
/// Keep the existing entry from the left-hand [`ChannelLiquidities`].
KeepExisting,
/// Replace the existing entry with the entry from the right-hand [`ChannelLiquidities`].
ReplaceWithOther,
/// Combine both entries using LDK's per-channel merge logic.
Combine,
}

impl ChannelLiquidities {
fn new() -> Self {
Self(new_hash_map())
Expand Down Expand Up @@ -550,6 +561,75 @@ impl ChannelLiquidities {
self.0.get_mut(short_channel_id)
}

/// Merge another set of channel liquidities into this one.
///
/// Both sets are first decayed to `duration_since_epoch` using the given `decay_params`,
/// matching the normalization performed by [`CombinedScorer::merge`]. Entries present in
/// both sets are combined using LDK's per-channel merge logic; entries present in only one
/// set are preserved. Use [`Self::merge_with`] if duplicate entries should be resolved with a
/// custom policy instead.
///
/// This is primarily useful for offline tooling that reads multiple serialized scorer files,
/// merges them, and writes a new serialized [`ChannelLiquidities`] file.
pub fn merge(
&mut self, other: Self, duration_since_epoch: Duration,
decay_params: ProbabilisticScoringDecayParameters,
) {
self.merge_with(other, duration_since_epoch, decay_params, |_existing, _other| {
ChannelLiquidityMergeAction::Combine
});
}

/// Merge another set of channel liquidities into this one, resolving duplicate entries with
/// `merge_action`.
///
/// Both sets are first decayed to `duration_since_epoch` using the given `decay_params`,
/// matching the normalization performed by [`CombinedScorer::merge`]. Entries present in only
/// one set are preserved. For duplicate short channel ids, `merge_action` is called with
/// diagnostic views of the existing and incoming entries and decides whether to keep the
/// existing value, replace it with the incoming value, or combine both entries using LDK's
/// per-channel merge logic.
///
/// This lets offline tooling apply deterministic source-aware policies such as preferring the
/// entry with richer historical data, preferring the newer datapoint, or preserving a trusted
/// source for known-good channels while still writing a regular serialized [`ChannelLiquidities`]
/// file.
pub fn merge_with<F>(
&mut self, mut other: Self, duration_since_epoch: Duration,
decay_params: ProbabilisticScoringDecayParameters, mut merge_action: F,
) where
F: FnMut(
&ChannelLiquidityDiagnostic,
&ChannelLiquidityDiagnostic,
) -> ChannelLiquidityMergeAction,
{
self.time_passed(duration_since_epoch, decay_params);
other.time_passed(duration_since_epoch, decay_params);

for (scid, other_liquidity) in other.0 {
match self.0.entry(scid) {
Entry::Occupied(mut entry) => {
let existing_diagnostic =
ChannelLiquidityDiagnostic::from_internal(scid, entry.get());
let other_diagnostic =
ChannelLiquidityDiagnostic::from_internal(scid, &other_liquidity);
match merge_action(&existing_diagnostic, &other_diagnostic) {
ChannelLiquidityMergeAction::KeepExisting => {},
ChannelLiquidityMergeAction::ReplaceWithOther => {
_ = entry.insert(other_liquidity);
},
ChannelLiquidityMergeAction::Combine => {
entry.get_mut().merge(&other_liquidity);
},
}
},
Entry::Vacant(entry) => {
entry.insert(other_liquidity);
},
}
}
}

/// Produces a read-only [`ChannelLiquidityDiagnostic`] view of every entry, sorted by
/// `short_channel_id` for deterministic output.
///
Expand Down Expand Up @@ -1961,6 +2041,16 @@ impl<G: Deref<Target = NetworkGraph<L>> + Clone, L: Logger + Clone> CombinedScor
pub fn set_scores(&mut self, external_scores: ChannelLiquidities) {
self.scorer.set_scores(external_scores);
}

/// Returns the current combined scoring state.
///
/// Unlike this type's [`Writeable`] implementation, which intentionally persists only the
/// local scorer, this exposes the in-memory scorer that includes any external scores merged
/// through [`Self::merge`] or installed through [`Self::set_scores`]. This lets offline tools
/// serialize the combined view explicitly via [`ChannelLiquidities::write`].
pub fn scores(&self) -> &ChannelLiquidities {
self.scorer.scores()
}
}

impl<G: Deref<Target = NetworkGraph<L>>, L: Logger> ScoreLookUp for CombinedScorer<G, L> {
Expand Down Expand Up @@ -2711,7 +2801,8 @@ mod tests {
BlindedTail, CandidateRouteHop, Path, PublicHopCandidate, RouteHop,
};
use crate::routing::scoring::{
ChannelLiquidities, ChannelUsage, CombinedScorer, ScoreLookUp, ScoreUpdate,
ChannelLiquidities, ChannelLiquidityMergeAction, ChannelUsage, CombinedScorer, ScoreLookUp,
ScoreUpdate,
};
use crate::util::ser::{Readable, ReadableArgs, Writeable};
use crate::util::test_utils::{self, TestLogger};
Expand Down Expand Up @@ -4277,6 +4368,108 @@ mod tests {
assert_eq!(scores.iter().count(), 2);
}

#[test]
#[rustfmt::skip]
fn channel_liquidities_merge_preserves_unique_entries_and_averages_overlaps() {
let last_updated = Duration::ZERO;
let mut first = ChannelLiquidities::new();
first.insert(42, ChannelLiquidity {
min_liquidity_offset_msat: 100,
max_liquidity_offset_msat: 300,
liquidity_history: HistoricalLiquidityTracker::new(),
last_updated,
offset_history_last_updated: last_updated,
last_datapoint_time: last_updated,
});
first.insert(43, ChannelLiquidity {
min_liquidity_offset_msat: 10,
max_liquidity_offset_msat: 30,
liquidity_history: HistoricalLiquidityTracker::new(),
last_updated,
offset_history_last_updated: last_updated,
last_datapoint_time: last_updated,
});

let mut second = ChannelLiquidities::new();
second.insert(42, ChannelLiquidity {
min_liquidity_offset_msat: 300,
max_liquidity_offset_msat: 700,
liquidity_history: HistoricalLiquidityTracker::new(),
last_updated,
offset_history_last_updated: last_updated,
last_datapoint_time: last_updated,
});
second.insert(44, ChannelLiquidity {
min_liquidity_offset_msat: 20,
max_liquidity_offset_msat: 40,
liquidity_history: HistoricalLiquidityTracker::new(),
last_updated,
offset_history_last_updated: last_updated,
last_datapoint_time: last_updated,
});

first.merge(second, Duration::ZERO, ProbabilisticScoringDecayParameters::zero_penalty());

let diagnostics = first.diagnostics();
assert_eq!(diagnostics.len(), 3);
let merged = diagnostics.iter().find(|diag| diag.scid == 42).unwrap();
assert_eq!(merged.min_liquidity_offset_msat, 200);
assert_eq!(merged.max_liquidity_offset_msat, 500);
assert!(diagnostics.iter().any(|diag| diag.scid == 43));
assert!(diagnostics.iter().any(|diag| diag.scid == 44));
}

#[test]
#[rustfmt::skip]
fn channel_liquidities_merge_with_can_choose_overlap_winner() {
let last_updated = Duration::ZERO;
let mut first = ChannelLiquidities::new();
first.insert(42, ChannelLiquidity {
min_liquidity_offset_msat: 100,
max_liquidity_offset_msat: 300,
liquidity_history: HistoricalLiquidityTracker::new(),
last_updated,
offset_history_last_updated: last_updated,
last_datapoint_time: last_updated,
});

let mut second = ChannelLiquidities::new();
second.insert(42, ChannelLiquidity {
min_liquidity_offset_msat: 300,
max_liquidity_offset_msat: 700,
liquidity_history: HistoricalLiquidityTracker::new(),
last_updated,
offset_history_last_updated: last_updated,
last_datapoint_time: last_updated,
});
second.insert(44, ChannelLiquidity {
min_liquidity_offset_msat: 20,
max_liquidity_offset_msat: 40,
liquidity_history: HistoricalLiquidityTracker::new(),
last_updated,
offset_history_last_updated: last_updated,
last_datapoint_time: last_updated,
});

first.merge_with(
second,
Duration::ZERO,
ProbabilisticScoringDecayParameters::zero_penalty(),
|existing, other| {
assert_eq!(existing.scid, 42);
assert_eq!(other.scid, 42);
ChannelLiquidityMergeAction::ReplaceWithOther
},
);

let diagnostics = first.diagnostics();
assert_eq!(diagnostics.len(), 2);
let merged = diagnostics.iter().find(|diag| diag.scid == 42).unwrap();
assert_eq!(merged.min_liquidity_offset_msat, 300);
assert_eq!(merged.max_liquidity_offset_msat, 700);
assert!(diagnostics.iter().any(|diag| diag.scid == 44));
}

#[test]
fn combined_scorer() {
let logger = TestLogger::new();
Expand Down Expand Up @@ -4342,6 +4535,21 @@ mod tests {
combined_scorer.scorer.estimated_channel_liquidity_range(42, &target_node_id());
assert_eq!(liquidity_range.unwrap(), (0, 300));

let mut serialized_scores = Vec::new();
combined_scorer.scores().write(&mut serialized_scores).unwrap();
let exported_scores: ChannelLiquidities =
Readable::read(&mut io::Cursor::new(&serialized_scores)).unwrap();
let mut scorer_from_export = ProbabilisticScorer::new(
ProbabilisticScoringDecayParameters::default(),
&network_graph,
&logger,
);
scorer_from_export.set_scores(exported_scores);
assert_eq!(
scorer_from_export.estimated_channel_liquidity_range(42, &target_node_id()).unwrap(),
(0, 300)
);

// Now set (overwrite) the scorer state with the external data which should lead to an even greater liquidity
// range. Just the success from the external source is now considered.
combined_scorer.set_scores(external_scores);
Expand Down