From 2be06375aba192bde3c34b17a675f6c80777e066 Mon Sep 17 00:00:00 2001 From: Daniel Noland Date: Fri, 15 May 2026 11:14:03 -0600 Subject: [PATCH 1/2] feat(concurrency): parking_lot-shaped sync facade with workspace sweep Introduce a `parking_lot`-shaped facade over `Mutex` / `RwLock` and rewire the data-path crates through it in a single atomic step. Three pieces are bundled here because they can't be split without leaving the workspace in a state where the facade and its consumers disagree on whether `Mutex::lock` returns `LockResult` or a naked guard. Each piece on its own is internally coherent; together they form the smallest unit that builds clean. ## std::sync wrapper (poison-as-panic) New `concurrency::sync` module exposing `Mutex` / `RwLock` with a `parking_lot`-shaped naked-guard surface backed by a poison-as-panic wrapper around `std::sync`. Workspace policy treats a crashed thread as a crashed process: poison surfacing inside a lock acquire panics through `concurrency::sync::poisoned()` rather than handing back a possibly-torn `LockResult`. * One cold `#[inline(never)] fn poisoned() -> !` keeps the hot path branch-free; the wrapper is a single `match` per acquire. * `RwLock::upgradable_read` is implemented as exclusive `write()` for now -- sound but lossy. Documented at the backend module level. * `Arc`, `Weak`, atomics, `Condvar`, `mpsc`, `OnceLock`, etc. are re-exported from `std::sync` unchanged. ## parking_lot as the production default Adds a `parking_lot` Cargo feature (enabled by default) and `concurrency/src/sync/parking_lot_backend.rs` -- a zero-cost re-export of `parking_lot::{Mutex, RwLock}` plus the std re-exports for everything `parking_lot` doesn't ship. * Backend routing in `concurrency/src/sync/mod.rs` picks `parking_lot_backend` when `feature = "parking_lot"` is on, falls back to `std_backend` otherwise. * `_strict_provenance` feature overrides `parking_lot` and routes through `std_backend`: `parking_lot_core::word_lock` uses integer-to-pointer casts that `-Zmiri-strict-provenance` rejects, and the CI miri job (`--features=_strict_provenance`) exercises the fallback slot, which needs `std::sync` underneath. * Workspace `Cargo.toml` gets a `tokio` parking_lot override entry so the `tokio` parking_lot feature is enabled workspace-wide. ## Workspace transition Sweep the data-path crates (`flow-entry`, `flow-filter`, `nat`, `net`, `pipeline`, `routing`, `stats`) to consume `dataplane_concurrency::sync::{Arc, Mutex, RwLock, atomic, ...}` instead of `std::sync::*` / `parking_lot::*` directly. * Call sites drop the `.unwrap()` / `.expect("poisoned")` noise the `LockResult`-shaped surface forced, since the facade returns naked guards. Same shape as the existing `parking_lot::Mutex` consumers. * `concurrency_mode(std)` annotations at QSBR-touching test sites in `nat::stateful::apalloc::test_alloc`, `routing::fib::test`, and `flow-entry::flow_table::table::tests` keep the existing std-only feature gating intact. * `nat::portfw::flow_state::get_packet_port_fw_state` simplification: the explicit `drop(guard)` dance before `debug!` calls is removed (per the recorded author intent that it isn't worth the code complexity); the guard now drops implicitly at the end of scope. Mechanical-by-design -- the sweep is mostly `use` rewrites and `.unwrap()` removal. The semantic change is one step: every consumer now gets the facade-routed primitives, so flipping the loom/shuttle features at the top of the workspace becomes meaningful (subsequent PRs add the shuttle and loom backends). Signed-off-by: Daniel Noland --- Cargo.lock | 6 +- Cargo.toml | 19 +- common/Cargo.toml | 3 +- common/src/cliprovider.rs | 31 +- concurrency/Cargo.toml | 3 + concurrency/src/lib.rs | 21 +- concurrency/src/quiescent.rs | 26 +- concurrency/src/slot.rs | 149 +++++++++- concurrency/src/sync/mod.rs | 63 ++++ concurrency/src/sync/parking_lot_backend.rs | 22 ++ concurrency/src/sync/std_backend.rs | 306 ++++++++++++++++++++ concurrency/tests/quiescent_properties.rs | 12 +- concurrency/tests/quiescent_protocol.rs | 29 +- flow-entry/src/flow_table/display.rs | 2 +- flow-entry/src/flow_table/nf_lookup.rs | 2 +- flow-entry/src/flow_table/table.rs | 95 +++--- flow-filter/Cargo.toml | 1 + flow-filter/src/lib.rs | 14 +- flow-filter/src/tests.rs | 5 +- nat/src/common/mod.rs | 9 +- nat/src/icmp_handler/nf.rs | 4 +- nat/src/portfw/flow_state.rs | 20 +- nat/src/portfw/icmp_handling.rs | 2 +- nat/src/portfw/nf.rs | 12 +- nat/src/portfw/portfwtable/objects.rs | 18 +- nat/src/portfw/portfwtable/portforwarder.rs | 4 +- nat/src/portfw/test.rs | 6 +- nat/src/stateful/allocator_writer.rs | 16 +- nat/src/stateful/apalloc/alloc.rs | 21 +- nat/src/stateful/apalloc/display.rs | 2 +- nat/src/stateful/apalloc/mod.rs | 2 +- nat/src/stateful/apalloc/port_alloc.rs | 25 +- nat/src/stateful/apalloc/test_alloc.rs | 8 +- nat/src/stateful/flows.rs | 14 +- nat/src/stateful/icmp_handling.rs | 2 +- nat/src/stateful/nf.rs | 16 +- nat/src/stateful/test.rs | 8 +- net/src/flows/atomic_instant.rs | 2 +- net/src/flows/display.rs | 30 +- net/src/flows/flow_info.rs | 26 +- net/src/packet/stats.rs | 2 +- pipeline/Cargo.toml | 2 +- pipeline/src/pipeline.rs | 9 +- pipeline/src/sample_nfs.rs | 11 +- pipeline/src/static_nf.rs | 2 +- routing/src/atable/resolver.rs | 8 +- routing/src/fib/fibtable.rs | 2 +- routing/src/fib/test.rs | 21 +- stats/src/vpc_stats.rs | 26 +- 49 files changed, 817 insertions(+), 322 deletions(-) create mode 100644 concurrency/src/sync/mod.rs create mode 100644 concurrency/src/sync/parking_lot_backend.rs create mode 100644 concurrency/src/sync/std_backend.rs diff --git a/Cargo.lock b/Cargo.lock index 586ce4a4bc..8eefb212fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1225,6 +1225,7 @@ name = "dataplane-common" version = "0.21.0" dependencies = [ "arc-swap", + "dataplane-concurrency", "left-right 0.11.7 (git+https://github.com/githedgehog/left-right.git?branch=fredi%2Ffix-writehandle-drop)", ] @@ -1236,6 +1237,7 @@ dependencies = [ "bolero", "dataplane-concurrency-macros", "loom", + "parking_lot", "shuttle", "static_assertions", ] @@ -1332,6 +1334,7 @@ name = "dataplane-flow-filter" version = "0.21.0" dependencies = [ "dataplane-common", + "dataplane-concurrency", "dataplane-config", "dataplane-lpm", "dataplane-net", @@ -1588,7 +1591,7 @@ dependencies = [ name = "dataplane-pipeline" version = "0.21.0" dependencies = [ - "arc-swap", + "dataplane-concurrency", "dataplane-id", "dataplane-net", "dataplane-tracectl", @@ -5629,6 +5632,7 @@ dependencies = [ "bytes", "libc 0.2.186", "mio", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", diff --git a/Cargo.toml b/Cargo.toml index 975e34099e..f663e924db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,9 @@ repository = "https://github.com/githedgehog/dataplane/" # # 1. correctly documenting what each package actually depends on, # 2. allowing builds under different environments (e.g. cross-compilation, wasm, miri, and so on). +# +# Exceptions: see `tokio = { ..., features = ["parking_lot"] }` below. Each exception has a comment +# justifying why it is workspace-wide. # Internal args = { path = "./args", package = "dataplane-args", features = [] } @@ -110,6 +113,7 @@ chrono = { version = "0.4.44", default-features = false, features = [] } clap = { version = "4.6.1", default-features = true, features = [] } color-eyre = { version = "0.6.5", default-features = false, features = [] } colored = { version = "3.1.1", default-features = false, features = [] } +crossbeam-utils = { version = "0.8.21", default-features = false, features = [] } ctrlc = { version = "3.5.2", default-features = false, features = [] } dashmap = { version = "6.1.0", default-features = false, features = [] } derive_builder = { version = "0.20.2", default-features = false, features = [] } @@ -158,6 +162,7 @@ pci-ids = { version = "0.2.6", default-features = false, features = [] } prefix-trie = { version = "0.8.4", default-features = false, features = [] } pretty_assertions = { version = "1.4.1", default-features = false, features = [] } priority-queue = { version = "2.7.0", default-features = false, features = [] } +proc-macro-crate = { version = "3.5.0", default-features = false, features = [] } proc-macro2 = { version = "1.0.106", default-features = false, features = [] } procfs = { version = "0.18.0", default-features = false, features = [] } pyroscope = { version = "2.0.3", default-features = false, features = [] } @@ -183,7 +188,13 @@ strum_macros = { version = "0.28.0", default-features = false, features = [] } syn = { version = "2.0.117", default-features = false, features = [] } thiserror = { version = "2.0.18", default-features = false, features = [] } thread_local = { version = "1.1.9", default-features = false, features = [] } -tokio = { version = "1.52.3", default-features = false, features = [] } +# Exception to the "no workspace-wide features" rule above. `tokio/parking_lot` is +# not an additive feature in the usual sense -- it picks the lock implementation +# tokio uses internally for its runtime. Production builds already pull +# parking_lot in via `concurrency::sync`, so the only thing scoping this +# per-crate would buy is divergent runtime behaviour between test binaries +# and the real dataplane. Keep it global. +tokio = { version = "1.52.3", default-features = false, features = ["parking_lot"] } tokio-util = { version = "0.7.18", default-features = false, features = [] } tonic = { version = "0.14.6", default-features = false, features = [] } tracing = { version = "0.1.44", default-features = false, features = [] } @@ -247,6 +258,12 @@ package = "dataplane-cli" miri = true wasm = false # split +[workspace.metadata.package.concurrency] +package = "dataplane-concurrency" +miri = true # must ALWAYS work +# NOTE: concurrency should likely be dependency gated as `cfg(unix)` or `cfg(not(target_arch = "wasm32"))` +wasm = false # hopeless + pointless + [workspace.metadata.package.dataplane] package = "dataplane" miri = false # hopeless + pointless diff --git a/common/Cargo.toml b/common/Cargo.toml index 8224c9ae0d..f52baf63b8 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -7,5 +7,6 @@ publish.workspace = true repository.workspace = true [dependencies] -left-right = { workspace = true } arc-swap = { workspace = true } +concurrency = { workspace = true } +left-right = { workspace = true } diff --git a/common/src/cliprovider.rs b/common/src/cliprovider.rs index d91e50841b..8389df2b24 100644 --- a/common/src/cliprovider.rs +++ b/common/src/cliprovider.rs @@ -4,9 +4,10 @@ //! A trait for a type that can provide CLI data use arc_swap::{ArcSwap, ArcSwapOption}; +use concurrency::slot::{Slot, SlotOption}; +use concurrency::sync::Arc; use left_right::ReadHandle; use std::fmt::Display; -use std::sync::Arc; /// A trait for types that can produce contents for the cli pub trait CliDataProvider { @@ -72,9 +73,35 @@ where T: CliDataProvider, { fn provide(&self) -> String { + // No type annotation on `p`: `arc_swap::ArcSwapOption` always + // yields `std::sync::Arc`, which is not the same type as + // `concurrency::sync::Arc` under the `loom` backend. + // Letting inference do its job keeps this `impl` compiling on + // every backend. self.load() .as_ref() - .map(|p: &Arc| p.provide()) + .map(|p| p.provide()) + .unwrap_or_else(|| "(none)".to_string()) + } +} + +impl CliDataProvider for Slot +where + T: CliDataProvider, +{ + fn provide(&self) -> String { + self.load_full().provide() + } +} + +impl CliDataProvider for SlotOption +where + T: CliDataProvider, +{ + fn provide(&self) -> String { + self.load_full() + .as_ref() + .map(|p| p.provide()) .unwrap_or_else(|| "(none)".to_string()) } } diff --git a/concurrency/Cargo.toml b/concurrency/Cargo.toml index fcec53af33..0661ae692c 100644 --- a/concurrency/Cargo.toml +++ b/concurrency/Cargo.toml @@ -6,7 +6,9 @@ publish.workspace = true version.workspace = true [features] +default = ["parking_lot"] loom = ["dep:loom", "concurrency-macros/loom"] +parking_lot = ["dep:parking_lot"] shuttle = ["dep:shuttle", "concurrency-macros/shuttle"] silence_clippy = ["concurrency-macros/silence_clippy"] # Do not manually enable this feature, let --all-features do it for you # Force the Mutex-based slot fallback even without a model checker. @@ -20,6 +22,7 @@ _strict_provenance = [] arc-swap = { workspace = true } concurrency-macros = { workspace = true, features = [] } loom = { workspace = true, optional = true, features = [] } +parking_lot = { workspace = true, optional = true, features = [] } shuttle = { workspace = true, optional = true, features = [] } static_assertions = { workspace = true } diff --git a/concurrency/src/lib.rs b/concurrency/src/lib.rs index f3546218bd..10415eb60a 100644 --- a/concurrency/src/lib.rs +++ b/concurrency/src/lib.rs @@ -13,23 +13,14 @@ #![allow(missing_docs)] pub mod macros; +pub mod sync; #[cfg(all(miri, any(feature = "shuttle", feature = "loom")))] compile_error!("miri does not meaningfully support 'loom' or 'shuttle'"); -#[cfg(not(any(feature = "loom", feature = "shuttle")))] -pub use std::sync; - #[cfg(not(any(feature = "loom", feature = "shuttle")))] pub use std::thread; -#[cfg(all( - feature = "loom", - not(feature = "shuttle"), - not(feature = "silence_clippy") -))] -pub use loom::sync; - #[cfg(all( feature = "loom", not(feature = "shuttle"), @@ -37,13 +28,6 @@ pub use loom::sync; ))] pub use loom::thread; -#[cfg(all( - feature = "shuttle", - not(feature = "loom"), - not(feature = "silence_clippy") -))] -pub use shuttle::sync; - #[cfg(all( feature = "shuttle", not(feature = "loom"), @@ -57,9 +41,6 @@ compile_error!("Cannot enable both 'loom' and 'shuttle' features at the same tim ////////////////////// // This is a workaround to silence clippy warnings when both loom and shuttle // features are enabled in the clippy checks which uses --all-features. -#[cfg(all(feature = "shuttle", feature = "loom", feature = "silence_clippy"))] -pub use std::sync; - #[cfg(all(feature = "shuttle", feature = "loom", feature = "silence_clippy"))] pub use std::thread; ////////////////////// diff --git a/concurrency/src/quiescent.rs b/concurrency/src/quiescent.rs index b09654603d..fd47a827d5 100644 --- a/concurrency/src/quiescent.rs +++ b/concurrency/src/quiescent.rs @@ -55,17 +55,29 @@ impl Domain { fn register(&self) -> Epoch { let epoch = Epoch::new(); - #[allow(clippy::expect_used)] // the mutex is poisoned only in unrecoverable error cases - self.active - .lock() - .expect("qsbr mutex poisoned") - .push(Arc::clone(&epoch.cell)); + let guard = self.active.lock(); + // Loom and shuttle still expose `LockResult`; PRs + // extending the poison-as-panic wrapper to those backends will + // drop this `.expect` (the default backend already returns a + // naked guard). + #[cfg(any(feature = "loom", feature = "shuttle"))] + #[allow(clippy::expect_used)] + // the mutex is poisoned only in unrecoverable error cases + let mut guard = guard.expect("qsbr mutex poisoned"); + #[cfg(not(any(feature = "loom", feature = "shuttle")))] + let mut guard = guard; + guard.push(Arc::clone(&epoch.cell)); epoch } fn min_observed(&self) -> Option { - #[allow(clippy::expect_used)] // the mutex is poisoned only in unrecoverable error cases - let mut active = self.active.lock().expect("qsbr mutex poisoned"); + let guard = self.active.lock(); + #[cfg(any(feature = "loom", feature = "shuttle"))] + #[allow(clippy::expect_used)] + // the mutex is poisoned only in unrecoverable error cases + let mut active = guard.expect("qsbr mutex poisoned"); + #[cfg(not(any(feature = "loom", feature = "shuttle")))] + let mut active = guard; let mut min = u64::MAX; let mut any_in_flight = false; active.retain(|cell| { diff --git a/concurrency/src/slot.rs b/concurrency/src/slot.rs index ac73763368..6123f864ab 100644 --- a/concurrency/src/slot.rs +++ b/concurrency/src/slot.rs @@ -3,15 +3,17 @@ //! Single-slot atomic publication. //! -//! In production this is `arc_swap::ArcSwap` -- lock-free read fast path, -//! which is what makes [`Subscriber::snapshot`] cheap on the data-plane. +//! In production these are `arc_swap::ArcSwap` / `ArcSwapOption` -- +//! lock-free read fast path, which is what makes +//! [`Subscriber::snapshot`] cheap on the data-plane. //! //! When the `loom` or `shuttle` feature is enabled (via the -//! `concurrency` crate) it falls back to `Mutex>` because neither -//! model checker sees `arc_swap`'s internals (hazard pointers + lower- -//! level atomics). The two implementations are observably equivalent -//! for the QSBR protocol -- atomic publish, atomic load -- which is all -//! the model checker needs to see. +//! `concurrency` crate) they fall back to `Mutex>` / +//! `Mutex>>` because neither model checker sees +//! `arc_swap`'s internals (hazard pointers + lower-level atomics). +//! The two implementations are observably equivalent for the QSBR +//! protocol -- atomic publish, atomic load -- which is all the model +//! checker needs to see. //! //! [`Subscriber::snapshot`]: crate::Subscriber::snapshot @@ -30,9 +32,15 @@ cfg_select! { Self(Mutex::new(Arc::new(value))) } + #[must_use] + pub fn new(value: Arc) -> Self { + Self(Mutex::new(value)) + } + pub fn load_full(&self) -> Arc { #[allow(clippy::expect_used)] // poisoned only in unrecoverable cases - Arc::clone(&self.0.lock().expect("slot mutex poisoned")) + let guard = self.0.lock().expect("slot mutex poisoned"); + Arc::clone(&*guard) } pub fn swap(&self, new: Arc) -> Arc { @@ -40,12 +48,64 @@ cfg_select! { let mut guard = self.0.lock().expect("slot mutex poisoned"); core::mem::replace(&mut *guard, new) } + + pub fn store(&self, new: Arc) { + #[allow(clippy::expect_used)] + let mut guard = self.0.lock().expect("slot mutex poisoned"); + *guard = new; + } + } + + /// Single-slot atomic publication of an optional value. + /// + /// Fallback implementation backed by `Mutex>>`. + pub struct SlotOption(Mutex>>); + + impl SlotOption { + #[must_use] + pub fn empty() -> Self { + Self(Mutex::new(None)) + } + + pub fn from_pointee>>(value: V) -> Self { + Self(Mutex::new(value.into().map(Arc::new))) + } + + #[must_use] + pub fn new(value: Option>) -> Self { + Self(Mutex::new(value)) + } + + pub fn load_full(&self) -> Option> { + #[allow(clippy::expect_used)] + let guard = self.0.lock().expect("slot mutex poisoned"); + guard.as_ref().map(Arc::clone) + } + + pub fn swap(&self, new: Option>) -> Option> { + #[allow(clippy::expect_used)] + let mut guard = self.0.lock().expect("slot mutex poisoned"); + core::mem::replace(&mut *guard, new) + } + + pub fn store(&self, new: Option>) { + #[allow(clippy::expect_used)] + let mut guard = self.0.lock().expect("slot mutex poisoned"); + *guard = new; + } + } + + impl Default for SlotOption { + fn default() -> Self { + Self::empty() + } } } _ => { use crate::sync::Arc; - use arc_swap::ArcSwap; + use arc_swap::{ArcSwap, ArcSwapOption}; + #[repr(transparent)] pub struct Slot(ArcSwap); impl Slot { @@ -54,6 +114,12 @@ cfg_select! { Self(ArcSwap::from_pointee(value)) } + #[inline] + #[must_use] + pub fn new(value: Arc) -> Self { + Self(ArcSwap::new(value)) + } + #[inline] pub fn load_full(&self) -> Arc { self.0.load_full() @@ -63,6 +129,71 @@ cfg_select! { pub fn swap(&self, new: Arc) -> Arc { self.0.swap(new) } + + #[inline] + pub fn store(&self, new: Arc) { + self.0.store(new); + } } + + /// Single-slot atomic publication of an optional value. + /// + /// Wraps `arc_swap::ArcSwapOption` in production. + #[repr(transparent)] + pub struct SlotOption(ArcSwapOption); + + impl SlotOption { + #[inline] + #[must_use] + pub fn empty() -> Self { + Self(ArcSwapOption::new(None)) + } + + #[inline] + pub fn from_pointee>>(value: V) -> Self { + Self(ArcSwapOption::from_pointee(value)) + } + + #[inline] + #[must_use] + pub fn new(value: Option>) -> Self { + Self(ArcSwapOption::new(value)) + } + + #[inline] + pub fn load_full(&self) -> Option> { + self.0.load_full() + } + + #[inline] + pub fn swap(&self, new: Option>) -> Option> { + self.0.swap(new) + } + + #[inline] + pub fn store(&self, new: Option>) { + self.0.store(new); + } + } + + impl Default for SlotOption { + fn default() -> Self { + Self::empty() + } + } + } +} + +use core::fmt; + +impl fmt::Debug for Slot { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Slot").finish_non_exhaustive() + } +} + +impl fmt::Debug for SlotOption { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SlotOption").finish_non_exhaustive() } } diff --git a/concurrency/src/sync/mod.rs b/concurrency/src/sync/mod.rs new file mode 100644 index 0000000000..f3234a237d --- /dev/null +++ b/concurrency/src/sync/mod.rs @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +//! Backend-routed synchronization primitives. +//! +//! Exposes a `parking_lot`-shaped surface for `Mutex` / `RwLock` that +//! compiles unchanged across backends. +//! +//! Selection (in priority order): +//! +//! * `loom` / `shuttle` features: raw re-export of the model-checker's +//! `LockResult`-based primitives. Subsequent PRs wrap these too. +//! * `parking_lot` feature (default): zero-cost re-export of +//! `parking_lot`'s naked-guard locks; the production hot path. +//! * Otherwise: `std_backend` -- a thin poison-as-panic wrapper around +//! `std::sync`. Same surface as the `parking_lot` re-export, one +//! extra match on acquire. Lets `--no-default-features` builds +//! compile without depending on `parking_lot`. + +#[cfg(all( + not(any(feature = "loom", feature = "shuttle")), + feature = "parking_lot", +))] +mod parking_lot_backend; +#[cfg(all( + not(any(feature = "loom", feature = "shuttle")), + feature = "parking_lot", +))] +pub use parking_lot_backend::*; + +#[cfg(all( + not(any(feature = "loom", feature = "shuttle")), + not(feature = "parking_lot"), +))] +mod std_backend; +#[cfg(all( + not(any(feature = "loom", feature = "shuttle")), + not(feature = "parking_lot"), +))] +pub use std_backend::*; + +#[cfg(all( + feature = "loom", + not(feature = "shuttle"), + not(feature = "silence_clippy") +))] +pub use loom::sync::*; + +#[cfg(all( + feature = "shuttle", + not(feature = "loom"), + not(feature = "silence_clippy") +))] +pub use shuttle::sync::*; + +// Match the silence_clippy escape hatch in lib.rs: when both loom and +// shuttle are pulled in (under `--all-features`), route sync through +// `std` purely to keep clippy happy. The binary is never executed in +// that configuration. +#[cfg(all(feature = "shuttle", feature = "loom", feature = "silence_clippy"))] +mod std_backend; +#[cfg(all(feature = "shuttle", feature = "loom", feature = "silence_clippy"))] +pub use std_backend::*; diff --git a/concurrency/src/sync/parking_lot_backend.rs b/concurrency/src/sync/parking_lot_backend.rs new file mode 100644 index 0000000000..81b8b15421 --- /dev/null +++ b/concurrency/src/sync/parking_lot_backend.rs @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +//! Default-production backend: `parking_lot` locks layered on top of +//! `std::sync`. +//! +//! `parking_lot::{Mutex, RwLock}` already match the surface the rest +//! of the crate presents -- naked guards, no poison, fast contention +//! path. This module is a pure re-export so production builds pay no +//! wrapping cost. Everything that `parking_lot` doesn't ship +//! (`Arc`, `Weak`, `atomic`, `mpsc`, `Condvar`, `Once`, ...) comes +//! straight from `std::sync` so ordering semantics match a normal +//! release build. + +pub use parking_lot::{ + Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard, +}; + +pub use std::sync::{ + Arc, Barrier, BarrierWaitResult, Condvar, LockResult, Once, OnceLock, OnceState, PoisonError, + TryLockError, TryLockResult, WaitTimeoutResult, Weak, atomic, mpsc, +}; diff --git a/concurrency/src/sync/std_backend.rs b/concurrency/src/sync/std_backend.rs new file mode 100644 index 0000000000..263cba6e07 --- /dev/null +++ b/concurrency/src/sync/std_backend.rs @@ -0,0 +1,306 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +//! Default backend: poison-as-panic wrapper around `std::sync`. +//! +//! `std::sync::{Mutex, RwLock}` return `LockResult` because they +//! poison on holder panic. This workspace treats poison as a fatal +//! invariant violation; the wrapper below strips `LockResult` and +//! panics, presenting a `parking_lot`-shaped naked-guard surface. +//! +//! One indirection on lock acquire/release (wrapper match + std poison +//! branch). Cold path only -- the fast path under contention is +//! unchanged. + +// Wrapping below panics on poison. clippy::panic is denied at the +// crate root; allow it locally for the cold poisoned() helper. +#![allow(clippy::panic)] + +use core::fmt; +use core::ops::{Deref, DerefMut}; +use std::sync as inner; + +pub use std::sync::{ + Arc, Barrier, BarrierWaitResult, Condvar, LockResult, Once, OnceLock, OnceState, PoisonError, + TryLockError, TryLockResult, WaitTimeoutResult, Weak, atomic, mpsc, +}; + +#[inline(never)] +#[cold] +fn poisoned() -> ! { + panic!( + "concurrency::sync lock was poisoned: a previous holder panicked while \ + holding the lock; propagating the failure" + ); +} + +// =============================== Mutex ==================================== + +/// Mutual exclusion primitive with a `parking_lot`-shaped surface. +/// +/// Returns guards directly (no `LockResult`); poison is treated as a +/// fatal invariant violation and panics. See module docs for rationale. +pub struct Mutex(inner::Mutex); + +/// RAII guard for [`Mutex`]. +#[must_use = "if unused the Mutex will immediately unlock"] +pub struct MutexGuard<'a, T: ?Sized + 'a>(inner::MutexGuard<'a, T>); + +impl Mutex { + #[inline] + pub const fn new(value: T) -> Self { + Self(inner::Mutex::new(value)) + } + + #[inline] + pub fn into_inner(self) -> T { + match self.0.into_inner() { + Ok(v) => v, + Err(_) => poisoned(), + } + } +} + +impl Mutex { + #[inline] + pub fn lock(&self) -> MutexGuard<'_, T> { + match self.0.lock() { + Ok(g) => MutexGuard(g), + Err(_) => poisoned(), + } + } + + #[inline] + pub fn try_lock(&self) -> Option> { + match self.0.try_lock() { + Ok(g) => Some(MutexGuard(g)), + Err(TryLockError::Poisoned(_)) => poisoned(), + Err(TryLockError::WouldBlock) => None, + } + } + + #[inline] + pub fn get_mut(&mut self) -> &mut T { + match self.0.get_mut() { + Ok(v) => v, + Err(_) => poisoned(), + } + } +} + +impl fmt::Debug for Mutex { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Mutex").finish_non_exhaustive() + } +} + +impl Default for Mutex { + fn default() -> Self { + Self::new(T::default()) + } +} + +impl From for Mutex { + fn from(value: T) -> Self { + Self::new(value) + } +} + +impl Deref for MutexGuard<'_, T> { + type Target = T; + #[inline] + fn deref(&self) -> &T { + &self.0 + } +} + +impl DerefMut for MutexGuard<'_, T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + &mut self.0 + } +} + +impl fmt::Debug for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Display for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +// =============================== RwLock =================================== + +/// Reader-writer lock with a `parking_lot`-shaped surface. +/// +/// `T: Sized` for parity with future model-checker backends, which +/// adopt the lowest common denominator across their inner types. +pub struct RwLock(inner::RwLock); + +/// Shared-reference guard for [`RwLock`]. +#[must_use = "if unused the RwLock will immediately unlock"] +pub struct RwLockReadGuard<'a, T: 'a>(inner::RwLockReadGuard<'a, T>); + +/// Exclusive-reference guard for [`RwLock`]. +#[must_use = "if unused the RwLock will immediately unlock"] +pub struct RwLockWriteGuard<'a, T: 'a>(inner::RwLockWriteGuard<'a, T>); + +/// Upgradable-read guard for [`RwLock`]. +/// +/// std `RwLock` has no native upgradable-read state machine; this is +/// an exclusive write guard with a `parking_lot`-shaped `upgrade()` API. +#[must_use = "if unused the RwLock will immediately unlock"] +pub struct RwLockUpgradableReadGuard<'a, T: 'a>(inner::RwLockWriteGuard<'a, T>); + +impl RwLock { + #[inline] + pub const fn new(value: T) -> Self { + Self(inner::RwLock::new(value)) + } + + #[inline] + pub fn into_inner(self) -> T { + match self.0.into_inner() { + Ok(v) => v, + Err(_) => poisoned(), + } + } + + #[inline] + pub fn read(&self) -> RwLockReadGuard<'_, T> { + match self.0.read() { + Ok(g) => RwLockReadGuard(g), + Err(_) => poisoned(), + } + } + + #[inline] + pub fn write(&self) -> RwLockWriteGuard<'_, T> { + match self.0.write() { + Ok(g) => RwLockWriteGuard(g), + Err(_) => poisoned(), + } + } + + #[inline] + pub fn try_read(&self) -> Option> { + match self.0.try_read() { + Ok(g) => Some(RwLockReadGuard(g)), + Err(TryLockError::Poisoned(_)) => poisoned(), + Err(TryLockError::WouldBlock) => None, + } + } + + #[inline] + pub fn try_write(&self) -> Option> { + match self.0.try_write() { + Ok(g) => Some(RwLockWriteGuard(g)), + Err(TryLockError::Poisoned(_)) => poisoned(), + Err(TryLockError::WouldBlock) => None, + } + } + + /// Acquire an upgradable read guard. + /// + /// std `RwLock` has no native upgradable-read; this is implemented + /// as an exclusive `write()`. Subsequent backends (parking_lot) + /// will replace this with a true upgradable read; meanwhile the + /// surface is consistent across backends, sound in all cases, and + /// merely loses the many-readers-plus-one-upgradable schedule that + /// `parking_lot` permits. + #[inline] + pub fn upgradable_read(&self) -> RwLockUpgradableReadGuard<'_, T> { + match self.0.write() { + Ok(g) => RwLockUpgradableReadGuard(g), + Err(_) => poisoned(), + } + } + + #[inline] + pub fn get_mut(&mut self) -> &mut T { + match self.0.get_mut() { + Ok(v) => v, + Err(_) => poisoned(), + } + } +} + +impl<'a, T: 'a> RwLockUpgradableReadGuard<'a, T> { + /// Upgrade to a write guard. Free here because we already hold the + /// underlying write lock. + #[inline] + pub fn upgrade(s: Self) -> RwLockWriteGuard<'a, T> { + RwLockWriteGuard(s.0) + } + + /// Always succeeds under the std backend. + /// + /// # Errors + /// + /// Never returns `Err`; the `Result` shape matches `parking_lot`. + #[inline] + pub fn try_upgrade(s: Self) -> Result, Self> { + Ok(RwLockWriteGuard(s.0)) + } +} + +impl fmt::Debug for RwLock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RwLock").finish_non_exhaustive() + } +} + +impl Default for RwLock { + fn default() -> Self { + Self::new(T::default()) + } +} + +impl From for RwLock { + fn from(value: T) -> Self { + Self::new(value) + } +} + +macro_rules! impl_rwlock_guard_traits { + ($guard:ident, $mutability:ident) => { + impl Deref for $guard<'_, T> { + type Target = T; + #[inline] + fn deref(&self) -> &T { + &*self.0 + } + } + + impl fmt::Debug for $guard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } + } + + impl fmt::Display for $guard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } + } + + impl_rwlock_guard_traits!(@mut $guard, $mutability); + }; + (@mut $guard:ident, mut) => { + impl DerefMut for $guard<'_, T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + &mut *self.0 + } + } + }; + (@mut $guard:ident, immut) => {}; +} + +impl_rwlock_guard_traits!(RwLockReadGuard, immut); +impl_rwlock_guard_traits!(RwLockWriteGuard, mut); +impl_rwlock_guard_traits!(RwLockUpgradableReadGuard, immut); diff --git a/concurrency/tests/quiescent_properties.rs b/concurrency/tests/quiescent_properties.rs index 8bc4bf7a7e..a862822e09 100644 --- a/concurrency/tests/quiescent_properties.rs +++ b/concurrency/tests/quiescent_properties.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // Copyright Open Network Fabric Authors -//! Property-based protocol tests for `dataplane_quiescent`. +//! Property-based protocol tests for `dataplane_concurrency::quiescent`. //! //! Generates random sequences of [`Op`]s and checks the //! single-threaded protocol invariants after every step: @@ -24,15 +24,17 @@ //! resurrected, this assertion fires. //! //! Multi-threaded tests (drop affinity, concurrent stress) live in -//! `tests/protocol.rs`; loom-modeled tests live in `tests/loom.rs`. +//! `tests/quiescent_protocol.rs`; loom-modeled tests live in +//! `tests/quiescent_loom.rs`. +// Single-threaded bolero property tests; only meaningful under the +// default backend. Same rationale as in `quiescent_protocol.rs`. #![cfg(not(any(feature = "loom", feature = "shuttle")))] -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; - use bolero::TypeGenerator; use dataplane_concurrency::quiescent::channel; +use dataplane_concurrency::sync::Arc; +use dataplane_concurrency::sync::atomic::{AtomicUsize, Ordering}; // ---------- ops & state ---------- diff --git a/concurrency/tests/quiescent_protocol.rs b/concurrency/tests/quiescent_protocol.rs index b24a84dad2..2abb01469f 100644 --- a/concurrency/tests/quiescent_protocol.rs +++ b/concurrency/tests/quiescent_protocol.rs @@ -1,12 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 // Copyright Open Network Fabric Authors -//! Multi-threaded protocol tests for `dataplane_quiescent`. +//! Multi-threaded protocol tests for `dataplane_concurrency::quiescent`. //! //! The single-threaded protocol invariants (snapshot legality, //! reclamation gating, conservation of `Versioned` allocations) are -//! covered by the bolero property tests in `tests/properties.rs`. -//! This file holds only the tests that genuinely need real OS threads: +//! covered by the bolero property tests in +//! `tests/quiescent_properties.rs`. This file holds only the tests +//! that genuinely need real OS threads: //! //! - **Drop affinity**: drops must run on the Publisher's //! thread, even when the last Subscriber drops concurrently with @@ -20,16 +21,20 @@ //! requires `'static`) won't work; `thread::scope` matches the //! lifetime exactly. //! -//! Loom-modeled tests live in `tests/loom.rs`. +//! Loom-modeled tests live in `tests/quiescent_loom.rs`. +// Protocol tests use real OS threads via `thread::scope` + `thread::sleep`, +// which only make sense under the default backend. Under any model-checker +// backend (loom or any shuttle variant) the surrounding facade is rewired +// and the std-shaped types these tests use would either fail to compile or +// fault outside the corresponding runtime. #![cfg(not(any(feature = "loom", feature = "shuttle")))] -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::Duration; - use dataplane_concurrency::quiescent::channel; +use dataplane_concurrency::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use dataplane_concurrency::sync::{Arc, Mutex}; +use dataplane_concurrency::thread; +use std::time::Duration; // ---------- helpers ---------- @@ -47,7 +52,7 @@ impl Drop for Marker { fn drop(&mut self) { self.drop_counter.fetch_add(1, Ordering::Relaxed); if let Some(slot) = &self.drop_threads { - slot.lock().unwrap().push(thread::current().id()); + slot.lock().push(thread::current().id()); } } } @@ -96,7 +101,7 @@ fn destructor_of_initial_runs_on_publisher_thread() { publisher.publish(marker(1, &drops)); publisher.reclaim(); - let observed = initial_drop_threads.lock().unwrap(); + let observed = initial_drop_threads.lock(); assert_eq!( observed.as_slice(), &[publisher_thread_id], @@ -136,7 +141,7 @@ fn destructor_runs_on_publisher_when_last_subscriber_drops_concurrently() { publisher.reclaim(); } - let observed = drop_threads.lock().unwrap(); + let observed = drop_threads.lock(); assert!( !observed.is_empty(), "no drops recorded; the test setup never exercised the Drop path", diff --git a/flow-entry/src/flow_table/display.rs b/flow-entry/src/flow_table/display.rs index 018e80d0ba..accafacf10 100644 --- a/flow-entry/src/flow_table/display.rs +++ b/flow-entry/src/flow_table/display.rs @@ -9,7 +9,7 @@ impl CliSource for FlowTable {} impl Display for FlowTable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Ok(table) = self.table.try_read() { + if let Some(table) = self.table.try_read() { Heading(format!("Flow Table ({} entries)", table.len())).fmt(f)?; for entry in table.iter() { let key = entry.key(); diff --git a/flow-entry/src/flow_table/nf_lookup.rs b/flow-entry/src/flow_table/nf_lookup.rs index 059ea05ec7..d0867241ff 100644 --- a/flow-entry/src/flow_table/nf_lookup.rs +++ b/flow-entry/src/flow_table/nf_lookup.rs @@ -140,7 +140,7 @@ mod test { } } - #[traced_test] + #[cfg_attr(not(miri), traced_test)] #[tokio::test] async fn test_lookup_nf_with_expiration() { let flow_table = Arc::new(FlowTable::default()); diff --git a/flow-entry/src/flow_table/table.rs b/flow-entry/src/flow_table/table.rs index 5610d45ea0..1d59fb1f9b 100644 --- a/flow-entry/src/flow_table/table.rs +++ b/flow-entry/src/flow_table/table.rs @@ -35,7 +35,7 @@ impl Default for FlowTable { } fn hasher_state() -> &'static RandomState { - use std::sync::OnceLock; + use concurrency::sync::OnceLock; static HASHER_STATE: OnceLock = OnceLock::new(); HASHER_STATE.get_or_init(|| RandomState::with_seeds(0, 0, 0, 0)) } @@ -86,9 +86,8 @@ impl FlowTable { /// /// # Panics /// - /// Panics if this thread already holds the read lock on the table or - /// if the table lock is poisoned, or if the new number of shards is - /// not a power of 2. + /// Panics if this thread already holds the read lock on the table, + /// or if the new number of shards is not a power of 2. pub fn reshard(&self, num_shards: usize) { assert!( num_shards.is_power_of_two(), @@ -96,10 +95,10 @@ impl FlowTable { ); debug!( "reshard: Resharding flow table from {} shards into {} shards", - self.table.read().unwrap().shards().len(), + self.table.read().shards().len(), num_shards ); - let mut locked_table = self.table.write().unwrap(); + let mut locked_table = self.table.write(); let new_table = DashMap::with_hasher_and_shard_amount(locked_table.hasher().clone(), num_shards); let old_table = std::mem::replace(&mut *locked_table, new_table); @@ -122,8 +121,7 @@ impl FlowTable { /// /// # Panics /// - /// Panics if: - /// - this thread already holds the read lock on the table or if the table lock is poisoned. + /// Panics if this thread already holds the read lock on the table. /// /// # Errors /// @@ -141,8 +139,7 @@ impl FlowTable { /// /// # Panics /// - /// Panics if: - /// - this thread already holds the read lock on the table or if the table lock is poisoned. + /// Panics if this thread already holds the read lock on the table. /// /// # Errors /// @@ -194,32 +191,24 @@ impl FlowTable { // We use remove_if + ptr_eq so that a concurrently-inserted replacement is left intact // and try_read() instead of read() so as not to block loop { - let result = table.try_read(); - match result { - Ok(table) => { - let res = table.remove_if(flow_key, |_, v| Arc::ptr_eq(v, &flow_info)); - if res.is_none() { - debug!("Flow-timer: Unable to remove flow {flow_key}: not found"); - } - return; - } - Err(std::sync::TryLockError::WouldBlock) => { - // let other work get done while we wait. We need to drop the result first - drop(result); - debug!("Flow-timer: Waiting for table read access"); - tokio::time::sleep(Duration::from_millis(50)).await; - } - Err(std::sync::TryLockError::Poisoned(_p)) => { - debug!("Flow-timer: FlowTable RwLock poisoned!"); - return; + if let Some(table) = table.try_read() { + let res = table.remove_if(flow_key, |_, v| Arc::ptr_eq(v, &flow_info)); + if res.is_none() { + debug!("Flow-timer: Unable to remove flow {flow_key}: not found"); } + return; } + // Outer write lock is only held during reshard, which is rare and brief; we still + // want a bounded backoff rather than `yield_now()` so a write-locker contending + // with this task can't cause it to spin a tokio worker. + debug!("Flow-timer: Waiting for table read access"); + tokio::time::sleep(Duration::from_millis(50)).await; } }); } fn insert_common(&self, val: &Arc) -> Result>, FlowTableError> { - let table = self.table.read().unwrap(); + let table = self.table.read(); let capacity = self.capacity.load(Ordering::Relaxed); let flow_key = val.flowkey(); debug!("insert: inserting flow {flow_key}"); @@ -269,12 +258,8 @@ impl FlowTable { /// Drain all stale (Expired / Cancelled / deadline-passed Active) entries from the table. /// /// Returns the number of entries removed. - /// - /// # Panics - /// - /// Panics if the table lock is poisoned. pub fn drain_stale(&self) -> usize { - let table = self.table.read().unwrap(); + let table = self.table.read(); let now = std::time::Instant::now(); let mut count = 0usize; table.retain(|_, v| { @@ -295,15 +280,14 @@ impl FlowTable { /// /// # Panics /// - /// Panics if this thread already holds the read lock on the table or - /// if the table lock is poisoned. + /// Panics if this thread already holds the read lock on the table. pub fn lookup(&self, flow_key: &Q) -> Option> where FlowKey: Borrow, Q: Hash + Eq + ?Sized + Debug + Display, { debug!("lookup: Looking up flow key {flow_key}"); - let table = self.table.read().unwrap(); + let table = self.table.read(); Some(table.get(flow_key)?.value().clone()) } @@ -311,15 +295,14 @@ impl FlowTable { /// /// # Panics /// - /// Panics if this thread already holds the read lock on the table or - /// if the table lock is poisoned. + /// Panics if this thread already holds the read lock on the table. pub fn remove(&self, flow_key: &Q) -> Option<(FlowKey, Arc)> where FlowKey: Borrow, Q: Hash + Eq + ?Sized + Debug + Display, { debug!("remove: Removing flow key {flow_key}"); - let table = self.table.read().unwrap(); + let table = self.table.read(); let result = table.remove(flow_key); if let Some((_key, flow_info)) = result.as_ref() { flow_info.update_status(FlowStatus::Detached); @@ -334,7 +317,7 @@ impl FlowTable { /// their expiration status. This is mostly for testing. #[must_use] pub fn len(&self) -> Option { - let table = self.table.try_read().ok()?; + let table = self.table.try_read()?; Some(table.len()) } @@ -342,7 +325,7 @@ impl FlowTable { /// This is mostly for testing. #[must_use] pub fn active_len(&self) -> Option { - let table = self.table.try_read().ok()?; + let table = self.table.try_read()?; Some( table .iter() @@ -360,7 +343,7 @@ impl FlowTable { where F: FnMut(&FlowKey, &FlowInfo), { - let guard = self.table.read().unwrap(); + let guard = self.table.read(); for flow in guard.iter() { func(flow.key(), &flow); } @@ -377,7 +360,7 @@ impl FlowTable { F: FnMut(&FlowKey, &FlowInfo), P: Fn(&FlowKey, &FlowInfo) -> bool, { - let guard = self.table.read().unwrap(); + let guard = self.table.read(); for flow in guard.iter().filter(|flow| filter(flow.key(), flow)) { func(flow.key(), &flow); } @@ -396,7 +379,7 @@ impl FlowTable { where P: Fn(&FlowKey, &FlowInfo) -> bool, { - let table = self.table.read().unwrap(); + let table = self.table.read(); let v: Vec<_> = table .iter() .filter(|flow| filter(flow.key(), flow)) @@ -416,7 +399,7 @@ impl FlowTable { where F: Fn(&FlowKey, &FlowInfo), { - let table = self.table.read().unwrap(); + let table = self.table.read(); for shard in table.shards() { let g = shard.read(); unsafe { @@ -537,7 +520,7 @@ mod tests { // The entry stored in the table should be the first arc. { - let table = flow_table.table.read().unwrap(); + let table = flow_table.table.read(); let entry = table .get(&flow_key) .expect("entry should exist after first insert"); @@ -550,37 +533,39 @@ mod tests { // The table should now point to the second entry. { - let table = flow_table.table.read().unwrap(); + let table = flow_table.table.read(); let entry = table .get(&flow_key) .expect("entry should exist after second insert"); assert_ne!(entry.value().expires_at(), first_expiry_time); + assert_eq!(entry.value().expires_at(), second_expiry_time); } } #[tokio::test] async fn test_flow_table_remove_bolero() { - let flow_table = FlowTable::default(); bolero::check!() .with_type::() + .cloned() .for_each(|flow_key| { + let flow_table = FlowTable::default(); // Use a future expiry so the flow stays active long enough for remove(). flow_table .insert(FlowInfo::new( - *flow_key, + flow_key, Instant::now() + Duration::from_mins(1), )) .unwrap(); - let flow_info = flow_table.lookup(flow_key).unwrap(); + let flow_info = flow_table.lookup(&flow_key).unwrap(); assert!(flow_table.lookup(&flow_key.reverse(None)).is_none()); - let result = flow_table.remove(flow_key); + let result = flow_table.remove(&flow_key); assert!(result.is_some()); let (k, v) = result.unwrap(); - assert_eq!(k, *flow_key); + assert_eq!(k, flow_key); assert!(Arc::ptr_eq(&v, &flow_info)); - assert!(flow_table.lookup(flow_key).is_none()); + assert!(flow_table.lookup(&flow_key).is_none()); }); } @@ -859,7 +844,7 @@ mod tests { for _ in 0..N { thread::yield_now(); if let Some(flow_info) = flow_table.lookup(&flow_key) { - let _guard = flow_info.locked.write().unwrap(); + let _guard = flow_info.locked.write(); } } } diff --git a/flow-filter/Cargo.toml b/flow-filter/Cargo.toml index 4c4d0d92f0..c4fb2eaca2 100644 --- a/flow-filter/Cargo.toml +++ b/flow-filter/Cargo.toml @@ -7,6 +7,7 @@ version.workspace = true [dependencies] common = { workspace = true } +concurrency = { workspace = true } config = { workspace = true } indenter = { workspace = true } left-right = { workspace = true } diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 41d15f068c..b851b32e61 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -23,7 +23,8 @@ use std::collections::HashSet; use std::fmt::Display; use std::net::IpAddr; use std::num::NonZero; -use std::sync::Arc; + +use concurrency::sync::Arc; use tracing::{debug, error}; mod display; @@ -78,7 +79,7 @@ impl FlowFilter { if flow_info.genid() == genid { return false; } - let locked_info = flow_info.locked.read().unwrap(); + let locked_info = flow_info.locked.read(); let flow_port_fw = locked_info.port_fw_state.is_some(); let flow_masquerade = locked_info.nat_state.is_some(); let flowkey = flow_info.flowkey(); @@ -318,14 +319,7 @@ impl FlowFilter { fn set_nat_requirements_from_flow_info( packet: &mut Packet, ) -> Result<(), ()> { - let locked_info = packet - .meta() - .flow_info - .as_ref() - .ok_or(())? - .locked - .read() - .map_err(|_| ())?; + let locked_info = packet.meta().flow_info.as_ref().ok_or(())?.locked.read(); let needs_stateful_nat = locked_info.nat_state.is_some(); let needs_port_forwarding = locked_info.port_fw_state.is_some(); drop(locked_info); diff --git a/flow-filter/src/tests.rs b/flow-filter/src/tests.rs index b3f32a2a1f..ba19724ef4 100644 --- a/flow-filter/src/tests.rs +++ b/flow-filter/src/tests.rs @@ -29,8 +29,9 @@ use std::collections::HashSet; use std::net::IpAddr; use std::net::{Ipv4Addr, Ipv6Addr}; use std::str::FromStr; -use std::sync::Arc; use std::time::{Duration, Instant}; + +use concurrency::sync::Arc; use tracing_test::traced_test; fn vni(id: u32) -> Vni { @@ -198,7 +199,7 @@ fn fake_flow_session( // pretend that flow is in table flow_info.update_status(FlowStatus::Active); - let mut binding = flow_info.locked.write().unwrap(); + let mut binding = flow_info.locked.write(); binding.dst_vpcd = Some(dst_vpcd); if set_nat_state { // Content should be a NatFlowState object but we can't include it in this crate without diff --git a/nat/src/common/mod.rs b/nat/src/common/mod.rs index c04c5b31c0..c5e5f9d472 100644 --- a/nat/src/common/mod.rs +++ b/nat/src/common/mod.rs @@ -5,9 +5,9 @@ //! While common to both NAT flavors, their use is not dictated here //! but individually by each NAT flavor implementation. +use concurrency::sync::Arc; +use concurrency::sync::atomic::{AtomicU8, Ordering}; use std::fmt::Display; -use std::sync::Arc; -use std::sync::atomic::AtomicU8; /// A type to represent a NAT action #[derive(Debug, Clone, Copy, PartialEq)] @@ -95,11 +95,10 @@ impl AtomicNatFlowStatus { #[must_use] pub fn load(&self) -> NatFlowStatus { - self.0.load(std::sync::atomic::Ordering::Relaxed).into() + self.0.load(Ordering::Relaxed).into() } pub fn store(&self, status: NatFlowStatus) { - self.0 - .store(status.into(), std::sync::atomic::Ordering::Relaxed); + self.0.store(status.into(), Ordering::Relaxed); } } diff --git a/nat/src/icmp_handler/nf.rs b/nat/src/icmp_handler/nf.rs index e0a94a9a2c..70f25dd087 100644 --- a/nat/src/icmp_handler/nf.rs +++ b/nat/src/icmp_handler/nf.rs @@ -14,8 +14,8 @@ use net::icmp4::{Icmp4DestUnreachable, Icmp4Type}; use net::icmp6::Icmp6Type; use net::packet::{DoneReason, Packet}; +use concurrency::sync::Arc; use pipeline::NetworkFunction; -use std::sync::Arc; use strum::EnumMessage; use tracectl::trace_target; use tracing::{debug, warn}; @@ -162,7 +162,7 @@ impl IcmpErrorHandler { return; } - let flow_info_locked = flow.locked.read().unwrap(); + let flow_info_locked = flow.locked.read(); let Some(dst_vpcd) = flow_info_locked.dst_vpcd else { warn!("Flow for {rev_flow_key} has no dst VPC discriminant set. This is a bug"); packet.done(DoneReason::InternalFailure); diff --git a/nat/src/portfw/flow_state.rs b/nat/src/portfw/flow_state.rs index 350e3f1454..b414fc5107 100644 --- a/nat/src/portfw/flow_state.rs +++ b/nat/src/portfw/flow_state.rs @@ -14,7 +14,8 @@ use net::{FlowKey, IpProtoKey}; use std::fmt::Display; use std::num::NonZero; -use std::sync::{Arc, Weak}; + +use concurrency::sync::{Arc, Weak}; use flow_entry::flow_table::FlowInfo; @@ -140,11 +141,10 @@ pub(crate) fn setup_forward_flow( ); // set the port forwarding state in the flow - if let Ok(mut write_guard) = forward_flow.locked.write() { + { + let mut write_guard = forward_flow.locked.write(); write_guard.port_fw_state = Some(Box::new(port_fw_state)); write_guard.dst_vpcd = Some(entry.dst_vpcd); - } else { - unreachable!() } debug!("Set up FORWARD flow for port-forwarding;\nkey={flow_key}\ninfo={forward_flow}"); status @@ -162,11 +162,10 @@ pub(crate) fn setup_reverse_flow( let port_fw_state = PortFwState::new_snat(dst_ip, dst_port, Arc::downgrade(entry), status); // set the port forwarding state in the flow - if let Ok(mut write_guard) = reverse_flow.locked.write() { + { + let mut write_guard = reverse_flow.locked.write(); write_guard.port_fw_state = Some(Box::new(port_fw_state)); write_guard.dst_vpcd = Some(entry.key.src_vpcd()); - } else { - unreachable!() } debug!("Set up REVERSE flow for port-forwarding;\nkey={reverse_key}\ninfo={reverse_flow}"); } @@ -185,11 +184,8 @@ pub(crate) fn get_packet_port_fw_state( debug!("Packet flow-info is not active (status:{status})"); return None; } - let Ok(flow_info_locked) = flow.locked.read() else { - error!("Packet has flow-info but it could not be locked"); - return None; - }; - let Some(state) = flow_info_locked + let guard = flow.locked.read(); + let Some(state) = guard .port_fw_state .as_ref() .and_then(|s| s.extract_ref::()) diff --git a/nat/src/portfw/icmp_handling.rs b/nat/src/portfw/icmp_handling.rs index 85e00f68e8..601adc683e 100644 --- a/nat/src/portfw/icmp_handling.rs +++ b/nat/src/portfw/icmp_handling.rs @@ -67,7 +67,7 @@ pub(crate) fn handle_icmp_error_port_forwarding( let f = flow_info.logfmt(); debug!("(port-forwarding): Processing ICMP error packet from {src_vpcd} using flow {f}"); - let flow_info_locked = flow_info.locked.read().unwrap(); + let flow_info_locked = flow_info.locked.read(); let state = flow_info_locked .port_fw_state .extract_ref::() diff --git a/nat/src/portfw/nf.rs b/nat/src/portfw/nf.rs index ccf9c96135..2ded1e6253 100644 --- a/nat/src/portfw/nf.rs +++ b/nat/src/portfw/nf.rs @@ -4,6 +4,7 @@ //! Port forwarding stage use crate::portfw::{PortFwEntry, PortFwKey, PortFwState, PortFwTable, PortFwTableReader}; +use concurrency::sync::{Arc, Weak}; use flow_entry::flow_table::table::FlowTable; use net::buffer::PacketBufferMut; @@ -13,7 +14,6 @@ use net::ip::UnicastIpAddr; use net::packet::{DoneReason, Packet, VpcDiscriminant}; use pipeline::{NetworkFunction, PipelineData}; use std::num::NonZero; -use std::sync::Arc; use std::time::Instant; use crate::common::NatAction; @@ -120,7 +120,7 @@ impl PortForwarder { setup_reverse_flow(&rev_key, &rev_flow, entry, dst_ip, dst_port, status); // get the state we just created for the FORWARD direction - let locked = fw_flow.locked.read().unwrap(); + let locked = fw_flow.locked.read(); let pfw_state = locked .port_fw_state .extract_ref::() @@ -271,7 +271,7 @@ impl PortForwarder { } fn reassign_port_fw_rule(flow_info: &FlowInfo, entry: &Arc) { - let mut flow_info_locked = flow_info.locked.write().unwrap(); + let mut flow_info_locked = flow_info.locked.write(); if let Some(state) = flow_info_locked.port_fw_state.extract_mut::() { state.rule = Arc::downgrade(entry); } @@ -297,11 +297,7 @@ impl PortForwarder { // point to it so that subsequent packets are fast-forwarded. if let Some(entry) = entry.as_ref() { Self::reassign_port_fw_rule(flow_info, entry); - if let Some(related) = flow_info - .related - .as_ref() - .and_then(std::sync::Weak::upgrade) - { + if let Some(related) = flow_info.related.as_ref().and_then(Weak::upgrade) { Self::reassign_port_fw_rule(&related, entry); } } diff --git a/nat/src/portfw/portfwtable/objects.rs b/nat/src/portfw/portfwtable/objects.rs index 60dfc96688..6fa24b5066 100644 --- a/nat/src/portfw/portfwtable/objects.rs +++ b/nat/src/portfw/portfwtable/objects.rs @@ -4,6 +4,10 @@ //! Port forwarding objects use ahash::RandomState; +use concurrency::sync::Arc; +#[cfg(test)] +use concurrency::sync::Weak; +use concurrency::sync::atomic::{AtomicU64, Ordering}; use lpm::prefix::{IpPrefix, Ipv4Prefix, Ipv6Prefix, Prefix}; use net::ip::NextHeader; use net::ip::UnicastIpAddr; @@ -13,11 +17,6 @@ use std::fmt::Debug; use std::net::IpAddr; use std::net::{Ipv4Addr, Ipv6Addr}; use std::num::NonZero; -use std::sync::Arc; -#[cfg(test)] -use std::sync::Weak; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; use std::time::Duration; use tracing::error; @@ -110,15 +109,12 @@ impl PortFwEntry { #[must_use] pub fn init_timeout(&self) -> Duration { - Duration::from_secs(self.init_timeout.load(std::sync::atomic::Ordering::Relaxed)) + Duration::from_secs(self.init_timeout.load(Ordering::Relaxed)) } #[must_use] pub fn estab_timeout(&self) -> Duration { - Duration::from_secs( - self.estab_timeout - .load(std::sync::atomic::Ordering::Relaxed), - ) + Duration::from_secs(self.estab_timeout.load(Ordering::Relaxed)) } pub fn set_init_timeout(&self, duration: Duration) { @@ -362,11 +358,11 @@ impl PortFwTable { #[cfg(test)] mod test { use super::{PortFwEntry, PortFwKey, PortFwTable, PortFwTableError}; + use concurrency::sync::Arc; use lpm::prefix::Prefix; use net::ip::NextHeader; use net::packet::VpcDiscriminant; use std::str::FromStr; - use std::sync::Arc; use std::time::Duration; use tracing_test::traced_test; diff --git a/nat/src/portfw/portfwtable/portforwarder.rs b/nat/src/portfw/portfwtable/portforwarder.rs index ec7bdd35ae..055a6ed2ea 100644 --- a/nat/src/portfw/portfwtable/portforwarder.rs +++ b/nat/src/portfw/portfwtable/portforwarder.rs @@ -10,11 +10,11 @@ use super::lpmmap::LpmMap; use super::objects::PortFwEntry; use super::rangeset::{PrefixMap, RangeSet, RangeSetError}; use crate::portfw::PortRange; +use concurrency::sync::Arc; use lpm::prefix::{IpPrefix, Ipv4Prefix, Ipv6Prefix, Prefix}; use net::ip::UnicastIpAddr; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::num::NonZero; -use std::sync::Arc; #[allow(unused)] use tracing::{debug, warn}; @@ -84,10 +84,10 @@ impl PortForwarder { #[cfg(test)] mod test { use crate::portfw::{PortFwEntry, PortFwKey}; + use concurrency::sync::Arc; use net::ip::NextHeader; use net::packet::VpcDiscriminant; use std::num::NonZero; - use std::sync::Arc; use super::{PortForwarder, PrefixMap}; use lpm::prefix::Prefix; diff --git a/nat/src/portfw/test.rs b/nat/src/portfw/test.rs index 34bac9c692..ad4c08d6a8 100644 --- a/nat/src/portfw/test.rs +++ b/nat/src/portfw/test.rs @@ -47,7 +47,6 @@ mod nf_test { .as_ref()? .locked .read() - .unwrap() .port_fw_state .as_ref() .and_then(|s| s.extract_ref::()) @@ -61,7 +60,6 @@ mod nf_test { .as_ref()? .locked .read() - .unwrap() .port_fw_state .as_ref() .and_then(|s| s.extract_ref::()) @@ -583,7 +581,7 @@ mod nf_test { let flow = packet.meta().flow_info.as_ref().unwrap(); // flow entry should have port-forwarding state - let locked = flow.locked.read().unwrap(); + let locked = flow.locked.read(); let state = locked .port_fw_state .as_ref() @@ -599,7 +597,7 @@ mod nf_test { let flow = packet.meta().flow_info.as_ref().unwrap(); // flow entry should have port-forwarding state - let locked = flow.locked.read().unwrap(); + let locked = flow.locked.read(); let state = locked .port_fw_state .as_ref() diff --git a/nat/src/stateful/allocator_writer.rs b/nat/src/stateful/allocator_writer.rs index fcd4b31b0f..54c64edadd 100644 --- a/nat/src/stateful/allocator_writer.rs +++ b/nat/src/stateful/allocator_writer.rs @@ -2,13 +2,13 @@ // Copyright Open Network Fabric Authors use crate::stateful::apalloc::NatAllocator; -use arc_swap::ArcSwapOption; +use concurrency::slot::SlotOption; +use concurrency::sync::Arc; use config::GenId; use config::external::overlay::vpc::{ValidatedPeering, ValidatedVpcTable}; use config::external::overlay::vpcpeering::ValidatedExpose; use flow_entry::flow_table::FlowTable; use net::packet::VpcDiscriminant; -use std::sync::Arc; use tracing::debug; use crate::stateful::flows::check_masquerading_flows; @@ -95,12 +95,12 @@ impl StatefulNatConfig { } #[derive(Debug)] -pub struct NatAllocatorWriter(Arc>); +pub struct NatAllocatorWriter(Arc>); impl NatAllocatorWriter { #[must_use] pub fn new() -> Self { - Self(Arc::new(ArcSwapOption::new(None))) + Self(Arc::new(SlotOption::empty())) } #[must_use] @@ -121,7 +121,7 @@ impl NatAllocatorWriter { /// will be transferred (reserved) in the new allocator. pub fn update_nat_allocator(&mut self, nat_config: StatefulNatConfig, flow_table: &FlowTable) { let genid = nat_config.genid(); - let curr_allocator = self.0.load(); + let curr_allocator = self.0.load_full(); // keep state as-is if config did not change, and just upgrade flows if let Some(current) = curr_allocator.as_ref() @@ -164,17 +164,17 @@ impl Default for NatAllocatorWriter { } #[derive(Debug, Clone)] -pub struct NatAllocatorReader(Arc>); +pub struct NatAllocatorReader(Arc>); impl NatAllocatorReader { pub fn get(&self) -> Option> { - self.0.load().clone() + self.0.load_full() } #[must_use] pub fn factory(&self) -> NatAllocatorReaderFactory { NatAllocatorReaderFactory(self.clone()) } - pub fn inner(&self) -> Arc> { + pub fn inner(&self) -> Arc> { self.0.clone() } } diff --git a/nat/src/stateful/apalloc/alloc.rs b/nat/src/stateful/apalloc/alloc.rs index 21cefab232..abe30fa310 100644 --- a/nat/src/stateful/apalloc/alloc.rs +++ b/nat/src/stateful/apalloc/alloc.rs @@ -45,25 +45,23 @@ impl IpAllocator { } } - pub(crate) fn read(&self) -> Result>, AllocatorError> { - self.pool.read().map_err(|_| { - AllocatorError::InternalIssue("Failed to acquire read lock (poisoned)".to_string()) - }) + pub(crate) fn read(&self) -> RwLockReadGuard<'_, NatPool> { + self.pool.read() } - pub(crate) fn idle_timeout(&self) -> Option { - Some(self.pool.read().ok()?.idle_timeout()) + pub(crate) fn idle_timeout(&self) -> Duration { + self.pool.read().idle_timeout() } fn deallocate_ip(&self, ip: I) { - self.pool.write().unwrap().deallocate_from_pool(ip); + self.pool.write().deallocate_from_pool(ip); } fn reuse_allocated_ip( &self, allow_null: bool, ) -> Result, AllocatorError> { - let allocated_ips = self.pool.read().unwrap(); + let allocated_ips = self.pool.read(); for ip_weak in allocated_ips.ips_in_use() { let Some(ip) = ip_weak.upgrade() else { continue; @@ -85,7 +83,7 @@ impl IpAllocator { } fn allocate_new_ip_from_pool(&self) -> Result>, AllocatorError> { - let mut allocated_ips = self.pool.write().unwrap(); + let mut allocated_ips = self.pool.write(); let new_ip = allocated_ips.use_new_ip(self.clone(), self.randomize)?; let arc_ip = Arc::new(new_ip); allocated_ips.add_in_use(&arc_ip); @@ -102,7 +100,7 @@ impl IpAllocator { } fn cleanup_used_ips(&self) { - let mut allocated_ips = self.pool.write().unwrap(); + let mut allocated_ips = self.pool.write(); allocated_ips.cleanup(); } @@ -122,7 +120,6 @@ impl IpAllocator { fn get_allocated_ip(&self, ip: I) -> Result>, AllocatorError> { self.pool .write() - .unwrap() .reserve_from_pool(ip, self.clone(), self.randomize) } @@ -138,7 +135,7 @@ impl IpAllocator { // Helper to access IpAllocator's internals for tests. Not to be used outside of tests. #[cfg(test)] pub fn get_pool_clone_for_tests(&self) -> (RoaringBitmap, VecDeque>>) { - let pool = self.pool.read().unwrap(); + let pool = self.pool.read(); (pool.bitmap.0.clone(), pool.in_use.clone()) } } diff --git a/nat/src/stateful/apalloc/display.rs b/nat/src/stateful/apalloc/display.rs index 215d06e35a..04c7bca91b 100644 --- a/nat/src/stateful/apalloc/display.rs +++ b/nat/src/stateful/apalloc/display.rs @@ -68,7 +68,7 @@ where I: NatIpWithBitmap + Display, { fn fmt(&self, f: &mut Formatter<'_>) -> Result { - let pool = self.read().map_err(|_| Error)?; + let pool = self.read(); write!(f, "{pool}") } } diff --git a/nat/src/stateful/apalloc/mod.rs b/nat/src/stateful/apalloc/mod.rs index ce91b8d0e7..bdebb0ad50 100644 --- a/nat/src/stateful/apalloc/mod.rs +++ b/nat/src/stateful/apalloc/mod.rs @@ -319,7 +319,7 @@ impl NatAllocator { let allow_null = next_header == NextHeader::ICMP || next_header == NextHeader::ICMP6; let mut allocation = pool.allocate(allow_null)?; allocation.set_genid(self.config.genid()); - let idle_timeout = pool.idle_timeout().unwrap_or_else(|| unreachable!()); + let idle_timeout = pool.idle_timeout(); Ok(AllocationResult { allocation, diff --git a/nat/src/stateful/apalloc/port_alloc.rs b/nat/src/stateful/apalloc/port_alloc.rs index e95f378892..46581aa1a3 100644 --- a/nat/src/stateful/apalloc/port_alloc.rs +++ b/nat/src/stateful/apalloc/port_alloc.rs @@ -438,7 +438,7 @@ impl AllocatedPortBlock { let reserve_zero = !allow_null && block.base_port_idx == 0; let reserve_range = reserved_port_range.is_some(); if reserve_zero || reserve_range { - let mut mutex_guard = block.usage_bitmap.lock().unwrap(); + let mut mutex_guard = block.usage_bitmap.lock(); if reserve_zero { mutex_guard.reserve_port_from_bitmap(0).map_err(|()| { AllocatorError::InternalIssue( @@ -467,7 +467,7 @@ impl AllocatedPortBlock { } fn is_full(&self) -> bool { - self.usage_bitmap.lock().unwrap().bitmap_full() + self.usage_bitmap.lock().bitmap_full() } fn covers(&self, port: NatPort) -> bool { @@ -479,7 +479,6 @@ impl AllocatedPortBlock { fn deallocate_port_from_block(&self, port: NatPort) -> Result<(), AllocatorError> { self.usage_bitmap .lock() - .unwrap() .deallocate_port_from_bitmap( u8::try_from(port.as_u16().checked_sub(self.base_port_idx).ok_or( AllocatorError::InternalIssue( @@ -502,7 +501,6 @@ impl AllocatedPortBlock { let bitmap_offset = self .usage_bitmap .lock() - .unwrap() .allocate_port_from_bitmap() .map_err(|()| AllocatorError::NoFreePort(self.base_port_idx))?; @@ -526,7 +524,6 @@ impl AllocatedPortBlock { ) -> Result, AllocatorError> { self.usage_bitmap .lock() - .unwrap() .reserve_port_from_bitmap( u8::try_from(port.as_u16().checked_sub(self.base_port_idx).ok_or( AllocatorError::InternalIssue( @@ -548,7 +545,6 @@ impl AllocatedPortBlock { fn allocated_port_ranges(&self) -> BTreeSet { self.usage_bitmap .lock() - .unwrap() .allocated_port_ranges() .iter() .map(|range| { @@ -640,17 +636,13 @@ impl ThreadPortMap { fn get(&self) -> Option { self.0 .read() - .unwrap() .get(&std::thread::current().id()) .copied() .unwrap_or(None) } fn set(&self, index: Option) { - self.0 - .write() - .unwrap() - .insert(std::thread::current().id(), index); + self.0.write().insert(std::thread::current().id(), index); } } @@ -682,11 +674,11 @@ impl AllocatedPortBlockMap { } fn get_weak(&self, index: usize) -> Option>> { - self.0.read().unwrap().get(&index).cloned() + self.0.read().get(&index).cloned() } fn remove(&self, index: usize) { - self.0.write().unwrap().remove(&index); + self.0.write().remove(&index); } fn get(&self, index: usize) -> Option>> { @@ -697,19 +689,18 @@ impl AllocatedPortBlockMap { } fn insert(&self, index: usize, block: Weak>) { - self.0.write().unwrap().insert(index, block); + self.0.write().insert(index, block); } fn has_entries_with_free_ports(&self) -> bool { self.0 .read() - .unwrap() .values() .any(|block| block.upgrade().is_some_and(|block| !block.is_full())) } fn search_for_block(&self, port: NatPort) -> Option>> { - let blocks = self.0.read().unwrap(); + let blocks = self.0.read(); blocks .values() .find(|block| block.upgrade().is_some_and(|block| block.covers(port)))? @@ -718,7 +709,7 @@ impl AllocatedPortBlockMap { // Used for Display fn allocated_port_ranges(&self) -> BTreeSet { - let blocks = self.0.read().unwrap(); + let blocks = self.0.read(); let mut ranges = BTreeSet::::new(); for (_, block) in blocks.iter() { if let Some(block) = block.upgrade() { diff --git a/nat/src/stateful/apalloc/test_alloc.rs b/nat/src/stateful/apalloc/test_alloc.rs index 611bcf50dd..b7af37449d 100644 --- a/nat/src/stateful/apalloc/test_alloc.rs +++ b/nat/src/stateful/apalloc/test_alloc.rs @@ -412,9 +412,9 @@ mod tests_shuttle { use super::context::*; use super::tests; + use concurrency::sync::{Arc, Mutex}; + use concurrency::thread; use net::ip::NextHeader; - use shuttle::sync::{Arc, Mutex}; - use shuttle::thread; #[should_panic(expected = "assertion `left == right` failed")] #[test] @@ -425,10 +425,10 @@ mod tests_shuttle { let lock2 = lock.clone(); thread::spawn(move || { - *lock.lock().unwrap() = 1; + *lock.lock() = 1; }); - assert_eq!(0, *lock2.lock().unwrap()); + assert_eq!(0, *lock2.lock()); }, 100, ); diff --git a/nat/src/stateful/flows.rs b/nat/src/stateful/flows.rs index bba4d8344f..a102ddd8de 100644 --- a/nat/src/stateful/flows.rs +++ b/nat/src/stateful/flows.rs @@ -21,9 +21,8 @@ use tracing::{debug, error}; pub(crate) fn invalidate_all_masquerading_flows(flow_table: &FlowTable) { debug!("INVALIDATING all masquerading flows..."); flow_table.for_each_flow(|_key, flow_info| { - if let Ok(locked) = flow_info.locked.read() - && locked.nat_state.as_ref().is_some() - { + let locked = flow_info.locked.read(); + if locked.nat_state.as_ref().is_some() { flow_info.invalidate_pair(); } }); @@ -36,9 +35,8 @@ pub(crate) fn upgrade_all_masquerading_flows(flow_table: &FlowTable, genid: GenI flow_table.for_each_flow_filtered( |_key, flow_info: &FlowInfo| flow_info.is_active(), |_, flow_info| { - if let Ok(locked) = flow_info.locked.read() - && locked.nat_state.as_ref().is_some() - { + let locked = flow_info.locked.read(); + if locked.nat_state.as_ref().is_some() { flow_info.set_genid(genid); count += 1; } @@ -48,7 +46,7 @@ pub(crate) fn upgrade_all_masquerading_flows(flow_table: &FlowTable, genid: GenI } fn get_flow_masquerading_allocation(flow_info: &FlowInfo) -> Option<(IpAddr, NatPort)> { - let locked = flow_info.locked.read().ok()?; + let locked = flow_info.locked.read(); let alloc = locked .nat_state .extract_ref::()? @@ -72,7 +70,7 @@ fn re_reserve_ip_and_port( match new_allocator.reserve_port(proto, dst_vpcd, src_ip, ip, port) { Ok(alloc) => { debug!("Successfully re-reserved ip {ip} port {port_u16}"); - let mut guard = flow_info.locked.write().map_err(|_| ())?; + let mut guard = flow_info.locked.write(); let nat_state = guard.nat_state.as_mut().ok_or(())?; let nat_state = nat_state .extract_mut::() diff --git a/nat/src/stateful/icmp_handling.rs b/nat/src/stateful/icmp_handling.rs index a20b35fec1..8b3867b5ad 100644 --- a/nat/src/stateful/icmp_handling.rs +++ b/nat/src/stateful/icmp_handling.rs @@ -21,7 +21,7 @@ pub(crate) fn handle_icmp_error_masquerading( let f = flow_info.logfmt(); debug!("(masquerade): Processing ICMP error message from {src_vpcd} with flow {f}"); - let flow_info_locked = flow_info.locked.read().unwrap(); + let flow_info_locked = flow_info.locked.read(); let state = flow_info_locked .nat_state .extract_ref::() diff --git a/nat/src/stateful/nf.rs b/nat/src/stateful/nf.rs index 17b48f2da1..6c4f8f514e 100644 --- a/nat/src/stateful/nf.rs +++ b/nat/src/stateful/nf.rs @@ -176,7 +176,7 @@ impl StatefulNat { return None; } debug!("Hit ACTIVE flow: {}", flow_info.logfmt()); - let value = flow_info.locked.read().unwrap(); + let value = flow_info.locked.read(); let Some(state) = value.nat_state.as_ref()?.extract_ref::() else { debug!("Unable to access masquerade state"); return None; @@ -200,7 +200,7 @@ impl StatefulNat { ) -> Option<(NatTranslate, Duration)> { let flow_key = FlowKey::uni(src_vpcd, src_ip, dst_ip, proto_key_info); let flow_info = self.flow_table.lookup(&flow_key)?; - let value = flow_info.locked.read().unwrap(); + let value = flow_info.locked.read(); let state = value.nat_state.as_ref()?.extract_ref::()?; Some((state.as_translate(), state.idle_timeout())) } @@ -212,13 +212,10 @@ impl StatefulNat { ) { let flow_key = flow_info.flowkey(); debug!("Setting up masquerade flow state: {flow_key} -> {state}"); - if let Ok(mut write_guard) = flow_info.locked.write() { - write_guard.nat_state = Some(Box::new(state)); - write_guard.dst_vpcd = Some(dst_vpcd); - } else { - // flow info is just locally created - unreachable!() - } + let state = Box::new(state); + let mut write_guard = flow_info.locked.write(); + write_guard.nat_state = Some(state); + write_guard.dst_vpcd = Some(dst_vpcd); } fn get_reverse_mapping(flow_key: &FlowKey) -> Result<(IpAddr, NatPort), StatefulNatError> { @@ -397,7 +394,6 @@ impl StatefulNat { let translate = installed .locked .read() - .unwrap() .nat_state .extract_ref::() .ok_or(StatefulNatError::Bug("Unexpected masquerade state miss"))? diff --git a/nat/src/stateful/test.rs b/nat/src/stateful/test.rs index 5d7055bed1..da5b649e58 100644 --- a/nat/src/stateful/test.rs +++ b/nat/src/stateful/test.rs @@ -373,7 +373,7 @@ fn flow_lookup(flow_table: &FlowTable, packet: &mut Packet } #[tokio::test] -#[traced_test] +#[cfg_attr(not(miri), traced_test)] #[allow(clippy::too_many_lines)] async fn test_full_config() { let config = build_gwconfig_from_overlay(build_overlay_4vpcs()) @@ -544,7 +544,7 @@ fn build_overlay_2vpcs_no_nat() -> Overlay { } #[test] -#[traced_test] +#[cfg_attr(not(miri), traced_test)] fn test_full_config_no_nat() { let config = build_gwconfig_from_overlay(build_overlay_2vpcs_no_nat()) .validate() @@ -1418,7 +1418,7 @@ fn build_overlay_2vpcs_unidirectional_nat_overlapping_exposes() -> Overlay { } #[tokio::test] -#[traced_test] +#[cfg_attr(not(miri), traced_test)] #[allow(clippy::too_many_lines)] async fn test_full_config_unidirectional_nat_overlapping_exposes_for_single_peering() { let config = @@ -1627,7 +1627,6 @@ fn nat_flow_status(packet: &Packet) -> Option { .as_ref()? .locked .read() - .unwrap() .nat_state .as_ref() .and_then(|s| s.extract_ref::()) @@ -1641,7 +1640,6 @@ fn masquerade_state(packet: &Packet) -> Option { .as_ref()? .locked .read() - .unwrap() .nat_state .as_ref() .and_then(|s| s.extract_ref::()) diff --git a/net/src/flows/atomic_instant.rs b/net/src/flows/atomic_instant.rs index 7ecb69da58..496beb66ee 100644 --- a/net/src/flows/atomic_instant.rs +++ b/net/src/flows/atomic_instant.rs @@ -2,9 +2,9 @@ // Copyright Open Network Fabric Authors use atomic_instant_full; +use concurrency::sync::atomic::Ordering; use std::fmt::{Debug, Formatter}; use std::ops::Deref; -use std::sync::atomic::Ordering; use std::time::Instant; #[repr(transparent)] diff --git a/net/src/flows/display.rs b/net/src/flows/display.rs index a89a776f76..63b9695fc3 100644 --- a/net/src/flows/display.rs +++ b/net/src/flows/display.rs @@ -6,6 +6,7 @@ use super::flow_info::{FlowInfo, FlowInfoLocked}; use super::flow_key::{FlowKey, FlowKeyData}; +use concurrency::sync::Weak; use std::fmt::Display; use std::time::Instant; @@ -58,21 +59,15 @@ impl Display for FlowInfo { let expires_at = self.expires_at(); let expires_in = expires_at.saturating_duration_since(Instant::now()); let genid = self.genid(); - - if let Ok(info) = self.locked.read() { - write!(f, "{info}")?; - } else { - write!(f, "could not lock!")?; - } + let info = self.locked.read(); let has_related = self .related .as_ref() - .and_then(std::sync::Weak::upgrade) + .and_then(Weak::upgrade) .map_or("no", |_| "yes"); - writeln!( f, - " status: {:?}, expires in {}s, related: {has_related}, genid: {genid}", + "{info} status: {:?}, expires in {}s, related: {has_related}, genid: {genid}", self.status(), expires_in.as_secs(), ) @@ -106,18 +101,15 @@ impl Display for FlowInfoOneLiner<'_> { let r = flow_info .related .as_ref() - .and_then(std::sync::Weak::upgrade) + .and_then(Weak::upgrade) .map_or("no", |_| "yes"); - if let Ok(info) = flow_info.locked.read() { - write!( - f, - "{key} {} related:{r} genid:{genid}", - FlowInfoLockedOneLiner(&info) - ) - } else { - write!(f, "{key} info:inaccessible! related:{r} genid:{genid}") - } + let info = flow_info.locked.read(); + write!( + f, + "{key} {} related:{r} genid:{genid}", + FlowInfoLockedOneLiner(&info) + ) } } diff --git a/net/src/flows/flow_info.rs b/net/src/flows/flow_info.rs index 0a5a1a9a60..271a0f1d4e 100644 --- a/net/src/flows/flow_info.rs +++ b/net/src/flows/flow_info.rs @@ -7,9 +7,9 @@ use crate::packet::VpcDiscriminant; use concurrency::sync::Arc; use concurrency::sync::RwLock; use concurrency::sync::Weak; +use concurrency::sync::atomic::{AtomicI64, AtomicU8, Ordering}; use std::fmt::{Debug, Display}; use std::mem::MaybeUninit; -use std::sync::atomic::{AtomicI64, AtomicU8, Ordering}; use std::time::{Duration, Instant}; use tokio_util::sync::CancellationToken; use tracing::debug; @@ -80,7 +80,7 @@ pub struct AtomicFlowStatus(AtomicU8); impl Debug for AtomicFlowStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.load(std::sync::atomic::Ordering::Relaxed)) + write!(f, "{:?}", self.load(Ordering::Relaxed)) } } @@ -202,10 +202,7 @@ impl FlowInfo { /// /// This method panics if the inner lock is poisoned pub fn get_dst_vpcd(&self) -> Option { - self.locked - .read() - .expect("Failure locking flow-info for reading") - .dst_vpcd + self.locked.read().dst_vpcd } /// Set the generation Id of a flow @@ -285,7 +282,7 @@ impl FlowInfo { } pub fn expires_at(&self) -> Instant { - self.expires_at.load(std::sync::atomic::Ordering::Relaxed) + self.expires_at.load(Ordering::Relaxed) } /// Extend the expiry of the flow if it is not expired. @@ -295,7 +292,7 @@ impl FlowInfo { /// Returns `FlowInfoError::FlowExpired` if the flow is expired with the expiry `Instant` /// pub fn extend_expiry(&self, duration: Duration) -> Result<(), FlowInfoError> { - if self.status.load(std::sync::atomic::Ordering::Relaxed) == FlowStatus::Expired { + if self.status.load(Ordering::Relaxed) == FlowStatus::Expired { return Err(FlowInfoError::FlowExpired(self.expires_at())); } self.extend_expiry_unchecked(duration); @@ -309,8 +306,7 @@ impl FlowInfo { /// This method is thread-safe. /// pub fn extend_expiry_unchecked(&self, duration: Duration) { - self.expires_at - .fetch_add(duration, std::sync::atomic::Ordering::Relaxed); + self.expires_at.fetch_add(duration, Ordering::Relaxed); } /// Reset the expiry of the flow if it is not expired. @@ -325,7 +321,7 @@ impl FlowInfo { /// Returns `FlowInfoError::TimeoutUnchanged` if the new timeout is smaller than the current. /// Returns `FlowInfoError::FlowCancelled` if the flow had been cancelled pub fn reset_expiry(&self, duration: Duration) -> Result<(), FlowInfoError> { - match self.status.load(std::sync::atomic::Ordering::Relaxed) { + match self.status.load(Ordering::Relaxed) { FlowStatus::Active => self.reset_expiry_unchecked(duration), FlowStatus::Cancelled => Err(FlowInfoError::FlowCancelled), FlowStatus::Detached => Err(FlowInfoError::FlowDetached), @@ -349,8 +345,7 @@ impl FlowInfo { if new < current { return Err(FlowInfoError::TimeoutUnchanged); } - self.expires_at - .store(new, std::sync::atomic::Ordering::Relaxed); + self.expires_at.store(new, Ordering::Relaxed); Ok(()) } @@ -360,7 +355,7 @@ impl FlowInfo { /// /// This method is thread-safe. pub fn status(&self) -> FlowStatus { - self.status.load(std::sync::atomic::Ordering::Relaxed) + self.status.load(Ordering::Relaxed) } /// Tell if a `FlowInfo` is active, i.e. eligible for processing packets that match it. @@ -408,7 +403,6 @@ impl FlowInfo { /// /// This method is thread-safe. pub fn update_status(&self, status: FlowStatus) -> FlowStatus { - self.status - .store(status, std::sync::atomic::Ordering::Relaxed) + self.status.store(status, Ordering::Relaxed) } } diff --git a/net/src/packet/stats.rs b/net/src/packet/stats.rs index fa11319beb..c7cc9d7a07 100644 --- a/net/src/packet/stats.rs +++ b/net/src/packet/stats.rs @@ -4,7 +4,7 @@ //! Module to compute packet processing counters use super::meta::DoneReason; -use std::sync::atomic::{AtomicU64, Ordering}; +use concurrency::sync::atomic::{AtomicU64, Ordering}; use strum::EnumCount; /// A 64-byte aligned atomic u64 diff --git a/pipeline/Cargo.toml b/pipeline/Cargo.toml index 56a4eed0cd..6bdcd91681 100644 --- a/pipeline/Cargo.toml +++ b/pipeline/Cargo.toml @@ -6,7 +6,7 @@ publish.workspace = true version.workspace = true [dependencies] -arc-swap = { workspace = true } +concurrency = { workspace = true } dyn-iter = { workspace = true } id = { workspace = true } linkme = { workspace = true } diff --git a/pipeline/src/pipeline.rs b/pipeline/src/pipeline.rs index 8c23083d30..295b310dd8 100644 --- a/pipeline/src/pipeline.rs +++ b/pipeline/src/pipeline.rs @@ -5,14 +5,14 @@ use crate::dyn_nf::DynNetworkFunctionImpl; use crate::{DynNetworkFunction, NetworkFunction, nf_dyn}; +use concurrency::sync::Arc; +use concurrency::sync::atomic::{AtomicI64, Ordering}; use dyn_iter::{DynIter, IntoDynIterator}; use id::Id; use net::buffer::PacketBufferMut; use net::packet::Packet; use ordermap::OrderMap; use std::any::Any; -use std::sync::Arc; -use std::sync::atomic::AtomicI64; /// A type that represents an Id for a stage or NF pub type StageId = Id>>; @@ -33,12 +33,11 @@ impl PipelineData { } /// Read the generation id pub fn genid(&self) -> i64 { - self.genid.load(std::sync::atomic::Ordering::Relaxed) + self.genid.load(Ordering::Relaxed) } /// Set the generation id pub fn set_genid(&self, genid: i64) { - self.genid - .store(genid, std::sync::atomic::Ordering::Relaxed); + self.genid.store(genid, Ordering::Relaxed); } } diff --git a/pipeline/src/sample_nfs.rs b/pipeline/src/sample_nfs.rs index 0f82a5e844..19d0e41fa3 100644 --- a/pipeline/src/sample_nfs.rs +++ b/pipeline/src/sample_nfs.rs @@ -2,7 +2,9 @@ // Copyright Open Network Fabric Authors use crate::NetworkFunction; -use arc_swap::ArcSwapOption; +use concurrency::slot::SlotOption; +use concurrency::sync::Arc; +use concurrency::sync::atomic::{AtomicBool, Ordering}; use net::buffer::PacketBufferMut; use net::eth::mac::{DestinationMac, Mac}; use net::headers::TryIcmp4; @@ -11,9 +13,6 @@ use net::headers::{TryEthMut, TryHeaders, TryIpv4Mut, TryIpv6Mut}; use net::packet::{DoneReason, Packet, PacketStats}; use net::vxlan::Vxlan; use std::ops::Deref; -use std::sync::Arc; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; use strum::EnumCount; use tracectl::custom_target; use tracectl::tdebug; @@ -43,7 +42,7 @@ pub struct PacketDumper { name: String, enabled: AtomicBool, count: u64, - filter: ArcSwapOption>, + filter: SlotOption>, } /// A type that represents a [`Packet`] filter to selectively dump packets. @@ -106,7 +105,7 @@ impl PacketDumper { name: name.to_owned(), enabled: AtomicBool::new(enabled), count: 0, - filter: ArcSwapOption::from_pointee(filter), + filter: SlotOption::from_pointee(filter), } } /// Tells if the [`PacketDumper`] is enabled. diff --git a/pipeline/src/static_nf.rs b/pipeline/src/static_nf.rs index 9a4bb8d5dc..3100f696b7 100644 --- a/pipeline/src/static_nf.rs +++ b/pipeline/src/static_nf.rs @@ -1,10 +1,10 @@ // SPDX-License-Identifier: Apache-2.0 // Copyright Open Network Fabric Authors +use concurrency::sync::Arc; use net::buffer::PacketBufferMut; use net::packet::Packet; use std::marker::PhantomData; -use std::sync::Arc; use crate::PipelineData; diff --git a/routing/src/atable/resolver.rs b/routing/src/atable/resolver.rs index 394e80fd96..ec72510ef8 100644 --- a/routing/src/atable/resolver.rs +++ b/routing/src/atable/resolver.rs @@ -3,11 +3,11 @@ //! Module to resolve ARP from the /proc. This module only supports ARP (IPv4) +use concurrency::sync::Arc; +use concurrency::sync::atomic::{AtomicBool, Ordering}; +use concurrency::thread; +use concurrency::thread::JoinHandle; use std::net::IpAddr; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::thread; -use std::thread::JoinHandle; use std::time::Duration; use netdev::Interface; diff --git a/routing/src/fib/fibtable.rs b/routing/src/fib/fibtable.rs index 37b70dba5d..5fbab2a74a 100644 --- a/routing/src/fib/fibtable.rs +++ b/routing/src/fib/fibtable.rs @@ -7,11 +7,11 @@ use crate::RouterError; use crate::fib::fibtype::{FibKey, FibReader, FibReaderFactory, FibWriter}; use crate::rib::vrf::VrfId; +use concurrency::sync::Arc; use left_right::{Absorb, ReadGuard, ReadHandle, ReadHandleFactory, WriteHandle}; use net::vxlan::Vni; use std::collections::BTreeMap; use std::rc::Rc; -use std::sync::Arc; #[allow(unused)] use tracing::{debug, error, info, warn}; diff --git a/routing/src/fib/test.rs b/routing/src/fib/test.rs index 48329e760e..8e47598b62 100644 --- a/routing/src/fib/test.rs +++ b/routing/src/fib/test.rs @@ -254,7 +254,7 @@ mod tests { // number of threads looking up fibtable const NUM_WORKERS: u16 = 6; const NUM_PACKETS: u64 = cfg_select! { - miri => 50, + miri => 30, _ => 100_000, }; const TENTH: u64 = NUM_PACKETS / 10; @@ -281,6 +281,7 @@ mod tests { let handle = Builder::new() .name(format!("WORKER-{n}")) .spawn(move || { + #[cfg(not(miri))] println!("Worker-{n} started"); let mut rng = rand::rng(); let mut packet = test_packet(); @@ -296,6 +297,7 @@ mod tests { if hit == prefix { prefix_hits += 1; if prefix_hits.is_multiple_of(TENTH) { + #[cfg(not(miri))] println!( "Worker {n} is {} % done", prefix_hits * 100 / NUM_PACKETS @@ -315,12 +317,15 @@ mod tests { nofibs += 1; } } - println!("=== Worker {n} finished ===="); - println!("Stats:"); - println!(" {prefix_hits:>8} packets hit {prefix}"); - println!(" {other_hits:>8} packets hit other prefix (0.0.0.0/0)"); - println!(" {nofibs:>8} packets found no fib"); - println!(" {nofib_enter:>8} packets found fib but could not enter"); + #[cfg(not(miri))] + { + println!("=== Worker {n} finished ===="); + println!("Stats:"); + println!(" {prefix_hits:>8} packets hit {prefix}"); + println!(" {other_hits:>8} packets hit other prefix (0.0.0.0/0)"); + println!(" {nofibs:>8} packets found no fib"); + println!(" {nofib_enter:>8} packets found fib but could not enter"); + } worker_done.fetch_add(1, Ordering::Relaxed); }) @@ -374,6 +379,7 @@ mod tests { // stop when all workers are done if done.load(Ordering::Relaxed) == NUM_WORKERS { + #[cfg(not(miri))] println!("All workers finished!"); if let Some(fib) = fibw.take() { // fib is destroyed here @@ -388,6 +394,7 @@ mod tests { } let duration = start.elapsed(); + #[cfg(not(miri))] println!("Test duration: {duration:?}"); } diff --git a/stats/src/vpc_stats.rs b/stats/src/vpc_stats.rs index cec279c8e2..c0e55d0552 100644 --- a/stats/src/vpc_stats.rs +++ b/stats/src/vpc_stats.rs @@ -51,29 +51,19 @@ impl VpcStatsStore { } pub fn set_many_vpc_names_sync(&self, pairs: Vec<(VpcId, String)>) { - let mut m = self - .vpc_names - .write() - .expect("vpc_names write lock poisoned"); + let mut m = self.vpc_names.write(); for (id, name) in pairs { m.insert(id, name); } } pub fn set_vpc_name_sync(&self, id: VpcId, name: String) { - let mut m = self - .vpc_names - .write() - .expect("vpc_names write lock poisoned"); + let mut m = self.vpc_names.write(); m.insert(id, name); } pub fn name_of(&self, id: VpcId) -> Option { - self.vpc_names - .read() - .expect("vpc_names read lock poisoned") - .get(&id) - .cloned() + self.vpc_names.read().get(&id).cloned() } // ---------- Pair (src -> dst) ---------- @@ -163,10 +153,7 @@ impl VpcStatsStore { vpcs.retain(|vpc, _| alive.contains(vpc)); } { - let mut names = self - .vpc_names - .write() - .expect("vpc_names write lock poisoned"); + let mut names = self.vpc_names.write(); names.retain(|vpc, _| alive.contains(vpc)); } } @@ -185,9 +172,6 @@ impl VpcStatsStore { /// Snapshot all VPC names. Declared async to match callers that `.await` it, /// but it does not perform any awaits internally. pub async fn snapshot_names(&self) -> HashMap { - self.vpc_names - .read() - .expect("vpc_names read lock poisoned") - .clone() + self.vpc_names.read().clone() } } From 586483b7116e46ba03aeb34b8371bab2f23bcd74 Mon Sep 17 00:00:00 2001 From: Daniel Noland Date: Fri, 15 May 2026 11:21:13 -0600 Subject: [PATCH 2/2] feat(concurrency): poison-as-panic wrapper for shuttle::sync Add a shuttle backend to the sync facade so existing code can run unchanged under shuttle's random / PCT / DFS schedulers. Backend selection in concurrency/src/sync/mod.rs now picks shuttle ahead of parking_lot when any shuttle* feature is on (and loom is not). loom still wins overall; the loom backend gets its own wrapper in the final PR of this stack. concurrency/src/sync/shuttle_backend.rs mirrors std_backend.rs: a thin poison-as-panic wrap around shuttle::sync::{Mutex, RwLock} that returns naked guards, plus re-exports for Arc / Weak / atomics / Condvar / mpsc that pass through unchanged. OnceLock falls back to std::sync because shuttle does not ship one (it is pure-std machinery and does not need model-checker integration). Three feature flags share one dep:shuttle so callers can pick a scheduler without a second feature axis: * `shuttle` -- random scheduler (default first-time choice). * `shuttle_pct` -- bias toward rare interleavings. * `shuttle_dfs` -- exhaustive small-state exploration. The scheduler difference is purely runtime; the same wrapper compiles for all three. Cleanup as a side effect: * quiescent.rs's cfg dispatch shrinks from `any(feature = "loom", feature = "shuttle")` to `feature = "loom"` because the wrapped shuttle backend now returns a naked guard. * slot.rs's fallback branch picks up shuttle_pct/shuttle_dfs and factors the loom-only unwrap into a local `unwrap_lock!` macro; the macro disappears with the loom wrapper. Tests: * `quiescent_shuttle.rs` runs `check_random` and `check_pct` and now enables on any of the three shuttle features; the std-only test gates on `not(any(...))` so PCT/DFS don't accidentally pick up the std code path. * `quiescent_protocol.rs` / `quiescent_properties.rs` widen the "std only" gate identically. * `lib.rs`'s `pub use ...::thread` matches the same widened gate, so threads spawned from shuttle test bodies use the shuttle executor instead of falling through to `std::thread` and tripping shuttle's `ExecutionState NotSet` panic. Verified with `cargo nextest run -p dataplane-concurrency` under each of: default features, `--features shuttle`, `--features shuttle_pct`, `--features shuttle_dfs`. Signed-off-by: Daniel Noland --- concurrency/Cargo.toml | 15 ++ concurrency/src/lib.rs | 11 +- concurrency/src/macros.rs | 24 +- concurrency/src/quiescent.rs | 15 +- concurrency/src/slot.rs | 34 ++- concurrency/src/sync/mod.rs | 56 +++-- concurrency/src/sync/shuttle_backend.rs | 290 ++++++++++++++++++++++ concurrency/tests/quiescent_properties.rs | 7 +- concurrency/tests/quiescent_protocol.rs | 7 +- concurrency/tests/quiescent_shuttle.rs | 6 +- 10 files changed, 409 insertions(+), 56 deletions(-) create mode 100644 concurrency/src/sync/shuttle_backend.rs diff --git a/concurrency/Cargo.toml b/concurrency/Cargo.toml index 0661ae692c..c494c4e95b 100644 --- a/concurrency/Cargo.toml +++ b/concurrency/Cargo.toml @@ -9,7 +9,22 @@ version.workspace = true default = ["parking_lot"] loom = ["dep:loom", "concurrency-macros/loom"] parking_lot = ["dep:parking_lot"] +# `shuttle*` are three views of one backend, not three backends: +# +# * `shuttle` -- shuttle with the random scheduler (the default +# for first-time users -- you almost always want +# this one). +# * `shuttle_pct` -- shuttle with the PCT scheduler. Use when you +# want to bias toward rare interleavings. +# * `shuttle_dfs` -- shuttle with the DFS scheduler. Use for +# exhaustive small-state exploration. +# +# All three share the same `dep:shuttle` machinery; only the scheduler +# selected at runtime differs. See the `concurrency_mode` macro and +# `concurrency::stress` (added in a later PR) for the dispatch. shuttle = ["dep:shuttle", "concurrency-macros/shuttle"] +shuttle_pct = ["dep:shuttle", "concurrency-macros/shuttle"] +shuttle_dfs = ["dep:shuttle", "concurrency-macros/shuttle"] silence_clippy = ["concurrency-macros/silence_clippy"] # Do not manually enable this feature, let --all-features do it for you # Force the Mutex-based slot fallback even without a model checker. # Intended for the CI miri job that exercises the fallback slot under diff --git a/concurrency/src/lib.rs b/concurrency/src/lib.rs index 10415eb60a..d36d4f0757 100644 --- a/concurrency/src/lib.rs +++ b/concurrency/src/lib.rs @@ -18,18 +18,23 @@ pub mod sync; #[cfg(all(miri, any(feature = "shuttle", feature = "loom")))] compile_error!("miri does not meaningfully support 'loom' or 'shuttle'"); -#[cfg(not(any(feature = "loom", feature = "shuttle")))] +#[cfg(not(any( + feature = "loom", + feature = "shuttle", + feature = "shuttle_pct", + feature = "shuttle_dfs" +)))] pub use std::thread; #[cfg(all( feature = "loom", - not(feature = "shuttle"), + not(any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs")), not(feature = "silence_clippy") ))] pub use loom::thread; #[cfg(all( - feature = "shuttle", + any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs"), not(feature = "loom"), not(feature = "silence_clippy") ))] diff --git a/concurrency/src/macros.rs b/concurrency/src/macros.rs index 6dbe7a5658..1564d9698d 100644 --- a/concurrency/src/macros.rs +++ b/concurrency/src/macros.rs @@ -15,7 +15,10 @@ /// } /// } /// ``` -#[cfg(all(feature = "shuttle", not(feature = "silence_clippy")))] +#[cfg(all( + any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs"), + not(feature = "silence_clippy") +))] #[macro_export] macro_rules! with_shuttle { ($($item:item)*) => { @@ -39,7 +42,10 @@ macro_rules! with_shuttle { /// } /// } /// ``` -#[cfg(any(not(feature = "shuttle"), feature = "silence_clippy"))] +#[cfg(any( + not(any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs")), + feature = "silence_clippy" +))] #[macro_export] macro_rules! with_shuttle { ($($item:item)*) => {}; @@ -103,7 +109,12 @@ macro_rules! with_loom { /// } /// } /// ``` -#[cfg(not(any(feature = "loom", feature = "shuttle")))] +#[cfg(not(any( + feature = "loom", + feature = "shuttle", + feature = "shuttle_pct", + feature = "shuttle_dfs" +)))] #[macro_export] macro_rules! with_std { ($($item:item)*) => { @@ -153,7 +164,12 @@ macro_rules! with_std { /// ``` #[cfg(all( not(feature = "silence_clippy"), - any(feature = "loom", feature = "shuttle") + any( + feature = "loom", + feature = "shuttle", + feature = "shuttle_pct", + feature = "shuttle_dfs" + ) ))] #[macro_export] macro_rules! with_std { diff --git a/concurrency/src/quiescent.rs b/concurrency/src/quiescent.rs index fd47a827d5..0a25dd6d85 100644 --- a/concurrency/src/quiescent.rs +++ b/concurrency/src/quiescent.rs @@ -56,15 +56,14 @@ impl Domain { fn register(&self) -> Epoch { let epoch = Epoch::new(); let guard = self.active.lock(); - // Loom and shuttle still expose `LockResult`; PRs - // extending the poison-as-panic wrapper to those backends will - // drop this `.expect` (the default backend already returns a - // naked guard). - #[cfg(any(feature = "loom", feature = "shuttle"))] + // Loom still exposes `LockResult` raw; the + // poison-as-panic wrapper for loom lands in a subsequent PR + // and drops this `.expect`. + #[cfg(feature = "loom")] #[allow(clippy::expect_used)] // the mutex is poisoned only in unrecoverable error cases let mut guard = guard.expect("qsbr mutex poisoned"); - #[cfg(not(any(feature = "loom", feature = "shuttle")))] + #[cfg(not(feature = "loom"))] let mut guard = guard; guard.push(Arc::clone(&epoch.cell)); epoch @@ -72,11 +71,11 @@ impl Domain { fn min_observed(&self) -> Option { let guard = self.active.lock(); - #[cfg(any(feature = "loom", feature = "shuttle"))] + #[cfg(feature = "loom")] #[allow(clippy::expect_used)] // the mutex is poisoned only in unrecoverable error cases let mut active = guard.expect("qsbr mutex poisoned"); - #[cfg(not(any(feature = "loom", feature = "shuttle")))] + #[cfg(not(feature = "loom"))] let mut active = guard; let mut min = u64::MAX; let mut any_in_flight = false; diff --git a/concurrency/src/slot.rs b/concurrency/src/slot.rs index 6123f864ab..c516496fcc 100644 --- a/concurrency/src/slot.rs +++ b/concurrency/src/slot.rs @@ -22,9 +22,23 @@ // As a result, we can still check for provenance violations in this crate, but only with the Mutex based // fallback implementation. cfg_select! { - any(feature = "loom", feature = "shuttle", feature = "_strict_provenance") => { + any(feature = "loom", feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs", feature = "_strict_provenance") => { use crate::sync::{Arc, Mutex}; + // Loom still exposes `LockResult`-shaped `.lock()`; the wrapped + // backends (shuttle, _strict_provenance) return naked guards. + // PR 6 wraps loom too and drops this helper. + macro_rules! unwrap_lock { + ($guard:expr) => {{ + #[cfg(feature = "loom")] + #[allow(clippy::expect_used)] // poisoned only in unrecoverable cases + let g = $guard.expect("slot mutex poisoned"); + #[cfg(not(feature = "loom"))] + let g = $guard; + g + }}; + } + pub struct Slot(Mutex>); impl Slot { @@ -38,20 +52,17 @@ cfg_select! { } pub fn load_full(&self) -> Arc { - #[allow(clippy::expect_used)] // poisoned only in unrecoverable cases - let guard = self.0.lock().expect("slot mutex poisoned"); + let guard = unwrap_lock!(self.0.lock()); Arc::clone(&*guard) } pub fn swap(&self, new: Arc) -> Arc { - #[allow(clippy::expect_used)] - let mut guard = self.0.lock().expect("slot mutex poisoned"); + let mut guard = unwrap_lock!(self.0.lock()); core::mem::replace(&mut *guard, new) } pub fn store(&self, new: Arc) { - #[allow(clippy::expect_used)] - let mut guard = self.0.lock().expect("slot mutex poisoned"); + let mut guard = unwrap_lock!(self.0.lock()); *guard = new; } } @@ -77,20 +88,17 @@ cfg_select! { } pub fn load_full(&self) -> Option> { - #[allow(clippy::expect_used)] - let guard = self.0.lock().expect("slot mutex poisoned"); + let guard = unwrap_lock!(self.0.lock()); guard.as_ref().map(Arc::clone) } pub fn swap(&self, new: Option>) -> Option> { - #[allow(clippy::expect_used)] - let mut guard = self.0.lock().expect("slot mutex poisoned"); + let mut guard = unwrap_lock!(self.0.lock()); core::mem::replace(&mut *guard, new) } pub fn store(&self, new: Option>) { - #[allow(clippy::expect_used)] - let mut guard = self.0.lock().expect("slot mutex poisoned"); + let mut guard = unwrap_lock!(self.0.lock()); *guard = new; } } diff --git a/concurrency/src/sync/mod.rs b/concurrency/src/sync/mod.rs index f3234a237d..b12691bde4 100644 --- a/concurrency/src/sync/mod.rs +++ b/concurrency/src/sync/mod.rs @@ -8,51 +8,61 @@ //! //! Selection (in priority order): //! -//! * `loom` / `shuttle` features: raw re-export of the model-checker's -//! `LockResult`-based primitives. Subsequent PRs wrap these too. +//! * `loom` feature: raw re-export of `loom::sync`'s `LockResult`-based +//! primitives. A subsequent PR adds the same poison-as-panic wrap +//! here. +//! * `shuttle` / `shuttle_pct` / `shuttle_dfs` features: poison-as-panic +//! wrapper around `shuttle::sync`. All three flavours share one +//! wrapper module; the scheduler difference is runtime-only (see +//! `concurrency::stress`). //! * `parking_lot` feature (default): zero-cost re-export of //! `parking_lot`'s naked-guard locks; the production hot path. //! * Otherwise: `std_backend` -- a thin poison-as-panic wrapper around -//! `std::sync`. Same surface as the `parking_lot` re-export, one -//! extra match on acquire. Lets `--no-default-features` builds -//! compile without depending on `parking_lot`. +//! `std::sync`. Lets `--no-default-features` builds compile without +//! depending on `parking_lot`. +// loom takes priority so the model checker can poison its internal state +// (used for tests that opt loom in explicitly). +#[cfg(all(feature = "loom", not(feature = "silence_clippy")))] +pub use loom::sync::*; + +#[cfg(all( + not(feature = "loom"), + any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs") +))] +mod shuttle_backend; #[cfg(all( - not(any(feature = "loom", feature = "shuttle")), + not(feature = "loom"), + any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs") +))] +pub use shuttle_backend::*; + +#[cfg(all( + not(feature = "loom"), + not(any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs")), feature = "parking_lot", ))] mod parking_lot_backend; #[cfg(all( - not(any(feature = "loom", feature = "shuttle")), + not(feature = "loom"), + not(any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs")), feature = "parking_lot", ))] pub use parking_lot_backend::*; #[cfg(all( - not(any(feature = "loom", feature = "shuttle")), + not(feature = "loom"), + not(any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs")), not(feature = "parking_lot"), ))] mod std_backend; #[cfg(all( - not(any(feature = "loom", feature = "shuttle")), + not(feature = "loom"), + not(any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs")), not(feature = "parking_lot"), ))] pub use std_backend::*; -#[cfg(all( - feature = "loom", - not(feature = "shuttle"), - not(feature = "silence_clippy") -))] -pub use loom::sync::*; - -#[cfg(all( - feature = "shuttle", - not(feature = "loom"), - not(feature = "silence_clippy") -))] -pub use shuttle::sync::*; - // Match the silence_clippy escape hatch in lib.rs: when both loom and // shuttle are pulled in (under `--all-features`), route sync through // `std` purely to keep clippy happy. The binary is never executed in diff --git a/concurrency/src/sync/shuttle_backend.rs b/concurrency/src/sync/shuttle_backend.rs new file mode 100644 index 0000000000..9a710edd48 --- /dev/null +++ b/concurrency/src/sync/shuttle_backend.rs @@ -0,0 +1,290 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +//! Shuttle backend: poison-as-panic wrapper around `shuttle::sync`. +//! +//! `shuttle::sync::{Mutex, RwLock}` return `LockResult` because +//! they mirror `std::sync`'s shape. This workspace treats poison as a +//! fatal invariant violation; the wrapper below strips `LockResult` +//! and panics, presenting the same naked-guard surface as the default +//! production backend. +//! +//! `OnceLock` is taken from `std::sync` (shuttle doesn't ship one); +//! it's pure-std machinery so it doesn't need model-checker +//! integration. + +// Wrapping below panics on poison. clippy::panic is denied at the +// crate root; allow it locally for the cold poisoned() helper. +#![allow(clippy::panic)] + +use core::fmt; +use core::ops::{Deref, DerefMut}; +use shuttle::sync as inner; + +pub use shuttle::sync::{ + Arc, Barrier, BarrierWaitResult, Condvar, LockResult, Once, OnceState, PoisonError, + TryLockError, TryLockResult, WaitTimeoutResult, Weak, atomic, mpsc, +}; +pub use std::sync::OnceLock; + +#[inline(never)] +#[cold] +fn poisoned() -> ! { + panic!( + "concurrency::sync lock was poisoned: a previous holder panicked while \ + holding the lock; propagating the failure" + ); +} + +// =============================== Mutex ==================================== + +pub struct Mutex(inner::Mutex); + +#[must_use = "if unused the Mutex will immediately unlock"] +pub struct MutexGuard<'a, T: ?Sized + 'a>(inner::MutexGuard<'a, T>); + +impl Mutex { + #[inline] + pub fn new(value: T) -> Self { + Self(inner::Mutex::new(value)) + } + + #[inline] + pub fn into_inner(self) -> T { + match self.0.into_inner() { + Ok(v) => v, + Err(_) => poisoned(), + } + } +} + +impl Mutex { + #[inline] + pub fn lock(&self) -> MutexGuard<'_, T> { + match self.0.lock() { + Ok(g) => MutexGuard(g), + Err(_) => poisoned(), + } + } + + #[inline] + pub fn try_lock(&self) -> Option> { + match self.0.try_lock() { + Ok(g) => Some(MutexGuard(g)), + Err(TryLockError::Poisoned(_)) => poisoned(), + Err(TryLockError::WouldBlock) => None, + } + } + + #[inline] + pub fn get_mut(&mut self) -> &mut T { + match self.0.get_mut() { + Ok(v) => v, + Err(_) => poisoned(), + } + } +} + +impl fmt::Debug for Mutex { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Mutex").finish_non_exhaustive() + } +} + +impl Default for Mutex { + fn default() -> Self { + Self::new(T::default()) + } +} + +impl From for Mutex { + fn from(value: T) -> Self { + Self::new(value) + } +} + +impl Deref for MutexGuard<'_, T> { + type Target = T; + #[inline] + fn deref(&self) -> &T { + &self.0 + } +} + +impl DerefMut for MutexGuard<'_, T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + &mut self.0 + } +} + +impl fmt::Debug for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Display for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +// =============================== RwLock =================================== + +pub struct RwLock(inner::RwLock); + +#[must_use = "if unused the RwLock will immediately unlock"] +pub struct RwLockReadGuard<'a, T: 'a>(inner::RwLockReadGuard<'a, T>); + +#[must_use = "if unused the RwLock will immediately unlock"] +pub struct RwLockWriteGuard<'a, T: 'a>(inner::RwLockWriteGuard<'a, T>); + +/// Upgradable-read guard for [`RwLock`]. +/// +/// Implemented as an exclusive write guard; shuttle has no native +/// upgradable read. +#[must_use = "if unused the RwLock will immediately unlock"] +pub struct RwLockUpgradableReadGuard<'a, T: 'a>(inner::RwLockWriteGuard<'a, T>); + +impl RwLock { + #[inline] + pub fn new(value: T) -> Self { + Self(inner::RwLock::new(value)) + } + + #[inline] + pub fn into_inner(self) -> T { + match self.0.into_inner() { + Ok(v) => v, + Err(_) => poisoned(), + } + } + + #[inline] + pub fn read(&self) -> RwLockReadGuard<'_, T> { + match self.0.read() { + Ok(g) => RwLockReadGuard(g), + Err(_) => poisoned(), + } + } + + #[inline] + pub fn write(&self) -> RwLockWriteGuard<'_, T> { + match self.0.write() { + Ok(g) => RwLockWriteGuard(g), + Err(_) => poisoned(), + } + } + + #[inline] + pub fn try_read(&self) -> Option> { + match self.0.try_read() { + Ok(g) => Some(RwLockReadGuard(g)), + Err(TryLockError::Poisoned(_)) => poisoned(), + Err(TryLockError::WouldBlock) => None, + } + } + + #[inline] + pub fn try_write(&self) -> Option> { + match self.0.try_write() { + Ok(g) => Some(RwLockWriteGuard(g)), + Err(TryLockError::Poisoned(_)) => poisoned(), + Err(TryLockError::WouldBlock) => None, + } + } + + /// Acquire an upgradable read guard. + /// + /// Shuttle has no native upgradable-read; this is an exclusive + /// `write()`. Sound but loses the many-readers-plus-one-upgradable + /// schedule that `parking_lot` permits. + #[inline] + pub fn upgradable_read(&self) -> RwLockUpgradableReadGuard<'_, T> { + match self.0.write() { + Ok(g) => RwLockUpgradableReadGuard(g), + Err(_) => poisoned(), + } + } + + #[inline] + pub fn get_mut(&mut self) -> &mut T { + match self.0.get_mut() { + Ok(v) => v, + Err(_) => poisoned(), + } + } +} + +impl<'a, T: 'a> RwLockUpgradableReadGuard<'a, T> { + #[inline] + pub fn upgrade(s: Self) -> RwLockWriteGuard<'a, T> { + RwLockWriteGuard(s.0) + } + + /// # Errors + /// + /// Never returns `Err`; the `Result` shape matches `parking_lot`. + #[inline] + pub fn try_upgrade(s: Self) -> Result, Self> { + Ok(RwLockWriteGuard(s.0)) + } +} + +impl fmt::Debug for RwLock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RwLock").finish_non_exhaustive() + } +} + +impl Default for RwLock { + fn default() -> Self { + Self::new(T::default()) + } +} + +impl From for RwLock { + fn from(value: T) -> Self { + Self::new(value) + } +} + +macro_rules! impl_rwlock_guard_traits { + ($guard:ident, $mutability:ident) => { + impl Deref for $guard<'_, T> { + type Target = T; + #[inline] + fn deref(&self) -> &T { + &*self.0 + } + } + + impl fmt::Debug for $guard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } + } + + impl fmt::Display for $guard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } + } + + impl_rwlock_guard_traits!(@mut $guard, $mutability); + }; + (@mut $guard:ident, mut) => { + impl DerefMut for $guard<'_, T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + &mut *self.0 + } + } + }; + (@mut $guard:ident, immut) => {}; +} + +impl_rwlock_guard_traits!(RwLockReadGuard, immut); +impl_rwlock_guard_traits!(RwLockWriteGuard, mut); +impl_rwlock_guard_traits!(RwLockUpgradableReadGuard, immut); diff --git a/concurrency/tests/quiescent_properties.rs b/concurrency/tests/quiescent_properties.rs index a862822e09..e23f4c158d 100644 --- a/concurrency/tests/quiescent_properties.rs +++ b/concurrency/tests/quiescent_properties.rs @@ -29,7 +29,12 @@ // Single-threaded bolero property tests; only meaningful under the // default backend. Same rationale as in `quiescent_protocol.rs`. -#![cfg(not(any(feature = "loom", feature = "shuttle")))] +#![cfg(not(any( + feature = "loom", + feature = "shuttle", + feature = "shuttle_pct", + feature = "shuttle_dfs" +)))] use bolero::TypeGenerator; use dataplane_concurrency::quiescent::channel; diff --git a/concurrency/tests/quiescent_protocol.rs b/concurrency/tests/quiescent_protocol.rs index 2abb01469f..d08dda5f83 100644 --- a/concurrency/tests/quiescent_protocol.rs +++ b/concurrency/tests/quiescent_protocol.rs @@ -28,7 +28,12 @@ // backend (loom or any shuttle variant) the surrounding facade is rewired // and the std-shaped types these tests use would either fail to compile or // fault outside the corresponding runtime. -#![cfg(not(any(feature = "loom", feature = "shuttle")))] +#![cfg(not(any( + feature = "loom", + feature = "shuttle", + feature = "shuttle_pct", + feature = "shuttle_dfs" +)))] use dataplane_concurrency::quiescent::channel; use dataplane_concurrency::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; diff --git a/concurrency/tests/quiescent_shuttle.rs b/concurrency/tests/quiescent_shuttle.rs index f4ff609e2d..7e94868c8c 100644 --- a/concurrency/tests/quiescent_shuttle.rs +++ b/concurrency/tests/quiescent_shuttle.rs @@ -164,13 +164,13 @@ fn fuzz_test( } #[test] -#[cfg(feature = "shuttle")] +#[cfg(any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs"))] fn protocol_under_shuttle() { fuzz_test(|plan: Plan| shuttle::check_random(move || run_plan(&plan), 1)); } #[test] -#[cfg(feature = "shuttle")] +#[cfg(any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs"))] fn protocol_under_shuttle_pct() { fuzz_test(|plan: Plan| { // PCT requires both threads to actually do atomic ops; if @@ -194,7 +194,7 @@ fn protocol_under_shuttle_pct() { } #[test] -#[cfg(not(feature = "shuttle"))] +#[cfg(not(any(feature = "shuttle", feature = "shuttle_pct", feature = "shuttle_dfs")))] fn protocol_under_std() { fuzz_test(|plan: Plan| run_plan(&plan)); }