From 66e9088f3a42210e1f952288c728e41353ba381d Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Thu, 12 Mar 2026 19:39:30 +0100 Subject: [PATCH 1/5] fix: apply DAPR_API_TIMEOUT_SECONDS to workflow gRPC connections The workflow extension's gRPC connections (DaprWorkflowClient, async DaprWorkflowClient, and WorkflowRuntime) were not respecting the DAPR_API_TIMEOUT_SECONDS environment variable, unlike the core SDK's DaprGrpcClient which applies it via a timeout interceptor. Pass DaprClientTimeoutInterceptor (sync) and DaprClientTimeoutInterceptorAsync (async) to the durabletask TaskHubGrpcClient, AsyncTaskHubGrpcClient, and TaskHubGrpcWorker so that workflow gRPC calls get the configured default timeout. Signed-off-by: Fabian Martinez Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- .../dapr/ext/workflow/aio/dapr_workflow_client.py | 2 ++ .../dapr/ext/workflow/dapr_workflow_client.py | 2 ++ ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py | 6 +++++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py index fb8367a04..dfe7ac3bc 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py @@ -27,6 +27,7 @@ from grpc.aio import AioRpcError from dapr.clients import DaprInternalError +from dapr.aio.clients.grpc.interceptors import DaprClientTimeoutInterceptorAsync from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint @@ -68,6 +69,7 @@ def __init__( secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, + interceptors=[DaprClientTimeoutInterceptorAsync()], ) async def schedule_new_workflow( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index bdb72aca4..d732f7747 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -26,6 +26,7 @@ from grpc import RpcError from dapr.clients import DaprInternalError +from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint @@ -70,6 +71,7 @@ def __init__( secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, + interceptors=[DaprClientTimeoutInterceptor()], ) def schedule_new_workflow( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 11bae78ac..08f5a4029 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -27,6 +27,7 @@ from dapr.ext.workflow.workflow_context import Workflow from dapr.clients import DaprInternalError +from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint @@ -73,13 +74,16 @@ def __init__( raise DaprInternalError(f'{error}') from error options = self._logger.get_options() + all_interceptors = [DaprClientTimeoutInterceptor()] + if interceptors: + all_interceptors.extend(interceptors) self.__worker = worker.TaskHubGrpcWorker( host_address=uri.endpoint, metadata=metadata, secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, - interceptors=interceptors, + interceptors=all_interceptors, concurrency_options=worker.ConcurrencyOptions( maximum_concurrent_activity_work_items=maximum_concurrent_activity_work_items, maximum_concurrent_orchestration_work_items=maximum_concurrent_orchestration_work_items, From a05fb79319b07687f55411dcbf8ed7ebdbc769a9 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Fri, 13 Mar 2026 08:53:12 +0100 Subject: [PATCH 2/5] lint Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- .../dapr/ext/workflow/aio/dapr_workflow_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py index dfe7ac3bc..b72be558d 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py @@ -26,8 +26,8 @@ from dapr.ext.workflow.workflow_state import WorkflowState from grpc.aio import AioRpcError -from dapr.clients import DaprInternalError from dapr.aio.clients.grpc.interceptors import DaprClientTimeoutInterceptorAsync +from dapr.clients import DaprInternalError from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint From 567479945eb3676f444321e93c8150487bb645c4 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Fri, 13 Mar 2026 13:36:48 +0100 Subject: [PATCH 3/5] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 08f5a4029..f33622a15 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -74,9 +74,10 @@ def __init__( raise DaprInternalError(f'{error}') from error options = self._logger.get_options() - all_interceptors = [DaprClientTimeoutInterceptor()] + all_interceptors = [] if interceptors: all_interceptors.extend(interceptors) + all_interceptors.append(DaprClientTimeoutInterceptor()) self.__worker = worker.TaskHubGrpcWorker( host_address=uri.endpoint, metadata=metadata, From 23dab4b02db078917f05b6c91f48bbd89318a26e Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Thu, 23 Apr 2026 08:55:26 +0200 Subject: [PATCH 4/5] add unit tests Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- .../tests/test_workflow_client.py | 19 ++++++- .../tests/test_workflow_client_aio.py | 19 ++++++- .../tests/test_workflow_runtime.py | 57 ++++++++++++++++++- 3 files changed, 92 insertions(+), 3 deletions(-) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index 26dcbdb61..11ee6b19f 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -18,10 +18,11 @@ from typing import Any, Union from unittest import mock +from grpc import RpcError + from dapr.ext.workflow._durabletask import client from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext -from grpc import RpcError mock_schedule_result = 'workflow001' mock_raise_event_result = 'event001' @@ -111,6 +112,20 @@ def _inner_get_orchestration_state(self, instance_id, state: client.Orchestratio ) +class WorkflowClientTimeoutInterceptorTest(unittest.TestCase): + def test_timeout_interceptor_is_passed_to_client(self): + with mock.patch('durabletask.client.TaskHubGrpcClient') as mock_client_cls: + DaprWorkflowClient() + mock_client_cls.assert_called_once() + call_kwargs = mock_client_cls.call_args[1] + interceptors = call_kwargs['interceptors'] + self.assertEqual(len(interceptors), 1) + from dapr.clients.grpc.interceptors import \ + DaprClientTimeoutInterceptor + + self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor) + + class WorkflowClientTest(unittest.TestCase): def mock_client_wf(ctx: DaprWorkflowContext, input): print(f'{input}') @@ -186,3 +201,5 @@ def test_client_functions(self): actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id) assert actual_purge_result == mock_purge_result + actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id) + assert actual_purge_result == mock_purge_result diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py index 6e5d610f3..cbfe0f5e4 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py @@ -18,10 +18,11 @@ from typing import Any, Union from unittest import mock +from grpc.aio import AioRpcError + from dapr.ext.workflow._durabletask import client from dapr.ext.workflow.aio import DaprWorkflowClient from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext -from grpc.aio import AioRpcError mock_schedule_result = 'workflow001' mock_raise_event_result = 'event001' @@ -112,6 +113,20 @@ def _inner_get_orchestration_state(self, instance_id, state: client.Orchestratio ) +class WorkflowClientAioTimeoutInterceptorTest(unittest.IsolatedAsyncioTestCase): + async def test_timeout_interceptor_is_passed_to_client(self): + with mock.patch('durabletask.aio.client.AsyncTaskHubGrpcClient') as mock_client_cls: + DaprWorkflowClient() + mock_client_cls.assert_called_once() + call_kwargs = mock_client_cls.call_args[1] + interceptors = call_kwargs['interceptors'] + self.assertEqual(len(interceptors), 1) + from dapr.aio.clients.grpc.interceptors import \ + DaprClientTimeoutInterceptorAsync + + self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptorAsync) + + class WorkflowClientAioTest(unittest.IsolatedAsyncioTestCase): def mock_client_wf(ctx: DaprWorkflowContext, input): print(f'{input}') @@ -190,3 +205,5 @@ async def test_client_functions(self): actual_purge_result = await wfClient.purge_workflow(instance_id=mock_instance_id) assert actual_purge_result == mock_purge_result + actual_purge_result = await wfClient.purge_workflow(instance_id=mock_instance_id) + assert actual_purge_result == mock_purge_result diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index 233bd032f..4b7f7daa4 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -17,10 +17,12 @@ from typing import List, Optional from unittest import mock +import grpc +from pydantic import BaseModel, ValidationError + from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name -from pydantic import BaseModel, ValidationError class Order(BaseModel): @@ -46,6 +48,59 @@ def add_named_activity(self, name: str, fn): self._activity_fns[name] = fn +class WorkflowRuntimeTimeoutInterceptorTest(unittest.TestCase): + def setUp(self): + listActivities.clear() + listOrchestrators.clear() + self._registry_patch = mock.patch( + 'durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker() + ) + self._registry_patch.start() + + def tearDown(self): + mock.patch.stopall() + + def test_timeout_interceptor_is_prepended(self): + with mock.patch('durabletask.worker.TaskHubGrpcWorker') as mock_worker_cls: + WorkflowRuntime() + mock_worker_cls.assert_called_once() + call_kwargs = mock_worker_cls.call_args[1] + interceptors = call_kwargs['interceptors'] + self.assertEqual(len(interceptors), 1) + from dapr.clients.grpc.interceptors import \ + DaprClientTimeoutInterceptor + + self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor) + + def test_timeout_interceptor_with_custom_interceptors(self): + custom_interceptor = mock.MagicMock(spec=grpc.UnaryUnaryClientInterceptor) + with mock.patch('durabletask.worker.TaskHubGrpcWorker') as mock_worker_cls: + WorkflowRuntime(interceptors=[custom_interceptor]) + call_kwargs = mock_worker_cls.call_args[1] + interceptors = call_kwargs['interceptors'] + self.assertEqual(len(interceptors), 2) + from dapr.clients.grpc.interceptors import \ + DaprClientTimeoutInterceptor + + self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor) + self.assertIs(interceptors[1], custom_interceptor) + + def test_timeout_interceptor_preserves_custom_interceptor_order(self): + custom1 = mock.MagicMock(spec=grpc.UnaryUnaryClientInterceptor) + custom2 = mock.MagicMock(spec=grpc.UnaryStreamClientInterceptor) + with mock.patch('durabletask.worker.TaskHubGrpcWorker') as mock_worker_cls: + WorkflowRuntime(interceptors=[custom1, custom2]) + call_kwargs = mock_worker_cls.call_args[1] + interceptors = call_kwargs['interceptors'] + self.assertEqual(len(interceptors), 3) + from dapr.clients.grpc.interceptors import \ + DaprClientTimeoutInterceptor + + self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor) + self.assertIs(interceptors[1], custom1) + self.assertIs(interceptors[2], custom2) + + class WorkflowRuntimeTest(unittest.TestCase): def setUp(self): listActivities.clear() From dc600a69f2c00ec5e9b8322cf77bb1eb016e381f Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Mon, 23 Mar 2026 08:26:00 +0100 Subject: [PATCH 5/5] lint Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- ext/dapr-ext-workflow/tests/test_workflow_client.py | 1 + ext/dapr-ext-workflow/tests/test_workflow_client_aio.py | 1 + ext/dapr-ext-workflow/tests/test_workflow_runtime.py | 6 ++++++ 3 files changed, 8 insertions(+) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index 11ee6b19f..5ba6c4f00 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -18,6 +18,7 @@ from typing import Any, Union from unittest import mock +from durabletask import client from grpc import RpcError from dapr.ext.workflow._durabletask import client diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py index cbfe0f5e4..7f9081f35 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py @@ -18,6 +18,7 @@ from typing import Any, Union from unittest import mock +from durabletask import client from grpc.aio import AioRpcError from dapr.ext.workflow._durabletask import client diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index 4b7f7daa4..2810bf2b5 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -820,3 +820,9 @@ def my_act(ctx, order: Optional[Order]): wrapper = self.fake_registry._activity_fns['optional_no_default_act'] self.assertIsNone(wrapper(mock.MagicMock(), None)) + wrapper = self.fake_registry._activity_fns['optional_no_default_act'] + + self.assertIsNone(wrapper(mock.MagicMock(), None)) + wrapper = self.fake_registry._activity_fns['optional_no_default_act'] + + self.assertIsNone(wrapper(mock.MagicMock(), None))