diff --git a/internal/rows/arrowbased/batchloader.go b/internal/rows/arrowbased/batchloader.go
index 2d86478..76f619c 100644
--- a/internal/rows/arrowbased/batchloader.go
+++ b/internal/rows/arrowbased/batchloader.go
@@ -5,6 +5,9 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"math"
+	"math/rand"
+	"strconv"
 	"strings"
 	"time"
 
@@ -193,6 +196,9 @@ func (bi *cloudIPCStreamIterator) Next() (io.Reader, error) {
 			minTimeToExpiry:    bi.cfg.MinTimeToExpiry,
 			speedThresholdMbps: bi.cfg.CloudFetchSpeedThresholdMbps,
 			httpClient:         bi.httpClient,
+			retryMax:           bi.cfg.RetryMax,
+			retryWaitMin:       bi.cfg.RetryWaitMin,
+			retryWaitMax:       bi.cfg.RetryWaitMax,
 		}
 		task.Run()
 		bi.downloadTasks.Enqueue(task)
@@ -252,6 +258,9 @@ type cloudFetchDownloadTask struct {
 	resultChan         chan cloudFetchDownloadTaskResult
 	speedThresholdMbps float64
 	httpClient         *http.Client
+	retryMax           int
+	retryWaitMin       time.Duration
+	retryWaitMax       time.Duration
 }
 
 func (cft *cloudFetchDownloadTask) GetResult() (io.Reader, int64, error) {
@@ -295,20 +304,32 @@ func (cft *cloudFetchDownloadTask) Run() {
 		cft.link.RowCount,
 	)
 	downloadStart := time.Now()
-	data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry, cft.speedThresholdMbps, cft.httpClient)
+	rawBody, err := fetchBatchBytes(
+		cft.ctx,
+		cft.link,
+		cft.minTimeToExpiry,
+		cft.speedThresholdMbps,
+		cft.httpClient,
+		cft.retryMax,
+		cft.retryWaitMin,
+		cft.retryWaitMax,
+	)
 	if err != nil {
 		cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
 		return
 	}
-	// Read all data into memory before closing
-	buf, err := io.ReadAll(getReader(data, cft.useLz4Compression))
-	data.Close() //nolint:errcheck,gosec // G104: close after reading data
-	downloadMs := time.Since(downloadStart).Milliseconds()
-	if err != nil {
-		cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
-		return
+	buf := rawBody
+	if cft.useLz4Compression {
+		// Decompression sits outside the retry loop: malformed LZ4 is data
+		// corruption, not a transient network condition.
+		buf, err = io.ReadAll(lz4.NewReader(bytes.NewReader(rawBody)))
+		if err != nil {
+			cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
+			return
+		}
 	}
+	downloadMs := time.Since(downloadStart).Milliseconds()
 
 	logger.Debug().Msgf(
 		"CloudFetch: downloaded data for link at offset %d row count %d",
@@ -339,43 +360,186 @@ func logCloudFetchSpeed(fullURL string, contentLength int64, duration time.Durat
 	}
 }
 
+// fetchBatchBytes downloads a single CloudFetch result link and returns the
+// raw response body, still compressed if the server used LZ4. Connection-time
+// failures, retryable HTTP statuses, and mid-stream body read failures are
+// retried up to retryMax times with exponential backoff and equal jitter.
+// Decompression and IPC parsing stay with the caller because those failures
+// are not transient network conditions.
+//
+// Link expiry is rechecked after each backoff: a long retry chain may outlive
+// a presigned URL, and continuing past expiry is guaranteed to fail.
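+//
+// For example, with retryMax=2 a link is attempted at most three times: the
+// initial GET plus two retries, each preceded by a jittered backoff sleep.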
 func fetchBatchBytes(
 	ctx context.Context,
 	link *cli_service.TSparkArrowResultLink,
 	minTimeToExpiry time.Duration,
 	speedThresholdMbps float64,
 	httpClient *http.Client,
-) (io.ReadCloser, error) {
-	if isLinkExpired(link.ExpiryTime, minTimeToExpiry) {
-		return nil, errors.New(dbsqlerr.ErrLinkExpired)
+	retryMax int,
+	retryWaitMin time.Duration,
+	retryWaitMax time.Duration,
+) ([]byte, error) {
+	if retryMax < 0 {
+		retryMax = 0
 	}
-	// TODO: Retry on HTTP errors
-	req, err := http.NewRequestWithContext(ctx, "GET", link.FileLink, nil)
-	if err != nil {
-		return nil, err
-	}
+	var (
+		lastErr        error
+		lastStatus     int
+		lastRetryAfter string
+	)
+
+	for attempt := 0; attempt <= retryMax; attempt++ {
+		if attempt > 0 {
+			wait := cloudFetchBackoff(attempt, retryWaitMin, retryWaitMax, lastRetryAfter)
+			logger.Debug().Msgf(
+				"CloudFetch: retrying download of link at offset %d (attempt %d/%d) in %v; lastStatus=%d lastErr=%v",
+				link.StartRowOffset, attempt, retryMax, wait, lastStatus, lastErr,
+			)
+			select {
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			case <-time.After(wait):
+			}
+		}
 
-	if link.HttpHeaders != nil {
-		for key, value := range link.HttpHeaders {
-			req.Header.Set(key, value)
+		// Check link expiry *after* backoff: a long retry chain may outlive a
+		// presigned URL, and there's no point spending another HTTP attempt
+		// (or another retry) on a link we know will be rejected.
+		if isLinkExpired(link.ExpiryTime, minTimeToExpiry) {
+			return nil, errors.New(dbsqlerr.ErrLinkExpired)
 		}
-	}
 
-	startTime := time.Now()
-	res, err := httpClient.Do(req)
-	if err != nil {
-		return nil, err
+		req, err := http.NewRequestWithContext(ctx, "GET", link.FileLink, nil)
+		if err != nil {
+			return nil, err
+		}
+		if link.HttpHeaders != nil {
+			for key, value := range link.HttpHeaders {
+				req.Header.Set(key, value)
+			}
+		}
+
+		startTime := time.Now()
+		res, err := httpClient.Do(req)
+		if err != nil {
+			// Caller cancellation is terminal; otherwise treat transport errors
+			// (TCP RST, TLS timeout, etc.) as transient.
+			if ctx.Err() != nil {
+				return nil, ctx.Err()
+			}
+			lastErr = err
+			lastStatus = 0
+			lastRetryAfter = ""
+			continue
+		}
+
+		if res.StatusCode == http.StatusOK {
+			// Read the full body inside the retry loop so truncated 200 OK
+			// responses are retried just like header-time failures.
+			buf, readErr := io.ReadAll(res.Body)
+			res.Body.Close() //nolint:errcheck,gosec // G104: close after drain
+			if readErr != nil {
+				if ctx.Err() != nil {
+					return nil, ctx.Err()
+				}
+				lastErr = readErr
+				lastStatus = 0
+				lastRetryAfter = ""
+				continue
+			}
+			logCloudFetchSpeed(link.FileLink, int64(len(buf)), time.Since(startTime), speedThresholdMbps)
+			return buf, nil
+		}
+
+		// Drain and close so the underlying connection can be reused.
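+		// net/http only returns a connection to the keep-alive pool once the
+		// body has been read to EOF and closed; skipping the drain would force
+		// a new TCP (and TLS) handshake on the next attempt.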
+		_, _ = io.Copy(io.Discard, res.Body)
+		res.Body.Close() //nolint:errcheck,gosec // G104: closing after drain
+
+		lastStatus = res.StatusCode
+		lastErr = nil
+		lastRetryAfter = ""
+		if res.Header != nil {
+			lastRetryAfter = res.Header.Get("Retry-After")
+		}
+
+		if !isCloudFetchRetryableStatus(res.StatusCode) {
+			msg := fmt.Sprintf("%s: %s %d", errArrowRowsCloudFetchDownloadFailure, "HTTP error", res.StatusCode)
+			return nil, dbsqlerrint.NewDriverError(ctx, msg, nil)
+		}
+	}
-	if res.StatusCode != http.StatusOK {
-		msg := fmt.Sprintf("%s: %s %d", errArrowRowsCloudFetchDownloadFailure, "HTTP error", res.StatusCode)
-		return nil, dbsqlerrint.NewDriverError(ctx, msg, err)
+
+	if lastStatus != 0 {
+		msg := fmt.Sprintf("%s: %s %d (after %d retries)", errArrowRowsCloudFetchDownloadFailure, "HTTP error", lastStatus, retryMax)
+		return nil, dbsqlerrint.NewDriverError(ctx, msg, nil)
 	}
+	msg := fmt.Sprintf("%s: %v (after %d retries)", errArrowRowsCloudFetchDownloadFailure, lastErr, retryMax)
+	return nil, dbsqlerrint.NewDriverError(ctx, msg, lastErr)
+}
+
+// cloudFetchRetryableStatuses lists HTTP status codes from object storage that
+// indicate transient conditions and warrant a retry. Mirrors AWS S3 guidance
+// for SlowDown (503) / InternalError (500) plus the general 408/429/502/504.
+var cloudFetchRetryableStatuses = map[int]struct{}{
+	http.StatusRequestTimeout:      {}, // 408
+	http.StatusTooManyRequests:     {}, // 429
+	http.StatusInternalServerError: {}, // 500
+	http.StatusBadGateway:          {}, // 502
+	http.StatusServiceUnavailable:  {}, // 503
+	http.StatusGatewayTimeout:      {}, // 504
+}
+
+func isCloudFetchRetryableStatus(status int) bool {
+	_, ok := cloudFetchRetryableStatuses[status]
+	return ok
+}
-	// Log download speed metrics
-	logCloudFetchSpeed(link.FileLink, res.ContentLength, time.Since(startTime), speedThresholdMbps)
+
+// cloudFetchBackoff returns the wait before retry attempt N (1-based). The
+// base delay is exponential, waitMin * 2^(attempt-1) capped at waitMax, with
+// equal jitter applied: the actual sleep is drawn uniformly from
+// [base/2, base). Equal jitter (rather than no jitter) is used to spread
+// synchronized retries across the up-to-MaxDownloadThreads concurrent
+// downloads, which would otherwise hammer the storage endpoint in lockstep
+// after a region-wide blip. If the server returned a parseable integer
+// Retry-After header, that value (in seconds) is honored instead, capped at
+// waitMax. HTTP-date Retry-After values are ignored, matching the Thrift
+// client's backoff.
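+//
+// For example, with waitMin=100ms, waitMax=10s, and no Retry-After header:
+//
+//	attempt 1 -> base 100ms, sleep drawn from [50ms, 100ms)
+//	attempt 2 -> base 200ms, sleep drawn from [100ms, 200ms)
+//	attempt 3 -> base 400ms, sleep drawn from [200ms, 400ms)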
+func cloudFetchBackoff(attempt int, waitMin, waitMax time.Duration, retryAfter string) time.Duration {
+	if retryAfter != "" {
+		if secs, err := strconv.ParseInt(retryAfter, 10, 64); err == nil && secs >= 0 {
+			d := time.Duration(secs) * time.Second
+			if d > waitMax {
+				return waitMax
+			}
+			return d
+		}
+	}
-
-	return res.Body, nil
+	expo := float64(waitMin) * math.Pow(2, float64(attempt-1))
+	if expo > float64(waitMax) || math.IsInf(expo, 0) {
+		expo = float64(waitMax)
+	}
+	base := time.Duration(expo)
+	if base <= 0 {
+		return 0
+	}
+	half := base / 2
+	if half <= 0 {
+		return base
+	}
+	return half + time.Duration(rand.Int63n(int64(half))) //nolint:gosec // G404: jitter only, non-cryptographic
 }
 
 func getReader(r io.Reader, useLz4Compression bool) io.Reader {
diff --git a/internal/rows/arrowbased/batchloader_test.go b/internal/rows/arrowbased/batchloader_test.go
index 59947ff..bce1a3c 100644
--- a/internal/rows/arrowbased/batchloader_test.go
+++ b/internal/rows/arrowbased/batchloader_test.go
@@ -7,6 +7,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -30,6 +31,27 @@ func TestCloudFetchIterator(t *testing.T) {
 	}))
 	defer server.Close()
 
+	writeTruncatedOK := func(t *testing.T, w http.ResponseWriter, body []byte) {
+		t.Helper()
+		// Handler goroutine: use t.Error, not t.Fatal (FailNow may only be called from the test goroutine).
+		hj, ok := w.(http.Hijacker)
+		if !ok {
+			t.Error("ResponseWriter does not support Hijacker")
+			return
+		}
+		conn, bufrw, err := hj.Hijack()
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		_, _ = fmt.Fprintf(bufrw, "HTTP/1.1 200 OK\r\nContent-Length: 1000000\r\nConnection: close\r\n\r\n")
+		if len(body) > 0 {
+			_, _ = bufrw.Write(body)
+		}
+		_ = bufrw.Flush()
+		_ = conn.Close()
+	}
+
 	t.Run("should fetch all the links", func(t *testing.T) {
 		cloudFetchHeaders := map[string]string{
 			"foo": "bar",
@@ -343,6 +365,399 @@
 		assert.Nil(t, nextErr)
 		assert.NotNil(t, sab)
 	})
+
+	t.Run("should retry transient HTTP 503 and eventually succeed", func(t *testing.T) {
+		var attempts int32
+		handler = func(w http.ResponseWriter, r *http.Request) {
+			n := atomic.AddInt32(&attempts, 1)
+			if n < 3 {
+				w.WriteHeader(http.StatusServiceUnavailable)
+				return
+			}
+			w.WriteHeader(http.StatusOK)
+			_, err := w.Write(generateMockArrowBytes(generateArrowRecord()))
+			if err != nil {
+				panic(err)
+			}
+		}
+
+		startRowOffset := int64(100)
+		cfg := config.WithDefaults()
+		cfg.UseLz4Compression = false
+		cfg.MaxDownloadThreads = 1
+		cfg.RetryMax = 4
+		cfg.RetryWaitMin = 1 * time.Millisecond
+		cfg.RetryWaitMax = 5 * time.Millisecond
+
+		bi, err := NewCloudBatchIterator(
+			context.Background(),
+			[]*cli_service.TSparkArrowResultLink{{
+				FileLink:       server.URL,
+				ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
+				StartRowOffset: startRowOffset,
+				RowCount:       1,
+			}},
+			startRowOffset,
+			nil,
+			cfg,
+			nil,
+		)
+		assert.Nil(t, err)
+
+		sab, nextErr := bi.Next()
+		assert.Nil(t, nextErr)
+		assert.NotNil(t, sab)
+		assert.Equal(t, int32(3), atomic.LoadInt32(&attempts), "expected 2 retries before success")
+	})
+
+	t.Run("should retry mid-stream body read failures (200 OK then connection drop)", func(t *testing.T) {
+		var attempts int32
+		realBody := generateMockArrowBytes(generateArrowRecord())
+		handler = func(w http.ResponseWriter, r *http.Request) {
+			n := atomic.AddInt32(&attempts, 1)
+			if n == 1 {
+				writeTruncatedOK(t, w, []byte("partial"))
+				return
+			}
+			w.WriteHeader(http.StatusOK)
+			if _, err := w.Write(realBody); err != nil {
+				panic(err)
+			}
+		}
+
+		startRowOffset := int64(100)
+		cfg := config.WithDefaults()
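+		// Millisecond-scale backoff bounds keep the retry path fast under test.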
+		cfg.UseLz4Compression = false
+		cfg.MaxDownloadThreads = 1
+		cfg.RetryMax = 4
+		cfg.RetryWaitMin = 1 * time.Millisecond
+		cfg.RetryWaitMax = 5 * time.Millisecond
+
+		bi, err := NewCloudBatchIterator(
+			context.Background(),
+			[]*cli_service.TSparkArrowResultLink{{
+				FileLink:       server.URL,
+				ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
+				StartRowOffset: startRowOffset,
+				RowCount:       1,
+			}},
+			startRowOffset,
+			nil,
+			cfg,
+			nil,
+		)
+		assert.Nil(t, err)
+
+		sab, nextErr := bi.Next()
+		assert.Nil(t, nextErr)
+		assert.NotNil(t, sab)
+		assert.Equal(t, int32(2), atomic.LoadInt32(&attempts), "expected first attempt to fail mid-stream, second to succeed")
+	})
+
+	t.Run("should fail after exhausting retries on persistent body-read failures", func(t *testing.T) {
+		var attempts int32
+		handler = func(w http.ResponseWriter, r *http.Request) {
+			atomic.AddInt32(&attempts, 1)
+			writeTruncatedOK(t, w, nil)
+		}
+
+		startRowOffset := int64(100)
+		cfg := config.WithDefaults()
+		cfg.UseLz4Compression = false
+		cfg.MaxDownloadThreads = 1
+		cfg.RetryMax = 2
+		cfg.RetryWaitMin = 1 * time.Millisecond
+		cfg.RetryWaitMax = 5 * time.Millisecond
+
+		bi, err := NewCloudBatchIterator(
+			context.Background(),
+			[]*cli_service.TSparkArrowResultLink{{
+				FileLink:       server.URL,
+				ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
+				StartRowOffset: startRowOffset,
+				RowCount:       1,
+			}},
+			startRowOffset,
+			nil,
+			cfg,
+			nil,
+		)
+		assert.Nil(t, err)
+
+		_, nextErr := bi.Next()
+		assert.NotNil(t, nextErr)
+		assert.ErrorContains(t, nextErr, "after 2 retries")
+		// initial attempt + RetryMax retries
+		assert.Equal(t, int32(3), atomic.LoadInt32(&attempts))
+	})
+
+	t.Run("should retry transient HTTP 500 and eventually succeed", func(t *testing.T) {
+		var attempts int32
+		handler = func(w http.ResponseWriter, r *http.Request) {
+			n := atomic.AddInt32(&attempts, 1)
+			if n < 2 {
+				w.WriteHeader(http.StatusInternalServerError)
+				return
+			}
+			w.WriteHeader(http.StatusOK)
+			_, err := w.Write(generateMockArrowBytes(generateArrowRecord()))
+			if err != nil {
+				panic(err)
+			}
+		}
+
+		startRowOffset := int64(100)
+		cfg := config.WithDefaults()
+		cfg.UseLz4Compression = false
+		cfg.MaxDownloadThreads = 1
+		cfg.RetryMax = 4
+		cfg.RetryWaitMin = 1 * time.Millisecond
+		cfg.RetryWaitMax = 5 * time.Millisecond
+
+		bi, err := NewCloudBatchIterator(
+			context.Background(),
+			[]*cli_service.TSparkArrowResultLink{{
+				FileLink:       server.URL,
+				ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
+				StartRowOffset: startRowOffset,
+				RowCount:       1,
+			}},
+			startRowOffset,
+			nil,
+			cfg,
+			nil,
+		)
+		assert.Nil(t, err)
+
+		sab, nextErr := bi.Next()
+		assert.Nil(t, nextErr)
+		assert.NotNil(t, sab)
+		assert.Equal(t, int32(2), atomic.LoadInt32(&attempts))
+	})
+
+	t.Run("should fail after exhausting retries on persistent 503", func(t *testing.T) {
+		var attempts int32
+		handler = func(w http.ResponseWriter, r *http.Request) {
+			atomic.AddInt32(&attempts, 1)
+			w.WriteHeader(http.StatusServiceUnavailable)
+		}
+
+		startRowOffset := int64(100)
+		cfg := config.WithDefaults()
+		cfg.UseLz4Compression = false
+		cfg.MaxDownloadThreads = 1
+		cfg.RetryMax = 2
+		cfg.RetryWaitMin = 1 * time.Millisecond
+		cfg.RetryWaitMax = 5 * time.Millisecond
+
+		bi, err := NewCloudBatchIterator(
+			context.Background(),
+			[]*cli_service.TSparkArrowResultLink{{
+				FileLink:       server.URL,
+				ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
+				StartRowOffset: startRowOffset,
+				RowCount:       1,
+			}},
+			startRowOffset,
+			nil,
+			cfg,
+			nil,
+		)
+		assert.Nil(t, err)
+
+		_, nextErr := bi.Next()
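+		// The terminal error should name the last HTTP status and the retry budget.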
+		assert.NotNil(t, nextErr)
+		assert.ErrorContains(t, nextErr, fmt.Sprintf("HTTP error %d", http.StatusServiceUnavailable))
+		assert.ErrorContains(t, nextErr, "after 2 retries")
+		// initial attempt + RetryMax retries
+		assert.Equal(t, int32(3), atomic.LoadInt32(&attempts))
+	})
+
+	t.Run("should not retry on non-retryable status (403)", func(t *testing.T) {
+		var attempts int32
+		handler = func(w http.ResponseWriter, r *http.Request) {
+			atomic.AddInt32(&attempts, 1)
+			w.WriteHeader(http.StatusForbidden)
+		}
+
+		startRowOffset := int64(100)
+		cfg := config.WithDefaults()
+		cfg.UseLz4Compression = false
+		cfg.MaxDownloadThreads = 1
+		cfg.RetryMax = 5
+		cfg.RetryWaitMin = 1 * time.Millisecond
+		cfg.RetryWaitMax = 5 * time.Millisecond
+
+		bi, err := NewCloudBatchIterator(
+			context.Background(),
+			[]*cli_service.TSparkArrowResultLink{{
+				FileLink:       server.URL,
+				ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
+				StartRowOffset: startRowOffset,
+				RowCount:       1,
+			}},
+			startRowOffset,
+			nil,
+			cfg,
+			nil,
+		)
+		assert.Nil(t, err)
+
+		_, nextErr := bi.Next()
+		assert.NotNil(t, nextErr)
+		assert.ErrorContains(t, nextErr, fmt.Sprintf("HTTP error %d", http.StatusForbidden))
+		assert.NotContains(t, nextErr.Error(), "after")
+		assert.Equal(t, int32(1), atomic.LoadInt32(&attempts), "non-retryable status must fail on first attempt")
+	})
+
+	t.Run("should detect link expiry between retries", func(t *testing.T) {
+		var attempts int32
+		handler = func(w http.ResponseWriter, r *http.Request) {
+			atomic.AddInt32(&attempts, 1)
+			w.Header().Set("Retry-After", "3")
+			w.WriteHeader(http.StatusServiceUnavailable)
+		}
+
+		startRowOffset := int64(100)
+		cfg := config.WithDefaults()
+		cfg.UseLz4Compression = false
+		cfg.MaxDownloadThreads = 1
+		cfg.RetryMax = 5
+		cfg.RetryWaitMin = 1 * time.Millisecond
+		cfg.RetryWaitMax = 3 * time.Second
+		expiryTime := time.Now().Unix() + 2
+
+		bi, err := NewCloudBatchIterator(
+			context.Background(),
+			[]*cli_service.TSparkArrowResultLink{{
+				FileLink:       server.URL,
+				ExpiryTime:     expiryTime,
+				StartRowOffset: startRowOffset,
+				RowCount:       1,
+			}},
+			startRowOffset,
+			nil,
+			cfg,
+			nil,
+		)
+		assert.Nil(t, err)
+
+		_, nextErr := bi.Next()
+		assert.NotNil(t, nextErr)
+		assert.ErrorContains(t, nextErr, dbsqlerr.ErrLinkExpired)
+		// The retry sleeps past expiry, then short-circuits before another GET.
+		assert.Equal(t, int32(1), atomic.LoadInt32(&attempts))
+	})
+
+	t.Run("should respect context cancellation during backoff", func(t *testing.T) {
+		var attempts int32
+		handler = func(w http.ResponseWriter, r *http.Request) {
+			atomic.AddInt32(&attempts, 1)
+			w.WriteHeader(http.StatusServiceUnavailable)
+		}
+
+		startRowOffset := int64(100)
+		cfg := config.WithDefaults()
+		cfg.UseLz4Compression = false
+		cfg.MaxDownloadThreads = 1
+		cfg.RetryMax = 5
+		cfg.RetryWaitMin = 500 * time.Millisecond
+		cfg.RetryWaitMax = 1 * time.Second
+
+		ctx, cancel := context.WithCancel(context.Background())
+		go func() {
+			time.Sleep(50 * time.Millisecond)
+			cancel()
+		}()
+
+		bi, err := NewCloudBatchIterator(
+			ctx,
+			[]*cli_service.TSparkArrowResultLink{{
+				FileLink:       server.URL,
+				ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
+				StartRowOffset: startRowOffset,
+				RowCount:       1,
+			}},
+			startRowOffset,
+			nil,
+			cfg,
+			nil,
+		)
+		assert.Nil(t, err)
+
+		started := time.Now()
+		_, nextErr := bi.Next()
+		elapsed := time.Since(started)
+
+		assert.NotNil(t, nextErr)
+		// Cancellation should land well before all retries would otherwise complete
+		// (five backoffs at RetryWaitMin 500ms would sleep at least ~2.25s without cancel).
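+		// The ctx.Done() case in fetchBatchBytes's backoff select makes the sleep interruptible.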
+		assert.Less(t, elapsed, 1*time.Second, "context cancel should abort retry backoff promptly")
+	})
+}
+
+func TestCloudFetchBackoff(t *testing.T) {
+	t.Run("retry-after integer seconds is honored", func(t *testing.T) {
+		got := cloudFetchBackoff(1, 100*time.Millisecond, 60*time.Second, "2")
+		assert.Equal(t, 2*time.Second, got)
+	})
+
+	t.Run("retry-after is capped at waitMax", func(t *testing.T) {
+		got := cloudFetchBackoff(1, 100*time.Millisecond, 1*time.Second, "100")
+		assert.Equal(t, 1*time.Second, got)
+	})
+
+	t.Run("retry-after http-date is ignored, falls back to exponential", func(t *testing.T) {
+		minWait := 100 * time.Millisecond
+		got := cloudFetchBackoff(1, minWait, 10*time.Second, "Tue, 15 Nov 1994 08:12:31 GMT")
+		// attempt=1 base = minWait; equal jitter in [minWait/2, minWait)
+		assert.GreaterOrEqual(t, got, minWait/2)
+		assert.LessOrEqual(t, got, minWait)
+	})
+
+	t.Run("exponential is capped at waitMax", func(t *testing.T) {
+		maxWait := 200 * time.Millisecond
+		// 100ms * 2^9 = 51200ms, capped at 200ms; equal jitter -> [100ms, 200ms)
+		for i := 0; i < 50; i++ {
+			got := cloudFetchBackoff(10, 100*time.Millisecond, maxWait, "")
+			assert.GreaterOrEqual(t, got, maxWait/2)
+			assert.LessOrEqual(t, got, maxWait)
+		}
+	})
+
+	t.Run("base grows exponentially with attempt", func(t *testing.T) {
+		minWait, maxWait := 100*time.Millisecond, 10*time.Second
+		// attempt=1 -> base 100ms, jitter [50ms, 100ms)
+		// attempt=3 -> base 400ms, jitter [200ms, 400ms)
+		for i := 0; i < 50; i++ {
+			got1 := cloudFetchBackoff(1, minWait, maxWait, "")
+			got3 := cloudFetchBackoff(3, minWait, maxWait, "")
+			assert.GreaterOrEqual(t, got1, 50*time.Millisecond)
+			assert.LessOrEqual(t, got1, 100*time.Millisecond)
+			assert.GreaterOrEqual(t, got3, 200*time.Millisecond)
+			assert.LessOrEqual(t, got3, 400*time.Millisecond)
+		}
+	})
+
+	t.Run("zero waitMin returns zero", func(t *testing.T) {
+		got := cloudFetchBackoff(1, 0, 0, "")
+		assert.Equal(t, time.Duration(0), got)
+	})
+}
+
+func TestCloudFetchRetryableStatus(t *testing.T) {
+	retryable := []int{408, 429, 500, 502, 503, 504}
+	notRetryable := []int{200, 201, 301, 302, 400, 401, 403, 404, 409, 410, 501}
+
+	for _, s := range retryable {
+		assert.True(t, isCloudFetchRetryableStatus(s), "%d should be retryable", s)
+	}
+	for _, s := range notRetryable {
+		assert.False(t, isCloudFetchRetryableStatus(s), "%d should not be retryable", s)
+	}
 }
 
 func TestCloudFetchSchemaOverride(t *testing.T) {