
stream subscriber mapping #12

Open

GefMar wants to merge 3 commits into main from StreamSubscribe

Conversation


@GefMar GefMar commented May 4, 2026

Summary

Adds stream topic subscriptions via broker.subscribe(topic, task, decoder=decoder).

This allows taskiq-aio-kafka workers to consume raw Kafka messages from a topic and route them into an existing Taskiq task. The broker wraps the raw Kafka payload into a normal Taskiq message before execution, so result backend integration, labels, middlewares, and the regular worker execution flow continue to work as expected.
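
Conceptually, the mapping looks roughly like this (a sketch of the idea, not the actual implementation; wrap_stream_message is an illustrative name):

import uuid
from typing import Any, Callable

from taskiq.message import TaskiqMessage


def wrap_stream_message(
    raw_value: bytes,
    topic: str,
    task_name: str,
    decoder: Callable[[bytes], Any],
) -> TaskiqMessage:
    # Decode the raw Kafka payload and pack it into an ordinary
    # TaskiqMessage, so the normal worker pipeline (result backend,
    # labels, middlewares) handles it like any kicked task.
    return TaskiqMessage(
        task_id=uuid.uuid4().hex,
        task_name=task_name,
        labels={"taskiq-stream": topic},
        args=[decoder(raw_value)],
        kwargs={},
    )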

Why

Some Kafka topics contain domain events or external stream messages, not Taskiq-formatted messages. Until now, handling those messages with Taskiq usually required adding another stream framework such as FastStream alongside Taskiq.

That works, but it is not always convenient:

  • it adds another framework and lifecycle to the application;
  • task handlers cannot be reused as regular Taskiq tasks directly;
  • result backend behavior and Taskiq middleware flow are harder to preserve consistently;
  • projects that only need a small subscriber layer have to adopt more infrastructure than necessary.

This change makes simple Kafka stream consumption possible directly inside taskiq-aio-kafka.

Usage

import json

from taskiq_aio_kafka import AioKafkaBroker

broker = AioKafkaBroker(
    bootstrap_servers="localhost",
    kafka_topic="taskiq-topic",
)


@broker.task
async def process_user_created(event: dict[str, object]) -> None:
    print(event)


# Route raw messages from the users.created topic into the task above.
broker.subscribe(
    "users.created",
    process_user_created,
    decoder=json.loads,
)

Messages received from users.created are treated as raw Kafka payloads. The decoder receives the original bytes value, and the decoded value is passed to the task as the first argument.
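
Any callable that accepts the raw bytes can serve as a decoder, so other formats slot in the same way. A small sketch (safe_json is a hypothetical helper, not part of the library):

import json


def safe_json(value: bytes) -> object:
    # Fall back to a wrapper dict when the payload is not valid JSON,
    # instead of failing at decode time.
    try:
        return json.loads(value)
    except (UnicodeDecodeError, json.JSONDecodeError):
        return {"raw": value.decode("utf-8", errors="replace")}


broker.subscribe("users.created", process_user_created, decoder=safe_json)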

The broker also adds the taskiq-stream label with the source Kafka topic name.
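
If the handler needs the source topic, it should be readable through Taskiq's standard label mechanism, for example via the Context dependency. A variant of the handler above (a sketch assuming regular taskiq behavior):

from taskiq import Context, TaskiqDepends


@broker.task
async def process_user_created(
    event: dict[str, object],
    context: Context = TaskiqDepends(),
) -> None:
    # The taskiq-stream label carries the source Kafka topic name.
    print(context.message.labels.get("taskiq-stream"), event)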

When This Is Useful

This is useful when an application already uses Taskiq but also needs to react to Kafka topics that are not produced by Taskiq, for example:

  • domain events from another service;
  • CDC/outbox topics;
  • integration events from external systems;
  • simple event handlers that should still use Taskiq result backends and middlewares;
  • gradual migration from a separate stream consumer into Taskiq-managed workers.

Notes

This does not replace task_with_topic.

  • task_with_topic controls where Taskiq sends a task when it is kicked.
  • subscribe controls how raw Kafka messages are received and mapped into an existing task.
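
Because subscribe maps onto an existing task, the handler remains an ordinary Taskiq task and can still be kicked through the broker's normal path (the payload here is illustrative):

# Kicking the same task directly still works as usual:
await process_user_created.kiq({"id": 42, "email": "user@example.com"})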

@GefMar GefMar requested a review from s3rius May 4, 2026 19:00