This is a common question asked by many developers new to Apache Kafka.
Let's imagine a situation where you have two topics, Topic A and Topic B. You want to combine them, apply some transformations, maybe filter out some messages, and write the result back to a combined topic, Topic C, or store it somewhere else. What's the best way to do this?
Although the example uses two input topics, the solutions proposed here also apply to any number of source topics.
This can be done in so many different ways that every technique introduced here could fill several posts on its own. We will highlight the main techniques for each use case.
This scenario would be equivalent to a union in SQL. It is probably the simplest scenario you can encounter: you can often look at the input topics as subsets of a larger dataset.
An example of such a topic could be LightingProductPurchased, and you want to abstract messages like these into a new, more general topic.
If message order is not important, you can subscribe to both topics with a single consumer. If you need to produce the messages in chronological order, you may need two consumers, repeatedly polling whichever one currently has the earliest messages, and a single producer to write them out.
For extra convenience and data traceability, you may want to add a discriminator field to those messages to know where they came from once they are in the resulting dataset.
This scenario would be a particular case of a join in SQL.
One concrete example is having two topics, ProductPurchased and Product. You want to enrich the messages in ProductPurchased, which have a productId field, with the product information available in the Product topic. The results will be written to a new topic called EnrichedProductPurchased, or stored somewhere else.
You can think of the Product topic as a lookup dataset: a small, slow-changing dataset that more often than not can fit in memory.
If you are familiar with data warehouse techniques like the dimensional modelling introduced by Kimball, ProductPurchased would be the fact topic and Product would be the dimension topic. Instead of tables, you have topics here.
The main technique here is to first read the Product topic to generate an in-memory snapshot of all the product information available. Once you have done that, you can start consuming ProductPurchased and enriching the incoming messages by looking up their productId in the in-memory mapping you've just generated.
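A minimal sketch of the snapshot-then-enrich technique, with in-memory lists standing in for the two topics (the field names and contents are made up for illustration):

```python
# Hypothetical messages; in a real app these would be consumed from the
# Product and ProductPurchased topics respectively.
product_topic = [
    {"productId": "p1", "name": "Desk lamp", "price": 40},
    {"productId": "p2", "name": "Floor lamp", "price": 90},
]
purchases_topic = [
    {"productId": "p1", "quantity": 2},
    {"productId": "p2", "quantity": 1},
]

# Step 1: read the whole Product topic into an in-memory snapshot,
# keyed by productId.
snapshot = {p["productId"]: p for p in product_topic}

# Step 2: enrich each ProductPurchased message with a key lookup.
def enrich(purchase, snapshot):
    product = snapshot[purchase["productId"]]
    return {**purchase, "name": product["name"], "price": product["price"]}

enriched = [enrich(p, snapshot) for p in purchases_topic]
```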
Generating the lookup snapshot
In many situations, the dimension or lookup topic
Product is quite small. In those cases, you can load it in a normal dictionary using the standard library of the language of choice. You will need to make sure it's small enough to fit in memory and read the whole dimension topic every time the app restarts. Anything in the order of magnitude of thousands should be enough to qualify for this case.
You can find an implementation of this approach in the Kafka Streams API with GlobalKTable, as the official documentation states:
If you read the input topic into a GlobalKTable, then the local GlobalKTable instance of each application instance will be populated with data from all partitions of the topic.
If it's too big to fit in memory, or consumer startup times become an issue, you can also use an external key-value store like Redis or DynamoDB, which deliver single-digit-millisecond latencies for lookup queries. This will prevent future OutOfMemoryError crashes as the Product topic grows, and you won't have to re-read the whole topic on every restart, as the state is persisted outside the application. Make sure you use the same consumer group across restarts when using this approach.
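A sketch of the same enrichment against an external store. The dict-backed KeyValueStore class below is only a stand-in for a real client such as redis-py's get/set, so the example runs without any infrastructure:

```python
class KeyValueStore:
    """Minimal key-value interface of an external store such as Redis.
    This in-memory fake stands in for a real client; in production the
    state would live outside the application process."""
    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

store = KeyValueStore()

# The consumer of the Product topic keeps the external store up to date
# instead of an in-process dictionary, so the state survives restarts.
for product in [{"productId": "p1", "name": "Desk lamp"}]:
    store.set(product["productId"], product)

# The consumer of ProductPurchased looks products up in the store.
purchase = {"productId": "p1", "quantity": 2}
enriched = {**purchase, "product": store.get(purchase["productId"])}
```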
As a general rule of thumb, you will probably want to reuse the message key of ProductPurchased when you produce the enriched messages back to Kafka, especially if message order is important, since ProductPurchased and EnrichedProductPurchased have a one-to-one relationship and the messages will be consumed and produced in the same order they arrive.
Just to be clear: a message in ProductPurchased with key X will be written to EnrichedProductPurchased with key X. Every message might have a different key, but the key stays consistent across the two topics. This is important so that all messages with the same key fall in the same partition; that's how you leverage the natural chronological order within partitions.
This scenario will always produce chronological data from the point of view of the main topic.
Number of consumers
This is a question that arises more often than not: how many consumers do I need? Generally, it is good practice to use one consumer per input topic. This means that in our example, there would be two consumers.
The only scenario where you need just one consumer is when the lookup topic contains static data that never changes. In this case, you can read it fully at the beginning of the application, stop consuming it as you won't be getting any new messages from that topic, and then use the same consumer to read the facts topic.
If the Product data is going to change over time, for instance with updates to a product's name or description, you will need two consumers: one listening to the dimension topic and updating the snapshot as new product changes are consumed, and another one consuming from the ProductPurchased topic. This will produce some out-of-order enrichments but will be good enough for most use cases.
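A sketch of the two-consumer setup and the out-of-order effect it can produce. The lists stand in for each consumer's poll results, and the product rename is a made-up example:

```python
# Hypothetical poll results. The second Product event renames the
# product *after* the purchase below was actually made.
product_updates = [
    {"productId": "p1", "name": "Desk lamp"},
    {"productId": "p1", "name": "LED desk lamp"},  # later rename
]
purchases = [{"productId": "p1", "quantity": 1}]

snapshot = {}
enriched = []

# One consumer keeps the snapshot fresh as product changes arrive...
for product in product_updates:
    snapshot[product["productId"]] = product

# ...while the other enriches purchases against whatever the snapshot
# holds *right now*. If the purchase predates the rename, it still gets
# the newest name: the out-of-order error mentioned above.
for purchase in purchases:
    product = snapshot.get(purchase["productId"], {})
    enriched.append({**purchase, "name": product.get("name")})
```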
GlobalKTable implements this approach. From the documentation,
GlobalKTable provides the ability to look up current values of data records by keys. [..] Note that a GlobalKTable has no notion of time in contrast to a KTable
If you want to produce chronologically perfect enrichments, you will have to read from both topics in a synchronized manner. This avoids the race condition of enriching ProductPurchased events with Product data that was not yet available when that particular purchase was made.
The technique here is to only consume messages from the Product topic that have a smaller timestamp than the message currently being enriched from ProductPurchased. In other words, you "time travel" the Product snapshot to replicate its state at the moment when that ProductPurchased event was generated. This approach is technically very hard and has many challenges you need to solve, such as:
The Product snapshot being shared by multiple consumers might be a bad idea, as every consumer could be at a different point in time. You will probably need a separate snapshot per consumer instance.
What happens when a rebalance occurs in the consumer group, and a particular consumer instance gets assigned a new partition with older ProductPurchased events than the snapshot it is currently at? Do you need to reprocess the snapshot from the beginning to go back in time? Do you implement checkpoints?
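The time-travel technique can be sketched as follows, with plain lists standing in for the two timestamped streams, both assumed ordered by timestamp:

```python
# Hypothetical timestamped streams. The price changes at timestamp 5.
product_events = [
    {"timestamp": 1, "productId": "p1", "price": 40},
    {"timestamp": 5, "productId": "p1", "price": 50},  # price change
]
purchases = [
    {"timestamp": 3, "productId": "p1"},
    {"timestamp": 6, "productId": "p1"},
]

snapshot = {}
idx = 0  # read position in the Product stream
enriched = []
for purchase in purchases:
    # Advance the Product snapshot only up to the purchase's timestamp,
    # so the enrichment reflects the state at the moment of purchase.
    while (idx < len(product_events)
           and product_events[idx]["timestamp"] <= purchase["timestamp"]):
        event = product_events[idx]
        snapshot[event["productId"]] = event
        idx += 1
    product = snapshot[purchase["productId"]]
    enriched.append({**purchase, "price": product["price"]})
```

The purchase at timestamp 3 is enriched with the old price, even though a newer price exists by the time the enrichment runs; that is the chronological correctness the synchronized read buys you.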
You can easily extend this scenario to multiple dimensions or lookup topics.
This scenario would be the equivalent of a join in SQL. This one is by far the hardest and the heaviest, computationally speaking.
The Kafka Streams API offers simple-to-use in-memory computation for joins. To qualify, there is one requirement and one constraint.
The requirement is what they call co-partitioning: both topics need to have the same number of partitions, and all messages need to have been written using the same partitioning strategy. Co-partitioning ensures a one-to-one mapping between partitions for all messages with the same key in both topics, which allows distributing the load horizontally among all stream processor instances. This is a hard requirement, so you will have to plan ahead for the joins you need, or reprocess the data into new, co-partitioned topics.
The constraint has to do with windowing. The reason provided by the official docs:
KStream-KStream joins are always windowed joins, because otherwise, the size of the internal state store used to perform the join – e.g., a sliding window or “buffer” – would grow indefinitely.
This means you can only join with messages available in the latest window slice, whether it is based on time or on length.
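A toy version of such a windowed join, assuming messages are dicts with key and timestamp fields. It is only meant to show how the window keeps the internal state bounded, not how Kafka Streams actually implements it:

```python
import heapq
from collections import defaultdict

WINDOW = 5  # join window in the stream's own time units

def windowed_join(left, right, window=WINDOW):
    """Toy KStream-KStream style join: emit (left, right) pairs for
    messages sharing a key whose timestamps differ by at most `window`.
    Evicting old messages keeps the buffers (the 'internal state
    store') from growing indefinitely."""
    buffers = {"L": defaultdict(list), "R": defaultdict(list)}
    other = {"L": "R", "R": "L"}
    # Process both sides in global timestamp order.
    merged = heapq.merge(
        (("L", m) for m in left),
        (("R", m) for m in right),
        key=lambda t: t[1]["timestamp"],
    )
    out = []
    for side, msg in merged:
        # Evict buffered messages that have fallen out of the window.
        for buf in buffers.values():
            buf[msg["key"]] = [
                m for m in buf[msg["key"]]
                if msg["timestamp"] - m["timestamp"] <= window
            ]
        # Join against the opposite side's buffer, then buffer msg.
        for match in buffers[other[side]][msg["key"]]:
            out.append((msg, match) if side == "L" else (match, msg))
        buffers[side][msg["key"]].append(msg)
    return out

left = [{"key": "k", "timestamp": 1, "value": "l1"},
        {"key": "k", "timestamp": 10, "value": "l2"}]
right = [{"key": "k", "timestamp": 3, "value": "r1"}]
pairs = windowed_join(left, right)
```

Here l1 and r1 are within the window and join, while l2 arrives too late to pair with r1, so only one pair is emitted.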
There are several strategies for how that window can be computed.
Beyond Kafka Streams
The window constraint could be dropped by using a cloud key-value store like DynamoDB, which can grow indefinitely. Essentially, this allows a window larger than the stream itself.
This is more of a personal choice and depends on the use case, but I would recommend persisting the results in a new combined topic when:
Be thoughtful about partitions: if you reduce the input topics into a single topic with one partition, it can easily become your bottleneck in the future.
It's usually advised to use the Consumer API when you are not writing back to your Kafka cluster, but using the Streams API is definitely possible as well.
If you want to serve the results through a REST API, you will either have to keep the resulting messages in memory or store them in an intermediate persistence layer, like a database, which the REST API can later query based on users' HTTP requests.