diff --git a/src/builder.rs b/src/builder.rs index 0b44dc153..f330de025 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -59,9 +59,8 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ - read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, - read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, - read_scorer, + read_all_objects, read_event_queue, read_external_pathfinding_scores_from_cache, + read_network_graph, read_node_metrics, read_output_sweeper, read_peer_info, read_scorer, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ @@ -1279,9 +1278,19 @@ fn build_with_store_internal( let (payment_store_res, node_metris_res, pending_payment_store_res) = runtime.block_on(async move { tokio::join!( - read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_all_objects( + &*kv_store_ref, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) + read_all_objects( + &*kv_store_ref, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ) ) }); diff --git a/src/io/utils.rs b/src/io/utils.rs index ff78c7e91..2b1822285 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -44,11 +44,10 @@ use crate::io::{ NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, LdkLogger, Logger}; -use crate::payment::PendingPaymentDetails; use crate::peer_store::PeerStore; use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; -use crate::{Error, EventQueue, NodeMetrics, PaymentDetails}; +use crate::{Error, EventQueue, NodeMetrics}; pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache"; @@ -221,21 +220,19 @@ where }) } -/// Read previously persisted payments information from the store. -pub(crate) async fn read_payments( - kv_store: &DynStore, logger: L, -) -> Result, std::io::Error> +/// Read all objects of type `T` from the given namespace, spawning reads in parallel. +pub(crate) async fn read_all_objects( + kv_store: &DynStore, primary_namespace: &str, secondary_namespace: &str, logger: L, +) -> Result, std::io::Error> where + T: Readable, + L: Deref, L::Target: LdkLogger, { + let type_name = std::any::type_name::(); let mut res = Vec::new(); - let mut stored_keys = KVStore::list( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - ) - .await?; + let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?; const BATCH_SIZE: usize = 50; @@ -244,12 +241,7 @@ where // Fill JoinSet with tasks if possible while set.len() < BATCH_SIZE && !stored_keys.is_empty() { if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); + let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key); set.spawn(fut); debug_assert!(set.len() <= BATCH_SIZE); } @@ -259,37 +251,32 @@ where // Exit early if we get an IO error. let reader = read_res .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); + log_error!(logger, "Failed to read {}: {}", type_name, e); set.abort_all(); e })? .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); + log_error!(logger, "Failed to read {}: {}", type_name, e); set.abort_all(); e })?; // Refill set for every finished future, if we still have something to do. if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); + let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key); set.spawn(fut); debug_assert!(set.len() <= BATCH_SIZE); } // Handle result. - let payment = PaymentDetails::read(&mut &*reader).map_err(|e| { - log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); + let object = T::read(&mut &*reader).map_err(|e| { + log_error!(logger, "Failed to deserialize {}: {}", type_name, e); std::io::Error::new( std::io::ErrorKind::InvalidData, - "Failed to deserialize PaymentDetails", + format!("Failed to deserialize {}", type_name), ) })?; - res.push(payment); + res.push(object); } debug_assert!(set.is_empty()); @@ -632,83 +619,6 @@ pub(crate) fn read_bdk_wallet_change_set( Ok(Some(change_set)) } -/// Read previously persisted pending payments information from the store. -pub(crate) async fn read_pending_payments( - kv_store: &DynStore, logger: L, -) -> Result, std::io::Error> -where - L::Target: LdkLogger, -{ - let mut res = Vec::new(); - - let mut stored_keys = KVStore::list( - &*kv_store, - PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - ) - .await?; - - const BATCH_SIZE: usize = 50; - - let mut set = tokio::task::JoinSet::new(); - - // Fill JoinSet with tasks if possible - while set.len() < BATCH_SIZE && !stored_keys.is_empty() { - if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); - set.spawn(fut); - debug_assert!(set.len() <= BATCH_SIZE); - } - } - - while let Some(read_res) = set.join_next().await { - // Exit early if we get an IO error. - let reader = read_res - .map_err(|e| { - log_error!(logger, "Failed to read PendingPaymentDetails: {}", e); - set.abort_all(); - e - })? - .map_err(|e| { - log_error!(logger, "Failed to read PendingPaymentDetails: {}", e); - set.abort_all(); - e - })?; - - // Refill set for every finished future, if we still have something to do. - if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); - set.spawn(fut); - debug_assert!(set.len() <= BATCH_SIZE); - } - - // Handle result. - let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| { - log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e); - std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Failed to deserialize PendingPaymentDetails", - ) - })?; - res.push(pending_payment); - } - - debug_assert!(set.is_empty()); - debug_assert!(stored_keys.is_empty()); - - Ok(res) -} - #[cfg(test)] mod tests { use super::read_or_generate_seed_file;