From 7532e7daf27d0e2544efb7f83a5f38ccaff26795 Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Tue, 28 Apr 2026 21:10:03 -0700 Subject: [PATCH 1/5] samples: rename pubsub samples to workflow_stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the contrib rename in sdk-python (commit 5890c589) where temporalio.contrib.pubsub became temporalio.contrib.workflow_stream and PubSub/PubSubClient/PubSubState became WorkflowStream/WorkflowStreamClient/ WorkflowStreamState. samples directory pubsub/ -> workflow_stream/, and the openai_agents streaming sample updates its imports/usages. The carrier field on the workflow input dataclass goes from pubsub_state to stream_state, and the sample task queue and starter workflow id prefix lose the pubsub label. The contrib branch reference (contrib/pubsub) on sdk-python stays as-is per the rename doc — only the module/class/file names rotate. Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 1 + openai_agents/README.md | 3 + openai_agents/streaming/README.md | 90 +++++++++++++++++++ openai_agents/streaming/__init__.py | 0 .../streaming/activities/__init__.py | 0 .../streaming/activities/joke_activities.py | 11 +++ .../streaming/run_stream_items_workflow.py | 70 +++++++++++++++ .../streaming/run_stream_text_workflow.py | 64 +++++++++++++ openai_agents/streaming/run_worker.py | 55 ++++++++++++ openai_agents/streaming/shared.py | 79 ++++++++++++++++ openai_agents/streaming/workflows/__init__.py | 0 .../workflows/stream_items_workflow.py | 61 +++++++++++++ .../workflows/stream_text_workflow.py | 78 ++++++++++++++++ uv.lock | 4 + workflow_stream/README.md | 51 +++++++++++ workflow_stream/__init__.py | 0 workflow_stream/activities/__init__.py | 0 .../activities/payment_activity.py | 41 +++++++++ workflow_stream/run_publisher.py | 56 ++++++++++++ workflow_stream/run_worker.py | 27 ++++++ workflow_stream/shared.py | 87 ++++++++++++++++++ 
workflow_stream/workflows/__init__.py | 0 workflow_stream/workflows/order_workflow.py | 60 +++++++++++++ 23 files changed, 838 insertions(+) create mode 100644 openai_agents/streaming/README.md create mode 100644 openai_agents/streaming/__init__.py create mode 100644 openai_agents/streaming/activities/__init__.py create mode 100644 openai_agents/streaming/activities/joke_activities.py create mode 100644 openai_agents/streaming/run_stream_items_workflow.py create mode 100644 openai_agents/streaming/run_stream_text_workflow.py create mode 100644 openai_agents/streaming/run_worker.py create mode 100644 openai_agents/streaming/shared.py create mode 100644 openai_agents/streaming/workflows/__init__.py create mode 100644 openai_agents/streaming/workflows/stream_items_workflow.py create mode 100644 openai_agents/streaming/workflows/stream_text_workflow.py create mode 100644 workflow_stream/README.md create mode 100644 workflow_stream/__init__.py create mode 100644 workflow_stream/activities/__init__.py create mode 100644 workflow_stream/activities/payment_activity.py create mode 100644 workflow_stream/run_publisher.py create mode 100644 workflow_stream/run_worker.py create mode 100644 workflow_stream/shared.py create mode 100644 workflow_stream/workflows/__init__.py create mode 100644 workflow_stream/workflows/order_workflow.py diff --git a/README.md b/README.md index d4d6a61b..5a5c937f 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [prometheus](prometheus) - Configure Prometheus metrics on clients/workers. +* [workflow_stream](workflow_stream) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_stream`. 
**Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.** * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. diff --git a/openai_agents/README.md b/openai_agents/README.md index 9404278f..99fb677e 100644 --- a/openai_agents/README.md +++ b/openai_agents/README.md @@ -36,3 +36,6 @@ Each directory contains a complete example with its own README for detailed inst - **[Customer Service](./customer_service/README.md)** - Interactive customer service agent with escalation capabilities, demonstrating conversational workflows. - **[Reasoning Content](./reasoning_content/README.md)** - Example of how to retrieve the thought process of reasoning models. - **[Financial Research Agent](./financial_research_agent/README.md)** - Multi-agent financial research system with planner, search, analyst, writer, and verifier agents collaborating. +- **[Streaming](./streaming/README.md)** - Buffered token streaming (events coalesced into batches over a configurable flush interval, default 100ms) via `temporalio.contrib.workflow_stream`. **Experimental — requires the [`contrib/pubsub` branch][workflow-stream-branch] of sdk-python.** + +[workflow-stream-branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub diff --git a/openai_agents/streaming/README.md b/openai_agents/streaming/README.md new file mode 100644 index 00000000..43081e5e --- /dev/null +++ b/openai_agents/streaming/README.md @@ -0,0 +1,90 @@ +# Streaming OpenAI Agents + +> **Experimental.** These samples target the streaming hooks added to +> `temporalio.contrib.openai_agents` on the [`contrib/pubsub` branch of +> sdk-python][branch], which is not yet released. Install sdk-python +> from that branch (e.g. 
`uv pip install -e ` after +> checking out the branch) to run them locally. + +[branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub + +The OpenAI Agents SDK supports streaming via `Runner.run_streamed`, which +yields `TResponseStreamEvent`s as the model produces them. Inside a +Temporal workflow the model call runs in an activity, so the workflow +cannot iterate the live HTTP stream directly. The plugin's streaming +support runs `model.stream_response()` in the activity and publishes +each event to the workflow's `temporalio.contrib.workflow_stream`. The +publisher coalesces events into batches over `streaming_event_batch_interval` +(default 100ms) before sending them as a signal — call this **buffered +token streaming**: deltas reach external subscribers within a batch +window of being produced, not on every byte. At typical model speeds a +single batch carries multiple tokens, so output arrives in small bursts +rather than glyph-by-glyph — close enough for most UIs, though the +cadence is visible next to a true per-token render. Tune +`streaming_event_batch_interval` to trade signal volume for smoothness. + +The two samples here mirror the upstream openai-agents-python basic +streaming examples. + +## `stream_text` — buffered text deltas + +Adapted from [`examples/basic/stream_text.py`][upstream-text]. Subscribes +to `ResponseTextDeltaEvent`s and prints them as they arrive (batched at +the broker's flush interval, see above). + +[upstream-text]: https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_text.py + +```bash +# Terminal 1 +uv run openai_agents/streaming/run_worker.py + +# Terminal 2 +uv run openai_agents/streaming/run_stream_text_workflow.py +``` + +## `stream_items` — agent-level events with a tool call + +Adapted from [`examples/basic/stream_items.py`][upstream-items]. Renders +agent updates, tool calls, tool outputs, and message outputs as a +play-by-play. 
+ +[upstream-items]: https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_items.py + +```bash +uv run openai_agents/streaming/run_stream_items_workflow.py +``` + +## How it works + +1. The workflow constructs a `WorkflowStream` from `@workflow.init`. +2. The plugin's `OpenAIAgentsPlugin` is configured with + `streaming_event_topic="events"`. The plugin routes + `Runner.run_streamed` calls to `invoke_model_activity_streaming`. +3. Inside that activity, each `TResponseStreamEvent` from the live HTTP + stream is appended to a list (returned to the workflow when the + activity completes) **and** published to the stream via + `WorkflowStreamClient.from_activity()`. +4. The workflow publishes a sentinel to a separate `done` topic right + before returning, so the subscriber knows the stream is finished. +5. External code subscribes with `WorkflowStreamClient.create(...).subscribe( + ["events", "done"])` and breaks on the `done` event. We leave + `result_type` unset and decode events manually because the two + topics carry different types. The runner also races the consumer + against `handle.result()` so a workflow failure surfaces as an + exception rather than blocking the subscriber forever. + +In the workflow, `stream_events()` resolves only after the activity +returns, so the workflow itself does not see deltas as they arrive — the +streaming benefit is for external observers. If you want the workflow to +react incrementally, subscribe from a child workflow or activity rather +than from the workflow that hosts the stream. + +## Notes + +* `streaming_event_topic` defaults to `None` (no publishing). Set it on + `ModelActivityParameters` to a topic such as `"events"` to publish raw + stream events. +* Streaming is incompatible with `use_local_activity=True`: local + activities can neither heartbeat nor send signals back to the workflow. +* The workflow must host a `WorkflowStream`. 
Without one, the plugin's + publish signals are unhandled and silently dropped by Temporal. diff --git a/openai_agents/streaming/__init__.py b/openai_agents/streaming/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/openai_agents/streaming/activities/__init__.py b/openai_agents/streaming/activities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/openai_agents/streaming/activities/joke_activities.py b/openai_agents/streaming/activities/joke_activities.py new file mode 100644 index 00000000..7fe1c499 --- /dev/null +++ b/openai_agents/streaming/activities/joke_activities.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +import random + +from temporalio import activity + + +@activity.defn +async def how_many_jokes() -> int: + """Return a random integer of jokes to tell between 1 and 10 (inclusive).""" + return random.randint(1, 10) diff --git a/openai_agents/streaming/run_stream_items_workflow.py b/openai_agents/streaming/run_stream_items_workflow.py new file mode 100644 index 00000000..b697223a --- /dev/null +++ b/openai_agents/streaming/run_stream_items_workflow.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import asyncio +import uuid + +from agents import ItemHelpers +from agents.items import TResponseStreamEvent +from temporalio.api.common.v1 import Payload +from temporalio.client import Client +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from openai_agents.streaming.shared import ( + TASK_QUEUE, + TOPIC_DONE, + TOPIC_EVENTS, + race_with_workflow, +) +from openai_agents.streaming.workflows.stream_items_workflow import ( + StreamItemsInput, + StreamItemsWorkflow, +) + + +async def main() -> None: + client = await Client.connect( + "localhost:7233", + plugins=[OpenAIAgentsPlugin()], + ) + + workflow_id = f"stream-items-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + StreamItemsWorkflow.run, + 
StreamItemsInput(), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + stream = WorkflowStreamClient.create(client, workflow_id) + converter = client.data_converter.payload_converter + + async def render() -> None: + print("=== Run starting ===") + async for item in stream.subscribe([TOPIC_EVENTS, TOPIC_DONE]): + if item.topic == TOPIC_DONE: + return + assert isinstance(item.data, Payload) + event = converter.from_payload(item.data, TResponseStreamEvent) + if event.type == "raw_response_event": + continue + if event.type == "agent_updated_stream_event": + print(f"Agent updated: {event.new_agent.name}") + elif event.type == "run_item_stream_event": + if event.item.type == "tool_call_item": + name = getattr(event.item.raw_item, "name", "Unknown Tool") + print(f"-- Tool was called: {name}") + elif event.item.type == "tool_call_output_item": + print(f"-- Tool output: {event.item.output}") + elif event.item.type == "message_output_item": + print( + "-- Message output:\n " + f"{ItemHelpers.text_message_output(event.item)}" + ) + + await race_with_workflow(render(), handle) + print("=== Run complete ===") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/openai_agents/streaming/run_stream_text_workflow.py b/openai_agents/streaming/run_stream_text_workflow.py new file mode 100644 index 00000000..be2653c3 --- /dev/null +++ b/openai_agents/streaming/run_stream_text_workflow.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import asyncio +import uuid + +from agents.items import TResponseStreamEvent +from openai.types.responses import ResponseTextDeltaEvent +from temporalio.api.common.v1 import Payload +from temporalio.client import Client +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from openai_agents.streaming.shared import ( + TASK_QUEUE, + TOPIC_DONE, + TOPIC_EVENTS, + race_with_workflow, +) +from openai_agents.streaming.workflows.stream_text_workflow 
import ( + StreamTextInput, + StreamTextWorkflow, +) + + +async def main() -> None: + client = await Client.connect( + "localhost:7233", + plugins=[OpenAIAgentsPlugin()], + ) + + workflow_id = f"stream-text-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + StreamTextWorkflow.run, + StreamTextInput(prompt="Please tell me 5 jokes."), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + stream = WorkflowStreamClient.create(client, workflow_id) + converter = client.data_converter.payload_converter + + async def render() -> None: + # Subscribe to both the streaming-event topic and the workflow's + # done-sentinel so we can break cleanly without racing + # handle.result() against the next poll. result_type is left + # unset (we get raw Payloads) because the two topics carry + # different types — we decode based on item.topic. + async for item in stream.subscribe([TOPIC_EVENTS, TOPIC_DONE]): + if item.topic == TOPIC_DONE: + return + assert isinstance(item.data, Payload) + event = converter.from_payload(item.data, TResponseStreamEvent) + if event.type == "raw_response_event" and isinstance( + event.data, ResponseTextDeltaEvent + ): + print(event.data.delta, end="", flush=True) + + result = await race_with_workflow(render(), handle) + print("\n--- final result ---") + print(result) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/openai_agents/streaming/run_worker.py b/openai_agents/streaming/run_worker.py new file mode 100644 index 00000000..b1b751b8 --- /dev/null +++ b/openai_agents/streaming/run_worker.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import asyncio +import logging +from datetime import timedelta + +from temporalio.client import Client +from temporalio.contrib.openai_agents import ( + ModelActivityParameters, + OpenAIAgentsPlugin, +) +from temporalio.worker import Worker + +from openai_agents.streaming.activities.joke_activities import how_many_jokes +from openai_agents.streaming.shared import TASK_QUEUE, 
TOPIC_EVENTS +from openai_agents.streaming.workflows.stream_items_workflow import ( + StreamItemsWorkflow, +) +from openai_agents.streaming.workflows.stream_text_workflow import ( + StreamTextWorkflow, +) + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + client = await Client.connect( + "localhost:7233", + plugins=[ + OpenAIAgentsPlugin( + model_params=ModelActivityParameters( + # Streaming relies on heartbeats to detect a stuck + # LLM call. Pick a heartbeat_timeout comfortably + # larger than the expected delta cadence. + heartbeat_timeout=timedelta(seconds=10), + start_to_close_timeout=timedelta(minutes=5), + # streaming_event_topic defaults to None (no + # publishing). Set to a topic to publish raw stream + # events for external subscribers. + streaming_event_topic=TOPIC_EVENTS, + ), + ), + ], + ) + + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[StreamTextWorkflow, StreamItemsWorkflow], + activities=[how_many_jokes], + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/openai_agents/streaming/shared.py b/openai_agents/streaming/shared.py new file mode 100644 index 00000000..c80dc886 --- /dev/null +++ b/openai_agents/streaming/shared.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import asyncio +from collections.abc import Coroutine +from typing import Any, TypeVar + +from temporalio.client import WorkflowHandle + +TASK_QUEUE = "openai-agents-streaming-task-queue" + +# Topic the plugin publishes raw model stream events to. Must match +# OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_event_topic=...)). +TOPIC_EVENTS = "events" + +# Sentinel topic the workflow publishes to once Runner.run_streamed has +# finished. Subscribers iterate (events, done) and break on the done +# event — this avoids racing handle.result() against the subscriber's +# poll cycle. 
+TOPIC_DONE = "done" + + +T = TypeVar("T") + + +async def race_with_workflow( + consumer: Coroutine[Any, Any, None], + handle: WorkflowHandle[Any, T], +) -> T: + """Run a subscriber concurrently with the workflow. + + If the workflow finishes (success or failure) before the subscriber + sees its sentinel, cancel the subscriber and surface the workflow + result. If the subscriber finishes first (clean sentinel exit), + wait for the workflow result. A non-cancellation failure in the + subscriber is propagated either way. + + Without this, a workflow that raises before publishing the sentinel + would leave the subscriber blocked on its next poll forever. + """ + consumer_task = asyncio.create_task(consumer) + result_task = asyncio.create_task(handle.result()) + we_cancelled_consumer = False + try: + await asyncio.wait( + [consumer_task, result_task], + return_when=asyncio.FIRST_COMPLETED, + ) + # Stop the subscriber whether it reached its sentinel or not. + if not consumer_task.done(): + consumer_task.cancel() + we_cancelled_consumer = True + # gather(return_exceptions=True) drains both tasks. Child + # cancellation surfaces as a returned CancelledError; only + # cancellation we initiated is expected — anything else + # (including a third party cancelling the consumer behind + # our back) propagates. + consumer_outcome, workflow_outcome = await asyncio.gather( + consumer_task, result_task, return_exceptions=True + ) + if isinstance(consumer_outcome, asyncio.CancelledError): + if not we_cancelled_consumer: + raise consumer_outcome + elif isinstance(consumer_outcome, BaseException): + raise consumer_outcome + if isinstance(workflow_outcome, BaseException): + raise workflow_outcome + return workflow_outcome + finally: + # Idempotent cleanup. try/finally re-raises the in-flight + # exception (if any) after finally completes, so swallowing + # cleanup failures here is safe. 
+ for task in (consumer_task, result_task): + if not task.done(): + task.cancel() + for task in (consumer_task, result_task): + try: + await task + except BaseException: + pass diff --git a/openai_agents/streaming/workflows/__init__.py b/openai_agents/streaming/workflows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/openai_agents/streaming/workflows/stream_items_workflow.py b/openai_agents/streaming/workflows/stream_items_workflow.py new file mode 100644 index 00000000..8ca9390b --- /dev/null +++ b/openai_agents/streaming/workflows/stream_items_workflow.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import timedelta + +from agents import Agent, ItemHelpers, Runner +from temporalio import workflow +from temporalio.contrib import openai_agents as temporal_agents +from temporalio.contrib.workflow_stream import WorkflowStream, WorkflowStreamState + +from openai_agents.streaming.activities.joke_activities import how_many_jokes +from openai_agents.streaming.shared import TOPIC_DONE + +"""Streaming counterpart to the OpenAI Agents SDK ``stream_items.py`` example. + +Adapted from https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_items.py + +This variant streams higher-level events: tool calls, tool outputs, +agent updates, and message outputs. External subscribers can render a +play-by-play of the agent's reasoning as it unfolds, while the workflow +itself just waits for the final answer. 
+""" + + +@dataclass +class StreamItemsInput: + stream_state: WorkflowStreamState | None = None + + +@workflow.defn +class StreamItemsWorkflow: + @workflow.init + def __init__(self, input: StreamItemsInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + + @workflow.run + async def run(self, input: StreamItemsInput) -> str: + del input # only used in @workflow.init for prior_state + agent = Agent( + name="Joker", + instructions=( + "First call the `how_many_jokes` tool, " + "then tell that many jokes." + ), + tools=[ + temporal_agents.workflow.activity_as_tool( + how_many_jokes, start_to_close_timeout=timedelta(seconds=10) + ) + ], + ) + result = Runner.run_streamed(agent, input="Hello") + + messages: list[str] = [] + async for event in result.stream_events(): + if event.type == "run_item_stream_event" and event.item.type == ( + "message_output_item" + ): + messages.append(ItemHelpers.text_message_output(event.item)) + # Sentinel for the external subscriber. + self.stream.publish(TOPIC_DONE, None) + return "\n\n".join(messages) if messages else result.final_output diff --git a/openai_agents/streaming/workflows/stream_text_workflow.py b/openai_agents/streaming/workflows/stream_text_workflow.py new file mode 100644 index 00000000..a1598ad5 --- /dev/null +++ b/openai_agents/streaming/workflows/stream_text_workflow.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from agents import Agent, Runner +from openai.types.responses import ResponseTextDeltaEvent +from temporalio import workflow +from temporalio.contrib.workflow_stream import WorkflowStream, WorkflowStreamState + +from openai_agents.streaming.shared import TOPIC_DONE + +"""Streaming counterpart to the OpenAI Agents SDK ``stream_text.py`` example. 
+ +Adapted from https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_text.py + +The upstream example calls ``Runner.run_streamed`` and iterates raw +``ResponseTextDeltaEvent``s as they arrive over HTTP. Inside a Temporal +workflow the model call runs in an activity, so the workflow cannot +iterate the live HTTP stream directly. The plugin's streaming support +runs ``model.stream_response()`` inside the activity and publishes each +``TResponseStreamEvent`` to the workflow's stream. Events are coalesced +into batches over ``streaming_event_batch_interval`` (default 100ms) +before being delivered to subscribers as signals — buffered token +streaming, not per-token. Output arrives in small bursts; the cadence +is visible compared to a true per-token render but is close enough for +most UIs. + +The workflow itself only needs to: + +1. host a ``WorkflowStream`` so the activity has somewhere to publish to; +2. call ``Runner.run_streamed`` (rather than ``Runner.run``) so the agent + framework drives the streaming activity. + +In a Temporal workflow ``stream_events()`` resolves only after the +underlying activity returns, so any in-workflow consumption is on the +final list — not deltas-as-they-arrive. +""" + + +@dataclass +class StreamTextInput: + prompt: str + stream_state: WorkflowStreamState | None = None + + +@workflow.defn +class StreamTextWorkflow: + @workflow.init + def __init__(self, input: StreamTextInput) -> None: + # Required: the streaming activity publishes to this stream. + # Without it, the publish signals are unhandled and dropped. + self.stream = WorkflowStream(prior_state=input.stream_state) + + @workflow.run + async def run(self, input: StreamTextInput) -> str: + agent = Agent( + name="Joker", + instructions="You are a helpful assistant.", + ) + result = Runner.run_streamed(agent, input=input.prompt) + + # Runner.run_streamed launches the agent loop in a background + # task; iterating consumes from it and waits for completion. 
+ # The workflow side only sees the events once the activity + # returns, so this loop accumulates a count for logging. + # External subscribers receive them as the activity publishes. + deltas = 0 + async for event in result.stream_events(): + if event.type == "raw_response_event" and isinstance( + event.data, ResponseTextDeltaEvent + ): + deltas += 1 + workflow.logger.info("collected %d delta events", deltas) + # Sentinel for the external subscriber. Without it the + # subscriber's async iterator would block on its next poll + # waiting for events that never come. + self.stream.publish(TOPIC_DONE, None) + return result.final_output diff --git a/uv.lock b/uv.lock index 3c9990eb..a751b8a5 100644 --- a/uv.lock +++ b/uv.lock @@ -8,6 +8,10 @@ resolution-markers = [ "python_full_version < '3.11'", ] +[options] +exclude-newer = "2026-04-22T03:40:21.857325Z" +exclude-newer-span = "P7D" + [[package]] name = "aiohappyeyeballs" version = "2.6.1" diff --git a/workflow_stream/README.md b/workflow_stream/README.md new file mode 100644 index 00000000..3a57a8d7 --- /dev/null +++ b/workflow_stream/README.md @@ -0,0 +1,51 @@ +# Workflow Streams + +> **Experimental.** These samples target the +> `temporalio.contrib.workflow_stream` module on the +> [`contrib/pubsub` branch of sdk-python][branch], which is not yet +> released. To run them locally, install sdk-python from that branch +> (e.g. `uv pip install -e ` after checking out the +> branch). + +[branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub + +`temporalio.contrib.workflow_stream` lets a workflow host a durable, +offset-addressed event channel. The workflow holds an append-only log; +external clients (activities, starters, BFFs) publish to topics via +signals and subscribe via long-poll updates. This packages the +boilerplate — batching, offset tracking, topic filtering, continue-as-new +hand-off — into a reusable stream. 
+ +This directory has a minimal end-to-end example: + +* `workflows/order_workflow.py` — a workflow that hosts a + `WorkflowStream` and publishes status events as it processes an order. +* `activities/payment_activity.py` — an activity that publishes + intermediate progress to the stream via + `WorkflowStreamClient.from_activity()`. +* `run_worker.py` — registers the workflow and activity. +* `run_publisher.py` — starts the workflow, then prints subscribed + events as they arrive. + +## Run it + +```bash +# Terminal 1: worker +uv run workflow_stream/run_worker.py + +# Terminal 2: starter + subscriber +uv run workflow_stream/run_publisher.py +``` + +Expected output on the publisher side, with events streaming in as the +workflow progresses: + +``` +[status] received: order=order-1 +[progress] charging card... +[progress] card charged +[status] shipped: order=order-1 +[progress] charge id: charge-order-1 +[status] complete: order=order-1 +workflow result: charge-order-1 +``` diff --git a/workflow_stream/__init__.py b/workflow_stream/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_stream/activities/__init__.py b/workflow_stream/activities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_stream/activities/payment_activity.py b/workflow_stream/activities/payment_activity.py new file mode 100644 index 00000000..f69f8c8d --- /dev/null +++ b/workflow_stream/activities/payment_activity.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta + +from temporalio import activity +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from workflow_stream.shared import TOPIC_PROGRESS, ProgressEvent + + +@activity.defn +async def charge_card(order_id: str) -> str: + """Pretend to charge a card, publishing progress to the parent workflow. 
+ + `WorkflowStreamClient.from_activity()` reads the parent workflow id + and the Temporal client from the activity context, so this activity + can push events back without any wiring. + + Caveat: each call to ``from_activity()`` creates a fresh client with + a random ``publisher_id``, so dedup does not protect against an + activity retry republishing the same events. For activities that + must be exactly-once on the stream side, derive a stable + ``publisher_id`` from ``activity.info().activity_id`` (this is + invariant across attempts of the same scheduled activity). The + current ``WorkflowStreamClient`` API does not yet expose + ``publisher_id`` on its constructors; this sample accepts + at-most-once-per-attempt semantics. + """ + client = WorkflowStreamClient.from_activity( + batch_interval=timedelta(milliseconds=200) + ) + async with client: + client.publish(TOPIC_PROGRESS, ProgressEvent(message="charging card...")) + await asyncio.sleep(1.0) + client.publish( + TOPIC_PROGRESS, + ProgressEvent(message="card charged"), + force_flush=True, + ) + return f"charge-{order_id}" diff --git a/workflow_stream/run_publisher.py b/workflow_stream/run_publisher.py new file mode 100644 index 00000000..cdfb210d --- /dev/null +++ b/workflow_stream/run_publisher.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.api.common.v1 import Payload +from temporalio.client import Client +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from workflow_stream.shared import ( + TASK_QUEUE, + TOPIC_PROGRESS, + TOPIC_STATUS, + OrderInput, + ProgressEvent, + StatusEvent, + race_with_workflow, +) +from workflow_stream.workflows.order_workflow import OrderWorkflow + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-order-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + OrderWorkflow.run, + OrderInput(order_id="order-1"), + id=workflow_id, 
+ task_queue=TASK_QUEUE, + ) + + stream = WorkflowStreamClient.create(client, workflow_id) + converter = client.data_converter.payload_converter + + async def consume() -> None: + # Single iterator over both topics — avoids a cancellation race + # between two concurrent subscribers. result_type is left unset + # so we can dispatch heterogeneous events on item.topic. + async for item in stream.subscribe([TOPIC_STATUS, TOPIC_PROGRESS]): + assert isinstance(item.data, Payload) + if item.topic == TOPIC_STATUS: + evt = converter.from_payload(item.data, StatusEvent) + print(f"[status] {evt.kind}: order={evt.order_id}") + if evt.kind == "complete": + return + elif item.topic == TOPIC_PROGRESS: + progress = converter.from_payload(item.data, ProgressEvent) + print(f"[progress] {progress.message}") + + result = await race_with_workflow(consume(), handle) + print(f"workflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_stream/run_worker.py b/workflow_stream/run_worker.py new file mode 100644 index 00000000..8fef1ab9 --- /dev/null +++ b/workflow_stream/run_worker.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from workflow_stream.activities.payment_activity import charge_card +from workflow_stream.shared import TASK_QUEUE +from workflow_stream.workflows.order_workflow import OrderWorkflow + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[OrderWorkflow], + activities=[charge_card], + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_stream/shared.py b/workflow_stream/shared.py new file mode 100644 index 00000000..ca1368c1 --- /dev/null +++ b/workflow_stream/shared.py @@ -0,0 +1,87 @@ +from __future__ import annotations + 
+import asyncio +from collections.abc import Coroutine +from dataclasses import dataclass +from typing import Any, TypeVar + +from temporalio.client import WorkflowHandle +from temporalio.contrib.workflow_stream import WorkflowStreamState + +TASK_QUEUE = "workflow-stream-sample-task-queue" + +# Topics published by the workflow / activity. +TOPIC_STATUS = "status" +TOPIC_PROGRESS = "progress" + + +@dataclass +class OrderInput: + order_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class StatusEvent: + kind: str + order_id: str + + +@dataclass +class ProgressEvent: + message: str + + +T = TypeVar("T") + + +async def race_with_workflow( + consumer: Coroutine[Any, Any, None], + handle: WorkflowHandle[Any, T], +) -> T: + """Run a subscriber concurrently with the workflow. + + If the workflow finishes before the subscriber sees its terminal + event, cancel the subscriber and surface the workflow's result + (raising on failure). If the subscriber finishes first, wait for + the workflow result. A non-cancellation failure in the subscriber + is propagated either way. + + Without this, a workflow that raises before publishing its terminal + event would leave the subscriber blocked on its next poll forever. + """ + consumer_task = asyncio.create_task(consumer) + result_task = asyncio.create_task(handle.result()) + we_cancelled_consumer = False + try: + await asyncio.wait( + [consumer_task, result_task], + return_when=asyncio.FIRST_COMPLETED, + ) + if not consumer_task.done(): + consumer_task.cancel() + we_cancelled_consumer = True + # gather(return_exceptions=True) drains both tasks. Only + # cancellation we initiated is expected — anything else + # propagates. 
+ consumer_outcome, workflow_outcome = await asyncio.gather( + consumer_task, result_task, return_exceptions=True + ) + if isinstance(consumer_outcome, asyncio.CancelledError): + if not we_cancelled_consumer: + raise consumer_outcome + elif isinstance(consumer_outcome, BaseException): + raise consumer_outcome + if isinstance(workflow_outcome, BaseException): + raise workflow_outcome + return workflow_outcome + finally: + for task in (consumer_task, result_task): + if not task.done(): + task.cancel() + for task in (consumer_task, result_task): + try: + await task + except BaseException: + pass diff --git a/workflow_stream/workflows/__init__.py b/workflow_stream/workflows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_stream/workflows/order_workflow.py b/workflow_stream/workflows/order_workflow.py new file mode 100644 index 00000000..4b4f4a82 --- /dev/null +++ b/workflow_stream/workflows/order_workflow.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_stream import WorkflowStream + +from workflow_stream.shared import ( + TOPIC_PROGRESS, + TOPIC_STATUS, + OrderInput, + ProgressEvent, + StatusEvent, +) + +with workflow.unsafe.imports_passed_through(): + from workflow_stream.activities.payment_activity import charge_card + + +@workflow.defn +class OrderWorkflow: + """Process a fake order, publishing status and progress events. + + The workflow itself publishes status changes; an activity it runs + publishes finer-grained progress events using a + `WorkflowStreamClient`. A single stream carries both topics — + subscribers can filter on the topic(s) they care about. + """ + + @workflow.init + def __init__(self, input: OrderInput) -> None: + # Construct the stream from @workflow.init so it can register + # signal/update/query handlers before the workflow accepts any + # messages. 
Threading prior_state lets the workflow survive + # continue-as-new without losing buffered items. + self.stream = WorkflowStream(prior_state=input.stream_state) + + @workflow.run + async def run(self, input: OrderInput) -> str: + self.stream.publish( + TOPIC_STATUS, StatusEvent(kind="received", order_id=input.order_id) + ) + + charge_id = await workflow.execute_activity( + charge_card, + input.order_id, + start_to_close_timeout=timedelta(seconds=30), + ) + + self.stream.publish( + TOPIC_STATUS, StatusEvent(kind="shipped", order_id=input.order_id) + ) + self.stream.publish( + TOPIC_PROGRESS, + ProgressEvent(message=f"charge id: {charge_id}"), + ) + self.stream.publish( + TOPIC_STATUS, StatusEvent(kind="complete", order_id=input.order_id) + ) + return charge_id From 6c80c60336aadc544f08696c1581bc336b8bf11a Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Wed, 29 Apr 2026 10:07:33 -0700 Subject: [PATCH 2/5] samples: workflow_stream: add reconnecting-subscriber scenario MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a second scenario demonstrating the central Workflow Streams use case: a consumer disconnects mid-stream and resumes later via subscribe(from_offset=...), with no events lost or duplicated. The existing OrderWorkflow finishes too quickly to make the pattern visible, so this introduces a multi-stage PipelineWorkflow paced with workflow.sleep between stages. The runner reads a couple of events, persists item.offset + 1 to a temp file, sleeps "disconnected" while the workflow keeps publishing, then opens a fresh Client + WorkflowStreamClient and resumes from the persisted offset — the same shape that works across actual process restarts. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- workflow_stream/README.md | 48 ++++++-- .../run_reconnecting_subscriber.py | 107 ++++++++++++++++++ workflow_stream/run_worker.py | 3 +- workflow_stream/shared.py | 12 ++ .../workflows/pipeline_workflow.py | 43 +++++++ 5 files changed, 205 insertions(+), 8 deletions(-) create mode 100644 workflow_stream/run_reconnecting_subscriber.py create mode 100644 workflow_stream/workflows/pipeline_workflow.py diff --git a/workflow_stream/README.md b/workflow_stream/README.md index 3a57a8d7..dd4fcf49 100644 --- a/workflow_stream/README.md +++ b/workflow_stream/README.md @@ -16,16 +16,31 @@ signals and subscribe via long-poll updates. This packages the boilerplate — batching, offset tracking, topic filtering, continue-as-new hand-off — into a reusable stream. -This directory has a minimal end-to-end example: +This directory has two scenarios sharing one Worker. + +**Scenario 1 — basic publish/subscribe with heterogeneous topics:** * `workflows/order_workflow.py` — a workflow that hosts a `WorkflowStream` and publishes status events as it processes an order. * `activities/payment_activity.py` — an activity that publishes intermediate progress to the stream via `WorkflowStreamClient.from_activity()`. -* `run_worker.py` — registers the workflow and activity. -* `run_publisher.py` — starts the workflow, then prints subscribed - events as they arrive. +* `run_publisher.py` — starts the workflow, subscribes to both topics, + decodes each by `item.topic`, and prints events as they arrive. + +**Scenario 2 — reconnecting subscriber:** + +* `workflows/pipeline_workflow.py` — a multi-stage pipeline that + publishes stage transitions over ~10 seconds, leaving room for a + consumer to disconnect and reconnect mid-run. +* `run_reconnecting_subscriber.py` — connects, reads a couple of + events, persists `item.offset + 1` to disk, "disconnects," then + reopens a fresh client and resumes via `subscribe(from_offset=...)`. 
+ This is the central Workflow Streams use case: a consumer can + disappear (page refresh, server restart, laptop closed) and resume + later without missing events or seeing duplicates. + +`run_worker.py` registers both workflows and the activity. ## Run it @@ -33,12 +48,13 @@ This directory has a minimal end-to-end example: # Terminal 1: worker uv run workflow_stream/run_worker.py -# Terminal 2: starter + subscriber +# Terminal 2: pick a scenario uv run workflow_stream/run_publisher.py +# or +uv run workflow_stream/run_reconnecting_subscriber.py ``` -Expected output on the publisher side, with events streaming in as the -workflow progresses: +Expected output on the basic publisher side: ``` [status] received: order=order-1 @@ -49,3 +65,21 @@ workflow progresses: [status] complete: order=order-1 workflow result: charge-order-1 ``` + +Expected output on the reconnecting subscriber side (note the offsets +are continuous across the disconnect — no events lost, none duplicated): + +``` +[phase 1] connecting and reading first few events + offset= 0 stage=validating + offset= 1 stage=loading data +[phase 1] persisted resume offset=2 -> /tmp/...; disconnecting + +[phase 2] reconnecting and resuming from persisted offset + offset= 2 stage=transforming + offset= 3 stage=writing output + offset= 4 stage=verifying + offset= 5 stage=complete + +workflow result: pipeline workflow-stream-pipeline-... done +``` diff --git a/workflow_stream/run_reconnecting_subscriber.py b/workflow_stream/run_reconnecting_subscriber.py new file mode 100644 index 00000000..3a5eee11 --- /dev/null +++ b/workflow_stream/run_reconnecting_subscriber.py @@ -0,0 +1,107 @@ +"""Reconnecting subscriber: persist offset, disconnect, resume. + +Demonstrates the central Workflow Streams use case: a consumer can +disappear mid-stream — page refresh, server restart, laptop closed — +and resume later without missing events or seeing duplicates. 
The +event log lives in the Workflow, so the consumer just remembers where +it stopped. + +The script runs the pattern in two phases inside one process to keep +the demo short. The same code shape works across actual process +restarts because the resume offset is persisted to disk between phases. + +Run the worker first (``uv run workflow_stream/run_worker.py``), then:: + + uv run workflow_stream/run_reconnecting_subscriber.py +""" + +from __future__ import annotations + +import asyncio +import tempfile +import uuid +from pathlib import Path + +from temporalio.client import Client +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from workflow_stream.shared import ( + TASK_QUEUE, + TOPIC_STATUS, + PipelineInput, + StageEvent, +) +from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow + +# Number of events read in phase 1 before simulating a disconnect. +# Picked small enough that the workflow is still running after. +PHASE_1_EVENTS = 2 + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-pipeline-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + PipelineWorkflow.run, + PipelineInput(pipeline_id=workflow_id), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + # Where the consumer remembers its position. In a real BFF or UI + # backend this would be a database row keyed by (user_id, run_id); + # a temp file keeps the sample self-contained. + offset_path = Path(tempfile.gettempdir()) / f"{workflow_id}.offset" + + # ---- Phase 1: connect, read a couple of events, persist offset, disconnect. + print("[phase 1] connecting and reading first few events") + stream = WorkflowStreamClient.create(client, workflow_id) + seen = 0 + next_offset = 0 + async for item in stream.subscribe([TOPIC_STATUS], result_type=StageEvent): + print(f" offset={item.offset:2d} stage={item.data.stage}") + # Persist *one past* the offset just consumed. 
On resume we want + # the *next* unseen event, not the one we already showed. + next_offset = item.offset + 1 + offset_path.write_text(str(next_offset)) + seen += 1 + if seen >= PHASE_1_EVENTS: + break + + print( + f"[phase 1] persisted resume offset={next_offset} -> {offset_path}; disconnecting\n" + ) + # The async for loop exits the subscribe() iterator. Any background + # poll Update is cancelled. The workflow keeps running in the + # background, accumulating events into its log. + await asyncio.sleep(3) # let the workflow publish more in our absence + + # ---- Phase 2: reconnect, read persisted offset, resume from there. + print("[phase 2] reconnecting and resuming from persisted offset") + resume_from = int(offset_path.read_text()) + # A brand-new client and stream object — same shape as a different + # process picking up where the first one left off. + client2 = await Client.connect("localhost:7233") + stream2 = WorkflowStreamClient.create(client2, workflow_id) + async for item in stream2.subscribe( + [TOPIC_STATUS], + from_offset=resume_from, + result_type=StageEvent, + ): + print(f" offset={item.offset:2d} stage={item.data.stage}") + # Continue persisting after each event so a second crash here + # would also resume cleanly. + offset_path.write_text(str(item.offset + 1)) + if item.data.stage == "complete": + break + + result = await handle.result() + print(f"\nworkflow result: {result}") + # Clean up the offset file; in a real consumer you'd retain it as + # long as the user might reconnect. 
+ offset_path.unlink(missing_ok=True) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_stream/run_worker.py b/workflow_stream/run_worker.py index 8fef1ab9..4b9a4ed5 100644 --- a/workflow_stream/run_worker.py +++ b/workflow_stream/run_worker.py @@ -9,6 +9,7 @@ from workflow_stream.activities.payment_activity import charge_card from workflow_stream.shared import TASK_QUEUE from workflow_stream.workflows.order_workflow import OrderWorkflow +from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow async def main() -> None: @@ -17,7 +18,7 @@ async def main() -> None: worker = Worker( client, task_queue=TASK_QUEUE, - workflows=[OrderWorkflow], + workflows=[OrderWorkflow, PipelineWorkflow], activities=[charge_card], ) await worker.run() diff --git a/workflow_stream/shared.py b/workflow_stream/shared.py index ca1368c1..652c8fa5 100644 --- a/workflow_stream/shared.py +++ b/workflow_stream/shared.py @@ -33,6 +33,18 @@ class ProgressEvent: message: str +@dataclass +class PipelineInput: + pipeline_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class StageEvent: + stage: str + + T = TypeVar("T") diff --git a/workflow_stream/workflows/pipeline_workflow.py b/workflow_stream/workflows/pipeline_workflow.py new file mode 100644 index 00000000..5f53c1bf --- /dev/null +++ b/workflow_stream/workflows/pipeline_workflow.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_stream import WorkflowStream + +from workflow_stream.shared import ( + TOPIC_STATUS, + PipelineInput, + StageEvent, +) + + +@workflow.defn +class PipelineWorkflow: + """Multi-stage pipeline that publishes stage transitions over time. 
+ + Stages are spaced out with ``workflow.sleep`` so a subscriber can + realistically disconnect partway through and reconnect without the + pipeline finishing in the meantime — the shape needed to demo the + "show up late and still see what happened" pattern. + """ + + @workflow.init + def __init__(self, input: PipelineInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + + @workflow.run + async def run(self, input: PipelineInput) -> str: + stages = [ + "validating", + "loading data", + "transforming", + "writing output", + "verifying", + "complete", + ] + for stage in stages: + self.stream.publish(TOPIC_STATUS, StageEvent(stage=stage)) + if stage != "complete": + await workflow.sleep(timedelta(seconds=2)) + return f"pipeline {input.pipeline_id} done" From 5ed034fa938d9821a8558265ea9a11acc8cef6e1 Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Wed, 29 Apr 2026 10:10:10 -0700 Subject: [PATCH 3/5] samples: workflow_stream: add external-publisher scenario MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a third scenario covering the third publisher shape: a backend service or scheduled job pushing events into a workflow it didn't itself start. The earlier scenarios publish either from inside the workflow or from one of its activities; this one uses WorkflowStreamClient.create() externally. HubWorkflow is a passive stream host — it does no work of its own and just waits to be told to close, fitting the event-bus pattern. The runner publishes a series of news headlines, runs a subscriber task alongside, signals close, and exits when both tasks complete. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- workflow_stream/README.md | 18 ++++- workflow_stream/run_external_publisher.py | 91 +++++++++++++++++++++++ workflow_stream/run_worker.py | 3 +- workflow_stream/shared.py | 13 ++++ workflow_stream/workflows/hub_workflow.py | 36 +++++++++ 5 files changed, 159 insertions(+), 2 deletions(-) create mode 100644 workflow_stream/run_external_publisher.py create mode 100644 workflow_stream/workflows/hub_workflow.py diff --git a/workflow_stream/README.md b/workflow_stream/README.md index dd4fcf49..7b43ce08 100644 --- a/workflow_stream/README.md +++ b/workflow_stream/README.md @@ -40,7 +40,21 @@ This directory has two scenarios sharing one Worker. disappear (page refresh, server restart, laptop closed) and resume later without missing events or seeing duplicates. -`run_worker.py` registers both workflows and the activity. +**Scenario 3 — external (non-Activity) publisher:** + +* `workflows/hub_workflow.py` — a passive workflow that does no work + of its own; it exists only to host a `WorkflowStream` and shut down + when signaled. +* `run_external_publisher.py` — starts the hub, then publishes events + into it from a plain Python coroutine using + `WorkflowStreamClient.create(client, workflow_id)`. A subscriber + task runs alongside; when the publisher is done it signals + `HubWorkflow.close`, the workflow's run finishes, and the + subscriber's iterator exits normally. This is the shape that fits a + backend service or scheduled job pushing events into a workflow it + didn't itself start. + +`run_worker.py` registers all three workflows and the activity. 
## Run it @@ -52,6 +66,8 @@ uv run workflow_stream/run_worker.py uv run workflow_stream/run_publisher.py # or uv run workflow_stream/run_reconnecting_subscriber.py +# or +uv run workflow_stream/run_external_publisher.py ``` Expected output on the basic publisher side: diff --git a/workflow_stream/run_external_publisher.py b/workflow_stream/run_external_publisher.py new file mode 100644 index 00000000..5ef7e27e --- /dev/null +++ b/workflow_stream/run_external_publisher.py @@ -0,0 +1,91 @@ +"""External publisher: a non-Activity process pushes events into a workflow. + +The two earlier scenarios publish from inside the workflow itself +(``OrderWorkflow``, ``PipelineWorkflow``) or from an Activity it runs +(``charge_card``). This scenario shows the third shape: a backend +service, scheduled job, or anything else with a Temporal ``Client`` +publishing into a *running* workflow it didn't start. Same factory as +the subscribe path — :py:meth:`WorkflowStreamClient.create` — used for +publishing instead. + +The script starts a ``HubWorkflow`` (which does no work of its own — +it exists only to host the stream), then runs a publisher and a +subscriber concurrently. When the publisher is done it signals +``HubWorkflow.close``, the workflow's run finishes, and the +subscriber's iterator exits normally. 
+ +Run the worker first (``uv run workflow_stream/run_worker.py``), then:: + + uv run workflow_stream/run_external_publisher.py +""" + +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from workflow_stream.shared import ( + TASK_QUEUE, + TOPIC_NEWS, + HubInput, + NewsEvent, +) +from workflow_stream.workflows.hub_workflow import HubWorkflow + + +HEADLINES = [ + "rates held", + "merger announced", + "outage resolved", + "earnings beat", + "regulator opens probe", +] + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-hub-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + HubWorkflow.run, + HubInput(hub_id=workflow_id), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + async def publish_news() -> None: + # WorkflowStreamClient.create takes a Temporal client and a + # workflow id — the same factory used elsewhere for subscribing. + # The async context manager batches publishes and flushes on + # exit; we additionally call flush() before signaling close so + # we know the events landed before the workflow shuts down. + producer = WorkflowStreamClient.create(client, workflow_id) + async with producer: + for headline in HEADLINES: + producer.publish(TOPIC_NEWS, NewsEvent(headline=headline)) + print(f"[publisher] sent: {headline}") + await asyncio.sleep(0.5) + await producer.flush() + # Tell the hub it can stop. The workflow's run() returns, and + # any in-flight subscribers see their async-for loop exit. 
+ await handle.signal(HubWorkflow.close) + print("[publisher] signaled close") + + async def consume_news() -> None: + consumer = WorkflowStreamClient.create(client, workflow_id) + async for item in consumer.subscribe( + [TOPIC_NEWS], result_type=NewsEvent + ): + print(f"[subscriber] offset={item.offset}: {item.data.headline}") + + await asyncio.gather(publish_news(), consume_news()) + + result = await handle.result() + print(f"\nworkflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_stream/run_worker.py b/workflow_stream/run_worker.py index 4b9a4ed5..b118c22f 100644 --- a/workflow_stream/run_worker.py +++ b/workflow_stream/run_worker.py @@ -8,6 +8,7 @@ from workflow_stream.activities.payment_activity import charge_card from workflow_stream.shared import TASK_QUEUE +from workflow_stream.workflows.hub_workflow import HubWorkflow from workflow_stream.workflows.order_workflow import OrderWorkflow from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow @@ -18,7 +19,7 @@ async def main() -> None: worker = Worker( client, task_queue=TASK_QUEUE, - workflows=[OrderWorkflow, PipelineWorkflow], + workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow], activities=[charge_card], ) await worker.run() diff --git a/workflow_stream/shared.py b/workflow_stream/shared.py index 652c8fa5..42e94015 100644 --- a/workflow_stream/shared.py +++ b/workflow_stream/shared.py @@ -13,6 +13,7 @@ # Topics published by the workflow / activity. TOPIC_STATUS = "status" TOPIC_PROGRESS = "progress" +TOPIC_NEWS = "news" @dataclass @@ -45,6 +46,18 @@ class StageEvent: stage: str +@dataclass +class HubInput: + hub_id: str + # Carries stream state across continue-as-new. None on a fresh start. 
+ stream_state: WorkflowStreamState | None = None + + +@dataclass +class NewsEvent: + headline: str + + T = TypeVar("T") diff --git a/workflow_stream/workflows/hub_workflow.py b/workflow_stream/workflows/hub_workflow.py new file mode 100644 index 00000000..eb686963 --- /dev/null +++ b/workflow_stream/workflows/hub_workflow.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from temporalio import workflow +from temporalio.contrib.workflow_stream import WorkflowStream + +from workflow_stream.shared import HubInput + + +@workflow.defn +class HubWorkflow: + """Passive stream host: starts up, waits, closes when told. + + Unlike OrderWorkflow or PipelineWorkflow, this workflow does no + work of its own — it exists only to host a ``WorkflowStream`` that + external publishers push events into and external subscribers read + from. The shape that fits a backend service or "event bus" pattern, + where the workflow owns durable state but the events come from + outside. + """ + + @workflow.init + def __init__(self, input: HubInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + self._closed = False + + @workflow.run + async def run(self, input: HubInput) -> str: + await workflow.wait_condition(lambda: self._closed) + return f"hub {input.hub_id} closed" + + @workflow.signal + def close(self) -> None: + # Custom signal handler that does not read stream state, so the + # synchronous-handler race documented in the README does not + # apply. + self._closed = True From 9e751d16e6c7e1f653b4d8fbaaa2291f613069f7 Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Wed, 29 Apr 2026 10:12:56 -0700 Subject: [PATCH 4/5] samples: workflow_stream: add truncating-ticker scenario MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a fourth scenario for long-running workflows that need to bound their event log: the workflow publishes events at a fixed cadence and calls self.stream.truncate(...) 
periodically to keep only the most recent entries. The runner subscribes twice — fast and slow — to make the trade visible: the fast subscriber sees every offset in order; the slow one falls behind a truncation, has its iterator transparently jump forward to the new base offset, and shows the offset gap that intermediate events fell into. This is the model for high-volume long-running streams: bounded log size, slow consumers may miss intermediate events but always see the most recent state. Co-Authored-By: Claude Opus 4.7 (1M context) --- workflow_stream/README.md | 17 +++- workflow_stream/run_truncating_ticker.py | 83 ++++++++++++++++++++ workflow_stream/run_worker.py | 3 +- workflow_stream/shared.py | 16 ++++ workflow_stream/workflows/ticker_workflow.py | 59 ++++++++++++++ 5 files changed, 176 insertions(+), 2 deletions(-) create mode 100644 workflow_stream/run_truncating_ticker.py create mode 100644 workflow_stream/workflows/ticker_workflow.py diff --git a/workflow_stream/README.md b/workflow_stream/README.md index 7b43ce08..a452e24b 100644 --- a/workflow_stream/README.md +++ b/workflow_stream/README.md @@ -54,7 +54,20 @@ This directory has two scenarios sharing one Worker. backend service or scheduled job pushing events into a workflow it didn't itself start. -`run_worker.py` registers all three workflows and the activity. +**Scenario 4 — bounded log via `truncate()`:** + +* `workflows/ticker_workflow.py` — a long-running workflow that + publishes events at a fixed cadence and calls + `self.stream.truncate(...)` periodically to bound log growth, keeping + only the most recent N entries. +* `run_truncating_ticker.py` — runs a fast subscriber and a slow + subscriber side by side. The fast one keeps up and sees every offset + in order; the slow one sleeps between iterations, falls behind a + truncation, and silently jumps forward to the new base offset. 
The + output makes the trade visible: bounded log size in exchange for + intermediate events being invisible to slow consumers. + +`run_worker.py` registers all four workflows and the activity. ## Run it @@ -68,6 +81,8 @@ uv run workflow_stream/run_publisher.py uv run workflow_stream/run_reconnecting_subscriber.py # or uv run workflow_stream/run_external_publisher.py +# or +uv run workflow_stream/run_truncating_ticker.py ``` Expected output on the basic publisher side: diff --git a/workflow_stream/run_truncating_ticker.py b/workflow_stream/run_truncating_ticker.py new file mode 100644 index 00000000..069ab4e4 --- /dev/null +++ b/workflow_stream/run_truncating_ticker.py @@ -0,0 +1,83 @@ +"""Truncating ticker: bounded log + slow vs. fast subscribers. + +The ``TickerWorkflow`` publishes ``count`` events at a fixed interval, +calling ``self.stream.truncate(...)`` periodically to bound log +growth. This script subscribes twice — once fast, once slow — and +prints both side-by-side so the trade is visible: + +* The fast subscriber keeps up and sees every published offset in + order. +* The slow subscriber sleeps between iterations. When a truncation + runs past its position, the iterator silently jumps forward to the + new base offset — the slow subscriber's offsets jump too, and + intermediate events are not visible to it. + +This is the bounded-log model: log size is capped, slow consumers may +miss intermediate events, but they always see the most recent state. +For long-running workflows pushing high event volumes this is usually +the right trade — pair with set-semantic events where each event +carries enough state to make missing the prior ones recoverable. 
+ +Run the worker first (``uv run workflow_stream/run_worker.py``), then:: + + uv run workflow_stream/run_truncating_ticker.py +""" + +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from workflow_stream.shared import ( + TASK_QUEUE, + TOPIC_TICK, + TickerInput, + TickEvent, +) +from workflow_stream.workflows.ticker_workflow import TickerWorkflow + + +SLOW_SUBSCRIBER_DELAY_S = 1.5 + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-ticker-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + TickerWorkflow.run, + TickerInput( + count=20, + keep_last=3, + truncate_every=5, + interval_ms=400, + ), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + stream = WorkflowStreamClient.create(client, workflow_id) + + async def fast_subscriber() -> None: + async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): + print(f"[fast] offset={item.offset:3d} n={item.data.n}") + + async def slow_subscriber() -> None: + async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): + print(f"[SLOW] offset={item.offset:3d} n={item.data.n}") + await asyncio.sleep(SLOW_SUBSCRIBER_DELAY_S) + + # Both iterators exit normally when the workflow completes. No + # terminal sentinel is needed — see the doc's "When the Workflow + # run completes" note. 
+ await asyncio.gather(fast_subscriber(), slow_subscriber()) + + result = await handle.result() + print(f"\nworkflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_stream/run_worker.py b/workflow_stream/run_worker.py index b118c22f..9e21982e 100644 --- a/workflow_stream/run_worker.py +++ b/workflow_stream/run_worker.py @@ -11,6 +11,7 @@ from workflow_stream.workflows.hub_workflow import HubWorkflow from workflow_stream.workflows.order_workflow import OrderWorkflow from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow +from workflow_stream.workflows.ticker_workflow import TickerWorkflow async def main() -> None: @@ -19,7 +20,7 @@ async def main() -> None: worker = Worker( client, task_queue=TASK_QUEUE, - workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow], + workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow, TickerWorkflow], activities=[charge_card], ) await worker.run() diff --git a/workflow_stream/shared.py b/workflow_stream/shared.py index 42e94015..fd97a6a8 100644 --- a/workflow_stream/shared.py +++ b/workflow_stream/shared.py @@ -14,6 +14,7 @@ TOPIC_STATUS = "status" TOPIC_PROGRESS = "progress" TOPIC_NEWS = "news" +TOPIC_TICK = "tick" @dataclass @@ -58,6 +59,21 @@ class NewsEvent: headline: str +@dataclass +class TickerInput: + count: int = 20 + keep_last: int = 3 + truncate_every: int = 5 + interval_ms: int = 400 + # Carries stream state across continue-as-new. None on a fresh start. 
+ stream_state: WorkflowStreamState | None = None + + +@dataclass +class TickEvent: + n: int + + T = TypeVar("T") diff --git a/workflow_stream/workflows/ticker_workflow.py b/workflow_stream/workflows/ticker_workflow.py new file mode 100644 index 00000000..61f895a2 --- /dev/null +++ b/workflow_stream/workflows/ticker_workflow.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_stream import WorkflowStream + +from workflow_stream.shared import ( + TOPIC_TICK, + TickEvent, + TickerInput, +) + + +@workflow.defn +class TickerWorkflow: + """Long-running ticker that bounds its event log via ``truncate``. + + Long-running workflows that publish high volumes of events would + otherwise grow their event log unboundedly. This workflow shows + the truncation pattern: every ``truncate_every`` events, drop + everything except the last ``keep_last`` entries by calling + ``self.stream.truncate(safe_offset)``. + + Subscribers that fall behind a truncation jump forward to the new + base offset transparently (the iterator handles the + ``TruncatedOffset`` error internally), so consumers stay live but + may not see every intermediate event. That is the trade: bounded + log size in exchange for at-best-effort delivery to slow + consumers. + + To compute the truncation offset the workflow tracks its own + published count. ``WorkflowStream`` does not expose a workflow-side + head-offset accessor, but the running count plus the carried + ``base_offset`` (in continue-as-new chains) is sufficient. + """ + + @workflow.init + def __init__(self, input: TickerInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + # Running count of events published by THIS run. To compute a + # global offset, add the prior_state's base_offset (omitted + # here — this sample doesn't continue-as-new). 
+        self._published = 0
+
+    @workflow.run
+    async def run(self, input: TickerInput) -> str:
+        for n in range(input.count):
+            self.stream.publish(TOPIC_TICK, TickEvent(n=n))
+            self._published += 1
+            await workflow.sleep(timedelta(milliseconds=input.interval_ms))
+            if (
+                self._published % input.truncate_every == 0
+                and self._published > input.keep_last
+            ):
+                # Drop everything except the last `keep_last` entries.
+                truncate_to = self._published - input.keep_last
+                self.stream.truncate(truncate_to)
+        return f"ticker emitted {self._published} events"

From 438d9c1fd0a709919b639b39f4714c6641e48ae6 Mon Sep 17 00:00:00 2001
From: Johann Schleier-Smith
Date: Wed, 29 Apr 2026 17:29:13 -0700
Subject: [PATCH 5/5] =?UTF-8?q?samples:=20rename=20workflow=5Fstream=20?=
 =?UTF-8?q?=E2=86=92=20workflow=5Fstreams;=20migrate=20to=20topic=20handle?=
 =?UTF-8?q?s?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Directory and module path renamed to plural to match sdk-python
  `temporalio.contrib.workflow_streams` rename.
- Workflow-side: bind a typed topic handle in `@workflow.init` and call
  `topic.publish(value)`; the positional `WorkflowStream.publish(topic,
  value)` form has been removed. Same change applied to the activity and
  the external-publisher sample.
- Activity: `WorkflowStreamClient.from_activity()` → `from_within_activity()`.
Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 2 +- .../activities/payment_activity.py | 41 ------------------- .../README.md | 16 ++++---- .../__init__.py | 0 .../activities/__init__.py | 0 .../activities/payment_activity.py | 41 +++++++++++++++++++ .../run_external_publisher.py | 13 +++--- .../run_publisher.py | 6 +-- .../run_reconnecting_subscriber.py | 10 ++--- .../run_truncating_ticker.py | 10 ++--- .../run_worker.py | 12 +++--- .../shared.py | 2 +- .../workflows/__init__.py | 0 .../workflows/hub_workflow.py | 4 +- .../workflows/order_workflow.py | 25 ++++------- .../workflows/pipeline_workflow.py | 7 ++-- .../workflows/ticker_workflow.py | 7 ++-- 17 files changed, 96 insertions(+), 100 deletions(-) delete mode 100644 workflow_stream/activities/payment_activity.py rename {workflow_stream => workflow_streams}/README.md (91%) rename {workflow_stream => workflow_streams}/__init__.py (100%) rename {workflow_stream => workflow_streams}/activities/__init__.py (100%) create mode 100644 workflow_streams/activities/payment_activity.py rename {workflow_stream => workflow_streams}/run_external_publisher.py (86%) rename {workflow_stream => workflow_streams}/run_publisher.py (90%) rename {workflow_stream => workflow_streams}/run_reconnecting_subscriber.py (92%) rename {workflow_stream => workflow_streams}/run_truncating_ticker.py (89%) rename {workflow_stream => workflow_streams}/run_worker.py (57%) rename {workflow_stream => workflow_streams}/shared.py (98%) rename {workflow_stream => workflow_streams}/workflows/__init__.py (100%) rename {workflow_stream => workflow_streams}/workflows/hub_workflow.py (91%) rename {workflow_stream => workflow_streams}/workflows/order_workflow.py (66%) rename {workflow_stream => workflow_streams}/workflows/pipeline_workflow.py (83%) rename {workflow_stream => workflow_streams}/workflows/ticker_workflow.py (91%) diff --git a/README.md b/README.md index 5a5c937f..80cda649 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ 
Some examples require extra dependencies. See each sample's directory for specif * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [prometheus](prometheus) - Configure Prometheus metrics on clients/workers. -* [workflow_stream](workflow_stream) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_stream`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.** +* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.** * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. diff --git a/workflow_stream/activities/payment_activity.py b/workflow_stream/activities/payment_activity.py deleted file mode 100644 index f69f8c8d..00000000 --- a/workflow_stream/activities/payment_activity.py +++ /dev/null @@ -1,41 +0,0 @@ -from __future__ import annotations - -import asyncio -from datetime import timedelta - -from temporalio import activity -from temporalio.contrib.workflow_stream import WorkflowStreamClient - -from workflow_stream.shared import TOPIC_PROGRESS, ProgressEvent - - -@activity.defn -async def charge_card(order_id: str) -> str: - """Pretend to charge a card, publishing progress to the parent workflow. - - `WorkflowStreamClient.from_activity()` reads the parent workflow id - and the Temporal client from the activity context, so this activity - can push events back without any wiring. 
- - Caveat: each call to ``from_activity()`` creates a fresh client with - a random ``publisher_id``, so dedup does not protect against an - activity retry republishing the same events. For activities that - must be exactly-once on the stream side, derive a stable - ``publisher_id`` from ``activity.info().activity_id`` (this is - invariant across attempts of the same scheduled activity). The - current ``WorkflowStreamClient`` API does not yet expose - ``publisher_id`` on its constructors; this sample accepts - at-most-once-per-attempt semantics. - """ - client = WorkflowStreamClient.from_activity( - batch_interval=timedelta(milliseconds=200) - ) - async with client: - client.publish(TOPIC_PROGRESS, ProgressEvent(message="charging card...")) - await asyncio.sleep(1.0) - client.publish( - TOPIC_PROGRESS, - ProgressEvent(message="card charged"), - force_flush=True, - ) - return f"charge-{order_id}" diff --git a/workflow_stream/README.md b/workflow_streams/README.md similarity index 91% rename from workflow_stream/README.md rename to workflow_streams/README.md index a452e24b..1d6f167c 100644 --- a/workflow_stream/README.md +++ b/workflow_streams/README.md @@ -1,7 +1,7 @@ # Workflow Streams > **Experimental.** These samples target the -> `temporalio.contrib.workflow_stream` module on the +> `temporalio.contrib.workflow_streams` module on the > [`contrib/pubsub` branch of sdk-python][branch], which is not yet > released. To run them locally, install sdk-python from that branch > (e.g. `uv pip install -e ` after checking out the @@ -9,7 +9,7 @@ [branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub -`temporalio.contrib.workflow_stream` lets a workflow host a durable, +`temporalio.contrib.workflow_streams` lets a workflow host a durable, offset-addressed event channel. The workflow holds an append-only log; external clients (activities, starters, BFFs) publish to topics via signals and subscribe via long-poll updates. 
This packages the @@ -24,7 +24,7 @@ This directory has two scenarios sharing one Worker. `WorkflowStream` and publishes status events as it processes an order. * `activities/payment_activity.py` — an activity that publishes intermediate progress to the stream via - `WorkflowStreamClient.from_activity()`. + `WorkflowStreamClient.from_within_activity()`. * `run_publisher.py` — starts the workflow, subscribes to both topics, decodes each by `item.topic`, and prints events as they arrive. @@ -73,16 +73,16 @@ This directory has two scenarios sharing one Worker. ```bash # Terminal 1: worker -uv run workflow_stream/run_worker.py +uv run workflow_streams/run_worker.py # Terminal 2: pick a scenario -uv run workflow_stream/run_publisher.py +uv run workflow_streams/run_publisher.py # or -uv run workflow_stream/run_reconnecting_subscriber.py +uv run workflow_streams/run_reconnecting_subscriber.py # or -uv run workflow_stream/run_external_publisher.py +uv run workflow_streams/run_external_publisher.py # or -uv run workflow_stream/run_truncating_ticker.py +uv run workflow_streams/run_truncating_ticker.py ``` Expected output on the basic publisher side: diff --git a/workflow_stream/__init__.py b/workflow_streams/__init__.py similarity index 100% rename from workflow_stream/__init__.py rename to workflow_streams/__init__.py diff --git a/workflow_stream/activities/__init__.py b/workflow_streams/activities/__init__.py similarity index 100% rename from workflow_stream/activities/__init__.py rename to workflow_streams/activities/__init__.py diff --git a/workflow_streams/activities/payment_activity.py b/workflow_streams/activities/payment_activity.py new file mode 100644 index 00000000..d94a071b --- /dev/null +++ b/workflow_streams/activities/payment_activity.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta + +from temporalio import activity +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from 
workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent + + +@activity.defn +async def charge_card(order_id: str) -> str: + """Pretend to charge a card, publishing progress to the parent workflow. + + `WorkflowStreamClient.from_within_activity()` reads the parent + workflow id and the Temporal client from the activity context, so + this activity can push events back without any wiring. + + Caveat: each call to ``from_within_activity()`` creates a fresh + client with a random ``publisher_id``, so dedup does not protect + against an activity retry republishing the same events. For + activities that must be exactly-once on the stream side, derive a + stable ``publisher_id`` from ``activity.info().activity_id`` (this + is invariant across attempts of the same scheduled activity). The + current ``WorkflowStreamClient`` API does not yet expose + ``publisher_id`` on its constructors; this sample accepts + at-most-once-per-attempt semantics. + """ + client = WorkflowStreamClient.from_within_activity( + batch_interval=timedelta(milliseconds=200) + ) + async with client: + progress = client.topic(TOPIC_PROGRESS, type=ProgressEvent) + progress.publish(ProgressEvent(message="charging card...")) + await asyncio.sleep(1.0) + progress.publish( + ProgressEvent(message="card charged"), + force_flush=True, + ) + return f"charge-{order_id}" diff --git a/workflow_stream/run_external_publisher.py b/workflow_streams/run_external_publisher.py similarity index 86% rename from workflow_stream/run_external_publisher.py rename to workflow_streams/run_external_publisher.py index 5ef7e27e..1663ed31 100644 --- a/workflow_stream/run_external_publisher.py +++ b/workflow_streams/run_external_publisher.py @@ -14,9 +14,9 @@ ``HubWorkflow.close``, the workflow's run finishes, and the subscriber's iterator exits normally. 
-Run the worker first (``uv run workflow_stream/run_worker.py``), then:: +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: - uv run workflow_stream/run_external_publisher.py + uv run workflow_streams/run_external_publisher.py """ from __future__ import annotations @@ -25,15 +25,15 @@ import uuid from temporalio.client import Client -from temporalio.contrib.workflow_stream import WorkflowStreamClient +from temporalio.contrib.workflow_streams import WorkflowStreamClient -from workflow_stream.shared import ( +from workflow_streams.shared import ( TASK_QUEUE, TOPIC_NEWS, HubInput, NewsEvent, ) -from workflow_stream.workflows.hub_workflow import HubWorkflow +from workflow_streams.workflows.hub_workflow import HubWorkflow HEADLINES = [ @@ -64,8 +64,9 @@ async def publish_news() -> None: # we know the events landed before the workflow shuts down. producer = WorkflowStreamClient.create(client, workflow_id) async with producer: + news = producer.topic(TOPIC_NEWS, type=NewsEvent) for headline in HEADLINES: - producer.publish(TOPIC_NEWS, NewsEvent(headline=headline)) + news.publish(NewsEvent(headline=headline)) print(f"[publisher] sent: {headline}") await asyncio.sleep(0.5) await producer.flush() diff --git a/workflow_stream/run_publisher.py b/workflow_streams/run_publisher.py similarity index 90% rename from workflow_stream/run_publisher.py rename to workflow_streams/run_publisher.py index cdfb210d..85c967ee 100644 --- a/workflow_stream/run_publisher.py +++ b/workflow_streams/run_publisher.py @@ -5,9 +5,9 @@ from temporalio.api.common.v1 import Payload from temporalio.client import Client -from temporalio.contrib.workflow_stream import WorkflowStreamClient +from temporalio.contrib.workflow_streams import WorkflowStreamClient -from workflow_stream.shared import ( +from workflow_streams.shared import ( TASK_QUEUE, TOPIC_PROGRESS, TOPIC_STATUS, @@ -16,7 +16,7 @@ StatusEvent, race_with_workflow, ) -from workflow_stream.workflows.order_workflow import 
OrderWorkflow +from workflow_streams.workflows.order_workflow import OrderWorkflow async def main() -> None: diff --git a/workflow_stream/run_reconnecting_subscriber.py b/workflow_streams/run_reconnecting_subscriber.py similarity index 92% rename from workflow_stream/run_reconnecting_subscriber.py rename to workflow_streams/run_reconnecting_subscriber.py index 3a5eee11..3aae76c6 100644 --- a/workflow_stream/run_reconnecting_subscriber.py +++ b/workflow_streams/run_reconnecting_subscriber.py @@ -10,9 +10,9 @@ the demo short. The same code shape works across actual process restarts because the resume offset is persisted to disk between phases. -Run the worker first (``uv run workflow_stream/run_worker.py``), then:: +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: - uv run workflow_stream/run_reconnecting_subscriber.py + uv run workflow_streams/run_reconnecting_subscriber.py """ from __future__ import annotations @@ -23,15 +23,15 @@ from pathlib import Path from temporalio.client import Client -from temporalio.contrib.workflow_stream import WorkflowStreamClient +from temporalio.contrib.workflow_streams import WorkflowStreamClient -from workflow_stream.shared import ( +from workflow_streams.shared import ( TASK_QUEUE, TOPIC_STATUS, PipelineInput, StageEvent, ) -from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow +from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow # Number of events read in phase 1 before simulating a disconnect. # Picked small enough that the workflow is still running after. 
diff --git a/workflow_stream/run_truncating_ticker.py b/workflow_streams/run_truncating_ticker.py similarity index 89% rename from workflow_stream/run_truncating_ticker.py rename to workflow_streams/run_truncating_ticker.py index 069ab4e4..50876a0d 100644 --- a/workflow_stream/run_truncating_ticker.py +++ b/workflow_streams/run_truncating_ticker.py @@ -18,9 +18,9 @@ the right trade — pair with set-semantic events where each event carries enough state to make missing the prior ones recoverable. -Run the worker first (``uv run workflow_stream/run_worker.py``), then:: +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: - uv run workflow_stream/run_truncating_ticker.py + uv run workflow_streams/run_truncating_ticker.py """ from __future__ import annotations @@ -29,15 +29,15 @@ import uuid from temporalio.client import Client -from temporalio.contrib.workflow_stream import WorkflowStreamClient +from temporalio.contrib.workflow_streams import WorkflowStreamClient -from workflow_stream.shared import ( +from workflow_streams.shared import ( TASK_QUEUE, TOPIC_TICK, TickerInput, TickEvent, ) -from workflow_stream.workflows.ticker_workflow import TickerWorkflow +from workflow_streams.workflows.ticker_workflow import TickerWorkflow SLOW_SUBSCRIBER_DELAY_S = 1.5 diff --git a/workflow_stream/run_worker.py b/workflow_streams/run_worker.py similarity index 57% rename from workflow_stream/run_worker.py rename to workflow_streams/run_worker.py index 9e21982e..8aa12edc 100644 --- a/workflow_stream/run_worker.py +++ b/workflow_streams/run_worker.py @@ -6,12 +6,12 @@ from temporalio.client import Client from temporalio.worker import Worker -from workflow_stream.activities.payment_activity import charge_card -from workflow_stream.shared import TASK_QUEUE -from workflow_stream.workflows.hub_workflow import HubWorkflow -from workflow_stream.workflows.order_workflow import OrderWorkflow -from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow -from 
workflow_stream.workflows.ticker_workflow import TickerWorkflow +from workflow_streams.activities.payment_activity import charge_card +from workflow_streams.shared import TASK_QUEUE +from workflow_streams.workflows.hub_workflow import HubWorkflow +from workflow_streams.workflows.order_workflow import OrderWorkflow +from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow +from workflow_streams.workflows.ticker_workflow import TickerWorkflow async def main() -> None: diff --git a/workflow_stream/shared.py b/workflow_streams/shared.py similarity index 98% rename from workflow_stream/shared.py rename to workflow_streams/shared.py index fd97a6a8..746ee73d 100644 --- a/workflow_stream/shared.py +++ b/workflow_streams/shared.py @@ -6,7 +6,7 @@ from typing import Any, TypeVar from temporalio.client import WorkflowHandle -from temporalio.contrib.workflow_stream import WorkflowStreamState +from temporalio.contrib.workflow_streams import WorkflowStreamState TASK_QUEUE = "workflow-stream-sample-task-queue" diff --git a/workflow_stream/workflows/__init__.py b/workflow_streams/workflows/__init__.py similarity index 100% rename from workflow_stream/workflows/__init__.py rename to workflow_streams/workflows/__init__.py diff --git a/workflow_stream/workflows/hub_workflow.py b/workflow_streams/workflows/hub_workflow.py similarity index 91% rename from workflow_stream/workflows/hub_workflow.py rename to workflow_streams/workflows/hub_workflow.py index eb686963..fdf7da56 100644 --- a/workflow_stream/workflows/hub_workflow.py +++ b/workflow_streams/workflows/hub_workflow.py @@ -1,9 +1,9 @@ from __future__ import annotations from temporalio import workflow -from temporalio.contrib.workflow_stream import WorkflowStream +from temporalio.contrib.workflow_streams import WorkflowStream -from workflow_stream.shared import HubInput +from workflow_streams.shared import HubInput @workflow.defn diff --git a/workflow_stream/workflows/order_workflow.py 
b/workflow_streams/workflows/order_workflow.py similarity index 66% rename from workflow_stream/workflows/order_workflow.py rename to workflow_streams/workflows/order_workflow.py index 4b4f4a82..8b944508 100644 --- a/workflow_stream/workflows/order_workflow.py +++ b/workflow_streams/workflows/order_workflow.py @@ -3,9 +3,9 @@ from datetime import timedelta from temporalio import workflow -from temporalio.contrib.workflow_stream import WorkflowStream +from temporalio.contrib.workflow_streams import WorkflowStream -from workflow_stream.shared import ( +from workflow_streams.shared import ( TOPIC_PROGRESS, TOPIC_STATUS, OrderInput, @@ -14,7 +14,7 @@ ) with workflow.unsafe.imports_passed_through(): - from workflow_stream.activities.payment_activity import charge_card + from workflow_streams.activities.payment_activity import charge_card @workflow.defn @@ -34,12 +34,12 @@ def __init__(self, input: OrderInput) -> None: # messages. Threading prior_state lets the workflow survive # continue-as-new without losing buffered items. 
self.stream = WorkflowStream(prior_state=input.stream_state) + self.status = self.stream.topic(TOPIC_STATUS, type=StatusEvent) + self.progress = self.stream.topic(TOPIC_PROGRESS, type=ProgressEvent) @workflow.run async def run(self, input: OrderInput) -> str: - self.stream.publish( - TOPIC_STATUS, StatusEvent(kind="received", order_id=input.order_id) - ) + self.status.publish(StatusEvent(kind="received", order_id=input.order_id)) charge_id = await workflow.execute_activity( charge_card, @@ -47,14 +47,7 @@ async def run(self, input: OrderInput) -> str: start_to_close_timeout=timedelta(seconds=30), ) - self.stream.publish( - TOPIC_STATUS, StatusEvent(kind="shipped", order_id=input.order_id) - ) - self.stream.publish( - TOPIC_PROGRESS, - ProgressEvent(message=f"charge id: {charge_id}"), - ) - self.stream.publish( - TOPIC_STATUS, StatusEvent(kind="complete", order_id=input.order_id) - ) + self.status.publish(StatusEvent(kind="shipped", order_id=input.order_id)) + self.progress.publish(ProgressEvent(message=f"charge id: {charge_id}")) + self.status.publish(StatusEvent(kind="complete", order_id=input.order_id)) return charge_id diff --git a/workflow_stream/workflows/pipeline_workflow.py b/workflow_streams/workflows/pipeline_workflow.py similarity index 83% rename from workflow_stream/workflows/pipeline_workflow.py rename to workflow_streams/workflows/pipeline_workflow.py index 5f53c1bf..a2d96d95 100644 --- a/workflow_stream/workflows/pipeline_workflow.py +++ b/workflow_streams/workflows/pipeline_workflow.py @@ -3,9 +3,9 @@ from datetime import timedelta from temporalio import workflow -from temporalio.contrib.workflow_stream import WorkflowStream +from temporalio.contrib.workflow_streams import WorkflowStream -from workflow_stream.shared import ( +from workflow_streams.shared import ( TOPIC_STATUS, PipelineInput, StageEvent, @@ -25,6 +25,7 @@ class PipelineWorkflow: @workflow.init def __init__(self, input: PipelineInput) -> None: self.stream = 
WorkflowStream(prior_state=input.stream_state) + self.status = self.stream.topic(TOPIC_STATUS, type=StageEvent) @workflow.run async def run(self, input: PipelineInput) -> str: @@ -37,7 +38,7 @@ async def run(self, input: PipelineInput) -> str: "complete", ] for stage in stages: - self.stream.publish(TOPIC_STATUS, StageEvent(stage=stage)) + self.status.publish(StageEvent(stage=stage)) if stage != "complete": await workflow.sleep(timedelta(seconds=2)) return f"pipeline {input.pipeline_id} done" diff --git a/workflow_stream/workflows/ticker_workflow.py b/workflow_streams/workflows/ticker_workflow.py similarity index 91% rename from workflow_stream/workflows/ticker_workflow.py rename to workflow_streams/workflows/ticker_workflow.py index 61f895a2..e11616b4 100644 --- a/workflow_stream/workflows/ticker_workflow.py +++ b/workflow_streams/workflows/ticker_workflow.py @@ -3,9 +3,9 @@ from datetime import timedelta from temporalio import workflow -from temporalio.contrib.workflow_stream import WorkflowStream +from temporalio.contrib.workflow_streams import WorkflowStream -from workflow_stream.shared import ( +from workflow_streams.shared import ( TOPIC_TICK, TickEvent, TickerInput, @@ -38,6 +38,7 @@ class TickerWorkflow: @workflow.init def __init__(self, input: TickerInput) -> None: self.stream = WorkflowStream(prior_state=input.stream_state) + self.tick = self.stream.topic(TOPIC_TICK, type=TickEvent) # Running count of events published by THIS run. To compute a # global offset, add the prior_state's base_offset (omitted # here — this sample doesn't continue-as-new). @@ -46,7 +47,7 @@ def __init__(self, input: TickerInput) -> None: @workflow.run async def run(self, input: TickerInput) -> str: for n in range(input.count): - self.stream.publish(TOPIC_TICK, TickEvent(n=n)) + self.tick.publish(TickEvent(n=n)) self._published += 1 await workflow.sleep(timedelta(milliseconds=input.interval_ms)) if (