diff --git a/README.md b/README.md index d4d6a61b..80cda649 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [prometheus](prometheus) - Configure Prometheus metrics on clients/workers. +* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.** * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. diff --git a/pyproject.toml b/pyproject.toml index 404f46ad..9c9bace1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ openai-agents = [ pydantic-converter = ["pydantic>=2.10.6,<3"] sentry = ["sentry-sdk>=2.13.0"] trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"] +llm-stream = ["openai>=1.0"] cloud-export-to-parquet = [ "pandas>=2.2.2,<3 ; python_version >= '3.10' and python_version < '4.0'", "numpy>=1.26.0,<2 ; python_version >= '3.10' and python_version < '3.13'", @@ -94,6 +95,7 @@ packages = [ "updatable_timer", "worker_specific_task_queues", "worker_versioning", + "workflow_streams", ] [tool.hatch.build.targets.wheel.sources] diff --git a/uv.lock b/uv.lock index 4dcdad5a..e025dab8 100644 --- a/uv.lock +++ b/uv.lock @@ -2589,6 +2589,9 @@ langsmith-tracing = [ { name = "openai" }, { name = "temporalio", extra = ["langsmith", "pydantic"] }, ] +llm-stream = [ + { name = "openai" }, +] nexus = [ { name = "nexus-rpc" }, ] @@ -2649,6 +2652,7 @@ langsmith-tracing = [ { name = "openai", specifier = ">=1.4.0" }, { name = "temporalio", extras = ["pydantic", "langsmith"], specifier = ">=1.27.0" }, ] +llm-stream = [{ name = "openai", specifier = ">=1.0" }] nexus = [{ name = "nexus-rpc", specifier = ">=1.1.0,<2" }] open-telemetry = [ { name = "opentelemetry-exporter-otlp-proto-grpc" }, diff --git a/workflow_streams/README.md b/workflow_streams/README.md new file mode 100644 index 00000000..f06bed39 --- /dev/null +++ b/workflow_streams/README.md @@ -0,0 +1,120 @@ +# Workflow Streams + +> **Experimental.** These samples use +> `temporalio.contrib.workflow_streams`, which ships in +> `temporalio>=1.27.0`. The module is considered experimental and its +> API may change in future versions. + +`temporalio.contrib.workflow_streams` lets a workflow host a durable, +offset-addressed event channel. The workflow holds an append-only log; +external clients (activities, starters, web backends) publish to topics via +signals and subscribe via long-poll updates. This packages the +boilerplate — batching, offset tracking, topic filtering, +continue-as-new hand-off — into a reusable stream. + +This directory has five scenarios. The first four share one worker; +the fifth has its own worker because it needs the `openai` package +and an `OPENAI_API_KEY`. + +**Scenario 1 — basic publish/subscribe with heterogeneous topics:** + +* `workflows/order_workflow.py` — a workflow that hosts a + `WorkflowStream` and publishes status events as it processes an order. 
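
  A condensed look at the workflow-side publish API, taken from that file:

  ```python
  # Condensed from workflows/order_workflow.py in this directory.
  self.stream = WorkflowStream(prior_state=input.stream_state)
  self.status = self.stream.topic(TOPIC_STATUS, type=StatusEvent)
  ...
  self.status.publish(StatusEvent(kind="received", order_id=input.order_id))
  ```
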
+* `activities/payment_activity.py` — an activity that publishes + intermediate progress to the stream via + `WorkflowStreamClient.from_within_activity()`. +* `run_publisher.py` — starts the workflow, subscribes to both topics, + decodes each by `item.topic`, and prints events as they arrive. + +**Scenario 2 — reconnecting subscriber:** + +* `workflows/pipeline_workflow.py` — a multi-stage pipeline that + publishes stage transitions over ~10 seconds, leaving room for a + consumer to disconnect and reconnect mid-run. +* `run_reconnecting_subscriber.py` — connects, reads a couple of + events, "disconnects," then reopens a fresh client and resumes via + `subscribe(from_offset=...)`. This is the central Workflow Streams + use case: a consumer can disappear (page refresh, server restart, + laptop closed) and resume later without missing events or seeing + duplicates. + +**Scenario 3 — external (non-Activity) publisher:** + +* `workflows/hub_workflow.py` — a passive workflow that does no work + of its own; it exists only to host a `WorkflowStream` and shut down + when signaled. +* `run_external_publisher.py` — starts the hub, then publishes events + into it from a plain Python coroutine using + `WorkflowStreamClient.create(client, workflow_id)`. A subscriber + task runs alongside; when the publisher is done it emits a sentinel + event and signals `HubWorkflow.close`. The shape that fits a + backend service or scheduled job pushing events into a workflow it + didn't itself start. + +**Scenario 4 — bounded log via `truncate()`:** + +* `workflows/ticker_workflow.py` — a long-running workflow that + publishes events at a fixed cadence and calls + `self.stream.truncate(...)` periodically to bound log growth, + keeping only the most recent N entries. +* `run_truncating_ticker.py` — runs a fast subscriber and a slow + subscriber side by side. The fast one keeps up and sees every + offset in order; the slow one falls behind a truncation and + silently jumps forward to the new base offset. The output makes + the trade visible: bounded log size in exchange for intermediate + events being invisible to slow consumers. + +**Scenario 5 — LLM streaming:** + +* `workflows/llm_workflow.py` — hosts a `WorkflowStream` and runs + `stream_completion` as a single activity. The workflow itself + does no streaming; the activity owns the non-deterministic OpenAI + call. +* `activities/llm_activity.py` — calls + `openai.AsyncOpenAI().chat.completions.create(stream=True)`, + publishes each token chunk on the `delta` topic, the final + accumulated text on `complete`, and a `RetryEvent` on `retry` + when running on attempt > 1. +* `run_llm.py` — subscribes to all three topics, renders deltas to + the terminal as they arrive, and on a `retry` event uses ANSI + escapes to rewind the printed output before the retried attempt + re-publishes. + +Scenario 5 runs on its own worker (`run_llm_worker.py`, on +`workflow-stream-llm-task-queue`) because it needs the `openai` +dependency and an `OPENAI_API_KEY`, and because killing this worker +mid-stream is the easiest way to demonstrate retry handling without +disrupting the other four scenarios. + +## Run it + +For scenarios 1–4, start the shared worker: + +```bash +uv run workflow_streams/run_worker.py +``` + +For scenario 5, install the extra, export the key, and start the +LLM worker: + +```bash +uv sync --group llm-stream +export OPENAI_API_KEY=... 
+uv run workflow_streams/run_llm_worker.py +``` + +Then in another terminal, pick a scenario: + +```bash +uv run workflow_streams/run_publisher.py # scenario 1 +uv run workflow_streams/run_reconnecting_subscriber.py # scenario 2 +uv run workflow_streams/run_external_publisher.py # scenario 3 +uv run workflow_streams/run_truncating_ticker.py # scenario 4 +uv run workflow_streams/run_llm.py # scenario 5 +``` + +To exercise scenario 5's retry path, kill `run_llm_worker.py` +(`Ctrl-C`) while output is streaming and start it again. The +activity's next attempt sends a `RetryEvent` first; the consumer +clears its on-screen output via ANSI escapes and re-renders from +scratch. diff --git a/workflow_streams/__init__.py b/workflow_streams/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_streams/activities/__init__.py b/workflow_streams/activities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_streams/activities/llm_activity.py b/workflow_streams/activities/llm_activity.py new file mode 100644 index 00000000..8b13b48a --- /dev/null +++ b/workflow_streams/activities/llm_activity.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from datetime import timedelta + +from openai import AsyncOpenAI +from temporalio import activity +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.llm_shared import ( + TOPIC_COMPLETE, + TOPIC_DELTA, + TOPIC_RETRY, + LLMInput, + RetryEvent, + TextComplete, + TextDelta, +) + + +@activity.defn +async def stream_completion(input: LLMInput) -> str: + """Stream an LLM completion to the parent workflow's stream. + + Activity-as-publisher: each delta from the OpenAI streaming API is + pushed to the workflow's stream as a ``TextDelta`` event on the + ``delta`` topic. The accumulated full text returns as the + activity's result and is also published on the ``complete`` topic + as a terminator. On retry attempts (``activity.info().attempt > 1``) + a ``RetryEvent`` lands on the ``retry`` topic before the new + attempt's deltas, so consumers can reset their accumulated state + instead of concatenating the failed attempt's partial output with + the retried attempt's full output. + + No ``force_flush=True``: the 200ms ``batch_interval`` is fast + enough for an interactive feel, and the WorkflowStreamClient's + ``__aexit__`` cancels a sleeping flusher cleanly. (The doc example + uses ``force_flush=True`` on the first delta; that path currently + wedges the activity's exit on a cancel-mid-flight bug — fix is + pending in ``temporalio.contrib.workflow_streams``.) + """ + stream_client = WorkflowStreamClient.from_within_activity( + batch_interval=timedelta(milliseconds=200), + ) + # Disable provider-side retries; let Temporal own retry policy at + # the activity layer. 
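    # (LLMWorkflow starts this activity with RetryPolicy(maximum_attempts=3);
    # a failed or interrupted attempt is re-run from the top and re-publishes
    # its deltas.)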
+ openai_client = AsyncOpenAI(max_retries=0) + + async with stream_client: + deltas = stream_client.topic(TOPIC_DELTA, type=TextDelta) + complete = stream_client.topic(TOPIC_COMPLETE, type=TextComplete) + retry = stream_client.topic(TOPIC_RETRY, type=RetryEvent) + + attempt = activity.info().attempt + if attempt > 1: + retry.publish(RetryEvent(attempt=attempt)) + + full: list[str] = [] + oai_stream = await openai_client.chat.completions.create( + model=input.model, + messages=[{"role": "user", "content": input.prompt}], + stream=True, + ) + async for chunk in oai_stream: + if not chunk.choices: + continue + text = chunk.choices[0].delta.content + if not text: + continue + deltas.publish(TextDelta(text=text)) + full.append(text) + + full_text = "".join(full) + complete.publish(TextComplete(full_text=full_text)) + return full_text diff --git a/workflow_streams/activities/payment_activity.py b/workflow_streams/activities/payment_activity.py new file mode 100644 index 00000000..b3b2aa29 --- /dev/null +++ b/workflow_streams/activities/payment_activity.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta + +from temporalio import activity +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent + + +@activity.defn +async def charge_card(order_id: str) -> str: + """Pretend to charge a card, publishing progress to the parent workflow. + + `WorkflowStreamClient.from_within_activity()` reads the parent + workflow id and the Temporal client from the activity context, so + this activity can push events back without any wiring. + """ + client = WorkflowStreamClient.from_within_activity( + batch_interval=timedelta(milliseconds=200) + ) + async with client: + progress = client.topic(TOPIC_PROGRESS, type=ProgressEvent) + progress.publish(ProgressEvent(message="charging card...")) + await asyncio.sleep(1.0) + progress.publish( + ProgressEvent(message="card charged"), + ) + return f"charge-{order_id}" diff --git a/workflow_streams/llm_shared.py b/workflow_streams/llm_shared.py new file mode 100644 index 00000000..2780fd0b --- /dev/null +++ b/workflow_streams/llm_shared.py @@ -0,0 +1,44 @@ +"""Types and constants for the LLM-streaming scenario. + +Kept separate from ``shared.py`` because the other scenarios don't +use these — and this scenario runs on its own worker and task queue +so the ``openai`` dependency stays out of everyone else's path. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from temporalio.contrib.workflow_streams import WorkflowStreamState + +# Scenario 5 runs on its own worker so the openai dependency only +# matters for that scenario. +LLM_TASK_QUEUE = "workflow-stream-llm-task-queue" + +# Topics published by the activity. +TOPIC_DELTA = "delta" +TOPIC_COMPLETE = "complete" +TOPIC_RETRY = "retry" + + +@dataclass +class LLMInput: + prompt: str + model: str = "gpt-5-mini" + # Carries stream state across continue-as-new. None on a fresh start. 
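    # (LLMWorkflow never continues-as-new, so this stays None; the field
    # keeps the input shape consistent with the other scenarios.)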
+ stream_state: WorkflowStreamState | None = None + + +@dataclass +class TextDelta: + text: str + + +@dataclass +class TextComplete: + full_text: str + + +@dataclass +class RetryEvent: + attempt: int diff --git a/workflow_streams/run_external_publisher.py b/workflow_streams/run_external_publisher.py new file mode 100644 index 00000000..8e7d38f8 --- /dev/null +++ b/workflow_streams/run_external_publisher.py @@ -0,0 +1,99 @@ +"""External publisher: a non-Activity process pushes events into a workflow. + +The two earlier scenarios publish from inside the workflow itself +(``OrderWorkflow``, ``PipelineWorkflow``) or from an Activity it runs +(``charge_card``). This scenario shows the third shape: a backend +service, scheduled job, or anything else with a Temporal ``Client`` +publishing into a *running* workflow it didn't start. Same factory as +the subscribe path — :py:meth:`WorkflowStreamClient.create` — used for +publishing instead. + +The script starts a ``HubWorkflow`` (which does no work of its own — +it exists only to host the stream), then runs a publisher and a +subscriber concurrently. When the publisher is done it signals +``HubWorkflow.close``, the workflow's run finishes, and the +subscriber's iterator exits normally. + +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: + + uv run workflow_streams/run_external_publisher.py +""" + +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_NEWS, + HubInput, + NewsEvent, +) +from workflow_streams.workflows.hub_workflow import HubWorkflow + +HEADLINES = [ + "rates held", + "merger announced", + "outage resolved", + "earnings beat", + "regulator opens probe", +] + +# In-band terminator the publisher emits before signaling close. The +# subscriber recognizes this value and stops polling — without an +# explicit terminator the consumer would have to rely on the workflow +# returning to break the iterator, which means racing the last item +# delivery against workflow completion. +DONE_HEADLINE = "__done__" + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-hub-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + HubWorkflow.run, + HubInput(hub_id=workflow_id), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + async def publish_news() -> None: + # WorkflowStreamClient.create takes a Temporal client and a + # workflow id — the same factory used elsewhere for subscribing. + # The async context manager batches publishes and flushes on + # exit; we additionally call flush() before signaling close so + # we know the events landed before the workflow shuts down. + producer = WorkflowStreamClient.create(client, workflow_id) + async with producer: + news = producer.topic(TOPIC_NEWS, type=NewsEvent) + for headline in HEADLINES: + news.publish(NewsEvent(headline=headline)) + print(f"[publisher] sent: {headline}") + await asyncio.sleep(0.5) + news.publish(NewsEvent(headline=DONE_HEADLINE), force_flush=True) + await producer.flush() + # Tell the hub it can stop. The subscriber has already broken + # out of its async-for loop on the sentinel above. 
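            # Signaling returns once the server accepts the signal; the
            # handle.result() await at the end of main() is what waits for
            # the hub run to actually finish.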
+ await handle.signal(HubWorkflow.close) + print("[publisher] signaled close") + + async def consume_news() -> None: + consumer = WorkflowStreamClient.create(client, workflow_id) + async for item in consumer.subscribe([TOPIC_NEWS], result_type=NewsEvent): + if item.data.headline == DONE_HEADLINE: + return + print(f"[subscriber] offset={item.offset}: {item.data.headline}") + + await asyncio.gather(publish_news(), consume_news()) + + result = await handle.result() + print(f"\nworkflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_llm.py b/workflow_streams/run_llm.py new file mode 100644 index 00000000..1b2a9d7a --- /dev/null +++ b/workflow_streams/run_llm.py @@ -0,0 +1,132 @@ +"""Stream LLM output to the terminal, handling retries. + +Starts an ``LLMWorkflow``, subscribes to its delta / complete / retry +topics, and renders the model's output to stdout as it arrives. On a +``RETRY`` event (the activity is on attempt > 1), the consumer rewinds +its rendered output with ANSI escapes and starts fresh — so a killed +worker doesn't leave a half-finished response stuck on screen +followed by the retried attempt's full output. + +Requires ``OPENAI_API_KEY`` in the environment and the ``llm-stream`` +extra:: + + uv sync --group llm-stream + export OPENAI_API_KEY=... + +Run the LLM worker first (``uv run workflow_streams/run_llm_worker.py``), +then:: + + uv run workflow_streams/run_llm.py + +To see retry handling in action, kill the LLM worker mid-stream +(Ctrl-C in its terminal) and start it again. The consumer will clear +its accumulated output on the ``RETRY`` event and re-render the +retried attempt's output from scratch. +""" + +from __future__ import annotations + +import asyncio +import sys +import uuid + +from temporalio.client import Client +from temporalio.common import RawValue +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.llm_shared import ( + LLM_TASK_QUEUE, + TOPIC_COMPLETE, + TOPIC_DELTA, + TOPIC_RETRY, + LLMInput, + RetryEvent, + TextComplete, + TextDelta, +) +from workflow_streams.workflows.llm_workflow import LLMWorkflow + +# Long enough that you can comfortably kill the worker mid-stream and +# watch the retry render. Adjust to taste. +DEFAULT_PROMPT = ( + "Write a 500-word comparison of Paxos, Raft, and Viewstamped " + "Replication for a new distributed-systems engineer. Cover the " + "core ideas, leader election, normal-case operation, " + "reconfiguration, and the practical tradeoffs that show up when " + "implementing each. Use short paragraphs." +) + + +# ANSI cursor save / restore. ``\033[s`` saves the current cursor +# position, ``\033[u`` restores it, ``\033[J`` clears from the cursor +# to the end of the screen. Save once before the first delta, and on +# RETRY restore + clear-to-end so the failed attempt's rendered output +# disappears regardless of how it was wrapped by the terminal. Save +# again afterwards so a second retry can rewind to the same point. +ANSI_SAVE = "\033[s" +ANSI_RESTORE_AND_CLEAR = "\033[u\033[J" + + +async def main() -> None: + client = await Client.connect("localhost:7233") + converter = client.data_converter.payload_converter + + workflow_id = f"workflow-stream-llm-{uuid.uuid4().hex[:8]}" + llm_input = LLMInput(prompt=DEFAULT_PROMPT) + handle = await client.start_workflow( + LLMWorkflow.run, + llm_input, + id=workflow_id, + task_queue=LLM_TASK_QUEUE, + ) + + # Print a header so the user sees something immediately. 
The + # response will start streaming below it once the first delta + # arrives — until then this is the only line on screen. + print( + f"[llm {workflow_id}] streaming response from {llm_input.model}, " + f"awaiting first token..." + ) + print() + sys.stdout.write(ANSI_SAVE) + sys.stdout.flush() + + stream = WorkflowStreamClient.create(client, workflow_id) + + # result_type=RawValue lets us dispatch on item.topic and decode + # against the right dataclass per topic. The loop ends either on + # the `complete` terminator (break) or because the iterator + # naturally exhausts when the workflow reaches a terminal state + # without one (activity exhausted retries, etc.). Either way the + # handle.result() below either returns the full text or raises + # the workflow's failure. + async for item in stream.subscribe( + [TOPIC_DELTA, TOPIC_RETRY, TOPIC_COMPLETE], + result_type=RawValue, + ): + if item.topic == TOPIC_RETRY: + evt = converter.from_payload(item.data.payload, RetryEvent) + sys.stdout.write(ANSI_RESTORE_AND_CLEAR) + sys.stdout.flush() + print(f"[retry attempt {evt.attempt}] resetting output") + print() + sys.stdout.write(ANSI_SAVE) + sys.stdout.flush() + elif item.topic == TOPIC_DELTA: + delta = converter.from_payload(item.data.payload, TextDelta) + sys.stdout.write(delta.text) + sys.stdout.flush() + elif item.topic == TOPIC_COMPLETE: + # The full text is also in the payload (and returned by + # the workflow), but the consumer has already rendered it + # incrementally. Just terminate the line. + converter.from_payload(item.data.payload, TextComplete) + print() + break + + result = await handle.result() + print(f"\n[workflow result: {len(result)} chars]") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_llm_worker.py b/workflow_streams/run_llm_worker.py new file mode 100644 index 00000000..2bad5991 --- /dev/null +++ b/workflow_streams/run_llm_worker.py @@ -0,0 +1,40 @@ +"""Worker for the LLM-streaming scenario. + +Runs separately from ``run_worker.py`` so the ``openai`` dependency +and the ``OPENAI_API_KEY`` requirement stay isolated to this one +scenario. Different task queue too — the other four samples won't +route work to this worker. + +Kill this worker mid-stream while ``run_llm.py`` is running to +trigger a retry: Temporal restarts the activity on the next worker +to come up, the activity publishes a ``RetryEvent`` on its second +attempt, and the consumer resets its rendered output. 
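Because the whole completion runs as a single activity, the retried
attempt re-streams from the first token rather than resuming where
the failed attempt stopped.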
+""" + +from __future__ import annotations + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from workflow_streams.activities.llm_activity import stream_completion +from workflow_streams.llm_shared import LLM_TASK_QUEUE +from workflow_streams.workflows.llm_workflow import LLMWorkflow + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue=LLM_TASK_QUEUE, + workflows=[LLMWorkflow], + activities=[stream_completion], + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_publisher.py b/workflow_streams/run_publisher.py new file mode 100644 index 00000000..9f76ee41 --- /dev/null +++ b/workflow_streams/run_publisher.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.common import RawValue +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_PROGRESS, + TOPIC_STATUS, + OrderInput, + ProgressEvent, + StatusEvent, +) +from workflow_streams.workflows.order_workflow import OrderWorkflow + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-order-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + OrderWorkflow.run, + OrderInput(order_id="order-1"), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + stream = WorkflowStreamClient.create(client, workflow_id) + converter = client.data_converter.payload_converter + + # Single iterator over both topics — avoids a cancellation race + # between two concurrent subscribers. result_type=RawValue + # delivers the underlying Payload so we can dispatch heterogeneous + # events on item.topic. The loop ends either on the in-band + # `complete` terminator (break) or because the iterator exhausts + # when the workflow reaches a terminal state without one (e.g. on + # failure). Either way we then await handle.result(), which raises + # if the workflow failed. + async for item in stream.subscribe( + [TOPIC_STATUS, TOPIC_PROGRESS], result_type=RawValue + ): + if item.topic == TOPIC_STATUS: + evt = converter.from_payload(item.data.payload, StatusEvent) + print(f"[status] {evt.kind}: order={evt.order_id}") + if evt.kind == "complete": + break + elif item.topic == TOPIC_PROGRESS: + progress = converter.from_payload(item.data.payload, ProgressEvent) + print(f"[progress] {progress.message}") + + result = await handle.result() + print(f"workflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_reconnecting_subscriber.py b/workflow_streams/run_reconnecting_subscriber.py new file mode 100644 index 00000000..d0da5e32 --- /dev/null +++ b/workflow_streams/run_reconnecting_subscriber.py @@ -0,0 +1,164 @@ +"""Reconnecting subscriber: read a few events, disconnect, resume. + +Demonstrates the central Workflow Streams use case: a consumer can +disappear mid-stream — page refresh, server restart, laptop closed — +and resume later without missing events or seeing duplicates. The +event log lives in the Workflow, so the consumer just remembers where +it stopped. + +The script runs the pattern in two phases inside one process to keep +the demo short. 
The same code shape works across actual process +restarts because the resume offset is durable in the workflow, not in +the consumer. + +Output is one line per emit, with current stream stats in a left column +and a phase / event message in a right column. A background poller +calls ``WorkflowStreamClient.get_offset()`` for the whole demo and +emits a heartbeat line once a second so you can watch ``pending`` +(``available - processed``) grow while the consumer is disconnected +and shrink as phase 2 catches up. + +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: + + uv run workflow_streams/run_reconnecting_subscriber.py +""" + +from __future__ import annotations + +import asyncio +import uuid +from dataclasses import dataclass + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_STATUS, + PipelineInput, + StageEvent, +) +from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow + +# Number of events read in phase 1 before simulating a disconnect. +# Picked small enough that the workflow is still running after. +PHASE_1_EVENTS = 2 + +# How long to stay disconnected. +DISCONNECT_SECONDS = 3.0 + +# Background poller cadence. The poller refreshes state.available this +# often and emits a heartbeat line once per HEARTBEAT_SECONDS. +POLL_INTERVAL_SECONDS = 0.25 +HEARTBEAT_SECONDS = 1.0 + +# Width of the stats column. Picked to fit the longest stats string. +LEFT_WIDTH = 30 + + +@dataclass +class State: + processed: int = 0 + available: int = 0 + + @property + def pending(self) -> int: + return max(0, self.available - self.processed) + + +def emit(state: State, message: str) -> None: + left = ( + f"proc={state.processed:>2} " + f"avail={state.available:>2} " + f"pend={state.pending:>2}" + ) + print(f"{left:<{LEFT_WIDTH}}│ {message}", flush=True) + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-pipeline-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + PipelineWorkflow.run, + PipelineInput(pipeline_id=workflow_id), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + # In a production web backend the resume offset would live in + # durable storage keyed by (user_id, run_id) — a database row, a + # Redis key, etc. For an in-process demo a State.processed + # attribute works the same way. + state = State() + stream = WorkflowStreamClient.create(client, workflow_id) + emit(state, f"started {workflow_id}") + + stop = asyncio.Event() + + async def poller() -> None: + """Refresh state.available; emit a heartbeat line once a second.""" + loop = asyncio.get_running_loop() + last_emit = loop.time() + while not stop.is_set(): + try: + state.available = await stream.get_offset() + except Exception: + pass + now = loop.time() + if now - last_emit >= HEARTBEAT_SECONDS: + emit(state, "·") + last_emit = now + try: + await asyncio.wait_for(stop.wait(), timeout=POLL_INTERVAL_SECONDS) + except asyncio.TimeoutError: + pass + + poller_task = asyncio.create_task(poller()) + try: + # ---- Phase 1: connect, read a couple of events, "disconnect". + emit(state, "[phase 1] connecting") + seen = 0 + async for item in stream.subscribe([TOPIC_STATUS], result_type=StageEvent): + # Remember *one past* the offset just consumed: on resume we + # want the next unseen event, not the one we already showed. 
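            # (Phase 2's subscribe(from_offset=state.processed) starts
            # delivery at exactly this saved offset.)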
+ state.processed = item.offset + 1 + emit(state, f" offset={item.offset:2d} stage={item.data.stage}") + seen += 1 + if seen >= PHASE_1_EVENTS: + break + emit(state, "[phase 1] disconnecting") + + # ---- Disconnect window: nobody reads. The workflow keeps + # publishing — `pend` grows on the heartbeat lines as the offset + # advances past `processed`. + await asyncio.sleep(DISCONNECT_SECONDS) + + # ---- Phase 2: brand-new client + stream, resume from saved + # offset. Same shape as a different process picking up where the + # first one left off. + emit(state, "[phase 2] reconnecting") + client2 = await Client.connect("localhost:7233") + stream2 = WorkflowStreamClient.create(client2, workflow_id) + async for item in stream2.subscribe( + [TOPIC_STATUS], + from_offset=state.processed, + result_type=StageEvent, + ): + state.processed = item.offset + 1 + emit(state, f" offset={item.offset:2d} stage={item.data.stage}") + if item.data.stage == "complete": + break + + result = await handle.result() + emit(state, f"workflow result: {result}") + finally: + stop.set() + try: + await poller_task + except asyncio.CancelledError: + pass + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_truncating_ticker.py b/workflow_streams/run_truncating_ticker.py new file mode 100644 index 00000000..26399447 --- /dev/null +++ b/workflow_streams/run_truncating_ticker.py @@ -0,0 +1,129 @@ +"""Truncating ticker: bounded log + slow vs. fast subscribers. + +The ``TickerWorkflow`` publishes ``count`` events at a fixed interval, +calling ``self.stream.truncate(...)`` periodically to bound log +growth. This script subscribes twice — once fast, once slow — and +prints them in two lanes so the trade is visible at a glance: + +* **Fast lane** (left). Keeps up. Sees every published offset. +* **Slow lane** (right). Sleeps between iterations. When a truncation + has dropped its position by the time it polls again, the iterator + silently jumps forward to the new base offset; the slow lane prints + a ``↪ jumped N → M (K dropped)`` marker for each gap and resumes + at the new offset. + +``truncate()`` is unilateral: the workflow does not know who is +subscribed and does not wait for them. The implicit alternative — +never truncating — keeps every event around forever, lets slow +consumers eventually catch up without losses, and pays for it in +unbounded workflow history. The truncation model is the opposite +trade: bounded log, at-best-effort delivery to slow consumers, no +backpressure on the publisher. Pair it with set-semantic events where +each event carries enough state to make missing the prior ones +recoverable. (If you actually need lossless delivery to slow +consumers, the workflow has to coordinate acknowledgements +explicitly — that is a different sample.) + +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: + + uv run workflow_streams/run_truncating_ticker.py +""" + +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_TICK, + TickerInput, + TickEvent, +) +from workflow_streams.workflows.ticker_workflow import TickerWorkflow + +# Aggressive truncation so the log stays at most KEEP_LAST entries +# right after each truncation, which keeps the slow subscriber's +# per-poll batch tiny. 
Small batches + a slow per-event sleep mean the +# slow subscriber re-polls often, and most of those polls land after a +# truncation that has passed its position — so it sees several jumps +# during the run rather than one batched at the end. +TICKER_COUNT = 30 +INTERVAL_MS = 200 +TRUNCATE_EVERY = 2 +KEEP_LAST = 1 +SLOW_SUBSCRIBER_DELAY_S = 1.5 + +LANE_WIDTH = 32 +SEP = "│" + + +def emit_fast(message: str) -> None: + print(f"{message:<{LANE_WIDTH}} {SEP}", flush=True) + + +def emit_slow(message: str) -> None: + print(f"{' ' * LANE_WIDTH} {SEP} {message}", flush=True) + + +def emit_header() -> None: + rule = "─" * LANE_WIDTH + print( + f"{'fast (every event)':<{LANE_WIDTH}} {SEP} " + f"slow (sleeps {SLOW_SUBSCRIBER_DELAY_S}s between events)" + ) + print(f"{rule} {SEP} {rule}") + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-ticker-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + TickerWorkflow.run, + TickerInput( + count=TICKER_COUNT, + keep_last=KEEP_LAST, + truncate_every=TRUNCATE_EVERY, + interval_ms=INTERVAL_MS, + ), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + stream = WorkflowStreamClient.create(client, workflow_id) + last_n = TICKER_COUNT - 1 + + emit_header() + + async def fast_subscriber() -> None: + async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): + emit_fast(f"offset={item.offset:>3} n={item.data.n}") + if item.data.n == last_n: + return + + async def slow_subscriber() -> None: + last_offset = -1 + async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): + if last_offset >= 0 and item.offset > last_offset + 1: + gap = item.offset - last_offset - 1 + emit_slow( + f"↪ jumped offset={last_offset} → {item.offset} ({gap} dropped)" + ) + emit_slow(f"offset={item.offset:>3} n={item.data.n}") + last_offset = item.offset + if item.data.n == last_n: + return + await asyncio.sleep(SLOW_SUBSCRIBER_DELAY_S) + + await asyncio.gather(fast_subscriber(), slow_subscriber()) + + result = await handle.result() + print() + print(f"workflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_worker.py b/workflow_streams/run_worker.py new file mode 100644 index 00000000..8aa12edc --- /dev/null +++ b/workflow_streams/run_worker.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from workflow_streams.activities.payment_activity import charge_card +from workflow_streams.shared import TASK_QUEUE +from workflow_streams.workflows.hub_workflow import HubWorkflow +from workflow_streams.workflows.order_workflow import OrderWorkflow +from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow +from workflow_streams.workflows.ticker_workflow import TickerWorkflow + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow, TickerWorkflow], + activities=[charge_card], + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/shared.py b/workflow_streams/shared.py new file mode 100644 index 00000000..9bf5a4b7 --- /dev/null +++ b/workflow_streams/shared.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from 
temporalio.contrib.workflow_streams import WorkflowStreamState + +TASK_QUEUE = "workflow-stream-sample-task-queue" + +# Topics published by the workflow / activity. +TOPIC_STATUS = "status" +TOPIC_PROGRESS = "progress" +TOPIC_NEWS = "news" +TOPIC_TICK = "tick" + + +@dataclass +class OrderInput: + order_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class StatusEvent: + kind: str + order_id: str + + +@dataclass +class ProgressEvent: + message: str + + +@dataclass +class PipelineInput: + pipeline_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class StageEvent: + stage: str + + +@dataclass +class HubInput: + hub_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class NewsEvent: + headline: str + + +@dataclass +class TickerInput: + count: int = 20 + keep_last: int = 3 + truncate_every: int = 5 + interval_ms: int = 400 + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class TickEvent: + n: int diff --git a/workflow_streams/workflows/__init__.py b/workflow_streams/workflows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_streams/workflows/hub_workflow.py b/workflow_streams/workflows/hub_workflow.py new file mode 100644 index 00000000..5dcc3c5f --- /dev/null +++ b/workflow_streams/workflows/hub_workflow.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import HubInput + + +@workflow.defn +class HubWorkflow: + """Passive stream host: starts up, waits, closes when told. + + Unlike OrderWorkflow or PipelineWorkflow, this workflow does no + work of its own — it exists only to host a ``WorkflowStream`` that + external publishers push events into and external subscribers read + from. The shape that fits a backend service or "event bus" pattern, + where the workflow owns durable state but the events come from + outside. + """ + + @workflow.init + def __init__(self, input: HubInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + self._closed = False + + @workflow.run + async def run(self, input: HubInput) -> str: + await workflow.wait_condition(lambda: self._closed) + # The publisher publishes its own terminator into the stream + # before signaling close (see run_external_publisher.py). + # Hold the run open briefly so subscribers' final poll + # delivers any items still in the log. 
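        # (In run_external_publisher.py the subscriber actually exits on
        # the sentinel headline before this point, so the sleep is just a
        # safety margin.)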
+ await workflow.sleep(timedelta(milliseconds=500)) + return f"hub {input.hub_id} closed" + + @workflow.signal + def close(self) -> None: + self._closed = True diff --git a/workflow_streams/workflows/llm_workflow.py b/workflow_streams/workflows/llm_workflow.py new file mode 100644 index 00000000..b26cfbe4 --- /dev/null +++ b/workflow_streams/workflows/llm_workflow.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.llm_shared import LLMInput + +with workflow.unsafe.imports_passed_through(): + from workflow_streams.activities.llm_activity import stream_completion + + +@workflow.defn +class LLMWorkflow: + """Wrapper for an LLM-streaming activity. + + The workflow does no streaming of its own; it hosts the + `WorkflowStream` so external subscribers can attach by workflow + id, kicks off the streaming activity, and returns the full text + the activity produced. + + Streaming is delegated to the activity because the OpenAI call is + non-deterministic. If the activity fails partway through, Temporal + retries it (up to ``max_attempts``); the retried attempt + re-publishes from the start, so the consumer must reset on the + activity's ``RETRY`` event. See + `activities/llm_activity.py` and `run_llm.py`. + """ + + @workflow.init + def __init__(self, input: LLMInput) -> None: + # Construct the stream from `@workflow.init` so the + # publish-Signal handler is registered before any external + # publisher (the activity, here) tries to publish. + self.stream = WorkflowStream(prior_state=input.stream_state) + + @workflow.run + async def run(self, input: LLMInput) -> str: + result = await workflow.execute_activity( + stream_completion, + input, + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy(maximum_attempts=3), + ) + # Hold the run open briefly so the consumer's next poll + # delivers the activity's terminal `complete` event before the + # workflow exits and the in-memory log is gone. + await workflow.sleep(timedelta(milliseconds=500)) + return result diff --git a/workflow_streams/workflows/order_workflow.py b/workflow_streams/workflows/order_workflow.py new file mode 100644 index 00000000..099634cd --- /dev/null +++ b/workflow_streams/workflows/order_workflow.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import ( + TOPIC_PROGRESS, + TOPIC_STATUS, + OrderInput, + ProgressEvent, + StatusEvent, +) + +with workflow.unsafe.imports_passed_through(): + from workflow_streams.activities.payment_activity import charge_card + + +@workflow.defn +class OrderWorkflow: + """Process a fake order, publishing status and progress events. + + The workflow itself publishes status changes; an activity it runs + publishes finer-grained progress events using a + `WorkflowStreamClient`. A single stream carries both topics — + subscribers can filter on the topic(s) they care about. + """ + + @workflow.init + def __init__(self, input: OrderInput) -> None: + # Construct the stream from @workflow.init so it can register + # signal/update/query handlers before the workflow accepts any + # messages. Threading prior_state lets the workflow survive + # continue-as-new without losing buffered items. 
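        # (This sample never continues-as-new, so prior_state is None in
        # practice; the wiring just shows the pattern.)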
+ self.stream = WorkflowStream(prior_state=input.stream_state) + self.status = self.stream.topic(TOPIC_STATUS, type=StatusEvent) + self.progress = self.stream.topic(TOPIC_PROGRESS, type=ProgressEvent) + + @workflow.run + async def run(self, input: OrderInput) -> str: + self.status.publish(StatusEvent(kind="received", order_id=input.order_id)) + + charge_id = await workflow.execute_activity( + charge_card, + input.order_id, + start_to_close_timeout=timedelta(seconds=30), + ) + + self.status.publish(StatusEvent(kind="shipped", order_id=input.order_id)) + self.progress.publish(ProgressEvent(message=f"charge id: {charge_id}")) + self.status.publish(StatusEvent(kind="complete", order_id=input.order_id)) + # The "complete" status event above is the in-band terminator + # subscribers break on (see run_publisher.py). Hold the run + # open briefly so subscribers' next poll delivers it before + # this task returns and the in-memory log is gone. + await workflow.sleep(timedelta(milliseconds=500)) + return charge_id diff --git a/workflow_streams/workflows/pipeline_workflow.py b/workflow_streams/workflows/pipeline_workflow.py new file mode 100644 index 00000000..83336905 --- /dev/null +++ b/workflow_streams/workflows/pipeline_workflow.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import ( + TOPIC_STATUS, + PipelineInput, + StageEvent, +) + + +@workflow.defn +class PipelineWorkflow: + """Multi-stage pipeline that publishes stage transitions over time. + + Stages are spaced out with ``workflow.sleep`` so a subscriber can + realistically disconnect partway through and reconnect without the + pipeline finishing in the meantime — the shape needed to demo the + "show up late and still see what happened" pattern. + """ + + @workflow.init + def __init__(self, input: PipelineInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + self.status = self.stream.topic(TOPIC_STATUS, type=StageEvent) + + @workflow.run + async def run(self, input: PipelineInput) -> str: + stages = [ + "validating", + "loading data", + "transforming", + "writing output", + "verifying", + "complete", + ] + for stage in stages: + self.status.publish(StageEvent(stage=stage)) + if stage != "complete": + await workflow.sleep(timedelta(seconds=2)) + # The "complete" stage above is the in-band terminator + # subscribers break on. Hold the run open briefly so the final + # poll delivers it. + await workflow.sleep(timedelta(milliseconds=500)) + return f"pipeline {input.pipeline_id} done" diff --git a/workflow_streams/workflows/ticker_workflow.py b/workflow_streams/workflows/ticker_workflow.py new file mode 100644 index 00000000..c3f37b9f --- /dev/null +++ b/workflow_streams/workflows/ticker_workflow.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import ( + TOPIC_TICK, + TickerInput, + TickEvent, +) + + +@workflow.defn +class TickerWorkflow: + """Long-running ticker that bounds its event log via ``truncate``. + + Long-running workflows that publish high volumes of events would + otherwise grow their event log unboundedly. 
This workflow shows + the truncation pattern: every ``truncate_every`` events, drop + everything except the last ``keep_last`` entries by calling + ``self.stream.truncate(safe_offset)``. + + Subscribers that fall behind a truncation jump forward to the new + base offset transparently (the iterator handles the + ``TruncatedOffset`` error internally), so consumers stay live but + may not see every intermediate event. That is the trade: bounded + log size in exchange for at-best-effort delivery to slow + consumers. + + To compute the truncation offset the workflow tracks its own + published count. ``WorkflowStream`` does not expose a workflow-side + head-offset accessor, but the running count plus the carried + ``base_offset`` (in continue-as-new chains) is sufficient. + """ + + @workflow.init + def __init__(self, input: TickerInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + self.tick = self.stream.topic(TOPIC_TICK, type=TickEvent) + # Running count of events published by THIS run. To compute a + # global offset, add the prior_state's base_offset (omitted + # here — this sample doesn't continue-as-new). + self._published = 0 + + @workflow.run + async def run(self, input: TickerInput) -> str: + for n in range(input.count): + self.tick.publish(TickEvent(n=n)) + self._published += 1 + await workflow.sleep(timedelta(milliseconds=input.interval_ms)) + if ( + self._published % input.truncate_every == 0 + and self._published > input.keep_last + ): + # Drop everything except the last `keep_last` entries. + truncate_to = self._published - input.keep_last + self.stream.truncate(truncate_to) + # The final tick (n == count - 1) is the in-band terminator + # subscribers break on. ``keep_last`` guarantees that final + # offset survives the last truncation so even slow consumers + # eventually see it. Hold the run open briefly so the final + # poll delivers it. + await workflow.sleep(timedelta(milliseconds=500)) + return f"ticker emitted {self._published} events"