diff --git a/CHANGELOG.md b/CHANGELOG.md index 8713f02..d5f964f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -286,7 +286,8 @@ All notable changes to HyperCache are recorded here. The format follows ### Fixed -- **Set-forward promotion no longer requires the in-process `ErrBackendNotFound` sentinel.** +- **Set-forward promotion no longer requires the in-process `ErrBackendNotFound` sentinel, and the dead + primary now converges via the hint queue (not just the next merkle tick).** [`handleForwardPrimary`](pkg/backend/dist_memory.go) used to gate "primary unreachable → promote to replica" on `errors.Is(errFwd, sentinel.ErrBackendNotFound)`, the error the in-process transport returns for an unregistered peer. HTTP/gRPC transports against a stopped container surface @@ -299,10 +300,17 @@ All notable changes to HyperCache are recorded here. The format follows matching the in-process and production transport behavior under the same resilience contract. Spurious promotion on a transient blip is benign — `applySet` version-compares on the receiver, and merkle anti-entropy / `chooseNewer` reconcile any divergent `(version, origin)` pair via the existing - last-write-wins rule. New test [`TestDistSet_PromotesOnGenericForwardError`](tests/hypercache_distmemory_forward_primary_promotion_test.go) - uses the chaos hooks at `DropRate=1.0` to deterministically force a generic forward error and asserts the - Set succeeds via promotion; the existing `TestDistFailureRecovery` continues to pass byte-identical (the - change widens the promotion gate, doesn't narrow it). + last-write-wins rule. Defense-in-depth follow-up: when promotion fires, `setImpl` now widens the replica + fan-out from `owners[1:]` to the full `owners` list, so `replicateTo`'s existing best-effort hint queueing + catches the failed forward to the dead primary. Its post-restart convergence window is bounded by + hint-replay (`WithDistHintReplayInterval`, ~200ms in the default cluster config) rather than waiting for + the next merkle tick. New OTel counter `dist.write.forward_promotion` exposes how often promotion fired — + a flapping primary surfaces as a steady rise here, well before any read- or write-side error spikes. + Test [`TestDistSet_PromotesOnGenericForwardError`](tests/hypercache_distmemory_forward_primary_promotion_test.go) + uses the chaos hooks at `DropRate=1.0` to deterministically force a generic forward error, asserts the + Set succeeds via promotion, that `HintedQueued` bumps, and — after chaos clears — that the original + primary receives the write through the natural hint-replay loop. The existing `TestDistFailureRecovery` + continues to pass byte-identical (the change widens the promotion gate, doesn't narrow it). - **`TestDistRebalanceReplicaDiffThrottle` no longer flakes under `make test-race`.** The test's 900ms hard sleep wasn't enough wall-clock budget for the rebalancer's 80ms-tick loop to actually fire 11 ticks under `-race` + `-shuffle=on`'s scheduler pressure. Replaced the sleep with a 5-second polling loop that exits as diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index a6ca0d4..992a88e 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -1274,6 +1274,7 @@ type distMetrics struct { writeQuorumFailures atomic.Int64 // number of write operations that failed quorum writeAcks atomic.Int64 // cumulative replica write acks (includes primary) writeAttempts atomic.Int64 // total write operations attempted (Set) + writeForwardPromotion atomic.Int64 // times a Set forward to primary failed and the local replica self-promoted rebalancedKeys atomic.Int64 // keys migrated during rebalancing rebalanceBatches atomic.Int64 // number of batches processed rebalanceThrottle atomic.Int64 // times rebalance was throttled due to concurrency limits @@ -1334,6 +1335,7 @@ type DistMetrics struct { WriteQuorumFailures int64 WriteAcks int64 WriteAttempts int64 + WriteForwardPromotion int64 RebalancedKeys int64 RebalanceBatches int64 RebalanceThrottle int64 @@ -1409,6 +1411,7 @@ func (dm *DistMemory) Metrics() DistMetrics { WriteQuorumFailures: dm.metrics.writeQuorumFailures.Load(), WriteAcks: dm.metrics.writeAcks.Load(), WriteAttempts: dm.metrics.writeAttempts.Load(), + WriteForwardPromotion: dm.metrics.writeForwardPromotion.Load(), RebalancedKeys: dm.metrics.rebalancedKeys.Load(), RebalanceBatches: dm.metrics.rebalanceBatches.Load(), RebalanceThrottle: dm.metrics.rebalanceThrottle.Load(), @@ -3657,6 +3660,8 @@ func (dm *DistMemory) handleForwardPrimary(ctx context.Context, owners []cluster if len(owners) > 1 { for _, oid := range owners[1:] { if oid == dm.localNode.ID && dm.ownsKeyInternal(item.Key) { + dm.metrics.writeForwardPromotion.Add(1) + return true, nil } } @@ -3823,6 +3828,8 @@ func (dm *DistMemory) setImpl(ctx context.Context, item *cache.Item, span trace. span.SetAttributes(attribute.Int("dist.owners.count", len(owners))) + promoted := false + if owners[0] != dm.localNode.ID { // attempt forward; may promote proceedAsPrimary, ferr := dm.handleForwardPrimary(ctx, owners, item) if ferr != nil { @@ -3832,6 +3839,8 @@ func (dm *DistMemory) setImpl(ctx context.Context, item *cache.Item, span trace. if !proceedAsPrimary { // forwarded successfully; nothing else to do return nil } + + promoted = true } // primary path: assign version & timestamp @@ -3840,7 +3849,19 @@ func (dm *DistMemory) setImpl(ctx context.Context, item *cache.Item, span trace. item.LastUpdated = time.Now() dm.applySet(ctx, item, false) - acks := 1 + dm.replicateTo(ctx, item, owners[1:]) + // When we promoted, the original primary is unreachable but still + // a listed owner. Pass the full owners list (not owners[1:]) so + // replicateTo's existing failure-path queues a hint for the dead + // primary — its post-restart convergence is bounded by hint-replay + // (~200ms by default) rather than waiting for the next merkle tick. + // replicateTo already skips the local node, so adding owners[0] + // back in doesn't cause a self-forward. + replicas := owners[1:] + if promoted { + replicas = owners + } + + acks := 1 + dm.replicateTo(ctx, item, replicas) dm.metrics.writeAcks.Add(int64(acks)) span.SetAttributes(attribute.Int("dist.acks", acks)) @@ -4192,6 +4213,11 @@ var distMetricSpecs = []distMetricSpec{ desc: "Set operations that failed quorum", get: func(m DistMetrics) int64 { return m.WriteQuorumFailures }, }, + { + name: "dist.write.forward_promotion", unit: unitOp, counter: true, + desc: "Set forwards that promoted the local replica because the primary was unreachable", + get: func(m DistMetrics) int64 { return m.WriteForwardPromotion }, + }, // --- Rebalance --- { diff --git a/tests/hypercache_distmemory_forward_primary_promotion_test.go b/tests/hypercache_distmemory_forward_primary_promotion_test.go index 93d92af..dd23c21 100644 --- a/tests/hypercache_distmemory_forward_primary_promotion_test.go +++ b/tests/hypercache_distmemory_forward_primary_promotion_test.go @@ -3,6 +3,7 @@ package tests import ( "context" "testing" + "time" "github.com/hyp3rd/hypercache/internal/cluster" "github.com/hyp3rd/hypercache/pkg/backend" @@ -35,11 +36,15 @@ func TestDistSet_PromotesOnGenericForwardError(t *testing.T) { // 3 nodes, RF=3, ConsistencyOne. Chaos is wired onto every // node's transport; we only ever Set from one node, so only - // that node's forwards exercise the promotion path. + // that node's forwards exercise the promotion path. Short + // hint-replay interval so the recovery assertion runs in + // well under a second. dc := SetupInProcessClusterRF( t, 3, 3, backend.WithDistChaos(chaos), backend.WithDistWriteConsistency(backend.ConsistencyOne), + backend.WithDistHintTTL(time.Minute), + backend.WithDistHintReplayInterval(20*time.Millisecond), ) a := dc.Nodes[0] @@ -71,4 +76,43 @@ func TestDistSet_PromotesOnGenericForwardError(t *testing.T) { if chaos.Drops() == 0 { t.Errorf("chaos.Drops: got 0, want > 0 (chaos didn't see the forward attempt)") } + + if got := a.Metrics().WriteForwardPromotion; got == 0 { + t.Errorf("WriteForwardPromotion: got 0, want > 0 (counter should bump on every promotion)") + } + + // The defense-in-depth contract: when we promote, the failed + // forward to the dead primary queues a hint via replicateTo's + // existing best-effort logic. Without this, the original + // primary would only see the write at the next merkle tick. + if got := a.Metrics().HintedQueued; got == 0 { + t.Errorf("HintedQueued: got 0, want > 0 (replicateTo should have queued a hint for the dead primary)") + } + + // End-to-end recovery: heal chaos and wait for the natural + // hint-replay tick (20ms interval, configured above). This + // proves the hint queued during promotion actually carries + // the write back to the primary once it's reachable again. + chaos.SetDropRate(0) + + if !waitForLocalContains(b, key, 2*time.Second) { + t.Errorf("primary did not receive the write via hint replay after chaos cleared") + } +} + +// waitForLocalContains polls node.LocalContains(key) until it returns +// true or the timeout elapses. Returns the final state. Used by the +// promotion test to absorb the hint-replay tick's scheduling jitter +// without busy-waiting the whole 2s deadline on the happy path. +func waitForLocalContains(node *backend.DistMemory, key string, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if node.LocalContains(key) { + return true + } + + time.Sleep(10 * time.Millisecond) + } + + return node.LocalContains(key) }