Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Improve Pubky profile restore, contact editing, and contact routing flows #905

### Fixed
- Fix probe results and add keysend probes #920
- Align top bar back arrow and passphrase input cursor/placeholder with iOS #906
- Polish Terms of Use screen padding to match iOS #903

Expand Down
4 changes: 4 additions & 0 deletions app/src/main/java/to/bitkit/models/USat.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ value class USat(val value: ULong) : Comparable<USat> {
/** Saturating addition: caps at ULong.MAX_VALUE if result would overflow. */
operator fun plus(other: USat): ULong =
if (value <= ULong.MAX_VALUE - other.value) value + other.value else ULong.MAX_VALUE

/** Saturating multiplication: caps at ULong.MAX_VALUE if result would overflow. */
operator fun times(other: USat): ULong =
if (other.value == 0uL || value <= ULong.MAX_VALUE / other.value) value * other.value else ULong.MAX_VALUE
}

/**
Expand Down
126 changes: 114 additions & 12 deletions app/src/main/java/to/bitkit/repositories/LightningRepo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
Expand All @@ -44,6 +47,7 @@ import org.lightningdevkit.ldknode.ClosureReason
import org.lightningdevkit.ldknode.Event
import org.lightningdevkit.ldknode.NodeStatus
import org.lightningdevkit.ldknode.PaymentDetails
import org.lightningdevkit.ldknode.PaymentHash
import org.lightningdevkit.ldknode.PaymentId
import org.lightningdevkit.ldknode.PeerDetails
import org.lightningdevkit.ldknode.SpendableUtxo
Expand All @@ -60,6 +64,7 @@ import to.bitkit.ext.nowTimestamp
import to.bitkit.ext.toPeerDetailsList
import to.bitkit.models.ALL_ADDRESS_TYPE_STRINGS
import to.bitkit.models.CoinSelectionPreference
import to.bitkit.models.MSat
import to.bitkit.models.NATIVE_WITNESS_TYPES
import to.bitkit.models.NodeLifecycleState
import to.bitkit.models.OpenChannelResult
Expand Down Expand Up @@ -121,6 +126,8 @@ class LightningRepo @Inject constructor(
val isRecoveryMode = _isRecoveryMode.asStateFlow()

private val channelCache = ConcurrentHashMap<String, ChannelDetails>()
private val probeOutcomeCache = ConcurrentHashMap<PaymentId, ProbeOutcome>()
private val probeOutcomeSignal = MutableSharedFlow<ProbeOutcome>(extraBufferCapacity = 64)

private val syncMutex = Mutex()
private val syncPending = AtomicBoolean(false)
Expand Down Expand Up @@ -420,6 +427,7 @@ class LightningRepo @Inject constructor(

private suspend fun onEvent(event: Event) {
handleLdkEvent(event)
recordProbeOutcome(event)
_eventHandlers.toList().forEach {
runCatching { it.invoke(event) }
}
Expand All @@ -441,12 +449,14 @@ class LightningRepo @Inject constructor(
suspend fun stop(): Result<Unit> = withContext(bgDispatcher) {
lifecycleMutex.withLock {
if (_lightningState.value.nodeLifecycleState.isStoppedOrStopping()) {
clearProbeOutcomes()
return@withLock Result.success(Unit)
}

runCatching {
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopping) }
lightningService.stop()
clearProbeOutcomes()
_lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
}.onFailure {
Logger.error("Node stop error", it, context = TAG)
Expand Down Expand Up @@ -529,6 +539,21 @@ class LightningRepo @Inject constructor(
}
}

private suspend fun recordProbeOutcome(event: Event) {
val outcome = when (event) {
is Event.ProbeSuccessful -> ProbeOutcome.Success(event.paymentId, event.paymentHash)
is Event.ProbeFailed -> ProbeOutcome.Failure(event.paymentId, event.paymentHash, event.shortChannelId)
else -> return
}

probeOutcomeCache[outcome.paymentId] = outcome
probeOutcomeSignal.emit(outcome)
}

private fun clearProbeOutcomes() {
probeOutcomeCache.clear()
}

private suspend fun registerClosedChannel(channelId: String, reason: ClosureReason?) = withContext(bgDispatcher) {
runCatching {
val channel = channelCache[channelId] ?: run {
Expand Down Expand Up @@ -582,6 +607,7 @@ class LightningRepo @Inject constructor(
stop().mapCatching {
Logger.debug("node stopped, calling wipeStorage", context = TAG)
lightningService.wipeStorage(walletIndex)
clearProbeOutcomes()
_lightningState.update {
LightningState(
nodeStatus = it.nodeStatus,
Expand Down Expand Up @@ -1363,23 +1389,74 @@ class LightningRepo @Inject constructor(
// endregion

// region probing
suspend fun sendProbeForInvoice(bolt11: String, amountSats: ULong? = null): Result<Unit> =
suspend fun sendProbeForInvoice(bolt11: String, amountSats: ULong? = null): Result<ProbeDispatch> =
executeWhenNodeRunning("sendProbeForInvoice") {
Logger.debug(
"sendProbeForInvoice: amountSats=${amountSats ?: "null (using invoice amount)"}",
context = TAG
"sendProbeForInvoice: amountSats='${amountSats ?: "null (using invoice amount)"}'",
context = TAG,
)
runCatching {
if (amountSats != null) {
val amountMsat = amountSats * 1000u
lightningService.sendProbesUsingAmount(bolt11, amountMsat)
} else {
lightningService.sendProbes(bolt11)
}
}.getOrElse {
Result.failure(it)
val result = if (amountSats != null) {
val amountMsat = amountSats.safe() * MSat.PER_SAT.safe()
lightningService.sendProbesUsingAmount(bolt11, amountMsat)
} else {
lightningService.sendProbes(bolt11)
}

result.map { ProbeDispatch(paymentIds = it) }
}

suspend fun sendProbeForNode(nodeId: String, amountSats: ULong): Result<ProbeDispatch> =
executeWhenNodeRunning("sendProbeForNode") {
Logger.debug(
"Sending keysend probe to nodeId='$nodeId' amountSats='$amountSats'",
context = TAG,
)
val amountMsat = amountSats.safe() * MSat.PER_SAT.safe()
lightningService.sendKeysendProbe(nodeId, amountMsat).map {
ProbeDispatch(paymentIds = it)
}
}

suspend fun waitForProbeOutcome(
paymentIds: Set<PaymentId>,
timeout: Duration = PROBE_TIMEOUT,
): Result<ProbeOutcome> = withContext(bgDispatcher) {
if (paymentIds.isEmpty()) {
return@withContext Result.failure(ProbeError.NoProbeHandles())
}

val trackedIds = paymentIds.toSet()
val outcome = withTimeoutOrNull(timeout) {
val pending = trackedIds.toMutableSet()
var lastFailure: ProbeOutcome.Failure? = null

probeOutcomeSignal
.onSubscription {
trackedIds.forEach { id ->
probeOutcomeCache[id]?.let { emit(it) }
}
}
.filter { it.paymentId in trackedIds }
.mapNotNull { probeOutcome ->
if (!pending.remove(probeOutcome.paymentId)) return@mapNotNull null

probeOutcomeCache.remove(probeOutcome.paymentId)
when (probeOutcome) {
is ProbeOutcome.Success -> probeOutcome
is ProbeOutcome.Failure -> {
lastFailure = probeOutcome
if (pending.isEmpty()) lastFailure else null
}
}
}
.first()
}

trackedIds.forEach { probeOutcomeCache.remove(it) }

outcome?.let { Result.success(it) }
?: Result.failure(ProbeError.TimedOut())
}
// endregion

suspend fun restartNode(): Result<Unit> = withContext(bgDispatcher) {
Expand All @@ -1404,6 +1481,7 @@ class LightningRepo @Inject constructor(
private const val CHANNELS_READY_TIMEOUT_MS = 15_000L
private const val CHANNELS_USABLE_TIMEOUT_MS = 15_000L
val SEND_LN_TIMEOUT = 10.seconds
private val PROBE_TIMEOUT = 60.seconds
}
}

Expand All @@ -1413,6 +1491,10 @@ class NodeStopTimeoutError : AppError("Timeout waiting for node to stop")
class NodeRunTimeoutError(opName: String) : AppError("Timeout waiting for node to run and execute: '$opName'")
class GetPaymentsError : AppError("It wasn't possible get the payments")
class SyncUnhealthyError : AppError("Wallet sync failed before send")
sealed class ProbeError(message: String) : AppError(message) {
class NoProbeHandles : ProbeError("No probe handles returned")
class TimedOut : ProbeError("Probe timed out")
}

@Stable
data class LightningState(
Expand All @@ -1436,3 +1518,23 @@ data class LightningState(
val isSyncHealthy: Boolean
get() = lastSyncError == null && lastSuccessfulSyncAt != null
}

data class ProbeDispatch(
val paymentIds: Set<PaymentId>,
)

sealed interface ProbeOutcome {
val paymentId: PaymentId
val paymentHash: PaymentHash

data class Success(
override val paymentId: PaymentId,
override val paymentHash: PaymentHash,
) : ProbeOutcome

data class Failure(
override val paymentId: PaymentId,
override val paymentHash: PaymentHash,
val shortChannelId: ULong?,
) : ProbeOutcome
}
34 changes: 27 additions & 7 deletions app/src/main/java/to/bitkit/services/LightningService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.lightningdevkit.ldknode.NodeStatus
import org.lightningdevkit.ldknode.PaymentDetails
import org.lightningdevkit.ldknode.PaymentId
import org.lightningdevkit.ldknode.PeerDetails
import org.lightningdevkit.ldknode.PublicKey
import org.lightningdevkit.ldknode.SpendableUtxo
import org.lightningdevkit.ldknode.Txid
import org.lightningdevkit.ldknode.defaultConfig
Expand All @@ -49,8 +50,8 @@ import to.bitkit.env.Env
import to.bitkit.ext.totalNextOutboundHtlcLimitSats
import to.bitkit.ext.uByteList
import to.bitkit.ext.uri
import to.bitkit.models.msatFloorOf
import to.bitkit.models.OpenChannelResult
import to.bitkit.models.msatFloorOf
import to.bitkit.models.toAddressType
import to.bitkit.utils.AppError
import to.bitkit.utils.LdkError
Expand Down Expand Up @@ -135,6 +136,7 @@ class LightningService @Inject constructor(
trustedPeersNoReserve = trustedPeerNodeIds,
perChannelReserveSats = 1u,
),
probingLiquidityLimitMultiplier = 1uL,
includeUntrustedPendingInSpendable = true,
)
}
Expand Down Expand Up @@ -721,7 +723,7 @@ class LightningService @Inject constructor(
// endregion

// region probing
suspend fun sendProbes(bolt11: String): Result<Unit> {
suspend fun sendProbes(bolt11: String): Result<Set<PaymentId>> {
val node = this.node ?: throw ServiceError.NodeNotSetup()

val bolt11Invoice = runCatching { Bolt11Invoice.fromStr(bolt11) }
Expand All @@ -735,16 +737,16 @@ class LightningService @Inject constructor(

return ServiceQueue.LDK.background {
runCatching {
node.bolt11Payment().sendProbes(bolt11Invoice, null)
Result.success(Unit)
val handles = node.bolt11Payment().sendProbes(bolt11Invoice, null)
Result.success(handles.map { it.paymentId }.toSet())
}.getOrElse {
dumpNetworkGraphInfo(bolt11)
Result.failure(if (it is NodeException) LdkError(it) else it)
}
}
}

suspend fun sendProbesUsingAmount(bolt11: String, amountMsat: ULong): Result<Unit> {
suspend fun sendProbesUsingAmount(bolt11: String, amountMsat: ULong): Result<Set<PaymentId>> {
val node = this.node ?: throw ServiceError.NodeNotSetup()

val bolt11Invoice = runCatching { Bolt11Invoice.fromStr(bolt11) }
Expand All @@ -759,14 +761,32 @@ class LightningService @Inject constructor(

return ServiceQueue.LDK.background {
runCatching {
node.bolt11Payment().sendProbesUsingAmount(bolt11Invoice, amountMsat, null)
Result.success(Unit)
val handles = node.bolt11Payment().sendProbesUsingAmount(bolt11Invoice, amountMsat, null)
Result.success(handles.map { it.paymentId }.toSet())
}.getOrElse {
dumpNetworkGraphInfo(bolt11)
Result.failure(if (it is NodeException) LdkError(it) else it)
}
}
}

suspend fun sendKeysendProbe(nodeId: PublicKey, amountMsat: ULong): Result<Set<PaymentId>> {
val node = this.node ?: throw ServiceError.NodeNotSetup()

Logger.debug(
"Sending keysend probe to nodeId='$nodeId' amountMsat='$amountMsat' (${msatFloorOf(amountMsat)} sats)",
context = TAG,
)

return ServiceQueue.LDK.background {
runCatching {
val handles = node.spontaneousPayment().sendProbes(amountMsat, nodeId)
Result.success(handles.map { it.paymentId }.toSet())
}.getOrElse {
Result.failure(if (it is NodeException) LdkError(it) else it)
}
}
}
// endregion

// region utxo selection
Expand Down
Loading
Loading