Changes from all commits (24 commits):
* 44ceff7 Add workflow_streams samples: order_workflow scenario (jssmith, Apr 29, 2026)
* faac49f samples: workflow_stream: add reconnecting-subscriber scenario (jssmith, Apr 29, 2026)
* b607117 samples: workflow_stream: add external-publisher scenario (jssmith, Apr 29, 2026)
* 91233b0 samples: workflow_stream: add truncating-ticker scenario (jssmith, Apr 29, 2026)
* 78062b4 samples: rename workflow_stream → workflow_streams; migrate to topic … (jssmith, Apr 30, 2026)
* 5d67b9e samples: workflow_streams review polish (jssmith, Apr 30, 2026)
* 6294691 workflow_streams: deliver terminal events + fix run_publisher subscri… (jssmith, Apr 30, 2026)
* bfbb2ed workflow_streams README: document the stream-end pattern (jssmith, Apr 30, 2026)
* fb3c8fc Merge main into workflow-streams-samples (jssmith, May 3, 2026)
* 0962379 samples: workflow_streams: README and wheel packages cleanup (jssmith, May 3, 2026)
* d5cc2fe samples: workflow_streams: drop force_flush=True from charge_card (jssmith, May 3, 2026)
* 553bfdb samples: workflow_streams: drop temp-file resume offset; add stats co… (jssmith, May 3, 2026)
* c107687 samples: workflow_streams: surface multiple truncation jumps in ticker (jssmith, May 3, 2026)
* 31b6cf0 samples: workflow_streams: add LLM-streaming scenario (jssmith, May 3, 2026)
* e8620c6 samples: workflow_streams: drop chat-stream openai upper cap (jssmith, May 3, 2026)
* 0b4cbc8 samples: workflow_streams: chat consumer header + cursor save/restore (jssmith, May 3, 2026)
* 81bf605 samples: workflow_streams: rename chat -> llm in scenario 5 (jssmith, May 3, 2026)
* c8663e5 samples: workflow_streams: race the LLM consumer with workflow result (jssmith, May 3, 2026)
* 44d944b samples: workflow_streams: drop race_with_workflow helper (jssmith, May 3, 2026)
* a760ad3 samples: workflow_streams: reorganize README; drop closing section (jssmith, May 3, 2026)
* dc381c5 samples: workflow_streams: drop README Notes section (jssmith, May 3, 2026)
* 7a5065e samples: workflow_streams: lock llm-stream dependency group (jssmith, May 3, 2026)
* 51f2f2d samples: workflow_streams: fix lint failures (ruff isort + format) (jssmith, May 3, 2026)
* 2f39146 samples: workflow_streams: drop BFF jargon and Expected output block (jssmith, May 3, 2026)
1 change: 1 addition & 0 deletions README.md
@@ -79,6 +79,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource while awaiting its successful completion.
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.**
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
* [sentry](sentry) - Report errors to Sentry.
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -48,6 +48,7 @@ openai-agents = [
pydantic-converter = ["pydantic>=2.10.6,<3"]
sentry = ["sentry-sdk>=2.13.0"]
trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"]
llm-stream = ["openai>=1.0"]
cloud-export-to-parquet = [
"pandas>=2.2.2,<3 ; python_version >= '3.10' and python_version < '4.0'",
"numpy>=1.26.0,<2 ; python_version >= '3.10' and python_version < '3.13'",
@@ -94,6 +95,7 @@ packages = [
"updatable_timer",
"worker_specific_task_queues",
"worker_versioning",
"workflow_streams",
]

[tool.hatch.build.targets.wheel.sources]
4 changes: 4 additions & 0 deletions uv.lock

Some generated files are not rendered by default.

120 changes: 120 additions & 0 deletions workflow_streams/README.md
@@ -0,0 +1,120 @@
# Workflow Streams

> **Experimental.** These samples use
> `temporalio.contrib.workflow_streams`, which ships in
> `temporalio>=1.27.0`. The module is considered experimental and its
> API may change in future versions.

`temporalio.contrib.workflow_streams` lets a workflow host a durable,
offset-addressed event channel. The workflow holds an append-only log;
external clients (activities, starters, web backends) publish to topics via
signals and subscribe via long-poll updates. This packages the
boilerplate — batching, offset tracking, topic filtering,
continue-as-new hand-off — into a reusable stream.
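
For orientation, here is a minimal consumer sketch. It assumes the
`WorkflowStreamClient` API as these samples use it; the
`topic`/`offset`/`event` fields on subscribed items, and `create` not
needing an await, are assumptions based on the scenario descriptions
below.

```python
import asyncio

from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient


async def tail(workflow_id: str) -> None:
    temporal_client = await Client.connect("localhost:7233")
    # Attach to the stream hosted by an already-running workflow.
    stream_client = WorkflowStreamClient.create(temporal_client, workflow_id)
    async with stream_client:
        # Long-poll the workflow's append-only log from the beginning;
        # offsets make every event addressable and replayable.
        async for item in stream_client.subscribe(from_offset=0):
            print(f"[{item.offset}] {item.topic}: {item.event}")


if __name__ == "__main__":
    asyncio.run(tail("some-running-workflow-id"))  # hypothetical workflow id
```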

This directory has five scenarios. The first four share one worker;
the fifth has its own worker because it needs the `openai` package
and an `OPENAI_API_KEY`.

**Scenario 1 — basic publish/subscribe with heterogeneous topics:**

* `workflows/order_workflow.py` — a workflow that hosts a
`WorkflowStream` and publishes status events as it processes an order.
* `activities/payment_activity.py` — an activity that publishes
intermediate progress to the stream via
`WorkflowStreamClient.from_within_activity()`.
* `run_publisher.py` — starts the workflow, subscribes to both topics,
  decodes each by `item.topic`, and prints events as they arrive (the
  decode loop is sketched below).
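
A sketch of that decode loop, under the same assumptions as above;
`TOPIC_STATUS` and `StatusEvent` are hypothetical stand-ins for the
order-status pair, while `TOPIC_PROGRESS`/`ProgressEvent` are the real
pair from `shared.py` used by `payment_activity.py`:

```python
from workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent

TOPIC_STATUS = "status"  # hypothetical name for the workflow's own topic


async def dispatch(stream_client) -> None:
    async with stream_client:
        async for item in stream_client.subscribe(from_offset=0):
            # Heterogeneous topics share one stream; branch on the
            # topic to decode each event with its own type.
            if item.topic == TOPIC_PROGRESS:
                progress: ProgressEvent = item.event
                print(f"activity progress: {progress.message}")
            elif item.topic == TOPIC_STATUS:
                print(f"workflow status: {item.event}")
```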

**Scenario 2 — reconnecting subscriber:**

* `workflows/pipeline_workflow.py` — a multi-stage pipeline that
publishes stage transitions over ~10 seconds, leaving room for a
consumer to disconnect and reconnect mid-run.
* `run_reconnecting_subscriber.py` — connects, reads a couple of
events, "disconnects," then reopens a fresh client and resumes via
`subscribe(from_offset=...)`. This is the central Workflow Streams
use case: a consumer can disappear (page refresh, server restart,
laptop closed) and resume later without missing events or seeing
  duplicates; the resume step is sketched below.
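
The resume step, sketched under the same assumptions; `last_offset` is
whatever the consumer remembered before it "disconnected":

```python
from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient


async def resume(workflow_id: str, last_offset: int) -> None:
    temporal_client = await Client.connect("localhost:7233")
    # A fresh client stands in for a restarted process: nothing is
    # shared with the previous subscriber except the remembered offset.
    stream_client = WorkflowStreamClient.create(temporal_client, workflow_id)
    async with stream_client:
        # Resume one past the last processed event: no gaps, no duplicates.
        async for item in stream_client.subscribe(from_offset=last_offset + 1):
            print(f"[{item.offset}] {item.topic}: {item.event}")
            last_offset = item.offset
```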

**Scenario 3 — external (non-Activity) publisher:**

* `workflows/hub_workflow.py` — a passive workflow that does no work
of its own; it exists only to host a `WorkflowStream` and shut down
when signaled.
* `run_external_publisher.py` — starts the hub, then publishes events
into it from a plain Python coroutine using
`WorkflowStreamClient.create(client, workflow_id)`. A subscriber
task runs alongside; when the publisher is done it emits a sentinel
  event and signals `HubWorkflow.close`. This is the shape that fits a
  backend service or scheduled job pushing events into a workflow it
  didn't itself start (publisher side sketched below).
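
That publisher side, sketched; the event type and topic name are
hypothetical, but `create`, `topic(...).publish(...)`, and the `close`
signal follow the description above:

```python
from dataclasses import dataclass

from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient


@dataclass
class ExternalEvent:  # hypothetical event type for this sketch
    seq: int
    last: bool = False


async def publish_from_backend(workflow_id: str) -> None:
    temporal_client = await Client.connect("localhost:7233")
    # No activity context here: a plain coroutine publishing into a
    # workflow it did not start.
    stream_client = WorkflowStreamClient.create(temporal_client, workflow_id)
    async with stream_client:
        events = stream_client.topic("external", type=ExternalEvent)
        for i in range(5):
            events.publish(ExternalEvent(seq=i))
        # Sentinel event tells subscribers there is nothing more to read.
        events.publish(ExternalEvent(seq=-1, last=True))
    # Then ask the hub workflow to shut down.
    await temporal_client.get_workflow_handle(workflow_id).signal("close")
```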

**Scenario 4 — bounded log via `truncate()`:**

* `workflows/ticker_workflow.py` — a long-running workflow that
publishes events at a fixed cadence and calls
`self.stream.truncate(...)` periodically to bound log growth,
keeping only the most recent N entries.
* `run_truncating_ticker.py` — runs a fast subscriber and a slow
subscriber side by side. The fast one keeps up and sees every
offset in order; the slow one falls behind a truncation and
  silently jumps forward to the new base offset. The output makes the
  trade-off visible: bounded log size in exchange for intermediate
  events being invisible to slow consumers.
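
The workflow-side step that bounds the log, sketched; how the stream is
constructed inside the workflow and `truncate`'s exact signature are
assumptions (the real wiring lives in `ticker_workflow.py`):

```python
import asyncio
from dataclasses import dataclass

from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream  # experimental


@dataclass
class TickEvent:  # hypothetical event type for this sketch
    seq: int


@workflow.defn
class TickerSketchWorkflow:
    @workflow.run
    async def run(self, total_ticks: int) -> None:
        stream = WorkflowStream()  # constructor shape is an assumption
        ticks = stream.topic("tick", type=TickEvent)
        keep = 50
        for seq in range(total_ticks):
            ticks.publish(TickEvent(seq=seq))
            if seq and seq % keep == 0:
                # Bound the log to the newest `keep` entries. A slow
                # subscriber behind the new base offset jumps forward,
                # never seeing the dropped events.
                stream.truncate(keep_last=keep)  # signature is an assumption
            await asyncio.sleep(0.5)
```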

**Scenario 5 — LLM streaming:**

* `workflows/llm_workflow.py` — hosts a `WorkflowStream` and runs
`stream_completion` as a single activity. The workflow itself
does no streaming; the activity owns the non-deterministic OpenAI
call.
* `activities/llm_activity.py` — calls
`openai.AsyncOpenAI().chat.completions.create(stream=True)`,
publishes each token chunk on the `delta` topic, the final
accumulated text on `complete`, and a `RetryEvent` on `retry`
when running on attempt > 1.
* `run_llm.py` — subscribes to all three topics, renders deltas to
the terminal as they arrive, and on a `retry` event uses ANSI
escapes to rewind the printed output before the retried attempt
re-publishes.

Scenario 5 runs on its own worker (`run_llm_worker.py`, on
`workflow-stream-llm-task-queue`) because it needs the `openai`
dependency and an `OPENAI_API_KEY`, and because killing this worker
mid-stream is the easiest way to demonstrate retry handling without
disrupting the other four scenarios.
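
A sketch of `run_llm.py`'s dispatch, under the same subscribe
assumptions as above; the ANSI details are worth spelling out:
`\x1b[<n>F` moves the cursor up `n` lines, and `\x1b[J` clears from the
cursor to the end of the screen, which together rewind a failed
attempt's partial output.

```python
import sys

from workflow_streams.llm_shared import TOPIC_COMPLETE, TOPIC_DELTA, TOPIC_RETRY


async def render(stream_client) -> None:
    printed: list[str] = []
    async with stream_client:
        async for item in stream_client.subscribe(from_offset=0):
            if item.topic == TOPIC_DELTA:
                sys.stdout.write(item.event.text)
                sys.stdout.flush()
                printed.append(item.event.text)
            elif item.topic == TOPIC_RETRY:
                # Rewind: return to the start of what we printed, then
                # clear everything below before the retry re-publishes.
                newlines = "".join(printed).count("\n")
                sys.stdout.write("\r" if newlines == 0 else f"\x1b[{newlines}F")
                sys.stdout.write("\x1b[J")
                sys.stdout.flush()
                printed.clear()
            elif item.topic == TOPIC_COMPLETE:
                break  # terminal event: the full text has arrived
```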

## Run it

For scenarios 1–4, start the shared worker:

```bash
uv run workflow_streams/run_worker.py
```

For scenario 5, install the extra, export the key, and start the
LLM worker:

```bash
uv sync --group llm-stream
export OPENAI_API_KEY=...
uv run workflow_streams/run_llm_worker.py
```

Then in another terminal, pick a scenario:

```bash
uv run workflow_streams/run_publisher.py # scenario 1
uv run workflow_streams/run_reconnecting_subscriber.py # scenario 2
uv run workflow_streams/run_external_publisher.py # scenario 3
uv run workflow_streams/run_truncating_ticker.py # scenario 4
uv run workflow_streams/run_llm.py # scenario 5
```

To exercise scenario 5's retry path, kill `run_llm_worker.py`
(`Ctrl-C`) while output is streaming and start it again. The
activity's next attempt sends a `RetryEvent` first; the consumer
clears its on-screen output via ANSI escapes and re-renders from
scratch.
Empty file added workflow_streams/__init__.py
Empty file.
74 changes: 74 additions & 0 deletions workflow_streams/activities/llm_activity.py
@@ -0,0 +1,74 @@
from __future__ import annotations

from datetime import timedelta

from openai import AsyncOpenAI
from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.llm_shared import (
    TOPIC_COMPLETE,
    TOPIC_DELTA,
    TOPIC_RETRY,
    LLMInput,
    RetryEvent,
    TextComplete,
    TextDelta,
)


@activity.defn
async def stream_completion(input: LLMInput) -> str:
    """Stream an LLM completion to the parent workflow's stream.

    Activity-as-publisher: each delta from the OpenAI streaming API is
    pushed to the workflow's stream as a ``TextDelta`` event on the
    ``delta`` topic. The accumulated full text returns as the
    activity's result and is also published on the ``complete`` topic
    as a terminator. On retry attempts (``activity.info().attempt > 1``)
    a ``RetryEvent`` lands on the ``retry`` topic before the new
    attempt's deltas, so consumers can reset their accumulated state
    instead of concatenating the failed attempt's partial output with
    the retried attempt's full output.

    No ``force_flush=True``: the 200ms ``batch_interval`` is fast
    enough for an interactive feel, and the WorkflowStreamClient's
    ``__aexit__`` cancels a sleeping flusher cleanly. (The doc example
    uses ``force_flush=True`` on the first delta; that path currently
    wedges the activity's exit on a cancel-mid-flight bug — fix is
    pending in ``temporalio.contrib.workflow_streams``.)
    """
    stream_client = WorkflowStreamClient.from_within_activity(
        batch_interval=timedelta(milliseconds=200),
    )
    # Disable provider-side retries; let Temporal own retry policy at
    # the activity layer.
    openai_client = AsyncOpenAI(max_retries=0)

    async with stream_client:
        deltas = stream_client.topic(TOPIC_DELTA, type=TextDelta)
        complete = stream_client.topic(TOPIC_COMPLETE, type=TextComplete)
        retry = stream_client.topic(TOPIC_RETRY, type=RetryEvent)

        attempt = activity.info().attempt
        if attempt > 1:
            retry.publish(RetryEvent(attempt=attempt))

        full: list[str] = []
        oai_stream = await openai_client.chat.completions.create(
            model=input.model,
            messages=[{"role": "user", "content": input.prompt}],
            stream=True,
        )
        async for chunk in oai_stream:
            if not chunk.choices:
                continue
            text = chunk.choices[0].delta.content
            if not text:
                continue
            deltas.publish(TextDelta(text=text))
            full.append(text)

        full_text = "".join(full)
        complete.publish(TextComplete(full_text=full_text))
        return full_text
30 changes: 30 additions & 0 deletions workflow_streams/activities/payment_activity.py
@@ -0,0 +1,30 @@
from __future__ import annotations

import asyncio
from datetime import timedelta

from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent


@activity.defn
async def charge_card(order_id: str) -> str:
    """Pretend to charge a card, publishing progress to the parent workflow.

    `WorkflowStreamClient.from_within_activity()` reads the parent
    workflow id and the Temporal client from the activity context, so
    this activity can push events back without any wiring.
    """
    client = WorkflowStreamClient.from_within_activity(
        batch_interval=timedelta(milliseconds=200)
    )
    async with client:
        progress = client.topic(TOPIC_PROGRESS, type=ProgressEvent)
        progress.publish(ProgressEvent(message="charging card..."))
        await asyncio.sleep(1.0)
        progress.publish(
            ProgressEvent(message="card charged"),
        )
        return f"charge-{order_id}"
44 changes: 44 additions & 0 deletions workflow_streams/llm_shared.py
@@ -0,0 +1,44 @@
"""Types and constants for the LLM-streaming scenario.

Kept separate from ``shared.py`` because the other scenarios don't
use these — and this scenario runs on its own worker and task queue
so the ``openai`` dependency stays out of everyone else's path.
"""

from __future__ import annotations

from dataclasses import dataclass

from temporalio.contrib.workflow_streams import WorkflowStreamState

# Scenario 5 runs on its own worker so the openai dependency only
# matters for that scenario.
LLM_TASK_QUEUE = "workflow-stream-llm-task-queue"

# Topics published by the activity.
TOPIC_DELTA = "delta"
TOPIC_COMPLETE = "complete"
TOPIC_RETRY = "retry"


@dataclass
class LLMInput:
prompt: str
model: str = "gpt-5-mini"
# Carries stream state across continue-as-new. None on a fresh start.
stream_state: WorkflowStreamState | None = None


@dataclass
class TextDelta:
text: str


@dataclass
class TextComplete:
full_text: str


@dataclass
class RetryEvent:
attempt: int