diff --git a/CHANGELOG.md b/CHANGELOG.md
index a1e3292..8713f02 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,34 @@ All notable changes to HyperCache are recorded here. The format follows
 
 ### Added
 
+- **Async read-repair batching (Phase 4) + unconditional `ForwardSet`-only repair.** Two composing changes
+  in the same PR that together cut the wire-call cost of read-repair under quorum reads. (1) The defensive
+  `ForwardGet` probe in `repairRemoteReplica` is gone — every repair is now exactly one `ForwardSet`,
+  because the receiver's `applySet` already version-compares and noops downgrades, so the probe was pure
+  duplication. ~50% wire-call reduction per repair regardless of batching. (2) New opt-in
+  [`backend.WithDistReadRepairBatch(interval, maxBatchSize)`](pkg/backend/dist_memory.go) option queues
+  repairs by destination peer + key (last-write-wins by `(version, origin)`) and dispatches per-peer batches
+  on the interval or when a peer's pending count hits `maxBatchSize`. Concurrent reads of the same hot key
+  produce ONE repair through the queue, not N — the coalescer collapses duplicate `(peer, key)` entries
+  and bumps the new `dist.read_repair.coalesced` counter per collapsed enqueue. Disabled by default
+  (`interval == 0` = current synchronous behavior preserved, so `TestDistMemoryReadRepair` and
+  `TestDistMemoryRemoveReplication` pass byte-identical). Clean shutdown drains the queue inside `Stop()`;
+  crash exit loses queued repairs by design, with merkle anti-entropy as the convergence safety net.
+  New [`pkg/backend/dist_read_repair.go`](pkg/backend/dist_read_repair.go) hosts the `repairQueue` type
+  with errgroup-driven per-peer parallel `ForwardSet` dispatch. Eight unit tests in
+  [`pkg/backend/dist_read_repair_test.go`](pkg/backend/dist_read_repair_test.go) cover the coalesce rule
+  (same `(peer, key)` keeps the higher version, distinct peers stay independent), the size-threshold
+  inline flush, the per-peer parallel dispatch, the nil-transport noop path, the `Stop()` drain semantics,
+  the `(version, origin)` tie-break rule, and concurrent-enqueue race-safety. Three integration tests in
+  [`tests/hypercache_distmemory_readrepair_batch_test.go`](tests/hypercache_distmemory_readrepair_batch_test.go)
+  drive the end-to-end shape — a 3-node RF=3 ConsistencyQuorum cluster, one node's local copy dropped,
+  N concurrent Gets from a third node — and assert the batched flush heals the dropped node, parallel
+  reads coalesce to ≤2 dispatches (one per remote owner) regardless of N, and `Stop()` drains queued
+  repairs before returning. Two new OTel metrics:
+  `dist.read_repair.batched` (per actual `ForwardSet` dispatched by the queue's flusher) and
+  `dist.read_repair.coalesced` (per duplicate-enqueue collapsed). New "Tuning — read-repair batching"
+  section in [`docs/operations.md`](docs/operations.md) covers the option shape, the divergence-window
+  trade-off, the two metrics, and when to enable it (high read-amplification with stable hot keys).
 - **Token-refresh visibility for the OIDC source.** Closes RFC 0003 open question 6: the
   `WithOIDCClientCredentials` source now wraps its `oauth2.TokenSource` with a logger that emits one
   `"oidc token rotated"` Info line per real rotation (expiry change), staying silent on cached returns.
@@ -258,6 +286,23 @@ All notable changes to HyperCache are recorded here.
The format follows
 
 ### Fixed
 
+- **Set-forward promotion no longer requires the in-process `ErrBackendNotFound` sentinel.**
+  [`handleForwardPrimary`](pkg/backend/dist_memory.go) used to gate "primary unreachable → promote to
+  replica" on `errors.Is(errFwd, sentinel.ErrBackendNotFound)`, the error the in-process transport returns
+  for an unregistered peer. HTTP/gRPC transports against a stopped container surface
+  `net.OpError` / `io.EOF` / `context.DeadlineExceeded` instead — none of which matched the condition.
+  Result: when a cluster node was killed (e.g. `docker stop` in
+  [`scripts/tests/20-test-cluster-resilience.sh`](scripts/tests/20-test-cluster-resilience.sh)), writes for
+  keys whose primary was the dead node failed immediately at the forwarding hop, no hint was queued, and
+  the data never landed anywhere — the same 7 of 50 "during-*" writes failed reproducibly in CI's cluster
+  workflow. Promotion now triggers on **any** non-nil forward error when the local node is in `owners[1:]`,
+  matching the in-process and production transport behavior under the same resilience contract. Spurious
+  promotion on a transient blip is benign — `applySet` version-compares on the receiver, and merkle
+  anti-entropy / `chooseNewer` reconcile any divergent `(version, origin)` pair via the existing
+  last-write-wins rule. New test [`TestDistSet_PromotesOnGenericForwardError`](tests/hypercache_distmemory_forward_primary_promotion_test.go)
+  uses the chaos hooks at `DropRate=1.0` to deterministically force a generic forward error and asserts the
+  Set succeeds via promotion; the existing `TestDistFailureRecovery` continues to pass byte-identical (the
+  change widens the promotion gate, doesn't narrow it).
 - **`TestDistRebalanceReplicaDiffThrottle` no longer flakes under `make test-race`.** The test's 900ms hard
   sleep wasn't enough wall-clock budget for the rebalancer's 80ms-tick loop to actually fire 11 ticks under
   `-race` + `-shuffle=on`'s scheduler pressure. Replaced the sleep with a 5-second polling loop that exits as
diff --git a/Makefile b/Makefile
index 0cb90e0..e640a8b 100644
--- a/Makefile
+++ b/Makefile
@@ -99,6 +99,16 @@ run-example:
 update-deps:
 	go get -u -t ./... && go mod tidy -v && go mod verify
 
+# check_command_exists expands to a recipe line that succeeds if the
+# given command resolves on PATH, otherwise prints "$(1) command not
+# found" and exits non-zero. Used by prepare-base-tools' chain of
+# `$(call check_command_exists,X) || install-fallback` lines: the
+# echoed message is the developer-facing hint, and the install
+# fallback fires when the tool is genuinely missing.
+define check_command_exists
+@which $(1) > /dev/null 2>&1 || (echo "$(1) command not found" && exit 1)
+endef
+
 prepare-toolchain: prepare-base-tools
 
 prepare-base-tools:
@@ -216,23 +226,17 @@ docs-serve: docs-build
 	PYENV_VERSION=mkdocs mkdocs serve
 
 pre-commit:
-	@eval "$$(pyenv init -)" && \
-	pyenv activate pre-commit && \
-	pre-commit run -a trailing-whitespace && \
-	pre-commit run -a end-of-file-fixer && \
-	pre-commit run -a markdownlint && \
-	pre-commit run -a yamllint && \
-	pre-commit run -a cspell && \
-	pre-commit run -a cspell
-
-# check_command_exists is a helper function that checks if a command exists.
-define check_command_exists
-@which $(1) > /dev/null 2>&1 || (echo "$(1) command not found" && exit 1)
-endef
-
-ifeq ($(call check_command_exists,$(1)),false)
-	$(error "$(1) command not found")
-endif
+	@if command -v pyenv >/dev/null 2>&1; then \
+	eval "$$(pyenv init -)" && \
+	pyenv activate pre-commit && \
+	pre-commit run -a trailing-whitespace && \
+	pre-commit run -a end-of-file-fixer && \
+	pre-commit run -a markdownlint && \
+	pre-commit run -a yamllint && \
+	pre-commit run -a cspell; \
+	else \
+	echo "pyenv command not found"; \
+	fi
 
 # help prints a list of available targets and their descriptions.
 help:
diff --git a/cspell.config.yaml b/cspell.config.yaml
index 8aa0189..2394134 100644
--- a/cspell.config.yaml
+++ b/cspell.config.yaml
@@ -39,6 +39,7 @@ words:
   - Akudx
   - aliceonly
   - ALPN
+  - amortisation
   - APITLS
   - APITLSCA
   - assertable
@@ -68,6 +69,7 @@ words:
   - clientcredentials
   - cmap
   - Cmder
+  - coalescer
   - codacy
   - codebook
   - codegen
@@ -86,12 +88,14 @@ words:
   - derr
   - disambiguator
   - distconfig
+  - distmemory
   - distroless
   - EDITMSG
   - elif
   - Equalf
   - errcheck
   - errchkjson
+  - errgroup
   - Errorf
   - errp
   - eventbus
@@ -164,6 +168,7 @@ words:
   - keepalive
   - keepalives
   - keyf
+  - keyfmt
   - keypair
   - lamport
   - lblll
@@ -216,6 +221,7 @@ words:
   - pygments
   - pymdownx
   - reaad
+  - readrepair
   - recvcheck
   - rediscluster
   - Redocly
diff --git a/docs/operations.md b/docs/operations.md
index bf675f3..e04e1fc 100644
--- a/docs/operations.md
+++ b/docs/operations.md
@@ -123,6 +123,44 @@ otherwise mark a healthy peer suspect. `dist.heartbeat.indirect_probe.refuted` r
 probes are saving you from spurious flapping; rising `dist.heartbeat.indirect_probe.failure` indicates the
 peer is genuinely unreachable from multiple vantage points.
 
+## Tuning — read-repair batching
+
+Quorum reads (`ConsistencyQuorum` / `ConsistencyAll`) issue best-effort read-repair to every owner of the
+chosen item — one `ForwardSet` per owner per read. Under read-heavy workloads on hot keys with stale replicas,
+this fan-out becomes the dominant network cost on the dist transport.
+
+`backend.WithDistReadRepairBatch(interval, maxBatchSize)` opts into async coalescing: repairs are queued by
+destination peer + key (last-write-wins by `(version, origin)`) and dispatched by a background flusher on the
+configured interval or when a per-peer batch hits `maxBatchSize`. Concurrent reads of the same hot key
+produce one repair through the queue, not N.
+
+```go
+dm, _ := backend.NewDistMemory(ctx,
+	backend.WithDistReadConsistency(backend.ConsistencyQuorum),
+	backend.WithDistReadRepairBatch(100*time.Millisecond, 64),
+)
+```
+
+**Trade-off.** Batched mode introduces a divergence window of up to `interval` where a stale replica stays
+stale. Merkle anti-entropy (`WithDistMerkleAutoSync`) is the safety net for any repair the queue drops on
+crash exit; clean shutdown drains the queue inside `Stop()`. Disabled by default — the existing synchronous
+read-repair path is preserved when the option is not set.
+
+**Detection.** Two new metrics quantify the amortisation:
+
+- `dist.read_repair.batched` — repairs dispatched via the queue's flusher (one bump per actual `ForwardSet`).
+- `dist.read_repair.coalesced` — repairs short-circuited because a same-version-or-higher entry was already
+  queued for the same `(peer, key)`. Every concurrent same-key read past the first bumps this.
+
+The aggregate `dist.read_repair` counter still bumps per repair attempt (sync dispatch or enqueue).
Operators +read `dist.read_repair.batched / dist.read_repair` for the fraction routed through the queue, and +`dist.read_repair.coalesced` as the saved-wire-call counter. + +**When to enable.** High read-amplification with stable hot keys; the typical pattern is a partitioned +workload where a small set of keys takes the majority of reads and one of the owners drops out briefly. Not +useful for write-heavy paths (which don't go through read-repair) or for workloads with a flat key +distribution (the coalescer has little to collapse). + ## Operational tasks ### Drain a node diff --git a/go.mod b/go.mod index 2ce3472..9052eff 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( go.opentelemetry.io/otel/trace v1.43.0 golang.org/x/crypto v0.51.0 golang.org/x/oauth2 v0.36.0 + golang.org/x/sync v0.20.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -36,6 +37,7 @@ require ( github.com/mattn/go-isatty v0.0.22 // indirect github.com/philhofer/fwd v1.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/shamaton/msgpack/v3 v3.1.1 // indirect github.com/tinylib/msgp v1.6.4 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.71.0 // indirect diff --git a/go.sum b/go.sum index ccf289e..f38c21f 100644 --- a/go.sum +++ b/go.sum @@ -55,8 +55,8 @@ github.com/redis/go-redis/v9 v9.19.0 h1:XPVaaPSnG6RhYf7p+rmSa9zZfeVAnWsH5h3lxthO github.com/redis/go-redis/v9 v9.19.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= -github.com/shamaton/msgpack/v3 v3.1.0 h1:jsk0vEAqVvvS9+fTZ5/EcQ9tz860c9pWxJ4Iwecz8gU= -github.com/shamaton/msgpack/v3 v3.1.0/go.mod h1:DcQG8jrdrQCIxr3HlMYkiXdMhK+KfN2CitkyzsQV4uc= +github.com/shamaton/msgpack/v3 v3.1.1 h1:1EkrTpc68/H9bziTVw9eDLHLeK2v8aAyyv60quMqIY4= +github.com/shamaton/msgpack/v3 v3.1.1/go.mod h1:DcQG8jrdrQCIxr3HlMYkiXdMhK+KfN2CitkyzsQV4uc= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tinylib/msgp v1.6.4 h1:mOwYbyYDLPj35mkA2BjjYejgJk9BuHxDdvRnb6v2ZcQ= @@ -95,6 +95,8 @@ golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w= golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ= golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index b85f50c..a6ca0d4 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -82,6 +82,18 @@ type DistMemory struct { // storeTransport applies the wrapper transparently when chaos is set, // so the rest of the code path is unaware of the chaos surface. chaos *Chaos + // repairQueue (optional) coalesces read-repair fan-out across the + // read path. 
Set via WithDistReadRepairBatch with interval > 0; + // nil means repairs dispatch synchronously inline (the historical + // behavior). repairRemoteReplica checks this field on every call + // and routes accordingly. + repairQueue *repairQueue + // repairBatchInterval + repairBatchSize hold the option values + // captured at construction so the queue is built once with the + // final config after all options have been applied. Zero + // interval means batching is disabled. + repairBatchInterval time.Duration + repairBatchSize int // configuration (static for now, future: dynamic membership/gossip) replication int virtualNodes int @@ -785,6 +797,40 @@ func WithDistLogger(logger *slog.Logger) DistMemoryOption { } } +// WithDistReadRepairBatch enables async coalescing of read-repair +// fan-out. When interval > 0, repairs from the read path are +// queued by destination peer; the queue flushes periodically OR +// when a peer's pending count hits maxBatchSize. Repairs to the +// same (peer, key) collapse to the highest-version entry — +// concurrent reads of the same hot key produce one repair, not N. +// +// Default (interval = 0 or maxBatchSize <= 0): repairs dispatch +// synchronously inside the Get path. Existing callers asserting +// "replica healed by the time Get returns" see byte-identical +// behavior. +// +// Trade-off: batched mode introduces a window (up to `interval`) +// where a divergent replica stays divergent. Merkle anti-entropy +// is the convergence safety net; the read-repair path is and +// always was best-effort. Stop() drains pending entries before +// returning so a clean shutdown doesn't lose queued repairs. +func WithDistReadRepairBatch(interval time.Duration, maxBatchSize int) DistMemoryOption { + return func(dm *DistMemory) { + if interval <= 0 || maxBatchSize <= 0 { + // Out-of-range values disable batching rather than + // silently coercing to a tiny non-zero value the caller + // didn't intend. + dm.repairBatchInterval = 0 + dm.repairBatchSize = 0 + + return + } + + dm.repairBatchInterval = interval + dm.repairBatchSize = maxBatchSize + } +} + // WithDistTracerProvider supplies an OpenTelemetry TracerProvider for // the dist backend. 
When set, every public Get/Set/Remove call opens a // span (`dist.get` / `dist.set` / `dist.remove`) carrying consistency @@ -894,6 +940,7 @@ func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[Dist dm.startAutoSyncIfEnabled(lifeCtx) dm.startTombstoneSweeper() dm.startRebalancerIfEnabled(lifeCtx) + dm.startRepairQueueIfEnabled(lifeCtx) return dm, nil } @@ -1190,6 +1237,8 @@ type distMetrics struct { replicaFanoutSet atomic.Int64 replicaFanoutRemove atomic.Int64 readRepair atomic.Int64 + readRepairBatched atomic.Int64 // repairs dispatched via the batched flush path + readRepairCoalesced atomic.Int64 // repairs short-circuited because a queued entry already covered the (peer, key) replicaGetMiss atomic.Int64 heartbeatSuccess atomic.Int64 heartbeatFailure atomic.Int64 @@ -1242,6 +1291,8 @@ type DistMetrics struct { ReplicaFanoutSet int64 ReplicaFanoutRemove int64 ReadRepair int64 + ReadRepairBatched int64 // subset of ReadRepair dispatched via the async coalescer + ReadRepairCoalesced int64 // repairs short-circuited by the coalescer (duplicate same-version entries) ReplicaGetMiss int64 HeartbeatSuccess int64 HeartbeatFailure int64 @@ -1315,6 +1366,8 @@ func (dm *DistMemory) Metrics() DistMetrics { ReplicaFanoutSet: dm.metrics.replicaFanoutSet.Load(), ReplicaFanoutRemove: dm.metrics.replicaFanoutRemove.Load(), ReadRepair: dm.metrics.readRepair.Load(), + ReadRepairBatched: dm.metrics.readRepairBatched.Load(), + ReadRepairCoalesced: dm.metrics.readRepairCoalesced.Load(), ReplicaGetMiss: dm.metrics.replicaGetMiss.Load(), HeartbeatSuccess: dm.metrics.heartbeatSuccess.Load(), HeartbeatFailure: dm.metrics.heartbeatFailure.Load(), @@ -1413,40 +1466,16 @@ func (dm *DistMemory) Stop(ctx context.Context) error { dm.lifeCancel() } - if dm.stopCh != nil { - close(dm.stopCh) - - dm.stopCh = nil - } - - if dm.hintStopCh != nil { - close(dm.hintStopCh) - - dm.hintStopCh = nil - } - - if dm.gossipStopCh != nil { - close(dm.gossipStopCh) - - dm.gossipStopCh = nil - } + dm.closeBackgroundLoops() - if dm.autoSyncStopCh != nil { - close(dm.autoSyncStopCh) + if dm.repairQueue != nil { + // Drain pending entries before returning so a clean shutdown + // dispatches in-flight repairs rather than dropping them. + // stop() blocks until the flusher goroutine emits its final + // flushAll and exits. + dm.repairQueue.stop() - dm.autoSyncStopCh = nil - } - - if dm.tombStopCh != nil { // stop tomb sweeper - close(dm.tombStopCh) - - dm.tombStopCh = nil - } - - if dm.rebalanceStopCh != nil { // stop rebalance loop (was leaking pre-fix) - close(dm.rebalanceStopCh) - - dm.rebalanceStopCh = nil + dm.repairQueue = nil } // Unregister the OTel metric callback before tearing down the HTTP @@ -1565,6 +1594,55 @@ func (dm *DistMemory) RemovePeer(address string) { } } +// startRepairQueueIfEnabled builds + starts the read-repair queue +// when WithDistReadRepairBatch was set with a non-zero interval + +// batch size. The queue holds a closure over loadTransport so it +// always sees the currently-active transport (including chaos-wrapped +// or post-Stop nil transports). 
+func (dm *DistMemory) startRepairQueueIfEnabled(ctx context.Context) { + if dm.repairBatchInterval <= 0 || dm.repairBatchSize <= 0 { + return + } + + dm.repairQueue = newRepairQueue( + dm.repairBatchInterval, + dm.repairBatchSize, + dm.loadTransport, + &dm.metrics, + dm.logger, + ) + dm.repairQueue.start(ctx) + + dm.logger.Info( + "read-repair batching enabled", + slog.Duration("interval", dm.repairBatchInterval), + slog.Int("max_batch_size", dm.repairBatchSize), + ) +} + +// closeBackgroundLoops closes every stopCh wired into a background +// goroutine launched by NewDistMemory. Extracted from Stop to keep +// that method under the function-length budget; ordering is not +// load-bearing (each loop's select handles its own teardown). +func (dm *DistMemory) closeBackgroundLoops() { + stopChs := []*chan struct{}{ + &dm.stopCh, + &dm.hintStopCh, + &dm.gossipStopCh, + &dm.autoSyncStopCh, + &dm.tombStopCh, + &dm.rebalanceStopCh, + } + + for _, ch := range stopChs { + if *ch != nil { + close(*ch) + + *ch = nil + } + } +} + // logClusterJoin emits a single structured line summarizing the cluster // shape this node is about to join. The most operator-visible event in // the cluster lifecycle: one log per node startup that captures every @@ -2696,21 +2774,16 @@ func (dm *DistMemory) repairStaleOwners( return } + // Route through repairRemoteReplica so this path benefits from + // the same probe-drop + opt-in batching as the primary + // read-repair path. The send-and-let-receiver-noop pattern + // is safe because applySet on the receiver version-compares. for _, oid := range staleOwners { if oid == dm.localNode.ID { // local handled in repairReplicas continue } - it, ok, err := transport.ForwardGet(ctx, string(oid), key) - if err != nil { // skip unreachable - continue - } - - if !ok || it.Version < chosen.Version || (it.Version == chosen.Version && it.Origin > chosen.Origin) { - _ = transport.ForwardSet(ctx, string(oid), chosen, false) - - dm.metrics.readRepair.Add(1) - } + dm.repairRemoteReplica(ctx, key, chosen, oid) } } @@ -3514,27 +3587,56 @@ func (dm *DistMemory) repairLocalReplica(ctx context.Context, key string, chosen } } -// repairRemoteReplica updates a remote replica if stale (best-effort). +// repairRemoteReplica dispatches a stale-replica repair to a remote owner. +// +// We send the chosen item unconditionally — no probe-then-write dance — because +// the receiver's applySet already version-compares and noops a write older or +// equal to what it holds. The previous shape did a defensive ForwardGet first; +// dropping it cuts every repair from two transport calls to one, with the +// receiver's version logic providing the same staleness gate it always did. +// +// If WithDistReadRepairBatch was set, the repair is enqueued for coalescing +// instead of dispatching synchronously; the read path returns without waiting +// for the wire call. The async window is bounded by the configured interval +// and drained on Stop. 
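+//
+// The per-repair wire-call arithmetic behind the probe drop, as a
+// sketch of the changelog's ~50% claim:
+//
+//	before: ForwardGet probe + conditional ForwardSet = 2 calls when stale, 1 when fresh
+//	after:  one unconditional ForwardSet (receiver version-compares and noops) = 1 call, always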
func (dm *DistMemory) repairRemoteReplica( ctx context.Context, - key string, + _ string, chosen *cache.Item, oid cluster.NodeID, -) { // separated to reduce cyclomatic complexity +) { transport := dm.loadTransport() if transport == nil { // cannot repair remote return } - it, ok, _ := transport.ForwardGet(ctx, string(oid), key) - if !ok || it.Version < chosen.Version || (it.Version == chosen.Version && it.Origin > chosen.Origin) { // stale - _ = transport.ForwardSet(ctx, string(oid), chosen, false) - + if dm.repairQueue != nil { + dm.repairQueue.enqueue(ctx, oid, chosen) dm.metrics.readRepair.Add(1) + + return } + + _ = transport.ForwardSet(ctx, string(oid), chosen, false) + + dm.metrics.readRepair.Add(1) } -// handleForwardPrimary tries to forward a Set to the primary; returns (proceedAsPrimary,false) if promotion required. +// handleForwardPrimary tries to forward a Set to the primary; returns +// (proceedAsPrimary, err). On any non-nil forward error — not just +// ErrBackendNotFound — the function attempts to promote to a replica +// owner if the local node is in the replica list. This is the correct +// resilience contract: the in-process transport surfaces unreachable +// peers as ErrBackendNotFound, but HTTP/gRPC transports against a +// stopped container surface net.OpError / io.EOF / +// context.DeadlineExceeded. Gating promotion on the in-process-only +// sentinel meant production forwarding failures dropped writes to keys +// whose primary had just been killed, until the next merkle tick. +// +// A spurious promotion (primary was healthy but the call hit a +// transient blip) is benign: applySet on the receiver version-compares, +// and `chooseNewer` / merkle anti-entropy reconcile any divergent +// `(version, origin)` pair through the existing last-write-wins rule. func (dm *DistMemory) handleForwardPrimary(ctx context.Context, owners []cluster.NodeID, item *cache.Item) (bool, error) { transport := dm.loadTransport() if transport == nil { @@ -3544,26 +3646,23 @@ func (dm *DistMemory) handleForwardPrimary(ctx context.Context, owners []cluster dm.metrics.forwardSet.Add(1) errFwd := transport.ForwardSet(ctx, string(owners[0]), item, true) - switch { - case errFwd == nil: - return false, nil // forwarded successfully - case errors.Is(errFwd, sentinel.ErrBackendNotFound) && len(owners) > 1: - // primary missing: promote if this node is a listed replica - for _, oid := range owners[1:] { - if oid == dm.localNode.ID { // we can promote - if !dm.ownsKeyInternal(item.Key) { // still not recognized locally (ring maybe outdated) - return false, errFwd - } + if errFwd == nil { + return false, nil + } - return true, nil // proceed as primary path + // Primary unreachable for any reason — try to promote to a replica + // owner. The local node must be in owners[1:] AND still recognize + // itself as an owner (defensive against a stale ring snapshot + // mid-membership-change). + if len(owners) > 1 { + for _, oid := range owners[1:] { + if oid == dm.localNode.ID && dm.ownsKeyInternal(item.Key) { + return true, nil } } - - return false, errFwd // not promotable - - default: - return false, errFwd } + + return false, errFwd } // initStandaloneMembership initializes membership & ring for standalone mode with optional seeds. 
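A standalone sketch of why the pre-fix gate never matched production transport failures. This is not part of the diff; `errBackendNotFound` merely stands in for the in-process transport's `sentinel.ErrBackendNotFound`, and the dial error models what an HTTP/gRPC transport returns for a stopped container:

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"syscall"
)

// Stand-in for the in-process transport's sentinel, which only that
// transport returns (for an unregistered peer).
var errBackendNotFound = errors.New("backend not found")

func main() {
	// What an HTTP/gRPC transport surfaces when the peer's container
	// is stopped: a dial error, not the sentinel.
	var errFwd error = &net.OpError{Op: "dial", Net: "tcp", Err: syscall.ECONNREFUSED}

	// Pre-fix gate: only the in-process sentinel qualified, so this
	// prints false and promotion never fired for real outages.
	fmt.Println(errors.Is(errFwd, errBackendNotFound))

	// Post-fix gate: any non-nil forward error qualifies for promotion.
	fmt.Println(errFwd != nil)
}
```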
@@ -3871,6 +3970,16 @@ var distMetricSpecs = []distMetricSpec{
 		desc: "Read-repair operations triggered by stale-replica reads",
 		get:  func(m DistMetrics) int64 { return m.ReadRepair },
 	},
+	{
+		name: "dist.read_repair.batched", unit: unitOp, counter: true,
+		desc: "Read-repair operations dispatched via the async coalescer (subset of dist.read_repair)",
+		get:  func(m DistMetrics) int64 { return m.ReadRepairBatched },
+	},
+	{
+		name: "dist.read_repair.coalesced", unit: unitOp, counter: true,
+		desc: "Read-repair operations short-circuited by the coalescer because a queued entry already covered the (peer, key)",
+		get:  func(m DistMetrics) int64 { return m.ReadRepairCoalesced },
+	},
 	{
 		name: "dist.replica.get.miss", unit: unitOp, counter: true,
 		desc: "Replica Get returned not-found for a key the primary holds",
diff --git a/pkg/backend/dist_read_repair.go b/pkg/backend/dist_read_repair.go
new file mode 100644
index 0000000..8643f46
--- /dev/null
+++ b/pkg/backend/dist_read_repair.go
@@ -0,0 +1,276 @@
+package backend
+
+import (
+	"context"
+	"log/slog"
+	"sync"
+	"time"
+
+	"golang.org/x/sync/errgroup"
+
+	"github.com/hyp3rd/hypercache/internal/cluster"
+	cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
+)
+
+// repairQueue is the opt-in async coalescer for read-repair fan-out.
+// When configured via WithDistReadRepairBatch, repairRemoteReplica
+// enqueues into this queue instead of dispatching synchronously;
+// a background flusher groups pending repairs by destination peer
+// and dispatches them on the configured interval (or as soon as a
+// per-peer batch reaches maxBatchSize).
+//
+// Coalescing rule: per (peer, key), only the highest-version item
+// is kept. Concurrent reads of the same hot key produce one repair,
+// not N — the second enqueue sees the first's entry and replaces
+// it iff the new version is higher, otherwise short-circuits; either
+// way the collapsed duplicate bumps ReadRepairCoalesced.
+//
+// Lifecycle: started by NewDistMemory when the option is set;
+// drained on Stop() so in-flight repairs aren't dropped on a clean
+// shutdown. Crash exit loses queued repairs by design — merkle
+// anti-entropy, not this queue, carries the convergence guarantee.
+type repairQueue struct {
+	interval     time.Duration
+	maxBatchSize int
+
+	mu      sync.Mutex
+	entries map[cluster.NodeID]map[string]*cache.Item
+
+	transport func() DistTransport
+	metrics   *distMetrics
+	logger    *slog.Logger
+
+	stopCh chan struct{}
+	doneCh chan struct{}
+}
+
+// newRepairQueue constructs the queue. interval > 0 and
+// maxBatchSize > 0 are caller invariants; the option layer enforces
+// these so the queue itself stays focused on the dispatch logic.
+func newRepairQueue(
+	interval time.Duration,
+	maxBatchSize int,
+	transport func() DistTransport,
+	metrics *distMetrics,
+	logger *slog.Logger,
+) *repairQueue {
+	return &repairQueue{
+		interval:     interval,
+		maxBatchSize: maxBatchSize,
+		entries:      map[cluster.NodeID]map[string]*cache.Item{},
+		transport:    transport,
+		metrics:      metrics,
+		logger:       logger,
+		stopCh:       make(chan struct{}),
+		doneCh:       make(chan struct{}),
+	}
+}
+
+// start launches the background flusher. Idempotent guard is the
+// caller's responsibility (newRepairQueue + start happen exactly
+// once from NewDistMemory).
+func (q *repairQueue) start(ctx context.Context) {
+	go q.run(ctx)
+}
+
+// stop signals shutdown, waits for the flusher to drain remaining
+// entries, then returns.
Safe to call multiple times — repeated +// closes on stopCh are guarded by the channel's already-closed +// state via a select. +func (q *repairQueue) stop() { + select { + case <-q.stopCh: + // already closed + default: + close(q.stopCh) + } + + <-q.doneCh +} + +// enqueue records a pending repair for (peer, item.Key). If a +// higher-or-equal-version entry already exists for that key on +// that peer, the new entry is dropped and ReadRepairCoalesced is +// bumped. If the new entry replaces a lower-version one, the +// counter is also bumped (a redundant repair was just collapsed). +// +// Triggers an inline flush of the peer's batch if it reaches +// maxBatchSize. The inline flush runs in its own goroutine so +// enqueue returns immediately to the read path. The detached +// goroutine uses context.WithoutCancel(ctx) so it inherits tracing +// values from the caller without being cancelled when the request +// scope ends (this is fire-and-forget by design — the read path +// has already returned by the time the flush completes). +func (q *repairQueue) enqueue(ctx context.Context, peer cluster.NodeID, item *cache.Item) { + if item == nil { + return + } + + q.mu.Lock() + + peerEntries := q.entries[peer] + if peerEntries == nil { + peerEntries = map[string]*cache.Item{} + q.entries[peer] = peerEntries + } + + existing, found := peerEntries[item.Key] + if found && !isHigherVersion(item, existing) { + // Existing entry is at-least-as-new; new repair is redundant. + q.metrics.readRepairCoalesced.Add(1) + q.mu.Unlock() + + return + } + + if found { + // Replacing a lower-version entry — still a coalesce (a + // redundant wire call is being saved). + q.metrics.readRepairCoalesced.Add(1) + } + + cloned := *item + + peerEntries[item.Key] = &cloned + + shouldFlush := len(peerEntries) >= q.maxBatchSize + q.mu.Unlock() + + if shouldFlush { + flushCtx := context.WithoutCancel(ctx) + + go q.flushPeer(flushCtx, peer) + } +} + +// run is the flusher goroutine body. Ticks on interval, flushes +// everything; on stopCh signal, does one final flushAll, then +// closes doneCh so stop() can return. Shutdown flushes use +// context.WithoutCancel so the drain dispatches even when the +// parent ctx is already Done — losing queued repairs on a clean +// stop would defeat the drain guarantee. +func (q *repairQueue) run(ctx context.Context) { + defer close(q.doneCh) + + ticker := time.NewTicker(q.interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + q.flushAll(context.WithoutCancel(ctx)) + + return + + case <-q.stopCh: + q.flushAll(context.WithoutCancel(ctx)) + + return + + case <-ticker.C: + q.flushAll(ctx) + } + } +} + +// flushAll drains every peer's pending entries. Used by the +// ticker path and by stop(). Per-peer flushes run sequentially +// here; per-peer parallelism happens INSIDE flushPeer via errgroup. +func (q *repairQueue) flushAll(ctx context.Context) { + q.mu.Lock() + + peers := make([]cluster.NodeID, 0, len(q.entries)) + for peer := range q.entries { + peers = append(peers, peer) + } + + q.mu.Unlock() + + for _, peer := range peers { + q.flushPeer(ctx, peer) + } +} + +// flushPeer drains one peer's pending entries and dispatches them +// in parallel via ForwardSet calls. Per-call failures are absorbed +// — read-repair is best-effort, merkle anti-entropy carries the +// convergence guarantee. +// +// Detaches the slice under the lock then releases before doing +// network I/O so enqueue() callers don't block on the wire. 
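+//
+// The locking skeleton, reduced to its essentials (names match the
+// body below):
+//
+//	q.mu.Lock()
+//	peerEntries := q.entries[peer] // take ownership of the peer's batch
+//	delete(q.entries, peer)        // concurrent enqueues start a fresh batch
+//	q.mu.Unlock()
+//	// ForwardSet fan-out happens after this point, outside the lock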
+func (q *repairQueue) flushPeer(ctx context.Context, peer cluster.NodeID) { + q.mu.Lock() + + peerEntries := q.entries[peer] + if len(peerEntries) == 0 { + q.mu.Unlock() + + return + } + + delete(q.entries, peer) + q.mu.Unlock() + + transport := q.transport() + if transport == nil { + // Transport not configured (or torn down mid-flush). The + // repairs are dropped; merkle anti-entropy will catch up. + return + } + + items := make([]*cache.Item, 0, len(peerEntries)) + for _, it := range peerEntries { + items = append(items, it) + } + + var g errgroup.Group + + for _, it := range items { + g.Go(func() error { + err := transport.ForwardSet(ctx, string(peer), it, false) + if err != nil { + // Best-effort — log at Debug, count toward the + // aggregate readRepair budget anyway since the + // receiver might still apply some of them. Failures + // don't propagate to enqueue callers; the read path + // has already returned. + if q.logger != nil { + q.logger.Debug( + "read-repair flush: ForwardSet failed", + slog.String("peer", string(peer)), + slog.String("key", it.Key), + slog.Any("err", err), + ) + } + } + + q.metrics.readRepairBatched.Add(1) + + return nil + }) + } + + _ = g.Wait() +} + +// isHigherVersion reports whether `candidate` should replace +// `existing` in the queue. Mirrors the chosen-version rule used +// by repairLocalReplica (dist_memory.go:3508) — strict version +// comparison, ties broken by origin string. Same rule on both +// the local-repair and queued-repair sides keeps the convergence +// semantics consistent. +func isHigherVersion(candidate, existing *cache.Item) bool { + if candidate.Version > existing.Version { + return true + } + + if candidate.Version < existing.Version { + return false + } + + // Versions equal: tie-break by origin (lower origin string wins, + // matching repairLocalReplica's `localIt.Origin > chosen.Origin` + // condition). + return candidate.Origin < existing.Origin +} diff --git a/pkg/backend/dist_read_repair_test.go b/pkg/backend/dist_read_repair_test.go new file mode 100644 index 0000000..3e089d3 --- /dev/null +++ b/pkg/backend/dist_read_repair_test.go @@ -0,0 +1,429 @@ +package backend + +import ( + "context" + "log/slog" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/hyp3rd/hypercache/internal/cluster" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// captureTransport records every ForwardSet call's (peer, key, +// version) tuple so unit tests can assert "exactly these repairs +// fired". Other DistTransport methods are stubs — the queue only +// dispatches via ForwardSet, so the other verbs aren't exercised. +type captureTransport struct { + mu sync.Mutex + calls []capturedSet + + // flushDelay simulates per-call latency so parallelism is + // visible in wall-clock measurements (used by the parallel-flush + // test). Zero disables the delay. 
+ flushDelay time.Duration +} + +type capturedSet struct { + peer string + key string + version uint64 +} + +func (c *captureTransport) ForwardSet(_ context.Context, peer string, item *cache.Item, _ bool) error { + if c.flushDelay > 0 { + time.Sleep(c.flushDelay) + } + + c.mu.Lock() + defer c.mu.Unlock() + + c.calls = append(c.calls, capturedSet{peer: peer, key: item.Key, version: item.Version}) + + return nil +} + +func (*captureTransport) ForwardGet(_ context.Context, _, _ string) (*cache.Item, bool, error) { + return nil, false, nil +} + +func (*captureTransport) ForwardRemove(_ context.Context, _, _ string, _ bool) error { + return nil +} + +func (*captureTransport) Health(_ context.Context, _ string) error { return nil } +func (*captureTransport) IndirectHealth(_ context.Context, _, _ string) error { return nil } + +func (*captureTransport) Gossip(_ context.Context, _ string, _ []GossipMember) error { + return nil +} + +func (*captureTransport) FetchMerkle(_ context.Context, _ string) (*MerkleTree, error) { + return nil, nil //nolint:nilnil // stub never invoked by these unit tests +} + +func (c *captureTransport) callCount() int { + c.mu.Lock() + defer c.mu.Unlock() + + return len(c.calls) +} + +// newQueueForTest builds a repairQueue wired to the supplied +// transport. Interval is set high enough (5s) that no tick fires +// during the test body unless the test explicitly flushes — that +// keeps the assertions deterministic. +func newQueueForTest(transport DistTransport, batchSize int) (*repairQueue, *distMetrics) { + metrics := &distMetrics{} + q := newRepairQueue( + 5*time.Second, // interval far enough out that tick doesn't fire mid-test + batchSize, + func() DistTransport { return transport }, + metrics, + slog.New(slog.DiscardHandler), + ) + + return q, metrics +} + +// TestRepairQueue_CoalesceByPeerKey pins the central contract: +// two enqueues for the same (peer, key) collapse to one entry +// (highest version wins), and ReadRepairCoalesced bumps for +// every collapsed duplicate. +func TestRepairQueue_CoalesceByPeerKey(t *testing.T) { + t.Parallel() + + transport := &captureTransport{} + q, metrics := newQueueForTest(transport, 100) + + peer := cluster.NodeID("peer-1") + + // First enqueue: version 1. + q.enqueue(context.Background(), peer, &cache.Item{Key: "k1", Version: 1, Origin: "A"}) + + // Second enqueue: same key, higher version → replaces, coalesce++. + q.enqueue(context.Background(), peer, &cache.Item{Key: "k1", Version: 2, Origin: "A"}) + + // Third enqueue: same key, LOWER version than current → dropped, coalesce++. + q.enqueue(context.Background(), peer, &cache.Item{Key: "k1", Version: 1, Origin: "A"}) + + // Flush and inspect what actually went on the wire. + q.flushAll(context.Background()) + + if got := transport.callCount(); got != 1 { + t.Errorf("ForwardSet calls: got %d, want 1 (three enqueues should coalesce to one)", got) + } + + if got := metrics.readRepairCoalesced.Load(); got != 2 { + t.Errorf("readRepairCoalesced: got %d, want 2 (two duplicates collapsed)", got) + } + + if got := transport.calls[0].version; got != 2 { + t.Errorf("dispatched version: got %d, want 2 (highest of {1,2,1})", got) + } +} + +// TestRepairQueue_DistinctPeersAreIndependent pins that the queue +// keys correctly: same key to different peers are NOT coalesced. 
+func TestRepairQueue_DistinctPeersAreIndependent(t *testing.T) { + t.Parallel() + + transport := &captureTransport{} + q, metrics := newQueueForTest(transport, 100) + + q.enqueue(context.Background(), cluster.NodeID("peer-A"), &cache.Item{Key: "k1", Version: 1, Origin: "X"}) + q.enqueue(context.Background(), cluster.NodeID("peer-B"), &cache.Item{Key: "k1", Version: 1, Origin: "X"}) + + q.flushAll(context.Background()) + + if got := transport.callCount(); got != 2 { + t.Errorf("ForwardSet calls: got %d, want 2 (different peers, no coalesce)", got) + } + + if got := metrics.readRepairCoalesced.Load(); got != 0 { + t.Errorf("readRepairCoalesced: got %d, want 0", got) + } +} + +// TestRepairQueue_FlushPeerRunsParallel pins the per-peer +// parallelism contract. Each peer's flush dispatches its entries +// in parallel via errgroup. We measure wall-clock against a stub +// transport that sleeps per-call; sequential dispatch would take +// N×delay, parallel takes ~delay. +func TestRepairQueue_FlushPeerRunsParallel(t *testing.T) { + t.Parallel() + + const ( + entries = 6 + perCallLatency = 50 * time.Millisecond + ) + + transport := &captureTransport{flushDelay: perCallLatency} + q, _ := newQueueForTest(transport, 100) + + peer := cluster.NodeID("peer-1") + for i := range entries { + q.enqueue(context.Background(), peer, &cache.Item{Key: keyfmt(i), Version: 1, Origin: "A"}) + } + + start := time.Now() + + q.flushPeer(context.Background(), peer) + + elapsed := time.Since(start) + + // Sequential would take ≥ entries × delay = 300ms. + // Parallel via errgroup completes in ~delay (plus scheduling slack). + maxParallel := 3 * perCallLatency // generous ceiling for CI scheduler noise + if elapsed > maxParallel { + t.Errorf("flushPeer wall-clock = %v, want < %v (per-peer dispatch should be parallel)", + elapsed, maxParallel) + } + + if got := transport.callCount(); got != entries { + t.Errorf("dispatched: got %d, want %d", got, entries) + } +} + +// TestRepairQueue_NilTransportIsNoop documents the defensive path: +// if the transport closure returns nil mid-flush (e.g. Stop +// landed between two flushes), the queue drops the entries without +// panicking. Merkle anti-entropy is the safety net for any repair +// that doesn't make it to the wire. +func TestRepairQueue_NilTransportIsNoop(t *testing.T) { + t.Parallel() + + metrics := &distMetrics{} + q := newRepairQueue( + 5*time.Second, 100, + func() DistTransport { return nil }, + metrics, + slog.New(slog.DiscardHandler), + ) + + q.enqueue(context.Background(), cluster.NodeID("peer-1"), &cache.Item{Key: "k1", Version: 1}) + + // Should not panic. + q.flushAll(context.Background()) + + // Queue is drained even when transport is nil — the entries + // are dropped, not retained, because the queue can't know + // when transport will return non-nil again. + q.mu.Lock() + + remaining := len(q.entries) + q.mu.Unlock() + + if remaining != 0 { + t.Errorf("entries remaining after nil-transport flush: got %d, want 0", remaining) + } +} + +// TestRepairQueue_StopDrainsPending pins the clean-shutdown +// contract: stop() doesn't return until pending entries have been +// flushed. The flusher goroutine's final flushAll runs on the +// stopCh path. 
+func TestRepairQueue_StopDrainsPending(t *testing.T) { + t.Parallel() + + transport := &captureTransport{} + + metrics := &distMetrics{} + q := newRepairQueue( + 10*time.Second, // long interval — only stop() drives the final flush + 100, + func() DistTransport { return transport }, + metrics, + slog.New(slog.DiscardHandler), + ) + + q.start(context.Background()) + + q.enqueue(context.Background(), cluster.NodeID("peer-1"), &cache.Item{Key: "k1", Version: 1, Origin: "A"}) + q.enqueue(context.Background(), cluster.NodeID("peer-2"), &cache.Item{Key: "k2", Version: 1, Origin: "A"}) + + q.stop() + + if got := transport.callCount(); got != 2 { + t.Errorf("ForwardSet calls after stop: got %d, want 2 (stop must drain)", got) + } +} + +// TestRepairQueue_SizeThresholdFlush pins that hitting +// maxBatchSize triggers an inline flush of that peer's entries +// without waiting for the interval tick. +func TestRepairQueue_SizeThresholdFlush(t *testing.T) { + t.Parallel() + + transport := &captureTransport{} + q, _ := newQueueForTest(transport, 3) // tiny batch — easy to trip + + peer := cluster.NodeID("peer-1") + + q.enqueue(context.Background(), peer, &cache.Item{Key: "k1", Version: 1, Origin: "A"}) + q.enqueue(context.Background(), peer, &cache.Item{Key: "k2", Version: 1, Origin: "A"}) + + if transport.callCount() != 0 { + t.Fatalf("flush fired before threshold: %d calls", transport.callCount()) + } + + // Third enqueue trips threshold (entries map hits maxBatchSize=3). + // The flush runs in a background goroutine; poll for completion. + q.enqueue(context.Background(), peer, &cache.Item{Key: "k3", Version: 1, Origin: "A"}) + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if transport.callCount() == 3 { + break + } + + time.Sleep(5 * time.Millisecond) + } + + if got := transport.callCount(); got != 3 { + t.Errorf("ForwardSet calls: got %d, want 3 (size-threshold should have triggered flush)", got) + } +} + +// TestIsHigherVersion pins the chosen-version tie-break rule +// the coalescer uses. Mirrors repairLocalReplica's logic in +// dist_memory.go so the local and queued paths agree on which +// item wins. +func TestIsHigherVersion(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + candidate cache.Item + existing cache.Item + want bool + }{ + { + "higher version wins", + cache.Item{Version: 2, Origin: "A"}, + cache.Item{Version: 1, Origin: "A"}, + true, + }, + { + "lower version loses", + cache.Item{Version: 1, Origin: "A"}, + cache.Item{Version: 2, Origin: "A"}, + false, + }, + { + "same version, lower origin wins", + cache.Item{Version: 1, Origin: "A"}, + cache.Item{Version: 1, Origin: "B"}, + true, + }, + { + "same version, higher origin loses", + cache.Item{Version: 1, Origin: "B"}, + cache.Item{Version: 1, Origin: "A"}, + false, + }, + { + "same version, same origin loses (no replace)", + cache.Item{Version: 1, Origin: "A"}, + cache.Item{Version: 1, Origin: "A"}, + false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + candidate := tc.candidate + existing := tc.existing + + if got := isHigherVersion(&candidate, &existing); got != tc.want { + t.Errorf("isHigherVersion(%+v, %+v) = %v, want %v", + candidate, existing, got, tc.want) + } + }) + } +} + +// TestRepairQueue_ConcurrentEnqueueIsRaceFree drives many +// goroutines through enqueue + concurrent flushes; race detector +// catches any unsynchronized access. Failure manifests as a -race +// trip, not a content assertion. 
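+//
+// Typical isolated invocation (package path assumes a repo-root
+// working directory):
+//
+//	go test -race -run TestRepairQueue_ConcurrentEnqueueIsRaceFree ./pkg/backend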
+func TestRepairQueue_ConcurrentEnqueueIsRaceFree(t *testing.T) { + t.Parallel() + + transport := &captureTransport{} + q, _ := newQueueForTest(transport, 1000) + + const ( + goroutines = 8 + enqueuesPerWorker = 50 + ) + + var wg sync.WaitGroup + + for g := range goroutines { + gid := g + + wg.Go(func() { + peer := cluster.NodeID("peer-" + keyfmt(gid%3)) + for i := range enqueuesPerWorker { + q.enqueue(context.Background(), peer, &cache.Item{ + Key: keyfmt(i), + Version: uint64(i + 1), + Origin: "A", + }) + } + }) + } + + wg.Wait() + q.flushAll(context.Background()) + + // Sanity: at least one dispatch happened. + if transport.callCount() == 0 { + t.Errorf("no ForwardSet calls after concurrent enqueue (race or fixture bug)") + } +} + +// keyfmt is a tiny formatter helper so the tests aren't full of +// strconv.Itoa noise. Public-shaped so it's reusable from sibling +// tests in this package. +func keyfmt(i int) string { + return "k" + itoa(i) +} + +func itoa(i int) string { + if i == 0 { + return "0" + } + + var buf [20]byte + + pos := len(buf) + for i > 0 { + pos-- + + buf[pos] = byte('0' + i%10) + + i /= 10 + } + + return string(buf[pos:]) +} + +// Compile-time guard: captureTransport must satisfy DistTransport. +var _ DistTransport = (*captureTransport)(nil) + +// Compile-time check on metrics shape — if the field is removed +// the test file fails to compile, surfacing the breakage at +// `go build` rather than a runtime nil deref. +var _ = func() { + var m distMetrics + + _ = m.readRepairBatched.Load() + _ = m.readRepairCoalesced.Load() + _ = atomic.Int64{} +} diff --git a/tests/hypercache_distmemory_forward_primary_promotion_test.go b/tests/hypercache_distmemory_forward_primary_promotion_test.go new file mode 100644 index 0000000..93d92af --- /dev/null +++ b/tests/hypercache_distmemory_forward_primary_promotion_test.go @@ -0,0 +1,74 @@ +package tests + +import ( + "context" + "testing" + + "github.com/hyp3rd/hypercache/internal/cluster" + "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestDistSet_PromotesOnGenericForwardError pins the resilience +// contract that `handleForwardPrimary` must promote to a replica +// owner on ANY non-nil forward error, not just the in-process +// transport's `ErrBackendNotFound` sentinel. Production HTTP/gRPC +// transports against a stopped container surface `net.OpError`, +// `io.EOF`, or `context.DeadlineExceeded` — none of which match +// the pre-fix promotion condition. +// +// Pre-fix behavior on this test: Set returns ErrChaosDrop (the +// forward fails, the default switch arm returns the error +// verbatim). Post-fix: Set returns nil and the key lands on the +// promoting node's local shard. +// +// Uses chaos hooks at DropRate=1.0 to deterministically force every +// outbound forward call to return ErrChaosDrop. Combined with +// ConsistencyOne writes so the local primary path satisfies quorum +// without needing any successful replica fan-out (chaos breaks the +// replica calls too — that's covered by TestDistChaos_* +// in tests/integration/, not what this test pins). +func TestDistSet_PromotesOnGenericForwardError(t *testing.T) { + t.Parallel() + + chaos := backend.NewChaos() + + // 3 nodes, RF=3, ConsistencyOne. Chaos is wired onto every + // node's transport; we only ever Set from one node, so only + // that node's forwards exercise the promotion path. 
+ dc := SetupInProcessClusterRF( + t, 3, 3, + backend.WithDistChaos(chaos), + backend.WithDistWriteConsistency(backend.ConsistencyOne), + ) + + a := dc.Nodes[0] + b := dc.Nodes[1] + c := dc.Nodes[2] + + // Pick a key whose primary is `b` and where `a` is a replica + // owner. From `a`, the Set will forward to `b`; chaos drops + // that call; promotion should fire because `a` is in + // owners[1:]. + desired := []cluster.NodeID{b.LocalNodeID(), a.LocalNodeID(), c.LocalNodeID()} + + key, ok := FindOwnerKey(a, "promote-on-net-err-", desired, 5000) + if !ok { + t.Fatalf("could not find key with owner ordering [B, A, C]") + } + + chaos.SetDropRate(1.0) + + err := a.Set(context.Background(), &cache.Item{Key: key, Value: "v1"}) + if err != nil { + t.Fatalf("Set: got %v, want nil (promotion should have succeeded)", err) + } + + if !a.LocalContains(key) { + t.Errorf("LocalContains(%q) on promoting node: got false, want true", key) + } + + if chaos.Drops() == 0 { + t.Errorf("chaos.Drops: got 0, want > 0 (chaos didn't see the forward attempt)") + } +} diff --git a/tests/hypercache_distmemory_readrepair_batch_test.go b/tests/hypercache_distmemory_readrepair_batch_test.go new file mode 100644 index 0000000..a1d20fb --- /dev/null +++ b/tests/hypercache_distmemory_readrepair_batch_test.go @@ -0,0 +1,234 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// testRepairItemValue is the placeholder payload these read-repair +// fixtures store. Extracted so the package-level goconst rule has +// one shared symbol instead of the literal repeating across the +// readrepair tests in this directory. +const testRepairItemValue = "val" + +// TestDistReadRepair_BatchDispatchesViaQueue is the end-to-end +// integration smoke for WithDistReadRepairBatch. It mirrors +// TestDistMemoryReadRepair's shape (set on primary, drop on +// replica, Get from replica side) but uses the batched path — +// proving the queue → flusher → ForwardSet chain actually heals +// the replica without the Get path doing the wire call inline. +// +// Polls for completion within a deadline rather than asserting +// synchronously: the batched path is async by design, so the +// "replica healed" condition is observable only AFTER the flush +// window. The poll cadence + deadline are loose enough to absorb +// CI scheduling jitter; the assertion is "convergence happens", +// not "convergence within an exact wall-clock budget". +func TestDistReadRepair_BatchDispatchesViaQueue(t *testing.T) { + t.Parallel() + + // 3 nodes / RF=3 / ConsistencyQuorum: needed=2 acks. With one + // replica's local copy dropped, the OTHER two owners still + // quorum the read — and `repairReplicas` fans the chosen value + // back across all owners via `repairRemoteReplica`, which is + // where the queue path lives. ConsistencyOne never visits this + // path (it self-heals locally and returns), so the queue would + // never see an enqueue at all under the default. 
+ dc := SetupInProcessClusterRF( + t, 3, 3, + backend.WithDistReadConsistency(backend.ConsistencyQuorum), + backend.WithDistReadRepairBatch(50*time.Millisecond, 16), + ) + + const key = "rr-batch-key" + + owners := dc.Ring.Lookup(key) + if len(owners) < 3 { + t.Fatalf("expected 3 owners with RF=3 setup, got %d", len(owners)) + } + + primary := dc.ByID(owners[0]) + dropped := dc.ByID(owners[1]) + reader := dc.ByID(owners[2]) + + err := primary.Set(context.Background(), &cache.Item{Key: key, Value: testRepairItemValue}) + if err != nil { + t.Fatalf("set: %v", err) + } + + dropped.DebugDropLocal(key) + + if dropped.LocalContains(key) { + t.Fatalf("dropped node still has key after drop") + } + + // Get from a third owner (not the dropped one). Quorum is met + // by primary+reader. repairReplicas then enqueues a repair for + // the dropped node via the queue — the flush window heals it. + if _, ok := reader.Get(context.Background(), key); !ok { + t.Fatalf("get returned not-found") + } + + // Poll for replica healing — bounded to 2s so a genuine + // failure surfaces as a test fail, not a hang. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if dropped.LocalContains(key) && reader.Metrics().ReadRepairBatched > 0 { + break + } + + time.Sleep(10 * time.Millisecond) + } + + if !dropped.LocalContains(key) { + t.Errorf("dropped node did not heal within deadline (batched flush should have fired)") + } + + if got := reader.Metrics().ReadRepairBatched; got == 0 { + t.Errorf("ReadRepairBatched: got 0, want > 0 (queue flush didn't dispatch)") + } +} + +// TestDistReadRepair_BatchCoalescesParallelReads pins the +// amortisation win: many concurrent Gets of the same stale key +// produce ONE repair through the queue, not N. The coalescer +// keys on (peer, key) and drops same-version duplicates; +// ReadRepairCoalesced bumps for each collapsed duplicate. +func TestDistReadRepair_BatchCoalescesParallelReads(t *testing.T) { + t.Parallel() + + // Same 3/3/Quorum shape as BatchDispatchesViaQueue — see that + // test for the reasoning. Wider flush window so all N concurrent + // Gets land in the same batch. + dc := SetupInProcessClusterRF( + t, 3, 3, + backend.WithDistReadConsistency(backend.ConsistencyQuorum), + backend.WithDistReadRepairBatch(200*time.Millisecond, 64), + ) + + const ( + key = "rr-coalesce-key" + concurrentGets = 10 + ) + + owners := dc.Ring.Lookup(key) + if len(owners) < 3 { + t.Fatalf("expected 3 owners") + } + + primary := dc.ByID(owners[0]) + dropped := dc.ByID(owners[1]) + reader := dc.ByID(owners[2]) + + err := primary.Set(context.Background(), &cache.Item{Key: key, Value: testRepairItemValue}) + if err != nil { + t.Fatalf("set: %v", err) + } + + dropped.DebugDropLocal(key) + + // Drive N concurrent Gets from `reader` — every one calls + // repairReplicas which enqueues repairs for (primary,key) and + // (dropped,key). The coalescer collapses duplicates for the same + // (peer,key); each subsequent enqueue past the first per (peer, + // key) bumps ReadRepairCoalesced. + done := make(chan struct{}, concurrentGets) + for range concurrentGets { + go func() { + _, _ = reader.Get(context.Background(), key) + + done <- struct{}{} + }() + } + + for range concurrentGets { + <-done + } + + // Wait for the flush window so the dispatch counter settles. 
+ deadline := time.Now().Add(1 * time.Second) + for time.Now().Before(deadline) { + if reader.Metrics().ReadRepairBatched > 0 { + break + } + + time.Sleep(10 * time.Millisecond) + } + + coalesced := reader.Metrics().ReadRepairCoalesced + if coalesced == 0 { + t.Errorf("ReadRepairCoalesced: got 0, want > 0 (concurrent same-key reads should coalesce)") + } + + // Sanity bound: at most one entry per (peer, key) survives the + // coalesce. With 2 remote owners (primary + dropped) we expect + // at most 2 dispatches for this key, regardless of how many Gets + // piled in. Anything > 2 indicates a coalesce bug. + if got := reader.Metrics().ReadRepairBatched; got > 2 { + t.Errorf("ReadRepairBatched: got %d, want ≤ 2 (coalesce should have collapsed %d Gets)", + got, concurrentGets) + } +} + +// TestDistReadRepair_StopDrainsBatchedQueue pins that a clean +// Stop() drains in-flight repairs before returning. Without the +// drain, queued repairs would be lost on shutdown. +// +// Uses a long flush interval so the only path that fires the +// final flush is Stop's drain — if the drain is missing or +// broken, the assertion fails because the replica never heals. +func TestDistReadRepair_StopDrainsBatchedQueue(t *testing.T) { + t.Parallel() + + // 3/3/Quorum so the read can hit quorum with one node's local + // copy dropped (see BatchDispatchesViaQueue for the full + // reasoning). Long flush interval so the only path that fires + // the final flush is Stop's drain. + dc := SetupInProcessClusterRF( + t, 3, 3, + backend.WithDistReadConsistency(backend.ConsistencyQuorum), + backend.WithDistReadRepairBatch(10*time.Second, 64), // ticker won't fire during the test + ) + + const key = "rr-drain-key" + + owners := dc.Ring.Lookup(key) + if len(owners) < 3 { + t.Fatalf("expected 3 owners") + } + + primary := dc.ByID(owners[0]) + dropped := dc.ByID(owners[1]) + reader := dc.ByID(owners[2]) + + err := primary.Set(context.Background(), &cache.Item{Key: key, Value: testRepairItemValue}) + if err != nil { + t.Fatalf("set: %v", err) + } + + dropped.DebugDropLocal(key) + + // Triggers enqueue on `reader`; the ticker won't fire for 10s. + if _, ok := reader.Get(context.Background(), key); !ok { + t.Fatalf("get returned not-found") + } + + // Pre-Stop: the repair is queued but not yet dispatched. + if dropped.LocalContains(key) { + t.Logf("dropped node already healed before Stop (size-threshold flush?); test still valid") + } + + // Stop must drain the queue before returning. The queue lives on + // `reader` (the node that did the Get), so Stop is called there. + _ = reader.Stop(context.Background()) + + // Post-Stop: the dropped node should have the key (the drain + // dispatched the queued ForwardSet). 
+ if !dropped.LocalContains(key) { + t.Errorf("dropped node did not heal after Stop drain — queue drain is missing or broken") + } +} diff --git a/tests/hypercache_distmemory_remove_readrepair_test.go b/tests/hypercache_distmemory_remove_readrepair_test.go index f975928..5f108f7 100644 --- a/tests/hypercache_distmemory_remove_readrepair_test.go +++ b/tests/hypercache_distmemory_remove_readrepair_test.go @@ -20,7 +20,7 @@ func TestDistMemoryRemoveReplication(t *testing.T) { t.Fatalf("no owners") } - item := &cache.Item{Key: key, Value: "val"} + item := &cache.Item{Key: key, Value: testRepairItemValue} err := item.Valid() if err != nil { @@ -72,7 +72,7 @@ func TestDistMemoryReadRepair(t *testing.T) { t.Fatalf("expected >=2 owners with RF=2 setup, got %d", len(owners)) } - item := &cache.Item{Key: key, Value: "val"} + item := &cache.Item{Key: key, Value: testRepairItemValue} err := item.Valid() if err != nil {
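As a closing illustration of the detection guidance in `docs/operations.md`, here is a minimal sketch of the operator-side arithmetic using the `Metrics()` snapshot shown in this diff. The reporting function and the sample values are assumptions, not part of the PR:

```go
package main

import (
	"fmt"

	"github.com/hyp3rd/hypercache/pkg/backend"
)

// reportReadRepairAmortisation derives the two signals the new docs
// section tells operators to watch. The DistMetrics fields are the
// ones this PR adds; the function itself is a hypothetical hook.
func reportReadRepairAmortisation(m backend.DistMetrics) {
	if m.ReadRepair == 0 {
		return // no repair attempts yet, nothing to report
	}

	// Fraction of repair attempts routed through the async queue.
	batchedFrac := float64(m.ReadRepairBatched) / float64(m.ReadRepair)

	// Wire calls the (peer, key) coalescer collapsed outright.
	fmt.Printf("read-repair: batched=%.2f coalesced=%d\n", batchedFrac, m.ReadRepairCoalesced)
}

func main() {
	// Illustrative sample values, not measured output.
	reportReadRepairAmortisation(backend.DistMetrics{
		ReadRepair:          100,
		ReadRepairBatched:   80,
		ReadRepairCoalesced: 40,
	})
}
```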