From bbceda84da6604fc01a6935daf995448c32e8ccb Mon Sep 17 00:00:00 2001 From: Abhishek Rai Date: Thu, 21 May 2026 15:24:41 -0700 Subject: [PATCH] Deepen run transition metrics --- src/github.go | 130 ++------------------------ src/integration_test.go | 92 +++++++++++++++++- src/metrics_test.go | 8 +- src/run_transition.go | 187 +++++++++++++++++++++++++++++++++++++ src/run_transition_test.go | 155 ++++++++++++++++++++++++++++++ src/test_support_test.go | 2 + 6 files changed, 448 insertions(+), 126 deletions(-) create mode 100644 src/run_transition.go create mode 100644 src/run_transition_test.go diff --git a/src/github.go b/src/github.go index 699ff4e..872ddcf 100644 --- a/src/github.go +++ b/src/github.go @@ -11,7 +11,6 @@ import ( "strings" "time" - "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -82,29 +81,6 @@ type GithubPullRequest struct { Repository GithubRepo `json:"repository"` } -type runMetricDetails struct { - repository string - branch string - name string - status string - conclusion string - startedAt string - endedAt string -} - -type runMetricSet struct { - statusCounter *prometheus.CounterVec - queuedGauge *prometheus.GaugeVec - inProgressGauge *prometheus.GaugeVec - completedGauge *prometheus.GaugeVec - durationHistogram *prometheus.HistogramVec -} - -type runStoreMethods struct { - get func(context.Context, int) (RunState, bool, error) - update func(context.Context, int, RunState) error -} - const ( statusQueued = "queued" statusInProgress = "in_progress" @@ -202,86 +178,6 @@ func shouldApplyStateTransition(previous, next RunState) bool { return true } -func applyGaugeDelta(details RunState, delta float64, queuedGauge, inProgressGauge, completedGauge *prometheus.GaugeVec) { - switch normalizeStatus(details.Status) { - case statusQueued: - queuedGauge.WithLabelValues(details.Repository, details.Branch, details.Name).Add(delta) - case statusInProgress: - inProgressGauge.WithLabelValues(details.Repository, details.Branch, details.Name).Add(delta) - case statusCompleted: - completedGauge.WithLabelValues(details.Repository, details.Branch, details.Conclusion, details.Name).Add(delta) - } -} - -func observeDuration(details RunState, durationHistogram *prometheus.HistogramVec) { - if normalizeStatus(details.Status) != statusCompleted { - return - } - - startedAt, startedOK := parseMetricTime(details.StartedAt) - endedAt, endedOK := parseMetricTime(details.EndedAt) - if !startedOK || !endedOK || endedAt.Before(startedAt) { - return - } - - durationHistogram.WithLabelValues( - details.Repository, - details.Branch, - details.Name, - details.Status, - details.Conclusion, - ).Observe(endedAt.Sub(startedAt).Seconds()) -} - -func applyStatefulMetrics(details RunState, previous *RunState, metrics runMetricSet) { - metrics.statusCounter.WithLabelValues( - details.Repository, - details.Branch, - details.Name, - details.Status, - details.Conclusion, - ).Inc() - - if previous != nil { - applyGaugeDelta(*previous, -1, metrics.queuedGauge, metrics.inProgressGauge, metrics.completedGauge) - } - applyGaugeDelta(details, 1, metrics.queuedGauge, metrics.inProgressGauge, metrics.completedGauge) - - if previous == nil || normalizeStatus(previous.Status) != statusCompleted { - observeDuration(details, metrics.durationHistogram) - } -} - -func getPreviousState(ctx context.Context, id int, getFn func(context.Context, int) (RunState, bool, error), entityName string) (*RunState, bool) { - if stateStore == nil { - return nil, true - } - - previous, found, err := getFn(ctx, id) - if err != nil { - logger.Error("Failed to load run state from redis", zap.String("entity", entityName), zap.Int("id", id), zap.Error(err)) - return nil, false - } - if !found { - return nil, true - } - - return &previous, true -} - -func persistRunState(ctx context.Context, id int, next RunState, updateFn func(context.Context, int, RunState) error, entityName string) bool { - if stateStore == nil { - return true - } - - if err := updateFn(ctx, id, next); err != nil { - logger.Error("Failed to update run state in redis", zap.String("entity", entityName), zap.Int("id", id), zap.Error(err)) - return false - } - - return true -} - func updateTrackedRunMetrics( ctx context.Context, id int, @@ -290,26 +186,18 @@ func updateTrackedRunMetrics( entityName string, metrics runMetricSet, ) { - nextState := normalizeRunState(details) - - if stateStore == nil { - applyStatefulMetrics(nextState, nil, metrics) - return + var storeAdapter runTransitionStore + if stateStore != nil { + storeAdapter = runStoreAdapter{methods: store} } - previousState, ok := getPreviousState(ctx, id, store.get, entityName) - if !ok { - return - } - if previousState != nil && !shouldApplyStateTransition(*previousState, nextState) { - logger.Debug("Skipping stale or duplicate run transition", zap.String("entity", entityName), zap.Int("id", id), zap.String("status", nextState.Status), zap.String("conclusion", nextState.Conclusion)) - return + processor := &runTransitionProcessor{ + store: storeAdapter, + recorder: prometheusRunTransitionRecorder{metrics: metrics}, + logger: logger, + entityName: entityName, } - if !persistRunState(ctx, id, nextState, store.update, entityName) { - return - } - - applyStatefulMetrics(nextState, previousState, metrics) + processor.Apply(ctx, id, details) } func workflowRunStoreMethods() runStoreMethods { diff --git a/src/integration_test.go b/src/integration_test.go index d90cdd0..c938ddd 100644 --- a/src/integration_test.go +++ b/src/integration_test.go @@ -8,6 +8,8 @@ import ( "crypto/hmac" "crypto/sha256" "encoding/hex" + "encoding/json" + "errors" "fmt" "io" "net/http" @@ -22,6 +24,11 @@ import ( "go.uber.org/zap" ) +const ( + integrationRunStartedAt = "2023-01-01T00:00:00Z" + integrationRunCompletedAt = "2023-01-01T01:00:00Z" +) + func TestIntegrationWebhookMetrics(t *testing.T) { testCases := []struct { name string @@ -188,6 +195,58 @@ func TestIntegrationDuplicateDeliveryDoesNotInflateMetrics(t *testing.T) { } } +func TestIntegrationDeliveryStoreFailurePreventsWebhookProcessing(t *testing.T) { + server := newIntegrationTestServerWithStateStore(t, asyncProcessorConfig{WorkerCount: 1, QueueSize: 8}, failingDeliveryStateStore{ + inMemoryStateStore: newInMemoryStateStore(), + }) + defer server.Close() + + body := mustReadFixture(t, "workflow_run.json") + resp := sendWebhookRequest(t, server.URL, githubEventWorkflowRun, body, "delivery-store-failure") + assertResponseStatus(t, resp, http.StatusInternalServerError) + + metrics := mustFetchMetrics(t, server.URL) + if strings.Contains(metrics, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) { + t.Fatalf("workflow metrics changed after delivery store failure:\n%s", metrics) + } + if strings.Contains(metrics, `promgithub_event_processed_total{event_type="workflow_run"}`) { + t.Fatalf("event was enqueued after delivery store failure:\n%s", metrics) + } +} + +func TestIntegrationWorkflowRunLifecycleBalancesStatefulMetrics(t *testing.T) { + server := newIntegrationTestServer(t) + defer server.Close() + + queuedBody := workflowRunFixtureWithStatus(t, statusQueued, "", integrationRunStartedAt) + queuedResp := sendWebhookRequest(t, server.URL, githubEventWorkflowRun, queuedBody, "delivery-lifecycle-queued") + assertResponseStatus(t, queuedResp, http.StatusAccepted) + waitForMetricsSubstring(t, server.URL, `promgithub_workflow_queued{branch="main",repository="user/repo",workflow_name="CI"} 1`) + + inProgressBody := workflowRunFixtureWithStatus(t, statusInProgress, "", integrationRunStartedAt) + inProgressResp := sendWebhookRequest(t, server.URL, githubEventWorkflowRun, inProgressBody, "delivery-lifecycle-in-progress") + assertResponseStatus(t, inProgressResp, http.StatusAccepted) + waitForMetricsSubstring(t, server.URL, `promgithub_workflow_in_progress{branch="main",repository="user/repo",workflow_name="CI"} 1`) + + completedBody := workflowRunFixtureWithStatus(t, statusCompleted, testConclusionSuccess, integrationRunCompletedAt) + completedResp := sendWebhookRequest(t, server.URL, githubEventWorkflowRun, completedBody, "delivery-lifecycle-completed") + assertResponseStatus(t, completedResp, http.StatusAccepted) + + metrics := waitForMetricsSubstring(t, server.URL, `promgithub_workflow_completed{branch="main",repository="user/repo",workflow_conclusion="success",workflow_name="CI"} 1`) + expectedMetrics := []string{ + `promgithub_workflow_queued{branch="main",repository="user/repo",workflow_name="CI"} 0`, + `promgithub_workflow_in_progress{branch="main",repository="user/repo",workflow_name="CI"} 0`, + `promgithub_workflow_completed{branch="main",repository="user/repo",workflow_conclusion="success",workflow_name="CI"} 1`, + `promgithub_workflow_duration_sum{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 3600`, + `promgithub_workflow_duration_count{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`, + } + for _, expected := range expectedMetrics { + if !strings.Contains(metrics, expected) { + t.Fatalf("expected metrics to contain %q, got:\n%s", expected, metrics) + } + } +} + func TestIntegrationAsyncQueueFullReturnsUnavailableAndExposesQueueDropMetrics(t *testing.T) { server := newIntegrationTestServerWithAsyncConfig(t, asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}) defer server.Close() @@ -315,11 +374,16 @@ func newIntegrationTestServer(t *testing.T) *httptest.Server { } func newIntegrationTestServerWithAsyncConfig(t *testing.T, cfg asyncProcessorConfig) *httptest.Server { + t.Helper() + return newIntegrationTestServerWithStateStore(t, cfg, newInMemoryStateStore()) +} + +func newIntegrationTestServerWithStateStore(t *testing.T, cfg asyncProcessorConfig, store StateStore) *httptest.Server { t.Helper() resetIntegrationTestMetrics() githubWebhookSecret = []byte("integration-test-secret") - stateStore = newInMemoryStateStore() + stateStore = store eventProcessor = newAsyncEventProcessor(cfg, zap.NewNop()) eventProcessor.Start() t.Cleanup(func() { @@ -334,6 +398,14 @@ func newIntegrationTestServerWithAsyncConfig(t *testing.T, cfg asyncProcessorCon return httptest.NewServer(router) } +type failingDeliveryStateStore struct { + *inMemoryStateStore +} + +func (s failingDeliveryStateStore) MarkDeliveryProcessed(context.Context, string) (bool, error) { + return false, errors.New("delivery store unavailable") +} + func assertResponseStatus(t *testing.T, resp *http.Response, expectedStatus int) { t.Helper() defer func() { _ = resp.Body.Close() }() @@ -390,6 +462,24 @@ func mustReadFixture(t *testing.T, name string) []byte { return body } +func workflowRunFixtureWithStatus(t *testing.T, status, conclusion, updatedAt string) []byte { + t.Helper() + + var payload GithubWorkflow + if err := json.Unmarshal(mustReadFixture(t, "workflow_run.json"), &payload); err != nil { + t.Fatalf("failed to unmarshal workflow fixture: %v", err) + } + payload.Workflow.Status = status + payload.Workflow.Conclusion = conclusion + payload.Workflow.UpdatedAt = updatedAt + + body, err := json.Marshal(payload) + if err != nil { + t.Fatalf("failed to marshal workflow fixture: %v", err) + } + return body +} + func sendWebhookRequest(t *testing.T, serverURL, eventType string, body []byte, deliveryID string) *http.Response { t.Helper() signature := webhookSignature(body, githubWebhookSecret) diff --git a/src/metrics_test.go b/src/metrics_test.go index b72c4af..fb1e40a 100644 --- a/src/metrics_test.go +++ b/src/metrics_test.go @@ -354,7 +354,7 @@ func TestWorkflowGaugeTransitionIsIdempotent(t *testing.T) { updateWorkflowMetrics(context.Background(), inProgressBody) payload.Workflow.Status = statusCompleted - payload.Workflow.Conclusion = "success" + payload.Workflow.Conclusion = testConclusionSuccess payload.Workflow.UpdatedAt = "2024-11-21T12:00:00Z" completedBody, _ := json.Marshal(payload) updateWorkflowMetrics(context.Background(), completedBody) @@ -366,7 +366,7 @@ func TestWorkflowGaugeTransitionIsIdempotent(t *testing.T) { if got := testutil.ToFloat64(workflowInProgressGauge.WithLabelValues("user/repo", "main", "CI")); got != 0 { t.Fatalf("expected in progress gauge to be 0, got %v", got) } - if got := testutil.ToFloat64(workflowCompletedGauge.WithLabelValues("user/repo", "main", "success", "CI")); got != 1 { + if got := testutil.ToFloat64(workflowCompletedGauge.WithLabelValues("user/repo", "main", testConclusionSuccess, "CI")); got != 1 { t.Fatalf("expected completed gauge to be 1, got %v", got) } } @@ -401,7 +401,7 @@ func TestJobGaugeTransitionIsIdempotent(t *testing.T) { updateJobMetrics(context.Background(), inProgressBody) payload.Job.Status = statusCompleted - payload.Job.Conclusion = "success" + payload.Job.Conclusion = testConclusionSuccess payload.Job.CompletedAt = "2024-11-21T12:00:00Z" completedBody, _ := json.Marshal(payload) updateJobMetrics(context.Background(), completedBody) @@ -413,7 +413,7 @@ func TestJobGaugeTransitionIsIdempotent(t *testing.T) { if got := testutil.ToFloat64(jobInProgressGauge.WithLabelValues("user/repo", "main", "CI")); got != 0 { t.Fatalf("expected in progress gauge to be 0, got %v", got) } - if got := testutil.ToFloat64(jobCompletedGauge.WithLabelValues("user/repo", "main", "success", "CI")); got != 1 { + if got := testutil.ToFloat64(jobCompletedGauge.WithLabelValues("user/repo", "main", testConclusionSuccess, "CI")); got != 1 { t.Fatalf("expected completed gauge to be 1, got %v", got) } } diff --git a/src/run_transition.go b/src/run_transition.go new file mode 100644 index 0000000..6f14820 --- /dev/null +++ b/src/run_transition.go @@ -0,0 +1,187 @@ +package main + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +type runMetricDetails struct { + repository string + branch string + name string + status string + conclusion string + startedAt string + endedAt string +} + +type runMetricSet struct { + statusCounter *prometheus.CounterVec + queuedGauge *prometheus.GaugeVec + inProgressGauge *prometheus.GaugeVec + completedGauge *prometheus.GaugeVec + durationHistogram *prometheus.HistogramVec +} + +type runStoreMethods struct { + get func(context.Context, int) (RunState, bool, error) + update func(context.Context, int, RunState) error +} + +type runTransitionStore interface { + GetRunState(context.Context, int) (RunState, bool, error) + UpdateRunState(context.Context, int, RunState) error +} + +type runTransitionRecorder interface { + RecordStatus(RunState) + AddGauge(RunState, float64) + ObserveDuration(RunState, float64) +} + +type runTransitionResult struct { + Applied bool + Skipped bool + Err error +} + +type runTransitionProcessor struct { + store runTransitionStore + recorder runTransitionRecorder + logger *zap.Logger + entityName string +} + +func (p *runTransitionProcessor) Apply(ctx context.Context, id int, details runMetricDetails) runTransitionResult { + nextState := normalizeRunState(details) + + if p.store == nil { + p.recordTransition(nextState, nil) + return runTransitionResult{Applied: true} + } + + previousState, found, err := p.store.GetRunState(ctx, id) + if err != nil { + p.logError("Failed to load run state from redis", id, err) + return runTransitionResult{Err: err} + } + + var previous *RunState + if found { + previous = &previousState + if !shouldApplyStateTransition(previousState, nextState) { + p.logDebug("Skipping stale or duplicate run transition", id, nextState) + return runTransitionResult{Skipped: true} + } + } + + if err := p.store.UpdateRunState(ctx, id, nextState); err != nil { + p.logError("Failed to update run state in redis", id, err) + return runTransitionResult{Err: err} + } + + p.recordTransition(nextState, previous) + return runTransitionResult{Applied: true} +} + +func (p *runTransitionProcessor) recordTransition(nextState RunState, previous *RunState) { + if p.recorder == nil { + return + } + + p.recorder.RecordStatus(nextState) + if previous != nil { + p.recorder.AddGauge(*previous, -1) + } + p.recorder.AddGauge(nextState, 1) + + if previous == nil || normalizeStatus(previous.Status) != statusCompleted { + if duration, ok := runDurationSeconds(nextState); ok { + p.recorder.ObserveDuration(nextState, duration) + } + } +} + +func (p *runTransitionProcessor) logError(message string, id int, err error) { + if p.logger == nil { + return + } + + p.logger.Error(message, zap.String("entity", p.entityName), zap.Int("id", id), zap.Error(err)) +} + +func (p *runTransitionProcessor) logDebug(message string, id int, state RunState) { + if p.logger == nil { + return + } + + p.logger.Debug(message, + zap.String("entity", p.entityName), + zap.Int("id", id), + zap.String("status", state.Status), + zap.String("conclusion", state.Conclusion), + ) +} + +func runDurationSeconds(state RunState) (float64, bool) { + if normalizeStatus(state.Status) != statusCompleted { + return 0, false + } + + startedAt, startedOK := parseMetricTime(state.StartedAt) + endedAt, endedOK := parseMetricTime(state.EndedAt) + if !startedOK || !endedOK || endedAt.Before(startedAt) { + return 0, false + } + + return endedAt.Sub(startedAt).Seconds(), true +} + +type runStoreAdapter struct { + methods runStoreMethods +} + +func (a runStoreAdapter) GetRunState(ctx context.Context, id int) (RunState, bool, error) { + return a.methods.get(ctx, id) +} + +func (a runStoreAdapter) UpdateRunState(ctx context.Context, id int, state RunState) error { + return a.methods.update(ctx, id, state) +} + +type prometheusRunTransitionRecorder struct { + metrics runMetricSet +} + +func (r prometheusRunTransitionRecorder) RecordStatus(state RunState) { + r.metrics.statusCounter.WithLabelValues( + state.Repository, + state.Branch, + state.Name, + state.Status, + state.Conclusion, + ).Inc() +} + +func (r prometheusRunTransitionRecorder) AddGauge(state RunState, delta float64) { + switch normalizeStatus(state.Status) { + case statusQueued: + r.metrics.queuedGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta) + case statusInProgress: + r.metrics.inProgressGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta) + case statusCompleted: + r.metrics.completedGauge.WithLabelValues(state.Repository, state.Branch, state.Conclusion, state.Name).Add(delta) + } +} + +func (r prometheusRunTransitionRecorder) ObserveDuration(state RunState, durationSeconds float64) { + r.metrics.durationHistogram.WithLabelValues( + state.Repository, + state.Branch, + state.Name, + state.Status, + state.Conclusion, + ).Observe(durationSeconds) +} diff --git a/src/run_transition_test.go b/src/run_transition_test.go new file mode 100644 index 0000000..d13509e --- /dev/null +++ b/src/run_transition_test.go @@ -0,0 +1,155 @@ +//go:build !integration + +package main + +import ( + "context" + "errors" + "testing" +) + +const ( + runTransitionTestRepository = "user/repo" + runTransitionTestBranch = "main" + runTransitionTestName = "CI" + runTransitionStartedAt = "2023-01-01T00:00:00Z" + runTransitionCompletedAt = "2023-01-01T01:00:00Z" +) + +type fakeRunTransitionStore struct { + state RunState + found bool + getErr error + updateErr error + updateCalls int +} + +func (s *fakeRunTransitionStore) GetRunState(_ context.Context, _ int) (RunState, bool, error) { + if s.getErr != nil { + return RunState{}, false, s.getErr + } + + return s.state, s.found, nil +} + +func (s *fakeRunTransitionStore) UpdateRunState(_ context.Context, _ int, state RunState) error { + if s.updateErr != nil { + return s.updateErr + } + + s.state = state + s.found = true + s.updateCalls++ + return nil +} + +type fakeRunTransitionRecorder struct { + statuses []RunState + gauges []recordedGaugeDelta + durations []float64 +} + +type recordedGaugeDelta struct { + state RunState + delta float64 +} + +func (r *fakeRunTransitionRecorder) RecordStatus(state RunState) { + r.statuses = append(r.statuses, state) +} + +func (r *fakeRunTransitionRecorder) AddGauge(state RunState, delta float64) { + r.gauges = append(r.gauges, recordedGaugeDelta{state: state, delta: delta}) +} + +func (r *fakeRunTransitionRecorder) ObserveDuration(_ RunState, durationSeconds float64) { + r.durations = append(r.durations, durationSeconds) +} + +func TestRunTransitionProcessorAppliesLifecycleThroughInterface(t *testing.T) { + store := &fakeRunTransitionStore{} + recorder := &fakeRunTransitionRecorder{} + processor := &runTransitionProcessor{ + store: store, + recorder: recorder, + } + + queued := runTransitionDetails(statusQueued, "", runTransitionStartedAt) + if result := processor.Apply(context.Background(), 1001, queued); !result.Applied { + t.Fatalf("expected queued transition to apply, got %#v", result) + } + + inProgress := runTransitionDetails(statusInProgress, "", runTransitionStartedAt) + if result := processor.Apply(context.Background(), 1001, inProgress); !result.Applied { + t.Fatalf("expected in-progress transition to apply, got %#v", result) + } + + completed := runTransitionDetails(statusCompleted, testConclusionSuccess, runTransitionCompletedAt) + if result := processor.Apply(context.Background(), 1001, completed); !result.Applied { + t.Fatalf("expected completed transition to apply, got %#v", result) + } + + if store.updateCalls != 3 { + t.Fatalf("expected 3 stored transitions, got %d", store.updateCalls) + } + if store.state.Status != statusCompleted || store.state.Conclusion != testConclusionSuccess { + t.Fatalf("expected final completed state, got %#v", store.state) + } + if len(recorder.statuses) != 3 { + t.Fatalf("expected 3 status records, got %d", len(recorder.statuses)) + } + + assertGaugeDelta(t, recorder.gauges, 0, statusQueued, 1) + assertGaugeDelta(t, recorder.gauges, 1, statusQueued, -1) + assertGaugeDelta(t, recorder.gauges, 2, statusInProgress, 1) + assertGaugeDelta(t, recorder.gauges, 3, statusInProgress, -1) + assertGaugeDelta(t, recorder.gauges, 4, statusCompleted, 1) + + if len(recorder.durations) != 1 || recorder.durations[0] != 3600 { + t.Fatalf("expected one 3600s duration observation, got %#v", recorder.durations) + } +} + +func TestRunTransitionProcessorDoesNotRecordMetricsWhenStoreFails(t *testing.T) { + store := &fakeRunTransitionStore{getErr: errors.New("store unavailable")} + recorder := &fakeRunTransitionRecorder{} + processor := &runTransitionProcessor{ + store: store, + recorder: recorder, + } + + result := processor.Apply(context.Background(), 1001, runTransitionDetails(statusCompleted, testConclusionSuccess, runTransitionCompletedAt)) + + if result.Err == nil { + t.Fatal("expected store error") + } + if len(recorder.statuses) != 0 || len(recorder.gauges) != 0 || len(recorder.durations) != 0 { + t.Fatalf("expected no metric records, got %#v", recorder) + } + if store.updateCalls != 0 { + t.Fatalf("expected no store updates, got %d", store.updateCalls) + } +} + +func runTransitionDetails(status, conclusion, endedAt string) runMetricDetails { + return runMetricDetails{ + repository: runTransitionTestRepository, + branch: runTransitionTestBranch, + name: runTransitionTestName, + status: status, + conclusion: conclusion, + startedAt: runTransitionStartedAt, + endedAt: endedAt, + } +} + +func assertGaugeDelta(t *testing.T, gauges []recordedGaugeDelta, index int, status string, delta float64) { + t.Helper() + + if len(gauges) <= index { + t.Fatalf("expected gauge delta at index %d, got %#v", index, gauges) + } + if gauges[index].state.Status != status || gauges[index].delta != delta { + t.Fatalf("expected gauge %d to be %s/%v, got %#v", index, status, delta, gauges[index]) + } +} diff --git a/src/test_support_test.go b/src/test_support_test.go index 301a3a6..ba424ad 100644 --- a/src/test_support_test.go +++ b/src/test_support_test.go @@ -2,6 +2,8 @@ package main import "context" +const testConclusionSuccess = "success" + type inMemoryStateStore struct { deliveries map[string]struct{} workflow map[int]RunState