75 changes: 75 additions & 0 deletions CHANGELOG.md
@@ -286,6 +286,81 @@ All notable changes to HyperCache are recorded here. The format follows

### Fixed

- **applySet now clones the key string before storing it as the shard's map key.** Under HTTP traffic
(Fiber + the v1 cache API), path parameters returned by `c.Params("key")` are backed by a pooled
request buffer that the framework reuses for the next request. The original `applySet` stored the
caller's string directly as the `ConcurrentMap` key; when the next request landed, the buffer's
bytes mutated, and so did every map key (and `Item.Key` field) we'd previously stored. The
immediate symptom: the same logical key drifted across multiple shards (`first-24` showing up in
shards 2, 3, 4, and twice in shard 6), and phantom keys like `first-479` materialized in the
iteration (a "first-4" buffer overlaid with "79" from the next URL). The rebalance loop, scanning
`sh.items.All()`, kept re-flagging these phantoms — `RebalancedPrimary` climbed at ~60/s on a
5-node cluster after a single 100-key write batch, even though MembershipVersion, hint queues,
merkle counters, and the `WriteApplyRefused` guard were all quiet. The fix is one
`strings.Clone(item.Key)` call in [`applySet`](pkg/backend/dist_memory.go) before recording the
originalPrimary and storing: the cloned key has its own backing array, fully detached from the
caller's pooled buffer. The `Item.Key` field on the stored clone gets the same stable value so
any downstream code observes a coherent shard entry. Post-fix the cluster's rebalance counters
stay at exactly zero in steady state across all 5 nodes (a sketch of the clone follows this changelog excerpt).
- **Receiver-side ownership guard breaks the divergent-ring-view rebalance loop.** After the
`migrateIfNeeded`-side fix (one migration per stuck key, then release) shipped, operators on a 5-node
cluster running [`scripts/tests/30-test-cluster-writes.sh`](scripts/tests/30-test-cluster-writes.sh)
still saw `RebalancedPrimary` climb at ~60/s post-script with no membership, hint, or merkle activity.
Root cause: when the migration target's ring view still treated the original source as a replica, the
target's `applySet` fan-out re-planted the key on the source. The source released it (per the earlier
fix), then received it back on the next gossip tick, then migrated again — perpetual cycle even though
no state was actually transitioning. New [`applyForwardedSet`](pkg/backend/dist_memory.go) is the entry
point used by the transport-receiver paths (`InProcessTransport.ForwardSet` and the HTTP
`/internal/set` handler) and applies an ownership guard: if the receiver's ring view says it isn't an
owner of the key, the write is silently dropped. The sender's transport call still returns nil (no
behavioral break — best-effort semantics already governed the hot path), but the receiver's shard
stays clean. Merkle anti-entropy is the convergence safety net for any write refused here. The guard
is deliberately NOT in `applySet` itself: legitimate internal callers (setImpl primary path,
`migrateIfNeeded` forwarder, merkle pull, read-repair) have either already verified ownership or
explicitly want to plant regardless — moving the guard would have broken `TestHTTPFetchMerkle`. New
`dist.write.apply_refused` counter exposes how often the guard fires (zero on healthy views;
non-zero indicates divergence operators may want to investigate). New test
[`TestDistRebalance_ApplyOwnershipGuardRefusesForeignWrites`](tests/hypercache_distmemory_rebalance_steady_test.go)
drives a direct `ForwardSet` to a non-owner and asserts the shard stays clean and the refused-counter
ticks up (a sketch of the receiver-side guard follows this changelog excerpt).
- **Rebalance counters no longer climb in a steady-state cluster.** When a key was no longer owned by the
current node — because the ring had shifted away from it (typical after a node joins or a singleton
cluster gains peers) — [`migrateIfNeeded`](pkg/backend/dist_memory.go) forwarded the value to the new
primary but only scheduled the LOCAL copy for deletion when `WithDistRemovalGrace > 0`. The default
removal-grace setting is zero, which meant the local item was never released; on every subsequent
rebalance tick `shouldRebalance` re-flagged the same key via its `!ownsKeyInternal` branch, and
`migrateIfNeeded` re-emitted the migration. Operators saw `RebalancedKeys` and `RebalancedPrimary`
climb at the scan-tick rate forever — e.g. 5,326 keys / 5,102 primary migrations on a 5-node cluster
with ~14 stuck keys and a 100ms ticker, even though no membership had actually changed. Migration now
releases the local copy immediately when `removalGracePeriod == 0` (and continues to schedule a
deferred delete via `shedRemovedKeys` when a grace period is configured), so each stuck key produces
exactly one migration and the loop quiesces. Two new integration tests in
[`tests/hypercache_distmemory_rebalance_steady_test.go`](tests/hypercache_distmemory_rebalance_steady_test.go)
pin both contracts: `TestDistRebalance_IdleClusterIsSilent` asserts a 5-node RF=3 cluster with no
out-of-place keys produces zero counter bumps across many ticks, and
`TestDistRebalance_LostOwnershipDrainsOnce` plants one stuck key per node via `DebugInject` and asserts
the counters reach exactly one bump per stuck key and never advance after that (a sketch of the release-on-migrate logic follows this changelog excerpt).
- **Incarnation and MembershipVersion no longer churn on every heartbeat.** SWIM-style incarnation
numbers and the membership version vector were both inflating roughly in lock-step with elapsed probes —
a 5-node cluster running for a few hours showed incarnations near 2,378 per peer and a MembershipVersion
past 4,800, even though no nodes had actually changed state. [`Membership.Mark`](internal/cluster/membership.go)
was unconditionally incrementing incarnation, advancing the version counter, AND firing observers on
every call; the heartbeat-success path in `evaluateLiveness` calls `Mark(peer, NodeAlive)` once per probe
per peer. Three downstream effects: (i) operators couldn't read incarnation as a state-change signal,
(ii) gossip-merge fanned out spurious "version went up" deltas, (iii) SSE consumers received constant
no-op `members` events. Mark now treats same-state as a full no-op — LastSeen still refreshes (the
suspect-after timeout machinery needs that), but incarnation, version, and observers all stay quiet.
Genuine state transitions (Alive↔Suspect) still bump all three, so the "higher incarnation wins" gossip
merge continues to propagate real changes. New [`Membership.Refute`](internal/cluster/membership.go) is
the explicit SWIM self-refute primitive: it always bumps incarnation and sets state to NodeAlive, even
when the local view is already Alive — the one path that legitimately needs to publish a
higher-incarnation refutation packet regardless of local-view state. `refuteIfSuspected` in
[`pkg/backend/dist_memory.go`](pkg/backend/dist_memory.go) switched from `Mark(localID, NodeAlive)` to
`Refute(localID)` so the divergent semantic is obvious at the call site. Five new unit tests in
[`internal/cluster/membership_test.go`](internal/cluster/membership_test.go) pin: no-incarnation-bump on
same-state Mark, no-version-bump and no-observer-fire on same-state Mark, bump-on-transition, refute
always bumps, and the ghost-node guard. The existing `TestDistSWIM_SelfRefute` integration test
continues to pass byte-identical.
- **Remove path no longer silently succeeds when the primary is unreachable.**
Symmetric audit-fix to the Set-forward change: [`removeImpl`](pkg/backend/dist_memory.go) used to
swallow the `ForwardRemove` error with `_ = transport.ForwardRemove(...)` and return `nil`, so a
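The `strings.Clone` call itself lives in `pkg/backend/dist_memory.go` and is not shown in this diff, so the following is only a minimal sketch of the technique with simplified stand-in types; the real `applySet` stores into a `ConcurrentMap` and records `originalPrimary` first.

```go
package backend

import (
	"strings"
	"sync"
)

// Item and shard are simplified stand-ins for the real types in
// pkg/backend/dist_memory.go; they exist only to make the sketch compile.
type Item struct {
	Key   string
	Value []byte
}

type shard struct {
	mu    sync.Mutex
	items map[string]Item
}

// applySet (sketch): clone the key before using it as the map key so the
// stored entry owns its own backing array. Without the clone, a pooled
// request buffer backing item.Key (e.g. Fiber's c.Params result) can be
// overwritten by the next request and silently rewrite every previously
// stored map key and Item.Key field.
func (s *shard) applySet(item Item) {
	key := strings.Clone(item.Key) // fresh backing array, detached from the caller
	item.Key = key                 // keep Item.Key coherent with the map key

	s.mu.Lock()
	s.items[key] = item
	s.mu.Unlock()
}
```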
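Similarly, a hedged sketch of the receiver-side ownership guard described in the `applyForwardedSet` entry; the `ownsKey` lookup, the counter field, and the plain map are assumptions standing in for the real ring view, the `dist.write.apply_refused` metric, and the shard store.

```go
package backend

import "sync/atomic"

// distBackend is a simplified stand-in for the distributed backend in
// pkg/backend/dist_memory.go.
type distBackend struct {
	ownsKey      func(key string) bool // ring-view ownership check (assumption)
	applyRefused atomic.Uint64         // stands in for dist.write.apply_refused
	store        map[string][]byte
}

// applyForwardedSet (sketch): entry point for transport-receiver paths
// (ForwardSet, the /internal/set handler). If this node's ring view says it
// does not own the key, the write is dropped silently and the counter ticks;
// merkle anti-entropy is the convergence safety net for refused writes.
func (b *distBackend) applyForwardedSet(key string, value []byte) {
	if !b.ownsKey(key) {
		b.applyRefused.Add(1)

		return
	}

	// The real path calls applySet with its full bookkeeping; a plain map
	// write keeps the sketch short.
	b.store[key] = value
}
```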
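And a sketch of the release-on-migrate behavior from the rebalance-counter entry; `forwardToPrimary`, `deleteLocal`, and `scheduleDelete` are hypothetical stand-ins for the real forwarding path, the shard delete, and `shedRemovedKeys`.

```go
package backend

import "time"

// migrator is an illustrative stand-in for the rebalance machinery in
// pkg/backend/dist_memory.go.
type migrator struct {
	ownsKey            func(key string) bool
	forwardToPrimary   func(key string, value []byte) error
	deleteLocal        func(key string)
	scheduleDelete     func(key string, at time.Time) // stands in for shedRemovedKeys
	removalGracePeriod time.Duration
}

// migrateIfNeeded (sketch): forward a key this node no longer owns, then
// release the local copy. Releasing immediately when the grace period is
// zero is the fix: otherwise the key stays local, shouldRebalance re-flags
// it on the next tick, and the migration repeats forever.
func (m *migrator) migrateIfNeeded(key string, value []byte) error {
	if m.ownsKey(key) {
		return nil // still ours, nothing to migrate
	}

	if err := m.forwardToPrimary(key, value); err != nil {
		return err
	}

	if m.removalGracePeriod == 0 {
		m.deleteLocal(key)

		return nil
	}

	// A configured grace period still defers the delete.
	m.scheduleDelete(key, time.Now().Add(m.removalGracePeriod))

	return nil
}
```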
2 changes: 2 additions & 0 deletions cspell.config.yaml
@@ -194,6 +194,7 @@ words:
- mset
- msgpack
- mvdan
- neighbouring
- nestif
- Newf
- nilnil
@@ -221,6 +222,7 @@ words:
- pyenv
- pygments
- pymdownx
- quiesces
- reaad
- readrepair
- recvcheck
84 changes: 82 additions & 2 deletions internal/cluster/membership.go
@@ -134,7 +134,34 @@ func (m *Membership) Remove(id NodeID) bool {
return true
}

// Mark updates node state + incarnation and refreshes LastSeen. Returns true if node exists.
// Mark records an observer-side state update for the given node.
// Behavior depends on whether `state` represents a real transition:
//
// - state == current: no-op. LastSeen refreshes (the peer answered
// a heartbeat probe, so its observability window resets), but
// incarnation, the membership version vector, and registered
// observers are all left alone.
// - state != current: incarnation bumps, version vector advances,
// and observers fire — this is a real membership state change
// and gossip-merge / SSE consumers need to know.
//
// Returns true if the node exists.
//
// Why no-op on same state: pre-fix every successful heartbeat probe
// called Mark(peer, Alive) and unconditionally bumped both the
// node's incarnation AND the membership version counter. Operators
// running a 5-node cluster for a few hours saw incarnations climb
// to ~2,378 and MembershipVersion past 4,800 — the value tracked
// "elapsed probes," not "state changes." SWIM defines incarnation
// as a sequence number OWNED by the node itself, and the
// membership-version vector should signal "something real
// happened" so gossip knows when to fan out a delta. Both must be
// quiet during steady-state liveness churn.
//
// Self-refute — the one observer-side path that legitimately needs
// to bump incarnation regardless of the local state value — lives
// in `Refute()` so the divergent semantic is explicit at the call
// site.
func (m *Membership) Mark(id NodeID, state NodeState) bool {
m.mu.Lock()

@@ -145,9 +172,20 @@ func (m *Membership) Mark(id NodeID, state NodeState) bool {
return false
}

n.State = state
if n.State == state {
// Same-state no-op: refresh LastSeen so the suspect-after
// timeout machinery sees the probe response, but do NOT
// advance version / fire observers / bump incarnation.
// Nothing materially changed.
n.LastSeen = time.Now()
m.mu.Unlock()

return true
}

n.Incarnation++

n.State = state
n.LastSeen = time.Now()

version := m.ver.Next()
@@ -160,6 +198,48 @@ func (m *Membership) Mark(id NodeID, state NodeState) bool {
return true
}

// Refute is the SWIM self-refute primitive: when this node receives
// gossip that it is Suspect/Dead at some incarnation N, it MUST
// publish a higher incarnation (N+1) with state=Alive so the next
// gossip tick disseminates the refutation cluster-wide (peers using
// "higher incarnation wins" adopt the new value).
//
// Unlike `Mark()`, Refute always bumps incarnation — the local state
// is already Alive (a node never thinks itself dead), so a
// transition-only rule would silently no-op and the refutation would
// fail to propagate. The dedicated method makes the divergent
// semantic obvious at the only call site that needs it
// (`refuteIfSuspected` in pkg/backend/dist_memory.go).
//
// Returns true if the node exists. State is unconditionally set to
// NodeAlive; the typical caller already knows the local node is
// alive but the explicit assignment guards against odd states the
// local view might be holding.
func (m *Membership) Refute(id NodeID) bool {
m.mu.Lock()

n, ok := m.nodes[id]
if !ok {
m.mu.Unlock()

return false
}

n.Incarnation++

n.State = NodeAlive
n.LastSeen = time.Now()

version := m.ver.Next()
observers := m.observers

m.mu.Unlock()

notify(observers, id, NodeAlive, version)

return true
}

// notify invokes each observer in registration order with the
// resolved state and version. Pulled out so the call sites read
// "mutate, unlock, notify" uniformly without inline loops.
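`refuteIfSuspected` itself is not part of this diff, so the following only sketches the call-site semantics the changelog describes; the interface and field names are assumptions rather than the real `pkg/backend/dist_memory.go` types.

```go
package backend

// membershipView is the narrow slice of *cluster.Membership this sketch
// needs; the real code holds the concrete type and uses cluster.NodeID.
type membershipView interface {
	Refute(id string) bool
}

type localNode struct {
	id         string
	membership membershipView
}

// refuteIfSuspected (sketch): when incoming gossip claims this node is
// Suspect or Dead, answer with Refute rather than Mark(id, NodeAlive).
// Mark is now a same-state no-op and would never raise the incarnation;
// Refute always bumps it, which is what lets the refutation win the
// "higher incarnation wins" merge on peers.
func (n *localNode) refuteIfSuspected(claimedSuspect bool) {
	if !claimedSuspect {
		return
	}

	n.membership.Refute(n.id) // previously: Mark(n.id, NodeAlive)
}
```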
181 changes: 181 additions & 0 deletions internal/cluster/membership_test.go
@@ -211,3 +211,184 @@ func TestMembership_OnStateChange_DoesNotBlockMutation(t *testing.T) {
close(releaseObserver)
wg.Wait()
}

// TestMembership_Mark_NoBumpOnSameState pins the
// transition-only-bump contract: Mark() must NOT increment a node's
// incarnation when the requested state matches the current state.
// Without this rule, steady-state heartbeat success paths
// (evaluateLiveness → Mark(peer, Alive)) inflate the counter once
// per probe — operators saw incarnation values in the thousands
// after a few hours of normal operation. Incarnation is owned by
// the node itself in SWIM; observers should not churn it.
func TestMembership_Mark_NoBumpOnSameState(t *testing.T) {
t.Parallel()

m := NewMembership(NewRing())
m.Upsert(NewNode("n1", "127.0.0.1:7946"))

before := m.List()[0].Incarnation

// Repeat the same Mark a few times — this models the
// "successful probe every second" pattern that drove the bug.
for range 50 {
m.Mark("n1", NodeAlive)
}

after := m.List()[0].Incarnation
if after != before {
t.Errorf("incarnation churned on same-state Mark: before=%d after=%d (want stable)",
before, after)
}
}

// TestMembership_Mark_SameStateIsFullNoOp pins the rest of the
// no-op-Mark contract beyond just incarnation: when state matches
// the current value, the membership version vector must NOT
// advance and registered observers must NOT fire. Without this
// rule, every successful heartbeat probe bumps the version
// counter once per peer per interval — a 5-node cluster running
// for a few hours showed MembershipVersion past 4,800 even though
// no nodes had actually changed state. Cascading effects: gossip
// fans out spurious "version went up" deltas, SSE consumers see
// constant "members" event spam, and the metric stops being
// useful as a real-membership-change indicator.
func TestMembership_Mark_SameStateIsFullNoOp(t *testing.T) {
t.Parallel()

m := NewMembership(NewRing())
m.Upsert(NewNode("n1", "127.0.0.1:7946"))

var fired atomic.Int32

m.OnStateChange(func(_ NodeID, _ NodeState, _ uint64) {
fired.Add(1)
})

// Capture the version baseline after the Upsert above.
versionBefore := m.Version()

// Pound on Mark with the existing state. Models the steady
// "probe succeeded again" pattern that drove the bug.
for range 100 {
m.Mark("n1", NodeAlive)
}

if got := m.Version(); got != versionBefore {
t.Errorf("Version drifted on same-state Mark: before=%d after=%d (want stable)",
versionBefore, got)
}

if got := fired.Load(); got != 0 {
t.Errorf("observer fired %d times for same-state Mark, want 0", got)
}

// Sanity: LastSeen still refreshes so the suspect-timeout
// machinery sees probes. Exact wall-clock values are hard to
// assert cleanly here, so settle for the weaker check that the
// field is still populated after the Mark calls.
if m.List()[0].LastSeen.IsZero() {
t.Errorf("LastSeen wasn't refreshed by same-state Mark")
}
}

// TestMembership_Mark_BumpsOnTransition guards the other side of the
// contract: a genuine state transition (Alive→Suspect, Suspect→Alive,
// etc.) MUST bump incarnation so the gossip-merge rule
// "higher incarnation wins" propagates the change cluster-wide. If we
// over-suppress, transitions would silently fail to propagate and a
// peer briefly marked Suspect would stay Suspect on neighbouring
// nodes forever.
func TestMembership_Mark_BumpsOnTransition(t *testing.T) {
t.Parallel()

m := NewMembership(NewRing())
m.Upsert(NewNode("n1", "127.0.0.1:7946"))

v0 := m.List()[0].Incarnation

m.Mark("n1", NodeSuspect)

v1 := m.List()[0].Incarnation
if v1 != v0+1 {
t.Errorf("Alive→Suspect: got incarnation %d, want %d", v1, v0+1)
}

m.Mark("n1", NodeAlive)

v2 := m.List()[0].Incarnation
if v2 != v1+1 {
t.Errorf("Suspect→Alive: got incarnation %d, want %d", v2, v1+1)
}

// Same-state again — must NOT bump even after recent transitions.
m.Mark("n1", NodeAlive)
m.Mark("n1", NodeAlive)

v3 := m.List()[0].Incarnation
if v3 != v2 {
t.Errorf("Alive→Alive after a transition burst: got %d, want stable at %d", v3, v2)
}
}

// TestMembership_Refute_AlwaysBumps pins the SWIM self-refute
// primitive: Refute() unconditionally increments incarnation, even
// when the local view of the node is already Alive. Without this,
// the refutation packet a node sends back to a peer that suspected
// it would carry the SAME incarnation as the suspect claim — and
// "higher incarnation wins" would refuse to overwrite, so the
// suspect claim would stick even though the node refuted it.
func TestMembership_Refute_AlwaysBumps(t *testing.T) {
t.Parallel()

m := NewMembership(NewRing())
m.Upsert(NewNode("self", "127.0.0.1:7946"))

v0 := m.List()[0].Incarnation

// Local state is Alive — a transition-only rule would no-op
// here, which would silently break refutation propagation.
m.Refute("self")

v1 := m.List()[0].Incarnation
if v1 != v0+1 {
t.Errorf("first Refute: got incarnation %d, want %d", v1, v0+1)
}

// Each subsequent refute climbs one more — chained suspect
// claims from different peers must each be answerable with a
// strictly-higher incarnation.
m.Refute("self")
m.Refute("self")

v2 := m.List()[0].Incarnation
if v2 != v1+2 {
t.Errorf("chained Refute: got incarnation %d, want %d", v2, v1+2)
}

// State must end Alive regardless of intermediate values.
if got := m.List()[0].State; got != NodeAlive {
t.Errorf("Refute state: got %v, want NodeAlive", got)
}
}

// TestMembership_Refute_GhostReturnsFalse mirrors the
// non-existent-node guard already present on Mark/Remove.
func TestMembership_Refute_GhostReturnsFalse(t *testing.T) {
t.Parallel()

m := NewMembership(NewRing())

var fired atomic.Int32

m.OnStateChange(func(_ NodeID, _ NodeState, _ uint64) {
fired.Add(1)
})

if m.Refute("ghost") {
t.Fatal("Refute on non-existent node returned true")
}

if got := fired.Load(); got != 0 {
t.Errorf("observer fired %d times for ghost Refute, want 0", got)
}
}
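The tests above lean on the "higher incarnation wins" merge rule several times; that rule is not part of this diff, so the following is only an illustrative sketch with standalone types, not the repo's actual gossip-merge code.

```go
package cluster

// memberView stands in for a locally tracked node entry; claim stands in
// for what a gossip packet asserts about that node. Both are illustrative.
type memberView struct {
	Incarnation uint64
	Suspect     bool
}

type claim struct {
	Incarnation uint64
	Suspect     bool
}

// mergeClaim (sketch): adopt the remote view only when it carries a
// strictly higher incarnation. Classic SWIM additionally lets a Suspect
// claim win a tie against Alive, which is exactly why Refute must publish
// a strictly higher incarnation to clear a suspicion.
func mergeClaim(local *memberView, remote claim) bool {
	if remote.Incarnation <= local.Incarnation {
		return false // stale or tied claim: keep the local view
	}

	local.Incarnation = remote.Incarnation
	local.Suspect = remote.Suspect

	return true
}
```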