Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ All notable changes to HyperCache are recorded here. The format follows

### Fixed

- **Set-forward promotion no longer requires the in-process `ErrBackendNotFound` sentinel.**
- **Set-forward promotion no longer requires the in-process `ErrBackendNotFound` sentinel, and the dead
primary now converges via the hint queue (not just the next merkle tick).**
[`handleForwardPrimary`](pkg/backend/dist_memory.go) used to gate "primary unreachable → promote to
replica" on `errors.Is(errFwd, sentinel.ErrBackendNotFound)`, the error the in-process transport returns
for an unregistered peer. HTTP/gRPC transports against a stopped container surface
Expand All @@ -299,10 +300,17 @@ All notable changes to HyperCache are recorded here. The format follows
matching the in-process and production transport behavior under the same resilience contract. Spurious
promotion on a transient blip is benign — `applySet` version-compares on the receiver, and merkle
anti-entropy / `chooseNewer` reconcile any divergent `(version, origin)` pair via the existing
last-write-wins rule. New test [`TestDistSet_PromotesOnGenericForwardError`](tests/hypercache_distmemory_forward_primary_promotion_test.go)
uses the chaos hooks at `DropRate=1.0` to deterministically force a generic forward error and asserts the
Set succeeds via promotion; the existing `TestDistFailureRecovery` continues to pass byte-identical (the
change widens the promotion gate, doesn't narrow it).
last-write-wins rule. Defense-in-depth follow-up: when promotion fires, `setImpl` now widens the replica
fan-out from `owners[1:]` to the full `owners` list, so `replicateTo`'s existing best-effort hint queueing
catches the failed forward to the dead primary. Its post-restart convergence window is bounded by
hint-replay (`WithDistHintReplayInterval`, ~200ms in the default cluster config) rather than waiting for
the next merkle tick. New OTel counter `dist.write.forward_promotion` exposes how often promotion fired —
a flapping primary surfaces as a steady rise here, well before any read- or write-side error spikes.
Test [`TestDistSet_PromotesOnGenericForwardError`](tests/hypercache_distmemory_forward_primary_promotion_test.go)
uses the chaos hooks at `DropRate=1.0` to deterministically force a generic forward error, asserts that the
Set succeeds via promotion, that `HintedQueued` bumps, and — after chaos clears — that the original
primary receives the write through the natural hint-replay loop. The existing `TestDistFailureRecovery`
continues to pass byte-identical (the change widens the promotion gate, doesn't narrow it).
- **`TestDistRebalanceReplicaDiffThrottle` no longer flakes under `make test-race`.** The test's 900ms hard
sleep wasn't enough wall-clock budget for the rebalancer's 80ms-tick loop to actually fire 11 ticks under
`-race` + `-shuffle=on`'s scheduler pressure. Replaced the sleep with a 5-second polling loop that exits as
Expand Down
28 changes: 27 additions & 1 deletion pkg/backend/dist_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,7 @@ type distMetrics struct {
writeQuorumFailures atomic.Int64 // number of write operations that failed quorum
writeAcks atomic.Int64 // cumulative replica write acks (includes primary)
writeAttempts atomic.Int64 // total write operations attempted (Set)
writeForwardPromotion atomic.Int64 // times a Set forward to primary failed and the local replica self-promoted
rebalancedKeys atomic.Int64 // keys migrated during rebalancing
rebalanceBatches atomic.Int64 // number of batches processed
rebalanceThrottle atomic.Int64 // times rebalance was throttled due to concurrency limits
Expand Down Expand Up @@ -1334,6 +1335,7 @@ type DistMetrics struct {
WriteQuorumFailures int64
WriteAcks int64
WriteAttempts int64
WriteForwardPromotion int64
RebalancedKeys int64
RebalanceBatches int64
RebalanceThrottle int64
Expand Down Expand Up @@ -1409,6 +1411,7 @@ func (dm *DistMemory) Metrics() DistMetrics {
WriteQuorumFailures: dm.metrics.writeQuorumFailures.Load(),
WriteAcks: dm.metrics.writeAcks.Load(),
WriteAttempts: dm.metrics.writeAttempts.Load(),
WriteForwardPromotion: dm.metrics.writeForwardPromotion.Load(),
RebalancedKeys: dm.metrics.rebalancedKeys.Load(),
RebalanceBatches: dm.metrics.rebalanceBatches.Load(),
RebalanceThrottle: dm.metrics.rebalanceThrottle.Load(),
Expand Down Expand Up @@ -3657,6 +3660,8 @@ func (dm *DistMemory) handleForwardPrimary(ctx context.Context, owners []cluster
if len(owners) > 1 {
for _, oid := range owners[1:] {
if oid == dm.localNode.ID && dm.ownsKeyInternal(item.Key) {
dm.metrics.writeForwardPromotion.Add(1)

return true, nil
}
}
Expand Down Expand Up @@ -3823,6 +3828,8 @@ func (dm *DistMemory) setImpl(ctx context.Context, item *cache.Item, span trace.

span.SetAttributes(attribute.Int("dist.owners.count", len(owners)))

promoted := false

if owners[0] != dm.localNode.ID { // attempt forward; may promote
proceedAsPrimary, ferr := dm.handleForwardPrimary(ctx, owners, item)
if ferr != nil {
Expand All @@ -3832,6 +3839,8 @@ func (dm *DistMemory) setImpl(ctx context.Context, item *cache.Item, span trace.
if !proceedAsPrimary { // forwarded successfully; nothing else to do
return nil
}

promoted = true
}

// primary path: assign version & timestamp
Expand All @@ -3840,7 +3849,19 @@ func (dm *DistMemory) setImpl(ctx context.Context, item *cache.Item, span trace.
item.LastUpdated = time.Now()
dm.applySet(ctx, item, false)

acks := 1 + dm.replicateTo(ctx, item, owners[1:])
// When we promoted, the original primary is unreachable but still
// a listed owner. Pass the full owners list (not owners[1:]) so
// replicateTo's existing failure-path queues a hint for the dead
// primary — its post-restart convergence is bounded by hint-replay
// (~200ms by default) rather than waiting for the next merkle tick.
// replicateTo already skips the local node, so adding owners[0]
// back in doesn't cause a self-forward.
replicas := owners[1:]
if promoted {
replicas = owners
}

acks := 1 + dm.replicateTo(ctx, item, replicas)
dm.metrics.writeAcks.Add(int64(acks))

span.SetAttributes(attribute.Int("dist.acks", acks))
Expand Down Expand Up @@ -4192,6 +4213,11 @@ var distMetricSpecs = []distMetricSpec{
desc: "Set operations that failed quorum",
get: func(m DistMetrics) int64 { return m.WriteQuorumFailures },
},
{
name: "dist.write.forward_promotion", unit: unitOp, counter: true,
desc: "Set forwards that promoted the local replica because the primary was unreachable",
get: func(m DistMetrics) int64 { return m.WriteForwardPromotion },
},

// --- Rebalance ---
{
Expand Down
46 changes: 45 additions & 1 deletion tests/hypercache_distmemory_forward_primary_promotion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tests
import (
"context"
"testing"
"time"

"github.com/hyp3rd/hypercache/internal/cluster"
"github.com/hyp3rd/hypercache/pkg/backend"
Expand Down Expand Up @@ -35,11 +36,15 @@ func TestDistSet_PromotesOnGenericForwardError(t *testing.T) {

// 3 nodes, RF=3, ConsistencyOne. Chaos is wired onto every
// node's transport; we only ever Set from one node, so only
// that node's forwards exercise the promotion path.
// that node's forwards exercise the promotion path. Short
// hint-replay interval so the recovery assertion runs in
// well under a second.
dc := SetupInProcessClusterRF(
t, 3, 3,
backend.WithDistChaos(chaos),
backend.WithDistWriteConsistency(backend.ConsistencyOne),
backend.WithDistHintTTL(time.Minute),
backend.WithDistHintReplayInterval(20*time.Millisecond),
)

a := dc.Nodes[0]
Expand Down Expand Up @@ -71,4 +76,43 @@ func TestDistSet_PromotesOnGenericForwardError(t *testing.T) {
if chaos.Drops() == 0 {
t.Errorf("chaos.Drops: got 0, want > 0 (chaos didn't see the forward attempt)")
}

if got := a.Metrics().WriteForwardPromotion; got == 0 {
t.Errorf("WriteForwardPromotion: got 0, want > 0 (counter should bump on every promotion)")
}

// The defense-in-depth contract: when we promote, the failed
// forward to the dead primary queues a hint via replicateTo's
// existing best-effort logic. Without this, the original
// primary would only see the write at the next merkle tick.
if got := a.Metrics().HintedQueued; got == 0 {
t.Errorf("HintedQueued: got 0, want > 0 (replicateTo should have queued a hint for the dead primary)")
}

// End-to-end recovery: heal chaos and wait for the natural
// hint-replay tick (20ms interval, configured above). This
// proves the hint queued during promotion actually carries
// the write back to the primary once it's reachable again.
chaos.SetDropRate(0)

if !waitForLocalContains(b, key, 2*time.Second) {
t.Errorf("primary did not receive the write via hint replay after chaos cleared")
}
}

// waitForLocalContains reports whether node.LocalContains(key) becomes
// true within timeout, polling every 10ms. It performs one final check
// after the deadline so a hint-replay tick landing right at the boundary
// still counts. The promotion test uses it to absorb the replay loop's
// scheduling jitter without burning the whole deadline on the happy path.
func waitForLocalContains(node *backend.DistMemory, key string, timeout time.Duration) bool {
	const pollInterval = 10 * time.Millisecond

	deadline := time.Now().Add(timeout)

	for {
		if node.LocalContains(key) {
			return true
		}

		if !time.Now().Before(deadline) {
			// Deadline elapsed: report the final observed state.
			return node.LocalContains(key)
		}

		time.Sleep(pollInterval)
	}
}
Loading