From e6176454be81cdc63032dafe799ef1e8de5f4e7c Mon Sep 17 00:00:00 2001
From: "F."
Date: Wed, 13 May 2026 21:05:48 +0200
Subject: [PATCH] feat(cluster): add cluster-wide key browser (GET /v1/cache/keys)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Introduce a new operator-debug endpoint that fans out across every
alive peer, deduplicates replicas, sorts, and returns a paged key set.

Pattern matching supports two modes via a classifier in buildKeyMatcher:
patterns without glob metacharacters use strings.HasPrefix (prefix
mode); patterns containing `*`, `?`, or `[` use path.Match (glob mode).
Invalid globs are rejected at construction and surface as 400
BAD_REQUEST.

Hard caps bound worst-case memory and response size:

- `max` (default 10000, ceiling 50000): deduplicated result set cap
- `limit` (default 100, ceiling 500): page size; cursor is offset-based

Per-peer fan-out failures are best-effort — failed peer IDs land in
`partial_nodes`, consistent with read-repair/hint-replay contracts.
Returns 501 when the backend is not DistMemory. Route registered before
/v1/cache/:key so the parameterized route cannot shadow it.

Core changes:

- pkg/backend/dist_keys.go: ListKeys fan-out via listKeysAccumulator
  (mutex-guarded dedup map) + localMatchingKeys for self-peer shard scan
- DistTransport interface extended: ListKeys(ctx, nodeID, pattern string)
- InProcessTransport, DistHTTPTransport, and chaosTransport implementations
- /internal/keys extended with optional `q` param (backward compatible)
- collectShardKeys accepts a matcher; non-matching keys skip the limit
- HyperCache.ClusterKeys added as the public entry point

Tests: 13-case unit table for buildKeyMatcher, HTTP smoke tests for
paged walk and 400 surfaces, five integration tests covering
cluster-wide dedup at RF=3 (50 seeds → 50 keys, not 150), prefix/glob
filters, and max-cap truncation. OpenAPI spec and drift-detector
updated.
---
 CHANGELOG.md                                 |  33 +++
 cmd/hypercache-server/handlers_test.go       | 137 +++++++++
 cmd/hypercache-server/main.go                | 153 ++++++++++
 cmd/hypercache-server/openapi.yaml           | 112 ++++++++
 cmd/hypercache-server/openapi_test.go        |   1 +
 cspell.config.yaml                           |   6 +
 hypercache_dist.go                           |  29 ++
 pkg/backend/dist_chaos.go                    |   9 +
 pkg/backend/dist_chaos_test.go               |   4 +
 pkg/backend/dist_http_server.go              |  30 +-
 pkg/backend/dist_http_transport.go           |  32 ++-
 pkg/backend/dist_keys.go                     | 281 +++++++++++++++++++
 pkg/backend/dist_keys_test.go                |  75 +++++
 pkg/backend/dist_memory.go                   |   2 +-
 pkg/backend/dist_migration_hint_test.go      |   4 +
 pkg/backend/dist_read_repair_test.go         |   4 +
 pkg/backend/dist_transport.go                |  27 ++
 tests/dist_http_limits_test.go               |   2 +-
 tests/hypercache_distmemory_listkeys_test.go | 186 ++++++++++++
 19 files changed, 1111 insertions(+), 16 deletions(-)
 create mode 100644 pkg/backend/dist_keys.go
 create mode 100644 pkg/backend/dist_keys_test.go
 create mode 100644 tests/hypercache_distmemory_listkeys_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4768132..b5899a5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,39 @@ All notable changes to HyperCache are recorded here. The format follows
 
 ### Added
 
+- **Cluster-wide key browser (`GET /v1/cache/keys`).** New v1 client-API endpoint that fans out across
+  every alive peer, dedupes replicas, sorts, and returns a paged slice — designed for the operator-debug
+  workflow of "browse / refine a search" rather than as a primary data-access path.
+  The `q` parameter switches between two modes via a small classifier:
+  patterns containing any of `*`, `?`, `[` go through Go's `path.Match` (platform-agnostic glob —
+  `filepath.Match`'s OS-specific separator semantics are wrong for arbitrary string keys);
+  everything else is treated as a literal prefix via `strings.HasPrefix`. Two hard caps bound the
+  worst case: `max` (default 10000, ceiling 50000) for the full deduplicated result set held in memory,
+  and `limit` (default 100, ceiling 500) for the page size — `cursor` paging is offset-based against the
+  sorted set so successive pages are stable across requests. Per-peer fan-out failures are best-effort:
+  the failed peer ID lands in `partial_nodes` rather than failing the whole call, mirroring the
+  read-repair and hint-replay contracts elsewhere in the cluster. Returns 501 when the underlying
+  backend isn't `DistMemory` (this endpoint requires a cluster). The new method
+  [`(*DistMemory).ListKeys`](pkg/backend/dist_keys.go) drives the fan-out via `errgroup` with a
+  `listKeysAccumulator` merge struct guarded by a single mutex; the self-peer slice walks local shards
+  directly (no HTTP self-hop). The `DistTransport` interface grows a new method
+  `ListKeys(ctx, nodeID, pattern)` with implementations in `InProcessTransport` (direct shard scan),
+  `DistHTTPTransport` (extends the existing `/internal/keys` path with an optional `q` query param —
+  backward compatible; cursor semantics unchanged), and `chaosTransport` (pass-through with the same
+  drop/latency injection hooks as the other verbs). Unit tests in
+  [`pkg/backend/dist_keys_test.go`](pkg/backend/dist_keys_test.go) pin the
+  prefix-vs-glob classifier across thirteen table cases and the malformed-glob → `path.ErrBadPattern`
+  surface; HTTP smoke tests in [`cmd/hypercache-server/handlers_test.go`](cmd/hypercache-server/handlers_test.go)
+  seed, walk the cursor across pages, and assert that the union matches the seed set with no
+  cross-page duplicates, plus 400 surfaces for an invalid cursor and a malformed glob; five
+  integration tests in
+  [`tests/hypercache_distmemory_listkeys_test.go`](tests/hypercache_distmemory_listkeys_test.go)
+  cover cluster-wide dedup at RF=3 across 5 nodes (50 unique seeds → 50 keys, not the 150 raw
+  replica copies), prefix vs glob filters, and `max`-triggered truncation. Route registration order
+  matters in Fiber's trie router: `/v1/cache/keys` must come before `/v1/cache/:key`, otherwise the
+  parameterized handler shadows it with `key="keys"`. OpenAPI spec entry (`ListKeysResponse` schema +
+  operation) added to
+  [`cmd/hypercache-server/openapi.yaml`](cmd/hypercache-server/openapi.yaml); the drift-detector test
+  in [`cmd/hypercache-server/openapi_test.go`](cmd/hypercache-server/openapi_test.go) catches future
+  spec / route mismatches.
+- **Async read-repair batching (Phase 4) + unconditional `ForwardSet`-only repair.** Two composing
+  changes in the same PR that together cut the wire-call cost of read-repair under quorum reads.
(1) The defensive `ForwardGet` probe in `repairRemoteReplica` is gone — every repair is now exactly one `ForwardSet`, diff --git a/cmd/hypercache-server/handlers_test.go b/cmd/hypercache-server/handlers_test.go index 0184da1..16ff0ea 100644 --- a/cmd/hypercache-server/handlers_test.go +++ b/cmd/hypercache-server/handlers_test.go @@ -6,6 +6,8 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" + "strconv" "strings" "testing" @@ -47,6 +49,10 @@ func newTestServer(t *testing.T) *fiber.App { app := fiber.New() nodeCtx := &nodeContext{hc: hc, nodeID: "test-node"} + // Match production ordering: literal /v1/cache/keys before the + // parameterized /v1/cache/:key so the router picks handleListKeys + // for the literal path. + app.Get("/v1/cache/keys", func(c fiber.Ctx) error { return handleListKeys(c, nodeCtx) }) app.Get("/v1/cache/:key", func(c fiber.Ctx) error { return handleGet(c, nodeCtx) }) app.Head("/v1/cache/:key", func(c fiber.Ctx) error { return handleHead(c, nodeCtx) }) app.Put("/v1/cache/:key", func(c fiber.Ctx) error { return handlePut(c, nodeCtx) }) @@ -352,3 +358,134 @@ func TestHandleBatchDelete_BasicFlow(t *testing.T) { t.Fatalf("batch-get post-delete should report found:false; got %s", got.body) } } + +// seedListKeysFixture seeds `count` keys prefixed `first-NN` plus a +// `second-1` decoy via PUT. Returns the test app so the caller can +// drive the list-keys endpoint against the populated cache. Kept +// here rather than in newTestServer so the test bodies stay +// declarative. +func seedListKeysFixture(t *testing.T, count int) *fiber.App { + t.Helper() + + app := newTestServer(t) + + for i := range count { + n := strconv.Itoa(i + 1) + key := "first-" + strings.Repeat("0", 2-len(n)) + n + + put := doRequest(t, app, http.MethodPut, "/v1/cache/"+key, "v", nil) + if put.status != http.StatusOK { + t.Fatalf("seed put %s: %d", key, put.status) + } + } + + put := doRequest(t, app, http.MethodPut, "/v1/cache/second-1", "v", nil) + if put.status != http.StatusOK { + t.Fatalf("seed second put: %d", put.status) + } + + return app +} + +// fetchListKeysPage drives one /v1/cache/keys request and decodes +// the response, failing the test on transport or parse errors. +// Extracted so the test body can focus on the cursor walk. +func fetchListKeysPage(t *testing.T, app *fiber.App, cursor string) listKeysResponse { + t.Helper() + + target := "/v1/cache/keys?q=first-&limit=10" + if cursor != "" { + target += "&cursor=" + cursor + } + + got := doRequest(t, app, http.MethodGet, target, "", nil) + if got.status != http.StatusOK { + t.Fatalf("cursor=%q: status %d body=%s", cursor, got.status, got.body) + } + + var resp listKeysResponse + + err := json.Unmarshal([]byte(got.body), &resp) + if err != nil { + t.Fatalf("cursor=%q decode: %v", cursor, err) + } + + return resp +} + +// TestHandleListKeys_PrefixAndPaging drives the v1 list-keys +// endpoint end-to-end: seed via PUT, filter by prefix, walk the +// cursor across multiple pages, assert the union matches the seed +// set and no key appears twice. 
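
From the operator's side, the walk the test below automates looks like the following minimal client sketch. The base URL, the absence of auth, and the local `listKeysPage` type are illustrative assumptions, not part of this patch:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// listKeysPage mirrors the two wire fields the walk needs.
type listKeysPage struct {
	Keys       []string `json:"keys"`
	NextCursor string   `json:"next_cursor"`
}

func main() {
	// Hypothetical dev-server address; adjust host, port, and auth
	// for a real deployment.
	base := "http://localhost:8080/v1/cache/keys?q=" + url.QueryEscape("first-") + "&limit=100"

	cursor := ""

	for {
		target := base
		if cursor != "" {
			target += "&cursor=" + cursor
		}

		resp, err := http.Get(target)
		if err != nil {
			panic(err)
		}

		var page listKeysPage
		if derr := json.NewDecoder(resp.Body).Decode(&page); derr != nil {
			panic(derr)
		}

		resp.Body.Close()

		for _, k := range page.Keys {
			fmt.Println(k)
		}

		// An empty next_cursor marks the final page.
		if page.NextCursor == "" {
			break
		}

		cursor = page.NextCursor
	}
}
```

Each page re-fans out across the cluster; the offset cursor stays stable because the merged set is sorted before slicing.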
+func TestHandleListKeys_PrefixAndPaging(t *testing.T) { + t.Parallel() + + const seedCount = 25 + + app := seedListKeysFixture(t, seedCount) + + collected := make(map[string]struct{}, seedCount) + cursor := "" + + for range 10 { + resp := fetchListKeysPage(t, app, cursor) + + if resp.TotalMatched != seedCount { + t.Fatalf("total_matched=%d, want %d", resp.TotalMatched, seedCount) + } + + for _, k := range resp.Keys { + if !strings.HasPrefix(k, "first-") { + t.Fatalf("non-prefix key in result: %s", k) + } + + if _, dup := collected[k]; dup { + t.Fatalf("duplicate key across pages: %s", k) + } + + collected[k] = struct{}{} + } + + if resp.NextCursor == "" { + break + } + + cursor = resp.NextCursor + } + + if len(collected) != seedCount { + t.Fatalf("collected %d keys across pages, want %d", len(collected), seedCount) + } +} + +// TestHandleListKeys_InvalidCursor pins that a malformed cursor +// surfaces 400, not 500 — the cursor field is operator-controlled +// and must be validated at the boundary. +func TestHandleListKeys_InvalidCursor(t *testing.T) { + t.Parallel() + + app := newTestServer(t) + + got := doRequest(t, app, http.MethodGet, "/v1/cache/keys?cursor=not-a-number", "", nil) + if got.status != http.StatusBadRequest { + t.Fatalf("expected 400 for malformed cursor, got %d body=%s", got.status, got.body) + } +} + +// TestHandleListKeys_InvalidGlob surfaces malformed glob patterns +// as 400, matching the same validate-at-boundary contract as +// cursor. +func TestHandleListKeys_InvalidGlob(t *testing.T) { + t.Parallel() + + app := newTestServer(t) + + // "[unclosed" is a malformed character class — URL-encoded so + // the literal `[` is preserved through fiber's query parser. + target := "/v1/cache/keys?q=" + url.QueryEscape("[unclosed") + + got := doRequest(t, app, http.MethodGet, target, "", nil) + if got.status != http.StatusBadRequest { + t.Fatalf("expected 400 for malformed glob, got %d body=%s", got.status, got.body) + } +} diff --git a/cmd/hypercache-server/main.go b/cmd/hypercache-server/main.go index 6801699..57a3d91 100644 --- a/cmd/hypercache-server/main.go +++ b/cmd/hypercache-server/main.go @@ -459,6 +459,12 @@ func registerClientRoutes(app *fiber.App, policy httpauth.Policy, nodeCtx *nodeC return c.Send(openapiSpec) }) + // /v1/cache/keys must be registered BEFORE the parameterized + // /v1/cache/:key — Fiber matches in registration order and the + // literal-path route would otherwise be shadowed by the + // param-bound handler (handleGet would be invoked with + // `key="keys"` and return 404). + app.Get("/v1/cache/keys", read, func(c fiber.Ctx) error { return handleListKeys(c, nodeCtx) }) app.Put("/v1/cache/:key", write, func(c fiber.Ctx) error { return handlePut(c, nodeCtx) }) app.Get("/v1/cache/:key", read, func(c fiber.Ctx) error { return handleGet(c, nodeCtx) }) app.Head("/v1/cache/:key", read, func(c fiber.Ctx) error { return handleHead(c, nodeCtx) }) @@ -646,6 +652,33 @@ type ownersResponse struct { Node string `json:"node"` } +// listKeysResponse is the body of GET /v1/cache/keys — operator- +// facing key browser. `NextCursor` is empty on the last page; +// `TotalMatched` is the full deduplicated matched set (capped by +// `max`). `Truncated` reports that the cluster-wide cap was hit +// and the operator should refine the pattern. `PartialNodes` +// lists peers whose fan-out failed; their keys may be missing. 
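
A hand-written example of the wire body the struct below serializes to (composed for illustration, not captured from a live server). Note the empty `next_cursor` on a final page and `partial_nodes` carrying a failed peer:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the wire struct, for illustration only.
type listKeysResponse struct {
	Keys         []string `json:"keys"`
	NextCursor   string   `json:"next_cursor"`
	TotalMatched int      `json:"total_matched"`
	Truncated    bool     `json:"truncated"`
	Node         string   `json:"node"`
	PartialNodes []string `json:"partial_nodes,omitempty"`
}

func main() {
	final := listKeysResponse{
		Keys:         []string{"first-24", "first-25"},
		NextCursor:   "", // empty: this is the last page
		TotalMatched: 25,
		Truncated:    false,
		Node:         "node-a",
		PartialNodes: []string{"node-c"}, // node-c failed; its keys may be absent
	}

	out, err := json.Marshal(final)
	if err != nil {
		panic(err)
	}

	fmt.Println(string(out))
	// {"keys":["first-24","first-25"],"next_cursor":"","total_matched":25,
	//  "truncated":false,"node":"node-a","partial_nodes":["node-c"]}
}
```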
+type listKeysResponse struct { + Keys []string `json:"keys"` + NextCursor string `json:"next_cursor"` + TotalMatched int `json:"total_matched"` + Truncated bool `json:"truncated"` + Node string `json:"node"` + PartialNodes []string `json:"partial_nodes,omitempty"` +} + +// list-keys query-parameter bounds. Defaults match the operator +// "browse / refine" workflow; the hard caps bound the worst-case +// memory and response size — operators needing a larger sweep +// script against the per-node /internal/keys path with their own +// paging instead of lifting these. +const ( + listKeysDefaultLimit = 100 + listKeysMaxLimit = 500 + listKeysDefaultMax = 10000 + listKeysHardMax = 50000 +) + // handlePut implements PUT /v1/cache/:key. // Body is the raw value (any content type). Optional ?ttl= // applies a relative expiration; empty/absent means no expiration. @@ -1289,6 +1322,126 @@ func handleOwners(c fiber.Ctx, nodeCtx *nodeContext) error { }) } +// listKeysParams is the parsed-and-validated form of the +// /v1/cache/keys query string. Returned as a struct so +// parseListKeysQuery stays under the function-result-limit and +// the call site reads fields by name rather than position. +type listKeysParams struct { + Pattern string + Cursor int + Limit int + MaxResults int +} + +// parseBoundedPositiveInt reads a query parameter as a positive int +// with a default fallback and a hard ceiling. Empty value → default. +// Out-of-range or non-numeric → caller-visible error (must surface +// as 400 BAD_REQUEST). +func parseBoundedPositiveInt(c fiber.Ctx, name string, def, hardMax int) (int, error) { + v := c.Query(name) + if v == "" { + return def, nil + } + + n, err := strconv.Atoi(v) + if err != nil || n <= 0 { + return 0, ewrap.New("invalid " + name + ": must be a positive integer") + } + + if n > hardMax { + n = hardMax + } + + return n, nil +} + +// parseListKeysQuery extracts and validates the query parameters +// for GET /v1/cache/keys. Defaults and hard caps are applied here +// so handleListKeys keeps a single response-shape concern. +func parseListKeysQuery(c fiber.Ctx) (listKeysParams, error) { + out := listKeysParams{Pattern: c.Query("q")} + + if cursorStr := c.Query("cursor"); cursorStr != "" { + n, err := strconv.Atoi(cursorStr) + if err != nil || n < 0 { + return listKeysParams{}, ewrap.New("invalid cursor: must be a non-negative integer") + } + + out.Cursor = n + } + + limit, err := parseBoundedPositiveInt(c, "limit", listKeysDefaultLimit, listKeysMaxLimit) + if err != nil { + return listKeysParams{}, err + } + + out.Limit = limit + + maxResults, err := parseBoundedPositiveInt(c, "max", listKeysDefaultMax, listKeysHardMax) + if err != nil { + return listKeysParams{}, err + } + + out.MaxResults = maxResults + + return out, nil +} + +// handleListKeys implements GET /v1/cache/keys — operator-facing +// cluster-wide key browser. Fans out across every alive peer, +// merges + dedupes + sorts the result, then slices the page via +// cursor/limit. The full deduplicated set is held in memory for +// one request (bounded by `max`); paging re-fans out — fine for +// the operator-debug workflow this endpoint serves. +// +// Returns 501 when the underlying backend isn't a DistMemory +// (in-memory / Redis): the surface only makes sense in cluster +// mode and surfacing that explicitly is friendlier than a +// silently empty page. 
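
The cursor/limit arithmetic in the handler below reduces to two clamps. A standalone sketch with a hypothetical `pageOf` helper, including the past-the-end case:

```go
package main

import (
	"fmt"
	"strconv"
)

// pageOf clamps an offset cursor and a page limit against the sorted,
// deduplicated key set. A past-the-end cursor yields an empty page
// (mirroring SQL OFFSET); an empty next cursor marks the last page.
func pageOf(keys []string, cursor, limit int) ([]string, string) {
	start := min(cursor, len(keys))
	end := min(start+limit, len(keys))

	next := ""
	if end < len(keys) {
		next = strconv.Itoa(end)
	}

	return keys[start:end], next
}

func main() {
	keys := make([]string, 100)
	for i := range keys {
		keys[i] = "k" + strconv.Itoa(i)
	}

	first, next := pageOf(keys, 0, 40)
	fmt.Printf("%d %q\n", len(first), next) // 40 "40"

	last, next := pageOf(keys, 80, 40)
	fmt.Printf("%d %q\n", len(last), next) // 20 "" (short final page)

	empty, next := pageOf(keys, 120, 40)
	fmt.Printf("%d %q\n", len(empty), next) // 0 "" (past the end, not an error)
}
```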
+func handleListKeys(c fiber.Ctx, nodeCtx *nodeContext) error { + params, err := parseListKeysQuery(c) + if err != nil { + return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, err.Error()) + } + + res, err := nodeCtx.hc.ClusterKeys(c.Context(), params.Pattern, params.MaxResults) + if err != nil { + return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, err.Error()) + } + + if res == nil { + return jsonErr( + c, + fiber.StatusNotImplemented, + codeInternal, + "list-keys requires a distributed backend", + ) + } + + total := len(res.Keys) + + // Cursor past the end is a valid terminal state (last page + + // 1): respond with an empty page rather than 400. Mirrors how + // SQL OFFSET past the row count returns an empty result set. + start := min(params.Cursor, total) + end := min(start+params.Limit, total) + page := res.Keys[start:end] + + nextCursor := "" + if end < total { + nextCursor = strconv.Itoa(end) + } + + return c.JSON(listKeysResponse{ + Keys: page, + NextCursor: nextCursor, + TotalMatched: total, + Truncated: res.Truncated, + Node: nodeCtx.nodeID, + PartialNodes: res.PartialNodes, + }) +} + // meResponse is the body of GET /v1/me — the resolved caller identity // after auth middleware ran. Mirrors httpauth.Identity but written as // a wire type so the JSON tags are owned by the API surface, not the diff --git a/cmd/hypercache-server/openapi.yaml b/cmd/hypercache-server/openapi.yaml index f9d6dfb..a725e9c 100644 --- a/cmd/hypercache-server/openapi.yaml +++ b/cmd/hypercache-server/openapi.yaml @@ -248,6 +248,85 @@ paths: "400": { $ref: "#/components/responses/BadRequest" } "401": { $ref: "#/components/responses/Unauthorized" } + /v1/cache/keys: + get: + operationId: listCacheKeys + tags: [ cluster ] + summary: Cluster-wide key browser. + description: | + Enumerates cache keys across every alive peer, deduplicates + replicas, sorts, and returns a paged slice. Intended for + operator-debug workflows (refining a search, sampling the + cluster) — *not* a primary data-access path. + + The `q` filter follows two modes: + - **Prefix:** patterns with no glob metacharacters (`*`, + `?`, `[`) match via `strings.HasPrefix`. Example: `first-` + matches every key starting with `first-`. + - **Glob:** patterns containing any of `*`, `?`, `[` match + via Go's `path.Match`. Example: `first-*` matches every + key with that prefix; `first-?` matches `first-` plus one + character; `[ab]*` matches keys starting with `a` or `b`. + + Caps: + - `max` bounds the full deduplicated result set held in + memory (default 10000, hard 50000). Hitting it surfaces + `truncated: true`. + - `limit` bounds the page size (default 100, hard 500). + + Peer fan-out is best-effort: a failed peer is recorded in + `partial_nodes` rather than failing the whole call. + + Returns 501 when the underlying backend isn't distributed + (single-node DistMemory in test fixtures included — this + endpoint requires a cluster). + + Requires the `cache.read` scope. + parameters: + - in: query + name: q + required: false + schema: { type: string } + description: | + Optional pattern. Prefix when no glob metacharacter is + present; glob via path.Match otherwise. Empty means no + filter. + - in: query + name: cursor + required: false + schema: { type: integer, minimum: 0, default: 0 } + description: | + Offset into the deduplicated, sorted result set returned + by a previous page's `next_cursor`. Past-end cursors + return an empty page rather than an error. 
+ - in: query + name: limit + required: false + schema: { type: integer, minimum: 1, maximum: 500, default: 100 } + description: Page size. Hard-capped at 500. + - in: query + name: max + required: false + schema: { type: integer, minimum: 1, maximum: 50000, default: 10000 } + description: | + Total deduplicated result-set cap. Hard-capped at 50000. + Reaching it surfaces `truncated: true`. + responses: + "200": + description: Page of cluster-wide matching keys. + content: + application/json: + schema: + $ref: "#/components/schemas/ListKeysResponse" + "400": { $ref: "#/components/responses/BadRequest" } + "401": { $ref: "#/components/responses/Unauthorized" } + "501": + description: Backend does not support cluster-wide enumeration. + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" + /v1/me: get: operationId: getIdentity @@ -531,6 +610,39 @@ components: items: { type: string } node: { type: string } + ListKeysResponse: + type: object + required: [ keys, next_cursor, total_matched, truncated, node ] + properties: + keys: + type: array + description: Sorted, deduplicated page of matching keys. + items: { type: string } + next_cursor: + type: string + description: | + Cursor for the next page, or empty string when this was + the final page. + total_matched: + type: integer + description: | + Total matching keys across the cluster (capped by `max`). + Use this together with `limit` to compute total pages. + truncated: + type: boolean + description: | + True when the result set was capped at `max`. The + operator should refine the pattern. + node: + type: string + description: The node that handled the fan-out. + partial_nodes: + type: array + description: | + Peer IDs whose key fetch failed during fan-out. Their + keys may be missing from `keys`; the operator can retry. 
+ items: { type: string } + IdentityResponse: type: object required: [ id, scopes, capabilities ] diff --git a/cmd/hypercache-server/openapi_test.go b/cmd/hypercache-server/openapi_test.go index 8949514..9ff6c7c 100644 --- a/cmd/hypercache-server/openapi_test.go +++ b/cmd/hypercache-server/openapi_test.go @@ -95,6 +95,7 @@ func declaredMethodsForPath() map[string]map[string]struct{} { "/healthz": {fiber.MethodGet: {}}, "/v1/openapi.yaml": {fiber.MethodGet: {}}, "/v1/cache/:key": {fiber.MethodPut: {}, fiber.MethodGet: {}, fiber.MethodHead: {}, fiber.MethodDelete: {}}, + "/v1/cache/keys": {fiber.MethodGet: {}}, "/v1/owners/:key": {fiber.MethodGet: {}}, "/v1/me": {fiber.MethodGet: {}}, "/v1/me/can": {fiber.MethodGet: {}}, diff --git a/cspell.config.yaml b/cspell.config.yaml index 8798a99..7cf76d8 100644 --- a/cspell.config.yaml +++ b/cspell.config.yaml @@ -83,6 +83,7 @@ words: - cyclop - daixiang - Decr + - dedup - dels - depguard - deprioritize @@ -120,6 +121,7 @@ words: - frontmatter - funcorder - funlen + - gctx - geomean - gerr - Getenv @@ -175,6 +177,7 @@ words: - lblll - LFUDA - linenums + - listkeys - localmodule - logrus - longbridgeapp @@ -183,6 +186,9 @@ words: - maxmemory - memprofile - Merkle + - metachar + - metacharacter + - metacharacters - metricdata - metricnoop - mfinal diff --git a/hypercache_dist.go b/hypercache_dist.go index e5deef9..0254904 100644 --- a/hypercache_dist.go +++ b/hypercache_dist.go @@ -53,6 +53,35 @@ func (hyperCache *HyperCache[T]) ClusterOwners(key string) []string { return nil } +// ClusterKeys enumerates keys across the whole cluster, optionally +// filtered by `pattern` (prefix when no glob metacharacters are +// present, glob via path.Match otherwise — see backend.ListKeys for +// the canonical semantics). `maxResults` caps the merged-and-deduped +// result set; 0 picks the backend default. Returns a passthrough +// pointer-typed result, or nil when the backend isn't a DistMemory +// (callers treat nil as "feature unsupported" and surface 501/404). +// +// Best-effort: peer fan-out failures populate `PartialNodes` rather +// than failing the whole call, mirroring read-repair / hint-replay +// semantics elsewhere. +func (hyperCache *HyperCache[T]) ClusterKeys( + ctx context.Context, + pattern string, + maxResults int, +) (*backend.ListKeysResult, error) { + dm, ok := any(hyperCache.backend).(*backend.DistMemory) + if !ok { + return nil, nil //nolint:nilnil // explicit "feature unsupported" sentinel for callers to detect + } + + res, err := dm.ListKeys(ctx, pattern, maxResults) + if err != nil { + return nil, err + } + + return &res, nil +} + // DistMembershipSnapshot returns a snapshot of membership if distributed backend; otherwise nil slice. 
// //nolint:nonamedreturns diff --git a/pkg/backend/dist_chaos.go b/pkg/backend/dist_chaos.go index 5dc2f86..892aaea 100644 --- a/pkg/backend/dist_chaos.go +++ b/pkg/backend/dist_chaos.go @@ -283,6 +283,15 @@ func (t *chaosTransport) Gossip(ctx context.Context, targetNodeID string, member return t.inner.Gossip(ctx, targetNodeID, members) } +func (t *chaosTransport) ListKeys(ctx context.Context, nodeID, pattern string) ([]string, error) { + err := t.applyFault() + if err != nil { + return nil, err + } + + return t.inner.ListKeys(ctx, nodeID, pattern) +} + func (t *chaosTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) { err := t.applyFault() if err != nil { diff --git a/pkg/backend/dist_chaos_test.go b/pkg/backend/dist_chaos_test.go index 0f2498f..262fad1 100644 --- a/pkg/backend/dist_chaos_test.go +++ b/pkg/backend/dist_chaos_test.go @@ -48,6 +48,10 @@ func (*recordingTransport) FetchMerkle(_ context.Context, _ string) (*MerkleTree return nil, nil //nolint:nilnil // stub never invoked by chaos unit tests } +func (*recordingTransport) ListKeys(_ context.Context, _, _ string) ([]string, error) { + return nil, nil +} + // TestChaos_DropRateOneAlwaysDrops pins that DropRate=1.0 short- // circuits every transport call with ErrChaosDrop. The inner // transport must NOT be invoked. diff --git a/pkg/backend/dist_http_server.go b/pkg/backend/dist_http_server.go index 365fc2f..35005a9 100644 --- a/pkg/backend/dist_http_server.go +++ b/pkg/backend/dist_http_server.go @@ -602,6 +602,16 @@ func handleKeys(fctx fiber.Ctx, dm *DistMemory) error { limit = parsed } + // Optional filter: empty `q` keeps the pre-existing + // return-everything semantic for callers that haven't been + // upgraded (notably the merkle anti-entropy path). A non-empty + // pattern containing glob metacharacters (* ? [) is matched via + // path.Match; anything else is treated as a literal prefix. + matcher, mErr := buildKeyMatcher(fctx.Query("q")) + if mErr != nil { + return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{constants.ErrorLabel: mErr.Error()}) + } + if cursor >= len(dm.shards) { return fctx.JSON(fiber.Map{"keys": []string{}, "next_cursor": ""}) } @@ -611,7 +621,7 @@ func handleKeys(fctx fiber.Ctx, dm *DistMemory) error { return fctx.JSON(fiber.Map{"keys": []string{}, "next_cursor": strconv.Itoa(cursor + 1)}) } - keys, truncated := collectShardKeys(shard, limit) + keys, truncated := collectShardKeys(shard, limit, matcher) nextCursor := "" @@ -637,15 +647,25 @@ func handleKeys(fctx fiber.Ctx, dm *DistMemory) error { }) } -// collectShardKeys reads up to `limit` keys from shard. limit<=0 -// returns the full shard. The truncated bool reports whether the -// shard had more keys than `limit` allowed. -func collectShardKeys(shard *distShard, limit int) ([]string, bool) { +// collectShardKeys reads up to `limit` keys from shard that match +// `matcher`. limit<=0 returns every matching key. The truncated +// bool reports whether the shard had more matching keys than +// `limit` allowed. +// +// Matcher is applied during iteration so non-matching keys never +// count against the limit — a `limit=10, q="first-*"` request +// returns the first 10 keys that match the prefix/glob, not +// "the first 10 keys, of which some happen to match.". 
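
A toy contrast showing why the matcher must run before the limit check: with a sparse match rate, the wrong ordering starves the page. All names here are illustrative:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// 100 keys; only every tenth one matches the filter.
	keys := make([]string, 0, 100)
	for i := 0; i < 100; i++ {
		prefix := "other-"
		if i%10 == 0 {
			prefix = "first-"
		}

		keys = append(keys, fmt.Sprintf("%s%d", prefix, i))
	}

	match := func(k string) bool { return strings.HasPrefix(k, "first-") }

	const limit = 5

	// Wrong ordering: every key counts against the limit, so only
	// keys[0..4] are even considered, and one match survives.
	wrong := 0
	for i, k := range keys {
		if i >= limit {
			break
		}

		if match(k) {
			wrong++
		}
	}

	// collectShardKeys ordering: only matches count against the limit.
	right := 0
	for _, k := range keys {
		if !match(k) {
			continue
		}

		right++
		if right >= limit {
			break
		}
	}

	fmt.Println(wrong, right) // 1 5
}
```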
+func collectShardKeys(shard *distShard, limit int, matcher func(string) bool) ([]string, bool) { out := make([]string, 0, shard.items.Count()) truncated := false for k := range shard.items.All() { + if !matcher(k) { + continue + } + if limit > 0 && len(out) >= limit { truncated = true diff --git a/pkg/backend/dist_http_transport.go b/pkg/backend/dist_http_transport.go index 32c4c5a..bc1c1e8 100644 --- a/pkg/backend/dist_http_transport.go +++ b/pkg/backend/dist_http_transport.go @@ -463,11 +463,17 @@ func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*Me return &MerkleTree{Root: body.Root, LeafHashes: body.LeafHashes, ChunkSize: body.ChunkSize}, nil } -// ListKeys returns all keys from a remote node (expensive; used for -// tests / anti-entropy fallback). Walks the cursor-paginated -// `/internal/keys` endpoint introduced in Phase C.2 — callers -// continue to receive the full key set unchanged. -func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID string) ([]string, error) { +// ListKeys returns keys held on a remote node, optionally filtered +// by `pattern` (forwarded server-side as the `q` query parameter on +// `/internal/keys`). An empty pattern returns the full key set +// (expensive; used for tests / anti-entropy fallback). Patterns +// containing glob metacharacters (* ? [) are matched server-side +// via path.Match; non-glob patterns are treated as prefix. +// +// Walks the cursor-paginated `/internal/keys` endpoint to assemble +// the full result. Callers that need to bound the result-set size +// should layer their own cap on top. +func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID, pattern string) ([]string, error) { var ( all []string cursor string @@ -476,7 +482,7 @@ func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID string) ([]stri const safetyLimit = 1024 // upper bound to prevent infinite loop on a buggy server for range safetyLimit { - page, err := t.listKeysPage(ctx, nodeID, cursor) + page, err := t.listKeysPage(ctx, nodeID, cursor, pattern) if err != nil { return nil, err } @@ -504,11 +510,19 @@ func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID string) ([]stri // listKeysPage is the per-page fetch for ListKeys; broken out so the // pagination loop above stays readable. -func (t *DistHTTPTransport) listKeysPage(ctx context.Context, nodeID, cursor string) (keysPageResp, error) { - var query url.Values +func (t *DistHTTPTransport) listKeysPage(ctx context.Context, nodeID, cursor, pattern string) (keysPageResp, error) { + query := url.Values{} if cursor != "" { - query = url.Values{"cursor": []string{cursor}} + query.Set("cursor", cursor) + } + + if pattern != "" { + query.Set("q", pattern) + } + + if len(query) == 0 { + query = nil } hreq, err := t.newNodeRequest(ctx, http.MethodGet, nodeID, "/internal/keys", query, nil) diff --git a/pkg/backend/dist_keys.go b/pkg/backend/dist_keys.go new file mode 100644 index 0000000..1653c25 --- /dev/null +++ b/pkg/backend/dist_keys.go @@ -0,0 +1,281 @@ +package backend + +import ( + "context" + "log/slog" + "path" + "slices" + "strings" + "sync" + + "github.com/hyp3rd/ewrap" + "golang.org/x/sync/errgroup" + + "github.com/hyp3rd/hypercache/internal/cluster" +) + +// keyGlobMetacharacters is the set of bytes that, when present in a +// pattern, switch the matcher from prefix-mode to glob-mode. Chosen +// to match path.Match's syntax: '*' (any sequence), '?' (single +// character), '[' (character class). 
+const keyGlobMetacharacters = "*?[" + +// localKeysInitialCap is the starting capacity for the per-node +// matching-keys slice. Picked to cover the common operator-debug +// shape (filter hits a few dozen keys per shard) while avoiding a +// large up-front allocation when the matcher rejects most of the +// shard. +const localKeysInitialCap = 64 + +// defaultListKeysMax is the fallback cap applied when neither the +// caller nor the WithDistListKeysMax option specifies one. Picked +// to comfortably cover the typical operator-debug "list everything +// matching this prefix" workload (5-node × tens of thousands of +// keys per node) while still bounding worst-case memory. +const defaultListKeysMax = 10000 + +// buildKeyMatcher returns a predicate that decides whether a given +// key matches `pattern`. Three modes: +// +// - empty pattern: matches every key (no filter). +// - pattern contains glob metacharacters: full glob match via +// path.Match. Validation is done up-front by feeding a probe +// through path.Match; an invalid pattern (e.g. unmatched `[`) +// surfaces as ErrBadPattern to the caller instead of every +// subsequent match failing silently. +// - otherwise: treated as a literal prefix via strings.HasPrefix. +// +// Glob syntax is matched with `path.Match`, not `filepath.Match`: +// arbitrary string keys don't have OS-specific path-separator +// semantics, so the platform-agnostic version is what we want. +func buildKeyMatcher(pattern string) (func(string) bool, error) { + if pattern == "" { + return func(string) bool { return true }, nil + } + + if !strings.ContainsAny(pattern, keyGlobMetacharacters) { + return func(k string) bool { return strings.HasPrefix(k, pattern) }, nil + } + + // Validate the glob up-front. path.Match returns + // path.ErrBadPattern for malformed inputs and a value-dependent + // error otherwise — running it against a fixed sentinel surfaces + // the structural error without taking a position on whether + // real keys would actually match. + _, err := path.Match(pattern, "") + if err != nil { + return nil, ewrap.Wrap(err, "list-keys: invalid glob pattern") + } + + return func(k string) bool { + ok, mErr := path.Match(pattern, k) + // Pattern was already validated above; runtime mismatch + // errors here would be impossible. Treat as no-match to + // stay best-effort. + return mErr == nil && ok + }, nil +} + +// ListKeysResult bundles a cluster-wide key enumeration with the +// best-effort accounting the caller needs to communicate partial +// results to the operator. `Keys` is sorted and deduplicated across +// owners. `Truncated` is true when the merged set hit `max` and we +// stopped pulling further pages. `PartialNodes` lists peers whose +// fan-out call failed — their keys may be missing from `Keys`. +type ListKeysResult struct { + Keys []string + Truncated bool + PartialNodes []string +} + +// listKeysAccumulator is the cross-goroutine merge state for a +// single ListKeys fan-out call. Held by reference so each peer +// goroutine can lock the mutex and contribute its slice without +// closing over a stack-local map. +type listKeysAccumulator struct { + mu sync.Mutex + dedup map[string]struct{} + maxKeys int + partial []string + truncated bool +} + +// tryAdd merges `keys` into the dedup set under the lock. Stops at +// maxKeys and flips truncated. Idempotent — a key already in the +// set doesn't change accounting. 
+func (a *listKeysAccumulator) tryAdd(keys []string) { + a.mu.Lock() + defer a.mu.Unlock() + + for _, k := range keys { + if len(a.dedup) >= a.maxKeys { + a.truncated = true + + return + } + + a.dedup[k] = struct{}{} + } +} + +// markPartial records a peer whose fan-out call failed. The peer's +// keys may be missing from the merged result. +func (a *listKeysAccumulator) markPartial(peer cluster.NodeID) { + a.mu.Lock() + defer a.mu.Unlock() + + a.partial = append(a.partial, string(peer)) +} + +// result projects the accumulator into the public ListKeysResult. +// Sorts deterministically so paging across requests is stable. +func (a *listKeysAccumulator) result() ListKeysResult { + keys := make([]string, 0, len(a.dedup)) + for k := range a.dedup { + keys = append(keys, k) + } + + slices.Sort(keys) + + return ListKeysResult{ + Keys: keys, + Truncated: a.truncated, + PartialNodes: a.partial, + } +} + +// ListKeys enumerates keys across every alive peer (including this +// node), deduplicates by string identity, sorts, and returns up to +// `max` results. `pattern` follows the same prefix/glob rules as +// `buildKeyMatcher`. A `max` of 0 falls back to defaultListKeysMax; +// the hard ceiling lives in the v1 handler that calls this method. +// +// Per-peer failures (transport error, unreachable, etc.) don't +// fail the whole call — best-effort matches the read-repair and +// hint-replay contracts elsewhere in the cluster. Failed peers are +// returned in PartialNodes so the caller can surface a banner to +// the operator. +func (dm *DistMemory) ListKeys(ctx context.Context, pattern string, maxResults int) (ListKeysResult, error) { + if maxResults <= 0 { + maxResults = defaultListKeysMax + } + + matcher, err := buildKeyMatcher(pattern) + if err != nil { + return ListKeysResult{}, err + } + + acc := &listKeysAccumulator{ + dedup: make(map[string]struct{}, maxResults), + maxKeys: maxResults, + } + + peers := dm.alivePeerIDs() + transport := dm.loadTransport() + + g, gctx := errgroup.WithContext(ctx) + + for _, peer := range peers { + g.Go(func() error { + dm.collectPeerKeys(gctx, peer, pattern, matcher, transport, acc) + + return nil + }) + } + + _ = g.Wait() // every goroutine swallows its error; Wait can't fail. + + return acc.result(), nil +} + +// collectPeerKeys fetches one peer's matching keys and merges them +// into `acc`. Three branches: +// - self: walk local shards directly (no transport hop). +// - transport torn down: mark partial. +// - peer hop: best-effort fetch; failure → mark partial + log. +func (dm *DistMemory) collectPeerKeys( + ctx context.Context, + peer cluster.NodeID, + pattern string, + matcher func(string) bool, + transport DistTransport, + acc *listKeysAccumulator, +) { + if peer == dm.localNode.ID { + acc.tryAdd(dm.localMatchingKeys(matcher)) + + return + } + + if transport == nil { // cluster torn down mid-call + acc.markPartial(peer) + + return + } + + keys, err := transport.ListKeys(ctx, string(peer), pattern) + if err != nil { + if dm.logger != nil { + dm.logger.Debug( + "list-keys: peer fan-out failed", + slog.String("peer", string(peer)), + slog.Any("err", err), + ) + } + + acc.markPartial(peer) + + return + } + + acc.tryAdd(keys) +} + +// alivePeerIDs returns the membership snapshot's alive nodes +// (including self). Suspect/Dead nodes are excluded — their key +// sets aren't fresh enough to be worth the per-request HTTP cost. +// Falls back to a self-only list when membership isn't wired up +// (single-process / test scenarios). 
+func (dm *DistMemory) alivePeerIDs() []cluster.NodeID { + if dm.membership == nil { + if dm.localNode != nil { + return []cluster.NodeID{dm.localNode.ID} + } + + return nil + } + + nodes := dm.membership.List() + out := make([]cluster.NodeID, 0, len(nodes)) + + for _, n := range nodes { + if n == nil || n.State != cluster.NodeAlive { + continue + } + + out = append(out, n.ID) + } + + return out +} + +// localMatchingKeys walks the local shards once, returning keys that +// satisfy the matcher. Used by ListKeys for the self-peer slice so +// we don't pay an HTTP roundtrip to talk to ourselves. +func (dm *DistMemory) localMatchingKeys(matcher func(string) bool) []string { + out := make([]string, 0, localKeysInitialCap) + + for _, sh := range dm.shards { + if sh == nil { + continue + } + + for k := range sh.items.All() { + if matcher(k) { + out = append(out, k) + } + } + } + + return out +} diff --git a/pkg/backend/dist_keys_test.go b/pkg/backend/dist_keys_test.go new file mode 100644 index 0000000..958908b --- /dev/null +++ b/pkg/backend/dist_keys_test.go @@ -0,0 +1,75 @@ +package backend + +import ( + "errors" + "path" + "testing" +) + +const firstPrefix = "first-" + +// TestBuildKeyMatcher_Modes covers the three classifier branches — +// empty (match-all), prefix (no metacharacters), and glob (any of +// `*?[`). Anchors the prefix-vs-glob switch so adding a new metachar +// here later requires an intentional test update. +func TestBuildKeyMatcher_Modes(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + pattern string + key string + want bool + }{ + // Empty: every key matches. + {"empty matches anything", "", "anything", true}, + {"empty matches empty key", "", "", true}, + + // Prefix: HasPrefix semantics. + {"prefix hit", firstPrefix, "first-1", true}, + {"prefix miss", firstPrefix, "second-1", false}, + {"prefix exact", firstPrefix, firstPrefix, true}, + {"prefix too short", firstPrefix, "first", false}, + + // Glob via path.Match. + {"glob star matches sequence", "first-*", "first-123", true}, + {"glob star matches empty tail", "first-*", firstPrefix, true}, + {"glob star miss different prefix", "first-*", "second-123", false}, + {"glob question matches single", "first-?", "first-1", true}, + {"glob question miss two chars", "first-?", "first-12", false}, + {"glob class match", "[abc]*", "alpha", true}, + {"glob class miss", "[abc]*", "delta", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + matcher, err := buildKeyMatcher(tt.pattern) + if err != nil { + t.Fatalf("buildKeyMatcher(%q): unexpected error %v", tt.pattern, err) + } + + got := matcher(tt.key) + if got != tt.want { + t.Fatalf("buildKeyMatcher(%q)(%q) = %v, want %v", tt.pattern, tt.key, got, tt.want) + } + }) + } +} + +// TestBuildKeyMatcher_InvalidGlob pins that a malformed glob (e.g. +// unclosed `[`) surfaces an error at construction rather than every +// match failing silently at runtime. 
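
The stdlib behavior the test below pins, runnable in isolation: `path.Match` reports `ErrBadPattern` for a malformed class even against an empty probe name, which is what makes the up-front validation in `buildKeyMatcher` sound:

```go
package main

import (
	"errors"
	"fmt"
	"path"
)

func main() {
	// A malformed character class errors even with an empty name,
	// so a single probe at construction time catches it.
	_, err := path.Match("[unclosed", "")
	fmt.Println(errors.Is(err, path.ErrBadPattern)) // true

	// A well-formed pattern never errors; it just matches or doesn't.
	ok, err := path.Match("first-*", "first-42")
	fmt.Println(ok, err) // true <nil>
}
```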
+func TestBuildKeyMatcher_InvalidGlob(t *testing.T) { + t.Parallel() + + _, err := buildKeyMatcher("[unclosed") + if err == nil { + t.Fatalf("expected error for unclosed character class, got nil") + } + + if !errors.Is(err, path.ErrBadPattern) { + t.Fatalf("expected wrapped path.ErrBadPattern, got %v", err) + } +} diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index 7a8a787..0030471 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -1752,7 +1752,7 @@ func (dm *DistMemory) resolveMissingKeys(ctx context.Context, nodeID string, ent return missing } - keys, kerr := httpT.ListKeys(ctx, nodeID) + keys, kerr := httpT.ListKeys(ctx, nodeID, "") if kerr != nil || len(keys) == 0 { return missing } diff --git a/pkg/backend/dist_migration_hint_test.go b/pkg/backend/dist_migration_hint_test.go index 01c2fbd..dcaabcc 100644 --- a/pkg/backend/dist_migration_hint_test.go +++ b/pkg/backend/dist_migration_hint_test.go @@ -68,6 +68,10 @@ func (*scriptedTransport) FetchMerkle(_ context.Context, _ string) (*MerkleTree, return nil, errScriptedNotUsed } +func (*scriptedTransport) ListKeys(_ context.Context, _, _ string) ([]string, error) { + return nil, errScriptedNotUsed +} + // newMigrationHintTestDM builds a DistMemory with just enough wiring to // drive queueHint + processHint directly. We bypass NewDistMemory to // avoid spinning a real cluster — the goal is to exercise the diff --git a/pkg/backend/dist_read_repair_test.go b/pkg/backend/dist_read_repair_test.go index 3e089d3..e1d597b 100644 --- a/pkg/backend/dist_read_repair_test.go +++ b/pkg/backend/dist_read_repair_test.go @@ -64,6 +64,10 @@ func (*captureTransport) FetchMerkle(_ context.Context, _ string) (*MerkleTree, return nil, nil //nolint:nilnil // stub never invoked by these unit tests } +func (*captureTransport) ListKeys(_ context.Context, _, _ string) ([]string, error) { + return nil, nil +} + func (c *captureTransport) callCount() int { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/backend/dist_transport.go b/pkg/backend/dist_transport.go index 5b35059..535267c 100644 --- a/pkg/backend/dist_transport.go +++ b/pkg/backend/dist_transport.go @@ -30,6 +30,15 @@ type DistTransport interface { // short-circuit to a direct method call instead. Gossip(ctx context.Context, targetNodeID string, members []GossipMember) error FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) + // ListKeys enumerates keys held on the remote node's shards, + // optionally filtered by `pattern`. Empty pattern returns all + // keys; a pattern containing any glob meta-character (* ? [) is + // matched via path.Match, otherwise treated as a prefix. + // Implementations walk the per-shard cursor pagination internally + // and return the materialized key set to the caller; safe for + // node-scale enumeration, capped at DistMemory.listKeysMax to + // bound memory. + ListKeys(ctx context.Context, nodeID, pattern string) ([]string, error) } // GossipMember is the wire-friendly snapshot of a cluster.Node used @@ -227,6 +236,24 @@ func (t *InProcessTransport) FetchMerkle(_ context.Context, nodeID string) (*Mer return b.BuildMerkleTree(), nil } +// ListKeys enumerates the remote node's local shards, optionally +// filtered by pattern. In-process means we have direct access to +// the target's shards — no HTTP roundtrip, no cursor walking, just +// a synchronous in-memory scan. 
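
With three transports now tracking the widened interface, a compile-time conformance pin is cheap insurance. The patch does not include such checks; a sketch of the idiom, assuming it sits inside package `backend`:

```go
// Compile-time checks: each transport must satisfy the widened
// DistTransport, including the new ListKeys verb. A missing method
// becomes a build error here instead of a runtime surprise.
var (
	_ DistTransport = (*InProcessTransport)(nil)
	_ DistTransport = (*DistHTTPTransport)(nil)
	_ DistTransport = (*chaosTransport)(nil)
)
```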
+func (t *InProcessTransport) ListKeys(_ context.Context, nodeID, pattern string) ([]string, error) { + b, ok := t.lookup(nodeID) + if !ok { + return nil, sentinel.ErrBackendNotFound + } + + matcher, err := buildKeyMatcher(pattern) + if err != nil { + return nil, err + } + + return b.localMatchingKeys(matcher), nil +} + func (t *InProcessTransport) lookup(nodeID string) (*DistMemory, bool) { t.mu.RLock() diff --git a/tests/dist_http_limits_test.go b/tests/dist_http_limits_test.go index 001a5c1..c2b2456 100644 --- a/tests/dist_http_limits_test.go +++ b/tests/dist_http_limits_test.go @@ -100,7 +100,7 @@ func TestDistHTTPClient_RejectsOversizedResponse(t *testing.T) { func(_ string) (string, bool) { return srv.URL, true }, ) - _, err := transport.ListKeys(context.Background(), "ignored") + _, err := transport.ListKeys(context.Background(), "ignored", "") if err == nil { t.Fatalf("expected ListKeys to fail when response exceeds limit, got nil") } diff --git a/tests/hypercache_distmemory_listkeys_test.go b/tests/hypercache_distmemory_listkeys_test.go new file mode 100644 index 0000000..1ee4b98 --- /dev/null +++ b/tests/hypercache_distmemory_listkeys_test.go @@ -0,0 +1,186 @@ +package tests + +import ( + "context" + "strconv" + "testing" + + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// seedListKeysCluster writes `count` items via their primary owner +// so each key lands with full replication. Returns the cluster the +// caller can issue ListKeys against. Mirrors the seeding shape of +// TestDistMemoryForwardingReplication. +func seedListKeysCluster(t *testing.T, count int) *DistCluster { + t.Helper() + + dc := SetupInProcessCluster(t, 5) + + for i := range count { + key := "first-" + strconv.Itoa(i+1) + + owners := dc.Ring.Lookup(key) + if len(owners) == 0 { + t.Fatalf("no owners for key %s", key) + } + + item := &cache.Item{Key: key, Value: key} + + err := item.Valid() + if err != nil { + t.Fatalf("item valid %s: %v", key, err) + } + + primary := dc.ByID(owners[0]) + if primary == nil { + t.Fatalf("unexpected owner id %s for %s", owners[0], key) + } + + err = primary.Set(context.Background(), item) + if err != nil { + t.Fatalf("set %s via %s: %v", key, owners[0], err) + } + } + + return dc +} + +// TestDistListKeys_ClusterWideDedup writes 50 keys to a 5-node +// cluster with RF=3 and asserts ListKeys returns each key exactly +// once — proving the cluster-wide dedup across replicas works. +func TestDistListKeys_ClusterWideDedup(t *testing.T) { + t.Parallel() + + const count = 50 + + dc := seedListKeysCluster(t, count) + + res, err := dc.Nodes[0].ListKeys(context.Background(), "", 0) + if err != nil { + t.Fatalf("ListKeys: %v", err) + } + + if len(res.Keys) != count { + t.Fatalf("expected %d unique keys, got %d (truncated=%v)", count, len(res.Keys), res.Truncated) + } + + if res.Truncated { + t.Fatalf("unexpected truncation: %+v", res) + } + + if len(res.PartialNodes) != 0 { + t.Fatalf("expected zero partial nodes, got %v", res.PartialNodes) + } + + seen := make(map[string]int, len(res.Keys)) + for _, k := range res.Keys { + seen[k]++ + } + + for k, n := range seen { + if n != 1 { + t.Fatalf("key %s appears %d times — dedup failed", k, n) + } + } +} + +// TestDistListKeys_PrefixFilter pins the prefix-mode classifier: +// a non-glob pattern is matched via strings.HasPrefix, so seeding +// keys with two distinct prefixes yields a clean split. 
+func TestDistListKeys_PrefixFilter(t *testing.T) { + t.Parallel() + + dc := SetupInProcessCluster(t, 5) + + put := func(key string) { + owners := dc.Ring.Lookup(key) + item := &cache.Item{Key: key, Value: key} + + err := item.Valid() + if err != nil { + t.Fatalf("item valid %s: %v", key, err) + } + + err = dc.ByID(owners[0]).Set(context.Background(), item) + if err != nil { + t.Fatalf("set %s: %v", key, err) + } + } + + for i := range 10 { + put("first-" + strconv.Itoa(i)) + } + + for i := range 5 { + put("second-" + strconv.Itoa(i)) + } + + res, err := dc.Nodes[2].ListKeys(context.Background(), "first-", 0) + if err != nil { + t.Fatalf("ListKeys: %v", err) + } + + if len(res.Keys) != 10 { + t.Fatalf("expected 10 first-* keys, got %d: %v", len(res.Keys), res.Keys) + } + + for _, k := range res.Keys { + if k[:6] != "first-" { + t.Fatalf("non-first prefix in result: %s", k) + } + } +} + +// TestDistListKeys_GlobFilter pins the glob branch: `first-?` is a +// single-character wildcard, so it should hit `first-1`..`first-9` +// (nine entries) and miss `first-10`. +func TestDistListKeys_GlobFilter(t *testing.T) { + t.Parallel() + + dc := seedListKeysCluster(t, 15) + + res, err := dc.Nodes[3].ListKeys(context.Background(), "first-?", 0) + if err != nil { + t.Fatalf("ListKeys: %v", err) + } + + if len(res.Keys) != 9 { + t.Fatalf("expected 9 single-digit first-? keys, got %d: %v", len(res.Keys), res.Keys) + } +} + +// TestDistListKeys_MaxTruncates pins that the `max` cap surfaces +// via Truncated=true and bounds the result set size. +func TestDistListKeys_MaxTruncates(t *testing.T) { + t.Parallel() + + dc := seedListKeysCluster(t, 50) + + res, err := dc.Nodes[0].ListKeys(context.Background(), "", 10) + if err != nil { + t.Fatalf("ListKeys: %v", err) + } + + if !res.Truncated { + t.Fatalf("expected truncated=true at max=10 against 50 keys, got %+v", res) + } + + if len(res.Keys) != 10 { + t.Fatalf("expected exactly 10 keys at max=10, got %d", len(res.Keys)) + } +} + +// TestDistListKeys_InvalidPattern surfaces malformed globs as a +// caller-visible error rather than silently returning an empty +// result. +func TestDistListKeys_InvalidPattern(t *testing.T) { + t.Parallel() + + dc := SetupInProcessCluster(t, 2) + + _, err := dc.Nodes[0].ListKeys(context.Background(), "[unclosed", 0) + if err == nil { + t.Fatalf("expected error for malformed glob, got nil") + } +}
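
For completeness, the library-side call shape around the nil-result sentinel from `ClusterKeys`. This is a hedged fragment, not a complete program: the surrounding function, imports, and the concrete `HyperCache` instantiation behind `hc` are assumed:

```go
res, err := hc.ClusterKeys(ctx, "first-", 0)
if err != nil {
	return err // e.g. a malformed glob surfaces here
}

if res == nil {
	// Not a DistMemory backend: the feature is unsupported, not
	// "no keys". This is the same sentinel the v1 handler maps to 501.
	return errors.New("cluster key browsing requires a distributed backend")
}

for _, k := range res.Keys {
	fmt.Println(k)
}

if len(res.PartialNodes) > 0 {
	fmt.Printf("warning: results may be partial; failed peers: %v\n", res.PartialNodes)
}
```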