Summary
Adds stream topic subscriptions via `broker.subscribe(topic, task, decoder=decoder)`. This allows taskiq-aio-kafka workers to consume raw Kafka messages from a topic and route them into an existing Taskiq task. The broker wraps the raw Kafka payload into a normal Taskiq message before execution, so result backend integration, labels, middlewares, and the regular worker execution flow continue to work as expected.

Why
Some Kafka topics contain domain events or external stream messages, not Taskiq-formatted messages. Until now, handling those messages with Taskiq usually required adding another stream framework such as FastStream alongside Taskiq.
That works, but it is not always convenient:
This change makes simple Kafka stream consumption possible directly inside `taskiq-aio-kafka`.

Usage
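A minimal self-contained sketch of the subscription flow, using a stand-in broker rather than the real taskiq-aio-kafka classes (`FakeStreamBroker`, `deliver`, and `handle_user_created` are illustrative names, not the actual API):

```python
import asyncio
import json

received = []


class FakeStreamBroker:
    """Stand-in for the broker side of subscribe(): maps a topic to
    (task, decoder) and routes raw payloads into the task."""

    def __init__(self):
        self.routes = {}

    def subscribe(self, topic, task, decoder):
        # Mirrors broker.subscribe(topic, task, decoder=decoder).
        self.routes[topic] = (task, decoder)

    async def deliver(self, topic, raw: bytes):
        # Simulates a raw Kafka message arriving on the topic: the decoder
        # receives the original bytes, and the decoded value becomes the
        # task's first argument.
        task, decoder = self.routes[topic]
        await task(decoder(raw))


async def handle_user_created(event: dict) -> None:
    received.append(event["id"])


broker = FakeStreamBroker()
broker.subscribe("users.created", handle_user_created, decoder=json.loads)
asyncio.run(broker.deliver("users.created", b'{"id": 42}'))
```

The real broker additionally wraps the decoded value into a Taskiq message so labels, middlewares, and result backends apply as usual.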
Messages received from `users.created` are treated as raw Kafka payloads. The decoder receives the original `bytes` value, and the decoded value is passed to the task as the first argument. The broker also adds the `taskiq-stream` label with the source Kafka topic name.

When This Is Useful
This is useful when an application already uses Taskiq but also needs to react to Kafka topics that are not produced by Taskiq, for example:
Notes
This does not replace `task_with_topic`. `task_with_topic` controls where Taskiq sends a task when it is kicked. `subscribe` controls how raw Kafka messages are received and mapped into an existing task.
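The two directions can be sketched side by side; `ToyBroker`, `kick`, and `consume` below are illustrative stand-ins for the broker internals, not the real taskiq-aio-kafka API:

```python
import asyncio
import json

produced = []  # outgoing: where kicked Taskiq tasks are sent
handled = []   # incoming: raw stream messages mapped into a task


class ToyBroker:
    def __init__(self):
        self.subs = {}

    async def kick(self, task_name, topic, args):
        # task_with_topic-style routing: choose the *outgoing* topic
        # for a Taskiq-formatted message.
        produced.append((topic, json.dumps({"task": task_name, "args": args})))

    def subscribe(self, topic, task, decoder):
        # subscribe-style routing: map *incoming* raw payloads on a topic
        # into an existing task.
        self.subs[topic] = (task, decoder)

    async def consume(self, topic, raw: bytes):
        task, decoder = self.subs[topic]
        await task(decoder(raw))


async def main():
    broker = ToyBroker()

    async def on_user_created(event):
        handled.append(event["id"])

    broker.subscribe("users.created", on_user_created, decoder=json.loads)

    # Outgoing: the broker decides which topic a kicked task goes to.
    await broker.kick("send_email", topic="emails", args=[1])
    # Incoming: an external raw event is routed into on_user_created.
    await broker.consume("users.created", b'{"id": 7}')


asyncio.run(main())
```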