From b9ecdc191472bcaca5710d864e34238a4b33d4d0 Mon Sep 17 00:00:00 2001 From: Jayant Singh Date: Thu, 7 May 2026 19:37:30 +0000 Subject: [PATCH 1/3] [ES-1892645] Retry transient S3 errors in CloudFetch downloads fetchBatchBytes had no retry on HTTP failures, so any single 5xx from S3 cancelled the entire query. With thousands of concurrent GETs on large result sets, even a sub-percent per-request failure rate makes at least one failure near-certain. Adds exponential backoff with equal jitter for 408/429/500/502/503/504 plus connection errors, honoring the existing RetryMax / RetryWaitMin / RetryWaitMax config and parseable integer Retry-After response headers. Link expiry is re-checked after each backoff so retries don't outlive the presigned URL. Co-authored-by: Isaac Signed-off-by: Jayant Singh --- internal/rows/arrowbased/batchloader.go | 177 ++++++++-- internal/rows/arrowbased/batchloader_test.go | 320 +++++++++++++++++++ 2 files changed, 475 insertions(+), 22 deletions(-) diff --git a/internal/rows/arrowbased/batchloader.go b/internal/rows/arrowbased/batchloader.go index 2d86478..17b50bb 100644 --- a/internal/rows/arrowbased/batchloader.go +++ b/internal/rows/arrowbased/batchloader.go @@ -5,6 +5,9 @@ import ( "context" "fmt" "io" + "math" + "math/rand" + "strconv" "strings" "time" @@ -193,6 +196,9 @@ func (bi *cloudIPCStreamIterator) Next() (io.Reader, error) { minTimeToExpiry: bi.cfg.MinTimeToExpiry, speedThresholdMbps: bi.cfg.CloudFetchSpeedThresholdMbps, httpClient: bi.httpClient, + retryMax: bi.cfg.RetryMax, + retryWaitMin: bi.cfg.RetryWaitMin, + retryWaitMax: bi.cfg.RetryWaitMax, } task.Run() bi.downloadTasks.Enqueue(task) @@ -252,6 +258,9 @@ type cloudFetchDownloadTask struct { resultChan chan cloudFetchDownloadTaskResult speedThresholdMbps float64 httpClient *http.Client + retryMax int + retryWaitMin time.Duration + retryWaitMax time.Duration } func (cft *cloudFetchDownloadTask) GetResult() (io.Reader, int64, error) { @@ -295,7 +304,16 @@ func (cft *cloudFetchDownloadTask) Run() { cft.link.RowCount, ) downloadStart := time.Now() - data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry, cft.speedThresholdMbps, cft.httpClient) + data, err := fetchBatchBytes( + cft.ctx, + cft.link, + cft.minTimeToExpiry, + cft.speedThresholdMbps, + cft.httpClient, + cft.retryMax, + cft.retryWaitMin, + cft.retryWaitMax, + ) if err != nil { cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err} return @@ -339,43 +357,158 @@ func logCloudFetchSpeed(fullURL string, contentLength int64, duration time.Durat } } +// fetchBatchBytes downloads a single Cloud Fetch result link from object +// storage. Transient failures — connection errors and HTTP 408/429/500/502/503/504 +// from S3-style endpoints — are retried up to retryMax times with exponential +// backoff and equal jitter. Link expiry is rechecked before every attempt: a +// long retry chain can outlive a presigned URL, and continuing past expiry is +// guaranteed to fail. 
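//
// To put the commit message's failure-rate argument in numbers (illustrative
// figures, not measurements from the incident): a query that produces 5,000
// result files with a 0.2% transient failure rate per GET fails somewhere
// with probability 1 - 0.998^5000, about 1 - e^-10, i.e. essentially always.
// A single retry per link already drops the expected count of links that fail
// both attempts to 5000 * 0.002^2 = 0.02, i.e. roughly a 2% chance that any
// link still fails.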
func fetchBatchBytes( ctx context.Context, link *cli_service.TSparkArrowResultLink, minTimeToExpiry time.Duration, speedThresholdMbps float64, httpClient *http.Client, + retryMax int, + retryWaitMin time.Duration, + retryWaitMax time.Duration, ) (io.ReadCloser, error) { - if isLinkExpired(link.ExpiryTime, minTimeToExpiry) { - return nil, errors.New(dbsqlerr.ErrLinkExpired) + if retryMax < 0 { + retryMax = 0 } - // TODO: Retry on HTTP errors - req, err := http.NewRequestWithContext(ctx, "GET", link.FileLink, nil) - if err != nil { - return nil, err - } + var ( + lastErr error + lastStatus int + lastRetryAfter string + ) - if link.HttpHeaders != nil { - for key, value := range link.HttpHeaders { - req.Header.Set(key, value) + for attempt := 0; attempt <= retryMax; attempt++ { + if attempt > 0 { + wait := cloudFetchBackoff(attempt, retryWaitMin, retryWaitMax, lastRetryAfter) + logger.Debug().Msgf( + "CloudFetch: retrying download of link at offset %d (attempt %d/%d) in %v; lastStatus=%d lastErr=%v", + link.StartRowOffset, attempt, retryMax, wait, lastStatus, lastErr, + ) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(wait): + } } - } - startTime := time.Now() - res, err := httpClient.Do(req) - if err != nil { - return nil, err + // Check link expiry *after* backoff: a long retry chain may outlive a + // presigned URL, and there's no point spending another HTTP attempt + // (or another retry) on a link we know will be rejected. + if isLinkExpired(link.ExpiryTime, minTimeToExpiry) { + return nil, errors.New(dbsqlerr.ErrLinkExpired) + } + + req, err := http.NewRequestWithContext(ctx, "GET", link.FileLink, nil) + if err != nil { + return nil, err + } + if link.HttpHeaders != nil { + for key, value := range link.HttpHeaders { + req.Header.Set(key, value) + } + } + + startTime := time.Now() + res, err := httpClient.Do(req) + if err != nil { + // Caller cancellation is terminal; otherwise treat transport errors + // (TCP RST, TLS timeout, etc.) as transient. + if ctx.Err() != nil { + return nil, ctx.Err() + } + lastErr = err + lastStatus = 0 + lastRetryAfter = "" + continue + } + + if res.StatusCode == http.StatusOK { + logCloudFetchSpeed(link.FileLink, res.ContentLength, time.Since(startTime), speedThresholdMbps) + return res.Body, nil + } + + // Drain and close so the underlying connection can be reused. 
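// (Go's http.Transport only returns a connection to its keep-alive pool once
// the previous response body has been read to EOF and closed; skipping this
// drain would force a fresh TCP and TLS handshake on every retry.)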
+ _, _ = io.Copy(io.Discard, res.Body) + res.Body.Close() //nolint:errcheck,gosec // G104: closing after drain + + lastStatus = res.StatusCode + lastErr = nil + lastRetryAfter = "" + if res.Header != nil { + lastRetryAfter = res.Header.Get("Retry-After") + } + + if !isCloudFetchRetryableStatus(res.StatusCode) { + msg := fmt.Sprintf("%s: %s %d", errArrowRowsCloudFetchDownloadFailure, "HTTP error", res.StatusCode) + return nil, dbsqlerrint.NewDriverError(ctx, msg, nil) + } } - if res.StatusCode != http.StatusOK { - msg := fmt.Sprintf("%s: %s %d", errArrowRowsCloudFetchDownloadFailure, "HTTP error", res.StatusCode) - return nil, dbsqlerrint.NewDriverError(ctx, msg, err) + + if lastStatus != 0 { + msg := fmt.Sprintf("%s: %s %d (after %d retries)", errArrowRowsCloudFetchDownloadFailure, "HTTP error", lastStatus, retryMax) + return nil, dbsqlerrint.NewDriverError(ctx, msg, nil) } + msg := fmt.Sprintf("%s: %v (after %d retries)", errArrowRowsCloudFetchDownloadFailure, lastErr, retryMax) + return nil, dbsqlerrint.NewDriverError(ctx, msg, lastErr) +} - // Log download speed metrics - logCloudFetchSpeed(link.FileLink, res.ContentLength, time.Since(startTime), speedThresholdMbps) +// cloudFetchRetryableStatuses lists HTTP status codes from object storage that +// indicate transient conditions and warrant a retry. Mirrors AWS S3 guidance +// for SlowDown (503) / InternalError (500) plus the general 408/429/502/504. +var cloudFetchRetryableStatuses = map[int]struct{}{ + http.StatusRequestTimeout: {}, // 408 + http.StatusTooManyRequests: {}, // 429 + http.StatusInternalServerError: {}, // 500 + http.StatusBadGateway: {}, // 502 + http.StatusServiceUnavailable: {}, // 503 + http.StatusGatewayTimeout: {}, // 504 +} + +func isCloudFetchRetryableStatus(status int) bool { + _, ok := cloudFetchRetryableStatuses[status] + return ok +} + +// cloudFetchBackoff returns the wait before retry attempt N (1-based). The +// base delay is exponential — waitMin * 2^(attempt-1) capped at waitMax — with +// equal jitter applied: the actual sleep is uniformly distributed in +// [base/2, base]. Equal jitter (rather than no jitter) is used to spread +// synchronized retries across the up-to-MaxDownloadThreads concurrent +// downloads, which would otherwise hammer the storage endpoint in lockstep +// after a region-wide blip. If the server returned a parseable integer +// Retry-After header, that value (in seconds) is honored instead, capped at +// waitMax. HTTP-date Retry-After values are ignored — same as the Thrift +// client's backoff. 
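//
// As a concrete schedule (waitMin=1s and waitMax=10s picked purely for
// illustration, not as driver defaults): the pre-jitter bases are 1s, 2s,
// 4s, 8s and then 10s capped, and equal jitter turns each base into a
// uniform draw from [base/2, base], so five retries would sleep between
// roughly 12.5s and 25s in total.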
+func cloudFetchBackoff(attempt int, waitMin, waitMax time.Duration, retryAfter string) time.Duration { + if retryAfter != "" { + if secs, err := strconv.ParseInt(retryAfter, 10, 64); err == nil && secs >= 0 { + d := time.Duration(secs) * time.Second + if d > waitMax { + return waitMax + } + return d + } + } - return res.Body, nil + expo := float64(waitMin) * math.Pow(2, float64(attempt-1)) + if expo > float64(waitMax) || math.IsInf(expo, 0) { + expo = float64(waitMax) + } + base := time.Duration(expo) + if base <= 0 { + return 0 + } + half := base / 2 + if half <= 0 { + return base + } + return half + time.Duration(rand.Int63n(int64(half))) //nolint:gosec // G404: jitter only, non-cryptographic } func getReader(r io.Reader, useLz4Compression bool) io.Reader { diff --git a/internal/rows/arrowbased/batchloader_test.go b/internal/rows/arrowbased/batchloader_test.go index 59947ff..6b6ac40 100644 --- a/internal/rows/arrowbased/batchloader_test.go +++ b/internal/rows/arrowbased/batchloader_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "sync" + "sync/atomic" "testing" "time" @@ -343,6 +344,325 @@ func TestCloudFetchIterator(t *testing.T) { assert.Nil(t, nextErr) assert.NotNil(t, sab) }) + + // ES-1892645: Cloud Fetch must retry transient S3 errors. Without retry, + // a single 503 SlowDown (which AWS guarantees will occur with non-trivial + // frequency on large result sets) aborts the entire query. The downstream + // customer hit this on queries with 3,800-6,000 result files. + + t.Run("should retry transient HTTP 503 and eventually succeed", func(t *testing.T) { + var attempts int32 + handler = func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&attempts, 1) + if n < 3 { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + _, err := w.Write(generateMockArrowBytes(generateArrowRecord())) + if err != nil { + panic(err) + } + } + + startRowOffset := int64(100) + cfg := config.WithDefaults() + cfg.UseLz4Compression = false + cfg.MaxDownloadThreads = 1 + cfg.RetryMax = 4 + cfg.RetryWaitMin = 1 * time.Millisecond + cfg.RetryWaitMax = 5 * time.Millisecond + + bi, err := NewCloudBatchIterator( + context.Background(), + []*cli_service.TSparkArrowResultLink{{ + FileLink: server.URL, + ExpiryTime: time.Now().Add(10 * time.Minute).Unix(), + StartRowOffset: startRowOffset, + RowCount: 1, + }}, + startRowOffset, + nil, + cfg, + nil, + ) + assert.Nil(t, err) + + sab, nextErr := bi.Next() + assert.Nil(t, nextErr) + assert.NotNil(t, sab) + assert.Equal(t, int32(3), atomic.LoadInt32(&attempts), "expected 2 retries before success") + }) + + t.Run("should retry transient HTTP 500 and eventually succeed", func(t *testing.T) { + var attempts int32 + handler = func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&attempts, 1) + if n < 2 { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + _, err := w.Write(generateMockArrowBytes(generateArrowRecord())) + if err != nil { + panic(err) + } + } + + startRowOffset := int64(100) + cfg := config.WithDefaults() + cfg.UseLz4Compression = false + cfg.MaxDownloadThreads = 1 + cfg.RetryMax = 4 + cfg.RetryWaitMin = 1 * time.Millisecond + cfg.RetryWaitMax = 5 * time.Millisecond + + bi, err := NewCloudBatchIterator( + context.Background(), + []*cli_service.TSparkArrowResultLink{{ + FileLink: server.URL, + ExpiryTime: time.Now().Add(10 * time.Minute).Unix(), + StartRowOffset: startRowOffset, + RowCount: 1, + }}, + startRowOffset, + nil, + cfg, + 
nil, + ) + assert.Nil(t, err) + + sab, nextErr := bi.Next() + assert.Nil(t, nextErr) + assert.NotNil(t, sab) + assert.Equal(t, int32(2), atomic.LoadInt32(&attempts)) + }) + + t.Run("should fail after exhausting retries on persistent 503", func(t *testing.T) { + var attempts int32 + handler = func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusServiceUnavailable) + } + + startRowOffset := int64(100) + cfg := config.WithDefaults() + cfg.UseLz4Compression = false + cfg.MaxDownloadThreads = 1 + cfg.RetryMax = 2 + cfg.RetryWaitMin = 1 * time.Millisecond + cfg.RetryWaitMax = 5 * time.Millisecond + + bi, err := NewCloudBatchIterator( + context.Background(), + []*cli_service.TSparkArrowResultLink{{ + FileLink: server.URL, + ExpiryTime: time.Now().Add(10 * time.Minute).Unix(), + StartRowOffset: startRowOffset, + RowCount: 1, + }}, + startRowOffset, + nil, + cfg, + nil, + ) + assert.Nil(t, err) + + _, nextErr := bi.Next() + assert.NotNil(t, nextErr) + assert.ErrorContains(t, nextErr, fmt.Sprintf("HTTP error %d", http.StatusServiceUnavailable)) + assert.ErrorContains(t, nextErr, "after 2 retries") + // initial attempt + RetryMax retries + assert.Equal(t, int32(3), atomic.LoadInt32(&attempts)) + }) + + t.Run("should not retry on non-retryable status (403)", func(t *testing.T) { + var attempts int32 + handler = func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusForbidden) + } + + startRowOffset := int64(100) + cfg := config.WithDefaults() + cfg.UseLz4Compression = false + cfg.MaxDownloadThreads = 1 + cfg.RetryMax = 5 + cfg.RetryWaitMin = 1 * time.Millisecond + cfg.RetryWaitMax = 5 * time.Millisecond + + bi, err := NewCloudBatchIterator( + context.Background(), + []*cli_service.TSparkArrowResultLink{{ + FileLink: server.URL, + ExpiryTime: time.Now().Add(10 * time.Minute).Unix(), + StartRowOffset: startRowOffset, + RowCount: 1, + }}, + startRowOffset, + nil, + cfg, + nil, + ) + assert.Nil(t, err) + + _, nextErr := bi.Next() + assert.NotNil(t, nextErr) + assert.ErrorContains(t, nextErr, fmt.Sprintf("HTTP error %d", http.StatusForbidden)) + assert.NotContains(t, nextErr.Error(), "after") + assert.Equal(t, int32(1), atomic.LoadInt32(&attempts), "non-retryable status must fail on first attempt") + }) + + t.Run("should detect link expiry between retries", func(t *testing.T) { + // First attempt sees a not-yet-expired link, gets 503, sleeps. The + // retry sleep (≥ retryWaitMin/2 = 1s with equal jitter) crosses a + // Unix-second boundary, so the second iteration finds the link + // expired and short-circuits. We expect exactly one HTTP attempt + // followed by ErrLinkExpired — not all retries exhausted. + var attempts int32 + handler = func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusServiceUnavailable) + } + + startRowOffset := int64(100) + cfg := config.WithDefaults() + cfg.UseLz4Compression = false + cfg.MaxDownloadThreads = 1 + cfg.RetryMax = 5 + // waitMin=2s → equal jitter gives sleep ∈ [1s, 2s), guaranteed to + // cross at least one Unix-second tick. 
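// (ExpiryTime on TSparkArrowResultLink is whole Unix seconds, so the expiry
// check can only change its answer at a second boundary; the sleep has to
// span at least one tick for the second iteration to see the link as expired.)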
+ cfg.RetryWaitMin = 2 * time.Second + cfg.RetryWaitMax = 4 * time.Second + + bi, err := NewCloudBatchIterator( + context.Background(), + []*cli_service.TSparkArrowResultLink{{ + FileLink: server.URL, + ExpiryTime: time.Now().Unix(), // floor(now); expires within the next second + StartRowOffset: startRowOffset, + RowCount: 1, + }}, + startRowOffset, + nil, + cfg, + nil, + ) + assert.Nil(t, err) + + _, nextErr := bi.Next() + assert.NotNil(t, nextErr) + assert.ErrorContains(t, nextErr, dbsqlerr.ErrLinkExpired) + // Only the first attempt should have hit the server. + assert.Equal(t, int32(1), atomic.LoadInt32(&attempts)) + }) + + t.Run("should respect context cancellation during backoff", func(t *testing.T) { + var attempts int32 + handler = func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusServiceUnavailable) + } + + startRowOffset := int64(100) + cfg := config.WithDefaults() + cfg.UseLz4Compression = false + cfg.MaxDownloadThreads = 1 + cfg.RetryMax = 5 + cfg.RetryWaitMin = 500 * time.Millisecond + cfg.RetryWaitMax = 1 * time.Second + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + }() + + bi, err := NewCloudBatchIterator( + ctx, + []*cli_service.TSparkArrowResultLink{{ + FileLink: server.URL, + ExpiryTime: time.Now().Add(10 * time.Minute).Unix(), + StartRowOffset: startRowOffset, + RowCount: 1, + }}, + startRowOffset, + nil, + cfg, + nil, + ) + assert.Nil(t, err) + + started := time.Now() + _, nextErr := bi.Next() + elapsed := time.Since(started) + + assert.NotNil(t, nextErr) + // Cancellation should land well before all retries would otherwise complete + // (5 * 500ms+ = 2.5s+ minimum without cancel). + assert.Less(t, elapsed, 1*time.Second, "context cancel should abort retry backoff promptly") + }) +} + +func TestCloudFetchBackoff(t *testing.T) { + t.Run("retry-after integer seconds is honored", func(t *testing.T) { + got := cloudFetchBackoff(1, 100*time.Millisecond, 60*time.Second, "2") + assert.Equal(t, 2*time.Second, got) + }) + + t.Run("retry-after is capped at waitMax", func(t *testing.T) { + got := cloudFetchBackoff(1, 100*time.Millisecond, 1*time.Second, "100") + assert.Equal(t, 1*time.Second, got) + }) + + t.Run("retry-after http-date is ignored, falls back to exponential", func(t *testing.T) { + minWait := 100 * time.Millisecond + got := cloudFetchBackoff(1, minWait, 10*time.Second, "Tue, 15 Nov 1994 08:12:31 GMT") + // attempt=1 base = minWait; equal jitter in [minWait/2, minWait] + assert.GreaterOrEqual(t, got, minWait/2) + assert.LessOrEqual(t, got, minWait) + }) + + t.Run("exponential is capped at waitMax", func(t *testing.T) { + maxWait := 200 * time.Millisecond + // 100ms * 2^9 = 51200ms, capped at 200ms; equal jitter -> [100ms, 200ms] + for i := 0; i < 50; i++ { + got := cloudFetchBackoff(10, 100*time.Millisecond, maxWait, "") + assert.GreaterOrEqual(t, got, maxWait/2) + assert.LessOrEqual(t, got, maxWait) + } + }) + + t.Run("base grows exponentially with attempt", func(t *testing.T) { + minWait, maxWait := 100*time.Millisecond, 10*time.Second + // attempt=1 -> base 100ms, jitter [50ms,100ms] + // attempt=3 -> base 400ms, jitter [200ms,400ms] + for i := 0; i < 50; i++ { + got1 := cloudFetchBackoff(1, minWait, maxWait, "") + got3 := cloudFetchBackoff(3, minWait, maxWait, "") + assert.GreaterOrEqual(t, got1, 50*time.Millisecond) + assert.LessOrEqual(t, got1, 100*time.Millisecond) + assert.GreaterOrEqual(t, got3, 200*time.Millisecond) + 
assert.LessOrEqual(t, got3, 400*time.Millisecond) + } + }) + + t.Run("zero waitMin returns zero", func(t *testing.T) { + got := cloudFetchBackoff(1, 0, 0, "") + assert.Equal(t, time.Duration(0), got) + }) +} + +func TestCloudFetchRetryableStatus(t *testing.T) { + retryable := []int{408, 429, 500, 502, 503, 504} + notRetryable := []int{200, 201, 301, 302, 400, 401, 403, 404, 409, 410, 501} + + for _, s := range retryable { + assert.True(t, isCloudFetchRetryableStatus(s), "%d should be retryable", s) + } + for _, s := range notRetryable { + assert.False(t, isCloudFetchRetryableStatus(s), "%d should not be retryable", s) + } } func TestCloudFetchSchemaOverride(t *testing.T) { From bd6ad67c60a4d56eae358a5253abb4674436a728 Mon Sep 17 00:00:00 2001 From: Jayant Singh Date: Thu, 7 May 2026 20:02:30 +0000 Subject: [PATCH 2/3] [ES-1892645] Retry mid-stream body read failures, deflake expiry test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two follow-ups from review feedback: 1. fetchBatchBytes returned res.Body on 200 OK and let the caller's io.ReadAll happen outside the retry loop, so a TCP RST or truncated body during streaming surfaced as a hard failure. With multi-MB S3 objects this is the dominant transient mode at scale and exactly the failure we want covered. Buffer the body inside the retry loop and treat read errors as transient. Decompression stays outside the loop — malformed LZ4 is data corruption, not transient. 2. The expiry-between-retries test used ExpiryTime = floor(now), which could already be expired before iter 0's check if construction crossed a Unix-second boundary (attempts=0 instead of 1). Add 1s of guaranteed headroom and bump RetryWaitMin to 4s so the post-backoff sleep deterministically pushes floor(now) past expiry on iter 1. Co-authored-by: Isaac Signed-off-by: Jayant Singh --- internal/rows/arrowbased/batchloader.go | 46 ++++-- internal/rows/arrowbased/batchloader_test.go | 139 +++++++++++++++++-- 2 files changed, 164 insertions(+), 21 deletions(-) diff --git a/internal/rows/arrowbased/batchloader.go b/internal/rows/arrowbased/batchloader.go index 17b50bb..f059c33 100644 --- a/internal/rows/arrowbased/batchloader.go +++ b/internal/rows/arrowbased/batchloader.go @@ -304,6 +304,9 @@ func (cft *cloudFetchDownloadTask) Run() { cft.link.RowCount, ) downloadStart := time.Now() + // fetchBatchBytes now buffers the body in memory and retries on + // mid-stream failures, so the returned reader is always a + // *bytes.Reader and needs no Close. data, err := fetchBatchBytes( cft.ctx, cft.link, @@ -319,9 +322,10 @@ func (cft *cloudFetchDownloadTask) Run() { return } - // Read all data into memory before closing + // Decompression sits outside the retry loop on purpose: a malformed + // LZ4 frame is data corruption, not a transient network condition, + // and won't recover on retry. buf, err := io.ReadAll(getReader(data, cft.useLz4Compression)) - data.Close() //nolint:errcheck,gosec // G104: close after reading data downloadMs := time.Since(downloadStart).Milliseconds() if err != nil { cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err} @@ -358,11 +362,18 @@ func logCloudFetchSpeed(fullURL string, contentLength int64, duration time.Durat } // fetchBatchBytes downloads a single Cloud Fetch result link from object -// storage. Transient failures — connection errors and HTTP 408/429/500/502/503/504 -// from S3-style endpoints — are retried up to retryMax times with exponential -// backoff and equal jitter. 
Link expiry is rechecked before every attempt: a -// long retry chain can outlive a presigned URL, and continuing past expiry is -// guaranteed to fail. +// storage and returns the raw (still-compressed, if any) response body +// buffered in memory. Both connection-time failures and mid-stream body-read +// failures are retried up to retryMax times with exponential backoff and +// equal jitter, alongside HTTP 408/429/500/502/503/504. The body read +// happens inside the retry loop on purpose: with multi-MB S3 objects, a TCP +// RST or truncated response surfaces as an io.ReadAll error *after* the +// 200 OK headers have already arrived, and that's exactly the failure mode +// the customer hits at scale. Decompression and IPC parsing are left to the +// caller — those errors aren't transient, so retrying them is wasted work. +// +// Link expiry is rechecked after each backoff: a long retry chain may outlive +// a presigned URL, and continuing past expiry is guaranteed to fail. func fetchBatchBytes( ctx context.Context, link *cli_service.TSparkArrowResultLink, @@ -372,7 +383,7 @@ func fetchBatchBytes( retryMax int, retryWaitMin time.Duration, retryWaitMax time.Duration, -) (io.ReadCloser, error) { +) (io.Reader, error) { if retryMax < 0 { retryMax = 0 } @@ -429,8 +440,23 @@ func fetchBatchBytes( } if res.StatusCode == http.StatusOK { - logCloudFetchSpeed(link.FileLink, res.ContentLength, time.Since(startTime), speedThresholdMbps) - return res.Body, nil + // Drain the full body inside the retry loop so a mid-stream + // failure (TCP RST, S3 cutting the connection partway through a + // multi-MB object, server-claimed Content-Length not delivered) + // is retried just like a header-time error. + buf, readErr := io.ReadAll(res.Body) + res.Body.Close() //nolint:errcheck,gosec // G104: close after drain + if readErr != nil { + if ctx.Err() != nil { + return nil, ctx.Err() + } + lastErr = readErr + lastStatus = 0 + lastRetryAfter = "" + continue + } + logCloudFetchSpeed(link.FileLink, int64(len(buf)), time.Since(startTime), speedThresholdMbps) + return bytes.NewReader(buf), nil } // Drain and close so the underlying connection can be reused. diff --git a/internal/rows/arrowbased/batchloader_test.go b/internal/rows/arrowbased/batchloader_test.go index 6b6ac40..ce5c71d 100644 --- a/internal/rows/arrowbased/batchloader_test.go +++ b/internal/rows/arrowbased/batchloader_test.go @@ -394,6 +394,121 @@ func TestCloudFetchIterator(t *testing.T) { assert.Equal(t, int32(3), atomic.LoadInt32(&attempts), "expected 2 retries before success") }) + t.Run("should retry mid-stream body read failures (200 OK then connection drop)", func(t *testing.T) { + // Reproduces the gap closed by ES-1892645 review feedback: a 200 OK + // response whose body is truncated mid-stream must be retried, not + // surfaced as a hard failure. With multi-MB S3 objects this is the + // dominant transient mode (TCP RST during streaming, S3 cutting the + // connection partway), and is invisible to the status-code check. + var attempts int32 + realBody := generateMockArrowBytes(generateArrowRecord()) + handler = func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&attempts, 1) + if n == 1 { + // Hijack so we can write a 200 OK with a Content-Length we + // will deliberately under-fulfill, then close the TCP + // connection. The client's io.ReadAll surfaces this as + // ErrUnexpectedEOF. 
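// (This relies on httptest.NewServer speaking HTTP/1.1: HTTP/2
// ResponseWriters intentionally do not implement http.Hijacker, so the
// type assertion below would fail there.)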
+ hj, ok := w.(http.Hijacker) + if !ok { + t.Fatal("ResponseWriter does not support Hijacker") + return + } + conn, bufrw, err := hj.Hijack() + if err != nil { + t.Fatal(err) + return + } + _, _ = fmt.Fprintf(bufrw, "HTTP/1.1 200 OK\r\nContent-Length: 1000000\r\nConnection: close\r\n\r\n") + _, _ = bufrw.Write([]byte("partial")) + _ = bufrw.Flush() + _ = conn.Close() + return + } + w.WriteHeader(http.StatusOK) + if _, err := w.Write(realBody); err != nil { + panic(err) + } + } + + startRowOffset := int64(100) + cfg := config.WithDefaults() + cfg.UseLz4Compression = false + cfg.MaxDownloadThreads = 1 + cfg.RetryMax = 4 + cfg.RetryWaitMin = 1 * time.Millisecond + cfg.RetryWaitMax = 5 * time.Millisecond + + bi, err := NewCloudBatchIterator( + context.Background(), + []*cli_service.TSparkArrowResultLink{{ + FileLink: server.URL, + ExpiryTime: time.Now().Add(10 * time.Minute).Unix(), + StartRowOffset: startRowOffset, + RowCount: 1, + }}, + startRowOffset, + nil, + cfg, + nil, + ) + assert.Nil(t, err) + + sab, nextErr := bi.Next() + assert.Nil(t, nextErr) + assert.NotNil(t, sab) + assert.Equal(t, int32(2), atomic.LoadInt32(&attempts), "expected first attempt to fail mid-stream, second to succeed") + }) + + t.Run("should fail after exhausting retries on persistent body-read failures", func(t *testing.T) { + var attempts int32 + handler = func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + hj, ok := w.(http.Hijacker) + if !ok { + t.Fatal("ResponseWriter does not support Hijacker") + return + } + conn, bufrw, err := hj.Hijack() + if err != nil { + t.Fatal(err) + return + } + _, _ = fmt.Fprintf(bufrw, "HTTP/1.1 200 OK\r\nContent-Length: 1000000\r\nConnection: close\r\n\r\n") + _ = bufrw.Flush() + _ = conn.Close() + } + + startRowOffset := int64(100) + cfg := config.WithDefaults() + cfg.UseLz4Compression = false + cfg.MaxDownloadThreads = 1 + cfg.RetryMax = 2 + cfg.RetryWaitMin = 1 * time.Millisecond + cfg.RetryWaitMax = 5 * time.Millisecond + + bi, err := NewCloudBatchIterator( + context.Background(), + []*cli_service.TSparkArrowResultLink{{ + FileLink: server.URL, + ExpiryTime: time.Now().Add(10 * time.Minute).Unix(), + StartRowOffset: startRowOffset, + RowCount: 1, + }}, + startRowOffset, + nil, + cfg, + nil, + ) + assert.Nil(t, err) + + _, nextErr := bi.Next() + assert.NotNil(t, nextErr) + assert.ErrorContains(t, nextErr, "after 2 retries") + // initial attempt + RetryMax retries + assert.Equal(t, int32(3), atomic.LoadInt32(&attempts)) + }) + t.Run("should retry transient HTTP 500 and eventually succeed", func(t *testing.T) { var attempts int32 handler = func(w http.ResponseWriter, r *http.Request) { @@ -514,11 +629,14 @@ func TestCloudFetchIterator(t *testing.T) { }) t.Run("should detect link expiry between retries", func(t *testing.T) { - // First attempt sees a not-yet-expired link, gets 503, sleeps. The - // retry sleep (≥ retryWaitMin/2 = 1s with equal jitter) crosses a - // Unix-second boundary, so the second iteration finds the link - // expired and short-circuits. We expect exactly one HTTP attempt - // followed by ErrLinkExpired — not all retries exhausted. + // Time math: ExpiryTime = floor(now)+1 (via .Add(time.Second).Unix()) + // gives ~1s of guaranteed headroom on attempt 0, so even with + // realistic CI scheduling delay the first expiry check passes and + // the first HTTP attempt fires. waitMin=4s → equal jitter sleeps + // in [2s, 4s); 2s is enough to push floor(now) strictly past the + // expiry on attempt 1 regardless of where t0 sat in its second. 
+ // Trades ~2-4s of wall clock for determinism — see ES-1892645 + // review feedback. var attempts int32 handler = func(w http.ResponseWriter, r *http.Request) { atomic.AddInt32(&attempts, 1) @@ -530,16 +648,14 @@ func TestCloudFetchIterator(t *testing.T) { cfg.UseLz4Compression = false cfg.MaxDownloadThreads = 1 cfg.RetryMax = 5 - // waitMin=2s → equal jitter gives sleep ∈ [1s, 2s), guaranteed to - // cross at least one Unix-second tick. - cfg.RetryWaitMin = 2 * time.Second - cfg.RetryWaitMax = 4 * time.Second + cfg.RetryWaitMin = 4 * time.Second + cfg.RetryWaitMax = 8 * time.Second bi, err := NewCloudBatchIterator( context.Background(), []*cli_service.TSparkArrowResultLink{{ FileLink: server.URL, - ExpiryTime: time.Now().Unix(), // floor(now); expires within the next second + ExpiryTime: time.Now().Add(time.Second).Unix(), StartRowOffset: startRowOffset, RowCount: 1, }}, @@ -553,7 +669,8 @@ func TestCloudFetchIterator(t *testing.T) { _, nextErr := bi.Next() assert.NotNil(t, nextErr) assert.ErrorContains(t, nextErr, dbsqlerr.ErrLinkExpired) - // Only the first attempt should have hit the server. + // Only the first attempt should have hit the server — the second + // iteration short-circuits on the post-backoff expiry check. assert.Equal(t, int32(1), atomic.LoadInt32(&attempts)) }) From 4b72cac621a5470190b44ebda8529a5b5bca4f4d Mon Sep 17 00:00:00 2001 From: Jayant Singh Date: Thu, 7 May 2026 20:19:31 +0000 Subject: [PATCH 3/3] Improvements Signed-off-by: Jayant Singh --- internal/rows/arrowbased/batchloader.go | 49 +++++------- internal/rows/arrowbased/batchloader_test.go | 82 +++++++------------- 2 files changed, 49 insertions(+), 82 deletions(-) diff --git a/internal/rows/arrowbased/batchloader.go b/internal/rows/arrowbased/batchloader.go index f059c33..76f619c 100644 --- a/internal/rows/arrowbased/batchloader.go +++ b/internal/rows/arrowbased/batchloader.go @@ -304,10 +304,7 @@ func (cft *cloudFetchDownloadTask) Run() { cft.link.RowCount, ) downloadStart := time.Now() - // fetchBatchBytes now buffers the body in memory and retries on - // mid-stream failures, so the returned reader is always a - // *bytes.Reader and needs no Close. - data, err := fetchBatchBytes( + rawBody, err := fetchBatchBytes( cft.ctx, cft.link, cft.minTimeToExpiry, @@ -322,15 +319,17 @@ func (cft *cloudFetchDownloadTask) Run() { return } - // Decompression sits outside the retry loop on purpose: a malformed - // LZ4 frame is data corruption, not a transient network condition, - // and won't recover on retry. - buf, err := io.ReadAll(getReader(data, cft.useLz4Compression)) - downloadMs := time.Since(downloadStart).Milliseconds() - if err != nil { - cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err} - return + buf := rawBody + if cft.useLz4Compression { + // Decompression sits outside the retry loop: malformed LZ4 is data + // corruption, not a transient network condition. + buf, err = io.ReadAll(lz4.NewReader(bytes.NewReader(rawBody))) + if err != nil { + cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err} + return + } } + downloadMs := time.Since(downloadStart).Milliseconds() logger.Debug().Msgf( "CloudFetch: downloaded data for link at offset %d row count %d", @@ -361,16 +360,12 @@ func logCloudFetchSpeed(fullURL string, contentLength int64, duration time.Durat } } -// fetchBatchBytes downloads a single Cloud Fetch result link from object -// storage and returns the raw (still-compressed, if any) response body -// buffered in memory. 
Both connection-time failures and mid-stream body-read -// failures are retried up to retryMax times with exponential backoff and -// equal jitter, alongside HTTP 408/429/500/502/503/504. The body read -// happens inside the retry loop on purpose: with multi-MB S3 objects, a TCP -// RST or truncated response surfaces as an io.ReadAll error *after* the -// 200 OK headers have already arrived, and that's exactly the failure mode -// the customer hits at scale. Decompression and IPC parsing are left to the -// caller — those errors aren't transient, so retrying them is wasted work. +// fetchBatchBytes downloads a single CloudFetch result link and returns the +// raw response body, still compressed if the server used LZ4. Connection-time +// failures, retryable HTTP statuses, and mid-stream body read failures are +// retried up to retryMax times with exponential backoff and equal jitter. +// Decompression and IPC parsing stay with the caller because those failures are +// not transient network conditions. // // Link expiry is rechecked after each backoff: a long retry chain may outlive // a presigned URL, and continuing past expiry is guaranteed to fail. @@ -383,7 +378,7 @@ func fetchBatchBytes( retryMax int, retryWaitMin time.Duration, retryWaitMax time.Duration, -) (io.Reader, error) { +) ([]byte, error) { if retryMax < 0 { retryMax = 0 } @@ -440,10 +435,8 @@ func fetchBatchBytes( } if res.StatusCode == http.StatusOK { - // Drain the full body inside the retry loop so a mid-stream - // failure (TCP RST, S3 cutting the connection partway through a - // multi-MB object, server-claimed Content-Length not delivered) - // is retried just like a header-time error. + // Read the full body inside the retry loop so truncated 200 OK + // responses are retried just like header-time failures. buf, readErr := io.ReadAll(res.Body) res.Body.Close() //nolint:errcheck,gosec // G104: close after drain if readErr != nil { @@ -456,7 +449,7 @@ func fetchBatchBytes( continue } logCloudFetchSpeed(link.FileLink, int64(len(buf)), time.Since(startTime), speedThresholdMbps) - return bytes.NewReader(buf), nil + return buf, nil } // Drain and close so the underlying connection can be reused. diff --git a/internal/rows/arrowbased/batchloader_test.go b/internal/rows/arrowbased/batchloader_test.go index ce5c71d..bce1a3c 100644 --- a/internal/rows/arrowbased/batchloader_test.go +++ b/internal/rows/arrowbased/batchloader_test.go @@ -31,6 +31,26 @@ func TestCloudFetchIterator(t *testing.T) { })) defer server.Close() + writeTruncatedOK := func(t *testing.T, w http.ResponseWriter, body []byte) { + t.Helper() + hj, ok := w.(http.Hijacker) + if !ok { + t.Fatal("ResponseWriter does not support Hijacker") + return + } + conn, bufrw, err := hj.Hijack() + if err != nil { + t.Fatal(err) + return + } + _, _ = fmt.Fprintf(bufrw, "HTTP/1.1 200 OK\r\nContent-Length: 1000000\r\nConnection: close\r\n\r\n") + if len(body) > 0 { + _, _ = bufrw.Write(body) + } + _ = bufrw.Flush() + _ = conn.Close() + } + t.Run("should fetch all the links", func(t *testing.T) { cloudFetchHeaders := map[string]string{ "foo": "bar", @@ -345,11 +365,6 @@ func TestCloudFetchIterator(t *testing.T) { assert.NotNil(t, sab) }) - // ES-1892645: Cloud Fetch must retry transient S3 errors. Without retry, - // a single 503 SlowDown (which AWS guarantees will occur with non-trivial - // frequency on large result sets) aborts the entire query. The downstream - // customer hit this on queries with 3,800-6,000 result files. 
- t.Run("should retry transient HTTP 503 and eventually succeed", func(t *testing.T) { var attempts int32 handler = func(w http.ResponseWriter, r *http.Request) { @@ -395,34 +410,12 @@ func TestCloudFetchIterator(t *testing.T) { }) t.Run("should retry mid-stream body read failures (200 OK then connection drop)", func(t *testing.T) { - // Reproduces the gap closed by ES-1892645 review feedback: a 200 OK - // response whose body is truncated mid-stream must be retried, not - // surfaced as a hard failure. With multi-MB S3 objects this is the - // dominant transient mode (TCP RST during streaming, S3 cutting the - // connection partway), and is invisible to the status-code check. var attempts int32 realBody := generateMockArrowBytes(generateArrowRecord()) handler = func(w http.ResponseWriter, r *http.Request) { n := atomic.AddInt32(&attempts, 1) if n == 1 { - // Hijack so we can write a 200 OK with a Content-Length we - // will deliberately under-fulfill, then close the TCP - // connection. The client's io.ReadAll surfaces this as - // ErrUnexpectedEOF. - hj, ok := w.(http.Hijacker) - if !ok { - t.Fatal("ResponseWriter does not support Hijacker") - return - } - conn, bufrw, err := hj.Hijack() - if err != nil { - t.Fatal(err) - return - } - _, _ = fmt.Fprintf(bufrw, "HTTP/1.1 200 OK\r\nContent-Length: 1000000\r\nConnection: close\r\n\r\n") - _, _ = bufrw.Write([]byte("partial")) - _ = bufrw.Flush() - _ = conn.Close() + writeTruncatedOK(t, w, []byte("partial")) return } w.WriteHeader(http.StatusOK) @@ -464,19 +457,7 @@ func TestCloudFetchIterator(t *testing.T) { var attempts int32 handler = func(w http.ResponseWriter, r *http.Request) { atomic.AddInt32(&attempts, 1) - hj, ok := w.(http.Hijacker) - if !ok { - t.Fatal("ResponseWriter does not support Hijacker") - return - } - conn, bufrw, err := hj.Hijack() - if err != nil { - t.Fatal(err) - return - } - _, _ = fmt.Fprintf(bufrw, "HTTP/1.1 200 OK\r\nContent-Length: 1000000\r\nConnection: close\r\n\r\n") - _ = bufrw.Flush() - _ = conn.Close() + writeTruncatedOK(t, w, nil) } startRowOffset := int64(100) @@ -629,17 +610,10 @@ func TestCloudFetchIterator(t *testing.T) { }) t.Run("should detect link expiry between retries", func(t *testing.T) { - // Time math: ExpiryTime = floor(now)+1 (via .Add(time.Second).Unix()) - // gives ~1s of guaranteed headroom on attempt 0, so even with - // realistic CI scheduling delay the first expiry check passes and - // the first HTTP attempt fires. waitMin=4s → equal jitter sleeps - // in [2s, 4s); 2s is enough to push floor(now) strictly past the - // expiry on attempt 1 regardless of where t0 sat in its second. - // Trades ~2-4s of wall clock for determinism — see ES-1892645 - // review feedback. 
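// Timing sketch for this variant of the test (derived from the values set
// below, not from separate measurement): the link expires 2s after
// construction, the first GET returns 503 with Retry-After: 3, the backoff
// honors those 3 seconds because they sit under RetryWaitMax, and by the
// time the sleep ends the post-backoff expiry check trips, so exactly one
// request reaches the server before ErrLinkExpired surfaces.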
var attempts int32 handler = func(w http.ResponseWriter, r *http.Request) { atomic.AddInt32(&attempts, 1) + w.Header().Set("Retry-After", "3") w.WriteHeader(http.StatusServiceUnavailable) } @@ -648,14 +622,15 @@ func TestCloudFetchIterator(t *testing.T) { cfg.UseLz4Compression = false cfg.MaxDownloadThreads = 1 cfg.RetryMax = 5 - cfg.RetryWaitMin = 4 * time.Second - cfg.RetryWaitMax = 8 * time.Second + cfg.RetryWaitMin = 1 * time.Millisecond + cfg.RetryWaitMax = 3 * time.Second + expiryTime := time.Now().Unix() + 2 bi, err := NewCloudBatchIterator( context.Background(), []*cli_service.TSparkArrowResultLink{{ FileLink: server.URL, - ExpiryTime: time.Now().Add(time.Second).Unix(), + ExpiryTime: expiryTime, StartRowOffset: startRowOffset, RowCount: 1, }}, @@ -669,8 +644,7 @@ func TestCloudFetchIterator(t *testing.T) { _, nextErr := bi.Next() assert.NotNil(t, nextErr) assert.ErrorContains(t, nextErr, dbsqlerr.ErrLinkExpired) - // Only the first attempt should have hit the server — the second - // iteration short-circuits on the post-backoff expiry check. + // The retry sleeps past expiry, then short-circuits before another GET. assert.Equal(t, int32(1), atomic.LoadInt32(&attempts)) })
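// For reference, a caller-side sketch of how the knobs exercised in these
// tests compose (field names are the ones used above; the values are
// illustrative, not recommended defaults):
//
//	cfg := config.WithDefaults()
//	cfg.MaxDownloadThreads = 10
//	cfg.RetryMax = 4                    // one initial attempt plus up to four retries per link
//	cfg.RetryWaitMin = 1 * time.Second  // base delay fed into the exponential backoff
//	cfg.RetryWaitMax = 30 * time.Second // cap for both the backoff and Retry-After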