diff --git a/pkg/beholder/chip_ingress_batch_emitter_service_test.go b/pkg/beholder/chip_ingress_batch_emitter_service_test.go
index 6bf44cb961..584a7d75ac 100644
--- a/pkg/beholder/chip_ingress_batch_emitter_service_test.go
+++ b/pkg/beholder/chip_ingress_batch_emitter_service_test.go
@@ -11,8 +11,10 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/instrumentation"
 	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
 	"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
@@ -466,6 +468,30 @@ func TestChipIngressBatchEmitterService_Metrics(t *testing.T) {
 	require.NoError(t, emitter.Close())
 
 	rm := collectEmitterMetrics(t, reader)
+	metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
+		Scope: instrumentation.Scope{Name: "beholder/chip_ingress_batch_emitter"},
+		Metrics: []metricdata.Metrics{
+			{
+				Name:        "chip_ingress.events_sent",
+				Description: "Total events successfully sent via PublishBatch",
+				Unit:        "{event}",
+				Data: metricdata.Sum[int64]{
+					Temporality: metricdata.CumulativeTemporality,
+					IsMonotonic: true,
+					DataPoints: []metricdata.DataPoint[int64]{
+						{
+							Attributes: attribute.NewSet(
+								attribute.String("domain", "platform"),
+								attribute.String("entity", "MetricEvent"),
+							),
+							Value: 1,
+						},
+					},
+				},
+			},
+		},
+	}, mustEmitterScopeMetrics(t, rm, "beholder/chip_ingress_batch_emitter"), metricdatatest.IgnoreTimestamp())
+
 	metric := mustEmitterMetric(t, rm, "chip_ingress.events_sent")
 	sum, ok := metric.Data.(metricdata.Sum[int64])
 	require.True(t, ok)
@@ -506,6 +532,30 @@ func TestChipIngressBatchEmitterService_Metrics(t *testing.T) {
 	require.NoError(t, emitter.Close())
 
 	rm := collectEmitterMetrics(t, reader)
+	metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
+		Scope: instrumentation.Scope{Name: "beholder/chip_ingress_batch_emitter"},
+		Metrics: []metricdata.Metrics{
+			{
+				Name:        "chip_ingress.events_dropped",
+				Description: "Total events dropped (buffer full or send failure)",
+				Unit:        "{event}",
+				Data: metricdata.Sum[int64]{
+					Temporality: metricdata.CumulativeTemporality,
+					IsMonotonic: true,
+					DataPoints: []metricdata.DataPoint[int64]{
+						{
+							Attributes: attribute.NewSet(
+								attribute.String("domain", "platform"),
+								attribute.String("entity", "MetricDropEvent"),
+							),
+							Value: 1,
+						},
+					},
+				},
+			},
+		},
+	}, mustEmitterScopeMetrics(t, rm, "beholder/chip_ingress_batch_emitter"), metricdatatest.IgnoreTimestamp())
+
 	metric := mustEmitterMetric(t, rm, "chip_ingress.events_dropped")
 	sum, ok := metric.Data.(metricdata.Sum[int64])
 	require.True(t, ok)
@@ -576,6 +626,17 @@ func mustEmitterMetric(t *testing.T, rm metricdata.ResourceMetrics, name string)
 	return metricdata.Metrics{}
 }
 
+func mustEmitterScopeMetrics(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.ScopeMetrics {
+	t.Helper()
+	for _, sm := range rm.ScopeMetrics {
+		if sm.Scope.Name == name {
+			return sm
+		}
+	}
+	t.Fatalf("scope metrics %q not found", name)
+	return metricdata.ScopeMetrics{}
+}
+
 func mustEmitterInt64SumPoint(t *testing.T, sum metricdata.Sum[int64], k1, v1, k2, v2 string) metricdata.DataPoint[int64] {
 	t.Helper()
 	for _, dp := range sum.DataPoints {
diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go
index 5c73e5906d..2a6f4f9f2a 100644
--- a/pkg/chipingress/batch/client_test.go
+++ b/pkg/chipingress/batch/client_test.go
@@ -13,8 +13,10 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/instrumentation"
 	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
 	"github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
@@ -1157,6 +1159,77 @@ func TestBatchClient_Metrics(t *testing.T) {
 	client.Stop()
 
 	rm := collectResourceMetrics(t, reader)
+	metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
+		Scope: instrumentation.Scope{Name: "chipingress/batch_client"},
+		Metrics: []metricdata.Metrics{
+			{
+				Name:        "chip_ingress.batch.send_requests_total",
+				Description: "Total PublishBatch requests sent by batch client",
+				Unit:        "{request}",
+				Data: metricdata.Sum[int64]{
+					Temporality: metricdata.CumulativeTemporality,
+					IsMonotonic: true,
+					DataPoints: []metricdata.DataPoint[int64]{
+						{Attributes: attribute.NewSet(attribute.String("status", "success"))},
+					},
+				},
+			},
+			{
+				Name:        "chip_ingress.batch.request_size_messages",
+				Description: "PublishBatch request size measured in number of events",
+				Unit:        "{event}",
+				Data: metricdata.Histogram[int64]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[int64]{
+						{Attributes: attribute.NewSet(attribute.Int("max_batch_size", 1))},
+					},
+				},
+			},
+			{
+				Name:        "chip_ingress.batch.request_size_bytes",
+				Description: "PublishBatch request size measured in bytes",
+				Unit:        "By",
+				Data: metricdata.Histogram[int64]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[int64]{
+						{Attributes: attribute.NewSet(attribute.Int("max_grpc_request_size_bytes", 2048))},
+					},
+				},
+			},
+			{
+				Name:        "chip_ingress.batch.request_latency_ms",
+				Description: "PublishBatch end-to-end latency in milliseconds",
+				Unit:        "ms",
+				Data: metricdata.Histogram[float64]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[float64]{
+						{Attributes: attribute.NewSet(attribute.String("status", "success"))},
+					},
+				},
+			},
+			{
+				Name:        "chip_ingress.batch.config.info",
+				Description: "Batch client configuration info metric",
+				Unit:        "{info}",
+				Data: metricdata.Gauge[int64]{
+					DataPoints: []metricdata.DataPoint[int64]{
+						{
+							Attributes: attribute.NewSet(
+								attribute.Int("max_batch_size", 1),
+								attribute.Int("message_buffer_size", 10),
+								attribute.Int("max_concurrent_sends", 1),
+								attribute.Int64("batch_interval_ms", 1000),
+								attribute.Int64("max_publish_timeout_ms", 5000),
+								attribute.Int64("shutdown_timeout_ms", 5000),
+								attribute.Bool("clone_event", true),
+								attribute.Int("max_grpc_request_size_bytes", 2048),
+							),
+						},
+					},
+				},
+			},
+		},
+	}, mustScopeMetrics(t, rm, "chipingress/batch_client"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 	reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total")
 	reqSum, ok := reqTotal.Data.(metricdata.Sum[int64])
@@ -1222,6 +1295,87 @@ func TestBatchClient_Metrics(t *testing.T) {
 	client.Stop()
 
 	rm := collectResourceMetrics(t, reader)
+	metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
+		Scope: instrumentation.Scope{Name: "chipingress/batch_client"},
+		Metrics: []metricdata.Metrics{
+			{
+				Name:        "chip_ingress.batch.send_requests_total",
+				Description: "Total PublishBatch requests sent by batch client",
+				Unit:        "{request}",
+				Data: metricdata.Sum[int64]{
+					Temporality: metricdata.CumulativeTemporality,
+					IsMonotonic: true,
+					DataPoints: []metricdata.DataPoint[int64]{
+						{Attributes: attribute.NewSet(attribute.String("status", "failure"))},
+					},
+				},
+			},
+			{
+				Name:        "chip_ingress.batch.send_failures_total",
+				Description: "Total failed PublishBatch requests sent by batch client",
+				Unit:        "{request}",
+				Data: metricdata.Sum[int64]{
+					Temporality: metricdata.CumulativeTemporality,
+					IsMonotonic: true,
+					DataPoints:  []metricdata.DataPoint[int64]{{}},
+				},
+			},
+			{
+				Name:        "chip_ingress.batch.request_size_messages",
+				Description: "PublishBatch request size measured in number of events",
+				Unit:        "{event}",
+				Data: metricdata.Histogram[int64]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[int64]{
+						{Attributes: attribute.NewSet(attribute.Int("max_batch_size", 1))},
+					},
+				},
+			},
+			{
+				Name:        "chip_ingress.batch.request_size_bytes",
+				Description: "PublishBatch request size measured in bytes",
+				Unit:        "By",
+				Data: metricdata.Histogram[int64]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[int64]{
+						{Attributes: attribute.NewSet(attribute.Int("max_grpc_request_size_bytes", 16*1024*1024))},
+					},
+				},
+			},
+			{
+				Name:        "chip_ingress.batch.request_latency_ms",
+				Description: "PublishBatch end-to-end latency in milliseconds",
+				Unit:        "ms",
+				Data: metricdata.Histogram[float64]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[float64]{
+						{Attributes: attribute.NewSet(attribute.String("status", "failure"))},
+					},
+				},
+			},
+			{
+				Name:        "chip_ingress.batch.config.info",
+				Description: "Batch client configuration info metric",
+				Unit:        "{info}",
+				Data: metricdata.Gauge[int64]{
+					DataPoints: []metricdata.DataPoint[int64]{
+						{
+							Attributes: attribute.NewSet(
+								attribute.Int("max_batch_size", 1),
+								attribute.Int("message_buffer_size", 10),
+								attribute.Int("max_concurrent_sends", 1),
+								attribute.Int64("batch_interval_ms", 100),
+								attribute.Int64("max_publish_timeout_ms", 5000),
+								attribute.Int64("shutdown_timeout_ms", 5000),
+								attribute.Bool("clone_event", true),
+								attribute.Int("max_grpc_request_size_bytes", 16*1024*1024),
+							),
+						},
+					},
+				},
+			},
+		},
+	}, mustScopeMetrics(t, rm, "chipingress/batch_client"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 
 	reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total")
 	reqSum, ok := reqTotal.Data.(metricdata.Sum[int64])
@@ -1302,6 +1456,17 @@ func mustMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metric
 	return metricdata.Metrics{}
 }
 
+func mustScopeMetrics(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.ScopeMetrics {
+	t.Helper()
+	for _, sm := range rm.ScopeMetrics {
+		if sm.Scope.Name == name {
+			return sm
+		}
+	}
+	t.Fatalf("scope metrics %q not found", name)
+	return metricdata.ScopeMetrics{}
+}
+
 func mustInt64SumPointWithAttr(t *testing.T, sum metricdata.Sum[int64], key, want string) metricdata.DataPoint[int64] {
 	t.Helper()
 	for _, dp := range sum.DataPoints {