Overview

This is a common question among developers new to Apache Kafka.

Let's imagine a situation where you have two topics, Topic A and Topic B. You want to combine them, apply some transformations, maybe filter out some messages, and write the result to a combined topic Topic C or store it somewhere else. What's the best way to do this?

Although the example uses two input topics, the solutions proposed here also apply to any number of source topics.

Identifying the problem

  • Combining data. Join or union?
  • Is order important between messages from both topics?
  • How are end users going to access the resulting data? Can they connect to Apache Kafka directly, or should the data be served through a REST API or a database?
    Combining data

    This can be done in so many different ways that each technique introduced here could fill several posts on its own. We will highlight the major ones and the use cases they fit.

    a) Both topics contain messages with similar schemas

    This scenario would be equivalent to a UNION in SQL.

    This is probably the simplest scenario you can encounter: you can often look at the two topics as subsets of a larger dataset.

    Examples for both topics could be FurnitureProductPurchased and LightingProductPurchased, and you want to abstract those messages into a new topic ProductPurchased.

    If message order is not important, you can subscribe to both topics using a single consumer. If you need to produce them in chronological order, you will need two consumers: keep polling whichever one has the earliest pending message, and write the messages out through a single producer.

    For extra convenience and data traceability, you may want to add a discriminator field to those messages to know where they came from once they are in the resulting dataset.
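
    A minimal sketch of this merge (plain Python, no Kafka client; the input lists stand in for messages already polled from each topic, and all names are illustrative):

```python
import heapq

def merge_topics(furniture_msgs, lighting_msgs):
    """Merge two timestamp-ordered message streams into one chronological
    stream, tagging each message with a discriminator field.

    Each input is a list of (timestamp, payload) tuples standing in for
    the messages polled from each source topic.
    """
    tagged = [
        ((ts, "furniture", payload) for ts, payload in furniture_msgs),
        ((ts, "lighting", payload) for ts, payload in lighting_msgs),
    ]
    # heapq.merge always yields the earliest head across both streams,
    # mirroring "keep polling the consumer with the earliest messages".
    return [
        {"timestamp": ts, "source": source, **payload}
        for ts, source, payload in heapq.merge(*tagged)
    ]
```

    A real implementation would poll both consumers and compare the head records' timestamps before producing, but the ordering logic is the same.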

    b) One topic enriches the other one

    This scenario would be a particular case of join.

    One concrete example is having two topics, ProductPurchased and Product. You want to enrich the messages in ProductPurchased, which carry a productId, with the product information available in the Product topic. The results will be written to a new topic called EnrichedProductPurchased, or stored somewhere else.

    You can think of the Product topic as a lookup dataset: a small, slow-changing dataset that more often than not fits in memory.

    If you are familiar with data warehouse techniques like the dimensional modelling introduced by Kimball, ProductPurchased would be the fact topic and Product would be the dimension topic. Instead of tables, you have topics here.

    Main technique

    The main technique here is to first read the Product topic to generate an in-memory snapshot of all the product information available. Once you have done that, you can start consuming ProductPurchased and enriching the incoming messages by looking up the productId in the in-memory mapping you've just generated.

    Generating the lookup snapshot

    In many situations, the dimension or lookup topic Product is quite small. In those cases, you can load it into a plain dictionary using the standard library of your language of choice. You will need to make sure it's small enough to fit in memory, and you will have to read the whole dimension topic every time the app restarts. Anything in the order of thousands of entries should qualify for this case.
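
    As a sketch of this approach (plain Python dictionaries; the function and field names are illustrative, not from any Kafka client):

```python
def build_product_snapshot(product_messages):
    """Replay the Product topic into an in-memory dict keyed by productId.
    A later message for the same key overwrites the earlier one, so the
    dict ends up holding the latest known state of every product."""
    snapshot = {}
    for msg in product_messages:
        snapshot[msg["productId"]] = msg
    return snapshot

def enrich(purchase, snapshot):
    """Look up the purchase's productId and merge the product fields in."""
    product = snapshot.get(purchase["productId"], {})
    extra = {k: v for k, v in product.items() if k != "productId"}
    return {**purchase, **extra}
```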

    You can find an implementation of this approach in the Kafka Streams API with GlobalKTable, as the official documentation states:

    If you read the input topic into a GlobalKTable, then the local GlobalKTable instance of each application instance will be populated with data from all partitions of the topic.

    If it's too big to fit in memory or the consumer startup times become an issue, you can also use an external store such as Redis (in-memory) or DynamoDB (which delivers single-digit millisecond latencies at any scale for lookup queries). This will prevent future out-of-memory errors as the Product topic grows, and you won't have to read the whole topic on every restart for every consumer, as the state is persisted outside the application. Make sure you use the same consumer group across restarts when using this approach.

    Message ordering

    As a general rule of thumb, you will probably want to reuse the same message key of ProductPurchased when you produce the enriched messages back to Kafka, especially if message order is important: ProductPurchased and EnrichedProductPurchased have a one-to-one relationship, and messages will be produced in the same order they are consumed.

    Just to be clear, a message in ProductPurchased with key X will be written with key X in EnrichedProductPurchased. Different messages may have different keys, but each key stays consistent across the two topics. This is important so that all messages with the same key fall in the same partition. That's how you leverage the natural chronological order within partitions.
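
    The property can be illustrated with a toy partitioner (Kafka's Java client defaults to a murmur2-based hash; crc32 is only a stand-in here): as long as both topics have the same number of partitions, a message and its enriched counterpart map to the same partition number.

```python
import zlib

def partition_for(key, num_partitions):
    """Deterministically map a message key to a partition, the way
    Kafka's default partitioner does (it uses murmur2; crc32 stands in
    here). Equal keys always land on the same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```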

    This scenario will always produce chronological data from the point of view of the main topic.

    Number of consumers

    This question comes up very often: how many consumers do I need?

    Generally, it is good practice to go with one consumer per input topic. This means that in our example, there would be two consumers.

    The only scenario where you need just one consumer is when the lookup topic holds static data that never changes. In this case, you can read the lookup topic in full at the beginning of the application, unsubscribe from it as you won't be getting any new messages from that topic, and then reuse the same consumer to read the facts topic.

    Correctness

    If the Product data is going to change over time, for instance through updates to a product's name or description, you will need two consumers: one listening to the dimension topic and updating the snapshot as new product changes are consumed, and another one consuming from the ProductPurchased topic. This will produce some out-of-order errors but will be good enough for most use cases.
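
    One processing cycle of this two-consumer loop could be sketched like this (plain Python; the input lists stand in for the batches returned by each consumer's poll, and the names are illustrative):

```python
def run_enrichment_cycle(product_updates, purchases, snapshot):
    """Apply the Product changes polled since the last cycle, then enrich
    the polled batch of purchases against the refreshed snapshot."""
    for update in product_updates:       # consumer 1: dimension topic
        snapshot[update["productId"]] = update
    enriched = []
    for purchase in purchases:           # consumer 2: fact topic
        product = snapshot.get(purchase["productId"], {})
        enriched.append({**purchase, "productName": product.get("name")})
    return enriched
```

    Because the two consumers advance independently, a purchase may occasionally be enriched with product data newer than the purchase itself, which is the out-of-order error mentioned above.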

    Note that GlobalKTable implements this approach. From the documentation,

    GlobalKTable provides the ability to look up current values of data records by keys. [..] Note that a GlobalKTable has no notion of time in contrast to a KTable

    If you want to produce chronologically perfect enrichments, you will have to read from both topics in a synchronized manner. This will avoid the race condition of enriching ProductPurchased events with Product data that was not available when that particular purchase was made.

    The technique here is to only consume messages from the Product topic that have a smaller timestamp than the message being enriched from ProductPurchased. In other words, you "time travel" the Product snapshot to replicate its state at the moment when that ProductPurchased event was generated. This approach is technically hard and poses several challenges you need to solve, such as:

  • A consumer reading multiple partitions, each at a different timestamp. You will have to add logic to always read the earliest message, probably polling a given partition repeatedly until it catches up with the rest.
  • Having a Product snapshot shared by multiple consumers might be a bad idea, as every consumer could be at a different point in time. You will probably need a separate snapshot per consumer instance.
  • Your Apache Kafka cluster rebalances the partitions in your consumer group, and now a particular consumer instance gets assigned a new partition with older ProductPurchased events than the snapshot it is currently at. Do you need to reprocess the snapshot from the beginning to go back in time? Do you implement checkpoints?
    Multiple dimensions

    You can easily extend this scenario to multiple dimensions or lookup topics.
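
    The timestamp-synchronized "time travel" technique described above can be sketched for a single partition like this (plain Python; a real implementation must also handle the multi-partition and rebalancing challenges listed earlier):

```python
def enrich_time_synced(purchases, product_log):
    """Enrich each purchase against the Product state as it was at the
    purchase's timestamp. Both inputs are timestamp-ordered lists of
    (timestamp, message) tuples standing in for the two topics."""
    snapshot, i, out = {}, 0, []
    for ts, purchase in purchases:
        # "Time travel": only consume Product changes up to this timestamp.
        while i < len(product_log) and product_log[i][0] <= ts:
            change = product_log[i][1]
            snapshot[change["productId"]] = change
            i += 1
        product = snapshot.get(purchase["productId"], {})
        out.append({**purchase, "name": product.get("name")})
    return out
```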

    c) Topics have different schema

    This scenario would be the equivalent of a join in SQL. It is by far the hardest and the most computationally heavy.

    Kafka Streams

    The Kafka Streams API offers simple-to-use in-memory join computation. To qualify, there is one requirement and one constraint.

    The requirement is what they call co-partitioning: both topics need to have the same number of partitions, and all messages need to have been written using the same partitioning strategy. Co-partitioning ensures a one-to-one mapping between partitions for all messages with the same key in both topics, which allows the load to be distributed horizontally among all stream processor instances. This is a hard requirement, so you will have to plan ahead for the joins you will need, or reprocess the data into correctly partitioned topics.

    The constraint has to do with windowing. The reason provided by the official docs:

    KStream-KStream joins are always windowed joins, because otherwise, the size of the internal state store used to perform the join – e.g., a sliding window or “buffer” – would grow indefinitely.

    This means you can only join with the messages available in the current window, bounded by time or size.
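
    A stripped-down sketch of a windowed inner join (plain Python, one pass over in-memory lists; a real KStream-KStream join is partitioned and incremental, but the windowing idea is the same):

```python
from collections import defaultdict

def windowed_join(left, right, window):
    """Join messages from two streams that share a key and whose
    timestamps are at most `window` apart. Each input is a list of
    (timestamp, key, value) tuples; the window bounds how much state
    needs to be buffered."""
    right_by_key = defaultdict(list)
    for ts, key, value in right:
        right_by_key[key].append((ts, value))
    joined = []
    for ts, key, value in left:
        for rts, rvalue in right_by_key[key]:
            if abs(ts - rts) <= window:
                joined.append((key, value, rvalue))
    return joined
```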

    There are several strategies on how that window can be computed:

  • Tumbling time windows: time-based, fixed-size, non-overlapping, gap-less windows
  • Hopping time windows: time-based, fixed-size, overlapping windows
  • Sliding time windows: time-based, fixed-size, overlapping windows that work on differences between record timestamps
  • Session Windows: session-based, dynamically-sized, non-overlapping, data-driven windows
    Beyond Kafka Streams

    The window constraint could be dropped by using a cloud key-value store like DynamoDB, whose storage can grow indefinitely. Essentially, this would allow a window larger than the stream itself.

    Once topics are joined together

    Do I persist the results in a new topic?

    This is more of a personal choice and depends on the use case. I would recommend persisting the results in a new combined topic when:

  • the results will be used by more than one app, to avoid duplicating the same logic across your codebase.
  • the merge logic is quite complex, as it will help you debug issues by consuming the results directly and comparing them with the input topics.
    Storing in Apache Kafka

    Be thoughtful about partitions. If you reduce the input topics into one single topic of one partition, it will easily become your bottleneck in the future.

    Storing in Database or serving through Rest API

    It's usually advised to use the Consumer API when you are not writing back to your Kafka cluster, but using the Streams API is definitely possible as well.

    If you want to serve the results through a REST API, you will either have to keep the resulting messages in memory or store them in an intermediate persistence layer, like a database, that the REST API can later query based on users' HTTP requests.