From 0b0cd67da5ba3643fcfbfea17da691f0fb0e6cd9 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Tue, 28 Apr 2026 18:12:07 +0200 Subject: [PATCH 1/2] Add probing service Introduce a background probing service that periodically sends payment probes to discover liquidity along Lightning routes. Probes update the local scorer with channel liquidity information, improving pathfinding for subsequent real payments. The service supports three strategies: - HighDegree: probes nodes with the most channels in the network graph - Random: walks random paths from the local node - Custom: user-supplied strategy via the `ProbingStrategy` trait A dedicated `ProbingConfigBuilder` exposes amount bounds, locked-msat caps, probing intervals, and per-node cooldowns, with sensible defaults. The service runs as a cancellable background task driven by the existing `Runtime`, and budget accounting tracks both in-flight and locked amounts to bound outbound liquidity exposure. UniFFI bindings expose the probing service to the Swift, Kotlin, and Python language bindings. Co-Authored-By: Claude Sonnet 4.6 --- bindings/ldk_node.udl | 5 + src/builder.rs | 82 ++++- src/config.rs | 6 + src/event.rs | 29 +- src/lib.rs | 21 ++ src/probing.rs | 749 ++++++++++++++++++++++++++++++++++++++++++ src/util.rs | 37 +++ 7 files changed, 916 insertions(+), 13 deletions(-) create mode 100644 src/probing.rs create mode 100644 src/util.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 7368b0291..beb81efa2 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -13,6 +13,10 @@ typedef dictionary TorConfig; typedef interface NodeEntropy; +typedef interface ProbingConfig; + +typedef interface ProbingConfigBuilder; + typedef enum WordCount; [Remote] @@ -61,6 +65,7 @@ interface Builder { [Throws=BuildError] void set_async_payments_role(AsyncPaymentsRole? 
role); void set_wallet_recovery_mode(); + void set_probing_config(ProbingConfig config); [Throws=BuildError] Node build(NodeEntropy node_entropy); [Throws=BuildError] diff --git a/src/builder.rs b/src/builder.rs index 0b44dc153..5660d6b75 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -10,6 +10,7 @@ use std::convert::TryInto; use std::default::Default; use std::net::ToSocketAddrs; use std::path::PathBuf; +use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex, Once, RwLock}; use std::time::SystemTime; use std::{fmt, fs}; @@ -51,6 +52,7 @@ use crate::config::{ default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, HRNResolverConfig, TorConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, DEFAULT_MIN_PROBE_AMOUNT_MSAT, }; use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; @@ -77,6 +79,9 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; +use crate::probing::{ + HighDegreeStrategy, Prober, ProbingConfig, ProbingStrategy, ProbingStrategyKind, RandomStrategy, +}; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ @@ -293,6 +298,7 @@ pub struct NodeBuilder { runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, + probing_config: Option, } impl NodeBuilder { @@ -311,6 +317,8 @@ impl NodeBuilder { let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; + let async_payments_role = None; + let probing_config = None; Self { config, chain_data_source_config, @@ -318,9 +326,10 @@ impl NodeBuilder { liquidity_source_config, log_writer_config, runtime_handle, - async_payments_role: 
None, + async_payments_role, pathfinding_scores_sync_config, recovery_mode, + probing_config, } } @@ -626,6 +635,25 @@ impl NodeBuilder { self } + /// Configures background probing. + /// + /// Use [`ProbingConfigBuilder`] to build the configuration: + /// ```ignore + /// use ldk_node::probing::ProbingConfigBuilder; + /// + /// builder.set_probing_config( + /// ProbingConfigBuilder::high_degree(100) + /// .interval(Duration::from_secs(30)) + /// .build() + /// ); + /// ``` + /// + /// [`ProbingConfigBuilder`]: crate::probing::ProbingConfigBuilder + pub fn set_probing_config(&mut self, config: ProbingConfig) -> &mut Self { + self.probing_config = Some(config); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -797,6 +825,7 @@ impl NodeBuilder { self.gossip_source_config.as_ref(), self.liquidity_source_config.as_ref(), self.pathfinding_scores_sync_config.as_ref(), + self.probing_config.as_ref(), self.async_payments_role, self.recovery_mode, seed_bytes, @@ -1097,6 +1126,13 @@ impl ArcedNodeBuilder { self.inner.write().expect("lock").set_wallet_recovery_mode(); } + /// Configures background probing. + /// + /// See [`ProbingConfig`] for details. + pub fn set_probing_config(&self, config: Arc) { + self.inner.write().unwrap().set_probing_config((*config).clone()); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. 
pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { @@ -1240,8 +1276,9 @@ fn build_with_store_internal( gossip_source_config: Option<&GossipSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, - async_payments_role: Option, recovery_mode: bool, seed_bytes: [u8; 64], - runtime: Arc, logger: Arc, kv_store: Arc, + probing_config: Option<&ProbingConfig>, async_payments_role: Option, + recovery_mode: bool, seed_bytes: [u8; 64], runtime: Arc, logger: Arc, + kv_store: Arc, ) -> Result { optionally_install_rustls_cryptoprovider(); @@ -1639,7 +1676,10 @@ fn build_with_store_internal( }, } - let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); + let mut scoring_fee_params = ProbabilisticScoringFeeParameters::default(); + if let Some(penalty) = probing_config.and_then(|c| c.diversity_penalty_msat) { + scoring_fee_params.probing_diversity_penalty_msat = penalty; + } let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), Arc::clone(&logger), @@ -2019,6 +2059,39 @@ fn build_with_store_internal( _leak_checker.0.push(Arc::downgrade(&wallet) as Weak); } + let prober = probing_config.map(|probing_cfg| { + let strategy: Arc = match &probing_cfg.kind { + ProbingStrategyKind::HighDegree { top_node_count } => { + Arc::new(HighDegreeStrategy::new( + Arc::clone(&network_graph), + Arc::clone(&channel_manager), + Arc::clone(&router), + *top_node_count, + DEFAULT_MIN_PROBE_AMOUNT_MSAT, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, + probing_cfg.cooldown, + config.probing_liquidity_limit_multiplier, + )) + }, + ProbingStrategyKind::Random { max_hops } => Arc::new(RandomStrategy::new( + Arc::clone(&network_graph), + Arc::clone(&channel_manager), + *max_hops, + DEFAULT_MIN_PROBE_AMOUNT_MSAT, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, + )), + ProbingStrategyKind::Custom(s) => Arc::clone(s), + }; + Arc::new(Prober { + channel_manager: Arc::clone(&channel_manager), + logger: 
Arc::clone(&logger), + strategy, + interval: probing_cfg.interval, + max_locked_msat: probing_cfg.max_locked_msat, + locked_msat: Arc::new(AtomicU64::new(0)), + }) + }); + Ok(Node { runtime, stop_sender, @@ -2052,6 +2125,7 @@ fn build_with_store_internal( om_mailbox, async_payments_role, hrn_resolver, + prober, #[cfg(cycle_tests)] _leak_checker, }) diff --git a/src/config.rs b/src/config.rs index 014d6216a..8b28d4015 100644 --- a/src/config.rs +++ b/src/config.rs @@ -28,6 +28,12 @@ const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80; const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30; const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10; const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3; +pub(crate) const DEFAULT_PROBING_INTERVAL_SECS: u64 = 10; +pub(crate) const MIN_PROBING_INTERVAL: Duration = Duration::from_millis(100); +pub(crate) const DEFAULT_PROBED_NODE_COOLDOWN_SECS: u64 = 60 * 60; // 1 hour +pub(crate) const DEFAULT_MAX_PROBE_LOCKED_MSAT: u64 = 100_000_000; // 100k sats +pub(crate) const DEFAULT_MIN_PROBE_AMOUNT_MSAT: u64 = 1_000_000; // 1k sats +pub(crate) const DEFAULT_MAX_PROBE_AMOUNT_MSAT: u64 = 10_000_000; // 10k sats const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000; // The default timeout after which we abort a wallet syncing operation. 
diff --git a/src/event.rs b/src/event.rs index 3161daa2a..3eda18790 100644 --- a/src/event.rs +++ b/src/event.rs @@ -52,6 +52,7 @@ use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; +use crate::probing::Prober; use crate::runtime::Runtime; use crate::types::{ CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet, @@ -509,12 +510,13 @@ where payment_store: Arc, peer_store: Arc>, keys_manager: Arc, - runtime: Arc, - logger: L, - config: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, + prober: Option>, + runtime: Arc, + logger: L, + config: Arc, } impl EventHandler @@ -530,7 +532,7 @@ where payment_store: Arc, peer_store: Arc>, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, - runtime: Arc, logger: L, config: Arc, + prober: Option>, runtime: Arc, logger: L, config: Arc, ) -> Self { Self { event_queue, @@ -544,12 +546,13 @@ where payment_store, peer_store, keys_manager, - logger, - runtime, - config, static_invoice_store, onion_messenger, om_mailbox, + prober, + runtime, + logger, + config, } } @@ -1158,8 +1161,16 @@ where LdkEvent::PaymentPathSuccessful { .. } => {}, LdkEvent::PaymentPathFailed { .. } => {}, - LdkEvent::ProbeSuccessful { .. } => {}, - LdkEvent::ProbeFailed { .. } => {}, + LdkEvent::ProbeSuccessful { path, .. } => { + if let Some(prober) = &self.prober { + prober.handle_probe_successful(&path); + } + }, + LdkEvent::ProbeFailed { path, .. } => { + if let Some(prober) = &self.prober { + prober.handle_probe_failed(&path); + } + }, LdkEvent::HTLCHandlingFailed { failure_type, .. 
} => { if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source.handle_htlc_handling_failed(failure_type).await; diff --git a/src/lib.rs b/src/lib.rs index 6902228a6..f0d695af6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,10 +101,12 @@ pub mod logger; mod message_handler; pub mod payment; mod peer_store; +pub mod probing; mod runtime; mod scoring; mod tx_broadcaster; mod types; +mod util; mod wallet; use std::default::Default; @@ -113,6 +115,8 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; #[cfg(cycle_tests)] use std::{any::Any, sync::Weak}; +#[cfg(feature = "uniffi")] +use crate::probing::ProbingConfig; pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance}; pub use bip39; pub use bitcoin; @@ -170,6 +174,9 @@ use payment::{ UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; +#[cfg(feature = "uniffi")] +pub use probing::ArcedProbingConfigBuilder as ProbingConfigBuilder; +use probing::{run_prober, Prober}; use runtime::Runtime; pub use tokio; use types::{ @@ -239,6 +246,7 @@ pub struct Node { om_mailbox: Option>, async_payments_role: Option, hrn_resolver: HRNResolver, + prober: Option>, #[cfg(cycle_tests)] _leak_checker: LeakChecker, } @@ -596,11 +604,19 @@ impl Node { static_invoice_store, Arc::clone(&self.onion_messenger), self.om_mailbox.clone(), + self.prober.clone(), Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), )); + if let Some(prober) = self.prober.clone() { + let stop_rx = self.stop_sender.subscribe(); + self.runtime.spawn_cancellable_background_task(async move { + run_prober(prober, stop_rx).await; + }); + } + // Setup background processing let background_persister = Arc::clone(&self.kv_store); let background_event_handler = Arc::clone(&event_handler); @@ -1079,6 +1095,11 @@ impl Node { )) } + /// Returns a reference to the [`Prober`], or `None` if no probing strategy is configured. 
+ pub fn prober(&self) -> Option<&Prober> { + self.prober.as_deref() + } + /// Retrieve a list of known channels. pub fn list_channels(&self) -> Vec { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() diff --git a/src/probing.rs b/src/probing.rs new file mode 100644 index 000000000..3d0b1af75 --- /dev/null +++ b/src/probing.rs @@ -0,0 +1,749 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Background probing strategies for training the payment scorer. + +use std::collections::HashMap; +use std::fmt; +use std::sync::atomic::{AtomicU64, Ordering}; +#[cfg(feature = "uniffi")] +use std::sync::RwLock; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use bitcoin::secp256k1::PublicKey; +use lightning::routing::gossip::NodeId; +use lightning::routing::router::{ + Path, PaymentParameters, RouteHop, RouteParameters, MAX_PATH_LENGTH_ESTIMATE, +}; +use lightning_invoice::DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA; +use lightning_types::features::{ChannelFeatures, NodeFeatures}; + +use crate::config::{ + DEFAULT_MAX_PROBE_LOCKED_MSAT, DEFAULT_PROBED_NODE_COOLDOWN_SECS, + DEFAULT_PROBING_INTERVAL_SECS, MIN_PROBING_INTERVAL, +}; +use crate::logger::{log_debug, LdkLogger, Logger}; +use crate::types::{ChannelManager, Graph, Router}; +use crate::util::random_range; + +use lightning::routing::router::Router as LdkRouter; + +/// Which built-in probing strategy to use, or a custom one. 
+#[derive(Clone)] +pub(crate) enum ProbingStrategyKind { + HighDegree { top_node_count: usize }, + Random { max_hops: usize }, + Custom(Arc), +} + +impl fmt::Debug for ProbingStrategyKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::HighDegree { top_node_count } => { + f.debug_struct("HighDegree").field("top_node_count", top_node_count).finish() + }, + Self::Random { max_hops } => { + f.debug_struct("Random").field("max_hops", max_hops).finish() + }, + Self::Custom(_) => f.write_str("Custom()"), + } + } +} + +/// Configuration for the background probing subsystem. +/// +/// Construct via [`ProbingConfigBuilder`]. Pick a strategy with +/// [`ProbingConfigBuilder::high_degree`], [`ProbingConfigBuilder::random_walk`], or +/// [`ProbingConfigBuilder::custom`], chain optional setters, and finalize with +/// [`ProbingConfigBuilder::build`]. +/// +/// # Caution +/// +/// Probes send real HTLCs along real paths. If an intermediate hop is offline or +/// misbehaving, the probe HTLC can remain in-flight — locking outbound liquidity +/// on the first-hop channel until the HTLC timeout elapses (potentially hours). +/// `max_locked_msat` caps the total outbound capacity that in-flight probes may +/// hold at any one time; tune it conservatively for nodes with tight liquidity. +/// +/// # Example +/// ```ignore +/// let config = ProbingConfigBuilder::high_degree(100) +/// .interval(Duration::from_secs(30)) +/// .max_locked_msat(500_000) +/// .diversity_penalty_msat(250) +/// .build(); +/// builder.set_probing_config(config); +/// ``` +#[derive(Clone, Debug)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Object))] +pub struct ProbingConfig { + pub(crate) kind: ProbingStrategyKind, + pub(crate) interval: Duration, + pub(crate) max_locked_msat: u64, + pub(crate) diversity_penalty_msat: Option, + pub(crate) cooldown: Duration, +} + +/// Builder for [`ProbingConfig`]. 
+/// +/// Pick a strategy with [`high_degree`], [`random_walk`], or [`custom`], chain optional +/// setters, and call [`build`] to finalize. +/// +/// [`high_degree`]: Self::high_degree +/// [`random_walk`]: Self::random_walk +/// [`custom`]: Self::custom +/// [`build`]: Self::build +pub struct ProbingConfigBuilder { + kind: ProbingStrategyKind, + interval: Duration, + max_locked_msat: u64, + diversity_penalty_msat: Option, + cooldown: Duration, +} + +impl ProbingConfigBuilder { + fn with_kind(kind: ProbingStrategyKind) -> Self { + Self { + kind, + interval: Duration::from_secs(DEFAULT_PROBING_INTERVAL_SECS), + max_locked_msat: DEFAULT_MAX_PROBE_LOCKED_MSAT, + diversity_penalty_msat: None, + cooldown: Duration::from_secs(DEFAULT_PROBED_NODE_COOLDOWN_SECS), + } + } + + /// Start building a config that probes toward the highest-degree nodes in the graph. + /// + /// `top_node_count` controls how many of the most-connected nodes are cycled through. + pub fn high_degree(top_node_count: usize) -> Self { + Self::with_kind(ProbingStrategyKind::HighDegree { top_node_count }) + } + + /// Start building a config that probes via random graph walks. + /// + /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. + pub fn random_walk(max_hops: usize) -> Self { + Self::with_kind(ProbingStrategyKind::Random { max_hops }) + } + + /// Start building a config with a custom [`ProbingStrategy`] implementation. + pub fn custom(strategy: Arc) -> Self { + Self::with_kind(ProbingStrategyKind::Custom(strategy)) + } + + /// Overrides the interval between probe attempts. + /// + /// Defaults to 10 seconds. + pub fn interval(&mut self, interval: Duration) -> &mut Self { + self.interval = interval; + self + } + + /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. + /// + /// Defaults to 100 000 000 msat (100k sats). 
+ pub fn max_locked_msat(&mut self, max_msat: u64) -> &mut Self { + self.max_locked_msat = max_msat; + self + } + + /// Sets the probing diversity penalty applied by the probabilistic scorer. + /// + /// When set, the scorer will penalize channels that have been recently probed, + /// encouraging path diversity during background probing. The penalty decays + /// quadratically over 24 hours. + /// + /// This is only useful for probing strategies that route through the scorer + /// (e.g., [`HighDegreeStrategy`]). Strategies that build paths manually + /// (e.g., [`RandomStrategy`]) bypass the scorer entirely. + /// + /// If unset, LDK's default of `0` (no penalty) is used. + pub fn diversity_penalty_msat(&mut self, penalty_msat: u64) -> &mut Self { + self.diversity_penalty_msat = Some(penalty_msat); + self + } + + /// Sets how long a probed node stays ineligible before being probed again. + /// + /// Only applies to [`HighDegreeStrategy`]. Defaults to 1 hour. + pub fn cooldown(&mut self, cooldown: Duration) -> &mut Self { + self.cooldown = cooldown; + self + } + + /// Builds the [`ProbingConfig`]. + pub fn build(&self) -> ProbingConfig { + ProbingConfig { + kind: self.kind.clone(), + interval: self.interval.max(MIN_PROBING_INTERVAL), + max_locked_msat: self.max_locked_msat, + diversity_penalty_msat: self.diversity_penalty_msat, + cooldown: self.cooldown, + } + } +} + +/// A UniFFI-compatible wrapper around [`ProbingConfigBuilder`] that uses interior mutability +/// so it can be shared behind an `Arc` as required by the FFI object model. +/// +/// Obtain one via the constructors [`new_high_degree`] or [`new_random_walk`], configure it +/// with the `set_*` methods, then call [`build`] to produce a [`ProbingConfig`]. 
+/// +/// [`new_high_degree`]: Self::new_high_degree +/// [`new_random_walk`]: Self::new_random_walk +/// [`build`]: Self::build +#[cfg(feature = "uniffi")] +#[derive(uniffi::Object)] +pub struct ArcedProbingConfigBuilder { + inner: RwLock, +} + +#[cfg(feature = "uniffi")] +#[uniffi::export] +impl ArcedProbingConfigBuilder { + /// Creates a builder configured to probe toward the highest-degree nodes in the graph. + /// + /// `top_node_count` controls how many of the most-connected nodes are cycled through. + #[uniffi::constructor] + pub fn new_high_degree(top_node_count: u64) -> Arc { + Arc::new(Self { + inner: RwLock::new(ProbingConfigBuilder::high_degree(top_node_count as usize)), + }) + } + + /// Creates a builder configured to probe via random graph walks. + /// + /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. + #[uniffi::constructor] + pub fn new_random_walk(max_hops: u64) -> Arc { + Arc::new(Self { inner: RwLock::new(ProbingConfigBuilder::random_walk(max_hops as usize)) }) + } + + /// Overrides the interval between probe attempts. Defaults to 10 seconds. + pub fn set_interval(&self, secs: u64) { + self.inner.write().unwrap().interval(Duration::from_secs(secs)); + } + + /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. + /// + /// Defaults to 100 000 000 msat (100k sats). + pub fn set_max_locked_msat(&self, max_msat: u64) { + self.inner.write().unwrap().max_locked_msat(max_msat); + } + + /// Sets the probing diversity penalty applied by the probabilistic scorer. + /// + /// When set, the scorer will penalize channels that have been recently probed, + /// encouraging path diversity during background probing. The penalty decays + /// quadratically over 24 hours. + /// + /// If unset, LDK's default of `0` (no penalty) is used. 
+ pub fn set_diversity_penalty_msat(&self, penalty_msat: u64) { + self.inner.write().unwrap().diversity_penalty_msat(penalty_msat); + } + + /// Sets how long a probed node stays ineligible before being probed again. + /// + /// Only applies to the high-degree strategy. Defaults to 1 hour. + pub fn set_cooldown(&self, secs: u64) { + self.inner.write().unwrap().cooldown(Duration::from_secs(secs)); + } + + /// Builds the [`ProbingConfig`]. + pub fn build(&self) -> Arc { + Arc::new(self.inner.read().unwrap().build()) + } +} + +/// Strategy can be used for determining the next target and amount for probing. +pub trait ProbingStrategy: Send + Sync + 'static { + /// Returns the next probe path to run, or `None` to skip this tick. + fn next_probe(&self) -> Option; +} + +/// Probes toward the most-connected nodes in the graph. +/// +/// On each tick the strategy reads the current gossip graph, sorts nodes by +/// channel count, and picks the highest-degree node from the top +/// `top_node_count` that has not been probed within `cooldown`. +/// Nodes probed more recently are skipped so that the strategy +/// naturally spreads across the top nodes and picks up graph changes. +/// If all top nodes are on cooldown, the cooldown map is cleared and a new cycle begins +/// immediately. +/// +/// The probe amount is chosen uniformly at random from +/// `[min_amount_msat, max_amount_msat]`. +pub struct HighDegreeStrategy { + network_graph: Arc, + channel_manager: Arc, + router: Arc, + /// How many of the highest-degree nodes to cycle through. + pub top_node_count: usize, + /// Lower bound for the randomly chosen probe amount. + pub min_amount_msat: u64, + /// Upper bound for the randomly chosen probe amount. + pub max_amount_msat: u64, + /// How long a node stays ineligible after being probed. + pub cooldown: Duration, + /// Skip a path when the first-hop outbound liquidity is less than + /// `path_value * liquidity_limit_multiplier`. 
+ pub liquidity_limit_multiplier: u64, + /// Nodes probed recently, with the time they were last probed. + recently_probed: Mutex>, +} + +impl HighDegreeStrategy { + /// Creates a new high-degree probing strategy. + pub(crate) fn new( + network_graph: Arc, channel_manager: Arc, router: Arc, + top_node_count: usize, min_amount_msat: u64, max_amount_msat: u64, cooldown: Duration, + liquidity_limit_multiplier: u64, + ) -> Self { + assert!( + min_amount_msat <= max_amount_msat, + "min_amount_msat must not exceed max_amount_msat" + ); + Self { + network_graph, + channel_manager, + router, + top_node_count, + min_amount_msat, + max_amount_msat, + cooldown, + liquidity_limit_multiplier, + recently_probed: Mutex::new(HashMap::new()), + } + } +} + +impl ProbingStrategy for HighDegreeStrategy { + fn next_probe(&self) -> Option { + let graph = self.network_graph.read_only(); + + let mut nodes_by_degree: Vec<(PublicKey, usize)> = graph + .nodes() + .unordered_iter() + .filter_map(|(id, info)| { + PublicKey::try_from(*id).ok().map(|pubkey| (pubkey, info.channels.len())) + }) + .collect(); + + if nodes_by_degree.is_empty() { + return None; + } + + nodes_by_degree.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + + let top_node_count = self.top_node_count.min(nodes_by_degree.len()); + let now = Instant::now(); + + let mut probed = self.recently_probed.lock().unwrap_or_else(|e| e.into_inner()); + + // We could check staleness when we use the entry, but that way we'd not clear cache at + // all. For hundreds of top nodes it's okay to call retain each tick. + probed.retain(|_, probed_at| now.duration_since(*probed_at) < self.cooldown); + + // If all top nodes are on cooldown, reset and start a new cycle. 
+ let final_node = match nodes_by_degree[..top_node_count] + .iter() + .find(|(pubkey, _)| !probed.contains_key(pubkey)) + { + Some((pubkey, _)) => *pubkey, + None => { + probed.clear(); + nodes_by_degree[0].0 + }, + }; + + probed.insert(final_node, now); + drop(probed); + drop(graph); + + let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); + let payment_params = + PaymentParameters::from_node_id(final_node, DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA as u32); + let route_params = + RouteParameters::from_payment_params_and_value(payment_params, amount_msat); + + let payer = self.channel_manager.get_our_node_id(); + let usable_channels = self.channel_manager.list_usable_channels(); + let first_hops: Vec<&_> = usable_channels.iter().collect(); + let inflight_htlcs = self.channel_manager.compute_inflight_htlcs(); + + let route = self + .router + .find_route(&payer, &route_params, Some(&first_hops), inflight_htlcs) + .ok()?; + + let path = route.paths.into_iter().next()?; + + // Liquidity-limit check (mirrors send_preflight_probes): skip the path when the + // first-hop outbound liquidity is less than path_value * liquidity_limit_multiplier. + if let Some(first_hop_hop) = path.hops.first() { + if let Some(ch) = usable_channels + .iter() + .find(|h| h.get_outbound_payment_scid() == Some(first_hop_hop.short_channel_id)) + { + let path_value = path.final_value_msat() + path.fee_msat(); + if ch.next_outbound_htlc_limit_msat + < path_value.saturating_mul(self.liquidity_limit_multiplier) + { + return None; + } + } + } + + Some(path) + } +} + +/// Explores the graph by walking a random number of hops outward from one of our own +/// channels, constructing the [`Path`] explicitly. +/// +/// On each tick: +/// 1. Picks one of our confirmed, usable channels to start from. +/// 2. Performs a random walk of a chosen depth (up to [`MAX_PATH_LENGTH_ESTIMATE`]) through the +/// gossip graph, skipping disabled channels and dead-ends. 
+/// +/// The probe amount is chosen uniformly at random from `[min_amount_msat, max_amount_msat]`. +/// +/// Because path selection ignores the scorer, this probes channels the router +/// would never try on its own, teaching the scorer about previously unknown paths. +pub struct RandomStrategy { + network_graph: Arc, + channel_manager: Arc, + /// Upper bound on the number of hops in a randomly constructed path. + pub max_hops: usize, + /// Lower bound for the randomly chosen probe amount. + pub min_amount_msat: u64, + /// Upper bound for the randomly chosen probe amount. + pub max_amount_msat: u64, +} + +impl RandomStrategy { + /// Creates a new random-walk probing strategy. + pub(crate) fn new( + network_graph: Arc, channel_manager: Arc, max_hops: usize, + min_amount_msat: u64, max_amount_msat: u64, + ) -> Self { + assert!( + min_amount_msat <= max_amount_msat, + "min_amount_msat must not exceed max_amount_msat" + ); + Self { + network_graph, + channel_manager, + max_hops: max_hops.clamp(1, MAX_PATH_LENGTH_ESTIMATE as usize), + min_amount_msat, + max_amount_msat, + } + } + + /// Tries to build a path of `target_hops` hops. Returns `None` if the local node has no + /// usable channels, or the walk terminates before reaching `target_hops`. + fn try_build_path(&self, target_hops: usize, amount_msat: u64) -> Option { + let initial_channels = self + .channel_manager + .list_channels() + .into_iter() + .filter(|c| c.is_usable && c.short_channel_id.is_some()) + .collect::>(); + + if initial_channels.is_empty() { + return None; + } + + let graph = self.network_graph.read_only(); + let first_hop = + &initial_channels[random_range(0, initial_channels.len() as u64 - 1) as usize]; + let first_hop_scid = first_hop.short_channel_id?; + let next_peer_pubkey = first_hop.counterparty.node_id; + let next_peer_node_id = NodeId::from_pubkey(&next_peer_pubkey); + + // Track the tightest HTLC limit across all hops to cap the probe amount. 
+ // The first hop limit comes from our live channel state; subsequent hops use htlc_maximum_msat from the gossip channel update. + let mut route_least_htlc_upper_bound = first_hop.next_outbound_htlc_limit_msat; + let mut route_greatest_htlc_lower_bound = first_hop.next_outbound_htlc_minimum_msat; + + // Walk the graph: each entry is (node_id, arrived_via_scid, pubkey); first entry is set: + let mut route: Vec<(NodeId, u64, PublicKey)> = + vec![(next_peer_node_id, first_hop_scid, next_peer_pubkey)]; + + let mut prev_scid = first_hop_scid; + let mut current_node_id = next_peer_node_id; + + for _ in 1..target_hops { + let node_info = match graph.node(&current_node_id) { + Some(n) => n, + None => break, + }; + + // Skip the edge we arrived on. Longer cycles aren't filtered — probes fail at + // the destination anyway, so revisiting nodes is harmless. + let candidates: Vec<u64> = + node_info.channels.iter().copied().filter(|&scid| scid != prev_scid).collect(); + + if candidates.is_empty() { + break; + } + + let next_scid = candidates[random_range(0, candidates.len() as u64 - 1) as usize]; + let next_channel = match graph.channel(next_scid) { + Some(c) => c, + None => break, + }; + + // as_directed_from validates that current_node_id is a channel endpoint and that + // both direction updates are present; effective_capacity covers both htlc_maximum_msat + // and funding capacity. + let Some((directed, next_node_id)) = next_channel.as_directed_from(&current_node_id) + else { + break; + }; + // Retrieve the direction-specific update via the public ChannelInfo fields. + // as_directed_from already checked both directions are Some, but we break + // defensively rather than unwrap. 
+ let update = match if directed.source() == &next_channel.node_one { + next_channel.one_to_two.as_ref() + } else { + next_channel.two_to_one.as_ref() + } { + Some(u) => u, + None => break, + }; + + if !update.enabled { + break; + } + + route_least_htlc_upper_bound = + route_least_htlc_upper_bound.min(update.htlc_maximum_msat); + + route_greatest_htlc_lower_bound = + route_greatest_htlc_lower_bound.max(update.htlc_minimum_msat); + + let next_pubkey = match PublicKey::try_from(*next_node_id) { + Ok(pk) => pk, + Err(_) => break, + }; + + route.push((*next_node_id, next_scid, next_pubkey)); + prev_scid = next_scid; + current_node_id = *next_node_id; + } + + if route_greatest_htlc_lower_bound > route_least_htlc_upper_bound { + return None; + } + let amount_msat = + amount_msat.max(route_greatest_htlc_lower_bound).min(route_least_htlc_upper_bound); + if amount_msat < self.min_amount_msat || amount_msat > self.max_amount_msat { + return None; + } + + // Assemble hops backwards so each hop's proportional fee is computed on the amount it actually forwards + let mut hops = Vec::with_capacity(route.len()); + let mut forwarded = amount_msat; + let last = route.len() - 1; + + // Resolve (node_features, channel_features, maybe_announced_channel) for a hop. + // The first hop is our local channel and may be unannounced, so its ChannelFeatures + // are not in the gossip graph — match on SCID to detect it and fall back to local-state + // defaults. All other (walked) hops were picked from the graph and must resolve there. 
+ let hop_features = + |node_id: &NodeId, via_scid: u64| -> Option<(NodeFeatures, ChannelFeatures, bool)> { + let node_features = graph + .node(node_id) + .and_then(|n| n.announcement_info.as_ref().map(|a| a.features().clone())) + .unwrap_or_else(NodeFeatures::empty); + let (channel_features, maybe_announced_channel) = if via_scid == first_hop_scid { + (ChannelFeatures::empty(), false) + } else { + (graph.channel(via_scid)?.features.clone(), true) + }; + Some((node_features, channel_features, maybe_announced_channel)) + }; + + // Final hop: fee_msat carries the delivery amount; cltv delta is zero. + { + let (node_id, via_scid, pubkey) = route[last]; + let (node_features, channel_features, maybe_announced_channel) = + hop_features(&node_id, via_scid)?; + hops.push(RouteHop { + pubkey, + node_features, + short_channel_id: via_scid, + channel_features, + fee_msat: amount_msat, + cltv_expiry_delta: 0, + maybe_announced_channel, + }); + } + + // Non-final hops, from second-to-last back to first. + for i in (0..last).rev() { + let (node_id, via_scid, pubkey) = route[i]; + let (node_features, channel_features, maybe_announced_channel) = + hop_features(&node_id, via_scid)?; + + let (_, next_scid, _) = route[i + 1]; + let next_channel = graph.channel(next_scid)?; + let (directed, _) = next_channel.as_directed_from(&node_id)?; + let update = match if directed.source() == &next_channel.node_one { + next_channel.one_to_two.as_ref() + } else { + next_channel.two_to_one.as_ref() + } { + Some(u) => u, + None => return None, + }; + let fee = update.fees.base_msat as u64 + + (forwarded * update.fees.proportional_millionths as u64 / 1_000_000); + forwarded += fee; + + hops.push(RouteHop { + pubkey, + node_features, + short_channel_id: via_scid, + channel_features, + fee_msat: fee, + cltv_expiry_delta: update.cltv_expiry_delta as u32, + maybe_announced_channel, + }); + } + + hops.reverse(); + + // The first-hop HTLC carries amount_msat + all intermediate fees. 
+ // Verify the total fits within our live outbound limit before returning. + let total_outgoing: u64 = hops.iter().map(|h| h.fee_msat).sum(); + if total_outgoing > first_hop.next_outbound_htlc_limit_msat { + return None; + } + + Some(Path { hops, blinded_tail: None }) + } +} + +impl ProbingStrategy for RandomStrategy { + fn next_probe(&self) -> Option { + let target_hops = random_range(1, self.max_hops as u64) as usize; + let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); + + self.try_build_path(target_hops, amount_msat) + } +} + +/// Periodically dispatches probes according to a [`ProbingStrategy`]. +pub struct Prober { + pub(crate) channel_manager: Arc, + pub(crate) logger: Arc, + /// The strategy that decides what to probe. + pub strategy: Arc, + /// How often to fire a probe attempt. + pub interval: Duration, + /// Maximum total millisatoshis that may be locked in in-flight probes at any time. + pub max_locked_msat: u64, + pub(crate) locked_msat: Arc, +} + +fn fmt_path(path: &lightning::routing::router::Path) -> String { + path.hops + .iter() + .map(|h| format!("{}(scid={})", h.pubkey, h.short_channel_id)) + .collect::>() + .join(" -> ") +} + +impl Prober { + /// Returns the total millisatoshis currently locked in in-flight probes. 
+ pub fn locked_msat(&self) -> u64 { + self.locked_msat.load(Ordering::Relaxed) + } + + pub(crate) fn handle_probe_successful(&self, path: &lightning::routing::router::Path) { + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + let prev = self + .locked_msat + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount))) + .expect("fetch_update closure always returns Some"); + let new = prev.saturating_sub(amount); + log_debug!( + self.logger, + "Probe successful: released {} msat (locked_msat {} -> {}), path: {}", + amount, + prev, + new, + fmt_path(path) + ); + } + + pub(crate) fn handle_probe_failed(&self, path: &lightning::routing::router::Path) { + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + let prev = self + .locked_msat + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount))) + .expect("fetch_update closure always returns Some"); + let new = prev.saturating_sub(amount); + log_debug!( + self.logger, + "Probe failed: released {} msat (locked_msat {} -> {}), path: {}", + amount, + prev, + new, + fmt_path(path) + ); + } +} + +/// Runs the probing loop for the given [`Prober`] until `stop_rx` fires. +pub(crate) async fn run_prober(prober: Arc, mut stop_rx: tokio::sync::watch::Receiver<()>) { + let mut ticker = tokio::time::interval(prober.interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! 
{ + biased; + _ = stop_rx.changed() => { + log_debug!(prober.logger, "Stopping background probing."); + return; + } + _ = ticker.tick() => { + let path = match prober.strategy.next_probe() { + Some(p) => p, + None => continue, + }; + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + if prober.locked_msat.load(Ordering::Acquire) + amount > prober.max_locked_msat { + log_debug!(prober.logger, "Skipping probe: locked-msat budget exceeded."); + continue; + } + match prober.channel_manager.send_probe(path.clone()) { + Ok(_) => { + prober.locked_msat.fetch_add(amount, Ordering::Release); + log_debug!( + prober.logger, + "Probe sent: locked {} msat, path: {}", + amount, + fmt_path(&path) + ); + } + Err(e) => { + log_debug!( + prober.logger, + "Probe send failed: {:?}, path: {}", + e, + fmt_path(&path) + ); + } + } + } + } + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 000000000..3350ad2c7 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,37 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +/// Returns a random `u64` uniformly distributed in `[min, max]` (inclusive). +pub(crate) fn random_range(min: u64, max: u64) -> u64 { + debug_assert!(min <= max); + if min == max { + return min; + } + let range = match (max - min).checked_add(1) { + Some(r) => r, + None => { + // overflowed — full u64::MAX range + let mut buf = [0u8; 8]; + getrandom::fill(&mut buf).expect("getrandom failed"); + return u64::from_ne_bytes(buf); + }, + }; + // We remove bias due to the fact that the range does not evenly divide 2⁶⁴. 
+ // Imagine we had a range from 0 to 2⁶⁴-2 (of length 2⁶⁴-1), then + // the outcomes of 0 would be twice as frequent as any other, as 0 can be produced + // as randomly drawn 0 % 2⁶⁴-1 and as well as 2⁶⁴-1 % 2⁶⁴-1 + let limit = u64::MAX - (u64::MAX % range); + loop { + let mut buf = [0u8; 8]; + getrandom::fill(&mut buf).expect("getrandom failed"); + let val = u64::from_ne_bytes(buf); + if val < limit { + return min + (val % range); + } + // loop runs ~1 iteration on average, in worst case it's ~2 iterations on average + } +} From b11cea06efecabf351d4d8d67a3a32a11c7e3ccf Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Tue, 28 Apr 2026 18:13:21 +0200 Subject: [PATCH 2/2] Add probing service tests Add integration tests that verify the probing service fires probes on the configured interval and respects the locked-msat budget cap. Shared helpers in tests/common are extended with probing-aware setup. Co-Authored-By: Claude Sonnet 4.6 --- src/builder.rs | 2 +- src/probing.rs | 10 +- tests/common/mod.rs | 36 ++++- tests/probing_tests.rs | 345 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 380 insertions(+), 13 deletions(-) create mode 100644 tests/probing_tests.rs diff --git a/src/builder.rs b/src/builder.rs index 5660d6b75..4da58c0fa 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1130,7 +1130,7 @@ impl ArcedNodeBuilder { /// /// See [`ProbingConfig`] for details. pub fn set_probing_config(&self, config: Arc) { - self.inner.write().unwrap().set_probing_config((*config).clone()); + self.inner.write().expect("lock").set_probing_config((*config).clone()); } /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options diff --git a/src/probing.rs b/src/probing.rs index 3d0b1af75..ac0571bd1 100644 --- a/src/probing.rs +++ b/src/probing.rs @@ -226,14 +226,14 @@ impl ArcedProbingConfigBuilder { /// Overrides the interval between probe attempts. Defaults to 10 seconds. 
pub fn set_interval(&self, secs: u64) { - self.inner.write().unwrap().interval(Duration::from_secs(secs)); + self.inner.write().expect("lock").interval(Duration::from_secs(secs)); } /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. /// /// Defaults to 100 000 000 msat (100k sats). pub fn set_max_locked_msat(&self, max_msat: u64) { - self.inner.write().unwrap().max_locked_msat(max_msat); + self.inner.write().expect("lock").max_locked_msat(max_msat); } /// Sets the probing diversity penalty applied by the probabilistic scorer. @@ -244,19 +244,19 @@ impl ArcedProbingConfigBuilder { /// /// If unset, LDK's default of `0` (no penalty) is used. pub fn set_diversity_penalty_msat(&self, penalty_msat: u64) { - self.inner.write().unwrap().diversity_penalty_msat(penalty_msat); + self.inner.write().expect("lock").diversity_penalty_msat(penalty_msat); } /// Sets how long a probed node stays ineligible before being probed again. /// /// Only applies to the high-degree strategy. Defaults to 1 hour. pub fn set_cooldown(&self, secs: u64) { - self.inner.write().unwrap().cooldown(Duration::from_secs(secs)); + self.inner.write().expect("lock").cooldown(Duration::from_secs(secs)); } /// Builds the [`ProbingConfig`]. 
pub fn build(&self) -> Arc { - Arc::new(self.inner.read().unwrap().build()) + Arc::new(self.inner.read().expect("lock").build()) } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 850c6f22c..306a432d8 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -35,6 +35,7 @@ use ldk_node::config::{ use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; +use ldk_node::probing::ProbingConfig; use ldk_node::{ Builder, ChannelShutdownState, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, UserChannelId, @@ -318,9 +319,9 @@ pub(crate) fn random_config(anchor_channels: bool) -> TestConfig { } #[cfg(feature = "uniffi")] -type TestNode = Arc; +pub(crate) type TestNode = Arc; #[cfg(not(feature = "uniffi"))] -type TestNode = Node; +pub(crate) type TestNode = Node; #[derive(Clone)] pub(crate) enum TestChainSource<'a> { @@ -350,6 +351,7 @@ pub(crate) struct TestConfig { pub node_entropy: NodeEntropy, pub async_payments_role: Option, pub recovery_mode: bool, + pub probing: Option, } impl Default for TestConfig { @@ -369,6 +371,7 @@ impl Default for TestConfig { node_entropy, async_payments_role, recovery_mode, + probing: None, } } } @@ -501,6 +504,10 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> builder.set_wallet_recovery_mode(); } + if let Some(probing) = config.probing { + builder.set_probing_config(probing.into()); + } + let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); @@ -728,12 +735,18 @@ pub async fn open_channel( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { - open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd).await + let funding_txo = + 
open_channel_no_wait(node_a, node_b, funding_amount_sat, None, should_announce).await; + wait_for_tx(&electrsd.client, funding_txo.txid).await; + funding_txo } -pub async fn open_channel_push_amt( +/// Like [`open_channel`] but skips the `wait_for_tx` electrum check so that +/// multiple channels can be opened back-to-back before any blocks are mined. +/// The caller is responsible for mining blocks and confirming the funding txs. +pub async fn open_channel_no_wait( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, - should_announce: bool, electrsd: &ElectrsD, + should_announce: bool, ) -> OutPoint { if should_announce { node_a @@ -761,11 +774,20 @@ pub async fn open_channel_push_amt( let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); assert_eq!(funding_txo_a, funding_txo_b); - wait_for_tx(&electrsd.client, funding_txo_a.txid).await; - funding_txo_a } +pub async fn open_channel_push_amt( + node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, + should_announce: bool, electrsd: &ElectrsD, +) -> OutPoint { + let funding_txo = + open_channel_no_wait(node_a, node_b, funding_amount_sat, push_amount_msat, should_announce) + .await; + wait_for_tx(&electrsd.client, funding_txo.txid).await; + funding_txo +} + pub async fn open_channel_with_all( node_a: &TestNode, node_b: &TestNode, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { diff --git a/tests/probing_tests.rs b/tests/probing_tests.rs new file mode 100644 index 000000000..7744a565b --- /dev/null +++ b/tests/probing_tests.rs @@ -0,0 +1,345 @@ +// Integration tests for the probing service. +// +// Budget tests – linear A ──[1M sats]──▶ B ──[1M sats]──▶ C topology: +// +// probe_budget_increments_and_decrements +// Verifies locked_msat rises when a probe is dispatched and returns +// to zero once the probe resolves. 
+// +// exhausted_probe_budget_blocks_new_probes +// Stops B mid-flight so the HTLC cannot resolve; confirms the budget +// stays exhausted and no further probes are sent. After B restarts +// the probe fails, the budget clears, and new probes resume. + +mod common; +use std::sync::atomic::{AtomicBool, Ordering}; + +use common::{ + expect_channel_ready_event, expect_event, generate_blocks_and_wait, open_channel, + premine_and_distribute_funds, random_chain_source, random_config, setup_bitcoind_and_electrsd, + setup_node, TestNode, +}; + +use ldk_node::bitcoin::Amount; +use ldk_node::probing::{ProbingConfigBuilder, ProbingStrategy}; +use ldk_node::Event; + +use lightning::routing::router::Path; + +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +const PROBE_AMOUNT_MSAT: u64 = 1_000_000; +const PROBING_INTERVAL_MILLISECONDS: u64 = 500; + +/// FixedPathStrategy — returns a fixed pre-built path; used by budget tests. +/// +/// The path is set after node and channel setup via [`set_path`]. +struct FixedPathStrategy { + path: Mutex>, + ready_to_probe: AtomicBool, +} + +impl FixedPathStrategy { + fn new() -> Arc { + Arc::new(Self { path: Mutex::new(None), ready_to_probe: AtomicBool::new(false) }) + } + + fn set_path(&self, path: Path) { + *self.path.lock().unwrap() = Some(path); + } + + fn start_probing(&self) { + self.ready_to_probe.store(true, Ordering::Relaxed); + } + + fn stop_probing(&self) { + self.ready_to_probe.store(false, Ordering::Relaxed); + } +} + +impl ProbingStrategy for FixedPathStrategy { + fn next_probe(&self) -> Option { + if self.ready_to_probe.load(Ordering::Relaxed) { + self.path.lock().unwrap().clone() + } else { + None + } + } +} + +/// Builds a 2-hop probe path: node_a → node_b → node_c using live channel info. 
+fn build_probe_path( + node_a: &TestNode, node_b: &TestNode, node_c: &TestNode, amount_msat: u64, +) -> Path { + use lightning::routing::router::RouteHop; + use lightning_types::features::{ChannelFeatures, NodeFeatures}; + + let ch_ab = node_a + .list_channels() + .into_iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id() && ch.short_channel_id.is_some()) + .expect("A→B channel not found"); + let ch_bc = node_b + .list_channels() + .into_iter() + .find(|ch| ch.counterparty_node_id == node_c.node_id() && ch.short_channel_id.is_some()) + .expect("B→C channel not found"); + + Path { + hops: vec![ + RouteHop { + pubkey: node_b.node_id(), + node_features: NodeFeatures::empty(), + short_channel_id: ch_ab.short_channel_id.unwrap(), + channel_features: ChannelFeatures::empty(), + fee_msat: 0, + cltv_expiry_delta: 40, + maybe_announced_channel: true, + }, + RouteHop { + pubkey: node_c.node_id(), + node_features: NodeFeatures::empty(), + short_channel_id: ch_bc.short_channel_id.unwrap(), + channel_features: ChannelFeatures::empty(), + fee_msat: amount_msat, + cltv_expiry_delta: 0, + maybe_announced_channel: true, + }, + ], + blinded_tail: None, + } +} + +/// Verifies that `locked_msat` increases when a probe is dispatched and returns +/// to zero once the probe resolves (succeeds or fails). 
+#[tokio::test(flavor = "multi_thread")] +async fn probe_budget_increments_and_decrements() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + let mut config_a = random_config(false); + let strategy = FixedPathStrategy::new(); + config_a.probing = Some( + ProbingConfigBuilder::custom(strategy.clone()) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(10 * PROBE_AMOUNT_MSAT) + .build(), + ); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + // Build the probe path now that channels are ready, then enable probing. 
+ strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); + tokio::time::sleep(Duration::from_secs(3)).await; + strategy.start_probing(); + + let went_up = tokio::time::timeout(Duration::from_secs(30), async { + loop { + if node_a.prober().unwrap().locked_msat() > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(went_up, "locked_msat never increased — no probe was dispatched"); + println!("First probe dispatched; locked_msat = {}", node_a.prober().unwrap().locked_msat()); + + strategy.stop_probing(); + let cleared = tokio::time::timeout(Duration::from_secs(30), async { + loop { + if node_a.prober().unwrap().locked_msat() == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(cleared, "locked_msat never returned to zero after probe resolved"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} + +/// Verifies that no new probes are dispatched once the in-flight budget is exhausted. +/// +/// Exhaustion is triggered by stopping the intermediate node (B) while a probe HTLC +/// is in-flight, preventing resolution and keeping the budget locked. After B restarts +/// the HTLC fails, the budget clears, and probing resumes. 
+#[tokio::test(flavor = "multi_thread")] +async fn exhausted_probe_budget_blocks_new_probes() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + let mut config_a = random_config(false); + let strategy = FixedPathStrategy::new(); + config_a.probing = Some( + ProbingConfigBuilder::custom(strategy.clone()) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(PROBE_AMOUNT_MSAT) + .build(), + ); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + let capacity_at_open = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .expect("A→B channel not found"); + + assert_eq!(node_a.prober().map_or(1, |p| p.locked_msat()), 0, "initial locked_msat is nonzero"); + + strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, 
PROBE_AMOUNT_MSAT)); + tokio::time::sleep(Duration::from_secs(3)).await; + strategy.start_probing(); + + // Wait for the first probe to be in-flight. + let locked = tokio::time::timeout(Duration::from_secs(30), async { + loop { + if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(locked, "no probe dispatched within 30 s"); + + // Capacity should have decreased due to the in-flight probe HTLC. + let capacity_with_probe = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .expect("A→B channel not found"); + assert!( + capacity_with_probe < capacity_at_open, + "HTLC not visible in channel state: capacity unchanged ({capacity_at_open} msat)" + ); + + // Stop B while the probe HTLC is in-flight. + node_b.stop().unwrap(); + // Pause probing so the budget can clear without a new probe re-locking it. + strategy.stop_probing(); + + tokio::time::sleep(Duration::from_secs(5)).await; + assert!( + node_a.prober().map_or(0, |p| p.locked_msat()) > 0, + "probe resolved unexpectedly while B was offline" + ); + let capacity_after_wait = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .unwrap_or(u64::MAX); + assert!( + capacity_after_wait >= capacity_with_probe, + "a new probe HTLC was sent despite budget being exhausted" + ); + + // strategy.stop_probing(); + + // Bring B back and explicitly reconnect to A and C so the stuck HTLC resolves + // without waiting for the background reconnection backoff. 
+ node_b.start().unwrap(); + let node_a_addr = node_a.listening_addresses().unwrap().first().unwrap().clone(); + let node_c_addr = node_c.listening_addresses().unwrap().first().unwrap().clone(); + node_b.connect(node_a.node_id(), node_a_addr, false).unwrap(); + node_b.connect(node_c.node_id(), node_c_addr, false).unwrap(); + + let cleared = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if node_a.prober().map_or(1, |p| p.locked_msat()) == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(cleared, "locked_msat never cleared after B came back online"); + + // Re-enable probing; a new probe should be dispatched within a few ticks. + strategy.start_probing(); + let new_probe = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(new_probe, "no new probe dispatched after budget was freed"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +}