Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::GossipSource;
use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
read_scorer,
read_all_objects, read_event_queue, read_external_pathfinding_scores_from_cache,
read_network_graph, read_node_metrics, read_output_sweeper, read_peer_info, read_scorer,
};
use crate::io::vss_store::VssStoreBuilder;
use crate::io::{
Expand Down Expand Up @@ -1279,9 +1278,19 @@ fn build_with_store_internal(
let (payment_store_res, node_metris_res, pending_payment_store_res) =
runtime.block_on(async move {
tokio::join!(
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
read_all_objects(
&*kv_store_ref,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
Arc::clone(&logger_ref),
),
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref))
read_all_objects(
&*kv_store_ref,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
Arc::clone(&logger_ref),
)
)
});

Expand Down
124 changes: 17 additions & 107 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ use crate::io::{
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
};
use crate::logger::{log_error, LdkLogger, Logger};
use crate::payment::PendingPaymentDetails;
use crate::peer_store::PeerStore;
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
use crate::{Error, EventQueue, NodeMetrics, PaymentDetails};
use crate::{Error, EventQueue, NodeMetrics};

pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache";

Expand Down Expand Up @@ -221,21 +220,19 @@ where
})
}

/// Read previously persisted payments information from the store.
pub(crate) async fn read_payments<L: Deref>(
kv_store: &DynStore, logger: L,
) -> Result<Vec<PaymentDetails>, std::io::Error>
/// Read all objects of type `T` from the given namespace, spawning reads in parallel.
pub(crate) async fn read_all_objects<T, L>(
Comment thread
joostjager marked this conversation as resolved.
kv_store: &DynStore, primary_namespace: &str, secondary_namespace: &str, logger: L,
) -> Result<Vec<T>, std::io::Error>
where
T: Readable,
L: Deref,
L::Target: LdkLogger,
{
let type_name = std::any::type_name::<T>();
let mut res = Vec::new();

let mut stored_keys = KVStore::list(
&*kv_store,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
)
.await?;
let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?;

const BATCH_SIZE: usize = 50;

Expand All @@ -244,12 +241,7 @@ where
// Fill JoinSet with tasks if possible
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(
&*kv_store,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&next_key,
);
let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}
Expand All @@ -259,37 +251,32 @@ where
// Exit early if we get an IO error.
let reader = read_res
.map_err(|e| {
log_error!(logger, "Failed to read PaymentDetails: {}", e);
log_error!(logger, "Failed to read {}: {}", type_name, e);
set.abort_all();
e
})?
.map_err(|e| {
log_error!(logger, "Failed to read PaymentDetails: {}", e);
log_error!(logger, "Failed to read {}: {}", type_name, e);
set.abort_all();
e
})?;

// Refill set for every finished future, if we still have something to do.
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(
&*kv_store,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&next_key,
);
let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}

// Handle result.
let payment = PaymentDetails::read(&mut &*reader).map_err(|e| {
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
let object = T::read(&mut &*reader).map_err(|e| {
log_error!(logger, "Failed to deserialize {}: {}", type_name, e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize PaymentDetails",
format!("Failed to deserialize {}", type_name),
)
})?;
res.push(payment);
res.push(object);
}

debug_assert!(set.is_empty());
Expand Down Expand Up @@ -632,83 +619,6 @@ pub(crate) fn read_bdk_wallet_change_set(
Ok(Some(change_set))
}

/// Read previously persisted pending payments information from the store.
pub(crate) async fn read_pending_payments<L: Deref>(
kv_store: &DynStore, logger: L,
) -> Result<Vec<PendingPaymentDetails>, std::io::Error>
where
L::Target: LdkLogger,
{
let mut res = Vec::new();

let mut stored_keys = KVStore::list(
&*kv_store,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
)
.await?;

const BATCH_SIZE: usize = 50;

let mut set = tokio::task::JoinSet::new();

// Fill JoinSet with tasks if possible
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(
&*kv_store,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&next_key,
);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}
}

while let Some(read_res) = set.join_next().await {
// Exit early if we get an IO error.
let reader = read_res
.map_err(|e| {
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
set.abort_all();
e
})?
.map_err(|e| {
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
set.abort_all();
e
})?;

// Refill set for every finished future, if we still have something to do.
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(
&*kv_store,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&next_key,
);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}

// Handle result.
let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| {
log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize PendingPaymentDetails",
)
})?;
res.push(pending_payment);
}

debug_assert!(set.is_empty());
debug_assert!(stored_keys.is_empty());

Ok(res)
}

#[cfg(test)]
mod tests {
use super::read_or_generate_seed_file;
Expand Down
Loading