Conversation
|
👋 thomaska, thanks for creating this pull request! To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team. Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks! |
There was a problem hiding this comment.
Pull request overview
This pull request replaces the per-event ChIP Ingress emission with a batched approach to reduce overhead from N gRPC calls + N Kafka transactions to 1 call + 1 transaction per flush interval. The implementation introduces a new ChipIngressBatchEmitter that buffers events per (domain, entity) pair and flushes them periodically using PublishBatch.
Changes:
- Introduced
ChipIngressBatchEmitterwith per-(domain, entity) worker goroutines for batching events - Added
chipIngressEmitterWorkerto handle batch assembly and sending with configurable timeouts - Removed goroutine wrapper from
DualSourceEmitter.Emit()since batching is now non-blocking (channel send) - Added 4 new configuration parameters with sensible defaults (BufferSize: 100, MaxBatchSize: 50, SendInterval: 500ms, SendTimeout: 10s)
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/beholder/chip_ingress_batch_emitter.go | New batch emitter with per-worker buffering and periodic flushing via PublishBatch |
| pkg/beholder/chip_ingress_emitter_worker.go | Worker implementation handling batch assembly, channel draining, and exponential backoff logging for drops |
| pkg/beholder/chip_ingress_batch_emitter_test.go | Comprehensive test coverage (10 tests) for batching, max batch size, isolation, buffer overflow, lifecycle, errors, and defaults |
| pkg/beholder/dual_source_emitter.go | Simplified Emit() by removing goroutine wrapper since ChipIngressBatchEmitter.Emit() is non-blocking |
| pkg/beholder/client.go | Updated to create and start ChipIngressBatchEmitter instead of ChipIngressEmitter; added comment about closure ordering |
| pkg/beholder/config.go | Added 4 new config fields with inline documentation and default values |
| pkg/beholder/config_test.go | Updated expected output to include new config fields |
Comments suppressed due to low confidence (2)
pkg/beholder/config.go:50
- The comment states "Zero disables batching" but the implementation in NewChipIngressBatchEmitter treats zero as "use default" and sets it to 500ms. The comment should be corrected to match the actual behavior, e.g., "Flush interval per worker (default 500ms when zero or unset)".
ChipIngressSendInterval time.Duration // Flush interval per worker (default 500ms). Zero disables batching.
pkg/beholder/client.go:248
- The messageLoggerProvider appears twice in the shutdowner slice. This will cause it to be shut down twice, which could lead to errors or undefined behavior. Remove one of the duplicate entries.
for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} {
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
| return nil, err | ||
| } | ||
|
|
||
| chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient) |
| // via chipingress.Client.PublishBatch on a periodic interval. | ||
| // It satisfies the Emitter interface so it can be used as a drop-in replacement | ||
| // for ChipIngressEmitter. | ||
| type ChipIngressBatchEmitter struct { |
| return e, nil | ||
| } | ||
|
|
||
| func (e *ChipIngressBatchEmitter) start(_ context.Context) error { |
There was a problem hiding this comment.
what's the role of this function if it always returns null?
There was a problem hiding this comment.
it was mostly added as a placeholder, but can be omitted as well.
And after checking, in the core/services/workflows/syncer/v2/handler.go in EventHandler it's also omitted, so. probably it's more consistent.
|
|
||
| // NewChipIngressBatchEmitter creates a batch emitter backed by the given chipingress client. | ||
| // Call Start() to begin health monitoring, and Close() to stop all workers. | ||
| func NewChipIngressBatchEmitter(client chipingress.Client, lggr logger.Logger, cfg Config) (*ChipIngressBatchEmitter, error) { |
There was a problem hiding this comment.
this is pure stylistic, and feel free to ignore it, make the logger the last param and after renaming the struct so ChipIngressBatchService, make sure to adjust the name of the constructor
| var events []chipingress.CloudEvent | ||
|
|
||
| for len(w.ch) > 0 && len(events) < int(w.maxBatchSize) { // #nosec G115 | ||
| payload := <-w.ch |
There was a problem hiding this comment.
if im not mistaken, this can block if the channel is drained by another goroutine.
i'd use a select instead
max := int(w.maxBatchSize)
for len(events) < max {
select {
case payload := <-w.ch:
event, err := w.payloadToEvent(payload)
if err != nil {
w.lggr.Warnf("failed to build CloudEvent, dropping: %v", err)
continue
}
events = append(events, event)
default:
return
}
}
|
|
||
| func (c *Client) start(ctx context.Context) error { | ||
| if c.batchEmitterService != nil { | ||
| return c.batchEmitterService.Start(ctx) |
There was a problem hiding this comment.
Does this need to be closed too? If pass it to the Engine instead, it will manage it for you.
…directly The vars were indirecting beholder.NewClient and client.Start solely to support test stubbing.
When Config.CloseIfNeverStarted is true, calling Close() on a service that was never started now runs the Config.Close hook (to release constructor-held resources) without attempting to stop sub-services, which are guaranteed to also be unstarted and would return ErrCannotStopUnstarted.
…everStarted Replace ManagedServices() with the services.Config sub-service pattern so the batch emitter starts and stops automatically with the Client lifecycle. Set CloseIfNeverStarted: true on all client constructors (GRPC, HTTP, Noop) so that calling Close() before Start() still runs the OTel/Chip teardown hook without erroring on the never-started batch emitter sub-service.
…batch emitter When ChipIngressBatchEmitterEnabled is true, the chipIngressEmitter is a ChipIngressBatchEmitterService whose lifecycle is owned by the Client service engine. DualSourceEmitter.Close was unconditionally calling Close on it, which returns ErrCannotStopUnstarted when the client is closed before being started (the CloseIfNeverStarted path). Skip the chipIngressEmitter.Close call when batch emitter mode is active; the service framework handles teardown.
Add TestClient_batchEmitterService with four subtests covering the new service-managed batch emitter lifecycle: - starts with client: Ready/Emit error assertions before Start, clean shutdown - emit succeeds before start: valid emit returns nil (OTLP path always active; batch emitter error is swallowed by DualSourceEmitter) - close without start: CloseIfNeverStarted path returns no error - requires logger: constructor rejects nil ChipIngressLogger Remove managed_services_test.go; tests consolidated into client_test.go.
| batchBytes := proto.Size(batchReq) | ||
| startedAt := time.Now() | ||
| _, err := b.client.PublishBatch(ctxTimeout, batchReq) | ||
| b.metrics.recordSend(context.Background(), len(messages), batchBytes, time.Since(startedAt), err == nil) |
There was a problem hiding this comment.
Why background context?
| b.metrics.recordSend(context.Background(), len(messages), batchBytes, time.Since(startedAt), err == nil) | |
| b.metrics.recordSend(ctxTimeout, len(messages), batchBytes, time.Since(startedAt), err == nil) |
Use cfg.ChipIngressLogger (fallback nop) when wiring the HTTP client's service engine, and record send metrics with the request timeout context instead of context.Background.
| // When the batch emitter is enabled its lifecycle (Start/Stop) is owned by the Client's | ||
| // service engine, not by DualSourceEmitter. Calling Close() on it here would return | ||
| // ErrCannotStopUnstarted when the client is closed without ever being started. | ||
| var chipErr error | ||
| if !d.chipIngressBatchEmitterEnabled { | ||
| chipErr = d.chipIngressEmitter.Close() |
There was a problem hiding this comment.
This sounds like an implementation detail that shouldn't be tracked at this level. Can we find a way to separate ownership from implementation details?
Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436
Summary
This PR adds chip-ingress batch delivery behind beholder, makes
beholder.Clientown the batch emitter lifecycle, and adds a service-layer escape hatch for cleanup when a lifecycle-managed service is closed before it was ever started.What changed
ChipIngressBatchEmitterService, which batches emitted events and publishes them throughchipingress.Client.PublishBatchbeholder.Clientnow owns the optional batch emitter as a sub-service and starts/stops it through its service lifecycleservices.Config.CloseIfNeverStartedso ctor-held resources can still run their close hook even whenStart()was never calledDualSourceEmitter.Close()so it does not directly close a service-managed batch emitterpkg/chipingress/batch.Clientnow records batch request metrics and flushes shutdown work independently of caller cancellationbeholder.Clientdirectly and no longer uses test-only indirection inpkg/loop/server.gopkg/loop/server_test.goCL_CHIP_INGRESS_BATCH_EMITTER_ENABLEDData Flow
flowchart LR A["Caller or app code"] -->|"Emit(ctx, body, attrs)"| B["beholder.Client.Emitter"] B --> C["DualSourceEmitter"] C -->|"synchronous"| D["OTLP message emitter"] C -->|"batch enabled"| E["ChipIngressBatchEmitterService"] E -->|"queue event"| F["batch.Client"] F -->|"accumulate by size or interval"| G["PublishBatch request"] G --> H["chipingress.Client"] H --> I["Chip Ingress gRPC endpoint"] F --> J["batch client metrics"] E --> K["emitter success or drop metrics"] B --> L["beholder logger, tracer, and meter providers"]Dependency Diagram
flowchart TD S["loop.Server"] -->|"start and close"| BC["beholder.Client"] S --> CFG["loop.EnvConfig"] CFG --> FLAG["CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED"] BC --> ENG1["services.Engine"] BC --> DSE["DualSourceEmitter"] BC --> CHIP["chipingress.Client"] BC --> OTLP["OTLP providers"] ENG1 --> BES["ChipIngressBatchEmitterService"] BES --> ENG2["services.Engine"] BES --> BATCH["batch.Client"] BATCH --> CHIP ENG1 --> CINS["CloseIfNeverStarted"] ENG2 --> CINS CINS --> SVC["services.Config"]Metrics
Added batch delivery metrics for:
chip_ingress.batch.send_requests_totalchip_ingress.batch.send_failures_totalchip_ingress.batch.request_size_messageschip_ingress.batch.request_size_byteschip_ingress.batch.request_latency_mschip_ingress.batch.config.infochip_ingress.events_sentchip_ingress.events_droppedTests
Added coverage for:
CloseIfNeverStartedlifecycle behaviorSupports
smartcontractkit/chainlink#21327