Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 9 additions & 121 deletions src/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -82,29 +81,6 @@ type GithubPullRequest struct {
Repository GithubRepo `json:"repository"`
}

// runMetricDetails groups the string attributes recorded for a single run.
// Field order is repository, branch, name, status, conclusion, startedAt,
// endedAt; every field is a plain string.
type runMetricDetails struct {
	repository, branch, name string
	status, conclusion       string
	startedAt, endedAt       string
}

// runMetricSet bundles the Prometheus collectors that applyStatefulMetrics
// updates when a run state is applied.
type runMetricSet struct {
// statusCounter is incremented on every applyStatefulMetrics call, labelled
// by repository, branch, name, status and conclusion.
statusCounter *prometheus.CounterVec
// queuedGauge is adjusted for runs whose normalized status is queued;
// labels are repository, branch and name.
queuedGauge *prometheus.GaugeVec
// inProgressGauge is adjusted for runs whose normalized status is
// in_progress; labels are repository, branch and name.
inProgressGauge *prometheus.GaugeVec
// completedGauge is adjusted for completed runs; labels are repository,
// branch, conclusion and name (conclusion comes before name).
completedGauge *prometheus.GaugeVec
// durationHistogram receives the elapsed seconds between a completed run's
// start and end timestamps, labelled by repository, branch, name, status
// and conclusion.
durationHistogram *prometheus.HistogramVec
}

// runStoreMethods pairs the two state-store callbacks used while tracking a
// run: one to load the stored state and one to write the next state.
type runStoreMethods struct {
// get loads the state stored for the given id. The bool reports whether a
// state was found; a non-nil error reports a failed lookup.
get func(context.Context, int) (RunState, bool, error)
// update stores the given state for the id and returns an error on failure.
update func(context.Context, int, RunState) error
}

const (
statusQueued = "queued"
statusInProgress = "in_progress"
Expand Down Expand Up @@ -202,86 +178,6 @@ func shouldApplyStateTransition(previous, next RunState) bool {
return true
}

// applyGaugeDelta adds delta to the gauge that matches the normalized status
// of details.
//
// The queued and in-progress gauges are labelled by repository, branch and
// name. The completed gauge additionally carries the conclusion, placed
// before the name. Any other status leaves every gauge untouched.
func applyGaugeDelta(details RunState, delta float64, queuedGauge, inProgressGauge, completedGauge *prometheus.GaugeVec) {
	status := normalizeStatus(details.Status)

	// The completed gauge has its own label layout, so handle it separately.
	if status == statusCompleted {
		completedGauge.WithLabelValues(details.Repository, details.Branch, details.Conclusion, details.Name).Add(delta)
		return
	}

	var target *prometheus.GaugeVec
	switch status {
	case statusQueued:
		target = queuedGauge
	case statusInProgress:
		target = inProgressGauge
	default:
		return
	}
	target.WithLabelValues(details.Repository, details.Branch, details.Name).Add(delta)
}

// observeDuration records how long a completed run took, in seconds.
//
// Nothing is observed unless the normalized status is completed, both
// StartedAt and EndedAt parse via parseMetricTime, and the end is not before
// the start. The observation is labelled by repository, branch, name, status
// and conclusion.
func observeDuration(details RunState, durationHistogram *prometheus.HistogramVec) {
	if normalizeStatus(details.Status) != statusCompleted {
		return
	}

	begin, beginOK := parseMetricTime(details.StartedAt)
	finish, finishOK := parseMetricTime(details.EndedAt)
	measurable := beginOK && finishOK && !finish.Before(begin)
	if !measurable {
		return
	}

	elapsedSeconds := finish.Sub(begin).Seconds()
	labels := []string{
		details.Repository,
		details.Branch,
		details.Name,
		details.Status,
		details.Conclusion,
	}
	durationHistogram.WithLabelValues(labels...).Observe(elapsedSeconds)
}

// applyStatefulMetrics updates every collector in metrics for a run that is
// now in the state described by details.
//
// The status counter is always incremented. When previous is non-nil, its
// gauge is decremented before the gauge for details is incremented. The
// duration is observed only if there is no previous state or the previous
// state was not already completed.
func applyStatefulMetrics(details RunState, previous *RunState, metrics runMetricSet) {
	transitions := metrics.statusCounter.WithLabelValues(
		details.Repository,
		details.Branch,
		details.Name,
		details.Status,
		details.Conclusion,
	)
	transitions.Inc()

	hasPrevious := previous != nil
	if hasPrevious {
		applyGaugeDelta(*previous, -1, metrics.queuedGauge, metrics.inProgressGauge, metrics.completedGauge)
	}
	applyGaugeDelta(details, 1, metrics.queuedGauge, metrics.inProgressGauge, metrics.completedGauge)

	// A run that was already completed must not contribute a second duration.
	if hasPrevious && normalizeStatus(previous.Status) == statusCompleted {
		return
	}
	observeDuration(details, metrics.durationHistogram)
}

// getPreviousState loads the stored state for id through getFn.
//
// It returns (nil, true) when no state store is configured or when nothing is
// stored for id, (state, true) when a stored state was found, and
// (nil, false) when the lookup failed; the failure is logged with entityName
// and id.
func getPreviousState(ctx context.Context, id int, getFn func(context.Context, int) (RunState, bool, error), entityName string) (*RunState, bool) {
	if stateStore == nil {
		return nil, true
	}

	stored, exists, loadErr := getFn(ctx, id)
	switch {
	case loadErr != nil:
		logger.Error("Failed to load run state from redis", zap.String("entity", entityName), zap.Int("id", id), zap.Error(loadErr))
		return nil, false
	case !exists:
		return nil, true
	default:
		return &stored, true
	}
}

// persistRunState writes next for id through updateFn and reports success.
//
// It returns true without calling updateFn when no state store is
// configured. A failed update is logged with entityName and id, and false is
// returned.
func persistRunState(ctx context.Context, id int, next RunState, updateFn func(context.Context, int, RunState) error, entityName string) bool {
	if stateStore == nil {
		return true
	}

	saveErr := updateFn(ctx, id, next)
	if saveErr == nil {
		return true
	}

	logger.Error("Failed to update run state in redis", zap.String("entity", entityName), zap.Int("id", id), zap.Error(saveErr))
	return false
}

func updateTrackedRunMetrics(
ctx context.Context,
id int,
Expand All @@ -290,26 +186,18 @@ func updateTrackedRunMetrics(
entityName string,
metrics runMetricSet,
) {
nextState := normalizeRunState(details)

if stateStore == nil {
applyStatefulMetrics(nextState, nil, metrics)
return
var storeAdapter runTransitionStore
if stateStore != nil {
storeAdapter = runStoreAdapter{methods: store}
}

previousState, ok := getPreviousState(ctx, id, store.get, entityName)
if !ok {
return
}
if previousState != nil && !shouldApplyStateTransition(*previousState, nextState) {
logger.Debug("Skipping stale or duplicate run transition", zap.String("entity", entityName), zap.Int("id", id), zap.String("status", nextState.Status), zap.String("conclusion", nextState.Conclusion))
return
processor := &runTransitionProcessor{
store: storeAdapter,
recorder: prometheusRunTransitionRecorder{metrics: metrics},
logger: logger,
entityName: entityName,
}
if !persistRunState(ctx, id, nextState, store.update, entityName) {
return
}

applyStatefulMetrics(nextState, previousState, metrics)
processor.Apply(ctx, id, details)
}

func workflowRunStoreMethods() runStoreMethods {
Expand Down
92 changes: 91 additions & 1 deletion src/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -22,6 +24,11 @@ import (
"go.uber.org/zap"
)

// Timestamps fed into the workflow lifecycle integration test. They are
// exactly one hour apart, which is why that test expects a duration sum of
// 3600 seconds.
const (
// integrationRunStartedAt is used for the queued and in-progress payloads.
integrationRunStartedAt = "2023-01-01T00:00:00Z"
// integrationRunCompletedAt is used for the completed payload.
integrationRunCompletedAt = "2023-01-01T01:00:00Z"
)

func TestIntegrationWebhookMetrics(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -188,6 +195,58 @@ func TestIntegrationDuplicateDeliveryDoesNotInflateMetrics(t *testing.T) {
}
}

func TestIntegrationDeliveryStoreFailurePreventsWebhookProcessing(t *testing.T) {
server := newIntegrationTestServerWithStateStore(t, asyncProcessorConfig{WorkerCount: 1, QueueSize: 8}, failingDeliveryStateStore{
inMemoryStateStore: newInMemoryStateStore(),
})
defer server.Close()

body := mustReadFixture(t, "workflow_run.json")
resp := sendWebhookRequest(t, server.URL, githubEventWorkflowRun, body, "delivery-store-failure")
assertResponseStatus(t, resp, http.StatusInternalServerError)

metrics := mustFetchMetrics(t, server.URL)
if strings.Contains(metrics, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) {
t.Fatalf("workflow metrics changed after delivery store failure:\n%s", metrics)
}
if strings.Contains(metrics, `promgithub_event_processed_total{event_type="workflow_run"}`) {
t.Fatalf("event was enqueued after delivery store failure:\n%s", metrics)
}
}

// TestIntegrationWorkflowRunLifecycleBalancesStatefulMetrics drives one
// workflow run through queued, in_progress and completed deliveries and
// checks that the gauges balance out: queued and in-progress end at 0,
// completed ends at 1, and exactly one 3600-second duration is recorded.
func TestIntegrationWorkflowRunLifecycleBalancesStatefulMetrics(t *testing.T) {
	server := newIntegrationTestServer(t)
	defer server.Close()

	// Each stage sends one delivery and then waits for the sample that proves
	// the stage has been processed before moving on.
	stages := []struct {
		status     string
		conclusion string
		updatedAt  string
		deliveryID string
		wantSample string
	}{
		{
			status:     statusQueued,
			updatedAt:  integrationRunStartedAt,
			deliveryID: "delivery-lifecycle-queued",
			wantSample: `promgithub_workflow_queued{branch="main",repository="user/repo",workflow_name="CI"} 1`,
		},
		{
			status:     statusInProgress,
			updatedAt:  integrationRunStartedAt,
			deliveryID: "delivery-lifecycle-in-progress",
			wantSample: `promgithub_workflow_in_progress{branch="main",repository="user/repo",workflow_name="CI"} 1`,
		},
		{
			status:     statusCompleted,
			conclusion: testConclusionSuccess,
			updatedAt:  integrationRunCompletedAt,
			deliveryID: "delivery-lifecycle-completed",
			wantSample: `promgithub_workflow_completed{branch="main",repository="user/repo",workflow_conclusion="success",workflow_name="CI"} 1`,
		},
	}

	// After the loop, metrics holds the scrape taken once the completed
	// stage became visible.
	var metrics string
	for _, stage := range stages {
		body := workflowRunFixtureWithStatus(t, stage.status, stage.conclusion, stage.updatedAt)
		resp := sendWebhookRequest(t, server.URL, githubEventWorkflowRun, body, stage.deliveryID)
		assertResponseStatus(t, resp, http.StatusAccepted)
		metrics = waitForMetricsSubstring(t, server.URL, stage.wantSample)
	}

	wantSamples := []string{
		`promgithub_workflow_queued{branch="main",repository="user/repo",workflow_name="CI"} 0`,
		`promgithub_workflow_in_progress{branch="main",repository="user/repo",workflow_name="CI"} 0`,
		`promgithub_workflow_completed{branch="main",repository="user/repo",workflow_conclusion="success",workflow_name="CI"} 1`,
		`promgithub_workflow_duration_sum{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 3600`,
		`promgithub_workflow_duration_count{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`,
	}
	for i := range wantSamples {
		if strings.Contains(metrics, wantSamples[i]) {
			continue
		}
		t.Fatalf("expected metrics to contain %q, got:\n%s", wantSamples[i], metrics)
	}
}

func TestIntegrationAsyncQueueFullReturnsUnavailableAndExposesQueueDropMetrics(t *testing.T) {
server := newIntegrationTestServerWithAsyncConfig(t, asyncProcessorConfig{WorkerCount: 1, QueueSize: 1})
defer server.Close()
Expand Down Expand Up @@ -315,11 +374,16 @@ func newIntegrationTestServer(t *testing.T) *httptest.Server {
}

// newIntegrationTestServerWithAsyncConfig starts an integration test server
// with the given async processor configuration, backed by a fresh in-memory
// state store.
func newIntegrationTestServerWithAsyncConfig(t *testing.T, cfg asyncProcessorConfig) *httptest.Server {
	t.Helper()

	defaultStore := newInMemoryStateStore()
	return newIntegrationTestServerWithStateStore(t, cfg, defaultStore)
}

func newIntegrationTestServerWithStateStore(t *testing.T, cfg asyncProcessorConfig, store StateStore) *httptest.Server {
t.Helper()
resetIntegrationTestMetrics()

githubWebhookSecret = []byte("integration-test-secret")
stateStore = newInMemoryStateStore()
stateStore = store
eventProcessor = newAsyncEventProcessor(cfg, zap.NewNop())
eventProcessor.Start()
t.Cleanup(func() {
Expand All @@ -334,6 +398,14 @@ func newIntegrationTestServerWithAsyncConfig(t *testing.T, cfg asyncProcessorCon
return httptest.NewServer(router)
}

// failingDeliveryStateStore is a test double that embeds *inMemoryStateStore
// and overrides MarkDeliveryProcessed so that it always fails; every other
// method comes from the embedded store.
type failingDeliveryStateStore struct {
*inMemoryStateStore
}

// MarkDeliveryProcessed always reports failure: it returns false together
// with a "delivery store unavailable" error, ignoring both arguments.
func (failingDeliveryStateStore) MarkDeliveryProcessed(_ context.Context, _ string) (bool, error) {
	unavailable := errors.New("delivery store unavailable")
	return false, unavailable
}

func assertResponseStatus(t *testing.T, resp *http.Response, expectedStatus int) {
t.Helper()
defer func() { _ = resp.Body.Close() }()
Expand Down Expand Up @@ -390,6 +462,24 @@ func mustReadFixture(t *testing.T, name string) []byte {
return body
}

func workflowRunFixtureWithStatus(t *testing.T, status, conclusion, updatedAt string) []byte {
t.Helper()

var payload GithubWorkflow
if err := json.Unmarshal(mustReadFixture(t, "workflow_run.json"), &payload); err != nil {
t.Fatalf("failed to unmarshal workflow fixture: %v", err)
}
payload.Workflow.Status = status
payload.Workflow.Conclusion = conclusion
payload.Workflow.UpdatedAt = updatedAt

body, err := json.Marshal(payload)
if err != nil {
t.Fatalf("failed to marshal workflow fixture: %v", err)
}
return body
}

func sendWebhookRequest(t *testing.T, serverURL, eventType string, body []byte, deliveryID string) *http.Response {
t.Helper()
signature := webhookSignature(body, githubWebhookSecret)
Expand Down
8 changes: 4 additions & 4 deletions src/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func TestWorkflowGaugeTransitionIsIdempotent(t *testing.T) {
updateWorkflowMetrics(context.Background(), inProgressBody)

payload.Workflow.Status = statusCompleted
payload.Workflow.Conclusion = "success"
payload.Workflow.Conclusion = testConclusionSuccess
payload.Workflow.UpdatedAt = "2024-11-21T12:00:00Z"
completedBody, _ := json.Marshal(payload)
updateWorkflowMetrics(context.Background(), completedBody)
Expand All @@ -366,7 +366,7 @@ func TestWorkflowGaugeTransitionIsIdempotent(t *testing.T) {
if got := testutil.ToFloat64(workflowInProgressGauge.WithLabelValues("user/repo", "main", "CI")); got != 0 {
t.Fatalf("expected in progress gauge to be 0, got %v", got)
}
if got := testutil.ToFloat64(workflowCompletedGauge.WithLabelValues("user/repo", "main", "success", "CI")); got != 1 {
if got := testutil.ToFloat64(workflowCompletedGauge.WithLabelValues("user/repo", "main", testConclusionSuccess, "CI")); got != 1 {
t.Fatalf("expected completed gauge to be 1, got %v", got)
}
}
Expand Down Expand Up @@ -401,7 +401,7 @@ func TestJobGaugeTransitionIsIdempotent(t *testing.T) {
updateJobMetrics(context.Background(), inProgressBody)

payload.Job.Status = statusCompleted
payload.Job.Conclusion = "success"
payload.Job.Conclusion = testConclusionSuccess
payload.Job.CompletedAt = "2024-11-21T12:00:00Z"
completedBody, _ := json.Marshal(payload)
updateJobMetrics(context.Background(), completedBody)
Expand All @@ -413,7 +413,7 @@ func TestJobGaugeTransitionIsIdempotent(t *testing.T) {
if got := testutil.ToFloat64(jobInProgressGauge.WithLabelValues("user/repo", "main", "CI")); got != 0 {
t.Fatalf("expected in progress gauge to be 0, got %v", got)
}
if got := testutil.ToFloat64(jobCompletedGauge.WithLabelValues("user/repo", "main", "success", "CI")); got != 1 {
if got := testutil.ToFloat64(jobCompletedGauge.WithLabelValues("user/repo", "main", testConclusionSuccess, "CI")); got != 1 {
t.Fatalf("expected completed gauge to be 1, got %v", got)
}
}
Loading
Loading