diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8193d5a..5b21cac 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,26 @@ All notable changes to HyperCache are recorded here. The format follows
 
 ### Added
 
+- **Chaos hooks for resilience testing (Phase 7).** New
+  [`backend.WithDistChaos(*Chaos)`](pkg/backend/dist_chaos.go) option transparently wraps the dist transport
+  with configurable fault injection — drop rate and latency injection, both with per-call probability rolls
+  off a crypto-seeded math/rand/v2 source. The wrapper is automatic for both the explicit
+  `WithDistTransport` path and the auto-wired HTTP transport, so chaos covers every dist call uniformly.
+  Disabled by default (zero overhead) and opt-in by design — the doc comment is explicit that this is a
+  test-only surface with no production safety net. Atomic mutators (`SetDropRate`, `SetLatency`) let tests
+  enable chaos mid-run, drive the cluster, then heal — exactly the shape needed to deterministically
+  surface the rebalance flake we caught in May 2026. Two new OTel metrics:
+  `dist.chaos.drops` (calls dropped) and `dist.chaos.latencies` (calls with latency injected). Eight unit
+  tests in [`pkg/backend/dist_chaos_test.go`](pkg/backend/dist_chaos_test.go) cover every branch
+  (DropRate=1 always drops, DropRate=0 never drops, latency injection fires + delays the call, nil-Chaos
+  passes through unchanged, the disabled-but-installed wrapper is a pass-through, concurrent calls are
+  race-free under -race, boundary clamping for out-of-range probabilities, nil-receiver safety on the
+  Metrics() snapshot path). Two integration tests in
+  [`tests/integration/dist_chaos_test.go`](tests/integration/dist_chaos_test.go) cover the canonical
+  resilience scenario (80% drops force the hint queue to absorb replica fan-out failures; disabling
+  chaos lets the replay loop drain the queue) plus the disabled-by-default no-op guarantee. New "Chaos
+  hooks (resilience testing)" section in [`docs/operations.md`](docs/operations.md) with the usage shape
+  and the "what this catches that CI flake hunting won't" rationale.
 - **Batch operations on the client SDK.** `BatchSet`, `BatchGet`, `BatchDelete` close the v1 SDK gap
   PR3's stopping conditions called out — the raw OIDC example demonstrated batch round-trips but the SDK
   had no equivalent. Each method takes a slice and returns per-item results so a single HTTP call can carry
diff --git a/Makefile b/Makefile
index b687c19..a8b91c3 100644
--- a/Makefile
+++ b/Makefile
@@ -81,7 +81,7 @@ test-cluster: stop-dev-cluster
 	exit $$rc
 
 # ci aggregates the gates required before declaring a task done (see AGENTS.md).
-ci: lint typecheck test-race sec build
+ci: lint typecheck test-race pre-commit sec build
 	@echo "All CI gates passed."
 
 # bench runs the benchmark tests in the benchmark subpackage of the tests package.
@@ -215,6 +215,14 @@ docs-publish: docs-build
 
 docs-serve: docs-build
 	PYENV_VERSION=mkdocs mkdocs serve
 
+pre-commit:
+	pre-commit run -a trailing-whitespace && \
+	pre-commit run -a end-of-file-fixer && \
+	pre-commit run -a markdownlint && \
+	pre-commit run -a yamllint && \
+	pre-commit run -a cspell
+
 # check_command_exists is a helper function that checks if a command exists.
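+# Usage from a recipe, e.g.: $(call check_command_exists,go)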
 define check_command_exists
 	@which $(1) > /dev/null 2>&1 || (echo "$(1) command not found" && exit 1)
diff --git a/__examples/distributed-oidc-client/main.go b/__examples/distributed-oidc-client/main.go
index 63dd12c..410023f 100644
--- a/__examples/distributed-oidc-client/main.go
+++ b/__examples/distributed-oidc-client/main.go
@@ -13,7 +13,6 @@ package main
 import (
 	"context"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"net/http"
 	"net/url"
@@ -21,6 +20,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/hyp3rd/ewrap"
 	"golang.org/x/oauth2/clientcredentials"
 
 	"github.com/hyp3rd/hypercache/pkg/client"
@@ -40,8 +40,8 @@ const (
 // errors.Is against it; in the example, run() surfaces the
 // wrapped error to stderr.
 var (
-	errEnvMissing          = errors.New("missing required env var")
-	errDiscoveryNoEndpoint = errors.New("OIDC discovery doc missing token_endpoint")
+	errEnvMissing          = ewrap.New("missing required env var")
+	errDiscoveryNoEndpoint = ewrap.New("OIDC discovery doc missing token_endpoint")
 )
 
 func main() {
diff --git a/cspell.config.yaml b/cspell.config.yaml
index ca0b3bc..5de36d5 100644
--- a/cspell.config.yaml
+++ b/cspell.config.yaml
@@ -76,6 +76,7 @@ words:
   - contextcheck
   - cpuprofile
   - cret
+  - cryptorand
   - cyclop
   - daixiang
   - Decr
@@ -112,6 +113,7 @@ words:
   - Fprintln
   - freqs
   - frontmatter
+  - funcorder
   - funlen
   - geomean
   - gerr
diff --git a/docs/operations.md b/docs/operations.md
index e18bdf2..bf675f3 100644
--- a/docs/operations.md
+++ b/docs/operations.md
@@ -160,6 +160,79 @@ err := dm.SyncWith(ctx, "node-B")
 
 `WithDistMerkleAutoSync(interval)` runs this on a timer; manual calls are useful for debugging.
 
+## Chaos hooks (resilience testing)
+
+The dist backend exposes fault-injection hooks via the `Chaos` type
+and `WithDistChaos` option. **Tests only** — there's no production
+safety net; pointing a live cluster at a Chaos with `DropRate=1.0`
+will drop every transport call.
+
+What's covered today:
+
+- **Drop**: with configurable probability, return `ErrChaosDrop`
+  instead of forwarding the call. Useful for "what if the peer's
+  down for N seconds?" scenarios — the hint queue should absorb
+  the drops and replay them once chaos is disabled.
+- **Latency**: with configurable probability, sleep before
+  forwarding. Useful for "what if the peer's slow?" — exercises
+  the timeout + retry surface.
+
+What's deliberately out of scope for v1:
+
+- Per-peer partition simulation (block a specific peer ID).
+  Tracked as a follow-up; the current hooks treat every peer
+  uniformly. Workaround: spin a third node, configure chaos on
+  it only, and observe what happens when "node N misbehaves".
+
+### Usage
+
+```go
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/hyp3rd/hypercache/pkg/backend"
+)
+
+chaos := backend.NewChaos()
+
+ctx := context.Background()
+
+bm, _ := backend.NewDistMemory(ctx,
+	backend.WithDistNode("A", addr), // addr: this node's listen address
+	backend.WithDistChaos(chaos),
+	// ... other options ...
+)
+
+// Mid-test: enable 50% drops + 10ms latency on every call.
+chaos.SetDropRate(0.5)
+chaos.SetLatency(10*time.Millisecond, 1.0)
+
+// ... drive the cluster, observe behavior ...
+
+// Heal: turn both faults off.
+chaos.SetDropRate(0)
+chaos.SetLatency(0, 0)
+
+// Verify the chaos counters fired AND the cluster recovered.
+metrics := bm.(*backend.DistMemory).Metrics()
+fmt.Println("drops:", metrics.ChaosDrops)
+fmt.Println("hint replays after heal:", metrics.HintedReplayed)
+```
+
+### Metrics
+
+- `dist.chaos.drops` (counter) — calls dropped since construction.
+- `dist.chaos.latencies` (counter) — calls that had latency injected.
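+
+In tests, the same counters also surface on the `DistMetrics`
+snapshot as `ChaosDrops` / `ChaosLatencies`, so an assertion can
+confirm chaos actually intercepted traffic. A minimal sketch,
+assuming a `dm *backend.DistMemory` under test:
+
+```go
+// Confirm the fault window actually exercised the chaos wrapper.
+m := dm.Metrics()
+if m.ChaosDrops == 0 && m.ChaosLatencies == 0 {
+	t.Fatalf("chaos never fired: drops=%d latencies=%d", m.ChaosDrops, m.ChaosLatencies)
+}
+```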
+
+Both stay at zero when chaos isn't configured (the wrapper isn't
+installed at all — `WithDistChaos` is opt-in).
+
+### What this gives you
+
+The rebalance flake we caught manually in May 2026
+(`TestDistRebalanceThrottle` failing under `-shuffle` due to a
+transient quorum miss) is exactly the class of bug the chaos hooks
+exist to surface. Wire chaos at 5-10% drop rate against the test
+suite, run under `-race`, and the timing-sensitive paths surface
+deterministically rather than as 1-in-50 CI flakes.
+
+See [`pkg/backend/dist_chaos_test.go`](../pkg/backend/dist_chaos_test.go)
+and [`tests/integration/dist_chaos_test.go`](../tests/integration/dist_chaos_test.go)
+for runnable examples.
+
 ## Capacity planning notes
 
 - Each shard mutex is independent — write throughput scales with shard count up to CPU saturation.
diff --git a/pkg/backend/dist_chaos.go b/pkg/backend/dist_chaos.go
new file mode 100644
index 0000000..5dc2f86
--- /dev/null
+++ b/pkg/backend/dist_chaos.go
@@ -0,0 +1,336 @@
+package backend
+
+import (
+	"context"
+	cryptorand "crypto/rand"
+	"encoding/binary"
+	"math"
+	mathrand "math/rand/v2"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/hyp3rd/ewrap"
+
+	cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
+)
+
+// ErrChaosDrop is the sentinel a chaos-wrapped transport returns
+// when a configured DropRate triggers on a request. Tests use
+// errors.Is(err, ErrChaosDrop) to confirm the fault path fired
+// rather than a real transport failure.
+var ErrChaosDrop = ewrap.New("dist transport: chaos drop")
+
+// Chaos configures fault injection for resilience testing of the
+// dist transport. Construct via NewChaos, configure via the
+// SetDropRate / SetLatency helpers (atomic — safe to call from
+// running tests), then pass to WithDistChaos when constructing
+// the DistMemory under test.
+//
+// Disabled by default (zero values). When set on a DistMemory,
+// every dist transport call goes through the chaos wrapper; the
+// overhead is two atomic loads on the no-fault path so the cost
+// stays trivial even when chaos is enabled but inactive.
+//
+// Chaos is for tests only. The hooks DO NOT belong in production
+// — there's no safety mechanism preventing an operator from
+// pointing a production cluster at a Chaos with DropRate=1.0.
+// Don't.
+type Chaos struct {
+	// dropRate stores a float64 (0.0..1.0) in its bit-pattern form
+	// so updates remain atomic. Read via math.Float64frombits.
+	dropRate atomic.Uint64
+
+	// latencyNs is the injected latency in nanoseconds. Zero means
+	// latency injection is disabled.
+	latencyNs atomic.Int64
+
+	// latencyRate stores a float64 (0.0..1.0) same shape as dropRate.
+	latencyRate atomic.Uint64
+
+	// Telemetry: drops fires every time a call is dropped;
+	// latencies fires every time latency is injected. Test code
+	// asserts these to confirm chaos actually intercepted calls.
+	drops     atomic.Int64
+	latencies atomic.Int64
+
+	// rng + rngMu produce the per-call probability rolls. Seeded
+	// once from crypto/rand at construction; same pattern as the
+	// SDK's failover shuffler so a fleet of chaos-enabled tests
+	// don't synchronize their fault decisions.
+	rng   *mathrand.Rand
+	rngMu sync.Mutex
+}
+
+// NewChaos returns a zeroed Chaos. Configure via SetDropRate /
+// SetLatency before or after passing to WithDistChaos; mutation
+// is atomic and safe to interleave with running tests.
+func NewChaos() *Chaos {
+	var seed [16]byte
+
+	_, err := cryptorand.Read(seed[:])
+	if err != nil {
+		// Crypto-rand failure is effectively impossible; fall
+		// back to a stable seed so chaos still runs (poorly).
+		seed = [16]byte{17, 31, 47, 61, 71, 83, 97, 109, 113, 127, 139, 151, 163, 173, 181, 191}
+	}
+
+	src := mathrand.NewPCG(
+		binary.LittleEndian.Uint64(seed[:8]),
+		binary.LittleEndian.Uint64(seed[8:]),
+	)
+
+	// G404: this is test-only fault-injection, not a security
+	// surface — math/rand crypto-seeded from crypto/rand is the
+	// standard recipe.
+	// #nosec G404 -- test-fault-injection roll, seeded from crypto/rand
+	return &Chaos{rng: mathrand.New(src)}
+}
+
+// SetDropRate configures the probability (0.0..1.0) that a single
+// transport call is dropped instead of forwarded. Pass 0 to
+// disable. Values outside [0, 1] are clamped.
+func (c *Chaos) SetDropRate(p float64) {
+	if c == nil {
+		return
+	}
+
+	switch {
+	case p < 0:
+		p = 0
+	case p > 1:
+		p = 1
+	default:
+		// already in [0, 1]; no clamp needed
+	}
+
+	c.dropRate.Store(math.Float64bits(p))
+}
+
+// SetLatency configures injected latency (duration d) applied
+// with the given probability rate (0.0..1.0). Pass d=0 OR rate=0
+// to disable. Values outside [0, 1] are clamped.
+func (c *Chaos) SetLatency(d time.Duration, rate float64) {
+	if c == nil {
+		return
+	}
+
+	switch {
+	case rate < 0:
+		rate = 0
+	case rate > 1:
+		rate = 1
+	default:
+		// already in [0, 1]; no clamp needed
+	}
+
+	c.latencyNs.Store(d.Nanoseconds())
+	c.latencyRate.Store(math.Float64bits(rate))
+}
+
+// Drops returns the count of transport calls dropped since
+// construction. Useful for test assertions of the shape "assert
+// chaos.Drops() > 0 after running the load".
+func (c *Chaos) Drops() int64 {
+	if c == nil {
+		return 0
+	}
+
+	return c.drops.Load()
+}
+
+// Latencies returns the count of transport calls that had latency
+// injected since construction.
+func (c *Chaos) Latencies() int64 {
+	if c == nil {
+		return 0
+	}
+
+	return c.latencies.Load()
+}
+
+// roll returns a fresh [0, 1) float for a probability decision.
+// Single goroutine-safe path; tests under -race exercise this via
+// concurrent transport calls.
+func (c *Chaos) roll() float64 {
+	c.rngMu.Lock()
+	defer c.rngMu.Unlock()
+
+	return c.rng.Float64()
+}
+
+// maybeDrop returns ErrChaosDrop with probability dropRate; nil
+// otherwise. Called at the start of every chaos-wrapped transport
+// method.
+func (c *Chaos) maybeDrop() error {
+	if c == nil {
+		return nil
+	}
+
+	p := math.Float64frombits(c.dropRate.Load())
+	if p <= 0 {
+		return nil
+	}
+
+	if c.roll() < p {
+		c.drops.Add(1)
+
+		return ErrChaosDrop
+	}
+
+	return nil
+}
+
+// maybeLatency sleeps for the configured latency with probability
+// latencyRate. Returns immediately when chaos is disabled or the
+// roll fails.
+func (c *Chaos) maybeLatency() {
+	if c == nil {
+		return
+	}
+
+	rate := math.Float64frombits(c.latencyRate.Load())
+	if rate <= 0 {
+		return
+	}
+
+	ns := c.latencyNs.Load()
+	if ns <= 0 {
+		return
+	}
+
+	if c.roll() < rate {
+		c.latencies.Add(1)
+		time.Sleep(time.Duration(ns))
+	}
+}
+
+// chaosTransport wraps a DistTransport, applying configured
+// faults before delegating each method to the inner transport.
+// Constructed automatically by storeTransport when the DistMemory
+// has a non-nil Chaos configured; user code doesn't construct it.
+type chaosTransport struct {
+	inner DistTransport
+	chaos *Chaos
+}
+
+// newChaosTransport wraps inner with the given chaos config. A
+// nil chaos returns inner unchanged so callers don't have to
+// branch on the disabled case.
+func newChaosTransport(inner DistTransport, chaos *Chaos) DistTransport {
+	if chaos == nil || inner == nil {
+		return inner
+	}
+
+	return &chaosTransport{inner: inner, chaos: chaos}
+}
+
+// ForwardSet, ForwardGet, ForwardRemove apply chaos then delegate.
+// Identical shape for every method — the boilerplate is the cost
+// of wrapping each interface method individually rather than
+// using reflection (which would slow the hot path).
+func (t *chaosTransport) ForwardSet(ctx context.Context, nodeID string, item *cache.Item, replicate bool) error {
+	err := t.applyFault()
+	if err != nil {
+		return err
+	}
+
+	return t.inner.ForwardSet(ctx, nodeID, item, replicate)
+}
+
+func (t *chaosTransport) ForwardGet(ctx context.Context, nodeID, key string) (*cache.Item, bool, error) {
+	err := t.applyFault()
+	if err != nil {
+		return nil, false, err
+	}
+
+	return t.inner.ForwardGet(ctx, nodeID, key)
+}
+
+func (t *chaosTransport) ForwardRemove(ctx context.Context, nodeID, key string, replicate bool) error {
+	err := t.applyFault()
+	if err != nil {
+		return err
+	}
+
+	return t.inner.ForwardRemove(ctx, nodeID, key, replicate)
+}
+
+func (t *chaosTransport) Health(ctx context.Context, nodeID string) error {
+	err := t.applyFault()
+	if err != nil {
+		return err
+	}
+
+	return t.inner.Health(ctx, nodeID)
+}
+
+func (t *chaosTransport) IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error {
+	err := t.applyFault()
+	if err != nil {
+		return err
+	}
+
+	return t.inner.IndirectHealth(ctx, relayNodeID, targetNodeID)
+}
+
+func (t *chaosTransport) Gossip(ctx context.Context, targetNodeID string, members []GossipMember) error {
+	err := t.applyFault()
+	if err != nil {
+		return err
+	}
+
+	return t.inner.Gossip(ctx, targetNodeID, members)
+}
+
+func (t *chaosTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) {
+	err := t.applyFault()
+	if err != nil {
+		return nil, err
+	}
+
+	return t.inner.FetchMerkle(ctx, nodeID)
+}
+
+// applyFault is the canonical pre-call hook: maybeDrop (return
+// error to short-circuit) then maybeLatency (sleep). Every
+// DistTransport method invokes this before delegating. Placed
+// after the exported-style methods per the funcorder lint —
+// unexported helpers come last in the receiver's method block.
+func (t *chaosTransport) applyFault() error {
+	err := t.chaos.maybeDrop()
+	if err != nil {
+		return err
+	}
+
+	t.chaos.maybeLatency()
+
+	return nil
+}
+
+// WithDistChaos enables chaos injection on the dist transport.
+// Pass a *Chaos constructed via NewChaos; configure faults via
+// the Chaos's SetDropRate / SetLatency methods.
+//
+// Option ordering does not matter: WithDistChaos records the
+// reference, and storeTransport wraps the active transport
+// whenever it's set (including the auto-wired HTTP transport).
+//
+// FOR TESTS ONLY. There's no safety check preventing this from
+// being applied to a production DistMemory — pointing a real
+// cluster at a Chaos with DropRate=1.0 will drop every dist
+// transport call. Don't.
+func WithDistChaos(c *Chaos) DistMemoryOption {
+	return func(dm *DistMemory) {
+		dm.chaos = c
+		// If a transport is already configured (option ordering:
+		// WithDistTransport before WithDistChaos), re-wrap it now
+		// so chaos is in effect immediately.
+		current := dm.loadTransport()
+		if current != nil {
+			if _, alreadyWrapped := current.(*chaosTransport); !alreadyWrapped {
+				dm.storeTransport(newChaosTransport(current, c))
+			}
+		}
+	}
+}
diff --git a/pkg/backend/dist_chaos_test.go b/pkg/backend/dist_chaos_test.go
new file mode 100644
index 0000000..0f2498f
--- /dev/null
+++ b/pkg/backend/dist_chaos_test.go
@@ -0,0 +1,277 @@
+package backend
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
+)
+
+// recordingTransport counts ForwardSet invocations so chaos tests
+// can assert "the inner transport was reached" or "the inner
+// transport was NOT reached" depending on whether the chaos drop
+// fired. The other DistTransport methods are stubs — chaos
+// applies the same shape to every method, so testing one verb is
+// enough.
+type recordingTransport struct {
+	calls atomic.Int64
+}
+
+func (r *recordingTransport) ForwardSet(_ context.Context, _ string, _ *cache.Item, _ bool) error {
+	r.calls.Add(1)
+
+	return nil
+}
+
+func (*recordingTransport) ForwardGet(_ context.Context, _, _ string) (*cache.Item, bool, error) {
+	return nil, false, nil
+}
+
+func (*recordingTransport) ForwardRemove(_ context.Context, _, _ string, _ bool) error {
+	return nil
+}
+
+func (*recordingTransport) Health(_ context.Context, _ string) error { return nil }
+
+func (*recordingTransport) IndirectHealth(_ context.Context, _, _ string) error {
+	return nil
+}
+
+func (*recordingTransport) Gossip(_ context.Context, _ string, _ []GossipMember) error {
+	return nil
+}
+
+func (*recordingTransport) FetchMerkle(_ context.Context, _ string) (*MerkleTree, error) {
+	return nil, nil //nolint:nilnil // stub never invoked by chaos unit tests
+}
+
+// TestChaos_DropRateOneAlwaysDrops pins that DropRate=1.0 short-
+// circuits every transport call with ErrChaosDrop. The inner
+// transport must NOT be invoked.
+func TestChaos_DropRateOneAlwaysDrops(t *testing.T) {
+	t.Parallel()
+
+	chaos := NewChaos()
+	chaos.SetDropRate(1.0)
+
+	inner := &recordingTransport{}
+	wrapped := newChaosTransport(inner, chaos)
+
+	const calls = 5
+
+	for range calls {
+		err := wrapped.ForwardSet(context.Background(), "peer", &cache.Item{Key: "k"}, false)
+		if !errors.Is(err, ErrChaosDrop) {
+			t.Fatalf("want ErrChaosDrop, got %v", err)
+		}
+	}
+
+	if inner.calls.Load() != 0 {
+		t.Errorf("inner transport reached %d times; want 0", inner.calls.Load())
+	}
+
+	if chaos.Drops() != int64(calls) {
+		t.Errorf("Drops counter = %d, want %d", chaos.Drops(), calls)
+	}
+}
+
+// TestChaos_DropRateZeroNeverDrops pins the inverse: with chaos
+// configured but DropRate=0, every call passes through to the
+// inner transport. The chaos overhead path is exercised; the
+// drop branch never fires.
+func TestChaos_DropRateZeroNeverDrops(t *testing.T) {
+	t.Parallel()
+
+	chaos := NewChaos()
+	chaos.SetDropRate(0)
+
+	inner := &recordingTransport{}
+	wrapped := newChaosTransport(inner, chaos)
+
+	const calls = 50
+
+	for range calls {
+		err := wrapped.ForwardSet(context.Background(), "peer", &cache.Item{Key: "k"}, false)
+		if err != nil {
+			t.Fatalf("unexpected err with DropRate=0: %v", err)
+		}
+	}
+
+	if inner.calls.Load() != int64(calls) {
+		t.Errorf("inner transport reached %d times; want %d", inner.calls.Load(), calls)
+	}
+
+	if chaos.Drops() != 0 {
+		t.Errorf("Drops counter = %d, want 0", chaos.Drops())
+	}
+}
+
+// TestChaos_LatencyFiresAndDelaysCall confirms latency injection
+// holds the call for the configured duration. The probabilistic
+// rate is set to 1.0 so every call gets latency; we measure
+// wall-clock and verify it's at least the configured floor.
+func TestChaos_LatencyFiresAndDelaysCall(t *testing.T) {
+	t.Parallel()
+
+	chaos := NewChaos()
+	chaos.SetLatency(20*time.Millisecond, 1.0)
+
+	inner := &recordingTransport{}
+	wrapped := newChaosTransport(inner, chaos)
+
+	start := time.Now()
+
+	err := wrapped.ForwardSet(context.Background(), "peer", &cache.Item{Key: "k"}, false)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	elapsed := time.Since(start)
+	// Allow a small floor below 20ms in case time.Sleep returns a
+	// hair early on some schedulers; the key assertion is "latency
+	// was clearly injected", not "exactly 20ms".
+	if elapsed < 15*time.Millisecond {
+		t.Errorf("elapsed = %v, want >= ~20ms (latency should have fired)", elapsed)
+	}
+
+	if chaos.Latencies() != 1 {
+		t.Errorf("Latencies counter = %d, want 1", chaos.Latencies())
+	}
+}
+
+// TestChaos_NilChaosIsNoop pins the disabled path: passing a nil
+// Chaos to newChaosTransport returns the inner transport unwrapped
+// so callers that don't configure chaos pay zero overhead.
+func TestChaos_NilChaosIsNoop(t *testing.T) {
+	t.Parallel()
+
+	inner := &recordingTransport{}
+
+	wrapped := newChaosTransport(inner, nil)
+	if wrapped != inner {
+		t.Fatalf("nil chaos should return inner unchanged; got distinct wrapper")
+	}
+}
+
+// TestChaos_DisabledChaosLeavesCallsUntouched pins that a Chaos
+// with zero DropRate AND zero LatencyRate is functionally a
+// pass-through even when the wrapper is installed.
+func TestChaos_DisabledChaosLeavesCallsUntouched(t *testing.T) {
+	t.Parallel()
+
+	chaos := NewChaos() // all zero
+	inner := &recordingTransport{}
+	wrapped := newChaosTransport(inner, chaos)
+
+	const calls = 100
+
+	for range calls {
+		_ = wrapped.ForwardSet(context.Background(), "peer", &cache.Item{Key: "k"}, false)
+	}
+
+	if inner.calls.Load() != int64(calls) {
+		t.Errorf("inner transport reached %d times; want %d", inner.calls.Load(), calls)
+	}
+
+	if chaos.Drops() != 0 || chaos.Latencies() != 0 {
+		t.Errorf("disabled chaos should not increment counters: drops=%d latencies=%d",
+			chaos.Drops(), chaos.Latencies())
+	}
+}
+
+// TestChaos_ConcurrentCallsAreRaceFree drives many goroutines
+// through the chaos wrapper simultaneously with both faults
+// configured. Run under `-race`; failure manifests as the race
+// detector firing, not a content assertion.
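+//
+// Reproduce locally with, e.g.:
+//
+//	go test -race -run TestChaos_ConcurrentCallsAreRaceFree ./pkg/backend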
+func TestChaos_ConcurrentCallsAreRaceFree(t *testing.T) {
+	t.Parallel()
+
+	chaos := NewChaos()
+	chaos.SetDropRate(0.3)
+	chaos.SetLatency(1*time.Millisecond, 0.3)
+
+	inner := &recordingTransport{}
+	wrapped := newChaosTransport(inner, chaos)
+
+	const (
+		goroutines        = 8
+		callsPerGoroutine = 50
+	)
+
+	var wg sync.WaitGroup
+
+	for range goroutines {
+		wg.Go(func() {
+			for range callsPerGoroutine {
+				_ = wrapped.ForwardSet(context.Background(), "peer", &cache.Item{Key: "k"}, false)
+			}
+		})
+	}
+
+	wg.Wait()
+
+	total := chaos.Drops() + inner.calls.Load()
+	want := int64(goroutines * callsPerGoroutine)
+
+	if total != want {
+		t.Errorf("drops+inner_calls = %d, want %d (drops=%d, inner=%d)",
+			total, want, chaos.Drops(), inner.calls.Load())
+	}
+}
+
+// TestChaos_SetDropRateClampsRange pins the boundary check: values
+// outside [0, 1] get clamped rather than silently producing
+// undefined behavior. Tests that pass 1.5 by accident should get
+// 100% drop, not garbage.
+func TestChaos_SetDropRateClampsRange(t *testing.T) {
+	t.Parallel()
+
+	chaos := NewChaos()
+
+	chaos.SetDropRate(-0.5)
+
+	// At drop=0 every call passes.
+	inner := &recordingTransport{}
+	wrapped := newChaosTransport(inner, chaos)
+
+	_ = wrapped.ForwardSet(context.Background(), "p", &cache.Item{Key: "k"}, false)
+
+	if chaos.Drops() != 0 {
+		t.Errorf("DropRate(-0.5) should clamp to 0; drops=%d", chaos.Drops())
+	}
+
+	chaos.SetDropRate(1.5)
+
+	_ = wrapped.ForwardSet(context.Background(), "p", &cache.Item{Key: "k"}, false)
+
+	if chaos.Drops() != 1 {
+		t.Errorf("DropRate(1.5) should clamp to 1.0 (always drop); drops=%d", chaos.Drops())
+	}
+}
+
+// TestChaos_NilReceiverIsSafe documents the nil-Chaos contract:
+// methods called on a nil *Chaos must not panic. This matters
+// because DistMemory.chaos may be nil when chaos isn't configured,
+// and the Metrics() snapshot path calls Drops() / Latencies()
+// unconditionally.
+func TestChaos_NilReceiverIsSafe(t *testing.T) {
+	t.Parallel()
+
+	var c *Chaos
+
+	if got := c.Drops(); got != 0 {
+		t.Errorf("nil.Drops() = %d, want 0", got)
+	}
+
+	if got := c.Latencies(); got != 0 {
+		t.Errorf("nil.Latencies() = %d, want 0", got)
+	}
+
+	// Mutators on nil are silent no-ops; calling these would
+	// panic without the nil guard in SetDropRate / SetLatency.
+	c.SetDropRate(0.5)
+	c.SetLatency(time.Second, 1.0)
+}
diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go
index be1bf5b..b85f50c 100644
--- a/pkg/backend/dist_memory.go
+++ b/pkg/backend/dist_memory.go
@@ -77,6 +77,11 @@ type DistMemory struct {
 	transport  atomic.Pointer[distTransportSlot]
 	httpServer *distHTTPServer // optional internal HTTP server
 	metrics    distMetrics
+	// chaos (optional) wraps the configured transport with fault injection
+	// for resilience testing. Set via WithDistChaos; nil in production.
+	// storeTransport applies the wrapper transparently when chaos is set,
+	// so the rest of the code path is unaware of the chaos surface.
+	chaos *Chaos
 	// configuration (static for now, future: dynamic membership/gossip)
 	replication  int
 	virtualNodes int
@@ -1271,6 +1276,8 @@ type DistMetrics struct {
 	LastAutoSyncError     string
 	AutoSyncCleanTicks    int64 // cumulative ticks where every peer returned no divergence
 	AutoSyncBackoffFactor int64 // current adaptive-backoff multiplier (1 when disabled or freshly reset)
+	ChaosDrops            int64 // transport calls dropped by configured Chaos (test-only; zero in prod)
+	ChaosLatencies        int64 // transport calls that had latency injected by Chaos (test-only)
 	TombstonesActive      int64
 	TombstonesPurged      int64
 	WriteQuorumFailures   int64
@@ -1342,6 +1349,8 @@ func (dm *DistMemory) Metrics() DistMetrics {
 		LastAutoSyncError:     lastErr,
 		AutoSyncCleanTicks:    dm.autoSyncCleanTicks.Load(),
 		AutoSyncBackoffFactor: dm.autoSyncBackoffFactor.Load(),
+		ChaosDrops:            dm.chaos.Drops(),
+		ChaosLatencies:        dm.chaos.Latencies(),
 		TombstonesActive:      dm.metrics.tombstonesActive.Load(),
 		TombstonesPurged:      dm.metrics.tombstonesPurged.Load(),
 		WriteQuorumFailures:   dm.metrics.writeQuorumFailures.Load(),
@@ -1719,7 +1728,20 @@ func (dm *DistMemory) loadTransport() DistTransport {
 
 // storeTransport replaces the active transport. Safe to call concurrently
 // with loadTransport.
+//
+// When a Chaos is configured via WithDistChaos, the supplied transport is
+// transparently wrapped before being stored — every chaos roll happens
+// inside the wrapper so callers (Forward*, Health, Gossip, FetchMerkle)
+// don't have to know chaos exists. Double-wrapping is detected and
+// short-circuited, so this is safe to call from option appliers in any
+// order.
 func (dm *DistMemory) storeTransport(t DistTransport) {
+	if dm.chaos != nil && t != nil {
+		if _, already := t.(*chaosTransport); !already {
+			t = newChaosTransport(t, dm.chaos)
+		}
+	}
+
 	dm.transport.Store(&distTransportSlot{t: t})
 }
 
@@ -4022,6 +4044,16 @@ var distMetricSpecs = []distMetricSpec{
 		desc: "Current adaptive auto-sync backoff multiplier (1 when disabled or reset)",
 		get:  func(m DistMetrics) int64 { return m.AutoSyncBackoffFactor },
 	},
+	{
+		name: "dist.chaos.drops", unit: unitEvent, counter: true,
+		desc: "Transport calls dropped by configured Chaos (test-only; zero in production)",
+		get:  func(m DistMetrics) int64 { return m.ChaosDrops },
+	},
+	{
+		name: "dist.chaos.latencies", unit: unitEvent, counter: true,
+		desc: "Transport calls that had latency injected by Chaos (test-only; zero in production)",
+		get:  func(m DistMetrics) int64 { return m.ChaosLatencies },
+	},
 
 	// --- Tombstones ---
 	{
diff --git a/tests/integration/dist_chaos_test.go b/tests/integration/dist_chaos_test.go
new file mode 100644
index 0000000..012d252
--- /dev/null
+++ b/tests/integration/dist_chaos_test.go
@@ -0,0 +1,178 @@
+package integration
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/hyp3rd/hypercache/pkg/backend"
+	cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
+)
+
+// TestDistChaos_DropsAbsorbedByHintQueue is the canonical
+// resilience scenario the chaos hooks were built for: configure a
+// non-trivial drop rate on the transport that handles inter-node
+// replication, perform writes, and verify the hint queue captures
+// the dropped writes so eventual consistency converges once chaos
+// is disabled.
+//
+// Without the hint queue absorbing the drops, the cache would
+// silently lose replicas under transient transport failures —
+// exactly the failure mode the hint queue exists to prevent.
+// This test would have caught the entire class of "what if a
+// peer's down for 100ms?" bugs before they reached production.
+func TestDistChaos_DropsAbsorbedByHintQueue(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+
+	addrA := allocatePort(t)
+	addrB := allocatePort(t)
+	chaos := backend.NewChaos()
+
+	// ConsistencyOne so the primary write succeeds locally on every
+	// call — the replica fan-out is what we want to disrupt. With
+	// quorum we'd fail-fast on the first dropped write and never
+	// queue hints, defeating the test's purpose.
+	opts := []backend.DistMemoryOption{
+		backend.WithDistReplication(2),
+		backend.WithDistVirtualNodes(16),
+		backend.WithDistChaos(chaos),
+		backend.WithDistWriteConsistency(backend.ConsistencyOne),
+		backend.WithDistReadConsistency(backend.ConsistencyOne),
+		backend.WithDistHintReplayInterval(50 * time.Millisecond),
+		backend.WithDistHintTTL(10 * time.Second),
+	}
+
+	nodeA := mustDistNodeRaw(ctx, t, "A", addrA, []string{addrB}, opts...)
+	nodeB := mustDistNodeRaw(ctx, t, "B", addrB, []string{addrA}, opts...)
+
+	defer func() { _ = nodeA.Stop(ctx); _ = nodeB.Stop(ctx) }()
+
+	time.Sleep(100 * time.Millisecond) // drain startup gossip noise
+
+	driveChaosPopulate(ctx, t, nodeA, chaos)
+
+	// Heal: drops disabled, hint replay drains queued writes onto B.
+	chaos.SetDropRate(0)
+	waitForHintsDrained(t, nodeA)
+}
+
+// driveChaosPopulate runs the populate-with-drops phase: enables
+// 80% chaos drops, writes 200 keys via Set, and asserts both that
+// drops fired and that hints were queued. Extracted from the
+// parent test to keep that function under the cognitive-length
+// cap.
+func driveChaosPopulate(ctx context.Context, t *testing.T, nodeA *backend.DistMemory, chaos *backend.Chaos) {
+	t.Helper()
+
+	chaos.SetDropRate(0.8)
+
+	const populateN = 200
+
+	for i := range populateN {
+		it := &cache.Item{
+			Key: cacheKey(i), Value: []byte("v"),
+			Version: 1, Origin: "A", LastUpdated: time.Now(),
+		}
+
+		_ = nodeA.Set(ctx, it) // ConsistencyOne; the primary write succeeds even when the replica forward drops
+	}
+
+	if chaos.Drops() == 0 {
+		t.Fatalf("expected chaos drops > 0 during 80%% drop phase; got 0")
+	}
+
+	if nodeA.Metrics().HintedQueued == 0 {
+		t.Fatalf("expected hints to be queued during the chaos phase; got 0 (drops=%d)", chaos.Drops())
+	}
+}
+
+// waitForHintsDrained polls until at least one hint replays or the
+// deadline elapses. The assertion is "hints drain once chaos is
+// off"; the wall-clock budget is loose because background loops
+// run on their own cadence.
+func waitForHintsDrained(t *testing.T, nodeA *backend.DistMemory) {
+	t.Helper()
+
+	deadline := time.Now().Add(5 * time.Second)
+	for time.Now().Before(deadline) {
+		if nodeA.Metrics().HintedReplayed > 0 {
+			return
+		}
+
+		time.Sleep(50 * time.Millisecond)
+	}
+
+	t.Errorf("hint replay did not drain after chaos disabled (HintedReplayed=0, HintedQueued=%d)",
+		nodeA.Metrics().HintedQueued)
+}
+
+// mustDistNodeRaw is the chaos-test variant of mustDistNode that
+// does NOT force ConsistencyQuorum — the chaos test needs
+// ConsistencyOne so dropped replica fan-outs land in the hint
+// queue instead of failing the parent Set call.
+func mustDistNodeRaw(
+	ctx context.Context,
+	t *testing.T,
+	id, addr string,
+	seeds []string,
+	extra ...backend.DistMemoryOption,
+) *backend.DistMemory {
+	t.Helper()
+
+	opts := []backend.DistMemoryOption{ //nolint:prealloc // literal helper-builder list
+		backend.WithDistNode(id, addr),
+		backend.WithDistSeeds(seeds),
+	}
+
+	opts = append(opts, extra...)
+
+	bm, err := backend.NewDistMemory(ctx, opts...)
+	if err != nil {
+		t.Fatalf("new dist memory: %v", err)
+	}
+
+	waitForDistNodeHealth(ctx, t, addr)
+
+	bk, ok := bm.(*backend.DistMemory)
+	if !ok {
+		t.Fatalf("expected *backend.DistMemory, got %T", bm)
+	}
+
+	return bk
+}
+
+// TestDistChaos_DisabledByDefaultIsNoop pins the "chaos costs
+// nothing in production" claim: spinning up a DistMemory without
+// WithDistChaos should produce exactly the same behavior as a
+// pre-chaos build. Metrics for chaos drops/latencies must stay
+// at zero, and writes must succeed normally.
+func TestDistChaos_DisabledByDefaultIsNoop(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+
+	addrA := allocatePort(t)
+	addrB := allocatePort(t)
+
+	opts := []backend.DistMemoryOption{
+		backend.WithDistReplication(2),
+		backend.WithDistVirtualNodes(16),
+	}
+
+	nodeA := mustDistNode(ctx, t, "A", addrA, []string{addrB}, opts...)
+	nodeB := mustDistNode(ctx, t, "B", addrB, []string{addrA}, opts...)
+
+	defer func() { _ = nodeA.Stop(ctx); _ = nodeB.Stop(ctx) }()
+
+	populateKeys(ctx, t, nodeA, 50)
+
+	if got := nodeA.Metrics().ChaosDrops; got != 0 {
+		t.Errorf("ChaosDrops with chaos disabled: got %d, want 0", got)
+	}
+
+	if got := nodeA.Metrics().ChaosLatencies; got != 0 {
+		t.Errorf("ChaosLatencies with chaos disabled: got %d, want 0", got)
+	}
+}