This is a common question asked by many developers new to Apache Kafka.
Let's imagine a situation where you have two topics, Topic A and Topic B. You want to combine them, apply some necessary transformations, maybe filter out some messages, and write the result back to a combined topic, Topic C, or store it somewhere else. What's the best way to do this?
Although the example uses two input topics, the solutions proposed here also apply to any number of source topics.
This can be done in so many different ways that every technique introduced here could fill several posts on its own. We will highlight the major ones for each use case.
This scenario would be equivalent to a union in SQL.
This is probably the simplest scenario you can encounter. You can often look at the input topics as subsets of a larger dataset.
Examples for both topics could be FurnitureProductPurchased and LightingProductPurchased, and you want to abstract those messages into a new topic, ProductPurchased.
If message order is not important, you can subscribe to both topics using a single consumer. If you need to produce the combined messages in chronological order, you may need two consumers, always polling from whichever one has the earlier messages, and a single producer to write the results.
For extra convenience and data traceability, you may want to add a discriminator field to those messages to know where they came from once they are in the resulting dataset.
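As a rough sketch of the order-agnostic case, here is what the single-consumer approach could look like with the plain Java clients. The bootstrap address, group id, JSON handling, and the sourceTopic discriminator field are placeholders for illustration:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;
import java.time.Duration;
import java.util.*;

public class PurchaseUnion {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "purchase-union");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            // One consumer subscribed to both source topics; order across topics is not guaranteed.
            consumer.subscribe(List.of("FurnitureProductPurchased", "LightingProductPurchased"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // Hypothetical discriminator: append the source topic to the JSON payload.
                    String combined = record.value().replaceFirst("}$",
                            ", \"sourceTopic\": \"" + record.topic() + "\"}");
                    // Reuse the original key so related messages stay in the same partition.
                    producer.send(new ProducerRecord<>("ProductPurchased", record.key(), combined));
                }
            }
        }
    }
}
```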
This scenario would be a particular case of a join.
One concrete example is having two topics, ProductPurchased and Product. You want to enrich the messages in ProductPurchased, which have a productId, with the product information available in the Product topic. The results will be written to a new topic called EnrichedProductPurchased, or stored somewhere else.
You can think of the Product topic as a lookup dataset: a small, slow-changing dataset that more often than not can fit in memory.
If you are familiar with data warehouse techniques like the dimensional modelling introduced by Kimball, ProductPurchased would be the fact topic and Product would be the dimension topic. Instead of tables, you have topics here.
Main technique
The main technique here is to first read the Product topic to generate an in-memory snapshot of all the product information available. Once you have done that, you can start consuming ProductPurchased and enriching the incoming messages by looking up the productId in the in-memory mapping you've just generated.
Generating the lookup snapshot
In many situations, the dimension or lookup topic Product is quite small. In those cases, you can load it into a plain dictionary using the standard library of your language of choice. You will need to make sure it's small enough to fit in memory, and you will have to read the whole dimension topic every time the app restarts. Anything in the order of magnitude of thousands of products should qualify for this case.
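A minimal sketch of that snapshot load with the plain Java consumer might look like the following; it assumes the Product message key is the productId and keeps values as raw strings:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class ProductSnapshot {

    // Reads the whole Product topic and returns a productId -> product payload map.
    public static Map<String, String> load(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Map<String, String> snapshot = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign all partitions manually and start from the beginning of the topic.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("Product")) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            // Keep polling until every partition has reached its end offset.
            while (partitions.stream().anyMatch(tp -> consumer.position(tp) < endOffsets.get(tp))) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // Latest message per key wins; the key is assumed to be the productId.
                    snapshot.put(record.key(), record.value());
                }
            }
        }
        return snapshot;
    }
}
```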
You can find an implementation of this approach in the Kafka Streams API with GlobalKTable, as the official documentation states:
If you read the input topic into a GlobalKTable, then the local GlobalKTable instance of each application instance will be populated with data from all partitions of the topic.
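A simplified Kafka Streams topology using a GlobalKTable could look like this sketch. String serdes are used throughout, and extracting the productId from the purchase value is reduced to a trivial key selector for brevity:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class GlobalKTableEnrichment {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Every application instance gets a full local copy of the Product topic.
        GlobalKTable<String, String> products = builder.globalTable("Product");
        KStream<String, String> purchases = builder.stream("ProductPurchased");

        purchases
            // Map each purchase to the productId used to look it up in the GlobalKTable.
            // Here the purchase value is assumed to carry the productId directly.
            .join(products,
                  (purchaseKey, purchaseValue) -> purchaseValue,                       // key selector
                  (purchaseValue, productValue) -> purchaseValue + "|" + productValue) // joiner
            .to("EnrichedProductPurchased");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "purchase-enrichment");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}
```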
If it's too big to fit in memory, or the consumer startup times become an issue, you can also use an external key-value store like Redis or DynamoDB, which delivers single-digit millisecond latencies at any scale for lookup queries. This will prevent out-of-memory errors as the Product topic grows, and you won't have to read the whole topic on every restart, since the state is persisted outside the application. Make sure you use the same consumer group across restarts when using this approach.
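As an example of what that external store could look like, here is a small sketch using the Jedis client for Redis; the key prefix and connection details are arbitrary:

```java
import redis.clients.jedis.JedisPooled;

public class RedisProductStore {
    private final JedisPooled redis = new JedisPooled("localhost", 6379);

    // Upsert the latest product payload, keyed by productId, as the Product topic is consumed.
    public void put(String productId, String productJson) {
        redis.set("product:" + productId, productJson);
    }

    // Point lookup used while enriching ProductPurchased messages; returns null if unknown.
    public String get(String productId) {
        return redis.get("product:" + productId);
    }
}
```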
Message ordering
As a general rule of thumb, you will probably want to reuse the same message key of ProductPurchased when you produce the enriched messages back to Kafka, especially if message order is important. ProductPurchased and EnrichedProductPurchased have a one-to-one relationship, so the messages will be consumed and produced in the same order they arrive.
Just to be clear, a message in ProductPurchased with key X will be written with key X to EnrichedProductPurchased. Every message might have a different key, but the key stays consistent across the two topics. This is important so that all messages with the same key fall in the same partition; that's how you leverage the natural chronological order within partitions.
This scenario will always produce chronological data from the point of view of the main topic.
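Putting the snapshot and the key reuse together, the enrichment loop could be sketched roughly like this; the productId extraction and the string concatenation stand in for real deserialization and enrichment logic:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import java.time.Duration;
import java.util.List;
import java.util.Map;

public class PurchaseEnricher {

    // Consumes ProductPurchased, enriches each message from the snapshot, and
    // produces to EnrichedProductPurchased reusing the original message key.
    public static void run(KafkaConsumer<String, String> consumer,
                           KafkaProducer<String, String> producer,
                           Map<String, String> productSnapshot) {
        consumer.subscribe(List.of("ProductPurchased"));
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                // Hypothetical helper: extract the productId from the purchase payload.
                String productId = extractProductId(record.value());
                String product = productSnapshot.get(productId);
                String enriched = record.value() + "|" + product;
                // Same key as the source message, so partitioning and ordering are preserved.
                producer.send(new ProducerRecord<>("EnrichedProductPurchased", record.key(), enriched));
            }
        }
    }

    private static String extractProductId(String purchaseJson) {
        // Placeholder for real JSON parsing.
        return purchaseJson;
    }
}
```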
Number of consumers
This is a question that arises more often than not. How many consumers do I need?
Generally, it is good practice to go with one consumer per input topic. This means that in our example, there would be two consumers.
The only scenario where you need just one consumer is if the lookup topic holds static data that never changes. In that case, you can read it at the beginning of the application, unsubscribe from it as you won't be getting any new messages from that topic, and then reuse the same consumer to read the facts topic.
Correctness
If the Product data is going to change over time, for instance through updates to a product's name or description, you will need two consumers: one listening to the dimension topic and updating the snapshot as new product changes are consumed, and another one consuming from the ProductPurchased topic. This will produce some out-of-order errors but will be good enough for most use cases.
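A minimal single-threaded sketch of this two-consumer setup is shown below; in practice you might run the snapshot updater in its own thread, which is why a ConcurrentHashMap is used. Topic names come from the example above, and assuming (for brevity) that the purchase key is the productId:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TwoConsumerEnricher {

    public static void run(KafkaConsumer<String, String> productConsumer,
                           KafkaConsumer<String, String> purchaseConsumer,
                           KafkaProducer<String, String> producer) {
        Map<String, String> snapshot = new ConcurrentHashMap<>();
        productConsumer.subscribe(List.of("Product"));
        purchaseConsumer.subscribe(List.of("ProductPurchased"));

        while (true) {
            // Keep the snapshot fresh as product updates arrive.
            for (ConsumerRecord<String, String> product : productConsumer.poll(Duration.ofMillis(100))) {
                snapshot.put(product.key(), product.value());
            }
            // Enrich purchases against whatever version of the snapshot we currently have.
            for (ConsumerRecord<String, String> purchase : purchaseConsumer.poll(Duration.ofMillis(100))) {
                String product = snapshot.get(purchase.key()); // simplification: key == productId
                producer.send(new ProducerRecord<>("EnrichedProductPurchased",
                        purchase.key(), purchase.value() + "|" + product));
            }
        }
    }
}
```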
Note that GlobalKTable implements this approach. From the documentation:
GlobalKTable provides the ability to look up current values of data records by keys. [..] Note that a GlobalKTable has no notion of time in contrast to a KTable
If you want to produce chronologically perfect enrichments, you will have to read from both topics in a synchronized manner. This will avoid the race condition of enriching ProductPurchased events with Product data that was not available when that particular purchase was made.
The technique here is to only consume messages from the Product topic that have a smaller timestamp than the message being enriched from ProductPurchased. In other words, you "time travel" the Product snapshot to replicate its state at the moment that particular ProductPurchased event was generated. This approach is technically very hard and has many challenges you need to solve, like:
- The Product snapshot being shared by multiple consumers might be a bad idea, as every consumer could be at a different point in time. You will probably have to keep a separate snapshot per consumer instance.
- What happens if there is a rebalance in the consumer group, and a particular consumer instance gets assigned a new partition with older ProductPurchased events than the snapshot it is currently at? Do you need to reprocess the snapshot from the beginning to go back in time? Do you implement checkpoints?
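With those caveats in mind, the core of the time-travel loop could be sketched like this for a single consumer instance; rebalances and snapshot checkpointing are left out, and Product records that are still "in the future" are simply buffered in memory:

```java
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

public class TimeTravelEnricher {

    private final Deque<ConsumerRecord<String, String>> pendingProducts = new ArrayDeque<>();
    private final Map<String, String> snapshot = new HashMap<>();

    // Advance the Product snapshot only up to the given purchase timestamp.
    private void advanceSnapshotTo(long purchaseTimestamp, KafkaConsumer<String, String> productConsumer) {
        while (true) {
            if (pendingProducts.isEmpty()) {
                for (ConsumerRecord<String, String> r : productConsumer.poll(Duration.ofMillis(100))) {
                    pendingProducts.addLast(r);
                }
                if (pendingProducts.isEmpty()) {
                    return; // no more product data available right now
                }
            }
            ConsumerRecord<String, String> next = pendingProducts.peekFirst();
            if (next.timestamp() > purchaseTimestamp) {
                return; // this product change happened after the purchase; keep it buffered
            }
            snapshot.put(next.key(), next.value());
            pendingProducts.removeFirst();
        }
    }

    public String enrich(ConsumerRecord<String, String> purchase,
                         KafkaConsumer<String, String> productConsumer) {
        advanceSnapshotTo(purchase.timestamp(), productConsumer);
        // Simplification: the purchase key is assumed to be the productId.
        return purchase.value() + "|" + snapshot.get(purchase.key());
    }
}
```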
Multiple dimensions
You can easily extend this scenario to multiple dimensions or lookup topics.
This scenario would be the equivalent of a join. This one is by far the hardest and the most computationally heavy.
Kafka Streams
The Kafka Streams API offers some simple-to-use in-memory computation for joins. To qualify, there is one requirement and one constraint.
The requirement is what they call co-partitioning. Both topics need to have the same number of partitions, and all messages need to have been written using the same partitioning strategy. Co-partitioning ensures that there is a one-to-one mapping between partitions for all messages with the same key in both topics, which allows the load to be distributed horizontally among all stream processor instances. This is a hard requirement, so you will have to plan ahead for the joins you need to do, or reprocess the data into new, properly partitioned topics.
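If your topics are not co-partitioned, one way to fix it (sketched below, assuming Kafka Streams 2.6+ and that 12 partitions matches the other join input) is to repartition one of the streams before joining:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class CoPartitioningExample {
    public static KStream<String, String> copartition(StreamsBuilder builder) {
        // Rewrite the stream into an internal repartition topic with the same
        // partition count as the other join input (12 here is just an example).
        return builder.<String, String>stream("TopicB")
                .repartition(Repartitioned.<String, String>as("TopicB-repartitioned")
                        .withNumberOfPartitions(12));
    }
}
```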
The constraint has to do with windowing. The reason provided by the official docs:
KStream-KStream joins are always windowed joins, because otherwise, the size of the internal state store used to perform the join – e.g., a sliding window or “buffer” – would grow indefinitely.
This means you can only join with messages available in the latest slice, based on time or length.
There are several strategies for how that window can be computed, such as tumbling, hopping, sliding, and session windows.
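For reference, a windowed KStream-KStream join could look like the sketch below, reusing the Topic A, Topic B, and Topic C names from the introduction (written here as TopicA, TopicB, and TopicC) and assuming string keys and values and a recent Kafka Streams version:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

public class StreamStreamJoin {
    public static void build(StreamsBuilder builder) {
        // Co-partitioned input topics, named after the introduction's example.
        KStream<String, String> left = builder.stream("TopicA");
        KStream<String, String> right = builder.stream("TopicB");

        // Join messages that share a key and arrive within 5 minutes of each other.
        KStream<String, String> joined = left.join(
                right,
                (a, b) -> a + "|" + b,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        joined.to("TopicC");
    }
}
```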
Beyond Kafka Streams
The window constraint could be dropped by using a cloud key-value store like DynamoDB, which can grow indefinitely. Essentially, this would allow having a window larger than the stream itself.
This is more of a personal choice and depends on the use case, but I would recommend persisting the results in a new combined topic when:
Be thoughtful about partitions. If you reduce the input topics into one single topic of one partition, it will easily become your bottleneck in the future.
It's usually advised to use the Consumer API when you are not writing back to your Kafka cluster, but using the Streams API is definitely possible as well.
If you want to serve the results through a REST API, you will either have to keep the resulting messages in memory or store them in an intermediate persistence layer, like a database, that the REST API can later query based on users' HTTP requests.