Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@ default = []
#lightning-macros = { version = "0.2.0" }
#lightning-dns-resolver = { version = "0.3.0" }

lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std"] }
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" }
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std"] }
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" }
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["tokio"] }
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" }
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" }
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["rest-client", "rpc-client", "tokio"] }
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std"] }
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" }
lightning-dns-resolver = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" }
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c", features = ["std"] }
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c" }
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c", features = ["std"] }
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c" }
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c", features = ["tokio"] }
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c" }
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c" }
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c", features = ["rest-client", "rpc-client", "tokio"] }
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c", features = ["std"] }
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c" }
lightning-dns-resolver = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c" }

bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] }
bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]}
Expand Down Expand Up @@ -81,13 +81,13 @@ async-trait = { version = "0.1", default-features = false }
vss-client = { package = "vss-client-ng", version = "0.5" }
prost = { version = "0.11.6", default-features = false}
#bitcoin-payment-instructions = { version = "0.6" }
bitcoin-payment-instructions = { git = "https://github.com/jkczyz/bitcoin-payment-instructions", rev = "a7b32d5fded9bb45f73bf82e6d7187adf705171c" }
bitcoin-payment-instructions = { git = "https://github.com/benthecarman/bitcoin-payment-instructions", rev = "23bb47b2d568571c3191d59881ff048d21537ebd" }

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["winbase"] }

[dev-dependencies]
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std", "_test_utils"] }
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "2313bd584d2c46a50d67b8266f488c07516e0b3c", features = ["std", "_test_utils"] }
rand = { version = "0.9.2", default-features = false, features = ["std", "thread_rng", "os_rng"] }
proptest = "1.0.0"
regex = "1.5.6"
Expand Down
1 change: 0 additions & 1 deletion bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ enum NodeError {

typedef dictionary NodeStatus;

[Remote]
dictionary BestBlock {
BlockHash block_hash;
u32 height;
Expand Down
34 changes: 22 additions & 12 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bdk_wallet::{KeychainKind, Wallet as BdkWallet};
use bitcoin::bip32::{ChildNumber, Xpriv};
use bitcoin::key::Secp256k1;
use bitcoin::secp256k1::PublicKey;
use bitcoin::{BlockHash, Network};
use bitcoin::Network;
use bitcoin_payment_instructions::dns_resolver::DNSHrnResolver;
use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver;
use lightning::chain::{chainmonitor, BestBlock};
Expand All @@ -43,7 +43,6 @@ use lightning::util::persist::{
use lightning::util::ser::ReadableArgs;
use lightning::util::sweep::OutputSweeper;
use lightning_dns_resolver::OMDomainResolver;
use lightning_persister::fs_store::v1::FilesystemStore;
use vss_client::headers::VssHeaderProvider;

use crate::chain::ChainSource;
Expand All @@ -59,9 +58,9 @@ use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::GossipSource;
use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
read_scorer,
open_or_migrate_fs_store, read_event_queue, read_external_pathfinding_scores_from_cache,
read_network_graph, read_node_metrics, read_output_sweeper, read_payments, read_peer_info,
read_pending_payments, read_scorer,
};
use crate::io::vss_store::VssStoreBuilder;
use crate::io::{
Expand Down Expand Up @@ -641,15 +640,22 @@ impl NodeBuilder {
self.build_with_store(node_entropy, kv_store)
}

/// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options
/// Builds a [`Node`] instance with a [`FilesystemStoreV2`] backend and according to the options
/// previously configured.
///
/// If the storage directory contains data from a v1 filesystem store, it will be
/// automatically migrated to the v2 format.
///
/// [`FilesystemStoreV2`]: lightning_persister::fs_store::v2::FilesystemStoreV2
pub fn build_with_fs_store(&self, node_entropy: NodeEntropy) -> Result<Node, BuildError> {
let mut storage_dir_path: PathBuf = self.config.storage_dir_path.clone().into();
storage_dir_path.push("fs_store");

fs::create_dir_all(storage_dir_path.clone())
.map_err(|_| BuildError::StoragePathAccessFailed)?;
let kv_store = FilesystemStore::new(storage_dir_path);

let kv_store = open_or_migrate_fs_store(storage_dir_path)?;

self.build_with_store(node_entropy, kv_store)
}

Expand Down Expand Up @@ -1103,7 +1109,7 @@ impl ArcedNodeBuilder {
self.inner.read().expect("lock").build(*node_entropy).map(Arc::new)
}

/// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options
/// Builds a [`Node`] instance with a [`FilesystemStoreV2`] backend and according to the options
/// previously configured.
pub fn build_with_fs_store(
&self, node_entropy: Arc<NodeEntropy>,
Expand Down Expand Up @@ -1660,8 +1666,12 @@ fn build_with_store_internal(

// If we act as an LSPS2 service, set the HTLC-value-in-flight to 100% of the channel value
// to ensure we can forward the initial payment.
user_config.channel_handshake_config.max_inbound_htlc_value_in_flight_percent_of_channel =
100;
user_config
.channel_handshake_config
.announced_channel_max_inbound_htlc_value_in_flight_percentage = 100;
user_config
.channel_handshake_config
.unannounced_channel_max_inbound_htlc_value_in_flight_percentage = 100;
}

if let Some(role) = async_payments_role {
Expand Down Expand Up @@ -1695,8 +1705,8 @@ fn build_with_store_internal(
user_config,
channel_monitor_references,
);
let (_hash, channel_manager) =
<(BlockHash, ChannelManager)>::read(&mut &*reader, read_args).map_err(|e| {
let (_best_block, channel_manager) =
<(BestBlock, ChannelManager)>::read(&mut &*reader, read_args).map_err(|e| {
log_error!(logger, "Failed to read channel manager from store: {}", e);
BuildError::ReadFailed
})?;
Expand Down
145 changes: 58 additions & 87 deletions src/chain/bitcoind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};
Expand All @@ -25,8 +25,7 @@ use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader};
use lightning_block_sync::rest::RestClient;
use lightning_block_sync::rpc::{RpcClient, RpcClientError};
use lightning_block_sync::{
BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceErrorKind, Cache,
SpvClient,
BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceErrorKind, SpvClient,
};
use serde::Serialize;

Expand All @@ -47,9 +46,12 @@ use crate::{Error, NodeMetrics};
const CHAIN_POLLING_INTERVAL_SECS: u64 = 2;
const CHAIN_POLLING_TIMEOUT_SECS: u64 = 10;

type LongLivedSpvClient =
SpvClient<ChainPoller<Arc<BitcoindClient>, BitcoindClient>, Arc<ChainListener>>;

pub(super) struct BitcoindChainSource {
api_client: Arc<BitcoindClient>,
header_cache: tokio::sync::Mutex<BoundedHeaderCache>,
spv_client: tokio::sync::Mutex<Option<LongLivedSpvClient>>,
latest_chain_tip: RwLock<Option<ValidatedBlockHeader>>,
wallet_polling_status: Mutex<WalletSyncStatus>,
fee_estimator: Arc<OnchainFeeEstimator>,
Expand All @@ -72,12 +74,12 @@ impl BitcoindChainSource {
rpc_password.clone(),
));

let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new());
let spv_client = tokio::sync::Mutex::new(None);
let latest_chain_tip = RwLock::new(None);
let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed);
Self {
api_client,
header_cache,
spv_client,
latest_chain_tip,
wallet_polling_status,
fee_estimator,
Expand All @@ -103,13 +105,13 @@ impl BitcoindChainSource {
rpc_password,
));

let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new());
let spv_client = tokio::sync::Mutex::new(None);
let latest_chain_tip = RwLock::new(None);
let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed);

Self {
api_client,
header_cache,
spv_client,
latest_chain_tip,
wallet_polling_status,
fee_estimator,
Expand Down Expand Up @@ -153,46 +155,43 @@ impl BitcoindChainSource {
return;
}

let channel_manager_best_block_hash = channel_manager.current_best_block().block_hash;
let sweeper_best_block_hash = output_sweeper.current_best_block().block_hash;
let onchain_wallet_best_block_hash = onchain_wallet.current_best_block().block_hash;
let channel_manager_best_block = channel_manager.current_best_block();
let sweeper_best_block = output_sweeper.current_best_block();
let onchain_wallet_best_block = onchain_wallet.current_best_block();

let mut chain_listeners = vec![
(onchain_wallet_best_block_hash, &*onchain_wallet as &(dyn Listen + Send + Sync)),
(channel_manager_best_block_hash, &*channel_manager as &(dyn Listen + Send + Sync)),
(sweeper_best_block_hash, &*output_sweeper as &(dyn Listen + Send + Sync)),
(onchain_wallet_best_block, &*onchain_wallet as &(dyn Listen + Send + Sync)),
(channel_manager_best_block, &*channel_manager as &(dyn Listen + Send + Sync)),
(sweeper_best_block, &*output_sweeper as &(dyn Listen + Send + Sync)),
];

// TODO: Eventually we might want to see if we can synchronize `ChannelMonitor`s
// before giving them to `ChainMonitor` it the first place. However, this isn't
// trivial as we load them on initialization (in the `Builder`) and only gain
// network access during `start`. For now, we just make sure we get the worst known
// block hash and sychronize them via `ChainMonitor`.
if let Some(worst_channel_monitor_block_hash) = chain_monitor
if let Some(worst_channel_monitor_best_block) = chain_monitor
.list_monitors()
.iter()
.flat_map(|channel_id| chain_monitor.get_monitor(*channel_id))
.map(|m| m.current_best_block())
.min_by_key(|b| b.height)
.map(|b| b.block_hash)
{
chain_listeners.push((
worst_channel_monitor_block_hash,
worst_channel_monitor_best_block,
&*chain_monitor as &(dyn Listen + Send + Sync),
));
}

let mut locked_header_cache = self.header_cache.lock().await;
let now = SystemTime::now();
match synchronize_listeners(
self.api_client.as_ref(),
self.config.network,
&mut *locked_header_cache,
chain_listeners.clone(),
)
.await
{
Ok(chain_tip) => {
Ok((header_cache, chain_tip)) => {
{
let elapsed_ms = now.elapsed().map(|d| d.as_millis()).unwrap_or(0);
log_info!(
Expand All @@ -201,6 +200,23 @@ impl BitcoindChainSource {
elapsed_ms,
);
*self.latest_chain_tip.write().expect("lock") = Some(chain_tip);

let chain_listener = Arc::new(ChainListener {
onchain_wallet: Arc::clone(&onchain_wallet),
channel_manager: Arc::clone(&channel_manager),
chain_monitor: Arc::clone(&chain_monitor),
output_sweeper: Arc::clone(&output_sweeper),
});
let chain_poller =
ChainPoller::new(Arc::clone(&self.api_client), self.config.network);
let mut locked_spv_client = self.spv_client.lock().await;
*locked_spv_client = Some(SpvClient::new(
chain_tip,
chain_poller,
header_cache,
chain_listener,
));

let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
update_and_persist_node_metrics(
Expand Down Expand Up @@ -394,36 +410,31 @@ impl BitcoindChainSource {

async fn poll_and_update_listeners_inner(
&self, onchain_wallet: Arc<Wallet>, channel_manager: Arc<ChannelManager>,
chain_monitor: Arc<ChainMonitor>, output_sweeper: Arc<Sweeper>,
_chain_monitor: Arc<ChainMonitor>, _output_sweeper: Arc<Sweeper>,
) -> Result<(), Error> {
let latest_chain_tip_opt = self.latest_chain_tip.read().expect("lock").clone();
let chain_tip =
if let Some(tip) = latest_chain_tip_opt { tip } else { self.poll_chain_tip().await? };

let mut locked_header_cache = self.header_cache.lock().await;
let chain_poller = ChainPoller::new(Arc::clone(&self.api_client), self.config.network);
let chain_listener = ChainListener {
onchain_wallet: Arc::clone(&onchain_wallet),
channel_manager: Arc::clone(&channel_manager),
chain_monitor: Arc::clone(&chain_monitor),
output_sweeper,
};
let mut spv_client =
SpvClient::new(chain_tip, chain_poller, &mut *locked_header_cache, &chain_listener);
// Ensure `latest_chain_tip` is populated even if the polling loop runs before
// `synchronize_listeners` has completed.
if self.latest_chain_tip.read().expect("lock").is_none() {
self.poll_chain_tip().await?;
}

let now = SystemTime::now();
match spv_client.poll_best_tip().await {
Ok((ChainTip::Better(tip), true)) => {
let elapsed_ms = now.elapsed().map(|d| d.as_millis()).unwrap_or(0);
log_trace!(self.logger, "Finished polling best tip in {}ms", elapsed_ms);
*self.latest_chain_tip.write().expect("lock") = Some(tip);
},
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to poll for chain data: {:?}", e);
return Err(Error::TxSyncFailed);
},
let mut locked_spv_client = self.spv_client.lock().await;
if let Some(spv_client) = locked_spv_client.as_mut() {
let now = SystemTime::now();
match spv_client.poll_best_tip().await {
Ok((ChainTip::Better(tip), true)) => {
let elapsed_ms = now.elapsed().map(|d| d.as_millis()).unwrap_or(0);
log_trace!(self.logger, "Finished polling best tip in {}ms", elapsed_ms);
*self.latest_chain_tip.write().expect("lock") = Some(tip);
},
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to poll for chain data: {:?}", e);
return Err(Error::TxSyncFailed);
},
}
}
drop(locked_spv_client);

let cur_height = channel_manager.current_best_block().height;

Expand Down Expand Up @@ -1350,46 +1361,6 @@ pub(crate) enum FeeRateEstimationMode {
Conservative,
}

const MAX_HEADER_CACHE_ENTRIES: usize = 100;

pub(crate) struct BoundedHeaderCache {
header_map: HashMap<BlockHash, ValidatedBlockHeader>,
recently_seen: VecDeque<BlockHash>,
}

impl BoundedHeaderCache {
pub(crate) fn new() -> Self {
let header_map = HashMap::new();
let recently_seen = VecDeque::new();
Self { header_map, recently_seen }
}
}

impl Cache for BoundedHeaderCache {
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
self.header_map.get(block_hash)
}

fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
self.recently_seen.push_back(block_hash);
self.header_map.insert(block_hash, block_header);

if self.header_map.len() >= MAX_HEADER_CACHE_ENTRIES {
// Keep dropping old entries until we've actually removed a header entry.
while let Some(oldest_entry) = self.recently_seen.pop_front() {
if self.header_map.remove(&oldest_entry).is_some() {
break;
}
}
}
}

fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
self.recently_seen.retain(|e| e != block_hash);
self.header_map.remove(block_hash)
}
}

pub(crate) struct ChainListener {
pub(crate) onchain_wallet: Arc<Wallet>,
pub(crate) channel_manager: Arc<ChannelManager>,
Expand Down
Loading
Loading