From f466457599a0779ccda52fdf2ae78f7310e26e17 Mon Sep 17 00:00:00 2001
From: Abeeujah
Date: Sat, 11 Apr 2026 18:12:35 +0100
Subject: [PATCH] Batch MPP claims into single ChannelMonitorUpdate

Claiming multiple MPP parts on the same channel was partially
sequential: the claimee had to claim the first part and wait for the
peer to respond before the remaining parts could be claimed. This added
claim latency, with time spent waiting on channel monitor updates and a
full round-trip (RAA/CS) per HTLC fulfillment.

This change batches such claims into a single ChannelMonitorUpdate and
a single commitment_signed message.

- Introduce queue_claim_htlc and build_preimage_only_monitor_update on
  FundedChannel in channel.rs.
- Update ChannelManager to group claimable HTLCs by counterparty and
  channel ID, delegating multi-part groups to the new
  claim_batch_funds_from_channel path.
- Refactor chanmon_update_fail_tests.rs and payment_tests.rs to align
  with the new atomic batching semantics.

Tests have been updated to reflect the new batching of MPP claims:
- `test_single_channel_multiple_mpp`
- `auto_retry_partial_failure`
- `test_keysend_dup_hash_partial_mpp`
---
 lightning/src/ln/chanmon_update_fail_tests.rs | 266 +++---
 lightning/src/ln/channel.rs                   | 161 +++++++
 lightning/src/ln/channelmanager.rs            | 402 +++++++++++++++---
 lightning/src/ln/payment_tests.rs             |  47 +-
 4 files changed, 577 insertions(+), 299 deletions(-)

diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs
index 9633800db08..af80e3902b6 100644
--- a/lightning/src/ln/chanmon_update_fail_tests.rs
+++ b/lightning/src/ln/chanmon_update_fail_tests.rs
@@ -4654,34 +4654,20 @@ fn test_claim_to_closed_channel_blocks_claimed_event() {
 }
 
 #[test]
-#[cfg(all(feature = "std", not(target_os = "windows")))]
 fn test_single_channel_multiple_mpp() {
 	use crate::util::config::UserConfig;
-	use std::sync::atomic::{AtomicBool, Ordering};
 
-	// Test what happens when we attempt to claim an MPP with many parts that came to us through
-	// the same channel with a synchronous persistence interface which has very high latency.
-	//
-	// Previously, if a `revoke_and_ack` came in while we were still running in
-	// `ChannelManager::claim_payment` we'd end up hanging waiting to apply a
-	// `ChannelMonitorUpdate` until after it completed. See the commit which introduced this test
-	// for more info.
+	// Test that when an MPP payment has many parts arriving on the same channel, all of them are
+	// claimed in a single commitment update rather than requiring a round-trip per claim.
 	let chanmon_cfgs = create_chanmon_cfgs(9);
 	let node_cfgs = create_node_cfgs(9, &chanmon_cfgs);
 	let mut config = test_default_channel_config();
-	// Set the percentage to the default value at the time this test was written
 	config.channel_handshake_config.announced_channel_max_inbound_htlc_value_in_flight_percentage = 10;
 	let configs: [Option<UserConfig>; 9] = core::array::from_fn(|_| Some(config.clone()));
 	let node_chanmgrs = create_node_chanmgrs(9, &node_cfgs, &configs);
 	let mut nodes = create_network(9, &node_cfgs, &node_chanmgrs);
 
-	let node_b_id = nodes[1].node.get_our_node_id();
-	let node_c_id = nodes[2].node.get_our_node_id();
-	let node_d_id = nodes[3].node.get_our_node_id();
-	let node_e_id = nodes[4].node.get_our_node_id();
-	let node_f_id = nodes[5].node.get_our_node_id();
-	let node_g_id = nodes[6].node.get_our_node_id();
 	let node_h_id = nodes[7].node.get_our_node_id();
 	let node_i_id = nodes[8].node.get_our_node_id();
 
@@ -4691,28 +4677,7 @@ fn test_single_channel_multiple_mpp() {
 	// 7
 	// 8
 	//
-	// We can in theory reproduce this issue with fewer channels/HTLCs, but getting this test
-	// robust is rather challenging. We rely on having the main test thread wait on locks held in
-	// the background `claim_funds` thread and unlocking when the `claim_funds` thread completes a
-	// single `ChannelMonitorUpdate`.
-	// This thread calls `get_and_clear_pending_msg_events()` and `handle_revoke_and_ack()`, both
-	// of which require `ChannelManager` locks, but we have to make sure this thread gets a chance
-	// to be blocked on the mutexes before we let the background thread wake `claim_funds` so that
-	// the mutex can switch to this main thread.
-	// This relies on our locks being fair, but also on our threads getting runtime during the test
-	// run, which can be pretty competitive. Thus we do a dumb dance to be as conservative as
-	// possible - we have a background thread which completes a `ChannelMonitorUpdate` (by sending
-	// into the `write_blocker` mpsc) but it doesn't run until a mpsc channel sends from this main
-	// thread to the background thread, and then we let it sleep a while before we send the
-	// `ChannelMonitorUpdate` unblocker.
-	// Further, we give ourselves two chances each time, needing 4 HTLCs just to unlock our two
-	// `ChannelManager` calls. We then need a few remaining HTLCs to actually trigger the bug, so
-	// we use 6 HTLCs.
-	// Finaly, we do not run this test on Winblowz because it, somehow, in 2025, does not implement
-	// actual preemptive multitasking and thinks that cooperative multitasking somehow is
-	// acceptable in the 21st century, let alone a quarter of the way into it.
- const MAX_THREAD_INIT_TIME: std::time::Duration = std::time::Duration::from_secs(1); - + // All six parts converge on the same channel (7->8) create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0); create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 100_000, 0); create_announced_chan_between_nodes_with_value(&nodes, 0, 3, 100_000, 0); @@ -4728,7 +4693,7 @@ fn test_single_channel_multiple_mpp() { create_announced_chan_between_nodes_with_value(&nodes, 6, 7, 100_000, 0); create_announced_chan_between_nodes_with_value(&nodes, 7, 8, 1_000_000, 0); - let (mut route, payment_hash, payment_preimage, payment_secret) = + let (route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(&nodes[0], nodes[8], 50_000_000); send_along_route_with_secret( @@ -4747,177 +4712,74 @@ fn test_single_channel_multiple_mpp() { payment_secret, ); - let (do_a_write, blocker) = std::sync::mpsc::sync_channel(0); - *nodes[8].chain_monitor.write_blocker.lock().unwrap() = Some(blocker); - - // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }. - // We do this by casting a pointer to a `TestChannelManager` to a pointer to a - // `TestChannelManager` with different (in this case 'static) lifetime. - // This is even suggested in the second example at - // https://doc.rust-lang.org/std/mem/fn.transmute.html#examples - let claim_node: &'static TestChannelManager<'static, 'static> = - unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) }; - let thrd = std::thread::spawn(move || { - // Initiate the claim in a background thread as it will immediately block waiting on the - // `write_blocker` we set above. - claim_node.claim_funds(payment_preimage); - }); - - // First unlock one monitor so that we have a pending - // `update_fulfill_htlc`/`commitment_signed` pair to pass to our counterparty. - do_a_write.send(()).unwrap(); - - let event_node: &'static TestChannelManager<'static, 'static> = - unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) }; - let thrd_event = std::thread::spawn(move || { - let mut have_event = false; - while !have_event { - let mut events = event_node.get_and_clear_pending_events(); - assert!(events.len() == 1 || events.len() == 0); - if events.len() == 1 { - if let Event::PaymentClaimed { .. } = events[0] { - } else { - panic!("Unexpected event {events:?}"); - } - have_event = true; - } - if !have_event { - std::thread::yield_now(); - } - } - }); - - // Then fetch the `update_fulfill_htlc`/`commitment_signed`. Note that the - // `get_and_clear_pending_msg_events` will immediately hang trying to take a peer lock which - // `claim_funds` is holding. Thus, we release a second write after a small sleep in the - // background to give `claim_funds` a chance to step forward, unblocking - // `get_and_clear_pending_msg_events`. 
- let do_a_write_background = do_a_write.clone(); - let block_thrd2 = AtomicBool::new(true); - let block_thrd2_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd2) }; - let thrd2 = std::thread::spawn(move || { - while block_thrd2_read.load(Ordering::Acquire) { - std::thread::yield_now(); - } - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - }); - block_thrd2.store(false, Ordering::Release); - let mut first_updates = get_htlc_update_msgs(&nodes[8], &node_h_id); - - // Thread 2 could unblock first, or it could get blocked waiting on us to process a - // `PaymentClaimed` event. Either way, wait until both have finished. - thrd2.join().unwrap(); - thrd_event.join().unwrap(); - - // Disconnect node 6 from all its peers so it doesn't bother to fail the HTLCs back - nodes[7].node.peer_disconnected(node_b_id); - nodes[7].node.peer_disconnected(node_c_id); - nodes[7].node.peer_disconnected(node_d_id); - nodes[7].node.peer_disconnected(node_e_id); - nodes[7].node.peer_disconnected(node_f_id); - nodes[7].node.peer_disconnected(node_g_id); - - let first_update_fulfill = first_updates.update_fulfill_htlcs.remove(0); - nodes[7].node.handle_update_fulfill_htlc(node_i_id, first_update_fulfill); - check_added_monitors(&nodes[7], 1); - expect_payment_forwarded!(nodes[7], nodes[1], nodes[8], Some(1000), false, false); - nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &first_updates.commitment_signed); - check_added_monitors(&nodes[7], 1); - let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id); - - // Now, handle the `revoke_and_ack` from node 5. Note that `claim_funds` is still blocked on - // our peer lock, so we have to release a write to let it process. - // After this call completes, the channel previously would be locked up and should not be able - // to make further progress. - let do_a_write_background = do_a_write.clone(); - let block_thrd3 = AtomicBool::new(true); - let block_thrd3_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd3) }; - let thrd3 = std::thread::spawn(move || { - while block_thrd3_read.load(Ordering::Acquire) { - std::thread::yield_now(); - } - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - }); - block_thrd3.store(false, Ordering::Release); - nodes[8].node.handle_revoke_and_ack(node_h_id, &raa); - thrd3.join().unwrap(); - assert!(!thrd.is_finished()); - - let thrd4 = std::thread::spawn(move || { - do_a_write.send(()).unwrap(); - do_a_write.send(()).unwrap(); - }); - - thrd4.join().unwrap(); - thrd.join().unwrap(); - - // At the end, we should have 7 ChannelMonitorUpdates - 6 for HTLC claims, and one for the - // above `revoke_and_ack`. - check_added_monitors(&nodes[8], 7); - - // Now drive everything to the end, at least as far as node 7 is concerned... - *nodes[8].chain_monitor.write_blocker.lock().unwrap() = None; - nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs); + // All six parts are on the same channel, so claiming should produce a single batched + // ChannelMonitorUpdate containing all 6 preimages and one commitment. 
+ nodes[8].node.claim_funds(payment_preimage); + expect_payment_claimed!(nodes[8], payment_hash, 50_000_000); check_added_monitors(&nodes[8], 1); - let (mut updates, raa) = get_updates_and_revoke(&nodes[8], &node_h_id); - - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[2], nodes[8], Some(1000), false, false); - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[3], nodes[8], Some(1000), false, false); - let mut next_source = 4; - if let Some(update) = updates.update_fulfill_htlcs.get(0) { - nodes[7].node.handle_update_fulfill_htlc(node_i_id, update.clone()); - expect_payment_forwarded!(nodes[7], nodes[4], nodes[8], Some(1000), false, false); - next_source += 1; - } - - nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &updates.commitment_signed); - nodes[7].node.handle_revoke_and_ack(node_i_id, &raa); - if updates.update_fulfill_htlcs.get(0).is_some() { - check_added_monitors(&nodes[7], 5); - } else { - check_added_monitors(&nodes[7], 4); + let mut first_updates = get_htlc_update_msgs(&nodes[8], &node_h_id); + assert_eq!(first_updates.update_fulfill_htlcs.len(), 6); + + // Disconnect node 7 from intermediate nodes so it doesn't bother forwarding back. + nodes[7].node.peer_disconnected(nodes[1].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[2].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[3].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[4].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[5].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[6].node.get_our_node_id()); + + // Deliver all 6 fulfills to node 7 before handling the commitment_signed. + // Each handle_update_fulfill_htlc triggers claim_funds_internal on node 7's upstream + // channels (which are disconnected), generating a preimage monitor update + PaymentForwarded. + for fulfill in first_updates.update_fulfill_htlcs.drain(..) { + nodes[7].node.handle_update_fulfill_htlc(node_i_id, fulfill); + check_added_monitors(&nodes[7], 1); } - - let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id); - - nodes[8].node.handle_revoke_and_ack(node_h_id, &raa); - nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs); - check_added_monitors(&nodes[8], 2); - - let (mut updates, raa) = get_updates_and_revoke(&nodes[8], &node_h_id); - - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false); - next_source += 1; - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false); - next_source += 1; - if let Some(update) = updates.update_fulfill_htlcs.get(0) { - nodes[7].node.handle_update_fulfill_htlc(node_i_id, update.clone()); - expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false); + let events = nodes[7].node.get_and_clear_pending_events(); + assert_eq!(events.len(), 6); + let mut seen_prev_node_ids = std::collections::HashSet::new(); + for event in events { + match event { + Event::PaymentForwarded { + prev_htlcs, + next_htlcs, + total_fee_earned_msat, + claim_from_onchain_tx, + .. 
+			} => {
+				assert_eq!(total_fee_earned_msat, Some(1000));
+				assert!(!claim_from_onchain_tx);
+				assert_eq!(prev_htlcs.len(), 1);
+				assert_eq!(next_htlcs.len(), 1);
+				let prev_node_id = prev_htlcs[0].node_id.unwrap();
+				let next_node_id = next_htlcs[0].node_id.unwrap();
+				assert_eq!(next_node_id, node_i_id);
+				// Each forward should come from a unique intermediate node (1-6)
+				assert!(
+					seen_prev_node_ids.insert(prev_node_id),
+					"Duplicate prev_node_id in PaymentForwarded events"
+				);
+			},
+			_ => panic!("Unexpected event {:?}", event),
+		}
 	}
-
-	nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &updates.commitment_signed);
-	nodes[7].node.handle_revoke_and_ack(node_i_id, &raa);
-	if updates.update_fulfill_htlcs.get(0).is_some() {
-		check_added_monitors(&nodes[7], 5);
-	} else {
-		check_added_monitors(&nodes[7], 4);
+	// Verify all 6 intermediate nodes were seen
+	for i in 1..=6 {
+		assert!(
+			seen_prev_node_ids.contains(&nodes[i].node.get_our_node_id()),
+			"Missing PaymentForwarded for node {}",
+			i
+		);
 	}
-
+	nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &first_updates.commitment_signed);
+	check_added_monitors(&nodes[7], 1);
 	let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id);
 
+	nodes[8].node.handle_revoke_and_ack(node_h_id, &raa);
+	check_added_monitors(&nodes[8], 1);
 	nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs);
-	check_added_monitors(&nodes[8], 2);
+	check_added_monitors(&nodes[8], 1);
 
 	let raa = get_event_msg!(nodes[8], MessageSendEvent::SendRevokeAndACK, node_h_id);
 	nodes[7].node.handle_revoke_and_ack(node_i_id, &raa);
diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs
index fbf2a4caa9f..49e9621f4e2 100644
--- a/lightning/src/ln/channel.rs
+++ b/lightning/src/ln/channel.rs
@@ -7702,6 +7702,167 @@ where
 		}
 	}
 
+	/// Queues an HTLC claim into the holding cell for later batch release via
+	/// [`Self::free_holding_cell_htlcs`]. Unlike [`Self::get_update_fulfill_htlc_and_commit`],
+	/// this does not generate a commitment update or a preimage `ChannelMonitorUpdate`; those
+	/// will be produced when the holding cell is freed.
+	///
+	/// Returns `true` if the claim was queued, `false` if it was a duplicate.
+	pub fn queue_claim_htlc<L: Deref>(
+		&mut self, htlc_id_arg: u64, payment_preimage: PaymentPreimage,
+		attribution_data: Option<AttributionData>, logger: &L,
+	) -> bool
+	where
+		L::Target: Logger,
+	{
+		if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) {
+			log_error!(
+				logger,
+				"Attempted to claim HTLC {} on channel {} in non-operational state {:?}",
+				htlc_id_arg,
+				&self.context.channel_id(),
+				&self.context.channel_state
+			);
+			debug_assert!(
+				false,
+				"Was asked to fulfill an HTLC when channel was not in an operational state"
+			);
+			return false;
+		}
+
+		let htlc =
+			match self.context.pending_inbound_htlcs.iter().find(|h| h.htlc_id == htlc_id_arg) {
+				Some(h) => h,
+				None => {
+					log_trace!(
+						logger,
+						"Attempted to claim unknown inbound HTLC {} on channel {}",
+						htlc_id_arg,
+						&self.context.channel_id()
+					);
+					return false;
+				},
+			};
+
+		match htlc.state {
+			InboundHTLCState::Committed { .. } => {},
+			InboundHTLCState::LocalRemoved(ref reason) => {
+				match reason {
+					InboundHTLCRemovalReason::Fulfill { .. } => {
+						log_trace!(logger, "HTLC {} on channel {} already marked fulfilled, ignoring duplicate claim", htlc_id_arg, &self.context.channel_id());
+					},
+					_ => {
+						log_warn!(logger, "Have preimage but HTLC {} with payment hash {} on channel {} was already failed; ignoring claim", htlc_id_arg, &htlc.payment_hash, &self.context.channel_id());
+						debug_assert!(false, "Tried to fulfill an HTLC that was already failed");
+					},
+				}
+				return false;
+			},
+			_ => {
+				log_error!(logger, "Attempting to claim HTLC {} on channel {} before it was fully committed (state: {})", htlc_id_arg, &self.context.channel_id(), htlc.state);
+				debug_assert!(false, "Tried to claim an HTLC before it was fully committed");
+				return false;
+			},
+		}
+
+		for pending_update in self.context.holding_cell_htlc_updates.iter() {
+			match pending_update {
+				&HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } if htlc_id == htlc_id_arg => {
+					log_trace!(
+						logger,
+						"HTLC {} on channel {} already queued for claim, ignoring duplicate",
+						htlc_id_arg,
+						&self.context.channel_id()
+					);
+					return false;
+				},
+				&HTLCUpdateAwaitingACK::FailHTLC { htlc_id, .. }
+				| &HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, .. }
+					if htlc_id == htlc_id_arg =>
+				{
+					log_warn!(logger, "Attempted to claim HTLC {} on channel {} that is already queued for failure", htlc_id_arg, &self.context.channel_id());
+					debug_assert!(false, "Tried to fulfill an HTLC that was already failed");
+					return false;
+				},
+				_ => {},
+			}
+		}
+
+		// We already looked the HTLC up above, so use its payment hash directly rather
+		// than searching `pending_inbound_htlcs` a second time.
+		log_trace!(
+			logger,
+			"Queuing claim for HTLC {} (payment hash {}) on channel {} in holding cell",
+			htlc_id_arg,
+			&htlc.payment_hash,
+			&self.context.channel_id()
+		);
+		self.context.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::ClaimHTLC {
+			payment_preimage,
+			attribution_data,
+			htlc_id: htlc_id_arg,
+		});
+		true
+	}
+
+	/// Builds a preimage-only [`ChannelMonitorUpdate`] for a payment whose claims have just been
+	/// queued via [`Self::queue_claim_htlc`].
+	///
+	/// This is used to ensure preimage durability when the holding cell cannot be flushed
+	/// immediately (e.g. while awaiting RAA, while a monitor update is in progress, while
+	/// disconnected, or while quiescent). The eventual holding-cell flush will produce a
+	/// [`ChannelMonitorUpdate`] containing redundant `PaymentPreimage` steps for the same
+	/// preimage, which is harmless: `ChannelMonitor` is idempotent w.r.t. the preimage, and we
+	/// do not bother to track `payment_info` again at flush time.
+	///
+	/// Increments `latest_monitor_update_id` by 1 and re-numbers `blocked_monitor_updates`
+	/// so the new update remains consecutive with any held updates.
+	pub fn build_preimage_only_monitor_update<L: Deref>(
+		&mut self, payment_preimage: PaymentPreimage, payment_info: Option<PaymentClaimDetails>,
+		logger: &L,
+	) -> ChannelMonitorUpdate
+	where
+		L::Target: Logger,
+	{
+		self.context.latest_monitor_update_id += 1;
+		let mut monitor_update = ChannelMonitorUpdate {
+			update_id: self.context.latest_monitor_update_id,
+			updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
+				payment_preimage,
+				payment_info,
+			}],
+			channel_id: Some(self.context.channel_id()),
+		};
+
+		// If there are blocked monitor updates queued ahead of us, the new preimage update
+		// must be applied before them: give it the first held update's ID and bump each
+		// held update's ID by one (mirroring the logic in
+		// `get_update_fulfill_htlc_and_commit`).
+		if !self.context.blocked_monitor_updates.is_empty() {
+			let new_mon_id = self.context.blocked_monitor_updates[0].update.update_id;
+			monitor_update.update_id = new_mon_id;
+			for held_update in self.context.blocked_monitor_updates.iter_mut() {
+				held_update.update.update_id += 1;
+			}
+		}
+
+		// Mark the channel as having a monitor update in progress so that when the update is
+		// submitted to the chain monitor and persistence completes (synchronously or otherwise),
+		// `monitor_updating_restored` doesn't trip its `MONITOR_UPDATE_IN_PROGRESS` assertion.
+		// This mirrors the `monitor_updating_paused` call made in `free_holding_cell_htlcs`
+		// (the `Some` branch of `maybe_free_holding_cell_htlcs`) and in
+		// `get_update_fulfill_htlc_and_commit`.
+		self.monitor_updating_paused(
+			false,
+			false,
+			false,
+			Vec::new(),
+			Vec::new(),
+			Vec::new(),
+			logger,
+		);
+
+		monitor_update
+	}
+
 	/// Returns `Err` (always with [`ChannelError::Ignore`]) if the HTLC could not be failed (e.g.
 	/// if it was already resolved). Otherwise returns `Ok`.
 	pub fn queue_fail_htlc(
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 570639d8995..c20882c723c 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -9596,26 +9596,41 @@ impl<
 			None
 		};
 		let payment_info = Some(PaymentClaimDetails { mpp_parts, claiming_payment });
+
+		// Group sources by (counterparty_node_id, channel_id) so that multiple MPP
+		// parts on the same channel can be batched into a single commitment update.
+		let mut grouped_sources: Vec<(PublicKey, ChannelId, Vec<ClaimableHTLC>)> = Vec::new();
 		for htlc in sources {
-			let this_mpp_claim =
-				pending_mpp_claim_ptr_opt.as_ref().map(|pending_mpp_claim| {
-					let counterparty_id = htlc.mpp_part.prev_hop.counterparty_node_id;
-					let counterparty_id = counterparty_id
-						.expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading.");
+			let counterparty_id = htlc.mpp_part.prev_hop.counterparty_node_id.expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading.");
+			let chan_id = htlc.mpp_part.prev_hop.channel_id;
+			// Use position() plus indexing rather than holding an iter_mut() borrow
+			// across the else branch, which the borrow checker rejects on edition 2021.
+			if let Some(idx) = grouped_sources
+				.iter()
+				.position(|(cp, cid, _)| *cp == counterparty_id && *cid == chan_id)
+			{
+				grouped_sources[idx].2.push(htlc);
+			} else {
+				grouped_sources.push((counterparty_id, chan_id, vec![htlc]));
+			}
+		}
+
+		for (_, _, group) in grouped_sources {
+			if group.len() == 1 {
+				// Single HTLC on this channel, use existing path.
+				let htlc = group.into_iter().next().unwrap();
+				let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().map(|pending_mpp_claim| {
+					let counterparty_id = htlc.mpp_part.prev_hop.counterparty_node_id.expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved.
Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading."); let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim)); (counterparty_id, htlc.mpp_part.prev_hop.channel_id, claim_ptr) }); - let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { - RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { - pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), - } - }); + let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { + pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), + } + }); - // Create new attribution data as the final hop. Always report a zero hold time, because reporting a - // non-zero value will not make a difference in the penalty that may be applied by the sender. If there - // is a phantom hop, we need to double-process. - let attribution_data = - if let Some(phantom_secret) = htlc.mpp_part.prev_hop.phantom_shared_secret { + let attribution_data = if let Some(phantom_secret) = + htlc.mpp_part.prev_hop.phantom_shared_secret + { let attribution_data = process_fulfill_attribution_data(None, &phantom_secret, 0); Some(attribution_data) @@ -9623,31 +9638,41 @@ impl< None }; - let attribution_data = process_fulfill_attribution_data( - attribution_data, - &htlc.mpp_part.prev_hop.incoming_packet_shared_secret, - 0, - ); + let attribution_data = process_fulfill_attribution_data( + attribution_data, + &htlc.mpp_part.prev_hop.incoming_packet_shared_secret, + 0, + ); - self.claim_funds_from_hop( - &htlc.mpp_part.prev_hop, - payment_preimage, - payment_info.clone(), - Some(attribution_data), - |_, definitely_duplicate| { - debug_assert!( - !definitely_duplicate, - "We shouldn't claim duplicatively from a payment" - ); - ( - Some(MonitorUpdateCompletionAction::PaymentClaimed { - payment_hash, - pending_mpp_claim: this_mpp_claim, - }), - raa_blocker, - ) - }, - ); + self.claim_funds_from_hop( + &htlc.mpp_part.prev_hop, + payment_preimage, + payment_info.clone(), + Some(attribution_data), + |_, definitely_duplicate| { + debug_assert!( + !definitely_duplicate, + "We shouldn't claim duplicatively from a payment" + ); + ( + Some(MonitorUpdateCompletionAction::PaymentClaimed { + payment_hash, + pending_mpp_claim: this_mpp_claim, + }), + raa_blocker, + ) + }, + ); + } else { + // Multiple HTLCs on the same channel, batch into a single commitment. + self.claim_batch_funds_from_channel( + group, + payment_preimage, + payment_hash, + payment_info.clone(), + &pending_mpp_claim_ptr_opt, + ); + } } } else { for htlc in sources { @@ -10095,6 +10120,266 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } } + /// Batch-claims multiple HTLCs from the same channel in a single commitment update. + /// + /// This is used when an MPP payment has multiple parts arriving on the same channel, allowing + /// all of them to be fulfilled in one `commitment_signed` message rather than requiring a + /// round-trip (RAA) between each claim. Claims are queued into the channel's holding cell + /// via [`FundedChannel::queue_claim_htlc`] and then released all at once via + /// [`FundedChannel::maybe_free_holding_cell_htlcs`]. 
+	fn claim_batch_funds_from_channel(
+		&self, htlcs: Vec<ClaimableHTLC>, payment_preimage: PaymentPreimage,
+		payment_hash: PaymentHash, payment_info: Option<PaymentClaimDetails>,
+		pending_mpp_claim_ptr_opt: &Option<Arc<Mutex<PendingMPPClaim>>>,
+	) {
+		debug_assert!(htlcs.len() > 1);
+
+		let per_peer_state = self.per_peer_state.read().unwrap();
+
+		let counterparty_node_id = htlcs[0].mpp_part.prev_hop.counterparty_node_id.expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading.");
+		let chan_id = htlcs[0].mpp_part.prev_hop.channel_id;
+
+		let mut peer_state_lock = per_peer_state.get(&counterparty_node_id).map(|peer_mutex| peer_mutex.lock().unwrap()).expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading.");
+
+		{
+			let peer_state = &mut *peer_state_lock;
+			if let hash_map::Entry::Occupied(mut chan_entry) =
+				peer_state.channel_by_id.entry(chan_id)
+			{
+				if let Some(chan) = chan_entry.get_mut().as_funded_mut() {
+					let logger = WithChannelContext::from(&self.logger, &chan.context, None);
+
+					// Queue all claims into the holding cell. queue_claim_htlc does not
+					// generate monitor updates or commitment changes; those will all be
+					// produced in one batch when the holding cell is freed below.
+					let mut any_new = false;
+					for htlc in &htlcs {
+						let attribution_data = if let Some(phantom_secret) =
+							htlc.mpp_part.prev_hop.phantom_shared_secret
+						{
+							Some(process_fulfill_attribution_data(None, &phantom_secret, 0))
+						} else {
+							None
+						};
+
+						let attribution_data = process_fulfill_attribution_data(
+							attribution_data,
+							&htlc.mpp_part.prev_hop.incoming_packet_shared_secret,
+							0,
+						);
+
+						if chan.queue_claim_htlc(
+							htlc.mpp_part.prev_hop.htlc_id,
+							payment_preimage,
+							Some(attribution_data),
+							&&logger,
+						) {
+							any_new = true;
+						}
+					}
+
+					if !any_new {
+						// All claims were duplicates; this is a startup replay.
+ let during_init = + !self.background_events_processed_since_startup.load(Ordering::Acquire); + + let this_mpp_claim = + pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + ( + counterparty_node_id, + chan_id, + PendingMPPClaimPointer(Arc::clone(pending_claim)), + ) + }); + let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { + pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), + } + }); + + if let Some(raa_blocker) = raa_blocker { + let actions = &mut peer_state.actions_blocking_raa_monitor_updates; + let actions_list = actions.entry(chan_id).or_insert_with(Vec::new); + if !actions_list.contains(&raa_blocker) { + debug_assert!(during_init); + actions_list.push(raa_blocker); + } + } + + let action = MonitorUpdateCompletionAction::PaymentClaimed { + payment_hash, + pending_mpp_claim: this_mpp_claim, + }; + + let in_flight_mons = peer_state.in_flight_monitor_updates.get(&chan_id); + if in_flight_mons.map(|(_, mons)| !mons.is_empty()).unwrap_or(false) { + peer_state + .monitor_update_blocked_actions + .entry(chan_id) + .or_insert_with(Vec::new) + .push(action); + return; + } + + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + + debug_assert!( + during_init, + "Duplicate batch claims should only occur during startup replay" + ); + self.handle_monitor_update_completion_actions([action]); + return; + } + + // Try to free the holding cell, producing a single ChannelMonitorUpdate with + // all preimage steps and one commitment update. If the channel cannot + // generate a new commitment right now (awaiting RAA, monitor update in + // progress, peer disconnected, quiescent, ...) we still need to persist a + // preimage-only ChannelMonitorUpdate immediately to ensure the preimage + // is durable across restarts. The eventual natural flush of the holding + // cell will produce the commitment_signed. + let (monitor_opt, holding_cell_failed_htlcs) = + chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &&logger); + + let monitor_update = if let Some(mut monitor_update) = monitor_opt { + // Inject payment_info into the first PaymentPreimage step that + // matches our preimage (the holding cell may contain unrelated + // claims for other preimages from prior queueing). + for step in monitor_update.updates.iter_mut() { + if let ChannelMonitorUpdateStep::PaymentPreimage { + payment_preimage: ref step_preimage, + payment_info: ref mut info, + } = step + { + if *step_preimage == payment_preimage { + *info = payment_info; + break; + } + } + } + monitor_update + } else { + // Deferred flush: build a preimage-only ChannelMonitorUpdate so the + // preimage is durable now. The commitment_signed will be sent later + // when the holding cell is naturally flushed. + // `build_preimage_only_monitor_update` also marks the channel as having + // a monitor update in progress, mirroring `free_holding_cell_htlcs` + // (the `Some` branch above), so that synchronous persistence completion + // doesn't trip the `MONITOR_UPDATE_IN_PROGRESS` assertion in + // `monitor_updating_restored`. 
+ chan.build_preimage_only_monitor_update( + payment_preimage, + payment_info, + &&logger, + ) + }; + + let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + ( + counterparty_node_id, + chan_id, + PendingMPPClaimPointer(Arc::clone(pending_claim)), + ) + }); + peer_state + .monitor_update_blocked_actions + .entry(chan_id) + .or_insert(Vec::new()) + .push(MonitorUpdateCompletionAction::PaymentClaimed { + payment_hash, + pending_mpp_claim: this_mpp_claim, + }); + if let Some(raa_blocker) = + pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { + pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), + } + }) { + peer_state + .actions_blocking_raa_monitor_updates + .entry(chan_id) + .or_insert_with(Vec::new) + .push(raa_blocker); + } + let funding_txo = htlcs[0].mpp_part.prev_hop.outpoint; + let post_update_data = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, + funding_txo, + monitor_update, + ); + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.fail_holding_cell_htlcs( + holding_cell_failed_htlcs, + chan_id, + &counterparty_node_id, + ); + if let Some(data) = post_update_data { + self.handle_post_monitor_update_chan_resume(data); + } + return; + } + return; + } + } + + // Channel is closed, fall back to per-HTLC claiming against the closed channel monitor. + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + let mut this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().map(|pending_mpp_claim| { + let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim)); + (counterparty_node_id, chan_id, claim_ptr) + }); + let mut raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { + pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), + } + }); + for htlc in htlcs { + let attribution_data = + if let Some(phantom_secret) = htlc.mpp_part.prev_hop.phantom_shared_secret { + Some(process_fulfill_attribution_data(None, &phantom_secret, 0)) + } else { + None + }; + + let attribution_data = process_fulfill_attribution_data( + attribution_data, + &htlc.mpp_part.prev_hop.incoming_packet_shared_secret, + 0, + ); + + let first_mpp_claim = this_mpp_claim.take(); + let first_raa_blocker = raa_blocker.take(); + self.claim_funds_from_hop( + &htlc.mpp_part.prev_hop, + payment_preimage, + payment_info.clone(), + Some(attribution_data), + |_, definitely_duplicate| { + debug_assert!( + !definitely_duplicate, + "We shouldn't claim duplicatively from a payment" + ); + ( + first_mpp_claim.map(|claim| { + MonitorUpdateCompletionAction::PaymentClaimed { + payment_hash, + pending_mpp_claim: Some(claim), + } + }), + first_raa_blocker, + ) + }, + ); + } + } + fn finalize_claims(&self, sources: Vec<(HTLCSource, Option)>) { // Decode attribution data to hold times. let hold_times = sources.into_iter().filter_map(|(source, attribution_data)| { @@ -20879,39 +21164,26 @@ mod tests { assert_eq!(events.len(), 1); pass_along_path(&nodes[0], &[&nodes[1]], 200_000, our_payment_hash, Some(payment_secret), events.drain(..).next().unwrap(), true, None); - // Claim the full MPP payment. 
Note that we can't use a test utility like - // claim_funds_along_route because the ordering of the messages causes the second half of the - // payment to be put in the holding cell, which confuses the test utilities. So we exchange the - // lightning messages manually. + // Claim the full MPP payment. Both parts are on the same channel, so they should be + // batched into a single commitment update. nodes[1].node.claim_funds(payment_preimage); expect_payment_claimed!(nodes[1], our_payment_hash, 200_000); - check_added_monitors(&nodes[1], 2); + check_added_monitors(&nodes[1], 1); - let mut bs_1st_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); - nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_1st_updates.update_fulfill_htlcs.remove(0)); + let mut bs_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); + assert_eq!(bs_updates.update_fulfill_htlcs.len(), 2); + nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_updates.update_fulfill_htlcs.remove(0)); expect_payment_sent(&nodes[0], payment_preimage, None, false, false); - nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_1st_updates.commitment_signed); - check_added_monitors(&nodes[0], 1); - let (as_first_raa, as_first_cs) = get_revoke_commit_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); - nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_first_raa); - check_added_monitors(&nodes[1], 1); - let mut bs_2nd_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); - nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_first_cs); - check_added_monitors(&nodes[1], 1); - let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id()); - nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_2nd_updates.update_fulfill_htlcs.remove(0)); - nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_2nd_updates.commitment_signed); - check_added_monitors(&nodes[0], 1); - let as_second_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id()); - nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_first_raa); - let as_second_updates = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); + nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_updates.update_fulfill_htlcs.remove(0)); + nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_updates.commitment_signed); check_added_monitors(&nodes[0], 1); - nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_second_raa); + let (as_raa, as_cs) = get_revoke_commit_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); + nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_raa); check_added_monitors(&nodes[1], 1); - nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_second_updates.commitment_signed); + nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_cs); check_added_monitors(&nodes[1], 1); - let bs_third_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id()); - nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_third_raa); + let bs_raa = get_event_msg!(nodes[1], 
MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
+	nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_raa);
 	check_added_monitors(&nodes[0], 1);
 
 	// Note that successful MPP payments will generate a single PaymentSent event upon the first
diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs
index 5b4f5f93d71..4f37b972474 100644
--- a/lightning/src/ln/payment_tests.rs
+++ b/lightning/src/ln/payment_tests.rs
@@ -3089,55 +3089,38 @@ fn auto_retry_partial_failure() {
 	expect_htlc_failure_conditions(nodes[1].node.get_and_clear_pending_events(), &[]);
 	nodes[1].node.process_pending_htlc_forwards();
 	expect_payment_claimable!(nodes[1], payment_hash, payment_secret, amt_msat);
+	// All 3 parts arrived on the same channel (chan_1), so claim_funds batches them
+	// into a single commitment update with all 3 update_fulfill_htlc messages.
 	nodes[1].node.claim_funds(payment_preimage);
 	expect_payment_claimed!(nodes[1], payment_hash, amt_msat);
+	check_added_monitors(&nodes[1], 1);
 
 	let mut bs_claim = get_htlc_update_msgs(&nodes[1], &node_a_id);
-	assert_eq!(bs_claim.update_fulfill_htlcs.len(), 1);
+	assert_eq!(bs_claim.update_fulfill_htlcs.len(), 3);
 	nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_claim.update_fulfill_htlcs.remove(0));
 	expect_payment_sent(&nodes[0], payment_preimage, None, false, false);
+	nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_claim.update_fulfill_htlcs.remove(0));
+	nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_claim.update_fulfill_htlcs.remove(0));
 	nodes[0].node.handle_commitment_signed_batch_test(node_b_id, &bs_claim.commitment_signed);
 	check_added_monitors(&nodes[0], 1);
 	let (as_third_raa, as_third_cs) = get_revoke_commit_msgs(&nodes[0], &node_b_id);
 
 	nodes[1].node.handle_revoke_and_ack(node_a_id, &as_third_raa);
-	check_added_monitors(&nodes[1], 4);
-	let mut bs_2nd_claim = get_htlc_update_msgs(&nodes[1], &node_a_id);
-
-	nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_third_cs);
-	check_added_monitors(&nodes[1], 1);
-	let bs_third_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id);
-
-	nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_third_raa);
-	check_added_monitors(&nodes[0], 1);
-	expect_payment_path_successful!(nodes[0]);
-
-	let bs_second_fulfill_a = bs_2nd_claim.update_fulfill_htlcs.remove(0);
-	let bs_second_fulfill_b = bs_2nd_claim.update_fulfill_htlcs.remove(0);
-	nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_second_fulfill_a);
-	nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_second_fulfill_b);
-	nodes[0].node.handle_commitment_signed_batch_test(node_b_id, &bs_2nd_claim.commitment_signed);
-	check_added_monitors(&nodes[0], 1);
-	let (as_fourth_raa, as_fourth_cs) = get_revoke_commit_msgs(&nodes[0], &node_b_id);
-
-	nodes[1].node.handle_revoke_and_ack(node_a_id, &as_fourth_raa);
 	check_added_monitors(&nodes[1], 1);
-	nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_fourth_cs);
+	nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_third_cs);
 	check_added_monitors(&nodes[1], 1);
-	let bs_second_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id);
+	let bs_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id);
 
-	nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_second_raa);
+	nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_raa);
 	check_added_monitors(&nodes[0], 1);
 
 	let events = nodes[0].node.get_and_clear_pending_events();
-
assert_eq!(events.len(), 2); - if let Event::PaymentPathSuccessful { .. } = events[0] { - } else { - panic!(); - } - if let Event::PaymentPathSuccessful { .. } = events[1] { - } else { - panic!(); + assert_eq!(events.len(), 3); + for event in &events { + if let Event::PaymentPathSuccessful { .. } = event { + } else { + panic!("Unexpected event {:?}", event); + } } }
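
Reviewer note, appended after the diff (not part of the patch): the heart of the change
is the grouping step added in channelmanager.rs, which collects MPP parts that share a
channel before any claiming happens. The sketch below reduces that step to standalone
Rust so the intent is easy to check; `Htlc`, `PublicKey`, and `ChannelId` here are
simplified stand-ins for LDK's `ClaimableHTLC` and key types, not the real definitions.
Groups of size one continue through the pre-existing `claim_funds_from_hop` path, while
larger groups take the new `claim_batch_funds_from_channel` path, producing one
ChannelMonitorUpdate and one commitment_signed per channel instead of one RAA/CS
round-trip per HTLC.

    // Simplified stand-ins for the LDK types used by the real grouping code
    // (assumption: the patch operates on secp256k1::PublicKey, ChannelId, and
    // ClaimableHTLC instead).
    type PublicKey = [u8; 33];
    type ChannelId = [u8; 32];

    struct Htlc {
        counterparty: PublicKey,
        channel_id: ChannelId,
    }

    /// Group MPP parts by (counterparty, channel). A Vec with a linear scan is
    /// used instead of a HashMap, matching the patch, which preserves the order
    /// in which channels first appear. position() plus indexing avoids holding
    /// a mutable borrow across the else branch on edition 2021.
    fn group_by_channel(sources: Vec<Htlc>) -> Vec<((PublicKey, ChannelId), Vec<Htlc>)> {
        let mut grouped: Vec<((PublicKey, ChannelId), Vec<Htlc>)> = Vec::new();
        for htlc in sources {
            let key = (htlc.counterparty, htlc.channel_id);
            if let Some(idx) = grouped.iter().position(|(k, _)| *k == key) {
                grouped[idx].1.push(htlc);
            } else {
                grouped.push((key, vec![htlc]));
            }
        }
        grouped
    }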