Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b58cb93
Initial plan
Copilot Apr 8, 2026
2536193
fix: support reusing field-input components in event-driven flows
Copilot Apr 8, 2026
e7a822a
chore: address review feedback on event-driven reuse fix
Copilot Apr 8, 2026
5e39023
docs: clarify event-driven reuse shutdown handling
Copilot Apr 8, 2026
544d5c8
docs: clarify event-driven reuse test intent
Copilot Apr 8, 2026
b3c4e16
refactor: expose connected input state on io controller
Copilot Apr 8, 2026
e4047cb
refactor: reuse connected input property internally
Copilot Apr 8, 2026
b442ee0
refactor: simplify connected field input checks
Copilot Apr 8, 2026
21b8ea4
fix: Modifies DataWriter logic to only append new data
chrisk314 Apr 12, 2026
f763d86
refactor: Separates field data tracker reset into new method for easi…
chrisk314 Apr 12, 2026
675e8de
refactor: Adapts DataWriter for trickling input data
chrisk314 Apr 12, 2026
7fe735b
refactor: Rework DataWriter logic
chrisk314 Apr 12, 2026
5fad405
fixup! refactor: Rework DataWriter logic
chrisk314 Apr 16, 2026
8ca6298
test: Adds test for staggered event driven data arrival
chrisk314 Apr 16, 2026
5378b4c
Merge branch 'main' into copilot/feat-improve-component-reusability
chrisk314 Apr 28, 2026
a4a2035
feat: Adds `populates_fields` kwarg to Event.handler
chrisk314 Apr 28, 2026
297a9b3
chore: Modifies input connection logic for event covered fields
chrisk314 Apr 29, 2026
68d31e7
refactor: Alternate approach to track event field coverage
chrisk314 Apr 29, 2026
44199ee
test: Updates validation test
chrisk314 Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugboard-schemas/plugboard_schemas/_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from ._validator_registry import validator


_SYSTEM_STOP_EVENT = "system_stop"


def _build_component_graph(
connectors: dict[str, dict[str, _t.Any]],
) -> dict[str, set[str]]:
Expand Down Expand Up @@ -100,6 +103,9 @@ def validate_all_inputs_connected(
all_inputs = set(io.get("inputs", []))
connected = connected_inputs.get(comp_name, set())
unconnected = all_inputs - connected
if unconnected:
event_covered_fields = set().union(*io.get("event_field_coverage", {}).values())
unconnected -= event_covered_fields
if unconnected:
errors.append(f"Component '{comp_name}' has unconnected inputs: {sorted(unconnected)}")
return errors
Expand Down
17 changes: 13 additions & 4 deletions plugboard/component/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def __init__(
initial_values=self._initial_values,
input_events=self.__class__.io.input_events,
output_events=self.__class__.io.output_events,
event_field_coverage=self.__class__.io.event_field_coverage,
namespace=self.name,
component=self,
)
Expand Down Expand Up @@ -356,7 +357,7 @@ async def _wrapper() -> None:
raise e
self._bind_outputs()
await self.io.write()
self._field_inputs_ready = False
self._reset_input_trackers()
await self._set_status(Status.WAITING, publish=not self._is_running)

return _wrapper
Expand All @@ -365,6 +366,11 @@ async def _wrapper() -> None:
def _has_field_inputs(self) -> bool:
return len(self.io.inputs) > 0

@property
def _has_connected_field_inputs(self) -> bool:
"""Whether any declared field inputs are connected via input channels."""
return self.io.has_connected_field_inputs

@cached_property
def _has_event_inputs(self) -> bool:
input_events = set([evt.safe_type() for evt in self.io.input_events])
Expand Down Expand Up @@ -409,7 +415,7 @@ async def _io_read_with_status_check(self) -> None:
task.cancel()
for task in done:
exc = task.exception()
if isinstance(exc, EventStreamClosedError) and len(self.io.inputs) == 0:
if isinstance(exc, EventStreamClosedError) and not self._has_connected_field_inputs:
await self.io.close() # Call close for final wait and flush event buffer
elif exc is not None:
raise exc
Expand All @@ -422,7 +428,7 @@ async def _periodic_status_check(self) -> None:
# TODO : Eventually producer graph update will be event driven. For now,
# : the update is performed periodically, so it's called here along
# : with the status check.
if len(self.io.inputs) == 0:
if not self._has_connected_field_inputs:
await self._update_producer_graph()

async def _status_check(self) -> None:
Expand Down Expand Up @@ -455,8 +461,11 @@ def _bind_inputs(self) -> None:
for field in self.io.inputs:
field_default = getattr(self, field, None)
value = self._field_inputs.get(field, field_default)
setattr(self, field, value)
super().__setattr__(field, value)

def _reset_input_trackers(self) -> None:
self._field_inputs = {}
self._field_inputs_ready = False

def _bind_outputs(self) -> None:
"""Binds component fields to output fields."""
Expand Down
14 changes: 9 additions & 5 deletions plugboard/component/io_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(
initial_values: _t.Optional[dict[str, _t.Iterable]] = None,
input_events: _t.Optional[list[_t.Type[Event]]] = None,
output_events: _t.Optional[list[_t.Type[Event]]] = None,
event_field_coverage: _t.Optional[dict[str, list[str]]] = None,
namespace: str = IO_NS_UNSET,
component: _t.Optional[Component] = None,
) -> None:
Expand All @@ -47,6 +48,7 @@ def __init__(
self.initial_values = initial_values or {}
self.input_events = input_events or []
self.output_events = output_events or []
self.event_field_coverage = event_field_coverage or {}
if set(self.initial_values.keys()) - set(self.inputs):
raise ValueError("Initial values must be for input fields only.")
self._component = component
Expand Down Expand Up @@ -86,8 +88,9 @@ def is_closed(self) -> bool:
"""Returns `True` if the `IOController` is closed, `False` otherwise."""
return self._is_closed

@cached_property
def _has_field_inputs(self) -> bool:
@property
def has_connected_field_inputs(self) -> bool:
"""Returns whether any field inputs are connected via channels."""
return len(self._input_channels) > 0

@cached_property
Expand All @@ -96,7 +99,7 @@ def _has_event_inputs(self) -> bool:

@cached_property
def _has_inputs(self) -> bool:
return self._has_field_inputs or self._has_event_inputs
return self.has_connected_field_inputs or self._has_event_inputs

async def read(self, timeout: float | None = None) -> None:
"""Reads data and/or events from input channels.
Expand Down Expand Up @@ -139,7 +142,7 @@ async def read(self, timeout: float | None = None) -> None:

def _set_read_tasks(self) -> list[asyncio.Task]:
read_tasks: list[asyncio.Task] = []
if self._has_field_inputs:
if self.has_connected_field_inputs:
if _fields_read_task not in self._read_tasks:
read_fields_task = asyncio.create_task(self._read_fields(), name=_fields_read_task)
self._read_tasks[_fields_read_task] = read_fields_task
Expand Down Expand Up @@ -374,7 +377,7 @@ def _add_channel_for_event(

def _create_input_field_group_tasks(self) -> None:
"""Groups input field channels by field name and launches read tasks for group inputs."""
if not self._has_field_inputs:
if not self.has_connected_field_inputs:
return
field_channels: dict[str, list[tuple[_t_field_key, Channel]]] = defaultdict(list)
for key, chan in self._input_channels.items():
Expand Down Expand Up @@ -410,6 +413,7 @@ def dict(self) -> dict[str, _t.Any]: # noqa: D102
"input_events": [e.safe_type() for e in self.input_events],
"output_events": [e.safe_type() for e in self.output_events],
"initial_values": {k: list(v) for k, v in self._initial_values.items()},
"event_field_coverage": {k: list(v) for k, v in self.event_field_coverage.items()},
}


Expand Down
21 changes: 20 additions & 1 deletion plugboard/events/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,28 @@ def safe_type(cls, event_type: _t.Optional[str] = None) -> str:
"""Returns a safe event type string for use in broker topic strings."""
return (event_type or cls.type).replace(".", "_").replace("-", "_")

@_t.overload
@classmethod
def handler(cls, method: AsyncCallable) -> AsyncCallable:
def handler(cls, method: AsyncCallable) -> AsyncCallable: ...

@_t.overload
@classmethod
def handler(
cls, *, populates_fields: _t.Optional[list[str]] = None
) -> _t.Callable[[AsyncCallable], AsyncCallable]: ...

@classmethod
def handler(
cls,
method: _t.Optional[AsyncCallable] = None,
*,
populates_fields: _t.Optional[list[str]] = None,
) -> _t.Union[AsyncCallable, _t.Callable[[AsyncCallable], AsyncCallable]]:
"""Registers a class method as an event handler."""
if method is None:
# Invoked as @Event.handler(populates_fields=[...])
return EventHandlers.add(cls, populates_fields=populates_fields)
# Invoked as @Event.handler
return EventHandlers.add(cls)(method)


Expand Down
25 changes: 22 additions & 3 deletions plugboard/events/event_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ class EventHandlers: # pragma: no cover
_handlers: _t.ClassVar[dict[str, dict[str, AsyncCallable]]] = defaultdict(dict)

@classmethod
def add(cls, event: _t.Type[Event] | Event) -> _t.Callable[[AsyncCallable], AsyncCallable]:
def add(
cls,
event: _t.Type[Event] | Event,
populates_fields: _t.Optional[list[str]] = None,
) -> _t.Callable[[AsyncCallable], AsyncCallable]:
"""Decorator that registers class methods as handlers for specific event types.

Args:
event: Event class this handler processes
populates_fields: Optional list of fields that the handler populates

Returns:
Callable: Decorated method
Expand All @@ -31,6 +36,13 @@ def add(cls, event: _t.Type[Event] | Event) -> _t.Callable[[AsyncCallable], Asyn
def decorator(method: AsyncCallable) -> AsyncCallable:
class_path = cls._get_class_path_for_method(method)
cls._handlers[class_path][event.type] = method
if populates_fields is not None:
comp_cls = method.__self__.__class__
if not hasattr(comp_cls, "io"):
raise ValueError(
"populates_fields must be specified on method of Component subclass."
)
comp_cls.io.event_field_coverage[event.type] = populates_fields
return method

return decorator
Expand All @@ -43,6 +55,12 @@ def _get_class_path_for_method(method: AsyncCallable) -> str:
class_name = qualname_parts[-2] # Last part is the method name
return f"{module_name}.{class_name}"

@staticmethod
def _iter_mro(_class: _t.Type) -> _t.Iterator[str]:
"""Iterate over class MRO, yielding fully qualified class paths."""
for base_class in _class.__mro__:
yield f"{base_class.__module__}.{base_class.__name__}"

@classmethod
def get(cls, _class: _t.Type, event: _t.Type[Event] | Event) -> AsyncCallable:
"""Retrieve a handler for a specific class and event type.
Expand All @@ -57,10 +75,11 @@ def get(cls, _class: _t.Type, event: _t.Type[Event] | Event) -> AsyncCallable:
Raises:
KeyError: If no handler found for class or event type
"""
store = cls._handlers
for base_class in _class.__mro__:
base_path = f"{base_class.__module__}.{base_class.__name__}"
if base_path in cls._handlers and event.type in cls._handlers[base_path]:
return cls._handlers[base_path][event.type]
if base_path in store and event.type in store[base_path]:
return store[base_path][event.type]
raise KeyError(
f"No handler found for class '{_class.__name__}' and event type '{event.type}'"
)
32 changes: 27 additions & 5 deletions plugboard/library/data_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
**kwargs: Additional keyword arguments for [`Component`][plugboard.component.Component].
"""
super().__init__(**kwargs)
# Use a single buffer to track everything
self._buffer: dict[str, deque] = defaultdict(deque)
self._chunk_size = chunk_size
self.io = IOController(
Expand Down Expand Up @@ -76,18 +77,39 @@ async def _convert(self, data: dict[str, deque]) -> _t.Any:
def _bind_inputs(self) -> None:
"""Binds input fields to component fields and append to internal buffer."""
super()._bind_inputs()
for field in self.io.inputs:
for field in self._field_inputs:
value = getattr(self, field, None)
self._buffer[field].append(value)

@property
def _completed_rows(self) -> int:
"""Calculates how many fully formed rows exist in the buffer."""
if not self.io.inputs:
return 0
return min((len(self._buffer[f]) for f in self.io.inputs), default=0)

@property
def _can_step(self) -> bool:
"""We can step if we have at least one fully formed row."""
return self._completed_rows > 0

async def _save_chunk(self) -> None:
"""Write data from the buffer."""
"""Write completed data rows from the buffer."""
completed_rows = self._completed_rows
if completed_rows == 0:
return

if self._task is not None:
await self._task
# Create task to save next chunk of data
chunk = await self._convert(self._buffer)

# Extract only the completed rows into a new chunk
chunk_data = {
field: deque([self._buffer[field].popleft() for _ in range(completed_rows)])
for field in self.io.inputs
}

chunk = await self._convert(chunk_data)
self._task = asyncio.create_task(self._save(chunk))
self._buffer = defaultdict(deque)

async def step(self) -> None:
"""Trigger save when buffer is at target size."""
Expand Down
Loading
Loading