How to Start Using Apache Kafka in Python

Overview

The rise of distributed systems has made Apache Kafka one of the most sought-after technologies, and it is now used by many of the largest companies worldwide.

Apache Kafka is a distributed event streaming platform, useful for building real-time data pipelines that move data between different systems and applications.

In this post, you're going to learn how to leverage Kafka as a Python programmer, so you too can start building distributed systems using this powerful technology.

The first thing we need to do is set up Kafka locally and run a little "hello world"-type test to check that everything works as expected (in the Kafka world, that means using the terminal to send and read a few messages). Then we'll jump to the fun Python part, where we'll write some code to interact with Kafka: specifically, a Kafka producer and a Kafka consumer.

Apache Kafka local development

How to set up Apache Kafka locally

Kafka is available in two different flavors: one offered by the Apache Software Foundation and the other, Confluent Kafka, offered by Confluent.

If you're new to Kafka, chances are you don't know who Confluent is: it's an American big data company founded by three LinkedIn engineers who were part of the original team behind Kafka's creation.

Confluent Kafka is mainly a commercial distribution and hosting solution for Apache Kafka. It provides the core Kafka features plus a few extras.

For the rest of this tutorial, we'll use the Apache Software Foundation version of Kafka, just to keep things simple.

To get Apache Kafka up and running on your computer, you'll need to follow these steps:

1. The first requirement is that your local environment has Java 8+ installed. On Debian-based systems you can install it with:

sudo apt update
sudo apt install default-jdk

2. Next, download the Kafka release from the Apache Kafka downloads page (this tutorial uses kafka_2.13-3.1.0) and extract it:

tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0

3. Start the ZooKeeper service with the following command, and keep it running in the background:

bin/zookeeper-server-start.sh config/zookeeper.properties

4. Finally, in another terminal session, start the Kafka broker:

bin/kafka-server-start.sh config/server.properties

Once all services have successfully started, you will have a basic Kafka environment running locally and ready to use. But before we continue, there are some concepts that need to be understood.

Why do we need ZooKeeper?

Did you notice that we needed two different commands to start the Kafka environment? One of them started ZooKeeper. You may be wondering what it is and why we need it.

To put it simply, ZooKeeper is a top-level Apache project that acts as a centralized service for maintaining naming and configuration data and for providing robust synchronization within distributed systems. ZooKeeper keeps track of the status of the Kafka cluster nodes, as well as Kafka topics, partitions, and so on.

In the near future, ZooKeeper will no longer be required by Apache Kafka (its replacement, KRaft mode, is already being rolled out), but in the meantime we'll have to deal with it.

Apache Kafka foundations

The most fundamental Kafka building block

A message, also called a record or event, is the basic piece of data flowing through Kafka; messages are how data is represented inside Kafka. A message is made up of four parts (the short sketch after this list shows how they map onto a produced record):

    An optional key.
    A value (where the content of your data goes).
    A timestamp.
    Some optional metadata headers.
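
To make this concrete, here is a minimal sketch of how those four parts map onto a record sent with the kafka-python client we'll install later in this post; the topic name, key, value, and header are made-up examples.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Each argument below corresponds to one part of a Kafka message.
producer.send(
    'some-topic',                               # the topic the message is written to
    key=b'42',                                  # optional key (raw bytes)
    value=b'{"name": "John Smith", "id": 42}',  # the value: your actual data
    headers=[('source', b'hr-system')],         # optional metadata headers
    timestamp_ms=1650000000000,                 # timestamp (defaults to "now" if omitted)
)
producer.flush()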

Other examples of messages include payment transactions, geolocation updates from mobile phones, shipping orders, sensor measurements from IoT devices or medical equipment, and so on.

The Kafka broker

After the message, the second main concept in Kafka is the broker, which can be thought of as the server side of Kafka. A Kafka cluster is formed by one or more Kafka brokers.

With that short explanation out of the way, we are going to create a topic and send a message to it, first from the command line and then using a Python script.

One thing to note is that Kafka was built with the command line in mind. There is no official GUI; the closest things that can aid us are projects like KafkaIDE.

Creating a topic

All messages are stored in topics, and topics in turn consist of units called partitions. Put another way, one or more partitions make up a single topic.

While the topic is used to logically organize messages in Kafka, a partition is the smallest storage unit that holds a subset of records owned by a topic.

To create a new topic, you can run the following command (we will use this topic later in our Python demonstration):

bin/kafka-topics.sh --create --topic employees --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

If we don't specify the number of partitions when creating a new topic, Kafka uses the broker's default configuration, which out of the box is a single partition per topic.
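
As a side note, if you'd rather create topics from Python than from the terminal, the kafka-python library we'll install below also ships an admin client. Here is a minimal sketch that mirrors the CLI command above (running it after the CLI command would raise a "topic already exists" error):

from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the local broker and create the same "employees" topic as the CLI command.
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([
    NewTopic(name='employees', num_partitions=1, replication_factor=1)
])
admin.close()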

Kafka producers and consumers

Kafka acts as a centerpiece in the communication between different services. Some of them produce messages and others consume them; these are the Kafka producers and consumers.

A Kafka producer is a client application that publishes (writes) messages to a Kafka topic, and a Kafka consumer subscribes to that same topic to read and process those messages.

Sending some messages from the terminal

A Kafka client communicates with the Kafka brokers via the network for writing and reading messages. Once received, the brokers will store the events in a durable and fault-tolerant manner for as long as you need them.

Unlike managed services such as Amazon Kinesis and Google Pub/Sub, which cap how long a message can be retained, Apache Kafka can act as a kind of database and keep messages forever: retention is controlled by settings such as log.retention.hours, and setting it to -1 retains data indefinitely.

You may run the following console command to write a few events into your topic (it's a basic console producer client). By default, each line you enter results in a separate event being written to the topic.

bin/kafka-console-producer.sh --topic employees --bootstrap-server localhost:9092

Consuming messages from the terminal

On the consumer side, the easiest way to read out the data from Kafka is to use the terminal as well.

Open another terminal session and run the following console consumer command to read the events you just created:

bin/kafka-console-consumer.sh --topic employees --from-beginning --bootstrap-server localhost:9092

It's worth emphasizing that events in Kafka are durably stored, as mentioned earlier, and that they can be read as many times as you want and by many consumers at the same time.
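
Consumers can also be organized into consumer groups: each group tracks its own read position (offset), so two independent groups each receive every message, while consumers that share a group split the topic's partitions between them. Here is a small sketch using the kafka-python client introduced below; the group names are made up.

from kafka import KafkaConsumer

# Two consumers in different groups each receive every message in the topic.
# Consumers sharing a group_id would instead divide the partitions among themselves.
analytics_consumer = KafkaConsumer(
    'employees',
    group_id='analytics',                 # hypothetical group name
    auto_offset_reset='earliest',
    bootstrap_servers=['localhost:9092'],
)
billing_consumer = KafkaConsumer(
    'employees',
    group_id='billing',                   # a second, independent group
    auto_offset_reset='earliest',
    bootstrap_servers=['localhost:9092'],
)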

If you're able to read some data using the previous command, it means that everything has been set up correctly on your end and that you're ready to use Python to build your own producer and consumer.

A Python library that will help us in the process

The best Kafka Python client

To start working with Kafka in Python, we will need a good library that will aid us in the process.

Luckily, there are multiple libraries available for us to choose from; the most important ones are:

  • Kafka-Python: an open-source, community-based library. It's the most popular by far.
  • Confluent Python Kafka: a Kafka Python client offered by Confluent as a thin wrapper around librdkafka, a C/C++ client, which gives it better performance.
  • PyKafka: a third option worth mentioning, although it's more limited than the previous two.

For this tutorial, we'll go with the popular option, the kafka-python library. In the words of its creators, it is "designed to function much like the official Java client, with a sprinkling of pythonic interfaces".

To start working with it, you can open up a terminal and install it using pip:

pip install kafka-python
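
A quick way to confirm the installation worked is to import the package and print its version (this assumes a recent kafka-python release, which exposes __version__):

import kafka

# If the import succeeds and a version string is printed, the library is installed correctly.
print(kafka.__version__)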
    

Sending some messages using a kafka-python client

This is the part where we do some actual Python coding to create a producer.

We will now create a Python script that produces some messages and sends them to the "employees" Kafka topic we created earlier.

The following code shows how to connect to the Kafka cluster (our broker) and send it different messages. In this example, we have a list of employees, and we pass their information to the topic one by one.

import json

from kafka import KafkaProducer


def publish_message(kafka_producer, topic_name, key, value):
    try:
        # Kafka expects raw bytes, so encode the key and value before sending.
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        kafka_producer.send(topic_name, key=key_bytes, value=value_bytes)
        # flush() blocks until all buffered messages have been sent to the broker.
        kafka_producer.flush()
        print('Message published successfully.')
    except Exception as ex:
        print(str(ex))


if __name__ == '__main__':
    # Connect to the local broker we started earlier.
    kafka_producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
    employees = [
        {
            "name": "John Smith",
            "id": 1
        }, {
            "name": "Susan Doe",
            "id": 2
        }, {
            "name": "Karen Rock",
            "id": 3
        },
    ]
    for employee in employees:
        publish_message(
            kafka_producer=kafka_producer,
            topic_name='employees',
            key=str(employee['id']),  # bytes() needs a string, so convert the numeric id
            value=json.dumps(employee)
        )
    if kafka_producer is not None:
        kafka_producer.close()
    

The interface this library exposes for communicating with Kafka is pretty simple: we instantiate a KafkaProducer and use its send method to stream some data to the broker.
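
One detail worth knowing: send() is asynchronous and returns a future. If you want explicit delivery confirmation, you can block on that future and inspect the record metadata it returns. A minimal sketch under the same local setup as above:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# send() returns a future; get() blocks until the broker acknowledges the write
# (or raises an exception on failure) and returns the record's metadata.
future = producer.send('employees', key=b'1', value=b'{"name": "John Smith", "id": 1}')
metadata = future.get(timeout=10)
print(f'Written to partition {metadata.partition} at offset {metadata.offset}')
producer.close()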

Auxiliary tools that will help you code faster

An example of how KafkaIDE works

KafkaIDE is a very useful tool for getting a visual view of what's going on in your brokers. You can use it to help you implement and debug your Kafka applications.

It is a desktop application that runs pretty much everywhere and can connect to any cluster, wherever it is located, as long as you have network connectivity from your machine, so make sure to check it out!

Consuming messages using the kafka-python client

Now that we have some data in our topic, it's time to set up a Kafka consumer using the kafka-python library.

The following piece of Python code connects to the topic and reads all of its messages. It's also important to note that a consumer is not there just to fetch data, but to do something with it. In this example, we simply print some strings to the console, but you could do pretty much anything here.

import json
from time import sleep

from kafka import KafkaConsumer

if __name__ == '__main__':
    consumer = KafkaConsumer(
        'employees',
        auto_offset_reset='earliest',     # start from the beginning of the topic
        bootstrap_servers=['localhost:9092'],
        api_version=(0, 10),
        consumer_timeout_ms=1000          # stop iterating after 1s without new messages
    )
    for msg in consumer:
        # msg.value holds the raw bytes we produced; decode the JSON payload.
        record = json.loads(msg.value)
        employee_id = int(record['id'])
        name = record['name']

        if employee_id != 3:
            print(f"This employee is not Karen. It's actually {name}")
        sleep(3)

    if consumer is not None:
        consumer.close()
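
As a small refinement, kafka-python can do the JSON decoding for you: pass a value_deserializer when constructing the consumer and msg.value arrives as a dict instead of raw bytes. A sketch with the same topic and broker settings as above:

import json

from kafka import KafkaConsumer

# The deserializer runs on every raw message value before it reaches the loop body.
consumer = KafkaConsumer(
    'employees',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=1000,
    value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
)
for msg in consumer:
    print(msg.value['name'])  # msg.value is already a dict here
consumer.close()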
    

And with these two Python programs, the producer and the consumer, you now know how to use the combined power of Kafka + Python.

Real-world use cases

Why would a software developer be interested in learning about Apache Kafka?

  • Decoupling: Kafka can be used as a message broker that sits between services. If you are implementing a microservice architecture, you can have one microservice act as a producer and another as a consumer, which can dramatically reduce the coupling between different parts of your system.
  • ETL: Kafka allows for near real-time streaming, so you have all the flexibility you need to build an ETL pipeline that fits your requirements.
  • Database: Based on some of the points mentioned above, you could say that Kafka also acts as a database. Not a typical database, but one that can keep data for as long as you want without it being consumed away.
  • Internet of Things (IoT): Kafka can be used to collect data from websites or from physical sensors and devices. Producers publish the raw data to Kafka, and downstream consumers later process it.
  • Log Aggregation: You can use Kafka to collect logs from different services and store them in a centralized way for further processing.
Conclusion

Kafka is an awesome, scalable, fault-tolerant, publish-subscribe messaging and stream-processing system that enables you to build distributed applications.

This technology is quickly becoming the bread and butter of many companies. It's a tool that's in great demand because it provides a lot of value to organizations, and we as programmers must keep up to date with trends like this one.

In this post, we covered the gist of Kafka and how to interact with it using Python. To learn more about these topics, feel free to explore more articles from this blog ;)