diff --git a/CHANGELOG.md b/CHANGELOG.md
index d5f964f..8bf6f7e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -286,6 +286,34 @@ All notable changes to HyperCache are recorded here. The format follows
 
 ### Fixed
 
+- **Remove path no longer silently succeeds when the primary is unreachable.**
+  A symmetric audit fix to the Set-forward change: [`removeImpl`](pkg/backend/dist_memory.go) used to
+  swallow the `ForwardRemove` error with `_ = transport.ForwardRemove(...)` and return `nil`, so a
+  Remove against a downed primary "succeeded" while the stale value lingered on every owner. Promotion
+  is now extracted into `forwardOrPromoteRemove`, mirroring `handleForwardPrimary`'s contract: on any
+  non-nil error, if the local node is a replica owner, apply the remove locally + fan out to peer
+  replicas via the existing `applyRemove(replicate=true)` path; otherwise return the error. The
+  promotion path bumps the shared `dist.write.forward_promotion` counter, so operators see Set + Remove
+  promotions on the same observable instrument. The dead primary catches up via merkle anti-entropy on
+  restart — the same convergence mechanism that already handles replica-side tombstones in
+  `replicateRemoveWithSpan`. New test [`TestDistRemove_PromotesOnGenericForwardError`](tests/hypercache_distmemory_audit_fixes_test.go)
+  drives chaos at `DropRate=1.0` and asserts the Remove returns `nil` (promotion succeeded), the local
+  copy is cleared, and the promotion counter bumped.
+- **Hint replay retains hints in the queue on any transient transport error.**
+  [`processHint`](pkg/backend/dist_memory.go) used to drop the hint unless the in-process
+  `errors.Is(err, sentinel.ErrBackendNotFound)` matched. Production HTTP/gRPC transports surface
+  `net.OpError` / `io.EOF` / `context.DeadlineExceeded` for a peer that's mid-restart or briefly
+  unreachable — none of which matched the gate, so the hint was abandoned on its very first replay
+  attempt instead of being retained through the outage. This was the exact failure mode behind the
+  `recovery on :8083 timed out after 60s: pre=50/50, during=43/50` symptom in the cluster-resilience
+  workflow: even with the Set-forward promotion in place, the hint queue lost the writes to the dead
+  primary before it came back. Now any non-nil error retains the hint; the configured `WithDistHintTTL`
+  bounds total retry time, so a permanently-broken target still drains. The deprecated `HintedDropped`
+  / `MigrationHintDropped` OTel counters remain registered for stability but now only bump on
+  queue-capacity overflow, not replay errors. New test
+  [`TestDistHintReplay_RetainsOnGenericReplayError`](tests/hypercache_distmemory_audit_fixes_test.go)
+  forces a 150ms window of failed replays under chaos, heals chaos, and asserts the hint still replays
+  onto the recovered peer.
 - **Set-forward promotion no longer requires the in-process `ErrBackendNotFound` sentinel, and the
   dead primary now converges via the hint queue (not just the next merkle tick).**
   [`handleForwardPrimary`](pkg/backend/dist_memory.go) used to gate "primary unreachable → promote to
diff --git a/cspell.config.yaml b/cspell.config.yaml
index 2394134..740a69c 100644
--- a/cspell.config.yaml
+++ b/cspell.config.yaml
@@ -53,6 +53,7 @@ words:
   - benchstat
   - benchtime
   - bitnami
+  - blackholed
   - bodyclose
   - bufbuild
   - buildx
diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go
index 992a88e..c130aae 100644
--- a/pkg/backend/dist_memory.go
+++ b/pkg/backend/dist_memory.go
@@ -1255,13 +1255,13 @@ type distMetrics struct {
 	hintedQueued              atomic.Int64 // hints queued (both sources)
 	hintedReplayed            atomic.Int64 // hints successfully replayed
 	hintedExpired             atomic.Int64 // hints expired before delivery
-	hintedDropped             atomic.Int64 // hints dropped due to non-not-found transport errors
+	hintedDropped             atomic.Int64 // Deprecated: no longer bumped — processHint retains on any error; OTel-registered for stability
 	hintedGlobalDropped       atomic.Int64 // hints dropped due to global caps (count/bytes)
 	hintedBytes               atomic.Int64 // approximate total bytes currently queued (best-effort)
 	migrationHintQueued       atomic.Int64 // subset of hintedQueued: rebalance migration source only
 	migrationHintReplayed     atomic.Int64 // subset of hintedReplayed: migration-source hints that drained
 	migrationHintExpired      atomic.Int64 // subset of hintedExpired: migration-source hints aged out
-	migrationHintDropped      atomic.Int64 // subset of hintedDropped + hintedGlobalDropped: migration-source hints that died
+	migrationHintDropped      atomic.Int64 // migration-source hints dropped by global cap overflow (replay errors no longer bump this)
 	migrationHintLastAgeNanos atomic.Int64 // queue residency of the most-recently-replayed migration hint (ns)
 	merkleSyncs               atomic.Int64 // merkle sync operations completed
 	merkleKeysPulled          atomic.Int64 // keys applied during sync
@@ -3140,24 +3140,23 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint
 		return 1
 	}
 
-	if errors.Is(err, sentinel.ErrBackendNotFound) { // keep – backend still absent
-		return 0
-	}
-
-	dm.metrics.hintedDropped.Add(1)
-
-	if entry.source == hintSourceMigration {
-		dm.metrics.migrationHintDropped.Add(1)
-	}
-
-	dm.logger.Warn(
-		"hint dropped after replay error",
+	// Retain on any non-nil error. Pre-fix this dropped the hint
+	// unless the in-process `ErrBackendNotFound` sentinel matched,
+	// but production HTTP/gRPC transports surface `net.OpError`,
+	// `io.EOF`, or `context.DeadlineExceeded` for a briefly-
+	// unreachable peer (mid-restart, network blip), causing the
+	// hint to be abandoned on its very first replay attempt
+	// instead of being retained through the outage. The hint TTL
+	// bounds total retry time, so a permanently-broken target
+	// still drains; flapping targets converge once they stabilize.
+	dm.logger.Debug(
+		"hint replay attempt failed; retaining for retry",
 		slog.String("peer_id", nodeID),
 		slog.String("key", entry.item.Key),
 		slog.Any("err", err),
 	)
 
-	return 1
+	return 0
 }
 
 // --- Simple gossip (in-process only) ---.
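The replay loop that calls `processHint` is not part of this diff, so the sketch below shows, for review context, the contract its return value is assumed to feed: the caller keeps entries that return 0 for the next tick and discards entries that return 1, while the configured hint TTL bounds how long retries can go on. The names `replayHintsFor`, `hintEntry`, `queuedAt`, and `hintTTL` are illustrative assumptions, not the repository's actual identifiers.

```go
// Sketch only: an assumed shape for the loop that consumes processHint's
// return value. A return of 0 keeps the entry queued for the next tick;
// a return of 1 means the entry is finished (replayed or otherwise) and
// is not re-queued. replayHintsFor, hintEntry, queuedAt, and hintTTL are
// illustrative names, not the real identifiers.
func (dm *DistMemory) replayHintsFor(ctx context.Context, nodeID string, entries []hintEntry, now time.Time) []hintEntry {
	kept := entries[:0]

	for _, entry := range entries {
		// The configured hint TTL (WithDistHintTTL) bounds total retry
		// time, so a target that never comes back still drains its queue.
		if now.Sub(entry.queuedAt) > dm.hintTTL {
			dm.metrics.hintedExpired.Add(1)

			continue
		}

		if dm.processHint(ctx, nodeID, entry, now) == 0 {
			kept = append(kept, entry) // transient failure: retry on the next tick

			continue
		}
		// processHint returned 1: the entry is done and is dropped here.
	}

	return kept
}
```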
@@ -3908,14 +3907,52 @@ func (dm *DistMemory) removeImpl(ctx context.Context, keys []string) error {
 			return sentinel.ErrNotOwner
 		}
 
-		dm.metrics.forwardRemove.Add(1)
-
-		_ = transport.ForwardRemove(ctx, string(owners[0]), key, true)
+		err := dm.forwardOrPromoteRemove(ctx, transport, key, owners)
+		if err != nil {
+			return err
+		}
 	}
 
 	return nil
 }
 
+// forwardOrPromoteRemove forwards a Remove to the listed primary; on
+// any non-nil transport error, promotes to a local replica owner if
+// possible (apply locally + fan out to peer replicas). This mirrors
+// handleForwardPrimary's promotion contract for the write path —
+// pre-fix the Remove path silently swallowed the forward error and
+// returned `nil` to the caller, so a `docker stop` of the primary
+// caused deletes to be lost on every owner. The dead primary catches
+// up via merkle anti-entropy on restart, the same convergence
+// mechanism that already handles replica-side tombstones in
+// `replicateRemoveWithSpan`.
+func (dm *DistMemory) forwardOrPromoteRemove(
+	ctx context.Context,
+	transport DistTransport,
+	key string,
+	owners []cluster.NodeID,
+) error {
+	dm.metrics.forwardRemove.Add(1)
+
+	err := transport.ForwardRemove(ctx, string(owners[0]), key, true)
+	if err == nil {
+		return nil
+	}
+
+	if len(owners) > 1 {
+		for _, oid := range owners[1:] {
+			if oid == dm.localNode.ID && dm.ownsKeyInternal(key) {
+				dm.metrics.writeForwardPromotion.Add(1)
+				dm.applyRemove(ctx, key, true)
+
+				return nil
+			}
+		}
+	}
+
+	return err
+}
+
 // distMetricSpec describes one OTel observable instrument backed by a
 // field on DistMetrics. The kind selects between cumulative-counter and
 // gauge semantics (OTel exporters render them differently); `get` reads
diff --git a/pkg/backend/dist_migration_hint_test.go b/pkg/backend/dist_migration_hint_test.go
index 1b504f5..01c2fbd 100644
--- a/pkg/backend/dist_migration_hint_test.go
+++ b/pkg/backend/dist_migration_hint_test.go
@@ -213,12 +213,17 @@ func TestMigrationHint_ExpiredBumpsPerSourceCounter(t *testing.T) {
 	}
 }
 
-// TestMigrationHint_TransportErrorBumpsDroppedCounter pins the drop
-// path: when the transport returns a non-NotFound error (auth failure,
-// 5xx, parse error), the hint is removed and the per-source counter
-// bumps. ErrBackendNotFound stays "keep" — that path is exercised in
-// the next test.
-func TestMigrationHint_TransportErrorBumpsDroppedCounter(t *testing.T) {
+// TestMigrationHint_TransportErrorKeepsEntry pins the new
+// retain-on-any-error contract: when a hint replay fails with any
+// non-nil transport error (e.g. net.OpError, io.EOF, a scripted
+// generic error like the one here), the hint stays queued for the next
+// replay tick. Pre-fix this branch dropped the hint and bumped
+// hintedDropped / migrationHintDropped, but that only matched the
+// in-process ErrBackendNotFound sentinel — production HTTP/gRPC
+// transports surfaced a different error class and lost the hint on
+// the first replay attempt. TTL bounds total retry time, so a
+// permanently-broken target still drains.
+func TestMigrationHint_TransportErrorKeepsEntry(t *testing.T) {
 	t.Parallel()
 
 	dm, transport := newMigrationHintTestDM(t)
@@ -234,16 +239,16 @@ func TestMigrationHint_TransportErrorBumpsDroppedCounter(t *testing.T) {
 		source: hintSourceMigration,
 	}
 
-	if action := dm.processHint(context.Background(), "peer-B", migEntry, now); action != 1 {
-		t.Errorf("transport error: want action=1 (remove), got %d", action)
+	if action := dm.processHint(context.Background(), "peer-B", migEntry, now); action != 0 {
+		t.Errorf("transport error: want action=0 (keep for retry), got %d", action)
 	}
 
-	if got := dm.metrics.hintedDropped.Load(); got != 1 {
-		t.Errorf("aggregate hintedDropped: want 1, got %d", got)
+	if got := dm.metrics.hintedDropped.Load(); got != 0 {
+		t.Errorf("aggregate hintedDropped on retainable error: want 0 (no longer bumped on replay failures), got %d", got)
 	}
 
-	if got := dm.metrics.migrationHintDropped.Load(); got != 1 {
-		t.Errorf("migrationHintDropped: want 1, got %d", got)
+	if got := dm.metrics.migrationHintDropped.Load(); got != 0 {
+		t.Errorf("migrationHintDropped on retainable error: want 0, got %d", got)
 	}
 }
diff --git a/tests/hypercache_distmemory_audit_fixes_test.go b/tests/hypercache_distmemory_audit_fixes_test.go
new file mode 100644
index 0000000..01a837f
--- /dev/null
+++ b/tests/hypercache_distmemory_audit_fixes_test.go
@@ -0,0 +1,139 @@
+package tests
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/hyp3rd/hypercache/internal/cluster"
+	"github.com/hyp3rd/hypercache/pkg/backend"
+	cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
+)
+
+// TestDistRemove_PromotesOnGenericForwardError pins the resilience
+// contract for the Remove path symmetric to the Set path: a
+// `ForwardRemove` that fails for any reason (not just the in-process
+// `ErrBackendNotFound` sentinel) must promote to a local replica
+// owner and apply the remove locally + fan out to peer replicas.
+// Pre-fix the error was blackholed via `_ = transport.ForwardRemove(...)`,
+// so Remove silently succeeded on a downed primary while leaving the
+// stale value on every owner.
+func TestDistRemove_PromotesOnGenericForwardError(t *testing.T) {
+	t.Parallel()
+
+	chaos := backend.NewChaos()
+
+	dc := SetupInProcessClusterRF(
+		t, 3, 3,
+		backend.WithDistChaos(chaos),
+		backend.WithDistWriteConsistency(backend.ConsistencyOne),
+	)
+
+	a := dc.Nodes[0]
+	b := dc.Nodes[1]
+	c := dc.Nodes[2]
+
+	desired := []cluster.NodeID{b.LocalNodeID(), a.LocalNodeID(), c.LocalNodeID()}
+
+	key, ok := FindOwnerKey(a, "remove-promote-", desired, 5000)
+	if !ok {
+		t.Fatalf("could not find key with owner ordering [B, A, C]")
+	}
+
+	// Seed: write the key while chaos is off so it lands on every
+	// owner via the normal replication path.
+	err := a.Set(context.Background(), &cache.Item{Key: key, Value: "v1"})
+	if err != nil {
+		t.Fatalf("seed Set: %v", err)
+	}
+
+	if !a.LocalContains(key) {
+		t.Fatalf("seed: A.LocalContains is false; replication failed")
+	}
+
+	// Now block every forward and call Remove from A. The Remove
+	// must NOT silently succeed; it must promote and clear the
+	// local copy. The promotion counter (shared with the Set path
+	// fix) bumps once.
+	chaos.SetDropRate(1.0)
+
+	rerr := a.Remove(context.Background(), key)
+	if rerr != nil {
+		t.Fatalf("Remove: got %v, want nil (promotion should have succeeded)", rerr)
+	}
+
+	if a.LocalContains(key) {
+		t.Errorf("LocalContains(%q) after promoted Remove: got true, want false (local applyRemove didn't fire)",
+			key)
+	}
+
+	if got := a.Metrics().WriteForwardPromotion; got == 0 {
+		t.Errorf("WriteForwardPromotion: got 0, want > 0 (Remove promotion didn't bump the counter)")
+	}
+}
+
+// TestDistHintReplay_RetainsOnGenericReplayError pins the hint-queue
+// recovery contract: a replay attempt that fails with anything other
+// than the in-process `ErrBackendNotFound` sentinel — including the
+// network errors production transports surface — must keep the hint
+// queued for a later retry, not abandon it on the first failure.
+// Pre-fix the hint was dropped on `net.OpError` / `io.EOF` etc.,
+// meaning a peer that was briefly unreachable during replay lost the
+// queued writes entirely.
+func TestDistHintReplay_RetainsOnGenericReplayError(t *testing.T) {
+	t.Parallel()
+
+	chaos := backend.NewChaos()
+
+	dc := SetupInProcessClusterRF(
+		t, 3, 3,
+		backend.WithDistChaos(chaos),
+		backend.WithDistWriteConsistency(backend.ConsistencyOne),
+		backend.WithDistHintTTL(time.Minute),
+		backend.WithDistHintReplayInterval(20*time.Millisecond),
+	)
+
+	a := dc.Nodes[0]
+	b := dc.Nodes[1]
+	c := dc.Nodes[2]
+
+	desired := []cluster.NodeID{b.LocalNodeID(), a.LocalNodeID(), c.LocalNodeID()}
+
+	key, ok := FindOwnerKey(a, "hint-retain-", desired, 5000)
+	if !ok {
+		t.Fatalf("could not find key with owner ordering [B, A, C]")
+	}
+
+	// Drop every forward, then write. The Set promotes locally and
+	// queues a hint for the dead primary B; the replay tick fires
+	// shortly after, also fails (chaos still on), and pre-fix would
+	// have dropped the hint.
+	chaos.SetDropRate(1.0)
+
+	err := a.Set(context.Background(), &cache.Item{Key: key, Value: "v1"})
+	if err != nil {
+		t.Fatalf("Set: %v", err)
+	}
+
+	queued := a.Metrics().HintedQueued
+	if queued == 0 {
+		t.Fatalf("HintedQueued: got 0, want > 0 (promotion didn't queue a hint)")
+	}
+
+	// Let the replay loop tick a few times against the still-chaotic
+	// transport. Pre-fix: the very first tick drops the hint and
+	// HintedReplayed stays 0 forever even after chaos clears.
+	time.Sleep(150 * time.Millisecond)
+
+	// Now heal chaos. The retained hint replays on the next tick
+	// and lands on B.
+	chaos.SetDropRate(0)
+
+	if !waitForLocalContains(b, key, 2*time.Second) {
+		t.Errorf("primary did not receive the write after chaos cleared (hint was dropped on the failed replay attempts instead of being retained)")
+	}
+
+	if got := a.Metrics().HintedReplayed; got == 0 {
+		t.Errorf("HintedReplayed: got 0, want > 0 (hint was abandoned before chaos cleared)")
+	}
+}
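`waitForLocalContains` is used by the new test but not defined in this diff; it presumably lives elsewhere in the `tests` package. A minimal polling sketch of what such a helper is assumed to look like follows; the interface name, signature, and poll interval are illustrative, not the project's actual implementation.

```go
// localContainser narrows the node argument to the single method this
// sketch needs; the tests above call LocalContains directly on the
// cluster node type returned by SetupInProcessClusterRF.
type localContainser interface {
	LocalContains(key string) bool
}

// waitForLocalContains polls until the key is visible in the node's
// local store or the timeout elapses. Sketch only: the real helper in
// the tests package may differ in signature and timing.
func waitForLocalContains(node localContainser, key string, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)

	for time.Now().Before(deadline) {
		if node.LocalContains(key) {
			return true
		}

		time.Sleep(10 * time.Millisecond)
	}

	return node.LocalContains(key)
}
```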