[Feature]: add plugin interface #371
base: main
Changes from all commits
86d700a
7bb5800
d571b18
5707c4a
c0fa533
c8bf11a
@@ -0,0 +1,351 @@
import datetime
import logging
from abc import ABC
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import TYPE_CHECKING

from aws_durable_execution_sdk_python.lambda_service import (
    OperationType,
    OperationStatus,
    OperationAction,
    OperationSubType,
    ErrorObject,
    InvocationStatus,
    Operation,
    OperationUpdate,
)
from aws_durable_execution_sdk_python.types import LambdaContext

if TYPE_CHECKING:
    from aws_durable_execution_sdk_python.execution import (
        DurableExecutionInvocationOutput,
    )

logger = logging.getLogger(__name__)

@dataclass
class OperationStartInfo:
    operation_id: str
    operation_type: OperationType
    sub_type: OperationSubType | None = None
    name: str | None = None
    parent_id: str | None = None
    start_timestamp: datetime.datetime | None = None


@dataclass
class OperationEndInfo(OperationStartInfo):
    status: OperationStatus = OperationStatus.SUCCEEDED
    end_timestamp: datetime.datetime | None = None
    attempt: int | None = None
    error: ErrorObject | None = None


@dataclass
class AttemptStartInfo(OperationStartInfo):
    attempt: int = 1


@dataclass
class AttemptEndInfo(AttemptStartInfo):
    succeeded: bool | None = None
    end_timestamp: datetime.datetime | None = None
    error: ErrorObject | None = None
    next_attempt_delay_seconds: int | None = None
Contributor: this doesn't get populated currently?

@dataclass
class InvocationStartInfo:
    request_id: str | None
    execution_arn: str | None
    start_timestamp: datetime.datetime | None


@dataclass
class InvocationEndInfo(InvocationStartInfo):
    status: InvocationStatus = InvocationStatus.SUCCEEDED
    end_timestamp: datetime.datetime | None = None
    error: ErrorObject | None = None


@dataclass
class ExecutionStartInfo(InvocationStartInfo):
    pass


@dataclass
class ExecutionEndInfo(ExecutionStartInfo):
    status: InvocationStatus = InvocationStatus.SUCCEEDED
    end_timestamp: datetime.datetime | None = None
    error: ErrorObject | None = None
Contributor: js has more fields:

    // aws-durable-execution-sdk-js/src/types/plugin.ts
    export interface ExecutionEndInfo extends InvocationInfo {
        status: "SUCCEEDED" | "FAILED";
        executionResult?: unknown;
        executionError?: Error;
        executionInput: unknown;
        operations: Record<string, Operation>;
    }

Is the idea of including these so the execution span's attributes can include aggregate counts ("3 steps", "1 retry") without the plugin maintaining its own parallel state across events?

class DurableExecutionPlugin(ABC):
Contributor: how does the context extractor work? There is no ..., so how will the plugins get the values for traceid/parent? Is the idea to use the $ENVs? But the context_extractor interface makes me think that's not the idea. This might be unimplemented in the current JS implementation also; we should align and disambiguate the plan here. Maybe the extractor must receive ...
    """Base class for plugins. Override only the methods you need."""

    def on_execution_start(self, info: ExecutionStartInfo) -> None:
        pass

    def on_execution_end(self, info: ExecutionEndInfo) -> None:
        pass

    def on_invocation_start(self, info: InvocationStartInfo) -> None:
        pass

    def on_invocation_end(self, info: InvocationEndInfo) -> None:
        pass

    def on_operation_start(self, info: OperationStartInfo) -> None:
        pass

    def on_operation_end(self, info: OperationEndInfo) -> None:
        pass

    def on_operation_attempt_start(self, info: AttemptStartInfo) -> None:
        pass

    def on_operation_attempt_end(self, info: AttemptEndInfo) -> None:
        pass

    # Todo: further discussions required to finalize the following interface
Contributor: yes. Also not sure whether the ABC is actually necessary here, unless there's going to be at least 1 abstractmethod.
    # def enrich_log_context(self, info: OperationStartInfo | None) -> Dict[str, Any] | None: pass
Contributor: TODO: create tracking issue

class PluginExecutor:
    _DEFAULT_MAX_WORKERS = 4

    def __init__(
        self,
        plugins: list[DurableExecutionPlugin] | None,
        max_workers: int | None = None,
    ):
        self.plugins = plugins or []
        self._pending_futures: list = []
        self._executor: ThreadPoolExecutor | None = (
            ThreadPoolExecutor(
                max_workers=max_workers or self._DEFAULT_MAX_WORKERS,
                thread_name_prefix="plugin-executor",
            )
            if self.plugins
            else None
        )
    def close(self) -> None:
        """Shut down the thread pool, waiting for pending tasks to complete."""
        if self._executor is None:
            return
        self.flush()
Contributor: doesn't the next line's shutdown(wait=True) effectively do the flush? https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown
        self._executor.shutdown(wait=True)
Contributor: so this will wait to flush all before closing. Maybe make that clear on the contract somewhere ...

    def flush(self) -> None:
        """Wait for all pending plugin tasks to complete. Useful for testing."""
        for future in self._pending_futures:
            future.result()
        self._pending_futures.clear()
    def _dispatch_plugin(self, plugin: DurableExecutionPlugin, info) -> None:
        """Invoke the appropriate plugin callback. Runs inside the thread pool."""
        try:
            match info:
                case ExecutionEndInfo():
                    plugin.on_execution_end(info)
                case InvocationEndInfo():
                    plugin.on_invocation_end(info)
                case ExecutionStartInfo():
                    plugin.on_execution_start(info)
                case InvocationStartInfo():
                    plugin.on_invocation_start(info)
                case AttemptEndInfo():
                    plugin.on_operation_attempt_end(info)
                case OperationEndInfo():
                    plugin.on_operation_end(info)
                case AttemptStartInfo():
                    plugin.on_operation_attempt_start(info)
                case OperationStartInfo():
                    plugin.on_operation_start(info)
                case _:
                    raise ValueError(f"Unknown info type: {type(info)}")
        except Exception:
            # log and ignore the exception
            logger.exception("Plugin %s exception ignored", plugin.__class__.__name__)

    def execute_plugins(self, info):
        if not self.plugins:
            return
        for plugin in self.plugins:
            future = self._executor.submit(self._dispatch_plugin, plugin, info)
Contributor: this means strictly speaking plugins could execute out of order. At present I don't think the design covers ordering or preserving it explicitly? But I'm thinking specifically of parent -> child relationships, where order might well be important. It looks like the js runs synchronously on the handler thread (i.e. sequentially): https://github.com/aws/aws-durable-execution-sdk-js/blob/otel-instrumentation-design-v2/packages/aws-durable-execution-sdk-js/src/utils/plugin/plugin-runner.ts i.e. js preserves ordering.
            self._pending_futures.append(future)
Contributor: doesn't this need a lock? But I'm also not sure to what degree this goes away depending on where we end up on synchronous/ordering; is the tracking list necessary?

    def on_invocation_start(
        self,
        durable_execution_arn: str,
        context: LambdaContext | None,
        execution_operation: Operation | None,
        is_replaying: bool,
    ) -> None:
        aws_request_id = context.aws_request_id if context else None
        start_timestamp = (
            execution_operation.start_timestamp if execution_operation else None
        )

        if not is_replaying:
            self.execute_plugins(
                ExecutionStartInfo(
                    request_id=aws_request_id,
                    execution_arn=durable_execution_arn,
                    start_timestamp=start_timestamp,
                )
            )

        self.execute_plugins(
            InvocationStartInfo(
                request_id=aws_request_id,
                execution_arn=durable_execution_arn,
                start_timestamp=start_timestamp,
            )
        )

    def on_invocation_end(
        self,
        durable_execution_arn: str | None,
        context: LambdaContext,
        execution_operation: Operation | None,
        output: "DurableExecutionInvocationOutput",
    ) -> None:
        start_timestamp = (
            execution_operation.start_timestamp if execution_operation else None
        )
        # the actual end timestamp may be unknown because it's not checkpointed yet
        end_timestamp: datetime.datetime = (
            execution_operation.end_timestamp if execution_operation else None
        ) or datetime.datetime.now()
        request_id = context.aws_request_id if context else None

        self.execute_plugins(
            InvocationEndInfo(
                request_id=request_id,
                execution_arn=durable_execution_arn,
                start_timestamp=start_timestamp,
                status=output.status,
                end_timestamp=end_timestamp,
                error=output.error,
            )
        )

        if output.status in [InvocationStatus.SUCCEEDED, InvocationStatus.FAILED]:
            self.execute_plugins(
                ExecutionEndInfo(
                    request_id=request_id,
                    execution_arn=durable_execution_arn,
                    start_timestamp=start_timestamp,
                    status=output.status,
                    end_timestamp=end_timestamp,
                    error=output.error,
                )
            )
    def on_operation_action(self, operation: Operation | None, update: OperationUpdate):
        """Execute any registered plugins for a given operation before it is updated.

        Args:
            update: the operation update that is pending checkpoint
        """
        if update.action is not OperationAction.START:
            return

        self.execute_plugins(
            OperationStartInfo(
                operation_id=update.operation_id,
                operation_type=update.operation_type,
                sub_type=update.sub_type,
                name=update.name,
                parent_id=update.parent_id,
                start_timestamp=datetime.datetime.now(),
            )
        )

        if update.operation_type is OperationType.STEP:
            attempt = (
                operation.step_details.attempt
                if operation and operation.step_details
                else 1
            )
            self.execute_plugins(
                AttemptStartInfo(
Contributor: is there an OperationStartInfo and an AttemptStartInfo for each attempt? (from #370) I could easily be mistracing this; is this what happens? Maybe it's an artifact of the previous PR #381 introducing the READY logic, so every step retry checkpoints START? Might be an idea to introduce a functional test to verify/confirm the behaviour.
                    operation_id=update.operation_id,
                    operation_type=update.operation_type,
                    sub_type=update.sub_type,
                    name=update.name,
                    parent_id=update.parent_id,
                    start_timestamp=datetime.datetime.now(),
Contributor: utc
                    attempt=attempt,
                )
            )
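On the "utc" review note above: `datetime.datetime.now()` with no argument returns a naive local-time value (tzinfo is None), which is awkward to correlate across services and regions. A short sketch of the difference, using only the stdlib:

```python
# Sketch: naive vs timezone-aware timestamps. Cross-service timestamps are
# usually taken in UTC so they compare and serialize unambiguously.
import datetime

naive = datetime.datetime.now()
aware = datetime.datetime.now(datetime.timezone.utc)

print(naive.tzinfo)   # None
print(aware.tzinfo)   # UTC
```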
    def on_operation_update(self, operation):
        """Execute any registered plugins for a given operation after it is updated.

        A STARTED update might be omitted because the START and completion
        actions (e.g. SUCCEED/FAIL) may be checkpointed in a batch, in which
        case the backend returns only the final status (e.g. SUCCEEDED/PENDING/FAILED).

        Args:
            operation: the operation that was just checkpointed
        """
        params = dict(
            operation_id=operation.operation_id,
            operation_type=operation.operation_type,
            sub_type=operation.sub_type,
            name=operation.name,
            parent_id=operation.parent_id,
            start_timestamp=operation.start_timestamp,
        )
        if operation.step_details and (
            self._is_terminal_status(operation.status)
            # PENDING in addition to terminal status
            or operation.status is OperationStatus.PENDING
        ):
            self.execute_plugins(
                AttemptEndInfo(
                    **params,
                    end_timestamp=operation.end_timestamp,
                    attempt=operation.step_details.attempt,
                    succeeded=operation.status is OperationStatus.SUCCEEDED,
                    error=operation.step_details.error,
                )
            )

        if self._is_terminal_status(operation.status):
            attempt = operation.step_details.attempt if operation.step_details else None
            self.execute_plugins(
                OperationEndInfo(
                    **params,
                    end_timestamp=operation.end_timestamp,
                    status=operation.status,
                    error=self._extract_error(operation),
                    attempt=attempt,
                )
            )
    @staticmethod
    def _extract_error(operation: Operation):
        if operation.step_details and operation.step_details.error:
            return operation.step_details.error
        if operation.callback_details and operation.callback_details.error:
            return operation.callback_details.error
        if operation.chained_invoke_details and operation.chained_invoke_details.error:
            return operation.chained_invoke_details.error
        if operation.context_details and operation.context_details.error:
            return operation.context_details.error
        return None
Contributor: we should probably refactor and move this existing logic so everything can use it. It might be an idea to do this as a no-func-change refactor separately in a preceding PR first, but (I'm typing quickly here just to give the idea) something like the following, in CheckpointedResult:

    return cls(
        operation=operation,
        status=operation.status,
        result=operation.get_result(),
        error=operation.get_error(),
    )

and here in plugin: ...

    @staticmethod
Contributor: rather than have this ambient bit of unattached logic, we can keep it with the other ..., or maybe on the enum itself, but I'm not super fond of the forward reference lol.
    def _is_terminal_status(status):
        return status in [
            OperationStatus.SUCCEEDED,
            OperationStatus.FAILED,
            OperationStatus.TIMED_OUT,
            OperationStatus.CANCELLED,
            OperationStatus.STOPPED,
        ]
these can be frozen?
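On "these can be frozen?": two plausible readings, both sketched below with stand-in names (not the SDK's code). The terminal-status list could become a module-level `frozenset` for O(1) membership, and the info dataclasses could be declared `frozen=True` so concurrently running plugin callbacks cannot mutate a shared event object:

```python
# Sketch: frozenset for status membership, frozen dataclass for immutable
# event-info objects. Names are hypothetical stand-ins.
from dataclasses import dataclass, FrozenInstanceError
from enum import Enum


class OperationStatus(Enum):
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    PENDING = "PENDING"


TERMINAL_STATUSES = frozenset({OperationStatus.SUCCEEDED, OperationStatus.FAILED})


@dataclass(frozen=True)
class OperationStartInfo:
    operation_id: str


print(OperationStatus.SUCCEEDED in TERMINAL_STATUSES)  # True

info = OperationStartInfo(operation_id="op-1")
try:
    info.operation_id = "op-2"   # frozen dataclass rejects mutation
except FrozenInstanceError:
    print("immutable")           # immutable
```

Freezing is also relevant to the threading questions earlier in the review: immutable info objects make the hand-off to the worker pool safe by construction.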