diff --git a/CHANGELOG.md b/CHANGELOG.md index aed8454..e5468d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,27 @@ All notable changes to HyperCache are recorded here. The format follows ### Added +- **Migration-source observability for the hint queue.** Hints produced by rebalance migrations are now + tagged at queue time and tracked in a dedicated set of counters alongside the existing aggregate + metrics. Five new OTel metrics: `dist.migration.queued`, `dist.migration.replayed`, + `dist.migration.expired`, `dist.migration.dropped`, and `dist.migration.last_age_ns` (queue residency of + the most-recently-replayed migration hint — direct signal of new-primary reachability during rolling + deploys). Existing `dist.hinted.*` counters keep their meaning as the aggregate across both sources, so + operators can derive replication-only as `aggregate - migration`. Implementation reuses the proven hint + queue infrastructure (TTL, caps, replay, drop logic) — no second queue, no second drain loop. + Tests in [`pkg/backend/dist_migration_hint_test.go`](pkg/backend/dist_migration_hint_test.go) cover + source-tag preservation through queue→replay, per-source counter increments on every terminal path + (replay success, expired, transport drop, global-cap drop), and the not-found keep-in-queue path. +- **Adaptive Merkle anti-entropy scheduling.** New + [`backend.WithDistMerkleAdaptiveBackoff(maxFactor)`](pkg/backend/dist_memory.go) option lets the auto-sync + loop double its sleep interval after each tick that finds zero divergence across every peer, capped at + `maxFactor`. Any tick with at least one dirty peer snaps the factor back to 1× immediately — recovery is + never lazy. Disabled by default (factor=0 or 1) so existing deployments see no behavior change. Two new + OTel metrics expose the state: `dist.auto_sync.backoff_factor` (gauge) and `dist.auto_sync.clean_ticks` + (counter). Each factor change is logged once at Info (`merkle auto-sync backoff factor changed`) — no + per-tick log spam. Unit tests in + [`pkg/backend/dist_adaptive_backoff_test.go`](pkg/backend/dist_adaptive_backoff_test.go) cover the ramp, + the cap, the dirty-tick reset, and the disabled-by-default back-compat invariant. - **Structured logging for background loops and cluster lifecycle.** HyperCache gained a `WithLogger(*slog.Logger)` option ([config.go](config.go)) that wires a structured logger through the wrapper. 
Previously the eviction loop, expiration loop, and HyperCache lifecycle ran fully silent — diff --git a/go.mod b/go.mod index a7f751b..7b4832b 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gofiber/schema v1.7.1 // indirect - github.com/gofiber/utils/v2 v2.0.4 // indirect + github.com/gofiber/utils/v2 v2.0.5 // indirect github.com/google/uuid v1.6.0 // indirect github.com/klauspost/compress v1.18.6 // indirect github.com/mattn/go-colorable v0.1.14 // indirect diff --git a/go.sum b/go.sum index f16b509..dcfcbe1 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,8 @@ github.com/coreos/go-oidc/v3 v3.18.0 h1:V9orjXynvu5wiC9SemFTWnG4F45v403aIcjWo0d4 github.com/coreos/go-oidc/v3 v3.18.0/go.mod h1:DYCf24+ncYi+XkIH97GY1+dqoRlbaSI26KVTCI9SrY4= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fxamacker/cbor/v2 v2.9.1 h1:2rWm8B193Ll4VdjsJY28jxs70IdDsHRWgQYAI80+rMQ= -github.com/fxamacker/cbor/v2 v2.9.1/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= +github.com/fxamacker/cbor/v2 v2.9.2 h1:X4Ksno9+x3cz0TZv69ec1hxP/+tymuR8PXQJyDwfh78= +github.com/fxamacker/cbor/v2 v2.9.2/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/go-jose/go-jose/v4 v4.1.4 h1:moDMcTHmvE6Groj34emNPLs/qtYXRVcd6S7NHbHz3kA= github.com/go-jose/go-jose/v4 v4.1.4/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -25,8 +25,8 @@ github.com/gofiber/fiber/v3 v3.2.0 h1:g9+09D320foINPpCnR3ibQ5oBEFHjAWRRfDG1te54u github.com/gofiber/fiber/v3 v3.2.0/go.mod h1:FHOsc2Db7HhHpsE62QAaJlXVV1pNkbZEptZ4jtti7m4= github.com/gofiber/schema v1.7.1 h1:oSJBKdgP8JeIME4TQSAqlNKTU2iBB+2RNmKi8Nsc+TI= github.com/gofiber/schema v1.7.1/go.mod h1:A/X5Ffyru4p9eBdp99qu+nzviHzQiZ7odLT+TwxWhbk= -github.com/gofiber/utils/v2 v2.0.4 h1:WwAxUA7L4MW2DjdEHF234lfqvBqd2vYYuBtA9TJq2ec= -github.com/gofiber/utils/v2 v2.0.4/go.mod h1:GGERKU3Vhj5z6hS8YKvxL99A54DjOvTFZ0cjZnG4Lj4= +github.com/gofiber/utils/v2 v2.0.5 h1:IMXoI2A5Dao/aMMBURTNxnhbtQO4kUwUFOgcwFSIjLU= +github.com/gofiber/utils/v2 v2.0.5/go.mod h1:FwwopfzwAQsoXLCHhOT24eH2jQfBgrrra9S5p0+luxg= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= diff --git a/pkg/backend/dist_adaptive_backoff_test.go b/pkg/backend/dist_adaptive_backoff_test.go new file mode 100644 index 0000000..0adba71 --- /dev/null +++ b/pkg/backend/dist_adaptive_backoff_test.go @@ -0,0 +1,171 @@ +package backend + +import ( + "log/slog" + "testing" + "time" +) + +// newBackoffTestDM constructs the bare-minimum DistMemory the backoff +// logic needs: a logger (the real constructor always wires one; +// updateAutoSyncBackoff dereferences it on factor change). Real +// production paths go through NewDistMemory; this fixture exists only +// so we can exercise the pure scheduling logic without a transport. 
+func newBackoffTestDM(maxFactor int) *DistMemory { + return &DistMemory{ + logger: slog.New(slog.DiscardHandler), + autoSyncMaxBackoffFactor: maxFactor, + } +} + +// TestAdaptiveBackoff_DisabledIsNoop confirms the default behavior is +// unchanged: with no WithDistMerkleAdaptiveBackoff option set, neither +// the factor nor the clean-tick counter ever moves regardless of tick +// outcome, and nextAutoSyncDelay always returns the raw interval. This +// is the back-compat guarantee — existing deployments see no behavioral +// shift on upgrade. +func TestAdaptiveBackoff_DisabledIsNoop(t *testing.T) { + t.Parallel() + + dm := newBackoffTestDM(0) + + // Several clean ticks — counter and factor must stay flat. + for range 5 { + dm.updateAutoSyncBackoff(true) + } + + if got := dm.autoSyncBackoffFactor.Load(); got != 0 { + t.Errorf("backoff factor: want 0 (uninitialized, disabled), got %d", got) + } + + if got := dm.autoSyncCleanTicks.Load(); got != 0 { + t.Errorf("clean ticks: want 0 (disabled), got %d", got) + } + + if got := dm.nextAutoSyncDelay(30 * time.Second); got != 30*time.Second { + t.Errorf("nextAutoSyncDelay (disabled): want 30s, got %v", got) + } +} + +// TestAdaptiveBackoff_RampsAndCaps walks the factor through its +// doubling progression up to the configured maximum, verifies the cap +// holds across additional clean ticks, and confirms a single dirty +// tick snaps the factor back to 1. +func TestAdaptiveBackoff_RampsAndCaps(t *testing.T) { + t.Parallel() + + dm := newBackoffTestDM(8) + dm.autoSyncBackoffFactor.Store(1) // mirrors what the loop does at start + + // 1 -> 2 -> 4 -> 8 -> 8 (capped) -> 8 (still capped). + want := []int64{2, 4, 8, 8, 8} + for i, w := range want { + dm.updateAutoSyncBackoff(true) + + got := dm.autoSyncBackoffFactor.Load() + if got != w { + t.Errorf("clean tick %d: want factor %d, got %d", i+1, w, got) + } + } + + if got := dm.autoSyncCleanTicks.Load(); got != int64(len(want)) { + t.Errorf("clean-ticks counter: want %d, got %d", len(want), got) + } + + // Dirty tick: factor must immediately reset to 1, counter must not + // increment (clean-ticks only tracks clean ticks). + dm.updateAutoSyncBackoff(false) + + if got := dm.autoSyncBackoffFactor.Load(); got != 1 { + t.Errorf("after dirty tick: want factor 1 (reset), got %d", got) + } + + if got := dm.autoSyncCleanTicks.Load(); got != int64(len(want)) { + t.Errorf("clean-ticks counter must not bump on dirty tick: want %d, got %d", len(want), got) + } + + // Ramp begins again from 1. + dm.updateAutoSyncBackoff(true) + + if got := dm.autoSyncBackoffFactor.Load(); got != 2 { + t.Errorf("clean tick after reset: want factor 2, got %d", got) + } +} + +// TestAdaptiveBackoff_NextDelayMultiplies confirms the timer-driven +// scheduler multiplies the base interval by the current factor. This +// is the contract the autoSyncLoop relies on to actually sleep longer +// — without this, the metric would move but the loop would still wake +// every base interval. +func TestAdaptiveBackoff_NextDelayMultiplies(t *testing.T) { + t.Parallel() + + dm := newBackoffTestDM(16) + dm.autoSyncBackoffFactor.Store(4) + + base := 30 * time.Second + + got := dm.nextAutoSyncDelay(base) + if got != 4*base { + t.Errorf("nextAutoSyncDelay at factor 4: want %v, got %v", 4*base, got) + } + + // Factor < 1 (shouldn't happen under normal operation, but guard + // against a future zero-store race): clamp to 1. 
+ dm.autoSyncBackoffFactor.Store(0) + + if got := dm.nextAutoSyncDelay(base); got != base { + t.Errorf("nextAutoSyncDelay with factor 0 should clamp to 1×: want %v, got %v", base, got) + } +} + +// TestAdaptiveBackoff_MaxFactorOneStaysDisabled pins the edge case +// where an operator passes maxFactor=1: that's semantically "disabled" +// (no multiplication), and we treat it as such. The option helper +// already normalizes sub-1 values, but a literal 1 is still valid +// configuration and must behave like the disabled default. +func TestAdaptiveBackoff_MaxFactorOneStaysDisabled(t *testing.T) { + t.Parallel() + + dm := newBackoffTestDM(1) + dm.autoSyncBackoffFactor.Store(1) + + for range 5 { + dm.updateAutoSyncBackoff(true) + } + + if got := dm.autoSyncBackoffFactor.Load(); got != 1 { + t.Errorf("maxFactor=1 must keep factor at 1, got %d", got) + } + + if got := dm.autoSyncCleanTicks.Load(); got != 0 { + t.Errorf("maxFactor=1 must not count clean ticks, got %d", got) + } +} + +// TestAdaptiveBackoff_OptionNormalizesNegatives covers the option +// helper itself: negative or zero values are coerced to 0 (the +// "disabled" sentinel), preventing accidental enablement with a +// surprising factor. +func TestAdaptiveBackoff_OptionNormalizesNegatives(t *testing.T) { + t.Parallel() + + cases := []struct { + in int + want int + }{ + {in: -5, want: 0}, + {in: 0, want: 0}, + {in: 1, want: 1}, + {in: 8, want: 8}, + } + + for _, tc := range cases { + dm := newBackoffTestDM(0) + WithDistMerkleAdaptiveBackoff(tc.in)(dm) + + if dm.autoSyncMaxBackoffFactor != tc.want { + t.Errorf("WithDistMerkleAdaptiveBackoff(%d): want %d, got %d", tc.in, tc.want, dm.autoSyncMaxBackoffFactor) + } + } +} diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index 52ff3b7..be1bf5b 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -146,6 +146,14 @@ type DistMemory struct { lastAutoSyncDuration atomic.Int64 // nanoseconds of last full loop lastAutoSyncError atomic.Value // error string or nil + // adaptive backoff: when enabled (autoSyncMaxBackoffFactor > 1), the + // auto-sync loop doubles its sleep after each tick that found zero + // divergence across all peers, capped at autoSyncMaxBackoffFactor. + // Any tick that pulls keys resets the factor to 1 immediately. + autoSyncMaxBackoffFactor int // 0/1 = disabled + autoSyncBackoffFactor atomic.Int64 // current multiplier; >=1 when adaptive enabled + autoSyncCleanTicks atomic.Int64 // cumulative clean ticks (whole-cycle, not per-peer) + // tombstone version source when no prior item exists (monotonic per process) tombVersionCounter atomic.Uint64 @@ -246,11 +254,30 @@ type distTransportSlot struct{ t DistTransport } // hintedEntry represents a deferred replica write. type hintedEntry struct { - item *cache.Item - expire time.Time - size int64 // approximate bytes for global cap accounting + item *cache.Item + queuedAt time.Time + expire time.Time + size int64 // approximate bytes for global cap accounting + source hintSource // why this hint exists (replication fan-out vs rebalance migration) } +// hintSource records why a hint landed in the queue. Used to split +// the aggregate hint counters (which sum across both sources) into a +// migration-only view for operators tracking ownership-transfer +// liveness during rebalance phases. Replication-only counts are derivable +// as (aggregate - migration); we don't expose a third metric for it. 
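+//
+// For example, a metrics snapshot showing HintedReplayed=120 and
+// MigrationHintReplayed=20 implies 100 replication-fan-out replays; the
+// same subtraction applies to the queued/expired/dropped pairs (figures
+// illustrative, not taken from a real run).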
+type hintSource int8 + +const ( + // hintSourceReplication marks hints produced by a failed write fan-out + // to a peer replica. This is the historical default — every hint + // landed in this bucket before migration-source tagging existed. + hintSourceReplication hintSource = iota + // hintSourceMigration marks hints produced by a rebalance tick that + // could not deliver an item to its new primary owner. + hintSourceMigration +) + // tombstone marks a delete intent with version ordering to prevent resurrection. type tombstone struct { version uint64 @@ -358,6 +385,29 @@ func WithDistMerkleAutoSync(interval time.Duration) DistMemoryOption { } } +// WithDistMerkleAdaptiveBackoff enables adaptive scheduling for the Merkle +// anti-entropy loop. When maxFactor > 1, the loop doubles its sleep +// interval (1×, 2×, 4×, 8×, …) after each tick that found zero divergence +// across every peer, capped at maxFactor. Any tick with at least one +// dirty peer resets the factor to 1 on the next sleep — recovery is +// always immediate, never lazy. +// +// maxFactor <= 1 disables backoff (the default): the loop wakes at every +// `WithDistMerkleAutoSync` interval regardless of recent divergence. +// +// The current factor is exposed as `dist.auto_sync.backoff_factor` and +// the cumulative count of clean ticks as `dist.auto_sync.clean_ticks`. +// Each factor change is logged once at Info; no per-tick spam. +func WithDistMerkleAdaptiveBackoff(maxFactor int) DistMemoryOption { + return func(dm *DistMemory) { + if maxFactor < 1 { + maxFactor = 0 + } + + dm.autoSyncMaxBackoffFactor = maxFactor + } +} + // WithDistMerkleAutoSyncPeers limits number of peers synced per interval (0 or <0 = all). func WithDistMerkleAutoSyncPeers(n int) DistMemoryOption { return func(dm *DistMemory) { @@ -513,54 +563,14 @@ func equalBytes(a, b []byte) bool { // tiny helper } // SyncWith performs Merkle anti-entropy against a remote node (pull newer versions for differing chunks). +// +// Returns nil even when the local and remote trees agree (a "clean" sync). +// Callers that need to distinguish clean from dirty cycles — currently only +// the adaptive auto-sync backoff path — use syncWithStatus directly. func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error { - transport := dm.loadTransport() - if transport == nil { - return errNoTransport - } - - startFetch := time.Now() - - remoteTree, err := transport.FetchMerkle(ctx, nodeID) - if err != nil { - dm.logger.Warn( - "merkle sync fetch failed", - slog.String("peer_id", nodeID), - slog.Any("err", err), - ) - - return err - } - - fetchDur := time.Since(startFetch) - dm.metrics.merkleFetchNanos.Store(fetchDur.Nanoseconds()) - - startBuild := time.Now() - localTree := dm.BuildMerkleTree() - buildDur := time.Since(startBuild) - dm.metrics.merkleBuildNanos.Store(buildDur.Nanoseconds()) - - entries := dm.sortedMerkleEntries() - startDiff := time.Now() - diffs := localTree.DiffLeafRanges(remoteTree) - diffDur := time.Since(startDiff) - dm.metrics.merkleDiffNanos.Store(diffDur.Nanoseconds()) - - missing := dm.resolveMissingKeys(ctx, nodeID, entries) - - dm.applyMerkleDiffs(ctx, nodeID, entries, diffs, localTree.ChunkSize) - - for k := range missing { // missing = remote-only keys - dm.fetchAndAdopt(ctx, nodeID, k) - } - - if len(diffs) == 0 && len(missing) == 0 { - return nil - } + _, err := dm.syncWithStatus(ctx, nodeID) - dm.metrics.merkleSyncs.Add(1) - - return nil + return err } // WithDistCapacity sets logical capacity (not strictly enforced yet). 
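
Usage sketch for the new option (illustrative only: NewDistMemory is the constructor the tests reference, but its exact signature and return values are not shown in this diff, so treat the wiring below as an assumption):

	// Anti-entropy wakes every 30s while peers diverge, then backs off
	// to at most 8× that interval (240s) once every peer reports clean.
	dm := backend.NewDistMemory(
		backend.WithDistMerkleAutoSync(30*time.Second),
		backend.WithDistMerkleAdaptiveBackoff(8),
	)
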
@@ -1188,12 +1198,17 @@ type distMetrics struct { versionConflicts atomic.Int64 // times a newer version (or tie-broken origin) replaced previous candidate versionTieBreaks atomic.Int64 // subset of conflicts decided by origin tie-break readPrimaryPromote atomic.Int64 // times read path skipped unreachable primary and promoted next owner - hintedQueued atomic.Int64 // hints queued + hintedQueued atomic.Int64 // hints queued (both sources) hintedReplayed atomic.Int64 // hints successfully replayed hintedExpired atomic.Int64 // hints expired before delivery hintedDropped atomic.Int64 // hints dropped due to non-not-found transport errors hintedGlobalDropped atomic.Int64 // hints dropped due to global caps (count/bytes) hintedBytes atomic.Int64 // approximate total bytes currently queued (best-effort) + migrationHintQueued atomic.Int64 // subset of hintedQueued: rebalance migration source only + migrationHintReplayed atomic.Int64 // subset of hintedReplayed: migration-source hints that drained + migrationHintExpired atomic.Int64 // subset of hintedExpired: migration-source hints aged out + migrationHintDropped atomic.Int64 // subset of hintedDropped + hintedGlobalDropped: migration-source hints that died + migrationHintLastAgeNanos atomic.Int64 // queue residency of the most-recently-replayed migration hint (ns) merkleSyncs atomic.Int64 // merkle sync operations completed merkleKeysPulled atomic.Int64 // keys applied during sync merkleBuildNanos atomic.Int64 // last build duration (ns) @@ -1241,6 +1256,11 @@ type DistMetrics struct { HintedDropped int64 HintedGlobalDropped int64 HintedBytes int64 + MigrationHintQueued int64 // subset of HintedQueued attributable to rebalance migrations + MigrationHintReplayed int64 // subset of HintedReplayed for migration hints + MigrationHintExpired int64 // subset of HintedExpired for migration hints + MigrationHintDropped int64 // subset of HintedDropped + HintedGlobalDropped for migration hints + MigrationHintLastAgeNanos int64 // queue residency of the most-recently-replayed migration hint (ns) MerkleSyncs int64 MerkleKeysPulled int64 MerkleBuildNanos int64 @@ -1249,6 +1269,8 @@ type DistMetrics struct { AutoSyncLoops int64 LastAutoSyncNanos int64 LastAutoSyncError string + AutoSyncCleanTicks int64 // cumulative ticks where every peer returned no divergence + AutoSyncBackoffFactor int64 // current adaptive-backoff multiplier (1 when disabled or freshly reset) TombstonesActive int64 TombstonesPurged int64 WriteQuorumFailures int64 @@ -1305,6 +1327,11 @@ func (dm *DistMemory) Metrics() DistMetrics { HintedDropped: dm.metrics.hintedDropped.Load(), HintedGlobalDropped: dm.metrics.hintedGlobalDropped.Load(), HintedBytes: dm.metrics.hintedBytes.Load(), + MigrationHintQueued: dm.metrics.migrationHintQueued.Load(), + MigrationHintReplayed: dm.metrics.migrationHintReplayed.Load(), + MigrationHintExpired: dm.metrics.migrationHintExpired.Load(), + MigrationHintDropped: dm.metrics.migrationHintDropped.Load(), + MigrationHintLastAgeNanos: dm.metrics.migrationHintLastAgeNanos.Load(), MerkleSyncs: dm.metrics.merkleSyncs.Load(), MerkleKeysPulled: dm.metrics.merkleKeysPulled.Load(), MerkleBuildNanos: dm.metrics.merkleBuildNanos.Load(), @@ -1313,6 +1340,8 @@ func (dm *DistMemory) Metrics() DistMetrics { AutoSyncLoops: dm.metrics.autoSyncLoops.Load(), LastAutoSyncNanos: dm.lastAutoSyncDuration.Load(), LastAutoSyncError: lastErr, + AutoSyncCleanTicks: dm.autoSyncCleanTicks.Load(), + AutoSyncBackoffFactor: dm.autoSyncBackoffFactor.Load(), TombstonesActive: 
dm.metrics.tombstonesActive.Load(), TombstonesPurged: dm.metrics.tombstonesPurged.Load(), WriteQuorumFailures: dm.metrics.writeQuorumFailures.Load(), @@ -2247,7 +2276,7 @@ func (dm *DistMemory) migrateIfNeeded(ctx context.Context, item *cache.Item) { slog.Any("err", migrationErr), ) - dm.queueHint(string(owners[0]), item) + dm.queueHint(string(owners[0]), item, hintSourceMigration) } // Update originalPrimary so we don't recount repeatedly. @@ -2728,7 +2757,7 @@ func (dm *DistMemory) replicateTo(ctx context.Context, item *cache.Item, replica // (timeout, 5xx, connection reset) silently dropped replicas. // The hint TTL bounds total retry time, so a target that's // permanently gone still drains rather than ballooning. - dm.queueHint(string(oid), item) + dm.queueHint(string(oid), item, hintSourceReplication) } return acks @@ -2803,7 +2832,7 @@ func (dm *DistMemory) getWithConsistencyParallel( } // --- Hinted handoff implementation ---. -func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { // reduced complexity +func (dm *DistMemory) queueHint(nodeID string, item *cache.Item, source hintSource) { // reduced complexity if dm.hintTTL <= 0 { return } @@ -2830,12 +2859,24 @@ func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { // reduced co dm.hintsMu.Unlock() dm.metrics.hintedGlobalDropped.Add(1) + if source == hintSourceMigration { + dm.metrics.migrationHintDropped.Add(1) + } + return } cloned := *item - queue = append(queue, hintedEntry{item: &cloned, expire: time.Now().Add(dm.hintTTL), size: size}) + now := time.Now() + + queue = append(queue, hintedEntry{ + item: &cloned, + queuedAt: now, + expire: now.Add(dm.hintTTL), + size: size, + source: source, + }) dm.hints[nodeID] = queue dm.adjustHintAccounting(1, size) @@ -2847,6 +2888,10 @@ func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { // reduced co dm.metrics.hintedQueued.Add(1) dm.metrics.hintedBytes.Store(bytesNow) + + if source == hintSourceMigration { + dm.metrics.migrationHintQueued.Add(1) + } } // approxHintSize estimates the size of a hinted item for global caps. @@ -2965,6 +3010,10 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint if now.After(entry.expire) { dm.metrics.hintedExpired.Add(1) + if entry.source == hintSourceMigration { + dm.metrics.migrationHintExpired.Add(1) + } + return 1 } @@ -2977,6 +3026,19 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint if err == nil { dm.metrics.hintedReplayed.Add(1) + if entry.source == hintSourceMigration { + dm.metrics.migrationHintReplayed.Add(1) + + // last_age_ns reflects queue residency of the most-recently + // replayed migration hint. Operators read this as "how long did + // the last migration retry wait for the new primary to come + // back?" — a direct signal of cross-node reachability during + // rolling deploys. 
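+			//
+			// A zero queuedAt is possible only for entries built without
+			// the field (e.g. hand-constructed fixtures); the guard below
+			// skips the gauge update rather than recording a nonsense age
+			// measured from the zero time.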
+ if !entry.queuedAt.IsZero() { + dm.metrics.migrationHintLastAgeNanos.Store(now.Sub(entry.queuedAt).Nanoseconds()) + } + } + return 1 } @@ -2986,6 +3048,10 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint dm.metrics.hintedDropped.Add(1) + if entry.source == hintSourceMigration { + dm.metrics.migrationHintDropped.Add(1) + } + dm.logger.Warn( "hint dropped after replay error", slog.String("peer_id", nodeID), @@ -3037,22 +3103,91 @@ func (dm *DistMemory) startAutoSyncIfEnabled(ctx context.Context) { ) } +// syncWithStatus mirrors SyncWith but additionally reports whether the cycle +// found no divergence (clean=true means: zero differing chunks, zero +// remote-only keys). Used by runAutoSyncTick to drive the adaptive backoff +// without changing the public SyncWith signature. +func (dm *DistMemory) syncWithStatus(ctx context.Context, nodeID string) (bool, error) { + transport := dm.loadTransport() + if transport == nil { + return false, errNoTransport + } + + startFetch := time.Now() + + remoteTree, err := transport.FetchMerkle(ctx, nodeID) + if err != nil { + dm.logger.Warn( + "merkle sync fetch failed", + slog.String("peer_id", nodeID), + slog.Any("err", err), + ) + + return false, err + } + + fetchDur := time.Since(startFetch) + dm.metrics.merkleFetchNanos.Store(fetchDur.Nanoseconds()) + + startBuild := time.Now() + localTree := dm.BuildMerkleTree() + buildDur := time.Since(startBuild) + dm.metrics.merkleBuildNanos.Store(buildDur.Nanoseconds()) + + entries := dm.sortedMerkleEntries() + startDiff := time.Now() + diffs := localTree.DiffLeafRanges(remoteTree) + diffDur := time.Since(startDiff) + dm.metrics.merkleDiffNanos.Store(diffDur.Nanoseconds()) + + missing := dm.resolveMissingKeys(ctx, nodeID, entries) + + dm.applyMerkleDiffs(ctx, nodeID, entries, diffs, localTree.ChunkSize) + + for k := range missing { // missing = remote-only keys + dm.fetchAndAdopt(ctx, nodeID, k) + } + + if len(diffs) == 0 && len(missing) == 0 { + return true, nil + } + + dm.metrics.merkleSyncs.Add(1) + + return false, nil +} + +// autoSyncLoop drives Merkle anti-entropy on a timer. With adaptive +// backoff disabled (the default) it behaves like a fixed-interval ticker. +// With backoff enabled it resets the timer between ticks using +// nextAutoSyncDelay, doubling on clean cycles and snapping back to 1× on +// dirty ones. func (dm *DistMemory) autoSyncLoop(ctx context.Context, interval time.Duration, stopCh <-chan struct{}) { - ticker := time.NewTicker(interval) - defer ticker.Stop() + if dm.autoSyncMaxBackoffFactor > 1 { + dm.autoSyncBackoffFactor.Store(1) + } + + timer := time.NewTimer(interval) + defer timer.Stop() for { select { case <-stopCh: return - case <-ticker.C: - dm.runAutoSyncTick(ctx) + case <-timer.C: + clean := dm.runAutoSyncTick(ctx) + dm.updateAutoSyncBackoff(clean) + timer.Reset(dm.nextAutoSyncDelay(interval)) } } } -// runAutoSyncTick performs one auto-sync cycle; separated for lower complexity. -func (dm *DistMemory) runAutoSyncTick(ctx context.Context) { +// runAutoSyncTick performs one auto-sync cycle. Returns true when every +// peer reported a clean (no-divergence) result and at least one peer was +// actually synced; false on any divergence, any sync error, or when no +// peers were eligible (caller treats "no peers" as not-clean to avoid +// backing off in single-node configurations). 
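+//
+// For example, in a three-node ring a tick where both peers report zero
+// divergence returns true (letting the caller double the backoff factor);
+// the same tick with one FetchMerkle timeout returns false and the factor
+// snaps back to 1× on the next update.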
+func (dm *DistMemory) runAutoSyncTick(ctx context.Context) bool { start := time.Now() var lastErr error @@ -3060,6 +3195,7 @@ func (dm *DistMemory) runAutoSyncTick(ctx context.Context) { members := dm.membership.List() limit := dm.autoSyncPeersPerInterval synced := 0 + dirty := false for _, member := range members { if member == nil || string(member.ID) == dm.nodeID { // skip self @@ -3070,9 +3206,12 @@ func (dm *DistMemory) runAutoSyncTick(ctx context.Context) { break } - err := dm.SyncWith(ctx, string(member.ID)) - if err != nil { // capture last error only + clean, err := dm.syncWithStatus(ctx, string(member.ID)) + if err != nil { // capture last error only; treat as dirty so we don't back off through outages lastErr = err + dirty = true + } else if !clean { + dirty = true } synced++ @@ -3087,6 +3226,54 @@ func (dm *DistMemory) runAutoSyncTick(ctx context.Context) { } dm.metrics.autoSyncLoops.Add(1) + + return synced > 0 && !dirty +} + +// updateAutoSyncBackoff advances the backoff factor based on tick outcome. +// Clean tick: double up to the cap. Dirty tick: snap back to 1×. No-op +// when adaptive backoff is disabled (autoSyncMaxBackoffFactor <= 1). +func (dm *DistMemory) updateAutoSyncBackoff(clean bool) { + if dm.autoSyncMaxBackoffFactor <= 1 { + return + } + + prev := dm.autoSyncBackoffFactor.Load() + + var next int64 + + if clean { + dm.autoSyncCleanTicks.Add(1) + + next = min(prev*2, int64(dm.autoSyncMaxBackoffFactor)) + } else { + next = 1 + } + + if next == prev { + return + } + + dm.autoSyncBackoffFactor.Store(next) + dm.logger.Info( + "merkle auto-sync backoff factor changed", + slog.Int64("from", prev), + slog.Int64("to", next), + slog.Bool("clean", clean), + ) +} + +// nextAutoSyncDelay returns the sleep duration before the next auto-sync +// tick. When adaptive backoff is disabled this is just `interval`; when +// enabled it is `interval * currentFactor`. 
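+//
+// For example, with a 30s base interval and maxFactor=8, consecutive clean
+// ticks sleep 30s, 60s, 120s, 240s, 240s, …, and the first dirty tick
+// returns the next sleep to 30s.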
+func (dm *DistMemory) nextAutoSyncDelay(interval time.Duration) time.Duration { + if dm.autoSyncMaxBackoffFactor <= 1 { + return interval + } + + factor := max(dm.autoSyncBackoffFactor.Load(), 1) + + return interval * time.Duration(factor) } func (dm *DistMemory) gossipLoop(stopCh <-chan struct{}) { @@ -3763,6 +3950,31 @@ var distMetricSpecs = []distMetricSpec{ desc: "Approximate total bytes currently queued in hints", get: func(m DistMetrics) int64 { return m.HintedBytes }, }, + { + name: "dist.migration.queued", unit: unitHint, counter: true, + desc: "Migration-source hints queued (subset of dist.hinted.queued from rebalance ticks)", + get: func(m DistMetrics) int64 { return m.MigrationHintQueued }, + }, + { + name: "dist.migration.replayed", unit: unitHint, counter: true, + desc: "Migration-source hints successfully delivered on replay", + get: func(m DistMetrics) int64 { return m.MigrationHintReplayed }, + }, + { + name: "dist.migration.expired", unit: unitHint, counter: true, + desc: "Migration-source hints that aged past hint TTL before delivery", + get: func(m DistMetrics) int64 { return m.MigrationHintExpired }, + }, + { + name: "dist.migration.dropped", unit: unitHint, counter: true, + desc: "Migration-source hints discarded by replay error or queue caps", + get: func(m DistMetrics) int64 { return m.MigrationHintDropped }, + }, + { + name: "dist.migration.last_age_ns", unit: unitNanos, counter: false, + desc: "Queue residency of the most-recently-replayed migration hint (ns)", + get: func(m DistMetrics) int64 { return m.MigrationHintLastAgeNanos }, + }, // --- Anti-entropy (Merkle) --- { @@ -3800,6 +4012,16 @@ var distMetricSpecs = []distMetricSpec{ desc: "Duration of last auto-sync loop", get: func(m DistMetrics) int64 { return m.LastAutoSyncNanos }, }, + { + name: "dist.auto_sync.clean_ticks", unit: unitTick, counter: true, + desc: "Auto-sync ticks where every peer returned zero divergence (drives adaptive backoff)", + get: func(m DistMetrics) int64 { return m.AutoSyncCleanTicks }, + }, + { + name: "dist.auto_sync.backoff_factor", unit: unitTick, counter: false, + desc: "Current adaptive auto-sync backoff multiplier (1 when disabled or reset)", + get: func(m DistMetrics) int64 { return m.AutoSyncBackoffFactor }, + }, // --- Tombstones --- { diff --git a/pkg/backend/dist_migration_hint_test.go b/pkg/backend/dist_migration_hint_test.go new file mode 100644 index 0000000..a636985 --- /dev/null +++ b/pkg/backend/dist_migration_hint_test.go @@ -0,0 +1,309 @@ +package backend + +import ( + "context" + "errors" + "log/slog" + "sync/atomic" + "testing" + "time" + + "github.com/hyp3rd/hypercache/internal/cluster" + "github.com/hyp3rd/hypercache/internal/sentinel" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// Static sentinels for the scriptedTransport — err113 forbids defining +// dynamic errors with errors.New inside test bodies. +var ( + errScriptedNotUsed = errors.New("scriptedTransport: method not exercised by this test") + errScriptedSimulate = errors.New("scriptedTransport: simulated transport error") +) + +const migrationHintTestOrigin = "test-A" + +// scriptedTransport is a DistTransport stub that returns whatever the +// test sets on its forwardSetErr field. Keeps the test focused on +// hint-source bookkeeping without involving the real HTTP or +// in-process transport paths. 
+type scriptedTransport struct { + forwardSetErr atomic.Value // error or nil + forwardSetCalls atomic.Int64 +} + +func (s *scriptedTransport) ForwardSet(_ context.Context, _ string, _ *cache.Item, _ bool) error { + s.forwardSetCalls.Add(1) + + if v := s.forwardSetErr.Load(); v != nil { + if err, ok := v.(error); ok { + return err + } + } + + return nil +} + +func (*scriptedTransport) ForwardGet(_ context.Context, _, _ string) (*cache.Item, bool, error) { + return nil, false, nil +} + +func (*scriptedTransport) ForwardRemove(_ context.Context, _, _ string, _ bool) error { + return nil +} + +func (*scriptedTransport) Health(_ context.Context, _ string) error { + return nil +} + +func (*scriptedTransport) IndirectHealth(_ context.Context, _, _ string) error { + return nil +} + +func (*scriptedTransport) Gossip(_ context.Context, _ string, _ []GossipMember) error { + return nil +} + +func (*scriptedTransport) FetchMerkle(_ context.Context, _ string) (*MerkleTree, error) { + return nil, errScriptedNotUsed +} + +// newMigrationHintTestDM builds a DistMemory with just enough wiring to +// drive queueHint + processHint directly. We bypass NewDistMemory to +// avoid spinning a real cluster — the goal is to exercise the +// source-aware counters in isolation. Hint TTL is set to one minute +// (large enough that no test cares about expiry except the one that +// explicitly constructs an entry whose expire is in the past). +func newMigrationHintTestDM(t *testing.T) (*DistMemory, *scriptedTransport) { + t.Helper() + + transport := &scriptedTransport{} + dm := &DistMemory{ + logger: slog.New(slog.DiscardHandler), + hintTTL: time.Minute, + localNode: cluster.NewNode(migrationHintTestOrigin, "127.0.0.1:0"), + } + dm.storeTransport(transport) + + return dm, transport +} + +// TestMigrationHint_QueueTagsSource verifies that queueHint tags each +// entry with the source it was queued from and bumps the matching +// per-source counter alongside the aggregate. Without this contract, +// every downstream replay-time accounting decision (clean / dirty / +// migration-only metrics) would fall apart. +func TestMigrationHint_QueueTagsSource(t *testing.T) { + t.Parallel() + + dm, _ := newMigrationHintTestDM(t) + item := &cache.Item{Key: "k1", Value: []byte("v"), Version: 1, Origin: migrationHintTestOrigin} + + dm.queueHint("peer-B", item, hintSourceMigration) + dm.queueHint("peer-B", item, hintSourceReplication) + + if got := dm.metrics.hintedQueued.Load(); got != 2 { + t.Errorf("aggregate hintedQueued: want 2, got %d", got) + } + + if got := dm.metrics.migrationHintQueued.Load(); got != 1 { + t.Errorf("migrationHintQueued: want 1, got %d", got) + } + + dm.hintsMu.Lock() + + queue := dm.hints["peer-B"] + dm.hintsMu.Unlock() + + if len(queue) != 2 { + t.Fatalf("queue length: want 2, got %d", len(queue)) + } + + if queue[0].source != hintSourceMigration { + t.Errorf("queue[0].source: want hintSourceMigration, got %d", queue[0].source) + } + + if queue[1].source != hintSourceReplication { + t.Errorf("queue[1].source: want hintSourceReplication, got %d", queue[1].source) + } +} + +// TestMigrationHint_ReplaySuccessUpdatesPerSourceCounters drives +// processHint manually through its happy path and confirms migration +// hints get tallied separately, replication hints do not bump the +// migration counter, and last_age_ns moves to a non-zero value. 
+func TestMigrationHint_ReplaySuccessUpdatesPerSourceCounters(t *testing.T) { + t.Parallel() + + dm, _ := newMigrationHintTestDM(t) + queuedAt := time.Now().Add(-3 * time.Second) // 3s residency + item := &cache.Item{Key: "k1", Value: []byte("v"), Version: 1, Origin: migrationHintTestOrigin} + + migEntry := hintedEntry{ + item: item, + queuedAt: queuedAt, + expire: queuedAt.Add(time.Minute), + source: hintSourceMigration, + } + + repEntry := hintedEntry{ + item: item, + queuedAt: queuedAt, + expire: queuedAt.Add(time.Minute), + source: hintSourceReplication, + } + + now := time.Now() + + if action := dm.processHint(context.Background(), "peer-B", migEntry, now); action != 1 { + t.Errorf("migration replay: want action=1 (remove), got %d", action) + } + + if action := dm.processHint(context.Background(), "peer-B", repEntry, now); action != 1 { + t.Errorf("replication replay: want action=1 (remove), got %d", action) + } + + if got := dm.metrics.hintedReplayed.Load(); got != 2 { + t.Errorf("aggregate hintedReplayed: want 2, got %d", got) + } + + if got := dm.metrics.migrationHintReplayed.Load(); got != 1 { + t.Errorf("migrationHintReplayed: want 1 (migration only), got %d", got) + } + + ageNs := dm.metrics.migrationHintLastAgeNanos.Load() + if ageNs <= 0 { + t.Errorf("migrationHintLastAgeNanos: want > 0 after migration replay, got %d", ageNs) + } + + // The recorded age should be roughly 3 seconds (the queuedAt offset). + // Allow generous slack to absorb test-machine scheduling jitter. + if ageNs < int64(2*time.Second) || ageNs > int64(10*time.Second) { + t.Errorf("migrationHintLastAgeNanos: want ~3s residency, got %v", time.Duration(ageNs)) + } +} + +// TestMigrationHint_ExpiredBumpsPerSourceCounter exercises the expired +// branch of processHint with a hint whose expire is in the past, and +// verifies the migration-source expired counter ticks. +func TestMigrationHint_ExpiredBumpsPerSourceCounter(t *testing.T) { + t.Parallel() + + dm, _ := newMigrationHintTestDM(t) + now := time.Now() + item := &cache.Item{Key: "k1", Value: []byte("v"), Version: 1, Origin: migrationHintTestOrigin} + + migEntry := hintedEntry{ + item: item, + queuedAt: now.Add(-2 * time.Hour), + expire: now.Add(-time.Hour), + source: hintSourceMigration, + } + + if action := dm.processHint(context.Background(), "peer-B", migEntry, now); action != 1 { + t.Errorf("expired migration: want action=1 (remove), got %d", action) + } + + if got := dm.metrics.hintedExpired.Load(); got != 1 { + t.Errorf("aggregate hintedExpired: want 1, got %d", got) + } + + if got := dm.metrics.migrationHintExpired.Load(); got != 1 { + t.Errorf("migrationHintExpired: want 1, got %d", got) + } +} + +// TestMigrationHint_TransportErrorBumpsDroppedCounter pins the drop +// path: when the transport returns a non-NotFound error (auth failure, +// 5xx, parse error), the hint is removed and the per-source counter +// bumps. ErrBackendNotFound stays "keep" — that path is exercised in +// the next test. 
+func TestMigrationHint_TransportErrorBumpsDroppedCounter(t *testing.T) { + t.Parallel() + + dm, transport := newMigrationHintTestDM(t) + transport.forwardSetErr.Store(errScriptedSimulate) + + now := time.Now() + item := &cache.Item{Key: "k1", Value: []byte("v"), Version: 1, Origin: migrationHintTestOrigin} + + migEntry := hintedEntry{ + item: item, + queuedAt: now.Add(-time.Second), + expire: now.Add(time.Minute), + source: hintSourceMigration, + } + + if action := dm.processHint(context.Background(), "peer-B", migEntry, now); action != 1 { + t.Errorf("transport error: want action=1 (remove), got %d", action) + } + + if got := dm.metrics.hintedDropped.Load(); got != 1 { + t.Errorf("aggregate hintedDropped: want 1, got %d", got) + } + + if got := dm.metrics.migrationHintDropped.Load(); got != 1 { + t.Errorf("migrationHintDropped: want 1, got %d", got) + } +} + +// TestMigrationHint_NotFoundKeepsEntry confirms the not-found path +// (peer still absent — typically backend restarting) keeps the entry +// in the queue rather than dropping. Migration counters must NOT +// increment on this path; the hint will be retried on a later tick. +func TestMigrationHint_NotFoundKeepsEntry(t *testing.T) { + t.Parallel() + + dm, transport := newMigrationHintTestDM(t) + transport.forwardSetErr.Store(sentinel.ErrBackendNotFound) + + now := time.Now() + item := &cache.Item{Key: "k1", Value: []byte("v"), Version: 1, Origin: migrationHintTestOrigin} + + migEntry := hintedEntry{ + item: item, + queuedAt: now.Add(-time.Second), + expire: now.Add(time.Minute), + source: hintSourceMigration, + } + + if action := dm.processHint(context.Background(), "peer-B", migEntry, now); action != 0 { + t.Errorf("not-found: want action=0 (keep), got %d", action) + } + + if got := dm.metrics.migrationHintDropped.Load(); got != 0 { + t.Errorf("migrationHintDropped on keep: want 0, got %d", got) + } + + if got := dm.metrics.migrationHintReplayed.Load(); got != 0 { + t.Errorf("migrationHintReplayed on keep: want 0, got %d", got) + } +} + +// TestMigrationHint_GlobalCapDropTagsSource confirms that even the +// global-cap rejection path (queue refused because hintMaxTotal / +// hintMaxBytes was already at the cap) routes through the +// migration-aware drop counter. +func TestMigrationHint_GlobalCapDropTagsSource(t *testing.T) { + t.Parallel() + + dm, _ := newMigrationHintTestDM(t) + + // Force the cap so the next queue attempt trips the global drop branch. 
+	dm.hintMaxTotal = 1
+	dm.hintTotal = 1
+
+	item := &cache.Item{Key: "k1", Value: []byte("v"), Version: 1, Origin: migrationHintTestOrigin}
+	dm.queueHint("peer-B", item, hintSourceMigration)
+
+	if got := dm.metrics.hintedGlobalDropped.Load(); got != 1 {
+		t.Errorf("aggregate hintedGlobalDropped: want 1, got %d", got)
+	}
+
+	if got := dm.metrics.migrationHintDropped.Load(); got != 1 {
+		t.Errorf("migrationHintDropped on global-cap drop: want 1, got %d", got)
+	}
+
+	if got := dm.metrics.hintedQueued.Load(); got != 0 {
+		t.Errorf("hintedQueued must not increment on drop: got %d", got)
+	}
+}
diff --git a/scripts/tests/30-test-cluster-writes.sh b/scripts/tests/30-test-cluster-writes.sh
new file mode 100755
index 0000000..3507980
--- /dev/null
+++ b/scripts/tests/30-test-cluster-writes.sh
@@ -0,0 +1,152 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+readonly TOKEN="${HYPERCACHE_TOKEN:-dev-token}"
+readonly COMPOSE_FILE="${COMPOSE_FILE:-docker-compose.cluster.yml}"
+readonly SURVIVING_PORTS="${SURVIVING_PORTS:-8081 8082 8084 8085}"
+readonly KEY_COUNT="${KEY_COUNT:-50}"
+
+fail_count=0
+
+log_fail() {
+  if [[ -t 1 ]]; then
+    printf '\033[31mFAIL\033[0m %s\n' "$1"
+  else
+    printf 'FAIL %s\n' "$1"
+  fi
+
+  fail_count=$((fail_count + 1))
+}
+
+log_ok() {
+  if [[ -t 1 ]]; then
+    printf '\033[32m OK \033[0m %s\n' "$1"
+  else
+    printf ' OK %s\n' "$1"
+  fi
+}
+
+# put_batch writes KEY_COUNT keys with the given prefix to the
+# given port. Each PUT is asserted to return 200; any non-200
+# bumps the failure count but the loop continues so we can see
+# the full picture.
+put_batch() {
+  local port="$1"
+  local prefix="$2"
+
+  local fails=0
+
+  for i in $(seq 1 "$KEY_COUNT"); do
+    status=$(curl -sS -o /dev/null -w '%{http_code}' \
+      -H "Authorization: Bearer $TOKEN" \
+      -X PUT --data "value-$i" \
+      "http://localhost:$port/v1/cache/${prefix}-${i}" || echo "000")
+
+    if [[ "$status" != "200" ]]; then
+      fails=$((fails + 1))
+    fi
+  done
+
+  if [[ "$fails" -gt 0 ]]; then
+    log_fail "PUT ${prefix}-* on :$port: ${fails}/${KEY_COUNT} writes failed"
+    return 1
+  fi
+
+  log_ok "PUT ${prefix}-* on :$port: all ${KEY_COUNT} writes succeeded"
+  return 0
+}
+
+# verify_batch_visible asserts that GET /v1/cache/${prefix}-N on the
+# given port succeeds for every N in 1..KEY_COUNT. Used to confirm
+# the cluster routes correctly to surviving owners while one node
+# is down.
+verify_batch_visible() {
+  local port="$1"
+  local prefix="$2"
+
+  local missing=0
+
+  for i in $(seq 1 "$KEY_COUNT"); do
+    status=$(curl -sS -o /dev/null -w '%{http_code}' \
+      -H "Authorization: Bearer $TOKEN" \
+      "http://localhost:$port/v1/cache/${prefix}-${i}" || echo "000")
+
+    if [[ "$status" != "200" ]]; then
+      missing=$((missing + 1))
+    fi
+  done
+
+  if [[ "$missing" -gt 0 ]]; then
+    log_fail "GET ${prefix}-* on :$port: ${missing}/${KEY_COUNT} keys missing"
+    return 1
+  fi
+
+  log_ok "GET ${prefix}-* on :$port: all ${KEY_COUNT} keys visible"
+  return 0
+}
+
+# count_visible returns the number of keys (0..KEY_COUNT) currently
+# visible on the given port — available for recovery polling (it is
+# not exercised by the phases below).
+count_visible() {
+  local port="$1"
+  local prefix="$2"
+
+  local found=0
+
+  for i in $(seq 1 "$KEY_COUNT"); do
+    status=$(curl -sS -o /dev/null -w '%{http_code}' \
+      -H "Authorization: Bearer $TOKEN" \
+      "http://localhost:$port/v1/cache/${prefix}-${i}" || echo "000")
+
+    if [[ "$status" == "200" ]]; then
+      found=$((found + 1))
+    fi
+  done
+
+  echo "$found"
+}
+
+echo "=== Phase 1: seed batch on :8081, verify cluster-wide ==="
+put_batch 8081 "first" || true
+
+sleep 1
+
+# Spot-check the first batch on every surviving port plus the
+# to-be-killed port (:8083 with the defaults). At this point all
+# five nodes should see all KEY_COUNT keys.
+for port in 8081 8082 8083 8084 8085; do
+  verify_batch_visible "$port" "first" || true
+done
+
+echo ""
+echo "=== Phase 2: write second batch on :8081 ==="
+# Some of these keys will have the down node as primary or replica;
+# the writes still succeed by quorum on the four surviving nodes,
+# with hints queued for the down node's replicas (Phase B.2 contract).
+put_batch 8081 "second" || true
+
+sleep 1
+
+echo ""
+echo "=== Phase 3: surviving nodes serve every key (first + second) ==="
+for port in $SURVIVING_PORTS; do
+  verify_batch_visible "$port" "first" || true
+  verify_batch_visible "$port" "second" || true
+done
+
+echo ""
+if [[ "$fail_count" -gt 0 ]]; then
+  if [[ -t 1 ]]; then
+    printf '\033[31m=== %d assertion(s) failed ===\033[0m\n' "$fail_count"
+  else
+    printf '=== %d assertion(s) failed ===\n' "$fail_count"
+  fi
+
+  exit 1
+fi
+
+if [[ -t 1 ]]; then
+  printf '\033[32m=== write test passed ===\033[0m\n'
+else
+  printf '=== write test passed ===\n'
+fi
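
For reference, every knob in the script is environment-driven; a typical invocation against the default five-node compose cluster (values illustrative) looks like:

	# Assumes the docker-compose.cluster.yml cluster is already running.
	HYPERCACHE_TOKEN=dev-token KEY_COUNT=10 \
	  SURVIVING_PORTS="8081 8082 8084 8085" \
	  ./scripts/tests/30-test-cluster-writes.sh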