From b58cb935cd46f68f2cd3b5b5cad96a8961226c8d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 20:36:23 +0000 Subject: [PATCH 01/18] Initial plan From 253619321fa717267c5874a21690ff251873843e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 20:59:29 +0000 Subject: [PATCH 02/18] fix: support reusing field-input components in event-driven flows Agent-Logs-Url: https://github.com/plugboard-dev/plugboard/sessions/b2d74c52-309e-40c7-bdd3-5880ba403b35 Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- .../plugboard_schemas/_validation.py | 7 +- plugboard/component/component.py | 9 ++- .../test_process_with_components_run.py | 80 +++++++++++++++++++ tests/unit/test_process_validation.py | 15 ++++ 4 files changed, 108 insertions(+), 3 deletions(-) diff --git a/plugboard-schemas/plugboard_schemas/_validation.py b/plugboard-schemas/plugboard_schemas/_validation.py index 98eb886c..ee71c175 100644 --- a/plugboard-schemas/plugboard_schemas/_validation.py +++ b/plugboard-schemas/plugboard_schemas/_validation.py @@ -18,6 +18,9 @@ from ._validator_registry import validator +_SYSTEM_STOP_EVENT = "system_stop" + + def _build_component_graph( connectors: dict[str, dict[str, _t.Any]], ) -> dict[str, set[str]]: @@ -98,9 +101,11 @@ def validate_all_inputs_connected( for comp_name, comp_data in components.items(): io = comp_data.get("io", {}) all_inputs = set(io.get("inputs", [])) + input_events = set(io.get("input_events", [])) + has_non_system_input_events = bool(input_events - {_SYSTEM_STOP_EVENT}) connected = connected_inputs.get(comp_name, set()) unconnected = all_inputs - connected - if unconnected: + if unconnected and not has_non_system_input_events: errors.append(f"Component '{comp_name}' has unconnected inputs: {sorted(unconnected)}") return errors diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 6fe0ad20..ef2d296d 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -365,6 +365,10 @@ async def _wrapper() -> None: def _has_field_inputs(self) -> bool: return len(self.io.inputs) > 0 + @property + def _has_connected_field_inputs(self) -> bool: + return len(self.io._input_channels) > 0 + @cached_property def _has_event_inputs(self) -> bool: input_events = set([evt.safe_type() for evt in self.io.input_events]) @@ -409,8 +413,9 @@ async def _io_read_with_status_check(self) -> None: task.cancel() for task in done: exc = task.exception() - if isinstance(exc, EventStreamClosedError) and len(self.io.inputs) == 0: + if isinstance(exc, EventStreamClosedError) and not self._has_connected_field_inputs: await self.io.close() # Call close for final wait and flush event buffer + raise IOStreamClosedError(str(exc)) from exc elif exc is not None: raise exc @@ -422,7 +427,7 @@ async def _periodic_status_check(self) -> None: # TODO : Eventually producer graph update will be event driven. For now, # : the update is performed periodically, so it's called here along # : with the status check. - if len(self.io.inputs) == 0: + if not self._has_connected_field_inputs: await self._update_producer_graph() async def _status_check(self) -> None: diff --git a/tests/integration/test_process_with_components_run.py b/tests/integration/test_process_with_components_run.py index fe047ae8..695f5dd8 100644 --- a/tests/integration/test_process_with_components_run.py +++ b/tests/integration/test_process_with_components_run.py @@ -23,6 +23,7 @@ ) from plugboard.events import Event from plugboard.exceptions import ConstraintError, NotInitialisedError, ProcessStatusError +from plugboard.library import FileWriter from plugboard.process import LocalProcess, Process, RayProcess from plugboard.schemas import ConnectorSpec, Status from tests.conftest import ComponentTestHelper, zmq_connector_cls @@ -459,6 +460,85 @@ async def test_event_driven_process_shutdown( await process.destroy() +class MessageEventData(BaseModel): + """Data for a file-writer event.""" + + message: str + + +class MessageEvent(Event): + """Event carrying a file-writer message.""" + + type: _t.ClassVar[str] = "message_event" + data: MessageEventData + + +class MessageEventGenerator(ComponentTestHelper): + """Produces a fixed number of message events.""" + + io = IO(output_events=[MessageEvent]) + + def __init__(self, iters: int, *args: _t.Any, **kwargs: _t.Any) -> None: + super().__init__(*args, **kwargs) + self._iters = iters + + async def init(self) -> None: + await super().init() + self._seq = iter(range(self._iters)) + + async def step(self) -> None: + try: + idx = next(self._seq) + except StopIteration: + await self.io.close() + else: + evt = MessageEvent( + source=self.name, + data=MessageEventData(message=f"Message {idx}"), + ) + self.io.queue_event(evt) + await super().step() + + +class EventReaderFileWriter(FileWriter): + """`FileWriter` variant that populates inputs from events.""" + + io = IO(input_events=[MessageEvent]) + + @MessageEvent.handler + async def handle_message(self, event: MessageEvent) -> None: + self.message = event.data.message + + +@pytest.mark.asyncio +async def test_event_driven_file_writer_reuse(tmp_path: Path) -> None: + """Test that field-input components can be reused in event-driven processes.""" + output_path = tmp_path / "output_messages.csv" + components = [ + MessageEventGenerator(iters=3, name="message_event_generator"), + EventReaderFileWriter( + path=output_path, + name="event_reader_file_writer", + field_names=["message"], + ), + ] + event_connectors = AsyncioConnector.builder().build_event_connectors(components) + process = LocalProcess(components=components, connectors=event_connectors) + + await process.init() + await process.run() + + assert process.status == Status.COMPLETED + assert output_path.read_text().splitlines() == [ + "message", + "Message 0", + "Message 1", + "Message 2", + ] + + await process.destroy() + + _SHORT_TIMEOUT = 0.1 diff --git a/tests/unit/test_process_validation.py b/tests/unit/test_process_validation.py index 02e0a4d2..b0ec7482 100644 --- a/tests/unit/test_process_validation.py +++ b/tests/unit/test_process_validation.py @@ -303,6 +303,21 @@ def test_no_inputs_no_errors(self) -> None: errors = validate_all_inputs_connected(pd) assert errors == [] + def test_missing_inputs_allowed_for_event_driven_component_reuse(self) -> None: + """Unconnected inputs are allowed when non-system input events can populate them.""" + pd = _make_process_dict( + components={ + "producer": _make_component("producer", output_events=["message_event"]), + "writer": _make_component( + "writer", + inputs=["message"], + input_events=["system_stop", "message_event"], + ), + }, + ) + errors = validate_all_inputs_connected(pd) + assert errors == [] + # --------------------------------------------------------------------------- # Tests for validate_input_events From e7a822a7d3da0b079897c3260250e14be4a06da5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 21:02:30 +0000 Subject: [PATCH 03/18] chore: address review feedback on event-driven reuse fix Agent-Logs-Url: https://github.com/plugboard-dev/plugboard/sessions/b2d74c52-309e-40c7-bdd3-5880ba403b35 Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- plugboard/component/component.py | 3 ++- tests/integration/test_process_with_components_run.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index ef2d296d..2443f087 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -345,6 +345,8 @@ async def _wrapper() -> None: with self._job_id_ctx(): await self._set_status(Status.RUNNING, publish=not self._is_running) await self._io_read_with_status_check() + if self.io.is_closed: + return await self._handle_events() self._bind_inputs() if self._can_step: @@ -415,7 +417,6 @@ async def _io_read_with_status_check(self) -> None: exc = task.exception() if isinstance(exc, EventStreamClosedError) and not self._has_connected_field_inputs: await self.io.close() # Call close for final wait and flush event buffer - raise IOStreamClosedError(str(exc)) from exc elif exc is not None: raise exc diff --git a/tests/integration/test_process_with_components_run.py b/tests/integration/test_process_with_components_run.py index 695f5dd8..95673fae 100644 --- a/tests/integration/test_process_with_components_run.py +++ b/tests/integration/test_process_with_components_run.py @@ -501,7 +501,7 @@ async def step(self) -> None: class EventReaderFileWriter(FileWriter): - """`FileWriter` variant that populates inputs from events.""" + """`FileWriter` variant that reuses the `message` input by populating it from events.""" io = IO(input_events=[MessageEvent]) From 5e3902396c2103e2bd9a6442bb52bc5f07b332f1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 21:04:57 +0000 Subject: [PATCH 04/18] docs: clarify event-driven reuse shutdown handling Agent-Logs-Url: https://github.com/plugboard-dev/plugboard/sessions/b2d74c52-309e-40c7-bdd3-5880ba403b35 Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- plugboard/component/component.py | 2 ++ tests/integration/test_process_with_components_run.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 2443f087..e99f6481 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -345,6 +345,8 @@ async def _wrapper() -> None: with self._job_id_ctx(): await self._set_status(Status.RUNNING, publish=not self._is_running) await self._io_read_with_status_check() + # Event-driven consumers can close here when their producer graph is exhausted. + # Return before rebinding inputs so stale field values are not replayed. if self.io.is_closed: return await self._handle_events() diff --git a/tests/integration/test_process_with_components_run.py b/tests/integration/test_process_with_components_run.py index 95673fae..7e549708 100644 --- a/tests/integration/test_process_with_components_run.py +++ b/tests/integration/test_process_with_components_run.py @@ -501,7 +501,7 @@ async def step(self) -> None: class EventReaderFileWriter(FileWriter): - """`FileWriter` variant that reuses the `message` input by populating it from events.""" + """`FileWriter` variant that reuses the inherited `message` input via events.""" io = IO(input_events=[MessageEvent]) From 544d5c8deae2094e0612ddddb854b4e95f8ef8ca Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 21:07:20 +0000 Subject: [PATCH 05/18] docs: clarify event-driven reuse test intent Agent-Logs-Url: https://github.com/plugboard-dev/plugboard/sessions/b2d74c52-309e-40c7-bdd3-5880ba403b35 Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- plugboard/component/component.py | 1 + tests/integration/test_process_with_components_run.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index e99f6481..b6b1b6ad 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -371,6 +371,7 @@ def _has_field_inputs(self) -> bool: @property def _has_connected_field_inputs(self) -> bool: + """Whether any declared field inputs are connected via input channels.""" return len(self.io._input_channels) > 0 @cached_property diff --git a/tests/integration/test_process_with_components_run.py b/tests/integration/test_process_with_components_run.py index 7e549708..6b2ec352 100644 --- a/tests/integration/test_process_with_components_run.py +++ b/tests/integration/test_process_with_components_run.py @@ -461,7 +461,7 @@ async def test_event_driven_process_shutdown( class MessageEventData(BaseModel): - """Data for a file-writer event.""" + """Data for a message event.""" message: str @@ -501,7 +501,7 @@ async def step(self) -> None: class EventReaderFileWriter(FileWriter): - """`FileWriter` variant that reuses the inherited `message` input via events.""" + """`FileWriter` variant that relies on IO inheritance to reuse `message` via events.""" io = IO(input_events=[MessageEvent]) From b3c4e16caa6f20eaa99fac9348b3d587e9cd59a1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 21:09:58 +0000 Subject: [PATCH 06/18] refactor: expose connected input state on io controller Agent-Logs-Url: https://github.com/plugboard-dev/plugboard/sessions/b2d74c52-309e-40c7-bdd3-5880ba403b35 Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- plugboard/component/component.py | 5 +++-- plugboard/component/io_controller.py | 5 +++++ tests/integration/test_process_with_components_run.py | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index b6b1b6ad..84295e35 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -346,7 +346,8 @@ async def _wrapper() -> None: await self._set_status(Status.RUNNING, publish=not self._is_running) await self._io_read_with_status_check() # Event-driven consumers can close here when their producer graph is exhausted. - # Return before rebinding inputs so stale field values are not replayed. + # Return before rebinding inputs so the last event-populated field values are not + # replayed as if they were fresh inputs in another step. if self.io.is_closed: return await self._handle_events() @@ -372,7 +373,7 @@ def _has_field_inputs(self) -> bool: @property def _has_connected_field_inputs(self) -> bool: """Whether any declared field inputs are connected via input channels.""" - return len(self.io._input_channels) > 0 + return self.io.has_connected_field_inputs @cached_property def _has_event_inputs(self) -> bool: diff --git a/plugboard/component/io_controller.py b/plugboard/component/io_controller.py index 7500aee2..8590ada6 100644 --- a/plugboard/component/io_controller.py +++ b/plugboard/component/io_controller.py @@ -86,6 +86,11 @@ def is_closed(self) -> bool: """Returns `True` if the `IOController` is closed, `False` otherwise.""" return self._is_closed + @property + def has_connected_field_inputs(self) -> bool: + """Returns whether any field inputs are connected via channels.""" + return len(self._input_channels) > 0 + @cached_property def _has_field_inputs(self) -> bool: return len(self._input_channels) > 0 diff --git a/tests/integration/test_process_with_components_run.py b/tests/integration/test_process_with_components_run.py index 6b2ec352..8f48a2dc 100644 --- a/tests/integration/test_process_with_components_run.py +++ b/tests/integration/test_process_with_components_run.py @@ -501,7 +501,7 @@ async def step(self) -> None: class EventReaderFileWriter(FileWriter): - """`FileWriter` variant that relies on IO inheritance to reuse `message` via events.""" + """`FileWriter` variant that adds event handling instead of a connector for `message`.""" io = IO(input_events=[MessageEvent]) From e4047cbcd561ad9453ab32077f3904601b272422 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 21:12:18 +0000 Subject: [PATCH 07/18] refactor: reuse connected input property internally Agent-Logs-Url: https://github.com/plugboard-dev/plugboard/sessions/b2d74c52-309e-40c7-bdd3-5880ba403b35 Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- plugboard/component/component.py | 2 +- plugboard/component/io_controller.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 84295e35..4bd246b0 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -345,7 +345,7 @@ async def _wrapper() -> None: with self._job_id_ctx(): await self._set_status(Status.RUNNING, publish=not self._is_running) await self._io_read_with_status_check() - # Event-driven consumers can close here when their producer graph is exhausted. + # IO can close here when an event-only producer graph is exhausted. # Return before rebinding inputs so the last event-populated field values are not # replayed as if they were fresh inputs in another step. if self.io.is_closed: diff --git a/plugboard/component/io_controller.py b/plugboard/component/io_controller.py index 8590ada6..52e65369 100644 --- a/plugboard/component/io_controller.py +++ b/plugboard/component/io_controller.py @@ -91,9 +91,9 @@ def has_connected_field_inputs(self) -> bool: """Returns whether any field inputs are connected via channels.""" return len(self._input_channels) > 0 - @cached_property + @property def _has_field_inputs(self) -> bool: - return len(self._input_channels) > 0 + return self.has_connected_field_inputs @cached_property def _has_event_inputs(self) -> bool: From b442ee045af357f8a1faf9079ff63e844f782bdb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 8 Apr 2026 21:14:31 +0000 Subject: [PATCH 08/18] refactor: simplify connected field input checks Agent-Logs-Url: https://github.com/plugboard-dev/plugboard/sessions/b2d74c52-309e-40c7-bdd3-5880ba403b35 Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- plugboard/component/component.py | 6 +++--- plugboard/component/io_controller.py | 10 +++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 4bd246b0..35a2a4c6 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -345,9 +345,9 @@ async def _wrapper() -> None: with self._job_id_ctx(): await self._set_status(Status.RUNNING, publish=not self._is_running) await self._io_read_with_status_check() - # IO can close here when an event-only producer graph is exhausted. - # Return before rebinding inputs so the last event-populated field values are not - # replayed as if they were fresh inputs in another step. + # IO can close here once all producers for the component's event-only inputs have + # finished emitting. Return before rebinding inputs so the last event-populated + # field values are not replayed as if they were fresh inputs in another step. if self.io.is_closed: return await self._handle_events() diff --git a/plugboard/component/io_controller.py b/plugboard/component/io_controller.py index 52e65369..5ac67f7c 100644 --- a/plugboard/component/io_controller.py +++ b/plugboard/component/io_controller.py @@ -91,17 +91,13 @@ def has_connected_field_inputs(self) -> bool: """Returns whether any field inputs are connected via channels.""" return len(self._input_channels) > 0 - @property - def _has_field_inputs(self) -> bool: - return self.has_connected_field_inputs - @cached_property def _has_event_inputs(self) -> bool: return len(self._input_event_channels) > 0 @cached_property def _has_inputs(self) -> bool: - return self._has_field_inputs or self._has_event_inputs + return self.has_connected_field_inputs or self._has_event_inputs async def read(self, timeout: float | None = None) -> None: """Reads data and/or events from input channels. @@ -144,7 +140,7 @@ async def read(self, timeout: float | None = None) -> None: def _set_read_tasks(self) -> list[asyncio.Task]: read_tasks: list[asyncio.Task] = [] - if self._has_field_inputs: + if self.has_connected_field_inputs: if _fields_read_task not in self._read_tasks: read_fields_task = asyncio.create_task(self._read_fields(), name=_fields_read_task) self._read_tasks[_fields_read_task] = read_fields_task @@ -379,7 +375,7 @@ def _add_channel_for_event( def _create_input_field_group_tasks(self) -> None: """Groups input field channels by field name and launches read tasks for group inputs.""" - if not self._has_field_inputs: + if not self.has_connected_field_inputs: return field_channels: dict[str, list[tuple[_t_field_key, Channel]]] = defaultdict(list) for key, chan in self._input_channels.items(): From 21b8ea46c69139b2439cd2159eebf588046b0133 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Sun, 12 Apr 2026 19:42:48 +0200 Subject: [PATCH 09/18] fix: Modifies DataWriter logic to only append new data --- plugboard/component/component.py | 5 ----- plugboard/library/data_writer.py | 7 ++++--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 35a2a4c6..d4080ca5 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -345,11 +345,6 @@ async def _wrapper() -> None: with self._job_id_ctx(): await self._set_status(Status.RUNNING, publish=not self._is_running) await self._io_read_with_status_check() - # IO can close here once all producers for the component's event-only inputs have - # finished emitting. Return before rebinding inputs so the last event-populated - # field values are not replayed as if they were fresh inputs in another step. - if self.io.is_closed: - return await self._handle_events() self._bind_inputs() if self._can_step: diff --git a/plugboard/library/data_writer.py b/plugboard/library/data_writer.py index 96d14538..d0651914 100644 --- a/plugboard/library/data_writer.py +++ b/plugboard/library/data_writer.py @@ -76,9 +76,10 @@ async def _convert(self, data: dict[str, deque]) -> _t.Any: def _bind_inputs(self) -> None: """Binds input fields to component fields and append to internal buffer.""" super()._bind_inputs() - for field in self.io.inputs: - value = getattr(self, field, None) - self._buffer[field].append(value) + if self._field_inputs_ready: + for field in self.io.inputs: + value = getattr(self, field, None) + self._buffer[field].append(value) async def _save_chunk(self) -> None: """Write data from the buffer.""" From f763d86ee261d87318fb2247b48dce8fd15a4b88 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Sun, 12 Apr 2026 20:25:36 +0200 Subject: [PATCH 10/18] refactor: Separates field data tracker reset into new method for easier extension --- plugboard/component/component.py | 5 ++++- plugboard/library/data_writer.py | 7 +++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index d4080ca5..be95951a 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -356,7 +356,7 @@ async def _wrapper() -> None: raise e self._bind_outputs() await self.io.write() - self._field_inputs_ready = False + self._reset_input_trackers() await self._set_status(Status.WAITING, publish=not self._is_running) return _wrapper @@ -461,7 +461,10 @@ def _bind_inputs(self) -> None: field_default = getattr(self, field, None) value = self._field_inputs.get(field, field_default) setattr(self, field, value) + + def _reset_input_trackers(self) -> None: self._field_inputs = {} + self._field_inputs_ready = False def _bind_outputs(self) -> None: """Binds component fields to output fields.""" diff --git a/plugboard/library/data_writer.py b/plugboard/library/data_writer.py index d0651914..8035fee5 100644 --- a/plugboard/library/data_writer.py +++ b/plugboard/library/data_writer.py @@ -76,10 +76,9 @@ async def _convert(self, data: dict[str, deque]) -> _t.Any: def _bind_inputs(self) -> None: """Binds input fields to component fields and append to internal buffer.""" super()._bind_inputs() - if self._field_inputs_ready: - for field in self.io.inputs: - value = getattr(self, field, None) - self._buffer[field].append(value) + for field in self._field_inputs: + value = getattr(self, field, None) + self._buffer[field].append(value) async def _save_chunk(self) -> None: """Write data from the buffer.""" From 675e8de56044fa861271c1c1764f5264f210482e Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Sun, 12 Apr 2026 20:37:28 +0200 Subject: [PATCH 11/18] refactor: Adapts DataWriter for trickling input data --- plugboard/library/data_writer.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/plugboard/library/data_writer.py b/plugboard/library/data_writer.py index 8035fee5..b3051a01 100644 --- a/plugboard/library/data_writer.py +++ b/plugboard/library/data_writer.py @@ -43,7 +43,8 @@ def __init__( **kwargs: Additional keyword arguments for [`Component`][plugboard.component.Component]. """ super().__init__(**kwargs) - self._buffer: dict[str, deque] = defaultdict(deque) + self._accum_buffer: dict[str, deque] = defaultdict(deque) + self._write_buffer: dict[str, deque] = defaultdict(deque) self._chunk_size = chunk_size self.io = IOController( inputs=field_names, @@ -54,6 +55,8 @@ def __init__( component=self, ) self._task: _t.Optional[Task] = None + self._pending_fields: set[str] = set(field_names) + self._new_row_added: bool = False def __init_subclass__(cls, *args: _t.Any, **kwargs: _t.Any) -> None: try: @@ -78,20 +81,31 @@ def _bind_inputs(self) -> None: super()._bind_inputs() for field in self._field_inputs: value = getattr(self, field, None) - self._buffer[field].append(value) + self._accum_buffer[field].append(value) + self._pending_fields.discard(field) + if not self._pending_fields: + for field in self.io.inputs: + self._write_buffer[field].append(self._accum_buffer[field].popleft()) + if len(self._accum_buffer[field]) == 0: + self._pending_fields.add(field) + self._new_row_added = True + + @property + def _can_step(self) -> bool: + return self._new_row_added async def _save_chunk(self) -> None: """Write data from the buffer.""" if self._task is not None: await self._task # Create task to save next chunk of data - chunk = await self._convert(self._buffer) + chunk = await self._convert(self._write_buffer) self._task = asyncio.create_task(self._save(chunk)) - self._buffer = defaultdict(deque) + self._write_buffer = defaultdict(deque) async def step(self) -> None: """Trigger save when buffer is at target size.""" - if self._chunk_size and len(self._buffer[self.io.inputs[0]]) >= self._chunk_size: + if self._chunk_size and len(self._write_buffer[self.io.inputs[0]]) >= self._chunk_size: await self._save_chunk() async def run(self) -> None: From 7fe735be31e17d5ff3dcd4bdc5cfcf3798511991 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Sun, 12 Apr 2026 20:49:29 +0200 Subject: [PATCH 12/18] refactor: Rework DataWriter logic --- plugboard/component/component.py | 2 +- plugboard/library/data_writer.py | 42 +++++++++++++++++++------------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index be95951a..2db9e92b 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -460,7 +460,7 @@ def _bind_inputs(self) -> None: for field in self.io.inputs: field_default = getattr(self, field, None) value = self._field_inputs.get(field, field_default) - setattr(self, field, value) + super().__setattr__(field, value) def _reset_input_trackers(self) -> None: self._field_inputs = {} diff --git a/plugboard/library/data_writer.py b/plugboard/library/data_writer.py index b3051a01..1c0dbf72 100644 --- a/plugboard/library/data_writer.py +++ b/plugboard/library/data_writer.py @@ -43,8 +43,8 @@ def __init__( **kwargs: Additional keyword arguments for [`Component`][plugboard.component.Component]. """ super().__init__(**kwargs) - self._accum_buffer: dict[str, deque] = defaultdict(deque) - self._write_buffer: dict[str, deque] = defaultdict(deque) + # Use a single buffer to track everything + self._buffer: dict[str, deque] = defaultdict(deque) self._chunk_size = chunk_size self.io = IOController( inputs=field_names, @@ -55,8 +55,6 @@ def __init__( component=self, ) self._task: _t.Optional[Task] = None - self._pending_fields: set[str] = set(field_names) - self._new_row_added: bool = False def __init_subclass__(cls, *args: _t.Any, **kwargs: _t.Any) -> None: try: @@ -81,27 +79,37 @@ def _bind_inputs(self) -> None: super()._bind_inputs() for field in self._field_inputs: value = getattr(self, field, None) - self._accum_buffer[field].append(value) - self._pending_fields.discard(field) - if not self._pending_fields: - for field in self.io.inputs: - self._write_buffer[field].append(self._accum_buffer[field].popleft()) - if len(self._accum_buffer[field]) == 0: - self._pending_fields.add(field) - self._new_row_added = True + self._buffer[field].append(value) + + @property + def _completed_rows(self) -> int: + """Calculates how many fully formed rows exist in the buffer.""" + if not self.io.inputs: + return 0 + return min((len(self._buffer[f]) for f in self.io.inputs), default=0) @property def _can_step(self) -> bool: - return self._new_row_added + """We can step if we have at least one fully formed row.""" + return self._completed_rows > 0 async def _save_chunk(self) -> None: - """Write data from the buffer.""" + """Write completed data rows from the buffer.""" + completed_rows = self._completed_rows + if completed_rows == 0: + return + if self._task is not None: await self._task - # Create task to save next chunk of data - chunk = await self._convert(self._write_buffer) + + # Extract only the completed rows into a new chunk + chunk_data = { + field: deque([self._buffer[field].popleft() for _ in range(completed_rows)]) + for field in self.io.inputs + } + + chunk = await self._convert(chunk_data) self._task = asyncio.create_task(self._save(chunk)) - self._write_buffer = defaultdict(deque) async def step(self) -> None: """Trigger save when buffer is at target size.""" From 5fad40526f79938c5ea49f1466fed52dbf0770c8 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Thu, 16 Apr 2026 10:14:04 +0200 Subject: [PATCH 13/18] fixup! refactor: Rework DataWriter logic --- plugboard/library/data_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugboard/library/data_writer.py b/plugboard/library/data_writer.py index 1c0dbf72..f281a531 100644 --- a/plugboard/library/data_writer.py +++ b/plugboard/library/data_writer.py @@ -113,7 +113,7 @@ async def _save_chunk(self) -> None: async def step(self) -> None: """Trigger save when buffer is at target size.""" - if self._chunk_size and len(self._write_buffer[self.io.inputs[0]]) >= self._chunk_size: + if self._chunk_size and len(self._buffer[self.io.inputs[0]]) >= self._chunk_size: await self._save_chunk() async def run(self) -> None: From 8ca62989889453627acff62a3f8d808d869d7f45 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Thu, 16 Apr 2026 11:16:31 +0200 Subject: [PATCH 14/18] test: Adds test for staggered event driven data arrival --- .../test_process_with_components_run.py | 103 +++++++++++++++++- 1 file changed, 101 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_process_with_components_run.py b/tests/integration/test_process_with_components_run.py index 8f48a2dc..2a16c41f 100644 --- a/tests/integration/test_process_with_components_run.py +++ b/tests/integration/test_process_with_components_run.py @@ -478,15 +478,29 @@ class MessageEventGenerator(ComponentTestHelper): io = IO(output_events=[MessageEvent]) - def __init__(self, iters: int, *args: _t.Any, **kwargs: _t.Any) -> None: + def __init__( + self, + iters: int, + *args: _t.Any, + delay: float = 0.0, + start: int = 0, + stride: int = 1, + **kwargs: _t.Any, + ) -> None: super().__init__(*args, **kwargs) self._iters = iters + self._delay = delay + self._start = start + self._stride = stride async def init(self) -> None: await super().init() - self._seq = iter(range(self._iters)) + self._seq = iter(range(self._start, self._start + self._iters * self._stride, self._stride)) async def step(self) -> None: + # Optional delay to simulate staggered event arrival + if self._delay > 0.0: + await asyncio.sleep(self._delay) try: idx = next(self._seq) except StopIteration: @@ -616,3 +630,88 @@ async def test_constraint_error_stops_background_status_check() -> None: ) await process.destroy() + + +class StaggeredEventFileWriter(FileWriter): + """`FileWriter` variant that adds event handling instead of a connector for `message`.""" + + io = IO(input_events=[MessageEvent]) + + def __init__(self, *args: _t.Any, field_names: list[str], **kwargs: _t.Any) -> None: + super().__init__(*args, field_names=field_names, **kwargs) + self.step_count: int = 0 + self.step_for_message: dict[str, int] = {} + + @MessageEvent.handler + async def handle_message(self, event: MessageEvent) -> None: + msg = event.data.message + match event.source: + case "mg1": + self.mg1 = msg + case "mg2": + self.mg2 = msg + case "mg3": + self.mg3 = msg + case _: + raise ValueError(f"Unexpected event source: {event.source}") + self.step_for_message[msg] = self.step_count + self.step_count += 1 + + +@pytest.mark.asyncio +@pytest_cases.parametrize( + "process_cls, connector_cls", + [ + (LocalProcess, AsyncioConnector), + ], +) +async def test_data_writer_handles_staggered_input_events( + process_cls: type[Process], connector_cls: type[Connector], tmp_path: Path, ray_ctx: None +) -> None: + """Test that a FileWriter can handle input events arriving in different steps. + + Input messages with data for different fields may arrive in different steps. The FileWriter + should write out a new row only when all required fields have received data, and should not + overwrite field values if only a subset of fields receive new data in a step. + """ + output_path = tmp_path / "staggered_output_messages.csv" + + writer = StaggeredEventFileWriter( + path=output_path, field_names=["mg1", "mg2", "mg3"], name="writer" + ) + components = [ + # 3 inputs with different delays + MessageEventGenerator(iters=10, delay=0.005, start=0, stride=3, name="mg1"), + MessageEventGenerator(iters=10, delay=0.010, start=1, stride=3, name="mg2"), + MessageEventGenerator(iters=10, delay=0.020, start=2, stride=3, name="mg3"), + writer, + ] + + async with process_cls( + components=components, + connectors=AsyncioConnector.builder().build_event_connectors(components), + ) as process: + await process.run() + + with output_path.open() as f: + content = f.read().splitlines() + + assert len(content) == 11 # header + 10 rows of data + assert content[0] == "mg1,mg2,mg3" + assert content[1] == "Message 0,Message 1,Message 2" + assert content[2] == "Message 3,Message 4,Message 5" + assert content[3] == "Message 6,Message 7,Message 8" + assert content[4] == "Message 9,Message 10,Message 11" + assert content[5] == "Message 12,Message 13,Message 14" + assert content[6] == "Message 15,Message 16,Message 17" + assert content[7] == "Message 18,Message 19,Message 20" + assert content[8] == "Message 21,Message 22,Message 23" + assert content[9] == "Message 24,Message 25,Message 26" + assert content[10] == "Message 27,Message 28,Message 29" + + # Verify that messages from different generators were received in different steps + assert writer.step_count == 30 + assert len(writer.step_for_message) == 30 + assert len(set(writer.step_for_message.values())) == 30, ( + "Expected each message to be received in a different step" + ) From a4a2035b1b370c83dce697af447d02624dd96d97 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Tue, 28 Apr 2026 21:29:30 +0200 Subject: [PATCH 15/18] feat: Adds `populates_fields` kwarg to Event.handler --- plugboard/events/event.py | 21 ++++++++++++++- plugboard/events/event_handlers.py | 41 ++++++++++++++++++++++++++---- 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/plugboard/events/event.py b/plugboard/events/event.py index 2becb56d..858e26a5 100644 --- a/plugboard/events/event.py +++ b/plugboard/events/event.py @@ -75,9 +75,28 @@ def safe_type(cls, event_type: _t.Optional[str] = None) -> str: """Returns a safe event type string for use in broker topic strings.""" return (event_type or cls.type).replace(".", "_").replace("-", "_") + @_t.overload @classmethod - def handler(cls, method: AsyncCallable) -> AsyncCallable: + def handler(cls, method: AsyncCallable) -> AsyncCallable: ... + + @_t.overload + @classmethod + def handler( + cls, *, populates_fields: _t.Optional[list[str]] = None + ) -> _t.Callable[[AsyncCallable], AsyncCallable]: ... + + @classmethod + def handler( + cls, + method: _t.Optional[AsyncCallable] = None, + *, + populates_fields: _t.Optional[list[str]] = None, + ) -> _t.Union[AsyncCallable, _t.Callable[[AsyncCallable], AsyncCallable]]: """Registers a class method as an event handler.""" + if method is None: + # Invoked as @Event.handler(populates_fields=[...]) + return EventHandlers.add(cls, populates_fields=populates_fields) + # Invoked as @Event.handler return EventHandlers.add(cls)(method) diff --git a/plugboard/events/event_handlers.py b/plugboard/events/event_handlers.py index 344522ce..b190e06d 100644 --- a/plugboard/events/event_handlers.py +++ b/plugboard/events/event_handlers.py @@ -16,13 +16,19 @@ class EventHandlers: # pragma: no cover """`EventHandlers` provides a decorator for registering event handlers.""" _handlers: _t.ClassVar[dict[str, dict[str, AsyncCallable]]] = defaultdict(dict) + _handler_field_coverage: _t.ClassVar[dict[str, dict[str, list[str]]]] = defaultdict(dict) @classmethod - def add(cls, event: _t.Type[Event] | Event) -> _t.Callable[[AsyncCallable], AsyncCallable]: + def add( + cls, + event: _t.Type[Event] | Event, + populates_fields: _t.Optional[list[str]] = None, + ) -> _t.Callable[[AsyncCallable], AsyncCallable]: """Decorator that registers class methods as handlers for specific event types. Args: event: Event class this handler processes + populates_fields: Optional list of fields that the handler populates Returns: Callable: Decorated method @@ -31,6 +37,8 @@ def add(cls, event: _t.Type[Event] | Event) -> _t.Callable[[AsyncCallable], Asyn def decorator(method: AsyncCallable) -> AsyncCallable: class_path = cls._get_class_path_for_method(method) cls._handlers[class_path][event.type] = method + if populates_fields is not None: + cls._handler_field_coverage[class_path][event.type] = populates_fields return method return decorator @@ -43,6 +51,12 @@ def _get_class_path_for_method(method: AsyncCallable) -> str: class_name = qualname_parts[-2] # Last part is the method name return f"{module_name}.{class_name}" + @staticmethod + def _iter_mro(_class: _t.Type) -> _t.Iterator[str]: + """Iterate over class MRO, yielding fully qualified class paths.""" + for base_class in _class.__mro__: + yield f"{base_class.__module__}.{base_class.__name__}" + @classmethod def get(cls, _class: _t.Type, event: _t.Type[Event] | Event) -> AsyncCallable: """Retrieve a handler for a specific class and event type. @@ -57,10 +71,27 @@ def get(cls, _class: _t.Type, event: _t.Type[Event] | Event) -> AsyncCallable: Raises: KeyError: If no handler found for class or event type """ - for base_class in _class.__mro__: - base_path = f"{base_class.__module__}.{base_class.__name__}" - if base_path in cls._handlers and event.type in cls._handlers[base_path]: - return cls._handlers[base_path][event.type] + store = cls._handlers + for base_path in cls._iter_mro(_class): + if base_path in store and event.type in store[base_path]: + return store[base_path][event.type] raise KeyError( f"No handler found for class '{_class.__name__}' and event type '{event.type}'" ) + + @classmethod + def get_field_coverage(cls, _class: _t.Type, event: _t.Type[Event] | Event) -> list[str]: + """Retrieve the fields populated by the handler for a specific class and event type. + + Args: + _class: Class to handle event for + event: Event class or instance to handle + + Returns: + list[str]: List of fields populated by the handler + """ + store = cls._handler_field_coverage + for base_path in cls._iter_mro(_class): + if base_path in store and event.type in store[base_path]: + return store[base_path][event.type] + return [] From 297a9b389ff233e6cd5b3ac30e30fd3a85d6a5c0 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Wed, 29 Apr 2026 21:07:27 +0200 Subject: [PATCH 16/18] chore: Modifies input connection logic for event covered fields --- .../plugboard_schemas/_validation.py | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/plugboard-schemas/plugboard_schemas/_validation.py b/plugboard-schemas/plugboard_schemas/_validation.py index ee71c175..cbb9bc91 100644 --- a/plugboard-schemas/plugboard_schemas/_validation.py +++ b/plugboard-schemas/plugboard_schemas/_validation.py @@ -14,6 +14,8 @@ from collections import defaultdict import typing as _t +from plugboard.events.event_handlers import EventHandlers + from ._graph import simple_cycles from ._validator_registry import validator @@ -74,6 +76,24 @@ def _get_edges_in_cycle( return cycle_edges +def _get_event_field_coverage(comp_data: dict[str, _t.Any]) -> set[str]: + """Get the fields populated by the handler for a specific component and event type.""" + io = comp_data.get("io", {}) + try: + comp_cls_path = comp_data["__export"]["type"] + except KeyError as e: + raise ValueError(f"Component data missing '__export.type': {comp_data}") from e + input_events = set(io.get("input_events", [])) + non_system_input_events = input_events - {_SYSTEM_STOP_EVENT} + event_covered_fields = set().union( + *[ + EventHandlers.get_field_coverage(comp_cls_path, evt_type) + for evt_type in non_system_input_events + ] + ) + return event_covered_fields + + @validator def validate_all_inputs_connected( process_dict: dict[str, _t.Any], @@ -101,11 +121,12 @@ def validate_all_inputs_connected( for comp_name, comp_data in components.items(): io = comp_data.get("io", {}) all_inputs = set(io.get("inputs", [])) - input_events = set(io.get("input_events", [])) - has_non_system_input_events = bool(input_events - {_SYSTEM_STOP_EVENT}) connected = connected_inputs.get(comp_name, set()) unconnected = all_inputs - connected - if unconnected and not has_non_system_input_events: + if unconnected: + event_covered_fields = _get_event_field_coverage(comp_data) + unconnected -= event_covered_fields + if unconnected: errors.append(f"Component '{comp_name}' has unconnected inputs: {sorted(unconnected)}") return errors From 68d31e79a33a43b402d6be1dd7570c045e9f86e1 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Wed, 29 Apr 2026 21:33:31 +0200 Subject: [PATCH 17/18] refactor: Alternate approach to track event field coverage --- .../plugboard_schemas/_validation.py | 22 +-------------- plugboard/component/component.py | 1 + plugboard/component/io_controller.py | 3 ++ plugboard/events/event_handlers.py | 28 ++++++------------- 4 files changed, 13 insertions(+), 41 deletions(-) diff --git a/plugboard-schemas/plugboard_schemas/_validation.py b/plugboard-schemas/plugboard_schemas/_validation.py index cbb9bc91..e7b6bf93 100644 --- a/plugboard-schemas/plugboard_schemas/_validation.py +++ b/plugboard-schemas/plugboard_schemas/_validation.py @@ -14,8 +14,6 @@ from collections import defaultdict import typing as _t -from plugboard.events.event_handlers import EventHandlers - from ._graph import simple_cycles from ._validator_registry import validator @@ -76,24 +74,6 @@ def _get_edges_in_cycle( return cycle_edges -def _get_event_field_coverage(comp_data: dict[str, _t.Any]) -> set[str]: - """Get the fields populated by the handler for a specific component and event type.""" - io = comp_data.get("io", {}) - try: - comp_cls_path = comp_data["__export"]["type"] - except KeyError as e: - raise ValueError(f"Component data missing '__export.type': {comp_data}") from e - input_events = set(io.get("input_events", [])) - non_system_input_events = input_events - {_SYSTEM_STOP_EVENT} - event_covered_fields = set().union( - *[ - EventHandlers.get_field_coverage(comp_cls_path, evt_type) - for evt_type in non_system_input_events - ] - ) - return event_covered_fields - - @validator def validate_all_inputs_connected( process_dict: dict[str, _t.Any], @@ -124,7 +104,7 @@ def validate_all_inputs_connected( connected = connected_inputs.get(comp_name, set()) unconnected = all_inputs - connected if unconnected: - event_covered_fields = _get_event_field_coverage(comp_data) + event_covered_fields = set().union(*io.get("event_field_coverage", {}).values()) unconnected -= event_covered_fields if unconnected: errors.append(f"Component '{comp_name}' has unconnected inputs: {sorted(unconnected)}") diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 2db9e92b..608689b1 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -95,6 +95,7 @@ def __init__( initial_values=self._initial_values, input_events=self.__class__.io.input_events, output_events=self.__class__.io.output_events, + event_field_coverage=self.__class__.io.event_field_coverage, namespace=self.name, component=self, ) diff --git a/plugboard/component/io_controller.py b/plugboard/component/io_controller.py index 5ac67f7c..870c49c6 100644 --- a/plugboard/component/io_controller.py +++ b/plugboard/component/io_controller.py @@ -38,6 +38,7 @@ def __init__( initial_values: _t.Optional[dict[str, _t.Iterable]] = None, input_events: _t.Optional[list[_t.Type[Event]]] = None, output_events: _t.Optional[list[_t.Type[Event]]] = None, + event_field_coverage: _t.Optional[dict[str, list[str]]] = None, namespace: str = IO_NS_UNSET, component: _t.Optional[Component] = None, ) -> None: @@ -47,6 +48,7 @@ def __init__( self.initial_values = initial_values or {} self.input_events = input_events or [] self.output_events = output_events or [] + self.event_field_coverage = event_field_coverage or {} if set(self.initial_values.keys()) - set(self.inputs): raise ValueError("Initial values must be for input fields only.") self._component = component @@ -411,6 +413,7 @@ def dict(self) -> dict[str, _t.Any]: # noqa: D102 "input_events": [e.safe_type() for e in self.input_events], "output_events": [e.safe_type() for e in self.output_events], "initial_values": {k: list(v) for k, v in self._initial_values.items()}, + "event_field_coverage": {k: list(v) for k, v in self.event_field_coverage.items()}, } diff --git a/plugboard/events/event_handlers.py b/plugboard/events/event_handlers.py index b190e06d..3eb8223d 100644 --- a/plugboard/events/event_handlers.py +++ b/plugboard/events/event_handlers.py @@ -16,7 +16,6 @@ class EventHandlers: # pragma: no cover """`EventHandlers` provides a decorator for registering event handlers.""" _handlers: _t.ClassVar[dict[str, dict[str, AsyncCallable]]] = defaultdict(dict) - _handler_field_coverage: _t.ClassVar[dict[str, dict[str, list[str]]]] = defaultdict(dict) @classmethod def add( @@ -38,7 +37,12 @@ def decorator(method: AsyncCallable) -> AsyncCallable: class_path = cls._get_class_path_for_method(method) cls._handlers[class_path][event.type] = method if populates_fields is not None: - cls._handler_field_coverage[class_path][event.type] = populates_fields + comp_cls = method.__self__.__class__ + if not hasattr(comp_cls, "io"): + raise ValueError( + "populates_fields must be specified on method of Component subclass." + ) + comp_cls.io.event_field_coverage[event.type] = populates_fields return method return decorator @@ -72,26 +76,10 @@ def get(cls, _class: _t.Type, event: _t.Type[Event] | Event) -> AsyncCallable: KeyError: If no handler found for class or event type """ store = cls._handlers - for base_path in cls._iter_mro(_class): + for base_class in _class.__mro__: + base_path = f"{base_class.__module__}.{base_class.__name__}" if base_path in store and event.type in store[base_path]: return store[base_path][event.type] raise KeyError( f"No handler found for class '{_class.__name__}' and event type '{event.type}'" ) - - @classmethod - def get_field_coverage(cls, _class: _t.Type, event: _t.Type[Event] | Event) -> list[str]: - """Retrieve the fields populated by the handler for a specific class and event type. - - Args: - _class: Class to handle event for - event: Event class or instance to handle - - Returns: - list[str]: List of fields populated by the handler - """ - store = cls._handler_field_coverage - for base_path in cls._iter_mro(_class): - if base_path in store and event.type in store[base_path]: - return store[base_path][event.type] - return [] From 44199ee2d0a47603d53cfad6e0314ab82d578e53 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Wed, 29 Apr 2026 21:40:08 +0200 Subject: [PATCH 18/18] test: Updates validation test --- tests/unit/test_process_validation.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_process_validation.py b/tests/unit/test_process_validation.py index b0ec7482..df20132e 100644 --- a/tests/unit/test_process_validation.py +++ b/tests/unit/test_process_validation.py @@ -95,6 +95,7 @@ def _make_component( outputs: list[str] | None = None, input_events: list[str] | None = None, output_events: list[str] | None = None, + event_field_coverage: dict[str, list[str]] | None = None, initial_values: dict[str, _t.Any] | None = None, ) -> dict[str, _t.Any]: """Build a component dict matching process.dict() format.""" @@ -108,6 +109,7 @@ def _make_component( "outputs": outputs or [], "input_events": input_events or [], "output_events": output_events or [], + "event_field_coverage": event_field_coverage or {}, "initial_values": initial_values or {}, }, } @@ -303,7 +305,7 @@ def test_no_inputs_no_errors(self) -> None: errors = validate_all_inputs_connected(pd) assert errors == [] - def test_missing_inputs_allowed_for_event_driven_component_reuse(self) -> None: + def test_event_covered_fields(self) -> None: """Unconnected inputs are allowed when non-system input events can populate them.""" pd = _make_process_dict( components={ @@ -312,6 +314,7 @@ def test_missing_inputs_allowed_for_event_driven_component_reuse(self) -> None: "writer", inputs=["message"], input_events=["system_stop", "message_event"], + event_field_coverage={"message_event": ["message"]}, ), }, )