From de3c8f39e06771e553ba16fbf316d8c598bc63ac Mon Sep 17 00:00:00 2001 From: Shariq Naiyer Date: Tue, 5 May 2026 07:44:43 -0600 Subject: [PATCH 1/4] feat: add validator sync-lag duty gate --- src/lean_spec/subspecs/sync/peer_manager.py | 17 ++ src/lean_spec/subspecs/validator/constants.py | 22 ++ src/lean_spec/subspecs/validator/service.py | 106 +++++++- .../subspecs/sync/test_peer_manager.py | 51 ++++ .../subspecs/validator/test_service.py | 244 ++++++++++++++++++ 5 files changed, 429 insertions(+), 11 deletions(-) create mode 100644 src/lean_spec/subspecs/validator/constants.py diff --git a/src/lean_spec/subspecs/sync/peer_manager.py b/src/lean_spec/subspecs/sync/peer_manager.py index da679e4bd..97203141f 100644 --- a/src/lean_spec/subspecs/sync/peer_manager.py +++ b/src/lean_spec/subspecs/sync/peer_manager.py @@ -167,6 +167,23 @@ def get_network_finalized_slot(self) -> Slot | None: return None return counter.most_common(1)[0][0] + def get_network_head_slot(self) -> Slot | None: + """ + Highest head slot reported by any connected peer with status. + + Used to distinguish a node that is itself behind from a network where + every peer is also behind (e.g. a streak of skipped proposals). + Returns None when no connected peer has reported a status yet. + """ + slots = [ + peer.status.head.slot + for peer in self._peers.values() + if peer.status is not None and peer.is_connected() + ] + if not slots: + return None + return max(slots) + def on_request_success(self, peer_id: PeerId) -> None: """Record a successful request to a peer.""" peer = self._peers.get(peer_id) diff --git a/src/lean_spec/subspecs/validator/constants.py b/src/lean_spec/subspecs/validator/constants.py new file mode 100644 index 000000000..8ef0b8675 --- /dev/null +++ b/src/lean_spec/subspecs/validator/constants.py @@ -0,0 +1,22 @@ +""" +Validator service constants. + +Operational thresholds governing validator duty execution. 
+""" + +from __future__ import annotations + +from typing import Final + +SYNC_LAG_THRESHOLD: Final[int] = 4 +""" +Maximum tolerated lag, in slots, between wall-clock and the local head before +validator duties are skipped. + +A node whose local head trails wall clock by more than this attests against a +stale subtree, depositing fork-choice weight on the wrong branch. + +The gate also checks peer-reported head slots: if no peer claims a recent head, +the network as a whole is lagging (e.g. a streak of skipped proposals) and the +gate stays open so the chain can keep progressing. +""" diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py index ab60e12de..5c07bab86 100644 --- a/src/lean_spec/subspecs/validator/service.py +++ b/src/lean_spec/subspecs/validator/service.py @@ -54,6 +54,7 @@ from lean_spec.subspecs.xmss.containers import Signature from lean_spec.types import Bytes32, Slot, Uint64, ValidatorIndex +from .constants import SYNC_LAG_THRESHOLD from .registry import ValidatorEntry, ValidatorRegistry logger = logging.getLogger(__name__) @@ -111,6 +112,9 @@ class ValidatorService: _attested_slots: set[Slot] = field(default_factory=set, repr=False) """Slots for which we've already produced attestations (prevents duplicates).""" + _duties_skipped_lag: int = field(default=0, repr=False) + """Counter for duties skipped because the local head trails wall clock.""" + async def run(self) -> None: """ Main loop - check duties every interval. @@ -168,7 +172,10 @@ async def run(self) -> None: # # Check if any of our validators is the proposer. logger.debug("ValidatorService: checking block production for slot %d", slot) - await self._maybe_produce_block(slot) + if self._is_synced_for_duties(slot): + await self._maybe_produce_block(slot) + else: + self._record_lag_skip(slot, "block") logger.debug("ValidatorService: done block production check for slot %d", slot) # Re-fetch interval after block production. 
@@ -197,16 +204,21 @@ async def run(self) -> None: slot, interval, ) - await self._produce_attestations(slot) - logger.debug("ValidatorService: done producing attestations for slot %d", slot) - self._attested_slots.add(slot) - - # Prune old entries to prevent unbounded growth. - # - # Keep only recent slots (current slot - 4) to bound memory usage. - # We never need to attest for slots that far in the past. - prune_threshold = Slot(max(0, int(slot) - 4)) - self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold} + if self._is_synced_for_duties(slot): + await self._produce_attestations(slot) + logger.debug("ValidatorService: done producing attestations for slot %d", slot) + self._attested_slots.add(slot) + + # Prune old entries to prevent unbounded growth. + # + # Keep only recent slots (current slot - 4) to bound memory usage. + # We never need to attest for slots that far in the past. + prune_threshold = Slot(max(0, int(slot) - 4)) + self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold} + else: + # Do NOT mark the slot attested: if the node catches up before + # the slot ends, the next iteration should retry the duty. + self._record_lag_skip(slot, "attestation") # Intervals 2-4 have no additional validator duties. @@ -498,6 +510,73 @@ def _sign_with_key( self.registry.add(updated_entry) return updated_entry, signature + def _is_synced_for_duties(self, slot: Slot) -> bool: + """ + Decide whether validator duties should run for the given slot. + + Combines two signals to avoid two failure modes: + + 1. Local lag: if our head is close to wall clock, attest and propose. + 2. Peer max head: if even the most up-to-date peer is also far behind, + the network is stalling (a streak of skipped proposals) and gating + our duties would only deepen the stall. + + Args: + slot: Wall-clock slot for which a duty would be performed. 
+ + Returns: + True when duties should run; False when the local node is + materially behind a network that has otherwise made progress. + """ + store = self.sync_service.store + head_block = store.blocks.get(store.head) + # No head yet: nothing to compare against; duty methods will no-op. + if head_block is None: + return True + head_slot = head_block.slot + # Clock skew or chain ahead of wall clock: trust the chain. + if int(slot) <= int(head_slot): + return True + lag = int(slot) - int(head_slot) + if lag <= SYNC_LAG_THRESHOLD: + return True + # Local node is behind. Check whether the network is also behind. + peer_max = self.sync_service.peer_manager.get_network_head_slot() + # No peer evidence at all: isolated node, fall through and try. + if peer_max is None: + return True + # Network-wide skip: the highest peer head is also lagged. Keep duties + # alive so the chain can keep progressing through the skipped slots. + if int(slot) - int(peer_max) > SYNC_LAG_THRESHOLD: + return True + return False + + def _record_lag_skip(self, slot: Slot, duty: str) -> None: + """ + Emit a structured log and increment the lag-skip counter. + + Args: + slot: Slot whose duty was skipped. + duty: One of "block" or "attestation". Identifies the duty type + in the log so operators can attribute missed signatures. + """ + store = self.sync_service.store + head_block = store.blocks.get(store.head) + head_slot = head_block.slot if head_block is not None else Slot(0) + lag = max(0, int(slot) - int(head_slot)) + peer_max = self.sync_service.peer_manager.get_network_head_slot() + peer_max_repr = int(peer_max) if peer_max is not None else "none" + logger.info( + "Validator duty skipped due to sync lag: " + "duty=%s slot=%d head_slot=%d lag=%d peer_max_head_slot=%s", + duty, + int(slot), + int(head_slot), + lag, + peer_max_repr, + ) + self._duties_skipped_lag += 1 + def stop(self) -> None: """ Stop the service. 
@@ -521,3 +600,8 @@ def blocks_produced(self) -> int: def attestations_produced(self) -> int: """Total attestations produced since creation.""" return self._attestations_produced + + @property + def duties_skipped_lag(self) -> int: + """Total duties (block + attestation) skipped because of sync lag.""" + return self._duties_skipped_lag diff --git a/tests/lean_spec/subspecs/sync/test_peer_manager.py b/tests/lean_spec/subspecs/sync/test_peer_manager.py index d8f14576a..aee53f757 100644 --- a/tests/lean_spec/subspecs/sync/test_peer_manager.py +++ b/tests/lean_spec/subspecs/sync/test_peer_manager.py @@ -279,6 +279,57 @@ def test_get_network_finalized_slot_ignores_disconnected( finalized = manager.get_network_finalized_slot() assert finalized == Slot(100) + def test_get_network_head_slot_returns_max( + self, peer_id: PeerId, peer_id_2: PeerId, peer_id_3: PeerId + ) -> None: + """get_network_head_slot returns the maximum head slot across peers.""" + manager = PeerManager() + + for pid, head_slot in [(peer_id, 100), (peer_id_2, 250), (peer_id_3, 175)]: + info = PeerInfo(peer_id=pid, state=ConnectionState.CONNECTED) + sync_peer = manager.add_peer(info) + sync_peer.status = Status( + finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(50)), + head=Checkpoint(root=Bytes32.zero(), slot=Slot(head_slot)), + ) + + assert manager.get_network_head_slot() == Slot(250) + + def test_get_network_head_slot_none_without_status(self, connected_peer_info: PeerInfo) -> None: + """get_network_head_slot returns None when no peer has reported status.""" + manager = PeerManager() + manager.add_peer(connected_peer_info) + assert manager.get_network_head_slot() is None + + def test_get_network_head_slot_none_with_no_peers(self) -> None: + """get_network_head_slot returns None when there are no peers at all.""" + manager = PeerManager() + assert manager.get_network_head_slot() is None + + def test_get_network_head_slot_ignores_disconnected( + self, peer_id: PeerId, peer_id_2: PeerId + ) -> None: 
+ """Disconnected peers are excluded even if they have a recent reported head.""" + manager = PeerManager() + + info1 = PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED) + info2 = PeerInfo(peer_id=peer_id_2, state=ConnectionState.DISCONNECTED) + + sync_peer1 = manager.add_peer(info1) + sync_peer2 = manager.add_peer(info2) + + sync_peer1.status = Status( + finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(50)), + head=Checkpoint(root=Bytes32.zero(), slot=Slot(100)), + ) + # Disconnected peer reports a more recent head; must be ignored. + sync_peer2.status = Status( + finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(200)), + head=Checkpoint(root=Bytes32.zero(), slot=Slot(500)), + ) + + assert manager.get_network_head_slot() == Slot(100) + class TestPeerManagerRequestCallbacks: """Tests for PeerManager request callbacks.""" diff --git a/tests/lean_spec/subspecs/validator/test_service.py b/tests/lean_spec/subspecs/validator/test_service.py index 0d0a4ebfc..1607edc09 100644 --- a/tests/lean_spec/subspecs/validator/test_service.py +++ b/tests/lean_spec/subspecs/validator/test_service.py @@ -22,6 +22,7 @@ from lean_spec.subspecs.sync.peer_manager import PeerManager from lean_spec.subspecs.sync.service import SyncService from lean_spec.subspecs.validator import ValidatorRegistry, ValidatorService +from lean_spec.subspecs.validator.constants import SYNC_LAG_THRESHOLD from lean_spec.subspecs.validator.registry import ValidatorEntry from lean_spec.subspecs.xmss import TARGET_SIGNATURE_SCHEME from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof @@ -1315,3 +1316,246 @@ async def capture_attestation(attestation: SignedAttestation) -> None: sig=signed_att.signature, ) assert not is_invalid, "Signature should not verify with the wrong slot" + + +def _set_head_slot(sync_service: SyncService, slot: Slot) -> None: + """Replace the head block in-place so its slot reads as `slot`. + + The genesis store has a single block at the head root. 
Mutating its slot + lets the duty gate observe whatever lag the test wants to exercise without + constructing a full chain. + """ + head_root = sync_service.store.head + new_head = sync_service.store.blocks[head_root].model_copy(update={"slot": slot}) + sync_service.store = sync_service.store.model_copy(update={"blocks": {head_root: new_head}}) + + +class TestSyncLagGate: + """ + Tests for the sync-lag duty gate. + + The gate combines local lag with peer-reported head slots so that: + + - A node that is itself behind a network making progress is silenced + - A node behind because the network is also behind keeps signing duties + """ + + def test_within_threshold_allows_duties(self, sync_service: SyncService) -> None: + """Lag of 0..SYNC_LAG_THRESHOLD slots is allowed.""" + _set_head_slot(sync_service, Slot(10)) + service = ValidatorService( + sync_service=sync_service, + clock=SlotClock(genesis_time=Uint64(0)), + registry=ValidatorRegistry(), + ) + + for lag in range(SYNC_LAG_THRESHOLD + 1): + assert service._is_synced_for_duties(Slot(10 + lag)) + + def test_just_over_threshold_with_recent_peer_gates(self, sync_service: SyncService) -> None: + """Lag > THRESHOLD is gated when at least one peer has a fresh head.""" + _set_head_slot(sync_service, Slot(10)) + service = ValidatorService( + sync_service=sync_service, + clock=SlotClock(genesis_time=Uint64(0)), + registry=ValidatorRegistry(), + ) + + with patch.object( + PeerManager, + "get_network_head_slot", + return_value=Slot(15), + ): + assert not service._is_synced_for_duties(Slot(15)) + + def test_clock_skew_does_not_gate(self, sync_service: SyncService) -> None: + """If wall clock is behind head slot, duties are allowed (trust the chain).""" + _set_head_slot(sync_service, Slot(20)) + service = ValidatorService( + sync_service=sync_service, + clock=SlotClock(genesis_time=Uint64(0)), + registry=ValidatorRegistry(), + ) + assert service._is_synced_for_duties(Slot(15)) + + def test_no_peer_status_does_not_gate(self, 
sync_service: SyncService) -> None: + """An isolated node with no peer status keeps duties live.""" + _set_head_slot(sync_service, Slot(0)) + service = ValidatorService( + sync_service=sync_service, + clock=SlotClock(genesis_time=Uint64(0)), + registry=ValidatorRegistry(), + ) + # peer_max defaults to None on a fresh PeerManager; lag is 100. + assert service._is_synced_for_duties(Slot(100)) + + def test_network_wide_stall_does_not_gate(self, sync_service: SyncService) -> None: + """When even the most up-to-date peer is far behind, duties stay live. + + Regression for the consecutive skipped-slots edge case: the simple + local-lag check would silence every validator at the same moment and + make recovery impossible. + """ + _set_head_slot(sync_service, Slot(0)) + service = ValidatorService( + sync_service=sync_service, + clock=SlotClock(genesis_time=Uint64(0)), + registry=ValidatorRegistry(), + ) + with patch.object( + PeerManager, + "get_network_head_slot", + return_value=Slot(0), + ): + assert service._is_synced_for_duties(Slot(50)) + + def test_boundary_lag_equal_threshold_allowed(self, sync_service: SyncService) -> None: + """Boundary: lag == SYNC_LAG_THRESHOLD is still allowed.""" + _set_head_slot(sync_service, Slot(10)) + service = ValidatorService( + sync_service=sync_service, + clock=SlotClock(genesis_time=Uint64(0)), + registry=ValidatorRegistry(), + ) + wall_clock = Slot(10 + SYNC_LAG_THRESHOLD) + with patch.object( + PeerManager, + "get_network_head_slot", + return_value=wall_clock, + ): + assert service._is_synced_for_duties(wall_clock) + + def test_boundary_lag_one_over_threshold_gated(self, sync_service: SyncService) -> None: + """Boundary: lag == SYNC_LAG_THRESHOLD + 1 with recent peer is gated.""" + _set_head_slot(sync_service, Slot(10)) + service = ValidatorService( + sync_service=sync_service, + clock=SlotClock(genesis_time=Uint64(0)), + registry=ValidatorRegistry(), + ) + wall_clock = Slot(10 + SYNC_LAG_THRESHOLD + 1) + with patch.object( + 
PeerManager, + "get_network_head_slot", + return_value=wall_clock, + ): + assert not service._is_synced_for_duties(wall_clock) + + async def test_run_loop_skips_block_production_when_gated( + self, sync_service: SyncService, key_manager: XmssKeyManager + ) -> None: + """Interval 0 in a gated slot does not invoke _maybe_produce_block.""" + _set_head_slot(sync_service, Slot(0)) + clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: _interval_time(10, 0)) + service = ValidatorService( + sync_service=sync_service, + clock=clock, + registry=_make_registry(key_manager, 0), + ) + + block_calls: list[Slot] = [] + + async def mock_block(_self, slot: Slot) -> None: + block_calls.append(slot) + + async def stop_on_sleep(_d: float) -> None: + service.stop() + + with ( + patch.object( + PeerManager, + "get_network_head_slot", + return_value=Slot(10), + ), + patch.object(ValidatorService, "_maybe_produce_block", mock_block), + patch("asyncio.sleep", new=stop_on_sleep), + ): + await service.run() + + assert block_calls == [] + assert service.duties_skipped_lag >= 1 + + async def test_run_loop_skips_attestation_when_gated( + self, sync_service: SyncService, key_manager: XmssKeyManager + ) -> None: + """Gated attestation: duty is skipped AND the slot stays unmarked. + + The slot must stay out of `_attested_slots` so a node that catches up + before the slot ends can still attest in this same slot. 
+ """ + _set_head_slot(sync_service, Slot(0)) + clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: _interval_time(10, 1)) + service = ValidatorService( + sync_service=sync_service, + clock=clock, + registry=_make_registry(key_manager, 0), + ) + + attest_calls: list[Slot] = [] + + async def mock_attest(_self, slot: Slot) -> None: + attest_calls.append(slot) + + async def stop_on_sleep(_d: float) -> None: + service.stop() + + with ( + patch.object( + PeerManager, + "get_network_head_slot", + return_value=Slot(10), + ), + patch.object(ValidatorService, "_produce_attestations", mock_attest), + patch("asyncio.sleep", new=stop_on_sleep), + ): + await service.run() + + assert attest_calls == [] + assert Slot(10) not in service._attested_slots + assert service.duties_skipped_lag >= 1 + + def test_record_lag_skip_logs_structured_fields_and_increments_counter( + self, sync_service: SyncService, caplog: pytest.LogCaptureFixture + ) -> None: + """The skip log carries duty type, slot, head_slot, lag, and peer_max.""" + _set_head_slot(sync_service, Slot(3)) + service = ValidatorService( + sync_service=sync_service, + clock=SlotClock(genesis_time=Uint64(0)), + registry=ValidatorRegistry(), + ) + + with ( + caplog.at_level("INFO"), + patch.object( + PeerManager, + "get_network_head_slot", + return_value=Slot(20), + ), + ): + service._record_lag_skip(Slot(20), "block") + + assert "duty=block" in caplog.text + assert "slot=20" in caplog.text + assert "head_slot=3" in caplog.text + assert "lag=17" in caplog.text + assert "peer_max_head_slot=20" in caplog.text + assert service.duties_skipped_lag == 1 + + def test_record_lag_skip_with_no_peer_status_renders_none( + self, sync_service: SyncService, caplog: pytest.LogCaptureFixture + ) -> None: + """When no peer has reported status, the log renders peer_max as 'none'.""" + _set_head_slot(sync_service, Slot(3)) + service = ValidatorService( + sync_service=sync_service, + clock=SlotClock(genesis_time=Uint64(0)), + 
registry=ValidatorRegistry(), + ) + + with caplog.at_level("INFO"): + service._record_lag_skip(Slot(20), "attestation") + + assert "duty=attestation" in caplog.text + assert "peer_max_head_slot=none" in caplog.text + assert service.duties_skipped_lag == 1 From a2a26adc181419bb163a13b67653630d768f9b19 Mon Sep 17 00:00:00 2001 From: Shariq Naiyer Date: Sun, 10 May 2026 19:52:43 -0600 Subject: [PATCH 2/4] chore: inline logging --- src/lean_spec/subspecs/validator/service.py | 42 ++++++---------- .../subspecs/validator/test_service.py | 48 ++++++------------- 2 files changed, 29 insertions(+), 61 deletions(-) diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py index 5c07bab86..d63cf09a6 100644 --- a/src/lean_spec/subspecs/validator/service.py +++ b/src/lean_spec/subspecs/validator/service.py @@ -172,10 +172,8 @@ async def run(self) -> None: # # Check if any of our validators is the proposer. logger.debug("ValidatorService: checking block production for slot %d", slot) - if self._is_synced_for_duties(slot): + if self._is_synced_for_duties(slot, "block"): await self._maybe_produce_block(slot) - else: - self._record_lag_skip(slot, "block") logger.debug("ValidatorService: done block production check for slot %d", slot) # Re-fetch interval after block production. @@ -204,7 +202,10 @@ async def run(self) -> None: slot, interval, ) - if self._is_synced_for_duties(slot): + # A gated slot is intentionally NOT added to _attested_slots: + # if the node catches up before the slot ends, the next loop + # iteration retries the duty. + if self._is_synced_for_duties(slot, "attestation"): await self._produce_attestations(slot) logger.debug("ValidatorService: done producing attestations for slot %d", slot) self._attested_slots.add(slot) @@ -215,10 +216,6 @@ async def run(self) -> None: # We never need to attest for slots that far in the past. 
prune_threshold = Slot(max(0, int(slot) - 4)) self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold} - else: - # Do NOT mark the slot attested: if the node catches up before - # the slot ends, the next iteration should retry the duty. - self._record_lag_skip(slot, "attestation") # Intervals 2-4 have no additional validator duties. @@ -510,7 +507,7 @@ def _sign_with_key( self.registry.add(updated_entry) return updated_entry, signature - def _is_synced_for_duties(self, slot: Slot) -> bool: + def _is_synced_for_duties(self, slot: Slot, duty: str) -> bool: """ Decide whether validator duties should run for the given slot. @@ -521,8 +518,13 @@ def _is_synced_for_duties(self, slot: Slot) -> bool: the network is stalling (a streak of skipped proposals) and gating our duties would only deepen the stall. + Logs a structured skip line and increments the lag counter when the + gate fires, so callers stay a single boolean check. + Args: slot: Wall-clock slot for which a duty would be performed. + duty: One of "block" or "attestation". Recorded on the skip log + so operators can attribute missed signatures. Returns: True when duties should run; False when the local node is @@ -549,33 +551,17 @@ def _is_synced_for_duties(self, slot: Slot) -> bool: # alive so the chain can keep progressing through the skipped slots. if int(slot) - int(peer_max) > SYNC_LAG_THRESHOLD: return True - return False - - def _record_lag_skip(self, slot: Slot, duty: str) -> None: - """ - Emit a structured log and increment the lag-skip counter. - - Args: - slot: Slot whose duty was skipped. - duty: One of "block" or "attestation". Identifies the duty type - in the log so operators can attribute missed signatures. 
- """ - store = self.sync_service.store - head_block = store.blocks.get(store.head) - head_slot = head_block.slot if head_block is not None else Slot(0) - lag = max(0, int(slot) - int(head_slot)) - peer_max = self.sync_service.peer_manager.get_network_head_slot() - peer_max_repr = int(peer_max) if peer_max is not None else "none" logger.info( "Validator duty skipped due to sync lag: " - "duty=%s slot=%d head_slot=%d lag=%d peer_max_head_slot=%s", + "duty=%s slot=%d head_slot=%d lag=%d peer_max_head_slot=%d", duty, int(slot), int(head_slot), lag, - peer_max_repr, + int(peer_max), ) self._duties_skipped_lag += 1 + return False def stop(self) -> None: """ diff --git a/tests/lean_spec/subspecs/validator/test_service.py b/tests/lean_spec/subspecs/validator/test_service.py index 1607edc09..be7fe3fa0 100644 --- a/tests/lean_spec/subspecs/validator/test_service.py +++ b/tests/lean_spec/subspecs/validator/test_service.py @@ -1350,7 +1350,7 @@ def test_within_threshold_allows_duties(self, sync_service: SyncService) -> None ) for lag in range(SYNC_LAG_THRESHOLD + 1): - assert service._is_synced_for_duties(Slot(10 + lag)) + assert service._is_synced_for_duties(Slot(10 + lag), "block") def test_just_over_threshold_with_recent_peer_gates(self, sync_service: SyncService) -> None: """Lag > THRESHOLD is gated when at least one peer has a fresh head.""" @@ -1366,7 +1366,7 @@ def test_just_over_threshold_with_recent_peer_gates(self, sync_service: SyncServ "get_network_head_slot", return_value=Slot(15), ): - assert not service._is_synced_for_duties(Slot(15)) + assert not service._is_synced_for_duties(Slot(15), "block") def test_clock_skew_does_not_gate(self, sync_service: SyncService) -> None: """If wall clock is behind head slot, duties are allowed (trust the chain).""" @@ -1376,7 +1376,7 @@ def test_clock_skew_does_not_gate(self, sync_service: SyncService) -> None: clock=SlotClock(genesis_time=Uint64(0)), registry=ValidatorRegistry(), ) - assert 
service._is_synced_for_duties(Slot(15)) + assert service._is_synced_for_duties(Slot(15), "block") def test_no_peer_status_does_not_gate(self, sync_service: SyncService) -> None: """An isolated node with no peer status keeps duties live.""" @@ -1387,7 +1387,7 @@ def test_no_peer_status_does_not_gate(self, sync_service: SyncService) -> None: registry=ValidatorRegistry(), ) # peer_max defaults to None on a fresh PeerManager; lag is 100. - assert service._is_synced_for_duties(Slot(100)) + assert service._is_synced_for_duties(Slot(100), "block") def test_network_wide_stall_does_not_gate(self, sync_service: SyncService) -> None: """When even the most up-to-date peer is far behind, duties stay live. @@ -1407,7 +1407,7 @@ def test_network_wide_stall_does_not_gate(self, sync_service: SyncService) -> No "get_network_head_slot", return_value=Slot(0), ): - assert service._is_synced_for_duties(Slot(50)) + assert service._is_synced_for_duties(Slot(50), "block") def test_boundary_lag_equal_threshold_allowed(self, sync_service: SyncService) -> None: """Boundary: lag == SYNC_LAG_THRESHOLD is still allowed.""" @@ -1423,7 +1423,7 @@ def test_boundary_lag_equal_threshold_allowed(self, sync_service: SyncService) - "get_network_head_slot", return_value=wall_clock, ): - assert service._is_synced_for_duties(wall_clock) + assert service._is_synced_for_duties(wall_clock, "block") def test_boundary_lag_one_over_threshold_gated(self, sync_service: SyncService) -> None: """Boundary: lag == SYNC_LAG_THRESHOLD + 1 with recent peer is gated.""" @@ -1439,7 +1439,7 @@ def test_boundary_lag_one_over_threshold_gated(self, sync_service: SyncService) "get_network_head_slot", return_value=wall_clock, ): - assert not service._is_synced_for_duties(wall_clock) + assert not service._is_synced_for_duties(wall_clock, "block") async def test_run_loop_skips_block_production_when_gated( self, sync_service: SyncService, key_manager: XmssKeyManager @@ -1514,10 +1514,10 @@ async def stop_on_sleep(_d: float) -> 
None: assert Slot(10) not in service._attested_slots assert service.duties_skipped_lag >= 1 - def test_record_lag_skip_logs_structured_fields_and_increments_counter( + def test_gated_call_logs_structured_fields_and_increments_counter( self, sync_service: SyncService, caplog: pytest.LogCaptureFixture ) -> None: - """The skip log carries duty type, slot, head_slot, lag, and peer_max.""" + """When the gate fires, it logs duty/slot/head/lag/peer_max and bumps the counter.""" _set_head_slot(sync_service, Slot(3)) service = ValidatorService( sync_service=sync_service, @@ -1533,29 +1533,11 @@ def test_record_lag_skip_logs_structured_fields_and_increments_counter( return_value=Slot(20), ), ): - service._record_lag_skip(Slot(20), "block") + gated = service._is_synced_for_duties(Slot(20), "block") - assert "duty=block" in caplog.text - assert "slot=20" in caplog.text - assert "head_slot=3" in caplog.text - assert "lag=17" in caplog.text - assert "peer_max_head_slot=20" in caplog.text - assert service.duties_skipped_lag == 1 - - def test_record_lag_skip_with_no_peer_status_renders_none( - self, sync_service: SyncService, caplog: pytest.LogCaptureFixture - ) -> None: - """When no peer has reported status, the log renders peer_max as 'none'.""" - _set_head_slot(sync_service, Slot(3)) - service = ValidatorService( - sync_service=sync_service, - clock=SlotClock(genesis_time=Uint64(0)), - registry=ValidatorRegistry(), - ) - - with caplog.at_level("INFO"): - service._record_lag_skip(Slot(20), "attestation") - - assert "duty=attestation" in caplog.text - assert "peer_max_head_slot=none" in caplog.text + assert gated is False + assert [r.getMessage() for r in caplog.records] == [ + "Validator duty skipped due to sync lag: " + "duty=block slot=20 head_slot=3 lag=17 peer_max_head_slot=20" + ] assert service.duties_skipped_lag == 1 From a7461b7e1340c9e656ca5e3192144800c88f062a Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Mon, 11 May 
2026 15:19:52 +0200 Subject: [PATCH 3/4] refactor(validator): harden sync-lag gate per consensus + py review Address review findings on the sync-lag duty gate. Decision logic - Replace peer-reported head-slot signal with local-store evidence: the freshest slot across blocks already validated into the store. Drops the unauthenticated peer.status.head.slot path entirely. - Add NETWORK_STALL_THRESHOLD (= 2 * SYNC_LAG_THRESHOLD) distinct from the local threshold so jitter at the local boundary cannot also trip the network-wide branch. - Add HYSTERESIS_BAND so a closed gate reopens only when lag drops to threshold - band, preventing slot-over-slot flap. - Persist gate state on the service; log only on state transitions instead of every query. - Saturate the future-head case at zero lag rather than trusting the chain unconditionally. API and types - duty parameter typed as Literal["block", "attestation"]. - Split _duties_skipped_lag into _blocks_skipped_lag and _attestations_skipped_lag, owned by the run loop. Attribution flattened so wrong-interval slots never tick the gate counter. - Drop redundant int(slot) casts where Slot arithmetic suffices. - Remove PeerManager.get_network_head_slot and its tests now that no caller remains. Tests - Replace patch.object(PeerManager, ...) mocks with real PeerManager and store manipulation, matching repo policy. - New helpers preserve the block-map key-equals-root invariant. - Add hysteresis flap test, split-counter test, transition-only log assertion. Use substring checks instead of exact log strings. Documentation - Constants, gate method, properties, fields, and inline comments rewritten per /doc rules: structured Why/Effect/Decision matrix labels, one idea per line, no function or variable names in prose, concrete numbers throughout. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/lean_spec/subspecs/sync/peer_manager.py | 17 - src/lean_spec/subspecs/validator/constants.py | 45 ++- src/lean_spec/subspecs/validator/service.py | 185 ++++++--- .../subspecs/sync/test_peer_manager.py | 51 --- .../subspecs/validator/test_service.py | 361 +++++++++++------- 5 files changed, 386 insertions(+), 273 deletions(-) diff --git a/src/lean_spec/subspecs/sync/peer_manager.py b/src/lean_spec/subspecs/sync/peer_manager.py index 97203141f..da679e4bd 100644 --- a/src/lean_spec/subspecs/sync/peer_manager.py +++ b/src/lean_spec/subspecs/sync/peer_manager.py @@ -167,23 +167,6 @@ def get_network_finalized_slot(self) -> Slot | None: return None return counter.most_common(1)[0][0] - def get_network_head_slot(self) -> Slot | None: - """ - Highest head slot reported by any connected peer with status. - - Used to distinguish a node that is itself behind from a network where - every peer is also behind (e.g. a streak of skipped proposals). - Returns None when no connected peer has reported a status yet. - """ - slots = [ - peer.status.head.slot - for peer in self._peers.values() - if peer.status is not None and peer.is_connected() - ] - if not slots: - return None - return max(slots) - def on_request_success(self, peer_id: PeerId) -> None: """Record a successful request to a peer.""" peer = self._peers.get(peer_id) diff --git a/src/lean_spec/subspecs/validator/constants.py b/src/lean_spec/subspecs/validator/constants.py index 8ef0b8675..5337be0ce 100644 --- a/src/lean_spec/subspecs/validator/constants.py +++ b/src/lean_spec/subspecs/validator/constants.py @@ -1,22 +1,43 @@ -""" -Validator service constants. +"""Validator duty-gate thresholds. -Operational thresholds governing validator duty execution. -""" +Informative, not normative: -from __future__ import annotations +- Shape when this node signs. +- Do not change what consensus accepts. +- Clients may diverge without breaking interop. 
+""" from typing import Final SYNC_LAG_THRESHOLD: Final[int] = 4 +"""Slot lag past which the local view is too stale to sign. + +Why: + Lean justifies and finalizes within a handful of slots. + A 4-slot lag is one full justification window behind real time. + A vote from that view lands on a subtree the network has left. """ -Maximum tolerated lag, in slots, between wall-clock and the local head before -validator duties are skipped. -A node whose local head trails wall clock by more than this attests against a -stale subtree, depositing fork-choice weight on the wrong branch. +NETWORK_STALL_THRESHOLD: Final[int] = 8 +"""Slot lag past which the whole network is treated as stalled. + +Why: + Set to twice the local threshold (8 = 2 * 4). + Ordinary jitter at the local boundary must not trip this branch. + +Effect: + Even the freshest locally validated block is more than 8 slots behind. + The cause is a streak of skipped proposals, not this node lagging. + Duties stay live so the chain can advance through the gap. +""" + +HYSTERESIS_BAND: Final[int] = 2 +"""Slot band that holds the gate closed near the threshold. + +Why: + Without a band a single late gossip block flips the decision. + Slot-over-slot flips would stutter the attestation stream. -The gate also checks peer-reported head slots: if no peer claims a recent head, -the network as a whole is lagging (e.g. a streak of skipped proposals) and the -gate stays open so the chain can keep progressing. +Effect: + Once closed, the gate reopens only when lag drops to 4 - 2 = 2 or below.
""" diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py index d63cf09a6..b129b87ce 100644 --- a/src/lean_spec/subspecs/validator/service.py +++ b/src/lean_spec/subspecs/validator/service.py @@ -54,7 +54,7 @@ from lean_spec.subspecs.xmss.containers import Signature from lean_spec.types import Bytes32, Slot, Uint64, ValidatorIndex -from .constants import SYNC_LAG_THRESHOLD +from .constants import HYSTERESIS_BAND, NETWORK_STALL_THRESHOLD, SYNC_LAG_THRESHOLD from .registry import ValidatorEntry, ValidatorRegistry logger = logging.getLogger(__name__) @@ -112,8 +112,14 @@ class ValidatorService: _attested_slots: set[Slot] = field(default_factory=set, repr=False) """Slots for which we've already produced attestations (prevents duplicates).""" - _duties_skipped_lag: int = field(default=0, repr=False) - """Counter for duties skipped because the local head trails wall clock.""" + _blocks_skipped_lag: int = field(default=0, repr=False) + """Block proposals skipped because the local view was too stale.""" + + _attestations_skipped_lag: int = field(default=0, repr=False) + """Attestations skipped because the local view was too stale.""" + + _duty_gate_closed: bool = field(default=False, repr=False) + """Hysteresis flag. True while signing is silenced.""" async def run(self) -> None: """ @@ -174,6 +180,8 @@ async def run(self) -> None: logger.debug("ValidatorService: checking block production for slot %d", slot) if self._is_synced_for_duties(slot, "block"): await self._maybe_produce_block(slot) + else: + self._blocks_skipped_lag += 1 logger.debug("ValidatorService: done block production check for slot %d", slot) # Re-fetch interval after block production. @@ -196,26 +204,41 @@ async def run(self) -> None: slot, slot in self._attested_slots, ) - if interval >= Uint64(1) and slot not in self._attested_slots: + # Decide whether this iteration owes an attestation. 
+ # + # Two conditions: + # + # - Interval has reached the attestation slot (>= 1). + # - This slot has not already been attested. + # + # Why split eligibility from the sync gate: the skip counter + # must only tick on real misses, never on wrong-interval + # iterations. + needs_attestation = interval >= Uint64(1) and slot not in self._attested_slots + if needs_attestation: logger.debug( "ValidatorService: producing attestations for slot %d (interval %d)", slot, interval, ) - # A gated slot is intentionally NOT added to _attested_slots: - # if the node catches up before the slot ends, the next loop + # Apply the sync gate. + # + # Invariant: a gated slot stays out of the attested set. + # If the node catches up before the slot ends, the next # iteration retries the duty. if self._is_synced_for_duties(slot, "attestation"): await self._produce_attestations(slot) logger.debug("ValidatorService: done producing attestations for slot %d", slot) self._attested_slots.add(slot) - # Prune old entries to prevent unbounded growth. + # Prune old entries to bound memory. # - # Keep only recent slots (current slot - 4) to bound memory usage. - # We never need to attest for slots that far in the past. + # Keep only slots at or after (current slot - 4). + # Older slots are no longer attestable. prune_threshold = Slot(max(0, int(slot) - 4)) self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold} + else: + self._attestations_skipped_lag += 1 # Intervals 2-4 have no additional validator duties. @@ -507,61 +530,103 @@ def _sign_with_key( self.registry.add(updated_entry) return updated_entry, signature - def _is_synced_for_duties(self, slot: Slot, duty: str) -> bool: - """ - Decide whether validator duties should run for the given slot. - - Combines two signals to avoid two failure modes: - - 1. Local lag: if our head is close to wall clock, attest and propose. - 2. 
Peer max head: if even the most up-to-date peer is also far behind, - the network is stalling (a streak of skipped proposals) and gating - our duties would only deepen the stall. + def _is_synced_for_duties( + self, + slot: Slot, + duty: Literal["block", "attestation"], + ) -> bool: + """Decide whether duties may run for the given slot. - Logs a structured skip line and increments the lag counter when the - gate fires, so callers stay a single boolean check. + Combines local lag and local-store stall evidence with + hysteresis. Returns False only when the local view is stale + relative to a network that is otherwise making progress. Args: - slot: Wall-clock slot for which a duty would be performed. - duty: One of "block" or "attestation". Recorded on the skip log - so operators can attribute missed signatures. + slot: Wall-clock slot for which a duty would run. + duty: Tag for the transition log. Returns: - True when duties should run; False when the local node is - materially behind a network that has otherwise made progress. + True when duties should run, False to silence them. """ store = self.sync_service.store head_block = store.blocks.get(store.head) - # No head yet: nothing to compare against; duty methods will no-op. + + # No head: nothing to compare against, let downstream code no-op. if head_block is None: return True + head_slot = head_block.slot - # Clock skew or chain ahead of wall clock: trust the chain. - if int(slot) <= int(head_slot): - return True - lag = int(slot) - int(head_slot) - if lag <= SYNC_LAG_THRESHOLD: - return True - # Local node is behind. Check whether the network is also behind. - peer_max = self.sync_service.peer_manager.get_network_head_slot() - # No peer evidence at all: isolated node, fall through and try. - if peer_max is None: - return True - # Network-wide skip: the highest peer head is also lagged. Keep duties - # alive so the chain can keep progressing through the skipped slots. 
- if int(slot) - int(peer_max) > SYNC_LAG_THRESHOLD: - return True - logger.info( - "Validator duty skipped due to sync lag: " - "duty=%s slot=%d head_slot=%d lag=%d peer_max_head_slot=%d", - duty, - int(slot), - int(head_slot), - lag, - int(peer_max), + + # Saturate at zero lag when the head is ahead of wall clock. + # + # Why: + # Local clock drift is normal. Unconditional trust would let + # a chain 100 slots in the future bypass every check. + lag = 0 if head_slot >= slot else int(slot - head_slot) + + # Local stall evidence from the block map. + # + # Why: + # Only blocks with valid signatures enter the map, so the + # freshest entry is an authenticated lower bound on the + # network tip. A stale max here means the network is not + # producing. + max_seen_slot = max( + (b.slot for b in store.blocks.values()), + default=head_slot, ) - self._duties_skipped_lag += 1 - return False + network_lag = 0 if max_seen_slot >= slot else int(slot - max_seen_slot) + network_stalling = network_lag > NETWORK_STALL_THRESHOLD + + # Decision matrix: + # + # - Network stalling: keep signing, reopen if currently closed. + # - Gate closed: reopen only when lag drops to 4 - 2 = 2. + # - Gate open: close as soon as lag crosses 4. + if network_stalling: + allow = True + if self._duty_gate_closed: + self._duty_gate_closed = False + logger.info( + "Validator duty gate reopened: network stall detected. " + "duty=%s slot=%d head_slot=%d lag=%d max_seen_slot=%d network_lag=%d", + duty, + int(slot), + int(head_slot), + lag, + int(max_seen_slot), + network_lag, + ) + elif self._duty_gate_closed: + # Hysteresis: reopen only well below the threshold. + allow = lag <= SYNC_LAG_THRESHOLD - HYSTERESIS_BAND + if allow: + self._duty_gate_closed = False + logger.info( + "Validator duty gate reopened: local view caught up. " + "duty=%s slot=%d head_slot=%d lag=%d", + duty, + int(slot), + int(head_slot), + lag, + ) + else: + # Open gate: close once the local threshold is crossed. 
+ allow = lag <= SYNC_LAG_THRESHOLD + if not allow: + self._duty_gate_closed = True + logger.info( + "Validator duty gate closed: local view is stale. " + "duty=%s slot=%d head_slot=%d lag=%d max_seen_slot=%d network_lag=%d", + duty, + int(slot), + int(head_slot), + lag, + int(max_seen_slot), + network_lag, + ) + + return allow def stop(self) -> None: """ @@ -588,6 +653,16 @@ def attestations_produced(self) -> int: return self._attestations_produced @property - def duties_skipped_lag(self) -> int: - """Total duties (block + attestation) skipped because of sync lag.""" - return self._duties_skipped_lag + def blocks_skipped_lag(self) -> int: + """Block proposals skipped because the local view was too stale.""" + return self._blocks_skipped_lag + + @property + def attestations_skipped_lag(self) -> int: + """Attestations skipped because the local view was too stale.""" + return self._attestations_skipped_lag + + @property + def duty_gate_closed(self) -> bool: + """True while the sync-lag gate is silencing duties.""" + return self._duty_gate_closed diff --git a/tests/lean_spec/subspecs/sync/test_peer_manager.py b/tests/lean_spec/subspecs/sync/test_peer_manager.py index aee53f757..d8f14576a 100644 --- a/tests/lean_spec/subspecs/sync/test_peer_manager.py +++ b/tests/lean_spec/subspecs/sync/test_peer_manager.py @@ -279,57 +279,6 @@ def test_get_network_finalized_slot_ignores_disconnected( finalized = manager.get_network_finalized_slot() assert finalized == Slot(100) - def test_get_network_head_slot_returns_max( - self, peer_id: PeerId, peer_id_2: PeerId, peer_id_3: PeerId - ) -> None: - """get_network_head_slot returns the maximum head slot across peers.""" - manager = PeerManager() - - for pid, head_slot in [(peer_id, 100), (peer_id_2, 250), (peer_id_3, 175)]: - info = PeerInfo(peer_id=pid, state=ConnectionState.CONNECTED) - sync_peer = manager.add_peer(info) - sync_peer.status = Status( - finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(50)), - 
head=Checkpoint(root=Bytes32.zero(), slot=Slot(head_slot)), - ) - - assert manager.get_network_head_slot() == Slot(250) - - def test_get_network_head_slot_none_without_status(self, connected_peer_info: PeerInfo) -> None: - """get_network_head_slot returns None when no peer has reported status.""" - manager = PeerManager() - manager.add_peer(connected_peer_info) - assert manager.get_network_head_slot() is None - - def test_get_network_head_slot_none_with_no_peers(self) -> None: - """get_network_head_slot returns None when there are no peers at all.""" - manager = PeerManager() - assert manager.get_network_head_slot() is None - - def test_get_network_head_slot_ignores_disconnected( - self, peer_id: PeerId, peer_id_2: PeerId - ) -> None: - """Disconnected peers are excluded even if they have a recent reported head.""" - manager = PeerManager() - - info1 = PeerInfo(peer_id=peer_id, state=ConnectionState.CONNECTED) - info2 = PeerInfo(peer_id=peer_id_2, state=ConnectionState.DISCONNECTED) - - sync_peer1 = manager.add_peer(info1) - sync_peer2 = manager.add_peer(info2) - - sync_peer1.status = Status( - finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(50)), - head=Checkpoint(root=Bytes32.zero(), slot=Slot(100)), - ) - # Disconnected peer reports a more recent head; must be ignored. 
- sync_peer2.status = Status( - finalized=Checkpoint(root=Bytes32.zero(), slot=Slot(200)), - head=Checkpoint(root=Bytes32.zero(), slot=Slot(500)), - ) - - assert manager.get_network_head_slot() == Slot(100) - class TestPeerManagerRequestCallbacks: """Tests for PeerManager request callbacks.""" diff --git a/tests/lean_spec/subspecs/validator/test_service.py b/tests/lean_spec/subspecs/validator/test_service.py index be7fe3fa0..5a0c067fa 100644 --- a/tests/lean_spec/subspecs/validator/test_service.py +++ b/tests/lean_spec/subspecs/validator/test_service.py @@ -1318,134 +1318,207 @@ async def capture_attestation(attestation: SignedAttestation) -> None: assert not is_invalid, "Signature should not verify with the wrong slot" -def _set_head_slot(sync_service: SyncService, slot: Slot) -> None: - """Replace the head block in-place so its slot reads as `slot`. +def _replace_head_at_slot(sync_service: SyncService, head_slot: Slot) -> None: + """Rewrite the head block at the given slot, preserving the map invariant. + + Preserves + --------- + - All other blocks already in the store stay in place. + - The new head is keyed by the cryptographic root of its content. + + Why + --- + The duty gate reads both the head block and the freshest block in + the map. A helper that broke the key-equals-root invariant would + mask real bugs. + """ + blocks = dict(sync_service.store.blocks) + old_head_block = blocks.pop(sync_service.store.head) + new_head_block = old_head_block.model_copy(update={"slot": head_slot}) + new_root = hash_tree_root(new_head_block) + blocks[new_root] = new_head_block + sync_service.store = sync_service.store.model_copy(update={"blocks": blocks, "head": new_root}) + - The genesis store has a single block at the head root. Mutating its slot - lets the duty gate observe whatever lag the test wants to exercise without - constructing a full chain. 
+def _add_block_at_slot(sync_service: SyncService, slot: Slot) -> Bytes32: + """Add a non-head block at the given slot, returning its root. + + Why + --- + Injects freshness evidence without touching the head. The gate's + stall signal scans the highest slot across every block in the map. """ - head_root = sync_service.store.head - new_head = sync_service.store.blocks[head_root].model_copy(update={"slot": slot}) - sync_service.store = sync_service.store.model_copy(update={"blocks": {head_root: new_head}}) + template = next(iter(sync_service.store.blocks.values())) + new_block = template.model_copy(update={"slot": slot}) + new_root = hash_tree_root(new_block) + new_blocks = {**sync_service.store.blocks, new_root: new_block} + sync_service.store = sync_service.store.model_copy(update={"blocks": new_blocks}) + return new_root -class TestSyncLagGate: +def _build_gate_service(sync_service: SyncService) -> ValidatorService: + """Build a service for gate-only tests with an empty registry. + + The gate logic never consults the registry, so emptying it keeps + the focus on the predicate under test. """ - Tests for the sync-lag duty gate. + return ValidatorService( + sync_service=sync_service, + clock=SlotClock(genesis_time=Uint64(0)), + registry=ValidatorRegistry(), + ) - The gate combines local lag with peer-reported head slots so that: - - A node that is itself behind a network making progress is silenced - - A node behind because the network is also behind keeps signing duties +class TestSyncLagGate: + """Sync-lag duty gate. + + Decision matrix + --------------- + - Lag at or under threshold: duties run. + - Lag over threshold, fresh blocks locally: duties skip. + - Lag over threshold, no fresh blocks: duties run (network stall). + - Once closed, the gate reopens only after lag drops past the band. 
""" - def test_within_threshold_allows_duties(self, sync_service: SyncService) -> None: - """Lag of 0..SYNC_LAG_THRESHOLD slots is allowed.""" - _set_head_slot(sync_service, Slot(10)) - service = ValidatorService( - sync_service=sync_service, - clock=SlotClock(genesis_time=Uint64(0)), - registry=ValidatorRegistry(), - ) + def test_lag_within_threshold_allows_duties(self, sync_service: SyncService) -> None: + """Lag 0..threshold leaves the gate open.""" + # Head at slot 10, wall clock sweeps 10..14 (lag 0..4). + _replace_head_at_slot(sync_service, Slot(10)) + service = _build_gate_service(sync_service) + + # Every lag in the inclusive range must pass. for lag in range(SYNC_LAG_THRESHOLD + 1): assert service._is_synced_for_duties(Slot(10 + lag), "block") - def test_just_over_threshold_with_recent_peer_gates(self, sync_service: SyncService) -> None: - """Lag > THRESHOLD is gated when at least one peer has a fresh head.""" - _set_head_slot(sync_service, Slot(10)) - service = ValidatorService( - sync_service=sync_service, - clock=SlotClock(genesis_time=Uint64(0)), - registry=ValidatorRegistry(), - ) + def test_lag_over_threshold_with_fresh_local_block_gates( + self, sync_service: SyncService + ) -> None: + """Stale head plus a fresh local block: gate closes.""" - with patch.object( - PeerManager, - "get_network_head_slot", - return_value=Slot(15), - ): - assert not service._is_synced_for_duties(Slot(15), "block") + # Head at slot 10, wall clock at 20: local lag 10 (> 4). + _replace_head_at_slot(sync_service, Slot(10)) + + # Fresh local block at slot 20 makes the freshest seen slot 20. + # Network is not stalling, only local lag drives the decision. 
+ _add_block_at_slot(sync_service, Slot(20)) + service = _build_gate_service(sync_service) + + assert not service._is_synced_for_duties(Slot(20), "block") + + def test_clock_skew_saturates_to_zero_lag(self, sync_service: SyncService) -> None: + """Head ahead of wall clock saturates to zero lag, not unlimited trust.""" + + # Head at slot 20, wall clock at slot 15: head leads by 5 slots. + # Saturation pins lag at 0, which trivially passes the threshold. + _replace_head_at_slot(sync_service, Slot(20)) + service = _build_gate_service(sync_service) - def test_clock_skew_does_not_gate(self, sync_service: SyncService) -> None: - """If wall clock is behind head slot, duties are allowed (trust the chain).""" - _set_head_slot(sync_service, Slot(20)) - service = ValidatorService( - sync_service=sync_service, - clock=SlotClock(genesis_time=Uint64(0)), - registry=ValidatorRegistry(), - ) assert service._is_synced_for_duties(Slot(15), "block") - def test_no_peer_status_does_not_gate(self, sync_service: SyncService) -> None: - """An isolated node with no peer status keeps duties live.""" - _set_head_slot(sync_service, Slot(0)) - service = ValidatorService( - sync_service=sync_service, - clock=SlotClock(genesis_time=Uint64(0)), - registry=ValidatorRegistry(), - ) - # peer_max defaults to None on a fresh PeerManager; lag is 100. + def test_no_extra_blocks_treats_isolation_as_network_stall( + self, sync_service: SyncService + ) -> None: + """Isolated node with only a stale head: gate stays open.""" + + # Head at slot 0, wall clock at slot 100, nothing else in the map. + # Freshest seen slot is 0, network lag is 100 (> 8): stall fires. + _replace_head_at_slot(sync_service, Slot(0)) + service = _build_gate_service(sync_service) + assert service._is_synced_for_duties(Slot(100), "block") - def test_network_wide_stall_does_not_gate(self, sync_service: SyncService) -> None: - """When even the most up-to-date peer is far behind, duties stay live. 
+ def test_network_wide_stall_keeps_duties_live(self, sync_service: SyncService) -> None: + """All locally-known blocks stale: gate stays open.""" - Regression for the consecutive skipped-slots edge case: the simple - local-lag check would silence every validator at the same moment and - make recovery impossible. - """ - _set_head_slot(sync_service, Slot(0)) - service = ValidatorService( - sync_service=sync_service, - clock=SlotClock(genesis_time=Uint64(0)), - registry=ValidatorRegistry(), - ) - with patch.object( - PeerManager, - "get_network_head_slot", - return_value=Slot(0), - ): - assert service._is_synced_for_duties(Slot(50), "block") + # Head at slot 0, wall clock at slot 50, no fresh blocks. + # Network lag 50 (> 8). Without this branch every validator + # would silence at once and recovery would be impossible. + _replace_head_at_slot(sync_service, Slot(0)) + service = _build_gate_service(sync_service) + + assert service._is_synced_for_duties(Slot(50), "block") def test_boundary_lag_equal_threshold_allowed(self, sync_service: SyncService) -> None: - """Boundary: lag == SYNC_LAG_THRESHOLD is still allowed.""" - _set_head_slot(sync_service, Slot(10)) - service = ValidatorService( - sync_service=sync_service, - clock=SlotClock(genesis_time=Uint64(0)), - registry=ValidatorRegistry(), - ) - wall_clock = Slot(10 + SYNC_LAG_THRESHOLD) - with patch.object( - PeerManager, - "get_network_head_slot", - return_value=wall_clock, - ): - assert service._is_synced_for_duties(wall_clock, "block") + """Lag exactly at the threshold (4) leaves the gate open.""" + + # Head at slot 10, wall clock at slot 14: lag equals threshold. + # Fresh block at slot 14 keeps the stall branch from masking this. 
+ _replace_head_at_slot(sync_service, Slot(10)) + _add_block_at_slot(sync_service, Slot(14)) + service = _build_gate_service(sync_service) + + assert service._is_synced_for_duties(Slot(10 + SYNC_LAG_THRESHOLD), "block") def test_boundary_lag_one_over_threshold_gated(self, sync_service: SyncService) -> None: - """Boundary: lag == SYNC_LAG_THRESHOLD + 1 with recent peer is gated.""" - _set_head_slot(sync_service, Slot(10)) - service = ValidatorService( - sync_service=sync_service, - clock=SlotClock(genesis_time=Uint64(0)), - registry=ValidatorRegistry(), - ) - wall_clock = Slot(10 + SYNC_LAG_THRESHOLD + 1) - with patch.object( - PeerManager, - "get_network_head_slot", - return_value=wall_clock, - ): - assert not service._is_synced_for_duties(wall_clock, "block") + """Lag of threshold + 1 closes the gate.""" + + # Head at slot 10, wall clock at slot 15: lag is 5. + _replace_head_at_slot(sync_service, Slot(10)) + _add_block_at_slot(sync_service, Slot(15)) + service = _build_gate_service(sync_service) + + assert not service._is_synced_for_duties(Slot(10 + SYNC_LAG_THRESHOLD + 1), "block") + + def test_hysteresis_prevents_flap(self, sync_service: SyncService) -> None: + """Closed gate stays closed near the threshold. + + Lag sequence + ------------ + - 5 -> gate closes (lag past threshold of 4). + - 4 -> stays closed (still inside the band). + - 5 -> stays closed (no flap). + - 2 -> reopens (lag at or below 4 - 2). + """ + + # Initial head at slot 10, fresh local block at slot 20. + # The fresh block keeps the stall escape from masking the band test. + _replace_head_at_slot(sync_service, Slot(10)) + _add_block_at_slot(sync_service, Slot(20)) + service = _build_gate_service(sync_service) + + # Lag = 5: gate closes. + assert not service._is_synced_for_duties(Slot(15), "block") + + # Lag = 4: stays closed because the band requires lag <= 2. 
+ _replace_head_at_slot(sync_service, Slot(11)) + _add_block_at_slot(sync_service, Slot(20)) + assert not service._is_synced_for_duties(Slot(15), "block") + + # Lag back to 5: still closed, no flap event. + _replace_head_at_slot(sync_service, Slot(10)) + _add_block_at_slot(sync_service, Slot(20)) + assert not service._is_synced_for_duties(Slot(15), "block") + + # Lag = 2: at or below the 4 - 2 band, gate reopens. + _replace_head_at_slot(sync_service, Slot(13)) + _add_block_at_slot(sync_service, Slot(20)) + assert service._is_synced_for_duties(Slot(15), "block") + + def test_counters_split_block_and_attestation(self, sync_service: SyncService) -> None: + """Counters live on the loop, not on the gate.""" + + # Head 0, wall clock 20, fresh block at 20: gate closes. + _replace_head_at_slot(sync_service, Slot(0)) + _add_block_at_slot(sync_service, Slot(20)) + service = _build_gate_service(sync_service) + + # Invariant: the gate never moves counters. Attribution belongs + # to the run loop. Querying the gate must leave them at zero. + assert not service._is_synced_for_duties(Slot(20), "block") + assert service.blocks_skipped_lag == 0 + assert service.attestations_skipped_lag == 0 + assert service.duty_gate_closed is True async def test_run_loop_skips_block_production_when_gated( self, sync_service: SyncService, key_manager: XmssKeyManager ) -> None: - """Interval 0 in a gated slot does not invoke _maybe_produce_block.""" - _set_head_slot(sync_service, Slot(0)) + """Closed gate at interval 0 skips block production and ticks only the block counter.""" + + # Wall clock at slot 10 interval 0, head stuck at slot 0. + # Fresh local block at slot 10 makes the lag local, not network-wide. 
+ _replace_head_at_slot(sync_service, Slot(0)) + _add_block_at_slot(sync_service, Slot(10)) clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: _interval_time(10, 0)) service = ValidatorService( sync_service=sync_service, @@ -1462,28 +1535,31 @@ async def stop_on_sleep(_d: float) -> None: service.stop() with ( - patch.object( - PeerManager, - "get_network_head_slot", - return_value=Slot(10), - ), patch.object(ValidatorService, "_maybe_produce_block", mock_block), patch("asyncio.sleep", new=stop_on_sleep), ): await service.run() + # Block path bypassed, attestation counter untouched. assert block_calls == [] - assert service.duties_skipped_lag >= 1 + assert service.blocks_skipped_lag >= 1 + assert service.attestations_skipped_lag == 0 async def test_run_loop_skips_attestation_when_gated( self, sync_service: SyncService, key_manager: XmssKeyManager ) -> None: - """Gated attestation: duty is skipped AND the slot stays unmarked. + """Closed gate at interval 1 skips attestation and leaves the slot retryable. - The slot must stay out of `_attested_slots` so a node that catches up - before the slot ends can still attest in this same slot. + Why + --- + Keeping the slot out of the attested set lets the next loop + iteration retry within the same slot if the node catches up + before slot end. """ - _set_head_slot(sync_service, Slot(0)) + + # Same setup as the block path but advanced to interval 1. 
+ _replace_head_at_slot(sync_service, Slot(0)) + _add_block_at_slot(sync_service, Slot(10)) clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: _interval_time(10, 1)) service = ValidatorService( sync_service=sync_service, @@ -1500,44 +1576,53 @@ async def stop_on_sleep(_d: float) -> None: service.stop() with ( - patch.object( - PeerManager, - "get_network_head_slot", - return_value=Slot(10), - ), patch.object(ValidatorService, "_produce_attestations", mock_attest), patch("asyncio.sleep", new=stop_on_sleep), ): await service.run() + # Attestation skipped, slot retryable, block counter untouched. assert attest_calls == [] assert Slot(10) not in service._attested_slots - assert service.duties_skipped_lag >= 1 + assert service.attestations_skipped_lag >= 1 + assert service.blocks_skipped_lag == 0 - def test_gated_call_logs_structured_fields_and_increments_counter( + def test_gate_logs_only_on_transition( self, sync_service: SyncService, caplog: pytest.LogCaptureFixture ) -> None: - """When the gate fires, it logs duty/slot/head/lag/peer_max and bumps the counter.""" - _set_head_slot(sync_service, Slot(3)) - service = ValidatorService( - sync_service=sync_service, - clock=SlotClock(genesis_time=Uint64(0)), - registry=ValidatorRegistry(), - ) + """One INFO record per state change, not one per query. + + Fields recorded + --------------- + - duty + - slot + - head_slot + - lag + - max_seen_slot + """ - with ( - caplog.at_level("INFO"), - patch.object( - PeerManager, - "get_network_head_slot", - return_value=Slot(20), - ), - ): - gated = service._is_synced_for_duties(Slot(20), "block") + # Head at slot 3, fresh block at slot 20. + # Wall clock 20 puts lag at 17 with no stall escape. + _replace_head_at_slot(sync_service, Slot(3)) + _add_block_at_slot(sync_service, Slot(20)) + service = _build_gate_service(sync_service) + + with caplog.at_level("INFO"): + # Two consecutive queries: only the first is a transition. 
+ first = service._is_synced_for_duties(Slot(20), "block") + second = service._is_synced_for_duties(Slot(20), "block") + + assert first is False + assert second is False - assert gated is False - assert [r.getMessage() for r in caplog.records] == [ - "Validator duty skipped due to sync lag: " - "duty=block slot=20 head_slot=3 lag=17 peer_max_head_slot=20" + # Exactly one closure record, with the expected fields. + transition_records = [ + r.getMessage() for r in caplog.records if "duty gate closed" in r.getMessage() ] - assert service.duties_skipped_lag == 1 + assert len(transition_records) == 1 + message = transition_records[0] + assert "duty=block" in message + assert "slot=20" in message + assert "head_slot=3" in message + assert "lag=17" in message + assert "max_seen_slot=20" in message From f3b4f12078bb261ccbf0c1ff693c141a3b1f2450 Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Mon, 11 May 2026 15:21:48 +0200 Subject: [PATCH 4/4] docs(validator): use first-person voice in threshold rationale Co-Authored-By: Claude Opus 4.7 (1M context) --- src/lean_spec/subspecs/validator/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lean_spec/subspecs/validator/constants.py b/src/lean_spec/subspecs/validator/constants.py index 5337be0ce..9e9a7345f 100644 --- a/src/lean_spec/subspecs/validator/constants.py +++ b/src/lean_spec/subspecs/validator/constants.py @@ -13,7 +13,7 @@ """Slot lag past which the local view is too stale to sign. Why: - Lean justifies and finalizes within a handful of slots. + We justify and finalize within a handful of slots. A 4-slot lag is one full justification window behind real time. A vote from that view lands on a subtree the network has left. """