28 changes: 28 additions & 0 deletions CHANGELOG.md
@@ -286,6 +286,34 @@ All notable changes to HyperCache are recorded here. The format follows

### Fixed

- **Remove path no longer silently succeeds when the primary is unreachable.**
Symmetric audit-fix to the Set-forward change: [`removeImpl`](pkg/backend/dist_memory.go) used to
swallow the `ForwardRemove` error with `_ = transport.ForwardRemove(...)` and return `nil`, so a
Remove against a downed primary "succeeded" while the stale value lingered on every owner. Promotion
is now extracted into `forwardOrPromoteRemove`, mirroring `handleForwardPrimary`'s contract: on any
non-nil error, if the local node is a replica owner, apply the remove locally + fan out to peer
replicas via the existing `applyRemove(replicate=true)` path; otherwise return the error. The
promotion path bumps the shared `dist.write.forward_promotion` counter, so operators see Set + Remove
promotions on the same observable instrument. The dead primary catches up via merkle anti-entropy on
restart — the same convergence mechanism that already handles replica-side tombstones in
`replicateRemoveWithSpan`. New test [`TestDistRemove_PromotesOnGenericForwardError`](tests/hypercache_distmemory_audit_fixes_test.go)
drives chaos at `DropRate=1.0` and asserts the Remove returns `nil` (promotion succeeded), the local
copy is cleared, and the promotion counter bumped.
- **Hint replay retains the queue on any transient transport error.**
[`processHint`](pkg/backend/dist_memory.go) used to drop the hint unless the in-process
`errors.Is(err, sentinel.ErrBackendNotFound)` matched. Production HTTP/gRPC transports surface
`net.OpError` / `io.EOF` / `context.DeadlineExceeded` for a peer that's mid-restart or briefly
unreachable — none of which matched the gate, so the hint was abandoned on its very first replay
attempt instead of being retained through the outage. This was the exact failure mode behind the
`recovery on :8083 timed out after 60s: pre=50/50, during=43/50` symptom in the cluster-resilience
workflow: even with the Set-forward promotion in place, the hint queue lost the writes to the dead
primary before it came back. Now any non-nil error retains the hint; the configured `WithDistHintTTL`
bounds total retry time, so a permanently-broken target still drains. The deprecated `HintedDropped`
/ `MigrationHintDropped` OTel counters remain registered for stability but now only bump on
queue-capacity overflow, not replay errors. New test
[`TestDistHintReplay_RetainsOnGenericReplayError`](tests/hypercache_distmemory_audit_fixes_test.go)
forces a 150ms window of failed replays under chaos, heals chaos, and asserts the hint still replays
onto the recovered peer.
- **Set-forward promotion no longer requires the in-process `ErrBackendNotFound` sentinel, and the dead
primary now converges via the hint queue (not just the next merkle tick).**
[`handleForwardPrimary`](pkg/backend/dist_memory.go) used to gate "primary unreachable → promote to
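The error-class mismatch described in the hint-replay entry above is easy to reproduce in isolation. A minimal, self-contained sketch; the `errBackendNotFound` variable below is a stand-in for the repository's `sentinel.ErrBackendNotFound`, since only the identity check matters:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
)

// Stand-in for the project's sentinel.ErrBackendNotFound.
var errBackendNotFound = errors.New("backend not found")

func main() {
	// Error classes a production HTTP/gRPC transport typically surfaces
	// for a peer that is mid-restart or briefly unreachable.
	transportErrs := []error{
		&net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")},
		io.EOF,
		context.DeadlineExceeded,
	}

	for _, err := range transportErrs {
		// The pre-fix gate kept the hint only when this returned true.
		fmt.Printf("%T matches sentinel: %v\n", err, errors.Is(err, errBackendNotFound))
	}
	// Prints false for every entry: each of these errors fell through to
	// the drop branch and abandoned the hint on its first replay attempt.
}
```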
1 change: 1 addition & 0 deletions cspell.config.yaml
@@ -53,6 +53,7 @@ words:
- benchstat
- benchtime
- bitnami
- blackholed
- bodyclose
- bufbuild
- buildx
73 changes: 55 additions & 18 deletions pkg/backend/dist_memory.go
@@ -1255,13 +1255,13 @@ type distMetrics struct {
hintedQueued atomic.Int64 // hints queued (both sources)
hintedReplayed atomic.Int64 // hints successfully replayed
hintedExpired atomic.Int64 // hints expired before delivery
hintedDropped atomic.Int64 // hints dropped due to non-not-found transport errors
hintedDropped atomic.Int64 // Deprecated: no longer bumped — processHint retains on any error; OTel-registered for stability
hintedGlobalDropped atomic.Int64 // hints dropped due to global caps (count/bytes)
hintedBytes atomic.Int64 // approximate total bytes currently queued (best-effort)
migrationHintQueued atomic.Int64 // subset of hintedQueued: rebalance migration source only
migrationHintReplayed atomic.Int64 // subset of hintedReplayed: migration-source hints that drained
migrationHintExpired atomic.Int64 // subset of hintedExpired: migration-source hints aged out
migrationHintDropped atomic.Int64 // subset of hintedDropped + hintedGlobalDropped: migration-source hints that died
migrationHintDropped atomic.Int64 // migration-source hints dropped by global cap overflow (replay errors no longer bump this)
migrationHintLastAgeNanos atomic.Int64 // queue residency of the most-recently-replayed migration hint (ns)
merkleSyncs atomic.Int64 // merkle sync operations completed
merkleKeysPulled atomic.Int64 // keys applied during sync
@@ -3140,24 +3140,23 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint
return 1
}

if errors.Is(err, sentinel.ErrBackendNotFound) { // keep – backend still absent
return 0
}

dm.metrics.hintedDropped.Add(1)

if entry.source == hintSourceMigration {
dm.metrics.migrationHintDropped.Add(1)
}

dm.logger.Warn(
"hint dropped after replay error",
// Retain on any non-nil error. Pre-fix this dropped the hint
// unless the in-process `ErrBackendNotFound` sentinel matched,
// but production HTTP/gRPC transports surface `net.OpError`,
// `io.EOF`, or `context.DeadlineExceeded` for a briefly-
// unreachable peer (mid-restart, network blip), causing the
// hint to be abandoned on its very first replay attempt
// instead of being retained through the outage. The hint TTL
// bounds total retry time, so a permanently-broken target
// still drains; flapping targets converge once they stabilize.
dm.logger.Debug(
"hint replay attempt failed; retaining for retry",
slog.String("peer_id", nodeID),
slog.String("key", entry.item.Key),
slog.Any("err", err),
)

return 1
return 0
}

// --- Simple gossip (in-process only) ---.
@@ -3908,14 +3907,52 @@ func (dm *DistMemory) removeImpl(ctx context.Context, keys []string) error {
return sentinel.ErrNotOwner
}

dm.metrics.forwardRemove.Add(1)

_ = transport.ForwardRemove(ctx, string(owners[0]), key, true)
err := dm.forwardOrPromoteRemove(ctx, transport, key, owners)
if err != nil {
return err
}
}

return nil
}

// forwardOrPromoteRemove forwards a Remove to the listed primary; on
// any non-nil transport error, promotes to a local replica owner if
// possible (apply locally + fan out to peer replicas). This mirrors
// handleForwardPrimary's promotion contract for the write path —
// pre-fix the Remove path silently swallowed the forward error and
// returned `nil` to the caller, so a `docker stop` of the primary
// caused deletes to be lost on every owner. The dead primary catches
// up via merkle anti-entropy on restart, the same convergence
// mechanism that already handles replica-side tombstones in
// `replicateRemoveWithSpan`.
func (dm *DistMemory) forwardOrPromoteRemove(
ctx context.Context,
transport DistTransport,
key string,
owners []cluster.NodeID,
) error {
dm.metrics.forwardRemove.Add(1)

err := transport.ForwardRemove(ctx, string(owners[0]), key, true)
if err == nil {
return nil
}

if len(owners) > 1 {
for _, oid := range owners[1:] {
if oid == dm.localNode.ID && dm.ownsKeyInternal(key) {
dm.metrics.writeForwardPromotion.Add(1)
dm.applyRemove(ctx, key, true)

return nil
}
}
}

return err
}

// distMetricSpec describes one OTel observable instrument backed by a
// field on DistMetrics. The kind selects between cumulative-counter and
// gauge semantics (OTel exporters render them differently); `get` reads
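For context on how the new keep/remove return values and the TTL bound fit together, here is a minimal sketch of one replay tick. The `hintEntry` shape, the `queuedAt` field, and the placement of the TTL check are assumptions for illustration; the real queue and expiry logic live elsewhere in `dist_memory.go` and are not part of the hunks shown above. The 1-means-remove / 0-means-keep contract is the one `processHint` follows in the hunk above.

```go
package hintsketch

import (
	"context"
	"time"
)

// hintEntry is an illustrative stand-in for the real hint type.
type hintEntry struct {
	key      string
	queuedAt time.Time
}

// replayTick walks one peer's queue: entries older than the configured TTL
// age out (so a permanently unreachable target still drains), replayed
// entries are removed, and anything that failed with a transient error is
// retained for the next tick.
func replayTick(
	ctx context.Context,
	queue []hintEntry,
	ttl time.Duration,
	process func(context.Context, hintEntry) int, // 1 = remove, 0 = keep
) []hintEntry {
	now := time.Now()
	kept := queue[:0]

	for _, entry := range queue {
		if now.Sub(entry.queuedAt) > ttl {
			continue // expired: dropped regardless of peer health
		}

		if process(ctx, entry) == 1 {
			continue // replayed: drop from the queue
		}

		kept = append(kept, entry) // transient failure: retain for the next tick
	}

	return kept
}
```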
29 changes: 17 additions & 12 deletions pkg/backend/dist_migration_hint_test.go
@@ -213,12 +213,17 @@ func TestMigrationHint_ExpiredBumpsPerSourceCounter(t *testing.T) {
}
}

// TestMigrationHint_TransportErrorBumpsDroppedCounter pins the drop
// path: when the transport returns a non-NotFound error (auth failure,
// 5xx, parse error), the hint is removed and the per-source counter
// bumps. ErrBackendNotFound stays "keep" — that path is exercised in
// the next test.
func TestMigrationHint_TransportErrorBumpsDroppedCounter(t *testing.T) {
// TestMigrationHint_TransportErrorKeepsEntry pins the new
// retain-on-any-error contract: when a hint replay fails with any
// non-nil transport error (e.g. net.OpError, io.EOF, a scripted
// generic error like here), the hint stays queued for the next
// replay tick. Pre-fix this branch dropped the hint and bumped
// hintedDropped / migrationHintDropped, but that only matched the
// in-process ErrBackendNotFound sentinel — production HTTP/gRPC
// transports surfaced a different error class and lost the hint on
// the first replay attempt. TTL bounds total retry time, so a
// permanently-broken target still drains.
func TestMigrationHint_TransportErrorKeepsEntry(t *testing.T) {
t.Parallel()

dm, transport := newMigrationHintTestDM(t)
@@ -234,16 +239,16 @@ func TestMigrationHint_TransportErrorBumpsDroppedCounter(t *testing.T) {
source: hintSourceMigration,
}

if action := dm.processHint(context.Background(), "peer-B", migEntry, now); action != 1 {
t.Errorf("transport error: want action=1 (remove), got %d", action)
if action := dm.processHint(context.Background(), "peer-B", migEntry, now); action != 0 {
t.Errorf("transport error: want action=0 (keep for retry), got %d", action)
}

if got := dm.metrics.hintedDropped.Load(); got != 1 {
t.Errorf("aggregate hintedDropped: want 1, got %d", got)
if got := dm.metrics.hintedDropped.Load(); got != 0 {
t.Errorf("aggregate hintedDropped on retainable error: want 0 (no longer bumped on replay failures), got %d", got)
}

if got := dm.metrics.migrationHintDropped.Load(); got != 1 {
t.Errorf("migrationHintDropped: want 1, got %d", got)
if got := dm.metrics.migrationHintDropped.Load(); got != 0 {
t.Errorf("migrationHintDropped on retainable error: want 0, got %d", got)
}
}

139 changes: 139 additions & 0 deletions tests/hypercache_distmemory_audit_fixes_test.go
@@ -0,0 +1,139 @@
package tests

import (
"context"
"testing"
"time"

"github.com/hyp3rd/hypercache/internal/cluster"
"github.com/hyp3rd/hypercache/pkg/backend"
cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
)

// TestDistRemove_PromotesOnGenericForwardError pins the resilience
// contract for the Remove path symmetric to the Set path: a
// `ForwardRemove` that fails for any reason (not just the in-process
// `ErrBackendNotFound` sentinel) must promote to a local replica
// owner and apply the remove locally + fan out to peer replicas.
// Pre-fix the error was blackholed via `_ = transport.ForwardRemove(...)`,
// so Remove silently succeeded on a downed primary while leaving the
// stale value on every owner.
func TestDistRemove_PromotesOnGenericForwardError(t *testing.T) {
t.Parallel()

chaos := backend.NewChaos()

dc := SetupInProcessClusterRF(
t, 3, 3,
backend.WithDistChaos(chaos),
backend.WithDistWriteConsistency(backend.ConsistencyOne),
)

a := dc.Nodes[0]
b := dc.Nodes[1]
c := dc.Nodes[2]

desired := []cluster.NodeID{b.LocalNodeID(), a.LocalNodeID(), c.LocalNodeID()}

key, ok := FindOwnerKey(a, "remove-promote-", desired, 5000)
if !ok {
t.Fatalf("could not find key with owner ordering [B, A, C]")
}

// Seed: write the key while chaos is off so it lands on every
// owner via the normal replication path.
err := a.Set(context.Background(), &cache.Item{Key: key, Value: "v1"})
if err != nil {
t.Fatalf("seed Set: %v", err)
}

if !a.LocalContains(key) {
t.Fatalf("seed: A.LocalContains is false; replication failed")
}

// Now block every forward and call Remove from A. The Remove
// must NOT silently succeed; it must promote and clear the
// local copy. The promotion counter (shared with the Set path
// fix) bumps once.
chaos.SetDropRate(1.0)

rerr := a.Remove(context.Background(), key)
if rerr != nil {
t.Fatalf("Remove: got %v, want nil (promotion should have succeeded)", rerr)
}

if a.LocalContains(key) {
t.Errorf("LocalContains(%q) after promoted Remove: got true, want false (local applyRemove didn't fire)",
key)
}

if got := a.Metrics().WriteForwardPromotion; got == 0 {
t.Errorf("WriteForwardPromotion: got 0, want > 0 (Remove promotion didn't bump the counter)")
}
}

// TestDistHintReplay_RetainsOnGenericReplayError pins the hint-queue
// recovery contract: a replay attempt that fails with anything other
// than the in-process `ErrBackendNotFound` sentinel — including the
// network errors production transports surface — must keep the hint
// queued for a later retry, not abandon it on the first failure.
// Pre-fix the hint was dropped on `net.OpError` / `io.EOF` etc.,
// meaning a peer that was briefly unreachable during replay lost the
// queued writes entirely.
func TestDistHintReplay_RetainsOnGenericReplayError(t *testing.T) {
t.Parallel()

chaos := backend.NewChaos()

dc := SetupInProcessClusterRF(
t, 3, 3,
backend.WithDistChaos(chaos),
backend.WithDistWriteConsistency(backend.ConsistencyOne),
backend.WithDistHintTTL(time.Minute),
backend.WithDistHintReplayInterval(20*time.Millisecond),
)

a := dc.Nodes[0]
b := dc.Nodes[1]
c := dc.Nodes[2]

desired := []cluster.NodeID{b.LocalNodeID(), a.LocalNodeID(), c.LocalNodeID()}

key, ok := FindOwnerKey(a, "hint-retain-", desired, 5000)
if !ok {
t.Fatalf("could not find key with owner ordering [B, A, C]")
}

// Drop every forward, then write. The Set promotes locally and
// queues a hint for the dead primary B; the replay tick fires
// shortly after, also fails (chaos still on), and pre-fix would
// have dropped the hint.
chaos.SetDropRate(1.0)

err := a.Set(context.Background(), &cache.Item{Key: key, Value: "v1"})
if err != nil {
t.Fatalf("Set: %v", err)
}

queued := a.Metrics().HintedQueued
if queued == 0 {
t.Fatalf("HintedQueued: got 0, want > 0 (promotion didn't queue a hint)")
}

// Let the replay loop tick a few times against the still-chaotic
// transport. Pre-fix: the very first tick drops the hint and
// HintedReplayed stays 0 forever even after chaos clears.
time.Sleep(150 * time.Millisecond)

// Now heal chaos. The retained hint replays on the next tick
// and lands on B.
chaos.SetDropRate(0)

if !waitForLocalContains(b, key, 2*time.Second) {
t.Errorf("primary did not receive the write after chaos cleared (hint was dropped on the failed replay attempts instead of being retained)")
}

if got := a.Metrics().HintedReplayed; got == 0 {
t.Errorf("HintedReplayed: got 0, want > 0 (hint was abandoned before chaos cleared)")
}
}
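`waitForLocalContains` is not part of this diff and presumably already exists elsewhere in the `tests` package. For readers following the assertions above, a helper along these lines would satisfy the call site; the interface name and poll interval are illustrative only:

```go
package tests

import "time"

// localReader is the minimal surface the helper needs; the node type
// returned by SetupInProcessClusterRF satisfies it via the LocalContains
// method the tests already call.
type localReader interface {
	LocalContains(key string) bool
}

// waitForLocalContains polls until the node's local store reports the key
// or the timeout elapses, returning the final observation.
func waitForLocalContains(node localReader, key string, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if node.LocalContains(key) {
			return true
		}

		time.Sleep(10 * time.Millisecond)
	}

	return node.LocalContains(key)
}
```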