diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 8a90dc93e97..a93978c409b 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -41,8 +41,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; -use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent}; -use lightning::chain::transaction::OutPoint; +use lightning::chain::channelmonitor::ChannelMonitor; use lightning::chain::{ chainmonitor, channelmonitor, BlockLocator, ChannelMonitorUpdateStatus, Confirm, Watch, }; @@ -87,7 +86,6 @@ use lightning::util::wallet_utils::{WalletSourceSync, WalletSync}; use lightning_invoice::RawBolt11Invoice; use crate::utils::test_logger::{self, Output}; -use crate::utils::test_persister::TestPersister; use bitcoin::secp256k1::ecdh::SharedSecret; use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature}; @@ -293,144 +291,302 @@ impl Writer for VecWriter { } } -/// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]` -/// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass -/// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by -/// storing both old `ChannelMonitor`s and ones that are "being persisted" here. +fn serialize_monitor(monitor: &ChannelMonitor) -> Vec { + let mut ser = VecWriter(Vec::new()); + monitor.write(&mut ser).unwrap(); + ser.0 +} + +/// LDK requires the `ChannelMonitor` loaded on startup to be at least as current as the +/// `ChannelManager` state, except for monitor updates that `ChannelManager` still records as +/// in-flight and can replay. This harness tracks the monitor blobs that remain valid restart +/// candidates under that rule. +/// +/// Separately, we track every `InProgress` persistence operation that still needs a +/// `channel_monitor_updated` call. A newer persisted monitor can make an older monitor invalid for +/// restart while the older update still needs to be completed to unblock the live `ChainMonitor`. /// -/// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will -/// simply be replayed on startup. +/// Off-chain monitor updates that are still "being persisted" are stored in `ChannelManager` and +/// will be replayed on startup. Full-monitor snapshots from chain sync or archive paths that return +/// `InProgress` are only restart candidates; losing one on restart does not require a +/// `channel_monitor_updated` callback. struct LatestMonitorState { /// The latest monitor id which we told LDK we've persisted. /// - /// Note that there may still be earlier pending monitor updates in [`Self::pending_monitors`] - /// which we haven't yet completed. We're allowed to reload with those as well, at least until - /// they're completed. + /// Note that earlier updates may still need a `channel_monitor_updated` callback via + /// [`Self::pending_monitor_completions`]. persisted_monitor_id: u64, /// The latest serialized `ChannelMonitor` that we told LDK we persisted. persisted_monitor: Vec, - /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting", - /// from LDK's perspective. + /// An ordered list of (monitor id, serialized `ChannelMonitor`)s which remain safe to use as + /// stale monitors on reload. 
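+	/// For example, if updates 5 and 6 both return `InProgress` and the fuzzer then completes
+	/// update 6 first, candidates 5 and 6 are dropped from this list (update 5 is no longer a
+	/// valid restart point), while update 5 may still sit in
+	/// [`Self::pending_monitor_completions`] awaiting its `channel_monitor_updated` call.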
pending_monitors: Vec<(u64, Vec)>, + /// An ordered list of (monitor id, serialized `ChannelMonitor`)s which still need a + /// `channel_monitor_updated` callback. + pending_monitor_completions: Vec<(u64, Vec)>, } +impl LatestMonitorState { + fn insert_pending_entry( + pending: &mut Vec<(u64, Vec)>, monitor_id: u64, serialized_monitor: Vec, + ) { + // Monitor update ids must arrive in order. Assert at insertion time so duplicates or + // out-of-order updates fail close to the write that caused them instead of being sorted + // into place. + assert!( + pending.last().map_or(true, |(last_id, _)| *last_id < monitor_id), + "pending monitor updates should arrive in order" + ); + pending.push((monitor_id, serialized_monitor)); + } -struct TestChainMonitor { - pub logger: Arc, - pub keys: Arc, - pub persister: Arc, - pub chain_monitor: Arc< - chainmonitor::ChainMonitor< - TestChannelSigner, - Arc, - Arc, - Arc, - Arc, - Arc, - Arc, - >, - >, - pub latest_monitors: Mutex>, -} -impl TestChainMonitor { - pub fn new( - broadcaster: Arc, logger: Arc, feeest: Arc, - persister: Arc, keys: Arc, - ) -> Self { - Self { - chain_monitor: Arc::new(chainmonitor::ChainMonitor::new( - None, - broadcaster, - logger.clone(), - feeest, - Arc::clone(&persister), - Arc::clone(&keys), - keys.get_peer_storage_key(), - false, - )), - logger, - keys, - persister, - latest_monitors: Mutex::new(new_hash_map()), + fn insert_pending_monitor_candidate(&mut self, monitor_id: u64, serialized_monitor: Vec) { + // Full-monitor persists from chain sync or archive paths use the monitor's current + // latest_update_id rather than a fresh ChannelMonitorUpdate id. Keep duplicate ids so + // reload can choose between multiple same-id full snapshots that were in flight together. + if let Some((last_id, _)) = self.pending_monitors.last() { + assert!(*last_id <= monitor_id, "pending monitor updates should arrive in order"); } + self.pending_monitors.push((monitor_id, serialized_monitor)); } -} -impl chain::Watch for TestChainMonitor { - fn watch_channel( - &self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor, - ) -> Result { - let mut ser = VecWriter(Vec::new()); - monitor.write(&mut ser).unwrap(); - let monitor_id = monitor.get_latest_update_id(); - let res = self.chain_monitor.watch_channel(channel_id, monitor); - let state = match res { - Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState { - persisted_monitor_id: monitor_id, - persisted_monitor: ser.0, - pending_monitors: Vec::new(), - }, - Ok(chain::ChannelMonitorUpdateStatus::InProgress) => LatestMonitorState { - persisted_monitor_id: monitor_id, - persisted_monitor: Vec::new(), - pending_monitors: vec![(monitor_id, ser.0)], - }, - Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(), - Err(()) => panic!(), - }; - if self.latest_monitors.lock().unwrap().insert(channel_id, state).is_some() { - panic!("Already had monitor pre-watch_channel"); + + fn mark_persisted(&mut self, monitor_id: u64, serialized_monitor: Vec) { + // Once a monitor is durable, use it as the restart baseline and stop tracking candidates + // at or behind that update id. Completion obligations are tracked separately and are + // deliberately not pruned here. 
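+		// Note the baseline comparison below is `>=` rather than `>`: a full-monitor persist from
+		// chain sync can reuse the monitor's current `latest_update_id`, and in that case the
+		// newer serialized copy should become the restart baseline.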
+ self.pending_monitors.retain(|(id, _)| *id > monitor_id); + if monitor_id >= self.persisted_monitor_id { + self.persisted_monitor_id = monitor_id; + self.persisted_monitor = serialized_monitor; } - res } - fn update_channel( - &self, channel_id: ChannelId, update: &channelmonitor::ChannelMonitorUpdate, - ) -> chain::ChannelMonitorUpdateStatus { - let mut map_lock = self.latest_monitors.lock().unwrap(); - let map_entry = map_lock.get_mut(&channel_id).expect("Didn't have monitor on update call"); - let latest_monitor_data = map_entry - .pending_monitors - .last() - .as_ref() - .map(|(_, data)| data) - .unwrap_or(&map_entry.persisted_monitor); - let deserialized_monitor = - <(BlockLocator, channelmonitor::ChannelMonitor)>::read( - &mut &latest_monitor_data[..], - (&*self.keys, &*self.keys), - ) - .unwrap() - .1; - deserialized_monitor - .update_monitor( - update, - &&TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) }, - &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, - &self.logger, - ) - .unwrap(); - let mut ser = VecWriter(Vec::new()); - deserialized_monitor.write(&mut ser).unwrap(); - let res = self.chain_monitor.update_channel(channel_id, update); - match res { - chain::ChannelMonitorUpdateStatus::Completed => { - map_entry.persisted_monitor_id = update.update_id; - map_entry.persisted_monitor = ser.0; + fn insert_pending( + &mut self, monitor_id: u64, serialized_monitor: Vec, needs_completion: bool, + ) { + if needs_completion { + // persist_new_channel and update_persisted_channel(Some(_)) require a later + // channel_monitor_updated callback if persistence returns InProgress. + Self::insert_pending_entry( + &mut self.pending_monitors, + monitor_id, + serialized_monitor.clone(), + ); + Self::insert_pending_entry( + &mut self.pending_monitor_completions, + monitor_id, + serialized_monitor, + ); + } else { + // This harness treats update_persisted_channel(None, ...) as the chain-sync/archive + // case: the full monitor may be used on restart, but ChainMonitor does not wait for a + // channel_monitor_updated callback. + self.insert_pending_monitor_candidate(monitor_id, serialized_monitor); + } + } + + fn mark_completed_update_persisted(&mut self, monitor_id: u64, serialized_monitor: Vec) { + // The selector/drain path should already have removed this entry before + // finish_monitor_update calls channel_monitor_updated. This check catches accidental + // double-completion or pruning of the wrong list. + assert!( + self.pending_monitor_completions.iter().all(|(id, _)| *id != monitor_id), + "completed monitor update should already be removed from the completion queue" + ); + self.mark_persisted(monitor_id, serialized_monitor); + } + + fn drain_pending_completions(&mut self) -> Vec<(u64, Vec)> { + std::mem::take(&mut self.pending_monitor_completions) + } + + fn take_pending_completion( + &mut self, selector: MonitorUpdateSelector, + ) -> Option<(u64, Vec)> { + // The fuzzer chooses which outstanding callback to deliver. These choices apply to + // completion obligations, not to the set of monitors that may be used on restart. 
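+		// Note that `Second` returns `None` when fewer than two callbacks are outstanding rather
+		// than falling back to the only remaining entry, matching the pre-existing selector
+		// semantics.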
+ match selector { + MonitorUpdateSelector::First => { + if self.pending_monitor_completions.is_empty() { + None + } else { + Some(self.pending_monitor_completions.remove(0)) + } }, - chain::ChannelMonitorUpdateStatus::InProgress => { - map_entry.pending_monitors.push((update.update_id, ser.0)); + MonitorUpdateSelector::Second => { + if self.pending_monitor_completions.len() > 1 { + Some(self.pending_monitor_completions.remove(1)) + } else { + None + } }, - chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(), + MonitorUpdateSelector::Last => self.pending_monitor_completions.pop(), } - res } - fn release_pending_monitor_events( - &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { - return self.chain_monitor.release_pending_monitor_events(); + fn select_monitor_for_reload(&mut self, selector: MonitorReloadSelector) { + // A restart can load the last monitor we told LDK was persisted, or a monitor snapshot + // whose write was started before the simulated crash. + let old_mon = (self.persisted_monitor_id, std::mem::take(&mut self.persisted_monitor)); + let (monitor_id, serialized_monitor) = match selector { + MonitorReloadSelector::Persisted => old_mon, + MonitorReloadSelector::FirstPending => { + if self.pending_monitors.is_empty() { + old_mon + } else { + self.pending_monitors.remove(0) + } + }, + MonitorReloadSelector::LastPending => self.pending_monitors.pop().unwrap_or(old_mon), + }; + self.persisted_monitor_id = monitor_id; + self.persisted_monitor = serialized_monitor; + // After restart, stop tracking pre-restart in-flight writes. ChannelManager will replay + // off-chain monitor updates that still matter; full-monitor snapshots may simply be absent. + self.pending_monitors.clear(); + self.pending_monitor_completions.clear(); } } +struct HarnessPersister { + pub update_ret: Mutex, + pub latest_monitors: Mutex>, +} +impl HarnessPersister { + fn track_monitor_update( + &self, channel_id: ChannelId, monitor_id: u64, serialized_monitor: Vec, + status: chain::ChannelMonitorUpdateStatus, needs_completion: bool, + ) { + let mut latest_monitors = self.latest_monitors.lock().unwrap(); + if let Some(state) = latest_monitors.get_mut(&channel_id) { + match status { + chain::ChannelMonitorUpdateStatus::Completed => { + // A completed write advances the restart baseline. Once LDK can rely on that + // monitor state being durable, the harness stops offering candidates at or + // behind that update id. + state.mark_persisted(monitor_id, serialized_monitor); + }, + chain::ChannelMonitorUpdateStatus::InProgress => { + // InProgress always creates a restart candidate, but only some calls also need + // an explicit channel_monitor_updated completion. + state.insert_pending(monitor_id, serialized_monitor, needs_completion); + }, + chain::ChannelMonitorUpdateStatus::UnrecoverableError => {}, + } + } else { + let state = match status { + chain::ChannelMonitorUpdateStatus::Completed => LatestMonitorState { + persisted_monitor_id: monitor_id, + persisted_monitor: serialized_monitor, + pending_monitors: Vec::new(), + pending_monitor_completions: Vec::new(), + }, + chain::ChannelMonitorUpdateStatus::InProgress => { + // The first persist for a channel is persist_new_channel, which always needs a + // completion callback when it returns InProgress. A full-monitor update without + // existing state would mean the harness missed the channel's initial monitor. 
+ assert!(needs_completion, "missing monitor state for full monitor update"); + LatestMonitorState { + persisted_monitor_id: monitor_id, + persisted_monitor: Vec::new(), + pending_monitors: vec![(monitor_id, serialized_monitor.clone())], + pending_monitor_completions: vec![(monitor_id, serialized_monitor)], + } + }, + chain::ChannelMonitorUpdateStatus::UnrecoverableError => return, + }; + assert!( + latest_monitors.insert(channel_id, state).is_none(), + "Already had monitor state pre-persist" + ); + } + } + + fn mark_update_completed( + &self, channel_id: ChannelId, monitor_id: u64, serialized_monitor: Vec, + ) { + let mut latest_monitors = self.latest_monitors.lock().unwrap(); + let state = latest_monitors + .get_mut(&channel_id) + .expect("missing monitor state for completed update"); + // Once we tell LDK update N is completed, use the completed monitor as the restart + // baseline and drop restart candidates at or behind N. + state.mark_completed_update_persisted(monitor_id, serialized_monitor); + } + + fn drain_pending_updates(&self, channel_id: &ChannelId) -> Vec<(u64, Vec)> { + self.latest_monitors + .lock() + .unwrap() + .get_mut(channel_id) + .map_or_else(Vec::new, |state| state.drain_pending_completions()) + } + + fn drain_all_pending_updates(&self) -> Vec<(ChannelId, u64, Vec)> { + let mut completed_updates = Vec::new(); + for (channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() { + for (monitor_id, data) in state.drain_pending_completions() { + completed_updates.push((*channel_id, monitor_id, data)); + } + } + completed_updates + } + + fn take_pending_update( + &self, channel_id: &ChannelId, selector: MonitorUpdateSelector, + ) -> Option<(u64, Vec)> { + self.latest_monitors + .lock() + .unwrap() + .get_mut(channel_id) + .and_then(|state| state.take_pending_completion(selector)) + } +} +impl chainmonitor::Persist for HarnessPersister { + fn persist_new_channel( + &self, _monitor_name: lightning::util::persist::MonitorName, + data: &channelmonitor::ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { + let status = self.update_ret.lock().unwrap().clone(); + let monitor_id = data.get_latest_update_id(); + let serialized_monitor = serialize_monitor(data); + self.track_monitor_update(data.channel_id(), monitor_id, serialized_monitor, status, true); + status + } + + fn update_persisted_channel( + &self, _monitor_name: lightning::util::persist::MonitorName, + update: Option<&channelmonitor::ChannelMonitorUpdate>, + data: &channelmonitor::ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { + let status = self.update_ret.lock().unwrap().clone(); + let monitor_id = update.map_or_else(|| data.get_latest_update_id(), |upd| upd.update_id); + let serialized_monitor = serialize_monitor(data); + self.track_monitor_update( + data.channel_id(), + monitor_id, + serialized_monitor, + status, + // `None` normally comes from chain-sync or archive writes, which need no completion + // callback. `update_channel_internal` can also use `None` after `update_monitor` + // fails, but this harness does not model that error-recovery path. 
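+			// In other words, only `Some(update)` writes are queued for a completion callback;
+			// `None` writes only extend the restart-candidate list.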
+ update.is_some(), + ); + status + } + + fn archive_persisted_channel(&self, _monitor_name: lightning::util::persist::MonitorName) {} +} + +type TestChainMonitor = chainmonitor::ChainMonitor< + TestChannelSigner, + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + struct KeyProvider { node_secret: SecretKey, rand_bytes_id: atomic::AtomicU32, @@ -654,12 +810,14 @@ struct HarnessNode<'a> { node_id: u8, node: ChanMan<'a>, monitor: Arc, + persister: Arc, keys_manager: Arc, logger: Arc, broadcaster: Arc, fee_estimator: Arc, wallet: TestWalletSource, persistence_style: ChannelMonitorUpdateStatus, + deferred: bool, serialized_manager: Vec, height: u32, last_htlc_clear_fee: u32, @@ -674,35 +832,42 @@ impl<'a> std::ops::Deref for HarnessNode<'a> { } impl<'a> HarnessNode<'a> { - fn build_loggers( + fn build_logger( node_id: u8, out: &Out, - ) -> (Arc, Arc) { - let raw_logger = Arc::new(test_logger::TestLogger::new(node_id.to_string(), out.clone())); - let logger_for_monitor: Arc = raw_logger.clone(); - let logger: Arc = raw_logger; - (logger_for_monitor, logger) + ) -> Arc { + Arc::new(test_logger::TestLogger::new(node_id.to_string(), out.clone())) + } + + fn build_persister(persistence_style: ChannelMonitorUpdateStatus) -> Arc { + Arc::new(HarnessPersister { + update_ret: Mutex::new(persistence_style), + latest_monitors: Mutex::new(new_hash_map()), + }) } fn build_chain_monitor( broadcaster: &Arc, fee_estimator: &Arc, - keys_manager: &Arc, logger_for_monitor: Arc, - persistence_style: ChannelMonitorUpdateStatus, + keys_manager: &Arc, logger: Arc, + persister: &Arc, deferred: bool, ) -> Arc { - Arc::new(TestChainMonitor::new( + Arc::new(chainmonitor::ChainMonitor::new( + None, Arc::clone(broadcaster), - logger_for_monitor, + logger, Arc::clone(fee_estimator), - Arc::new(TestPersister { update_ret: Mutex::new(persistence_style) }), + Arc::clone(persister), Arc::clone(keys_manager), + keys_manager.get_peer_storage_key(), + deferred, )) } fn new( node_id: u8, wallet: TestWalletSource, fee_estimator: Arc, broadcaster: Arc, persistence_style: ChannelMonitorUpdateStatus, - out: &Out, router: &'a FuzzRouter, chan_type: ChanType, + deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) -> Self { - let (logger_for_monitor, logger) = Self::build_loggers(node_id, out); + let logger = Self::build_logger(node_id, out); let node_secret = SecretKey::from_slice(&[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, node_id, @@ -713,12 +878,14 @@ impl<'a> HarnessNode<'a> { rand_bytes_id: atomic::AtomicU32::new(0), enforcement_states: Mutex::new(new_hash_map()), }); + let persister = Self::build_persister(persistence_style); let monitor = Self::build_chain_monitor( &broadcaster, &fee_estimator, &keys_manager, - logger_for_monitor, - persistence_style, + Arc::clone(&logger), + &persister, + deferred, ); let network = Network::Bitcoin; let best_block_timestamp = genesis_block(network).header.time; @@ -741,12 +908,14 @@ impl<'a> HarnessNode<'a> { node_id, node, monitor, + persister, keys_manager, logger, broadcaster, fee_estimator, wallet, persistence_style, + deferred, serialized_manager: Vec::new(), height: 0, last_htlc_clear_fee: 253, @@ -754,67 +923,34 @@ impl<'a> HarnessNode<'a> { } fn set_persistence_style(&mut self, style: ChannelMonitorUpdateStatus) { + // Store the style for the next reload. The active persister is intentionally not changed + // in place. 
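+		// The new style takes effect when `reload` builds a fresh `HarnessPersister` from
+		// `self.persistence_style`; until then the running node keeps its current persister.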
self.persistence_style = style; } - fn complete_all_monitor_updates(&self, chan_id: &ChannelId) { - if let Some(state) = self.monitor.latest_monitors.lock().unwrap().get_mut(chan_id) { - assert!( - state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), - "updates should be sorted by id" - ); - for (id, data) in state.pending_monitors.drain(..) { - self.monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap(); - if id > state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + fn finish_monitor_update(&self, chan_id: ChannelId, monitor_id: u64, data: Vec) { + self.monitor.channel_monitor_updated(chan_id, monitor_id).unwrap(); + self.persister.mark_update_completed(chan_id, monitor_id, data); + } + + fn complete_all_monitor_updates(&self, chan_id: &ChannelId) -> bool { + let completed_updates = self.persister.drain_pending_updates(chan_id); + let completed_any = !completed_updates.is_empty(); + for (monitor_id, data) in completed_updates { + self.finish_monitor_update(*chan_id, monitor_id, data); } + completed_any } fn complete_all_pending_monitor_updates(&self) { - for (channel_id, state) in self.monitor.latest_monitors.lock().unwrap().iter_mut() { - for (id, data) in state.pending_monitors.drain(..) { - self.monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap(); - if id >= state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + for (channel_id, monitor_id, data) in self.persister.drain_all_pending_updates() { + self.finish_monitor_update(channel_id, monitor_id, data); } } fn complete_monitor_update(&self, chan_id: &ChannelId, selector: MonitorUpdateSelector) { - if let Some(state) = self.monitor.latest_monitors.lock().unwrap().get_mut(chan_id) { - assert!( - state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), - "updates should be sorted by id" - ); - let update = match selector { - MonitorUpdateSelector::First => { - if state.pending_monitors.is_empty() { - None - } else { - Some(state.pending_monitors.remove(0)) - } - }, - MonitorUpdateSelector::Second => { - if state.pending_monitors.len() > 1 { - Some(state.pending_monitors.remove(1)) - } else { - None - } - }, - MonitorUpdateSelector::Last => state.pending_monitors.pop(), - }; - if let Some((id, data)) = update { - self.monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap(); - if id > state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + if let Some((monitor_id, data)) = self.persister.take_pending_update(chan_id, selector) { + self.finish_monitor_update(*chan_id, monitor_id, data); } } @@ -836,9 +972,30 @@ impl<'a> HarnessNode<'a> { } } - fn refresh_serialized_manager(&mut self) { + fn checkpoint_manager_persistence(&mut self) -> bool { if self.node.get_and_clear_needs_persistence() { + let pending_monitor_writes = self.monitor.pending_operation_count(); self.serialized_manager = self.node.encode(); + if self.deferred { + self.monitor.flush(pending_monitor_writes, &self.logger); + } else { + assert_eq!(pending_monitor_writes, 0); + } + true + } else { + assert_eq!(self.monitor.pending_operation_count(), 0); + false + } + } + + fn force_checkpoint_manager_persistence(&mut self) { + let pending_monitor_writes = self.monitor.pending_operation_count(); + self.serialized_manager = self.node.encode(); + self.node.get_and_clear_needs_persistence(); + if self.deferred { + self.monitor.flush(pending_monitor_writes, 
&self.logger); + } else { + assert_eq!(pending_monitor_writes, 0); } } @@ -942,50 +1099,38 @@ impl<'a> HarnessNode<'a> { fn reload( &mut self, use_old_mons: u8, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) { - let (logger_for_monitor, logger) = Self::build_loggers(self.node_id, out); + let logger = Self::build_logger(self.node_id, out); + let persister = Self::build_persister(self.persistence_style); let chain_monitor = Self::build_chain_monitor( &self.broadcaster, &self.fee_estimator, &self.keys_manager, - logger_for_monitor, - ChannelMonitorUpdateStatus::Completed, + Arc::clone(&logger), + &persister, + self.deferred, ); let mut monitors = new_hash_map(); let mut use_old_mons = use_old_mons; { - let mut old_monitors = self.monitor.latest_monitors.lock().unwrap(); + let mut old_monitors = self.persister.latest_monitors.lock().unwrap(); for (channel_id, mut prev_state) in old_monitors.drain() { - let (mon_id, serialized_mon) = if use_old_mons % 3 == 0 { - // Reload with the oldest `ChannelMonitor` (the one that we already told - // `ChannelManager` we finished persisting). - (prev_state.persisted_monitor_id, prev_state.persisted_monitor) - } else if use_old_mons % 3 == 1 { - // Reload with the second-oldest `ChannelMonitor`. - let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor); - prev_state.pending_monitors.drain(..).next().unwrap_or(old_mon) - } else { - // Reload with the newest `ChannelMonitor`. - let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor); - prev_state.pending_monitors.pop().unwrap_or(old_mon) + let selector = match use_old_mons % 3 { + 0 => MonitorReloadSelector::Persisted, + 1 => MonitorReloadSelector::FirstPending, + _ => MonitorReloadSelector::LastPending, }; - // Use a different value of `use_old_mons` if we have another monitor - // (only for node B) by shifting `use_old_mons` one in base-3. + prev_state.select_monitor_for_reload(selector); + // Use a different trit for each monitor so one restart byte can vary the stale + // monitor depth across multiple monitors for the node. use_old_mons /= 3; let mon = <(BlockLocator, ChannelMonitor)>::read( - &mut &serialized_mon[..], + &mut &prev_state.persisted_monitor[..], (&*self.keys_manager, &*self.keys_manager), ) .expect("Failed to read monitor"); monitors.insert(channel_id, mon.1); - // Update the latest `ChannelMonitor` state to match what we just told LDK. - prev_state.persisted_monitor = serialized_mon; - prev_state.persisted_monitor_id = mon_id; - // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting, - // considering them discarded. LDK should replay these for us as they're stored in - // the `ChannelManager`. 
- prev_state.pending_monitors.clear(); - chain_monitor.latest_monitors.lock().unwrap().insert(channel_id, prev_state); + persister.latest_monitors.lock().unwrap().insert(channel_id, prev_state); } } let mut monitor_refs = new_hash_map(); @@ -1009,19 +1154,32 @@ impl<'a> HarnessNode<'a> { let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args) .expect("Failed to read manager"); + let expected_status = if self.deferred { + ChannelMonitorUpdateStatus::InProgress + } else { + self.persistence_style + }; for (channel_id, mon) in monitors.drain() { - assert_eq!( - chain_monitor.chain_monitor.watch_channel(channel_id, mon), - Ok(ChannelMonitorUpdateStatus::Completed) - ); + assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(expected_status)); } - *chain_monitor.persister.update_ret.lock().unwrap() = self.persistence_style; self.node = manager.1; self.monitor = chain_monitor; + self.persister = persister; self.logger = logger; + // In deferred mode, the startup watch_channel registrations above queue monitor operations + // even if the reloaded ChannelManager does not need persistence. Always checkpoint here so + // those registrations can be flushed against the manager snapshot they belong to. + self.force_checkpoint_manager_persistence(); } } +#[derive(Copy, Clone)] +enum MonitorReloadSelector { + Persisted, + FirstPending, + LastPending, +} + #[derive(Copy, Clone)] enum MonitorUpdateSelector { First, @@ -1233,11 +1391,13 @@ impl PeerLink { || (self.node_a == node_b && self.node_b == node_a) } - fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) { + fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) -> bool { + let mut completed_updates = false; for id in &self.channel_ids { - nodes[self.node_a].complete_all_monitor_updates(id); - nodes[self.node_b].complete_all_monitor_updates(id); + completed_updates |= nodes[self.node_a].complete_all_monitor_updates(id); + completed_updates |= nodes[self.node_b].complete_all_monitor_updates(id); } + completed_updates } fn complete_monitor_updates_for_node( @@ -1808,9 +1968,12 @@ fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) { } fn make_channel( - source: &HarnessNode<'_>, dest: &HarnessNode<'_>, chan_id: i32, trusted_open: bool, - trusted_accept: bool, chain_state: &mut ChainState, + nodes: &mut [HarnessNode<'_>; 3], source_idx: usize, dest_idx: usize, chan_id: i32, + trusted_open: bool, trusted_accept: bool, chain_state: &mut ChainState, ) { + assert!(source_idx < dest_idx); + let (left, right) = nodes.split_at_mut(dest_idx); + let (source, dest) = (&mut left[source_idx], &mut right[0]); if trusted_open { source .create_channel_to_trusted_peer_0reserve( @@ -1921,7 +2084,8 @@ fn make_channel( } }; dest.handle_funding_created(source.get_our_node_id(), &funding_created); - // Complete any pending monitor updates for dest after watch_channel. + dest.checkpoint_manager_persistence(); + // Complete any monitor persistence callbacks made available for dest after watch_channel. dest.complete_all_pending_monitor_updates(); let (funding_signed, channel_id) = { @@ -1942,7 +2106,8 @@ fn make_channel( } source.handle_funding_signed(dest.get_our_node_id(), &funding_signed); - // Complete any pending monitor updates for source after watch_channel. + source.checkpoint_manager_persistence(); + // Complete any monitor persistence callbacks made available for source after watch_channel. 
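+	// (In deferred mode it is the checkpoint above that flushes the queued `watch_channel`
+	// write, which is what makes its completion callback available to drain here.)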
source.complete_all_pending_monitor_updates(); let events = source.get_and_clear_pending_events(); @@ -2014,6 +2179,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { ChannelMonitorUpdateStatus::Completed }, ]; + let deferred = [ + config_byte & 0b0010_0000 != 0, + config_byte & 0b0100_0000 != 0, + config_byte & 0b1000_0000 != 0, + ]; let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap()); let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap()); @@ -2051,6 +2221,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_a), Arc::clone(&broadcast_a), persistence_styles[0], + deferred[0], &out, router, chan_type, @@ -2061,6 +2232,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_b), Arc::clone(&broadcast_b), persistence_styles[1], + deferred[1], &out, router, chan_type, @@ -2071,6 +2243,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_c), Arc::clone(&broadcast_c), persistence_styles[2], + deferred[2], &out, router, chan_type, @@ -2088,14 +2261,14 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { // channel gets its own txid and funding outpoint. // A-B: channel 2 A and B have 0-reserve (trusted open + trusted accept), // channel 3 A has 0-reserve (trusted accept). - make_channel(&nodes[0], &nodes[1], 1, false, false, &mut chain_state); - make_channel(&nodes[0], &nodes[1], 2, true, true, &mut chain_state); - make_channel(&nodes[0], &nodes[1], 3, false, true, &mut chain_state); + make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state); + make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state); + make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state); // B-C: channel 4 B has 0-reserve (via trusted accept), // channel 5 C has 0-reserve (via trusted open). - make_channel(&nodes[1], &nodes[2], 4, false, true, &mut chain_state); - make_channel(&nodes[1], &nodes[2], 5, true, false, &mut chain_state); - make_channel(&nodes[1], &nodes[2], 6, false, false, &mut chain_state); + make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state); + make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state); + make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state); // Wipe the transactions-broadcasted set to make sure we don't broadcast // any transactions during normal operation after setup. @@ -2122,7 +2295,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { }; for node in &mut nodes { - node.serialized_manager = node.encode(); + node.force_checkpoint_manager_persistence(); } Self { @@ -2542,7 +2715,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { // claim/fail handling per event batch. let mut claim_set = new_hash_map(); let mut events = nodes[node_idx].get_and_clear_pending_events(); - let had_events = !events.is_empty(); + let mut had_events = !events.is_empty(); for event in events.drain(..) { match event { events::Event::PaymentClaimable { payment_hash, .. } => { @@ -2598,6 +2771,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } while nodes[node_idx].needs_pending_htlc_processing() { nodes[node_idx].process_pending_htlc_forwards(); + had_events = true; } had_events } @@ -2620,9 +2794,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { "It may take may iterations to settle the state, but it should not take forever" ); } - // Next, make sure no monitor updates are pending. 
- self.ab_link.complete_all_monitor_updates(&self.nodes); - self.bc_link.complete_all_monitor_updates(&self.nodes); + let mut made_progress = self.checkpoint_manager_persistences(); + // Next, make sure no monitor completion callbacks are pending. + made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes); + made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes); // Then, make sure any current forwards make their way to their destination. if self.process_msg_events(0, false, ProcessMessages::AllMessages) { last_pass_no_updates = false; @@ -2649,6 +2824,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { last_pass_no_updates = false; continue; } + if made_progress { + last_pass_no_updates = false; + continue; + } if last_pass_no_updates { // In some cases, we may generate a message to send in // `process_msg_events`, but block sending until @@ -2747,19 +2926,22 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { self.nodes[2].record_last_htlc_clear_fee(); } - fn refresh_serialized_managers(&mut self) { + fn checkpoint_manager_persistences(&mut self) -> bool { + let mut made_progress = false; for node in &mut self.nodes { - node.refresh_serialized_manager(); + made_progress |= node.checkpoint_manager_persistence(); } + made_progress } } #[inline] pub fn do_test(data: &[u8], out: Out) { let router = FuzzRouter {}; - // Read initial monitor styles and channel type from fuzz input byte 0: + // Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0: // bits 0-2: monitor styles (1 bit per node) // bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments) + // bits 5-7: deferred monitor write mode (1 bit per node) let config_byte = if !data.is_empty() { data[0] } else { 0 }; let mut harness = Harness::new(config_byte, out, &router); let mut read_pos = 1; // First byte was consumed for initial config. @@ -3002,18 +3184,18 @@ pub fn do_test(data: &[u8], out: Out) { }, 0xb0 | 0xb1 | 0xb2 => { - // Restart node A, picking among the in-flight `ChannelMonitor`s to use based on - // the value of `v` we're matching. + // Restart node A, picking among persisted and in-flight `ChannelMonitor` + // candidates based on the value of `v` we're matching. harness.restart_node(0, v, &router); }, 0xb3..=0xbb => { - // Restart node B, picking among the in-flight `ChannelMonitor`s to use based on - // the value of `v` we're matching. + // Restart node B, picking among persisted and in-flight `ChannelMonitor` + // candidates based on the value of `v` we're matching. harness.restart_node(1, v, &router); }, 0xbc | 0xbd | 0xbe => { - // Restart node C, picking among the in-flight `ChannelMonitor`s to use based on - // the value of `v` we're matching. + // Restart node C, picking among persisted and in-flight `ChannelMonitor` + // candidates based on the value of `v` we're matching. harness.restart_node(2, v, &router); }, @@ -3171,7 +3353,7 @@ pub fn do_test(data: &[u8], out: Out) { _ => break 'fuzz_loop, } - harness.refresh_serialized_managers(); + harness.checkpoint_manager_persistences(); } harness.finish(); }
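For reference when reading the new config handling above: a minimal sketch of how `do_test` decodes fuzz input byte 0 under the documented layout (bits 0-2 per-node monitor styles, bits 3-4 channel type, bits 5-7 per-node deferred monitor writes). Only the bit positions and the `deferred` masks come from the diff; the mapping of a set style bit to `InProgress` is an assumption, and the `decode_config_byte`/`MonStyle`/`FuzzChanType` names are illustrative, not part of the patch.

#[derive(Debug)]
enum MonStyle { Completed, InProgress }

#[derive(Debug)]
enum FuzzChanType { Legacy, KeyedAnchors, ZeroFeeCommitments }

fn decode_config_byte(config_byte: u8) -> ([MonStyle; 3], FuzzChanType, [bool; 3]) {
	// Bits 0-2: per-node monitor persistence style (assumed: set bit => InProgress).
	let style = |bit: u8| {
		if (config_byte >> bit) & 1 != 0 { MonStyle::InProgress } else { MonStyle::Completed }
	};
	let styles = [style(0), style(1), style(2)];
	// Bits 3-4: channel type (0 = Legacy, 1 = KeyedAnchors, 2 = ZeroFeeCommitments).
	let chan_type = match (config_byte >> 3) & 0b11 {
		0 => FuzzChanType::Legacy,
		1 => FuzzChanType::KeyedAnchors,
		// 2 selects ZeroFeeCommitments; how the value 3 is handled is not shown in the hunk.
		_ => FuzzChanType::ZeroFeeCommitments,
	};
	// Bits 5-7: per-node deferred monitor write mode (masks as in the diff).
	let deferred = [
		config_byte & 0b0010_0000 != 0,
		config_byte & 0b0100_0000 != 0,
		config_byte & 0b1000_0000 != 0,
	];
	(styles, chan_type, deferred)
}

For example, `0b0010_1001` would select an `InProgress`-style persister for the first node (under the assumed bit meaning), the `KeyedAnchors` channel type, and deferred monitor writes for the first node only.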