faststream-redis-timers is a FastStream broker integration for Redis-backed distributed timer scheduling.
Schedule messages to be delivered to subscribers at a future point in time, with at-least-once delivery across multiple workers.
```python
from datetime import timedelta

from faststream import FastStream
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis

client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
app = FastStream(broker)

@broker.subscriber("invoices")
async def handle_invoice(invoice_id: str) -> None:
    print(f"Invoice {invoice_id} is due!")

@app.after_startup
async def schedule() -> None:
    await broker.publish(
        "INV-001",
        topic="invoices",
        activate_in=timedelta(days=30),
    )
```

Schedule at an absolute time with `activate_at` instead. `publish()` returns the resolved `timer_id` so you can keep it for cancellation:
```python
from datetime import UTC, datetime

timer_id = await broker.publish(
    "INV-001",
    topic="invoices",
    activate_at=datetime(2026, 6, 1, 9, tzinfo=UTC),
)
```

Timers are stored in Redis as two keys per topic:
- A sorted set (`timers_timeline:{topic}`) with the activation timestamp as score
- A hash (`timers_payloads:{topic}`) with the serialized message body
So a timer on the `invoices` topic lives under `timers_timeline:invoices` and `timers_payloads:invoices`, which is handy when poking around in redis-cli. A `TimersRouter(prefix="my-service:")` prefixes the topic, so the key suffix becomes `my-service:invoices`.
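The naming scheme can be sketched as a pair of helpers (an illustration of the layout described above, not the library's actual code; the `prefix` parameter stands in for a router prefix):

```python
def timeline_key(topic: str, prefix: str = "") -> str:
    # Sorted set of timer IDs scored by activation timestamp.
    return f"timers_timeline:{prefix}{topic}"

def payloads_key(topic: str, prefix: str = "") -> str:
    # Hash mapping timer IDs to serialized message bodies.
    return f"timers_payloads:{prefix}{topic}"

print(timeline_key("invoices"))                 # timers_timeline:invoices
print(timeline_key("invoices", "my-service:"))  # timers_timeline:my-service:invoices
```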
A polling loop checks for due timers and atomically claims each one via a Lua script that pushes its score forward by `lease_ttl` seconds, granting the worker a lease. The timer is removed from Redis only after the handler completes successfully. If the worker crashes mid-handler or the handler raises, the lease eventually expires and another worker re-claims the timer. This is the standard SQS-style visibility-timeout pattern: at-least-once delivery with no data loss on crash, at the cost of requiring idempotent handlers.
Cancel a pending timer with `cancel_timer()`:

```python
timer_id = await broker.publish(
    "INV-001",
    topic="invoices",
    activate_in=timedelta(days=30),
)
await broker.cancel_timer("invoices", timer_id)
```

`publish()` accepts `correlation_id` and `headers`; both round-trip to the handler via the standard FastStream `StreamMessage`:
```python
await broker.publish(
    {"order_id": 42},
    topic="orders",
    correlation_id="trace-abc-123",
    headers={"x-tenant": "acme", "x-priority": "high"},
)
```

Inside the handler:
```python
from faststream import Context

@broker.subscriber("orders")
async def handle(
    body: dict,
    correlation_id: str = Context("message.correlation_id"),
    tenant: str = Context("message.headers.x-tenant"),
) -> None:
    ...
```

`TimersBroker` does not close the Redis client you pass in; the caller owns its lifecycle. The same client can be shared across multiple brokers, so closing it from one broker would surprise the others. Manage the client with `async with` or a `try`/`finally`:
```python
async with Redis.from_url("redis://localhost:6379") as client:
    broker = TimersBroker(client)
    app = FastStream(broker)
    await app.run()
```

Per-subscriber knobs (passed to `@broker.subscriber("topic", ...)`):
- `lease_ttl` (default 30 seconds): how long a worker holds the lease while processing. Handlers must complete within this window or another worker may re-deliver the timer (duplicate). Increase it if your handlers are slow.
- `polling_interval` (default 0.05 s): base poll interval used when the queue has work or has just transitioned from idle. Doubles on each consecutive empty cycle, capped at `max_polling_interval`, with ±50% jitter applied to each sleep.
- `max_polling_interval` (default 5.0 s): ceiling for the adaptive idle backoff. Lower it for tighter delivery latency on idle queues; raise it to reduce Redis load on workloads with long idle stretches.
- `max_concurrent` (default 5): maximum number of handlers running in parallel per subscriber. Also caps the fetch batch size per poll cycle. Handlers must be safe under concurrency in addition to being idempotent.
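The adaptive backoff behind `polling_interval` can be sketched like this (an illustration of the behavior described above, not the library's code):

```python
import random

def next_sleep(current: float, found_work: bool,
               base: float = 0.05, cap: float = 5.0) -> float:
    """Compute the next poll sleep: reset to `base` when the last poll found
    work, otherwise double up to `cap`, then apply +/-50% jitter so many
    idle workers don't hit Redis in lockstep."""
    interval = base if found_work else min(current * 2, cap)
    return interval * random.uniform(0.5, 1.5)
```

With the defaults, an idle subscriber's sleeps grow roughly 0.05 → 0.1 → 0.2 → … → 5.0 seconds (before jitter), then snap back to 0.05 as soon as a poll returns work.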
- Handlers must be idempotent. A handler that ran successfully but whose ack failed to land in Redis (network blip) will be retried, and a handler that takes longer than `lease_ttl` may be re-delivered to another worker.
- Buggy handlers retry forever. If a handler always raises, the timer is retried every `lease_ttl` seconds indefinitely; there is no built-in attempt counter. Raise `faststream.exceptions.RejectMessage` from your handler to drop a poison-pill timer permanently, or track attempts in your own state if you need a hard cap.
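One way to build that hard cap yourself is a small attempt counter keyed by timer ID (a hypothetical helper, sketched in memory; in production you would persist the counts, e.g. in Redis, so they survive worker restarts):

```python
class AttemptCap:
    """Track delivery attempts per timer and signal when to give up."""

    def __init__(self, max_attempts: int) -> None:
        self.max_attempts = max_attempts
        self._attempts: dict[str, int] = {}

    def exceeded(self, timer_id: str) -> bool:
        # Record one more attempt; returns True once the cap is passed,
        # meaning the handler should reject (drop) the timer instead of
        # raising and letting it be retried.
        n = self._attempts.get(timer_id, 0) + 1
        self._attempts[timer_id] = n
        return n > self.max_attempts

cap = AttemptCap(max_attempts=3)
print([cap.exceeded("t1") for _ in range(4)])  # [False, False, False, True]
```

In a handler, `if cap.exceeded(timer_id): raise RejectMessage()` drops the timer permanently instead of letting it retry.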
Run multiple `TimersBroker` processes against the same Redis keys. The Lua claim script ensures each due timer is leased by exactly one worker at a time; failover is automatic via lease expiry.

Supports single-primary Redis, including Sentinel-managed primary/replica setups. Redis Cluster is not supported: each topic's timeline and payload keys must live on the same node, and the broker raises a `TypeError` if constructed with a `RedisCluster` client.
The key names themselves are configurable on the broker:

```python
broker = TimersBroker(
    Redis.from_url("redis://..."),
    timeline_key="my_timeline",
    payloads_key="my_payloads",
)
```