From 62121921062a7355e6c7db8e288971e3510111a9 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Tue, 28 Apr 2026 20:31:13 -0400 Subject: [PATCH] fix: split oversized chip ingress batches --- pkg/chipingress/batch/client.go | 93 ++++++++++++++++----- pkg/chipingress/batch/client_test.go | 120 +++++++++++++++++++++++++++ 2 files changed, 190 insertions(+), 23 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 6033b9d271..55ccad458f 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -3,6 +3,7 @@ package batch import ( "context" "errors" + "fmt" "strconv" "sync" "sync/atomic" @@ -252,34 +253,80 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) go func() { defer func() { <-b.maxConcurrentSends }() - // this is specifically to prevent long running network calls - ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout) - defer cancel() - events := make([]*chipingress.CloudEventPb, len(messages)) - for i, msg := range messages { - events[i] = msg.event - } - batchReq := &chipingress.CloudEventBatch{Events: events} - batchBytes := proto.Size(batchReq) - startedAt := time.Now() - _, err := b.client.PublishBatch(ctxTimeout, batchReq) - b.metrics.recordSend(context.Background(), len(messages), batchBytes, time.Since(startedAt), err == nil) - if err != nil { - b.log.Errorw("failed to publish batch", "error", err) - } - // the callbacks are placed in their own goroutine to not block releasing the semaphore - // we use a wait group, to ensure all callbacks are completed if .Stop() is called. - b.callbackWg.Go(func() { - for _, msg := range messages { - if msg.callback != nil { - msg.callback(err) - } + for _, batchMessages := range splitMessagesByRequestSize(messages, b.maxGRPCRequestSize) { + batchReq, batchBytes := newBatchRequest(batchMessages) + if b.maxGRPCRequestSize > 0 && batchBytes > b.maxGRPCRequestSize { + err := fmt.Errorf("publish batch serialized size %d exceeds max gRPC request size %d", batchBytes, b.maxGRPCRequestSize) + b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, 0, false) + b.log.Errorw("failed to publish batch", "error", err) + b.completeBatchCallbacks(batchMessages, err) + continue } - }) + + // this is specifically to prevent long running network calls + ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout) + startedAt := time.Now() + _, err := b.client.PublishBatch(ctxTimeout, batchReq) + cancel() + + b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, time.Since(startedAt), err == nil) + if err != nil { + b.log.Errorw("failed to publish batch", "error", err) + } + b.completeBatchCallbacks(batchMessages, err) + } }() } +func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err error) { + callbackMessages, callbackErr := messages, err + // the callbacks are placed in their own goroutine to not block releasing the semaphore + // we use a wait group, to ensure all callbacks are completed if .Stop() is called. + b.callbackWg.Go(func() { + for _, msg := range callbackMessages { + if msg.callback != nil { + msg.callback(callbackErr) + } + } + }) +} + +func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback { + if len(messages) == 0 { + return nil + } + if maxRequestSize <= 0 { + return [][]*messageWithCallback{messages} + } + + var batches [][]*messageWithCallback + current := make([]*messageWithCallback, 0, len(messages)) + for _, msg := range messages { + candidate := append(current, msg) + _, candidateBytes := newBatchRequest(candidate) + if len(current) > 0 && candidateBytes > maxRequestSize { + batches = append(batches, current) + current = []*messageWithCallback{msg} + continue + } + current = candidate + } + if len(current) > 0 { + batches = append(batches, current) + } + return batches +} + +func newBatchRequest(messages []*messageWithCallback) (*chipingress.CloudEventBatch, int) { + events := make([]*chipingress.CloudEventPb, len(messages)) + for i, msg := range messages { + events[i] = msg.event + } + batchReq := &chipingress.CloudEventBatch{Events: events} + return batchReq, proto.Size(batchReq) +} + // WithBatchSize sets the number of messages to accumulate before sending a batch func WithBatchSize(batchSize int) Opt { return func(c *Client) { diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 5c73e5906d..e6c2a2ae7c 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -15,6 +16,7 @@ import ( "go.opentelemetry.io/otel/attribute" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" @@ -211,6 +213,112 @@ func TestSendBatch(t *testing.T) { mockClient.AssertExpectations(t) }) + + t.Run("splits oversized batch by max gRPC request size", func(t *testing.T) { + events := []*chipingress.CloudEventPb{ + largeTestEvent("test-id-1"), + largeTestEvent("test-id-2"), + largeTestEvent("test-id-3"), + largeTestEvent("test-id-4"), + largeTestEvent("test-id-5"), + } + maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: events[:2]}) + require.LessOrEqual(t, proto.Size(&chipingress.CloudEventBatch{Events: events[:1]}), maxRequestSize) + require.Greater(t, proto.Size(&chipingress.CloudEventBatch{Events: events[:3]}), maxRequestSize) + + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, len(events)) + var mu sync.Mutex + var publishedIDs []string + var publishedSizes []int + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) > 0 && proto.Size(batch) <= maxRequestSize + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { + batch := args.Get(1).(*chipingress.CloudEventBatch) + mu.Lock() + for _, event := range batch.Events { + publishedIDs = append(publishedIDs, event.Id) + } + publishedSizes = append(publishedSizes, proto.Size(batch)) + if len(publishedIDs) == len(events) { + close(done) + } + mu.Unlock() + }). + Times(3) + + client, err := NewBatchClient(mockClient, WithMaxGRPCRequestSize(maxRequestSize)) + require.NoError(t, err) + + messages := make([]*messageWithCallback, 0, len(events)) + for _, event := range events { + messages = append(messages, &messageWithCallback{ + event: event, + callback: func(err error) { + callbackDone <- err + }, + }) + } + + client.sendBatch(t.Context(), messages) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for split batches to be sent") + } + for range events { + select { + case err := <-callbackDone: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for split batch callback") + } + } + + assert.Equal(t, []string{"test-id-1", "test-id-2", "test-id-3", "test-id-4", "test-id-5"}, publishedIDs) + for _, size := range publishedSizes { + assert.LessOrEqual(t, size, maxRequestSize) + } + mockClient.AssertExpectations(t) + }) + + t.Run("doesn't publish a single event over max gRPC request size", func(t *testing.T) { + mockClient := mocks.NewClient(t) + callbackDone := make(chan error, 1) + event := largeTestEvent("oversized-id") + maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{event}}) - 1 + + client, err := NewBatchClient(mockClient, WithMaxGRPCRequestSize(maxRequestSize)) + require.NoError(t, err) + + client.sendBatch(t.Context(), []*messageWithCallback{ + { + event: event, + callback: func(err error) { + callbackDone <- err + }, + }, + }) + + select { + case err := <-callbackDone: + require.Error(t, err) + assert.Contains(t, err.Error(), "exceeds max gRPC request size") + case <-time.After(time.Second): + t.Fatal("timeout waiting for oversized batch callback") + } + + mockClient.AssertNotCalled(t, "PublishBatch", mock.Anything, mock.Anything) + }) } func TestStart(t *testing.T) { @@ -903,6 +1011,18 @@ func countCounters(counters *sync.Map) int { return n } +func largeTestEvent(id string) *chipingress.CloudEventPb { + return &chipingress.CloudEventPb{ + Id: id, + Source: "test-source", + Type: "test.event.type", + SpecVersion: "1.0", + Data: &cepb.CloudEvent_BinaryData{ + BinaryData: []byte("0123456789abcdefghijklmnopqrstuvwxyz"), + }, + } +} + func TestSeqnum(t *testing.T) { t.Run("dropped messages consume seqnum and create detectable gaps", func(t *testing.T) { client, err := NewBatchClient(nil, WithMessageBuffer(1))