pkg/beholder/chip_ingress_batch_emitter_service_test.go (61 additions, 0 deletions)
@@ -11,8 +11,10 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
@@ -466,6 +468,30 @@ func TestChipIngressBatchEmitterService_Metrics(t *testing.T) {
require.NoError(t, emitter.Close())

rm := collectEmitterMetrics(t, reader)
metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
Scope: instrumentation.Scope{Name: "beholder/chip_ingress_batch_emitter"},
Metrics: []metricdata.Metrics{
{
Name: "chip_ingress.events_sent",
Description: "Total events successfully sent via PublishBatch",
Unit: "{event}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("domain", "platform"),
attribute.String("entity", "MetricEvent"),
),
Value: 1,
},
},
},
},
},
}, mustEmitterScopeMetrics(t, rm, "beholder/chip_ingress_batch_emitter"), metricdatatest.IgnoreTimestamp())

metric := mustEmitterMetric(t, rm, "chip_ingress.events_sent")
sum, ok := metric.Data.(metricdata.Sum[int64])
require.True(t, ok)
@@ -506,6 +532,30 @@ func TestChipIngressBatchEmitterService_Metrics(t *testing.T) {
require.NoError(t, emitter.Close())

rm := collectEmitterMetrics(t, reader)
metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
Scope: instrumentation.Scope{Name: "beholder/chip_ingress_batch_emitter"},
Metrics: []metricdata.Metrics{
{
Name: "chip_ingress.events_dropped",
Description: "Total events dropped (buffer full or send failure)",
Unit: "{event}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("domain", "platform"),
attribute.String("entity", "MetricDropEvent"),
),
Value: 1,
},
},
},
},
},
}, mustEmitterScopeMetrics(t, rm, "beholder/chip_ingress_batch_emitter"), metricdatatest.IgnoreTimestamp())

metric := mustEmitterMetric(t, rm, "chip_ingress.events_dropped")
sum, ok := metric.Data.(metricdata.Sum[int64])
require.True(t, ok)
@@ -576,6 +626,17 @@ func mustEmitterMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Metrics {
return metricdata.Metrics{}
}

func mustEmitterScopeMetrics(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.ScopeMetrics {
t.Helper()
for _, sm := range rm.ScopeMetrics {
if sm.Scope.Name == name {
return sm
}
}
t.Fatalf("scope metrics %q not found", name)
return metricdata.ScopeMetrics{}
}

func mustEmitterInt64SumPoint(t *testing.T, sum metricdata.Sum[int64], k1, v1, k2, v2 string) metricdata.DataPoint[int64] {
t.Helper()
for _, dp := range sum.DataPoints {
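The additions above assert the emitter's entire instrumentation scope in one step with metricdatatest.AssertEqual, rather than probing metrics field by field. A minimal, self-contained sketch of the same technique follows; the meter name, metric name, and attribute values here are illustrative, not taken from this PR:

	package example

	import (
		"context"
		"testing"

		"github.com/stretchr/testify/require"
		"go.opentelemetry.io/otel/attribute"
		"go.opentelemetry.io/otel/metric"
		"go.opentelemetry.io/otel/sdk/instrumentation"
		sdkmetric "go.opentelemetry.io/otel/sdk/metric"
		"go.opentelemetry.io/otel/sdk/metric/metricdata"
		"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
	)

	func TestScopeAssertion(t *testing.T) {
		// Wire a ManualReader into a MeterProvider so the test controls collection.
		reader := sdkmetric.NewManualReader()
		provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
		meter := provider.Meter("example/scope")

		counter, err := meter.Int64Counter("events_sent",
			metric.WithDescription("Total events sent"),
			metric.WithUnit("{event}"))
		require.NoError(t, err)
		counter.Add(context.Background(), 1,
			metric.WithAttributes(attribute.String("domain", "platform")))

		var rm metricdata.ResourceMetrics
		require.NoError(t, reader.Collect(context.Background(), &rm))
		require.Len(t, rm.ScopeMetrics, 1)

		// Compare the whole scope at once; IgnoreTimestamp skips the
		// nondeterministic StartTime/Time fields on each data point.
		metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
			Scope: instrumentation.Scope{Name: "example/scope"},
			Metrics: []metricdata.Metrics{{
				Name:        "events_sent",
				Description: "Total events sent",
				Unit:        "{event}",
				Data: metricdata.Sum[int64]{
					Temporality: metricdata.CumulativeTemporality,
					IsMonotonic: true,
					DataPoints: []metricdata.DataPoint[int64]{{
						Attributes: attribute.NewSet(attribute.String("domain", "platform")),
						Value:      1,
					}},
				},
			}},
		}, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())
	}
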
pkg/chipingress/batch/client_test.go (165 additions, 0 deletions)
@@ -13,8 +13,10 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
@@ -1157,6 +1159,77 @@ func TestBatchClient_Metrics(t *testing.T) {

client.Stop()
rm := collectResourceMetrics(t, reader)
metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
Scope: instrumentation.Scope{Name: "chipingress/batch_client"},
Metrics: []metricdata.Metrics{
{
Name: "chip_ingress.batch.send_requests_total",
Description: "Total PublishBatch requests sent by batch client",
Unit: "{request}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: attribute.NewSet(attribute.String("status", "success"))},
},
},
},
{
Name: "chip_ingress.batch.request_size_messages",
Description: "PublishBatch request size measured in number of events",
Unit: "{event}",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{Attributes: attribute.NewSet(attribute.Int("max_batch_size", 1))},
},
},
},
{
Name: "chip_ingress.batch.request_size_bytes",
Description: "PublishBatch request size measured in bytes",
Unit: "By",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{Attributes: attribute.NewSet(attribute.Int("max_grpc_request_size_bytes", 2048))},
},
},
},
{
Name: "chip_ingress.batch.request_latency_ms",
Description: "PublishBatch end-to-end latency in milliseconds",
Unit: "ms",
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{Attributes: attribute.NewSet(attribute.String("status", "success"))},
},
},
},
{
Name: "chip_ingress.batch.config.info",
Description: "Batch client configuration info metric",
Unit: "{info}",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.Int("max_batch_size", 1),
attribute.Int("message_buffer_size", 10),
attribute.Int("max_concurrent_sends", 1),
attribute.Int64("batch_interval_ms", 1000),
attribute.Int64("max_publish_timeout_ms", 5000),
attribute.Int64("shutdown_timeout_ms", 5000),
attribute.Bool("clone_event", true),
attribute.Int("max_grpc_request_size_bytes", 2048),
),
},
},
},
},
},
}, mustScopeMetrics(t, rm, "chipingress/batch_client"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())

reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total")
reqSum, ok := reqTotal.Data.(metricdata.Sum[int64])
@@ -1222,6 +1295,87 @@ func TestBatchClient_Metrics(t *testing.T) {

client.Stop()
rm := collectResourceMetrics(t, reader)
metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
Scope: instrumentation.Scope{Name: "chipingress/batch_client"},
Metrics: []metricdata.Metrics{
{
Name: "chip_ingress.batch.send_requests_total",
Description: "Total PublishBatch requests sent by batch client",
Unit: "{request}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: attribute.NewSet(attribute.String("status", "failure"))},
},
},
},
{
Name: "chip_ingress.batch.send_failures_total",
Description: "Total failed PublishBatch requests sent by batch client",
Unit: "{request}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{{}},
},
},
{
Name: "chip_ingress.batch.request_size_messages",
Description: "PublishBatch request size measured in number of events",
Unit: "{event}",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{Attributes: attribute.NewSet(attribute.Int("max_batch_size", 1))},
},
},
},
{
Name: "chip_ingress.batch.request_size_bytes",
Description: "PublishBatch request size measured in bytes",
Unit: "By",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{Attributes: attribute.NewSet(attribute.Int("max_grpc_request_size_bytes", 16*1024*1024))},
},
},
},
{
Name: "chip_ingress.batch.request_latency_ms",
Description: "PublishBatch end-to-end latency in milliseconds",
Unit: "ms",
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{Attributes: attribute.NewSet(attribute.String("status", "failure"))},
},
},
},
{
Name: "chip_ingress.batch.config.info",
Description: "Batch client configuration info metric",
Unit: "{info}",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.Int("max_batch_size", 1),
attribute.Int("message_buffer_size", 10),
attribute.Int("max_concurrent_sends", 1),
attribute.Int64("batch_interval_ms", 100),
attribute.Int64("max_publish_timeout_ms", 5000),
attribute.Int64("shutdown_timeout_ms", 5000),
attribute.Bool("clone_event", true),
attribute.Int("max_grpc_request_size_bytes", 16*1024*1024),
),
},
},
},
},
},
}, mustScopeMetrics(t, rm, "chipingress/batch_client"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())

reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total")
reqSum, ok := reqTotal.Data.(metricdata.Sum[int64])
@@ -1302,6 +1456,17 @@ func mustMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Metrics {
return metricdata.Metrics{}
}

func mustScopeMetrics(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.ScopeMetrics {
t.Helper()
for _, sm := range rm.ScopeMetrics {
if sm.Scope.Name == name {
return sm
}
}
t.Fatalf("scope metrics %q not found", name)
return metricdata.ScopeMetrics{}
}

func mustInt64SumPointWithAttr(t *testing.T, sum metricdata.Sum[int64], key, want string) metricdata.DataPoint[int64] {
t.Helper()
for _, dp := range sum.DataPoints {
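A note on the options: the client tests pass metricdatatest.IgnoreValue() alongside IgnoreTimestamp() because histogram counts, request byte sizes, and latencies vary run to run; the assertion pins the shape (names, units, temporality, attribute sets), and the deterministic values are then re-checked through the mustMetric/mustInt64SumPointWithAttr helpers. For a pure attribute check, the library's own helper could substitute for a hand-rolled loop; a sketch, assuming rm was collected as in the tests above:

	// A sketch: metricdatatest.AssertHasAttributes verifies that every data
	// point on the metric carries the given attribute, without asserting values.
	reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total")
	metricdatatest.AssertHasAttributes(t, reqTotal, attribute.String("status", "failure"))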