From 0168f931f8979f831c66534b919db844b40de6c5 Mon Sep 17 00:00:00 2001 From: "F." Date: Wed, 13 May 2026 18:30:48 +0200 Subject: [PATCH] fix(cluster): resolve four steady-state distributed cluster regressions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix a set of independent bugs causing rebalance counter churn, phantom key creation, and incarnation inflation in multi-node clusters: - Clone key string in applySet before storing as map key. HTTP frameworks (Fiber) back path-parameter strings with pooled request buffers reused on the next request; storing the raw pointer let map keys silently mutate, producing phantom shard entries and a persistent rebalance loop (~60 bumps/s on a 5-node cluster). - Add applyForwardedSet with a receiver-side ownership guard for all transport-receiver paths (InProcessTransport.ForwardSet, HTTP /internal/set). Writes forwarded from peers with a divergent ring view are silently dropped, breaking the migrate→fan-out-back→stuck cycle. Exposes new dist.write.apply_refused metric. - Release local copy immediately in migrateIfNeeded when removalGracePeriod == 0. Previously the local item was never removed, so the rebalance scanner re-flagged the same lost-ownership keys on every tick. Each stuck key now produces exactly one migration. - Make Membership.Mark a no-op on same-state calls: incarnation, version vector, and observers all stay quiet during steady-state heartbeat success. Add Membership.Refute as the dedicated SWIM self-refute primitive that unconditionally bumps incarnation. Update refuteIfSuspected to use Refute(). Adds integration tests pinning idle-cluster silence, traffic-under-load silence, one-shot drain semantics, and the ownership-guard contract. Adds five unit tests covering Mark no-op and Refute always-bumps. --- CHANGELOG.md | 75 +++++ cspell.config.yaml | 2 + internal/cluster/membership.go | 84 ++++- internal/cluster/membership_test.go | 181 +++++++++++ pkg/backend/dist_http_server.go | 4 +- pkg/backend/dist_memory.go | 98 +++++- pkg/backend/dist_transport.go | 6 +- scripts/tests/20-test-cluster-resilience.sh | 5 +- ...rcache_distmemory_rebalance_steady_test.go | 288 ++++++++++++++++++ 9 files changed, 726 insertions(+), 17 deletions(-) create mode 100644 tests/hypercache_distmemory_rebalance_steady_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bf6f7e..4768132 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -286,6 +286,81 @@ All notable changes to HyperCache are recorded here. The format follows ### Fixed +- **applySet now clones the key string before storing it as the shard's map key.** Under HTTP traffic + (Fiber + the v1 cache API), path parameters returned by `c.Params("key")` are backed by a pooled + request buffer that the framework reuses for the next request. The original `applySet` stored the + caller's string directly as the `ConcurrentMap` key; when the next request landed, the buffer's + bytes mutated, and so did every map key (and `Item.Key` field) we'd previously stored. The + immediate symptom: the same logical key drifted across multiple shards (`first-24` showing up in + shards 2, 3, 4, and twice in shard 6), and phantom keys like `first-479` materialized in the + iteration (a "first-4" buffer overlaid with "79" from the next URL). 
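The aliasing is easy to reproduce outside the cache. The sketch below is illustrative only and not part of this patch: `unsafe.String` stands in for the framework's zero-copy path-parameter strings, and a plain byte slice stands in for the pooled request buffer. Go maps store string keys by header rather than copying their bytes, so the stored key tracks whatever the buffer is overwritten with next, while `strings.Clone` detaches it:

```go
package main

import (
	"fmt"
	"strings"
	"unsafe"
)

func main() {
	// Stand-in for the pooled request buffer a router reuses between requests.
	buf := []byte("first-4")
	// Stand-in for a zero-copy path parameter: a string header aliasing buf.
	key := unsafe.String(unsafe.SliceData(buf), len(buf))

	aliased := map[string]struct{}{key: {}}                // key aliases the pooled buffer
	cloned := map[string]struct{}{strings.Clone(key): {}}  // key owns its backing array

	copy(buf, "first-9") // the "next request" reuses the buffer

	fmt.Println(aliased) // map[first-9:{}] — the stored map key silently mutated
	_, ok := aliased["first-4"]
	fmt.Println(ok)      // false — the entry is stranded under a stale hash bucket
	fmt.Println(cloned)  // map[first-4:{}] — the cloned key is unaffected
}
```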
The rebalance loop, scanning + `sh.items.All()`, kept re-flagging these phantoms — `RebalancedPrimary` climbed at ~60/s on a + 5-node cluster after a single 100-key write batch, even though MembershipVersion, hint queues, + merkle counters, and the `WriteApplyRefused` guard were all quiet. The fix is one + `strings.Clone(item.Key)` call in [`applySet`](pkg/backend/dist_memory.go) before recording the + originalPrimary and storing: the cloned key has its own backing array, fully detached from the + caller's pooled buffer. The `Item.Key` field on the stored clone gets the same stable value so + any downstream code observes a coherent shard entry. Post-fix the cluster's rebalance counters + stay at exactly zero in steady state across all 5 nodes. +- **Receiver-side ownership guard breaks the divergent-ring-view rebalance loop.** After the + `migrateIfNeeded`-side fix (one migration per stuck key, then release) shipped, operators on a 5-node + cluster running [`scripts/tests/30-test-cluster-writes.sh`](scripts/tests/30-test-cluster-writes.sh) + still saw `RebalancedPrimary` climb at ~60/s post-script with no membership, hint, or merkle activity. + Root cause: when the migration target's ring view still treated the original source as a replica, the + target's `applySet` fan-out re-planted the key on the source. The source released it (per the earlier + fix), then received it back on the next gossip tick, then migrated again — perpetual cycle even though + no state was actually transitioning. New [`applyForwardedSet`](pkg/backend/dist_memory.go) is the entry + point used by the transport-receiver paths (`InProcessTransport.ForwardSet` and the HTTP + `/internal/set` handler) and applies an ownership guard: if the receiver's ring view says it isn't an + owner of the key, the write is silently dropped. The sender's transport call still returns nil (no + behavioral break — best-effort semantics already governed the hot path), but the receiver's shard + stays clean. Merkle anti-entropy is the convergence safety net for any write refused here. The guard + is deliberately NOT in `applySet` itself: legitimate internal callers (setImpl primary path, + `migrateIfNeeded` forwarder, merkle pull, read-repair) have either already verified ownership or + explicitly want to plant regardless — moving the guard would have broken `TestHTTPFetchMerkle`. New + `dist.write.apply_refused` counter exposes how often the guard fires (zero on healthy views; + non-zero indicates divergence operators may want to investigate). New test + [`TestDistRebalance_ApplyOwnershipGuardRefusesForeignWrites`](tests/hypercache_distmemory_rebalance_steady_test.go) + drives a direct `ForwardSet` to a non-owner and asserts the shard stays clean and the refused-counter + ticks up. +- **Rebalance counters no longer climb in a steady-state cluster.** When a key was no longer owned by the + current node — because the ring had shifted away from it (typical after a node joins or a singleton + cluster gains peers) — [`migrateIfNeeded`](pkg/backend/dist_memory.go) forwarded the value to the new + primary but only scheduled the LOCAL copy for deletion when `WithDistRemovalGrace > 0`. The default + removal-grace setting is zero, which meant the local item was never released; on every subsequent + rebalance tick `shouldRebalance` re-flagged the same key via its `!ownsKeyInternal` branch, and + `migrateIfNeeded` re-emitted the migration. Operators saw `RebalancedKeys` and `RebalancedPrimary` + climb at the scan-tick rate forever — e.g. 
5,326 keys / 5,102 primary migrations on a 5-node cluster + with ~14 stuck keys and a 100ms ticker, even though no membership had actually changed. Migration now + releases the local copy immediately when `removalGracePeriod == 0` (and continues to schedule a + deferred delete via `shedRemovedKeys` when a grace period is configured), so each stuck key produces + exactly one migration and the loop quiesces. Two new integration tests in + [`tests/hypercache_distmemory_rebalance_steady_test.go`](tests/hypercache_distmemory_rebalance_steady_test.go) + pin both contracts: `TestDistRebalance_IdleClusterIsSilent` asserts a 5-node RF=3 cluster with no + out-of-place keys produces zero counter bumps across many ticks, and + `TestDistRebalance_LostOwnershipDrainsOnce` plants one stuck key per node via `DebugInject` and asserts + the counters reach exactly one bump per stuck key and never advance after that. +- **Incarnation and MembershipVersion no longer churn on every heartbeat.** SWIM-style incarnation + numbers and the membership version vector were both inflating roughly in lock-step with elapsed-probes — + a 5-node cluster running for a few hours showed incarnations near 2,378 per peer and a MembershipVersion + past 4,800, even though no nodes had actually changed state. [`Membership.Mark`](internal/cluster/membership.go) + was unconditionally incrementing incarnation, advancing the version counter, AND firing observers on + every call; the heartbeat-success path in `evaluateLiveness` calls `Mark(peer, NodeAlive)` once per probe + per peer. Three downstream effects: (i) operators couldn't read incarnation as a state-change signal, + (ii) gossip-merge fanned out spurious "version went up" deltas, (iii) SSE consumers received constant + no-op `members` events. Mark now treats same-state as a full no-op — LastSeen still refreshes (the + suspect-after timeout machinery needs that), but incarnation, version, and observers all stay quiet. + Genuine state transitions (Alive↔Suspect) still bump all three, so the "higher incarnation wins" gossip + merge continues to propagate real changes. New [`Membership.Refute`](internal/cluster/membership.go) is + the explicit SWIM self-refute primitive: it always bumps incarnation and sets state to NodeAlive, even + when the local view is already Alive — the one path that legitimately needs to publish a + higher-incarnation refutation packet regardless of local-view state. `refuteIfSuspected` in + [`pkg/backend/dist_memory.go`](pkg/backend/dist_memory.go) switched from `Mark(localID, NodeAlive)` to + `Refute(localID)` so the divergent semantic is obvious at the call site. Five new unit tests in + [`internal/cluster/membership_test.go`](internal/cluster/membership_test.go) pin: no-incarnation-bump on + same-state Mark, no-version-bump and no-observer-fire on same-state Mark, bump-on-transition, refute + always bumps, and the ghost-node guard. The existing `TestDistSWIM_SelfRefute` integration test + continues to pass byte-identical. 
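For reference, the gossip-merge precedence this entry leans on — "higher incarnation wins" — is only described in prose above; the snippet below is an illustrative sketch with hypothetical types, not the repository's merge code. It shows why an Alive answer at the same incarnation as a suspect claim would never displace it, which is exactly why `Refute` must always publish a higher incarnation:

```go
// memberView is a hypothetical, minimal stand-in for a gossiped peer record.
type memberView struct {
	Incarnation uint64
	Suspect     bool
}

// merge applies the SWIM precedence rule: a strictly higher incarnation always
// wins; at equal incarnations the more pessimistic claim (Suspect) sticks.
// Answering a suspect claim at incarnation N with Alive at the same N loses
// here — Refute publishes N+1 so the refutation actually propagates.
func merge(local, remote memberView) memberView {
	if remote.Incarnation > local.Incarnation {
		return remote
	}

	if remote.Incarnation == local.Incarnation && remote.Suspect && !local.Suspect {
		return remote
	}

	return local
}
```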
- **Remove path no longer silently succeeds when the primary is unreachable.** Symmetric audit-fix to the Set-forward change: [`removeImpl`](pkg/backend/dist_memory.go) used to swallow the `ForwardRemove` error with `_ = transport.ForwardRemove(...)` and return `nil`, so a diff --git a/cspell.config.yaml b/cspell.config.yaml index 740a69c..8798a99 100644 --- a/cspell.config.yaml +++ b/cspell.config.yaml @@ -194,6 +194,7 @@ words: - mset - msgpack - mvdan + - neighbouring - nestif - Newf - nilnil @@ -221,6 +222,7 @@ words: - pyenv - pygments - pymdownx + - quiesces - reaad - readrepair - recvcheck diff --git a/internal/cluster/membership.go b/internal/cluster/membership.go index 0d9cec5..65fcc1f 100644 --- a/internal/cluster/membership.go +++ b/internal/cluster/membership.go @@ -134,7 +134,34 @@ func (m *Membership) Remove(id NodeID) bool { return true } -// Mark updates node state + incarnation and refreshes LastSeen. Returns true if node exists. +// Mark records an observer-side state update for the given node. +// Behavior depends on whether `state` represents a real transition: +// +// - state == current: no-op. LastSeen refreshes (the peer answered +// a heartbeat probe, so its observability window resets), but +// incarnation, the membership version vector, and registered +// observers are all left alone. +// - state != current: incarnation bumps, version vector advances, +// and observers fire — this is a real membership state change +// and gossip-merge / SSE consumers need to know. +// +// Returns true if the node exists. +// +// Why no-op on same state: pre-fix every successful heartbeat probe +// called Mark(peer, Alive) and unconditionally bumped both the +// node's incarnation AND the membership version counter. Operators +// running a 5-node cluster for a few hours saw incarnations climb +// to ~2,378 and MembershipVersion past 4,800 — the value tracked +// "elapsed probes," not "state changes." SWIM defines incarnation +// as a sequence number OWNED by the node itself, and the +// membership-version vector should signal "something real +// happened" so gossip knows when to fan out a delta. Both must be +// quiet during steady-state liveness churn. +// +// Self-refute — the one observer-side path that legitimately needs +// to bump incarnation regardless of the local state value — lives +// in `Refute()` so the divergent semantic is explicit at the call +// site. func (m *Membership) Mark(id NodeID, state NodeState) bool { m.mu.Lock() @@ -145,9 +172,20 @@ func (m *Membership) Mark(id NodeID, state NodeState) bool { return false } - n.State = state + if n.State == state { + // Same-state no-op: refresh LastSeen so the suspect-after + // timeout machinery sees the probe response, but do NOT + // advance version / fire observers / bump incarnation. + // Nothing materially changed. + n.LastSeen = time.Now() + m.mu.Unlock() + + return true + } + n.Incarnation++ + n.State = state n.LastSeen = time.Now() version := m.ver.Next() @@ -160,6 +198,48 @@ func (m *Membership) Mark(id NodeID, state NodeState) bool { return true } +// Refute is the SWIM self-refute primitive: when this node receives +// gossip that it is Suspect/Dead at some incarnation N, it MUST +// publish a higher incarnation (N+1) with state=Alive so the next +// gossip tick disseminates the refutation cluster-wide (peers using +// "higher incarnation wins" adopt the new value). 
+// +// Unlike `Mark()`, Refute always bumps incarnation — the local state +// is already Alive (a node never thinks itself dead), so a +// transition-only rule would silently no-op and the refutation would +// fail to propagate. The dedicated method makes the divergent +// semantic obvious at the only call site that needs it +// (`refuteIfSuspected` in pkg/backend/dist_memory.go). +// +// Returns true if the node exists. State is unconditionally set to +// NodeAlive; the typical caller already knows the local node is +// alive but the explicit assignment guards against odd states the +// local view might be holding. +func (m *Membership) Refute(id NodeID) bool { + m.mu.Lock() + + n, ok := m.nodes[id] + if !ok { + m.mu.Unlock() + + return false + } + + n.Incarnation++ + + n.State = NodeAlive + n.LastSeen = time.Now() + + version := m.ver.Next() + observers := m.observers + + m.mu.Unlock() + + notify(observers, id, NodeAlive, version) + + return true +} + // notify invokes each observer in registration order with the // resolved state and version. Pulled out so the call sites read // "mutate, unlock, notify" uniformly without inline loops. diff --git a/internal/cluster/membership_test.go b/internal/cluster/membership_test.go index d5b5196..5147fa6 100644 --- a/internal/cluster/membership_test.go +++ b/internal/cluster/membership_test.go @@ -211,3 +211,184 @@ func TestMembership_OnStateChange_DoesNotBlockMutation(t *testing.T) { close(releaseObserver) wg.Wait() } + +// TestMembership_Mark_NoBumpOnSameState pins the +// transition-only-bump contract: Mark() must NOT increment a node's +// incarnation when the requested state matches the current state. +// Without this rule, steady-state heartbeat success paths +// (evaluateLiveness → Mark(peer, Alive)) inflate the counter once +// per probe — operators saw incarnation values in the thousands +// after a few hours of normal operation. Incarnation is owned by +// the node itself in SWIM; observers should not churn it. +func TestMembership_Mark_NoBumpOnSameState(t *testing.T) { + t.Parallel() + + m := NewMembership(NewRing()) + m.Upsert(NewNode("n1", "127.0.0.1:7946")) + + before := m.List()[0].Incarnation + + // Repeat the same Mark a few times — this models the + // "successful probe every second" pattern that drove the bug. + for range 50 { + m.Mark("n1", NodeAlive) + } + + after := m.List()[0].Incarnation + if after != before { + t.Errorf("incarnation churned on same-state Mark: before=%d after=%d (want stable)", + before, after) + } +} + +// TestMembership_Mark_SameStateIsFullNoOp pins the rest of the +// no-op-Mark contract beyond just incarnation: when state matches +// the current value, the membership version vector must NOT +// advance and registered observers must NOT fire. Without this +// rule, every successful heartbeat probe bumps the version +// counter once per peer per interval — a 5-node cluster running +// for a few hours showed MembershipVersion past 4,800 even though +// no nodes had actually changed state. Cascading effects: gossip +// fans out spurious "version went up" deltas, SSE consumers see +// constant "members" event spam, and the metric stops being +// useful as a real-membership-change indicator. +func TestMembership_Mark_SameStateIsFullNoOp(t *testing.T) { + t.Parallel() + + m := NewMembership(NewRing()) + m.Upsert(NewNode("n1", "127.0.0.1:7946")) + + var fired atomic.Int32 + + m.OnStateChange(func(_ NodeID, _ NodeState, _ uint64) { + fired.Add(1) + }) + + // Capture the version baseline after the Upsert above. 
+ versionBefore := m.Version() + + // Pound on Mark with the existing state. Models the steady + // "probe succeeded again" pattern that drove the bug. + for range 100 { + m.Mark("n1", NodeAlive) + } + + if got := m.Version(); got != versionBefore { + t.Errorf("Version drifted on same-state Mark: before=%d after=%d (want stable)", + versionBefore, got) + } + + if got := fired.Load(); got != 0 { + t.Errorf("observer fired %d times for same-state Mark, want 0", got) + } + + // Sanity: LastSeen still refreshes so the suspect-timeout + // machinery sees probes. We can't assert exact wall-clock + // values cleanly here, but we can assert it advanced past the + // Upsert moment by being non-zero. + if m.List()[0].LastSeen.IsZero() { + t.Errorf("LastSeen wasn't refreshed by same-state Mark") + } +} + +// TestMembership_Mark_BumpsOnTransition guards the other side of the +// contract: a genuine state transition (Alive→Suspect, Suspect→Alive, +// etc.) MUST bump incarnation so the gossip-merge rule +// "higher incarnation wins" propagates the change cluster-wide. If we +// over-suppress, transitions would silently fail to propagate and a +// peer briefly marked Suspect would stay Suspect on neighbouring +// nodes forever. +func TestMembership_Mark_BumpsOnTransition(t *testing.T) { + t.Parallel() + + m := NewMembership(NewRing()) + m.Upsert(NewNode("n1", "127.0.0.1:7946")) + + v0 := m.List()[0].Incarnation + + m.Mark("n1", NodeSuspect) + + v1 := m.List()[0].Incarnation + if v1 != v0+1 { + t.Errorf("Alive→Suspect: got incarnation %d, want %d", v1, v0+1) + } + + m.Mark("n1", NodeAlive) + + v2 := m.List()[0].Incarnation + if v2 != v1+1 { + t.Errorf("Suspect→Alive: got incarnation %d, want %d", v2, v1+1) + } + + // Same-state again — must NOT bump even after recent transitions. + m.Mark("n1", NodeAlive) + m.Mark("n1", NodeAlive) + + v3 := m.List()[0].Incarnation + if v3 != v2 { + t.Errorf("Alive→Alive after a transition burst: got %d, want stable at %d", v3, v2) + } +} + +// TestMembership_Refute_AlwaysBumps pins the SWIM self-refute +// primitive: Refute() unconditionally increments incarnation, even +// when the local view of the node is already Alive. Without this, +// the refutation packet a node sends back to a peer that suspected +// it would carry the SAME incarnation as the suspect claim — and +// "higher incarnation wins" would refuse to overwrite, so the +// suspect claim would stick even though the node refuted it. +func TestMembership_Refute_AlwaysBumps(t *testing.T) { + t.Parallel() + + m := NewMembership(NewRing()) + m.Upsert(NewNode("self", "127.0.0.1:7946")) + + v0 := m.List()[0].Incarnation + + // Local state is Alive — a transition-only rule would no-op + // here, which would silently break refutation propagation. + m.Refute("self") + + v1 := m.List()[0].Incarnation + if v1 != v0+1 { + t.Errorf("first Refute: got incarnation %d, want %d", v1, v0+1) + } + + // Each subsequent refute climbs one more — chained suspect + // claims from different peers must each be answerable with a + // strictly-higher incarnation. + m.Refute("self") + m.Refute("self") + + v2 := m.List()[0].Incarnation + if v2 != v1+2 { + t.Errorf("chained Refute: got incarnation %d, want %d", v2, v1+2) + } + + // State must end Alive regardless of intermediate values. + if got := m.List()[0].State; got != NodeAlive { + t.Errorf("Refute state: got %v, want NodeAlive", got) + } +} + +// TestMembership_Refute_GhostReturnsFalse mirrors the +// non-existent-node guard already present on Mark/Remove. 
+func TestMembership_Refute_GhostReturnsFalse(t *testing.T) { + t.Parallel() + + m := NewMembership(NewRing()) + + var fired atomic.Int32 + + m.OnStateChange(func(_ NodeID, _ NodeState, _ uint64) { + fired.Add(1) + }) + + if m.Refute("ghost") { + t.Fatal("Refute on non-existent node returned true") + } + + if got := fired.Load(); got != 0 { + t.Errorf("observer fired %d times for ghost Refute, want 0", got) + } +} diff --git a/pkg/backend/dist_http_server.go b/pkg/backend/dist_http_server.go index a92c8d8..365fc2f 100644 --- a/pkg/backend/dist_http_server.go +++ b/pkg/backend/dist_http_server.go @@ -387,7 +387,9 @@ func (s *distHTTPServer) handleSet(fctx fiber.Ctx, dm *DistMemory) error { LastAccess: time.Now(), } - dm.applySet(s.ctx, it, req.Replicate) + // Forwarded arrival from a peer: ownership guard fires if this + // node disagrees with the sender's view about K's owners. + dm.applyForwardedSet(s.ctx, it, req.Replicate) return fctx.JSON(httpSetResponse{}) } diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index c130aae..7a8a787 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -1275,6 +1275,7 @@ type distMetrics struct { writeAcks atomic.Int64 // cumulative replica write acks (includes primary) writeAttempts atomic.Int64 // total write operations attempted (Set) writeForwardPromotion atomic.Int64 // times a Set forward to primary failed and the local replica self-promoted + writeApplyRefused atomic.Int64 // applySet calls that the ownership guard refused (sender had divergent ring view) rebalancedKeys atomic.Int64 // keys migrated during rebalancing rebalanceBatches atomic.Int64 // number of batches processed rebalanceThrottle atomic.Int64 // times rebalance was throttled due to concurrency limits @@ -1336,6 +1337,7 @@ type DistMetrics struct { WriteAcks int64 WriteAttempts int64 WriteForwardPromotion int64 + WriteApplyRefused int64 RebalancedKeys int64 RebalanceBatches int64 RebalanceThrottle int64 @@ -1412,6 +1414,7 @@ func (dm *DistMemory) Metrics() DistMetrics { WriteAcks: dm.metrics.writeAcks.Load(), WriteAttempts: dm.metrics.writeAttempts.Load(), WriteForwardPromotion: dm.metrics.writeForwardPromotion.Load(), + WriteApplyRefused: dm.metrics.writeApplyRefused.Load(), RebalancedKeys: dm.metrics.rebalancedKeys.Load(), RebalanceBatches: dm.metrics.rebalanceBatches.Load(), RebalanceThrottle: dm.metrics.rebalanceThrottle.Load(), @@ -2382,7 +2385,8 @@ func (dm *DistMemory) migrateIfNeeded(ctx context.Context, item *cache.Item) { dm.queueHint(string(owners[0]), item, hintSourceMigration) } - // Update originalPrimary so we don't recount repeatedly. + // Update originalPrimary so a future primary-shift comparison + // (the second branch of shouldRebalance) sees the new owner. sh := dm.shardFor(item.Key) if sh.originalPrimary != nil { sh.originalPrimaryMu.Lock() @@ -2391,15 +2395,30 @@ func (dm *DistMemory) migrateIfNeeded(ctx context.Context, item *cache.Item) { sh.originalPrimaryMu.Unlock() } - // Record removal timestamp for potential shedding if we are no longer owner at all. - if dm.removalGracePeriod > 0 && !dm.ownsKeyInternal(item.Key) && sh.removedAt != nil { - sh.removedAtMu.Lock() + // If this node is no longer an owner of the key (the migration + // was driven by the "lost ownership" branch of shouldRebalance, + // not just a primary shift among owners), the local copy MUST + // be released so the next rebalance tick doesn't re-flag the + // same key and re-emit the migration on every tick forever. 
+ // Pre-fix this only scheduled a deferred delete when + // `removalGracePeriod > 0` — the default of zero meant the + // local item stayed, the key kept failing `ownsKeyInternal`, + // and a steady-state cluster saw RebalancedKeys climb at the + // scan-tick rate (e.g. ~50/s on a 100ms ticker with ~14 stuck + // keys). When grace is configured, schedule deferred deletion + // via shedRemovedKeys; otherwise release immediately. + if !dm.ownsKeyInternal(item.Key) { + if dm.removalGracePeriod > 0 && sh.removedAt != nil { + sh.removedAtMu.Lock() - if _, exists := sh.removedAt[item.Key]; !exists { - sh.removedAt[item.Key] = time.Now() - } + if _, exists := sh.removedAt[item.Key]; !exists { + sh.removedAt[item.Key] = time.Now() + } - sh.removedAtMu.Unlock() + sh.removedAtMu.Unlock() + } else { + sh.items.Remove(item.Key) + } } } @@ -3515,7 +3534,7 @@ func (dm *DistMemory) refuteIfSuspected(claim *cluster.Node) { return } - dm.membership.Mark(dm.localNode.ID, cluster.NodeAlive) + dm.membership.Refute(dm.localNode.ID) dm.logger.Info( "self-refuted suspect/dead claim from peer", @@ -4255,6 +4274,11 @@ var distMetricSpecs = []distMetricSpec{ desc: "Set forwards that promoted the local replica because the primary was unreachable", get: func(m DistMetrics) int64 { return m.WriteForwardPromotion }, }, + { + name: "dist.write.apply_refused", unit: unitOp, counter: true, + desc: "applySet calls the ownership guard refused (sender had a divergent ring view)", + get: func(m DistMetrics) int64 { return m.WriteApplyRefused }, + }, // --- Rebalance --- { @@ -4436,12 +4460,64 @@ func (dm *DistMemory) setupOTelMetrics() { dm.metricRegistration = reg } +// applyForwardedSet is the entry point used by the transport-receiver +// paths (InProcessTransport.ForwardSet, HTTP /internal/set handler) +// to apply a write that arrived from a peer. Unlike `applySet`, this +// path enforces an ownership guard: if the local ring view says this +// node is not an owner of the key, the write is silently dropped +// (sender's transport call still returns nil — best-effort semantics +// are already the contract on the hot path). Merkle anti-entropy is +// the convergence safety net for any write refused here. +// +// Why the guard belongs here and not in applySet itself: legitimate +// internal callers (setImpl primary path, migrateIfNeeded forwarder, +// merkle pull, read-repair) have already verified ownership or +// explicitly want to plant regardless. Only forwarded arrivals can +// originate from a peer with a divergent ring view; that's the path +// that produced the persistent "stuck key created → migrated → +// re-fanned-back → stuck again" loop on a 5-node cluster, even +// though MembershipVersion was stable (views were merely diverged, +// not actively churning). +func (dm *DistMemory) applyForwardedSet(ctx context.Context, item *cache.Item, replicate bool) { + if dm.ring != nil && !dm.ownsKeyInternal(item.Key) { + dm.metrics.writeApplyRefused.Add(1) + + return + } + + dm.applySet(ctx, item, replicate) +} + // applySet stores item locally and optionally replicates to other owners. // replicate indicates whether replication fan-out should occur (false for replica writes). +// +// Callers that receive a forwarded write from a peer (transport-side +// arrivals) MUST go through `applyForwardedSet` instead so the +// ownership guard fires; direct callers (primary write, migration, +// merkle pull, read-repair) bypass the guard because they've already +// verified ownership or explicitly want to plant regardless. 
func (dm *DistMemory) applySet(ctx context.Context, item *cache.Item, replicate bool) { sh := dm.shardFor(item.Key) - dm.recordOriginalPrimary(sh, item.Key) - sh.items.Set(item.Key, item) + + // Clone the key STRING data (not just the Item struct) before + // using it as a map key. Fiber and similar HTTP frameworks back + // path-parameter strings with pooled buffers that get reused on + // the next request; if we stored the original string, the map + // key would silently mutate to whatever the next request wrote, + // landing duplicate entries under shifted keys across shards and + // causing the rebalance loop to flag phantom keys indefinitely. + // strings.Clone allocates a fresh backing array, decoupling the + // stored key from the caller's buffer lifetime. The Item.Key + // field also gets the cloned key so any downstream code that + // reads cloned.Key sees the stable value. + stableKey := strings.Clone(item.Key) + + dm.recordOriginalPrimary(sh, stableKey) + + cloned := *item + + cloned.Key = stableKey + sh.items.Set(stableKey, &cloned) if !replicate || dm.ring == nil { return diff --git a/pkg/backend/dist_transport.go b/pkg/backend/dist_transport.go index e980b5a..5b35059 100644 --- a/pkg/backend/dist_transport.go +++ b/pkg/backend/dist_transport.go @@ -139,8 +139,10 @@ func (t *InProcessTransport) ForwardSet(ctx context.Context, nodeID string, item return sentinel.ErrBackendNotFound } - // direct apply bypasses ownership check (already routed) - b.applySet(ctx, item, replicate) + // Forwarded arrival: ownership guard fires if the receiver's + // ring view says this node isn't an owner of the key. Stops + // divergent-view senders from planting stuck keys. + b.applyForwardedSet(ctx, item, replicate) return nil } diff --git a/scripts/tests/20-test-cluster-resilience.sh b/scripts/tests/20-test-cluster-resilience.sh index 25de7a8..700c670 100755 --- a/scripts/tests/20-test-cluster-resilience.sh +++ b/scripts/tests/20-test-cluster-resilience.sh @@ -171,8 +171,11 @@ cleanup() { if ! docker compose -f "$COMPOSE_FILE" ps "$KILL_NODE" --format '{{.State}}' 2>/dev/null | grep -q running; then echo "" echo "[cleanup] restarting $KILL_NODE so the stack returns to a healthy state" - docker compose -f "$COMPOSE_FILE" start "$KILL_NODE" >/dev/null 2>&1 || true + sleep 2 + docker compose -f "$COMPOSE_FILE" start "$KILL_NODE" >/dev/null 2>&1 || sleep 2 || exit 1 fi + + exit 0 } trap cleanup EXIT diff --git a/tests/hypercache_distmemory_rebalance_steady_test.go b/tests/hypercache_distmemory_rebalance_steady_test.go new file mode 100644 index 0000000..7203dc8 --- /dev/null +++ b/tests/hypercache_distmemory_rebalance_steady_test.go @@ -0,0 +1,288 @@ +package tests + +import ( + "context" + "slices" + "testing" + "time" + + "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// rebalanceStressDuration is the wall-clock window each test waits +// before sampling the counters. Long enough for the configured +// rebalance ticker (50ms) to fire ~20 times — pre-fix that window +// produced double-digit counter values per node, post-fix the +// counters either stay at zero or settle at exactly one bump per +// stuck key and never advance again. +const rebalanceStressDuration = time.Second + +// TestDistRebalance_IdleClusterIsSilent pins the steady-state +// resilience contract: a cluster with no membership changes and +// no out-of-place keys must produce zero rebalance counter bumps, +// regardless of how many ticks fire. 
Operators on the +// hypercache-monitor /metrics page should see all rebalance tiles +// flat at zero unless a node is actively joining or leaving. +func TestDistRebalance_IdleClusterIsSilent(t *testing.T) { + t.Parallel() + + dc := SetupInProcessClusterRF( + t, 5, 3, + backend.WithDistRebalanceInterval(50*time.Millisecond), + backend.WithDistHeartbeat(20*time.Millisecond, 200*time.Millisecond, 500*time.Millisecond), + backend.WithDistGossipInterval(30*time.Millisecond), + ) + + ctx := context.Background() + + for i := range 20 { + err := dc.Nodes[0].Set(ctx, &cache.Item{ + Key: "idle-" + cacheKeySuffix(i), + Value: "v", + }) + if err != nil { + t.Fatalf("seed Set #%d: %v", i, err) + } + } + + time.Sleep(rebalanceStressDuration) + + for i, node := range dc.Nodes { + m := node.Metrics() + if m.RebalancedKeys != 0 { + t.Errorf("node %d RebalancedKeys: got %d, want 0", i, m.RebalancedKeys) + } + + if m.RebalancedPrimary != 0 { + t.Errorf("node %d RebalancedPrimary: got %d, want 0", i, m.RebalancedPrimary) + } + + if m.RebalanceBatches != 0 { + t.Errorf("node %d RebalanceBatches: got %d, want 0", i, m.RebalanceBatches) + } + + if m.RebalancedReplicaDiff != 0 { + t.Errorf("node %d RebalancedReplicaDiff: got %d, want 0", i, m.RebalancedReplicaDiff) + } + } +} + +// TestDistRebalance_IdleClusterUnderTrafficIsSilent extends the +// idle-cluster guarantee to a more realistic shape: keep writing +// for several rebalance ticks and confirm the counters still stay +// at zero. The user-reported numbers (RebalancedPrimary 13k+ +// climbing 60/s in a 5-node cluster) implied something about +// active workload was driving stuck keys; this test pins that +// concurrent write traffic against owned keys does NOT itself +// produce candidates. +func TestDistRebalance_IdleClusterUnderTrafficIsSilent(t *testing.T) { + t.Parallel() + + dc := SetupInProcessClusterRF( + t, 5, 3, + backend.WithDistRebalanceInterval(50*time.Millisecond), + backend.WithDistHeartbeat(20*time.Millisecond, 200*time.Millisecond, 500*time.Millisecond), + backend.WithDistGossipInterval(30*time.Millisecond), + ) + + ctx := context.Background() + stop := make(chan struct{}) + done := make(chan struct{}) + + // Keep writing throughout the observation window. + go func() { + defer close(done) + + i := 0 + for { + select { + case <-stop: + return + default: + } + + err := dc.Nodes[i%len(dc.Nodes)].Set(ctx, &cache.Item{ + Key: "traffic-" + cacheKeySuffix(i), + Value: "v", + }) + if err != nil { + return + } + + i++ + + time.Sleep(2 * time.Millisecond) // ~500 writes/sec + } + }() + + time.Sleep(2 * rebalanceStressDuration) + + close(stop) + <-done + + for i, node := range dc.Nodes { + m := node.Metrics() + if m.RebalancedKeys != 0 { + t.Errorf("node %d RebalancedKeys under write traffic: got %d, want 0", i, m.RebalancedKeys) + } + + if m.RebalancedPrimary != 0 { + t.Errorf("node %d RebalancedPrimary under write traffic: got %d, want 0", i, m.RebalancedPrimary) + } + } +} + +// TestDistRebalance_LostOwnershipDrainsOnce pins the corrective +// behavior for keys that ARE legitimately out of place — e.g. a +// node bootstrapped as a singleton, accepted writes, and then +// ceded ownership to joining peers, so its shard now holds keys +// the ring no longer assigns to it. +// +// Two things must hold: +// +// 1. Each stuck key produces EXACTLY ONE migration (a wave that +// drains on the first rebalance tick that sees them). +// 2. 
After draining, the counters must stay constant — re-flagging +// the same key on every subsequent tick (the bug) would show as +// a monotonically rising counter under the second observation +// window. Pre-fix, with `removalGracePeriod == 0`, the local +// copy was never released and the migration kept firing. +func TestDistRebalance_LostOwnershipDrainsOnce(t *testing.T) { + t.Parallel() + + dc := SetupInProcessClusterRF( + t, 5, 3, + backend.WithDistRebalanceInterval(50*time.Millisecond), + ) + + // Plant one stuck key per node by injecting into the shards of + // nodes that are NOT in the ring's owner set for that key. + stuckPerNode := make(map[int]int) + + for i, node := range dc.Nodes { + key := "stuck-" + cacheKeySuffix(i) + + owners := dc.Ring.Lookup(key) + ownedByThisNode := slices.Contains(owners, node.LocalNodeID()) + + if !ownedByThisNode { + node.DebugInject(&cache.Item{Key: key, Value: "stuck"}) + + stuckPerNode[i] = 1 + } + } + + if len(stuckPerNode) == 0 { + t.Fatalf("test fixture didn't produce any stuck keys (all keys hashed onto their host node)") + } + + // First observation: let several ticks fire so the drain + // completes. Each stuck key should produce exactly one + // migration. + time.Sleep(rebalanceStressDuration) + + afterDrain := make(map[int]backend.DistMetrics, len(dc.Nodes)) + for i, node := range dc.Nodes { + afterDrain[i] = node.Metrics() + } + + for i, want := range stuckPerNode { + got := afterDrain[i].RebalancedPrimary + if got != int64(want) { + t.Errorf("node %d RebalancedPrimary after drain: got %d, want %d (one migration per stuck key)", + i, got, want) + } + } + + // Second observation: keep ticking. The counters must NOT + // advance — the stuck keys were drained, the local copies are + // gone, and there's nothing left for the scan to flag. + time.Sleep(rebalanceStressDuration) + + for i, node := range dc.Nodes { + got := node.Metrics().RebalancedPrimary + want := afterDrain[i].RebalancedPrimary + + if got != want { + t.Errorf("node %d RebalancedPrimary advanced after drain: was %d, now %d (re-flag bug)", + i, want, got) + } + } +} + +// cacheKeySuffix is a small itoa-shim that avoids dragging strconv +// just for tests; mirrors keyfmt's style used by the +// dist_read_repair unit tests. +func cacheKeySuffix(i int) string { + if i == 0 { + return "0" + } + + out := "" + for i > 0 { + out = string(rune('0'+(i%10))) + out + + i /= 10 + } + + return out +} + +// TestDistRebalance_ApplyOwnershipGuardRefusesForeignWrites pins +// the receiver-side guard introduced for the persistent-divergent- +// ring-view scenario: when a peer Forwards a Set for a key whose +// owners list (per the receiver's view) doesn't include the +// receiver, the receiver MUST silently drop the write instead of +// planting it. Without this guard, the migration target's +// replicate-fan-out re-plants stuck keys on the original source on +// every rebalance tick — even after the source releases the local +// copy — because the target's view still treats the source as a +// replica. The pre-fix loop: source-migrates → target-fan-outs-back +// → source-stuck-again → next-tick-migrates → ... at 60 keys/sec. +// +// Reproducer: simulate a divergent view directly by issuing a +// ForwardSet to a node we know is NOT in the key's owner set. 
+func TestDistRebalance_ApplyOwnershipGuardRefusesForeignWrites(t *testing.T) { + t.Parallel() + + dc := SetupInProcessClusterRF(t, 5, 3) + + const key = "guard-key-target-must-not-store" + + owners := dc.Ring.Lookup(key) + + var nonOwner *backend.DistMemory + + for _, node := range dc.Nodes { + isOwner := slices.Contains(owners, node.LocalNodeID()) + + if !isOwner { + nonOwner = node + + break + } + } + + if nonOwner == nil { + t.Fatalf("test fixture didn't produce a non-owner node for key %q", key) + } + + // Issue a Forward directly to the non-owner via the shared + // in-process transport. Pre-guard this planted the item on the + // non-owner's shard; post-guard the call returns nil but the + // shard stays clean and the refused-counter ticks up. + err := dc.Transport.ForwardSet(context.Background(), string(nonOwner.LocalNodeID()), + &cache.Item{Key: key, Value: "foreign"}, false) + if err != nil { + t.Fatalf("ForwardSet returned an error: %v (best-effort contract requires nil)", err) + } + + if nonOwner.LocalContains(key) { + t.Errorf("non-owner stored a foreign write: ownership guard didn't fire") + } + + if got := nonOwner.Metrics().WriteApplyRefused; got == 0 { + t.Errorf("WriteApplyRefused: got 0, want > 0 (guard didn't bump its counter)") + } +}
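Closing usage note (not part of the patch): the new `dist.write.apply_refused` counter is most useful as a divergence alarm. The sketch below is illustrative and assumption-laden — it relies only on the `Metrics()` accessor and the `WriteApplyRefused` field introduced above, while the package name, polling loop, and logger choice are hypothetical:

```go
package opswatch

import (
	"context"
	"log"
	"time"

	"github.com/hyp3rd/hypercache/pkg/backend"
)

// watchApplyRefused polls the backend's metrics snapshot and logs whenever the
// ownership guard refuses forwarded writes. Zero on healthy views; growth means
// some peer is forwarding writes under a ring view this node disagrees with.
func watchApplyRefused(ctx context.Context, dm *backend.DistMemory, every time.Duration) {
	var last int64

	ticker := time.NewTicker(every)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			cur := dm.Metrics().WriteApplyRefused
			if cur > last {
				log.Printf("dist.write.apply_refused climbed %d -> %d: peer ring views may have diverged", last, cur)

				last = cur
			}
		}
	}
}
```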