From d116c5493cfc5fa297fa8785afb376b845af76b1 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Wed, 6 May 2026 13:22:10 +0200 Subject: [PATCH 1/8] Remove info-level log for mirrored requests --- proxy/http.go | 96 ++++++++++++--------------------------------------- 1 file changed, 22 insertions(+), 74 deletions(-) diff --git a/proxy/http.go b/proxy/http.go index 1b662c9..7c625a9 100644 --- a/proxy/http.go +++ b/proxy/http.go @@ -689,77 +689,7 @@ func (p *HTTP) execMirrorJob(job *mirrorJob) { err := p.peer.Do(job.req, job.res) - loggedFields := make([]zap.Field, 0, 12) - { // add log fields - loggedFields = append(loggedFields, - zap.String("mirror_host", job.host), - zap.Int("request_size", len(job.req.Body())), - zap.Int("response_size", len(job.res.Body())), - ) - - if p.cfg.proxy.LogRequests && len(job.req.Body()) <= p.cfg.proxy.LogRequestsMaxSize && p.shouldLogMethod(job.jrpcMethodForMetrics) { - var jsonRequest any - if err := json.Unmarshal(job.req.Body(), &jsonRequest); err == nil { - jrpc.Sanitize(jsonRequest) - loggedFields = append(loggedFields, - zap.Any("json_request", jsonRequest), - ) - } else { - loggedFields = append(loggedFields, - zap.String("http_request", utils.Str(job.req.Body())), - ) - } - } - - if err == nil { - loggedFields = append(loggedFields, - zap.Int("http_status", job.res.StatusCode()), - ) - - if p.cfg.proxy.LogResponses && len(job.res.Body()) <= p.cfg.proxy.LogResponsesMaxSize && p.shouldLogMethod(job.jrpcMethodForMetrics) { - switch utils.Str(job.res.Header.ContentEncoding()) { - default: - var jsonResponse any - if err := json.Unmarshal(job.res.Body(), &jsonResponse); err == nil { - jrpc.Sanitize(jsonResponse) - loggedFields = append(loggedFields, - zap.Any("json_response", jsonResponse), - ) - } else { - loggedFields = append(loggedFields, - zap.String("http_response", utils.Str(job.res.Body())), - ) - } - - case "gzip": - if body, err := job.res.BodyGunzip(); err == nil { - var jsonResponse any - if err := json.Unmarshal(body, &jsonResponse); err == nil { - jrpc.Sanitize(jsonResponse) - loggedFields = append(loggedFields, - zap.Any("json_response", jsonResponse), - ) - } else { - loggedFields = append(loggedFields, - zap.String("http_response", utils.Str(body)), - ) - } - } else { - loggedFields = append(loggedFields, - zap.NamedError("error_gzip", err), - zap.String("hex_response", hex.EncodeToString(job.res.Body())), - ) - } - } - } - } else { - loggedFields = append(loggedFields, - zap.NamedError("error_mirror", err), - ) - } - } - - { // emit logs and metrics + { // emit metrics metricAttributes := otelapi.WithAttributes( attribute.KeyValue{Key: "proxy", Value: attribute.StringValue(p.cfg.name)}, attribute.KeyValue{Key: "mirror_host", Value: attribute.StringValue(job.host)}, @@ -767,14 +697,32 @@ func (p *HTTP) execMirrorJob(job *mirrorJob) { ) if err == nil { - job.log.Info("Mirrored the request", loggedFields...) metrics.MirrorSuccessCount.Add(context.TODO(), 1, metricAttributes) } else { + // log err + loggedFields := []zap.Field{ + zap.String("mirror_host", job.host), + zap.Int("request_size", len(job.req.Body())), + zap.NamedError("error_mirror", err), + } + + if p.cfg.proxy.LogRequests && len(job.req.Body()) <= p.cfg.proxy.LogRequestsMaxSize && p.shouldLogMethod(job.jrpcMethodForMetrics) { + var jsonRequest any + if err := json.Unmarshal(job.req.Body(), &jsonRequest); err == nil { + jrpc.Sanitize(jsonRequest) + loggedFields = append(loggedFields, zap.Any("json_request", jsonRequest)) + } else { + loggedFields = append(loggedFields, + zap.NamedError("error_unmarshal", err), + zap.String("http_request", utils.Str(job.req.Body())), + ) + } + } + job.log.Error("Failed to mirror the request", loggedFields...) + _ = job.log.Sync() metrics.MirrorFailureCount.Add(context.TODO(), 1, metricAttributes) } - - _ = job.log.Sync() } } From b1d4250672b68ec3dc7ed222f9b739efff633db3 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Wed, 6 May 2026 13:38:53 +0200 Subject: [PATCH 2/8] Remove per-message info log from websocket pump --- proxy/websocket_pump.go | 51 ++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/proxy/websocket_pump.go b/proxy/websocket_pump.go index a5502b4..b125ab5 100644 --- a/proxy/websocket_pump.go +++ b/proxy/websocket_pump.go @@ -200,8 +200,6 @@ func (p *websocketPump) pumpMessages( return case m := <-messages: - loggedFields := make([]zap.Field, 0, 6) - if p.cfg.proxy.Chaos.Enabled { // inject chaos dropMessage := rand.Float64() < p.cfg.proxy.Chaos.DroppedMessageProbability/100 injectInvalidFlashblockPayload := rand.Float64() < p.cfg.proxy.Chaos.InjectedInvalidFlashblockPayloadProbability/100 @@ -216,9 +214,9 @@ func (p *websocketPump) pumpMessages( if injectInvalidFlashblockPayload { // inject invalid flashblock payload if err := m.chaosMangle(); err == nil { - loggedFields = append(loggedFields, + l.Info("Injected invalid flashblock payload", p.prepareLogFields(m, zap.Bool("chaos_injected_invalid_flashblock_payload", true), - ) + )...) } else { l.Warn("Failed to generate invalid flashblock payload", zap.Error(err), @@ -227,42 +225,47 @@ func (p *websocketPump) pumpMessages( } if injectMalformedJsonMessage { // inject malformed json - loggedFields = append(loggedFields, - zap.Bool("chaos_injected_malformed_json_message", true), - ) m.bytes = m.bytes[1 : len(m.bytes)-1] + l.Info("Injected malformed JSON message", p.prepareLogFields(m, + zap.Bool("chaos_injected_malformed_json_message", true), + )...) } - { // chaos-inject latency - if p.cfg.proxy.Chaos.MinInjectedLatency > 0 || p.cfg.proxy.Chaos.MaxInjectedLatency > 0 { - loggedFields = append(loggedFields, - zap.Bool("chaos_injected_latency", true), - ) - latency := time.Duration(rand.Int64N(int64(p.cfg.proxy.Chaos.MaxInjectedLatency) + 1)) - latency = max(latency, p.cfg.proxy.Chaos.MinInjectedLatency) - time.Sleep(latency - time.Since(m.ts)) - } + if p.cfg.proxy.Chaos.MinInjectedLatency > 0 || p.cfg.proxy.Chaos.MaxInjectedLatency > 0 { // chaos-inject latency + latency := time.Duration(rand.Int64N(int64(p.cfg.proxy.Chaos.MaxInjectedLatency) + 1)) + latency = max(latency, p.cfg.proxy.Chaos.MinInjectedLatency) + time.Sleep(latency - time.Since(m.ts)) + l.Info("Injected latency", p.prepareLogFields(m, + zap.Bool("chaos_injected_latency", true), + zap.Duration("chaos_latency", latency), + )...) } } if err := to.SetWriteDeadline(utils.Deadline(timeout)); err != nil { + l.Error("Failed to set write deadline", + zap.Int("message_type", m.msgType), + zap.Int("message_size", len(m.bytes)), + zap.Error(err), + ) notifyOnFailure(err) continue } if err := to.WriteMessage(m.msgType, m.bytes); err != nil { + l.Error("Failed to write websocket message", + zap.Int("message_type", m.msgType), + zap.Int("message_size", len(m.bytes)), + zap.Error(err), + ) notifyOnFailure(err) continue } - { // emit logs and metrics - metrics.ProxySuccessCount.Add(context.TODO(), 1, otelapi.WithAttributes( - attribute.KeyValue{Key: "proxy", Value: attribute.StringValue(p.cfg.name)}, - attribute.KeyValue{Key: "direction", Value: attribute.StringValue(direction)}, - )) - - l.Info("Proxied message", p.prepareLogFields(m, loggedFields...)...) - } + metrics.ProxySuccessCount.Add(context.TODO(), 1, otelapi.WithAttributes( + attribute.KeyValue{Key: "proxy", Value: attribute.StringValue(p.cfg.name)}, + attribute.KeyValue{Key: "direction", Value: attribute.StringValue(direction)}, + )) } } }() From c7b6b67dbc4b82a7387a0e14cbd9d79b20054ad5 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Wed, 6 May 2026 13:44:27 +0200 Subject: [PATCH 3/8] fix: remove redundant DecodeEthRawTransaction call in prepareLogFields --- proxy/websocket_pump.go | 1 - 1 file changed, 1 deletion(-) diff --git a/proxy/websocket_pump.go b/proxy/websocket_pump.go index b125ab5..200ca5b 100644 --- a/proxy/websocket_pump.go +++ b/proxy/websocket_pump.go @@ -317,7 +317,6 @@ func (p *websocketPump) prepareLogFields( Nonce: tx.Nonce(), }) } else { - _, _, err := jrpc.DecodeEthRawTransaction(strTx) errs = append(errs, err) } From 2871dd1e701176acd27cb4b178c3bb63005dd9ce Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Wed, 6 May 2026 15:05:03 +0200 Subject: [PATCH 4/8] Skip success proxy log for methods without data --- proxy/http.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/proxy/http.go b/proxy/http.go index 7c625a9..2b81ade 100644 --- a/proxy/http.go +++ b/proxy/http.go @@ -675,7 +675,10 @@ func (p *HTTP) execProxyJob(job *proxyJob) { job.log.Error("Failed to proxy the request", loggedFields...) } else if job.triage.Proxy { metrics.ProxySuccessCount.Add(context.TODO(), 1, metricAttributes) - job.log.Info("Proxied the request", loggedFields...) + // emit success logs only when triage enriched the request with detail + if len(job.triage.Transactions) > 0 || !job.triage.Deadline.IsZero() { + job.log.Info("Proxied the request", loggedFields...) + } } else { metrics.ProxyFakeCount.Add(context.TODO(), 1, metricAttributes) job.log.Info("Faked the request", loggedFields...) From 84958ad209343966b04171388627483f47533655 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 7 May 2026 15:33:18 +0200 Subject: [PATCH 5/8] Restore success logs; downgrade to Debug --- proxy/http.go | 10 ++++++---- proxy/websocket_pump.go | 12 ++++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/proxy/http.go b/proxy/http.go index 2b81ade..a532903 100644 --- a/proxy/http.go +++ b/proxy/http.go @@ -675,10 +675,7 @@ func (p *HTTP) execProxyJob(job *proxyJob) { job.log.Error("Failed to proxy the request", loggedFields...) } else if job.triage.Proxy { metrics.ProxySuccessCount.Add(context.TODO(), 1, metricAttributes) - // emit success logs only when triage enriched the request with detail - if len(job.triage.Transactions) > 0 || !job.triage.Deadline.IsZero() { - job.log.Info("Proxied the request", loggedFields...) - } + job.log.Debug("Proxied the request", loggedFields...) } else { metrics.ProxyFakeCount.Add(context.TODO(), 1, metricAttributes) job.log.Info("Faked the request", loggedFields...) @@ -700,6 +697,11 @@ func (p *HTTP) execMirrorJob(job *mirrorJob) { ) if err == nil { + job.log.Debug("Mirrored the request", + zap.String("mirror_host", job.host), + zap.Int("request_size", len(job.req.Body())), + zap.Int("response_size", len(job.res.Body())), + ) metrics.MirrorSuccessCount.Add(context.TODO(), 1, metricAttributes) } else { // log err diff --git a/proxy/websocket_pump.go b/proxy/websocket_pump.go index 200ca5b..e014ac8 100644 --- a/proxy/websocket_pump.go +++ b/proxy/websocket_pump.go @@ -262,10 +262,14 @@ func (p *websocketPump) pumpMessages( continue } - metrics.ProxySuccessCount.Add(context.TODO(), 1, otelapi.WithAttributes( - attribute.KeyValue{Key: "proxy", Value: attribute.StringValue(p.cfg.name)}, - attribute.KeyValue{Key: "direction", Value: attribute.StringValue(direction)}, - )) + { // emit logs and metrics + metrics.ProxySuccessCount.Add(context.TODO(), 1, otelapi.WithAttributes( + attribute.KeyValue{Key: "proxy", Value: attribute.StringValue(p.cfg.name)}, + attribute.KeyValue{Key: "direction", Value: attribute.StringValue(direction)}, + )) + + l.Debug("Proxied message", p.prepareLogFields(m)...) + } } } }() From c30f89bc87a4129fb826a47745a6531745c9df5f Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 7 May 2026 15:42:50 +0200 Subject: [PATCH 6/8] Restore field building for mirror Debug log --- proxy/http.go | 102 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 75 insertions(+), 27 deletions(-) diff --git a/proxy/http.go b/proxy/http.go index a532903..bbad602 100644 --- a/proxy/http.go +++ b/proxy/http.go @@ -689,7 +689,78 @@ func (p *HTTP) execMirrorJob(job *mirrorJob) { err := p.peer.Do(job.req, job.res) - { // emit metrics + loggedFields := make([]zap.Field, 0, 12) + { // add log fields + loggedFields = append(loggedFields, + zap.String("mirror_host", job.host), + zap.Int("request_size", len(job.req.Body())), + zap.Int("response_size", len(job.res.Body())), + ) + + if p.cfg.proxy.LogRequests && len(job.req.Body()) <= p.cfg.proxy.LogRequestsMaxSize && p.shouldLogMethod(job.jrpcMethodForMetrics) { + var jsonRequest any + if err := json.Unmarshal(job.req.Body(), &jsonRequest); err == nil { + jrpc.Sanitize(jsonRequest) + loggedFields = append(loggedFields, + zap.Any("json_request", jsonRequest), + ) + } else { + loggedFields = append(loggedFields, + zap.NamedError("error_unmarshal", err), + zap.String("http_request", utils.Str(job.req.Body())), + ) + } + } + + if err == nil { + loggedFields = append(loggedFields, + zap.Int("http_status", job.res.StatusCode()), + ) + + if p.cfg.proxy.LogResponses && len(job.res.Body()) <= p.cfg.proxy.LogResponsesMaxSize && p.shouldLogMethod(job.jrpcMethodForMetrics) { + switch utils.Str(job.res.Header.ContentEncoding()) { + default: + var jsonResponse any + if err := json.Unmarshal(job.res.Body(), &jsonResponse); err == nil { + jrpc.Sanitize(jsonResponse) + loggedFields = append(loggedFields, + zap.Any("json_response", jsonResponse), + ) + } else { + loggedFields = append(loggedFields, + zap.String("http_response", utils.Str(job.res.Body())), + ) + } + + case "gzip": + if body, err := job.res.BodyGunzip(); err == nil { + var jsonResponse any + if err := json.Unmarshal(body, &jsonResponse); err == nil { + jrpc.Sanitize(jsonResponse) + loggedFields = append(loggedFields, + zap.Any("json_response", jsonResponse), + ) + } else { + loggedFields = append(loggedFields, + zap.String("http_response", utils.Str(body)), + ) + } + } else { + loggedFields = append(loggedFields, + zap.NamedError("error_gzip", err), + zap.String("hex_response", hex.EncodeToString(job.res.Body())), + ) + } + } + } + } else { + loggedFields = append(loggedFields, + zap.NamedError("error_mirror", err), + ) + } + } + + { // emit logs and metrics metricAttributes := otelapi.WithAttributes( attribute.KeyValue{Key: "proxy", Value: attribute.StringValue(p.cfg.name)}, attribute.KeyValue{Key: "mirror_host", Value: attribute.StringValue(job.host)}, @@ -697,37 +768,14 @@ func (p *HTTP) execMirrorJob(job *mirrorJob) { ) if err == nil { - job.log.Debug("Mirrored the request", - zap.String("mirror_host", job.host), - zap.Int("request_size", len(job.req.Body())), - zap.Int("response_size", len(job.res.Body())), - ) + job.log.Debug("Mirrored the request", loggedFields...) metrics.MirrorSuccessCount.Add(context.TODO(), 1, metricAttributes) } else { - // log err - loggedFields := []zap.Field{ - zap.String("mirror_host", job.host), - zap.Int("request_size", len(job.req.Body())), - zap.NamedError("error_mirror", err), - } - - if p.cfg.proxy.LogRequests && len(job.req.Body()) <= p.cfg.proxy.LogRequestsMaxSize && p.shouldLogMethod(job.jrpcMethodForMetrics) { - var jsonRequest any - if err := json.Unmarshal(job.req.Body(), &jsonRequest); err == nil { - jrpc.Sanitize(jsonRequest) - loggedFields = append(loggedFields, zap.Any("json_request", jsonRequest)) - } else { - loggedFields = append(loggedFields, - zap.NamedError("error_unmarshal", err), - zap.String("http_request", utils.Str(job.req.Body())), - ) - } - } - job.log.Error("Failed to mirror the request", loggedFields...) - _ = job.log.Sync() metrics.MirrorFailureCount.Add(context.TODO(), 1, metricAttributes) } + + _ = job.log.Sync() } } From 0d788c8ff6099776988e1446b30f978f16e31083 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 7 May 2026 15:51:56 +0200 Subject: [PATCH 7/8] Wrap websocket write errors with message context --- proxy/websocket_pump.go | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/proxy/websocket_pump.go b/proxy/websocket_pump.go index e014ac8..0eb0a42 100644 --- a/proxy/websocket_pump.go +++ b/proxy/websocket_pump.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "encoding/json" "errors" + "fmt" "math/rand/v2" "strings" "sync/atomic" @@ -243,22 +244,12 @@ func (p *websocketPump) pumpMessages( } if err := to.SetWriteDeadline(utils.Deadline(timeout)); err != nil { - l.Error("Failed to set write deadline", - zap.Int("message_type", m.msgType), - zap.Int("message_size", len(m.bytes)), - zap.Error(err), - ) - notifyOnFailure(err) + notifyOnFailure(fmt.Errorf("set write deadline (msg_type=%d, msg_size=%d): %w", m.msgType, len(m.bytes), err)) continue } if err := to.WriteMessage(m.msgType, m.bytes); err != nil { - l.Error("Failed to write websocket message", - zap.Int("message_type", m.msgType), - zap.Int("message_size", len(m.bytes)), - zap.Error(err), - ) - notifyOnFailure(err) + notifyOnFailure(fmt.Errorf("write message (msg_type=%d, msg_size=%d): %w", m.msgType, len(m.bytes), err)) continue } From e04657638544e174da4c01d810d8e707e0e3e852 Mon Sep 17 00:00:00 2001 From: Kim Romero Date: Thu, 7 May 2026 16:03:52 +0200 Subject: [PATCH 8/8] Revert chaos restructuring --- proxy/websocket_pump.go | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/proxy/websocket_pump.go b/proxy/websocket_pump.go index 0eb0a42..5293eb9 100644 --- a/proxy/websocket_pump.go +++ b/proxy/websocket_pump.go @@ -201,6 +201,8 @@ func (p *websocketPump) pumpMessages( return case m := <-messages: + loggedFields := make([]zap.Field, 0, 6) + if p.cfg.proxy.Chaos.Enabled { // inject chaos dropMessage := rand.Float64() < p.cfg.proxy.Chaos.DroppedMessageProbability/100 injectInvalidFlashblockPayload := rand.Float64() < p.cfg.proxy.Chaos.InjectedInvalidFlashblockPayloadProbability/100 @@ -215,9 +217,9 @@ func (p *websocketPump) pumpMessages( if injectInvalidFlashblockPayload { // inject invalid flashblock payload if err := m.chaosMangle(); err == nil { - l.Info("Injected invalid flashblock payload", p.prepareLogFields(m, + loggedFields = append(loggedFields, zap.Bool("chaos_injected_invalid_flashblock_payload", true), - )...) + ) } else { l.Warn("Failed to generate invalid flashblock payload", zap.Error(err), @@ -226,20 +228,21 @@ func (p *websocketPump) pumpMessages( } if injectMalformedJsonMessage { // inject malformed json - m.bytes = m.bytes[1 : len(m.bytes)-1] - l.Info("Injected malformed JSON message", p.prepareLogFields(m, + loggedFields = append(loggedFields, zap.Bool("chaos_injected_malformed_json_message", true), - )...) + ) + m.bytes = m.bytes[1 : len(m.bytes)-1] } - if p.cfg.proxy.Chaos.MinInjectedLatency > 0 || p.cfg.proxy.Chaos.MaxInjectedLatency > 0 { // chaos-inject latency - latency := time.Duration(rand.Int64N(int64(p.cfg.proxy.Chaos.MaxInjectedLatency) + 1)) - latency = max(latency, p.cfg.proxy.Chaos.MinInjectedLatency) - time.Sleep(latency - time.Since(m.ts)) - l.Info("Injected latency", p.prepareLogFields(m, - zap.Bool("chaos_injected_latency", true), - zap.Duration("chaos_latency", latency), - )...) + { // chaos-inject latency + if p.cfg.proxy.Chaos.MinInjectedLatency > 0 || p.cfg.proxy.Chaos.MaxInjectedLatency > 0 { + loggedFields = append(loggedFields, + zap.Bool("chaos_injected_latency", true), + ) + latency := time.Duration(rand.Int64N(int64(p.cfg.proxy.Chaos.MaxInjectedLatency) + 1)) + latency = max(latency, p.cfg.proxy.Chaos.MinInjectedLatency) + time.Sleep(latency - time.Since(m.ts)) + } } } @@ -259,7 +262,7 @@ func (p *websocketPump) pumpMessages( attribute.KeyValue{Key: "direction", Value: attribute.StringValue(direction)}, )) - l.Debug("Proxied message", p.prepareLogFields(m)...) + l.Debug("Proxied message", p.prepareLogFields(m, loggedFields...)...) } } }