Set up a single Apache Kafka node on CentOS 7

Published by Alexander Braun on 30 Mar 2018 - tagged with Java, Apache Kafka

Apache Kafka is an open source distributed stream processing platform. From a high-level perspective, Kafka is a distributed messaging system that allows producers to send messages to a topic and consumers to read messages from a topic. Kafka is massively scalable and offers high throughput and low latency when operated in a cluster. This post explains how to set up a single Kafka node for a development environment.

Prerequisites

We are going to set up Kafka on a CentOS 7 Linux server. For this purpose, we need a CentOS 7 virtual machine with the latest updates and JDK 1.8. You can find the process to set up such a VirtualBox image in this post.

In case you prefer to set up a small clustered Kafka environment, you can find detailed instructions in this post.

Create firewall rule

The default minimal installation of CentOS 7 activates firewalld. To allow clients to connect to our Kafka instance, we have to add a firewall rule that opens the Kafka port.

Kafka firewall rule

For Kafka, we have to open port 9092.

[user@kafka1 ~]$ sudo vi /etc/firewalld/services/kafka.xml

We have to add the following content to the kafka.xml file.

<?xml version="1.0" encoding="utf-8"?>
<service>
  <short>Kafka</short>
  <description>Firewall rule for Kafka port</description>
  <port protocol="tcp" port="9092"/>
</service>
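
As an alternative to editing the file in vi, the same service definition could be written non-interactively. The following is only a sketch: write_kafka_service is a made-up helper name, and the demo writes into a scratch directory instead of /etc/firewalld/services so that it can be tried without root privileges.

```shell
# Write the firewalld service definition for Kafka into the given directory.
# write_kafka_service is a hypothetical helper name, not part of firewalld.
write_kafka_service() {
    target_dir=$1
    mkdir -p "$target_dir" || return 1
    cat > "$target_dir/kafka.xml" <<'EOF'
<?xml version="1.0" encoding="utf-8"?>
<service>
  <short>Kafka</short>
  <description>Firewall rule for Kafka port</description>
  <port protocol="tcp" port="9092"/>
</service>
EOF
}

# Try it against a scratch directory first and inspect the result.
demo_dir=$(mktemp -d)
write_kafka_service "$demo_dir"
cat "$demo_dir/kafka.xml"
```

On the server, the function would be called as root with /etc/firewalld/services as the target directory.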

Activate the new rule

Now we can activate the new firewall rule. Let's first restart the firewalld service so that all service definitions, including the new kafka.xml, are reloaded.

[user@kafka1 ~]$ sudo service firewalld restart

We can now permanently add the firewall rule for Kafka.

[user@kafka1 ~]$ sudo firewall-cmd --permanent --add-service=kafka

After activating the rule we have to restart firewalld.

[user@kafka1 ~]$ sudo service firewalld restart

To ensure that everything works as expected we can check if the new rule has been activated.

[user@kafka1 ~]$ sudo firewall-cmd --list-services
ssh dhcpv6-client ntp kafka

The output shows that the Kafka rule is active.
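
If the check should run as part of a setup script rather than by eye, the list of services can be tested for the kafka entry. This is a small sketch; has_service is a hypothetical helper name, and the sample list is copied from the output above.

```shell
# Return success if the service name ($2) appears as a whole word in the
# space-separated list ($1) printed by firewall-cmd --list-services.
has_service() {
    for svc in $1; do
        [ "$svc" = "$2" ] && return 0
    done
    return 1
}

# Sample list taken from the output above. On the server use instead:
#   services=$(sudo firewall-cmd --list-services)
services="ssh dhcpv6-client ntp kafka"

if has_service "$services" kafka; then
    echo "kafka rule active"
else
    echo "kafka rule missing"
fi
```

Comparing whole words avoids a false match on a service whose name merely contains "kafka".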

Create kafka user

I recommend creating a separate kafka user for operating the Kafka node.

[user@kafka1 ~]$ sudo adduser kafka
[user@kafka1 ~]$ sudo passwd kafka

After creating the user, we can switch to it and perform the remaining configuration.

[user@kafka1 ~]$ su - kafka

Install ZooKeeper and Kafka

Now we can download Apache Kafka:

[kafka@kafka1 ~]$ wget http://apache.forsale.plus/kafka/1.0.0/kafka_2.11-1.0.0.tgz

Let's decompress the tar.gz file:

[kafka@kafka1 ~]$ tar -xzf kafka_2.11-1.0.0.tgz

That's it! As the Kafka archive already includes ZooKeeper we already have a fully working Kafka installation on our system. Let's adjust the configuration.

Create directories

We have to create data and log directories for ZooKeeper and Kafka. To simplify this process we can add the directories within the user home directory. In a production environment, we would use different locations, e.g. separate mount points or physical disks for data and log directories.

[kafka@kafka1 ~]$ mkdir -p /home/kafka/zookeeper/data
[kafka@kafka1 ~]$ mkdir -p /home/kafka/kafka/kafka-logs

ZooKeeper configuration

The configuration file of the embedded ZooKeeper instance is located at kafka_2.11-1.0.0/config/zookeeper.properties.

[kafka@kafka1 ~]$ vi kafka_2.11-1.0.0/config/zookeeper.properties

Within this file, we have to locate the dataDir property and set the value to point to the new ZooKeeper directory we created above.

dataDir=/home/kafka/zookeeper/data

Each ZooKeeper node needs a unique server id. ZooKeeper looks up this information from the following file: /home/kafka/zookeeper/data/myid. As we have only one node, we can simply assign the value "1" to our instance.

[kafka@kafka1 ~]$ echo "1" > /home/kafka/zookeeper/data/myid
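
The directory and myid steps could be combined into one repeatable function. The sketch below assumes a helper name of my own choosing (init_zookeeper_dir) and runs against a scratch directory. It also checks that the id is a number between 1 and 255, the range the ZooKeeper documentation gives for server ids.

```shell
# Create the ZooKeeper data directory under base_dir and write the server id.
# init_zookeeper_dir is a hypothetical helper name used for illustration.
init_zookeeper_dir() {
    base_dir=$1
    server_id=$2
    # Reject empty or non-numeric ids before comparing them as numbers.
    case $server_id in
        ''|*[!0-9]*) echo "server id must be a number" >&2; return 1 ;;
    esac
    if [ "$server_id" -lt 1 ] || [ "$server_id" -gt 255 ]; then
        echo "server id must be between 1 and 255" >&2
        return 1
    fi
    mkdir -p "$base_dir/zookeeper/data" || return 1
    echo "$server_id" > "$base_dir/zookeeper/data/myid"
}

# Demo against a scratch directory; on the server base_dir is /home/kafka.
zk_demo=$(mktemp -d)
init_zookeeper_dir "$zk_demo" 1
cat "$zk_demo/zookeeper/data/myid"
```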

Apache Kafka configuration

Now we can adjust the Kafka configuration file stored here: kafka_2.11-1.0.0/config/server.properties.

[kafka@kafka1 ~]$ vi kafka_2.11-1.0.0/config/server.properties

Similar to the ZooKeeper configuration, each Kafka node needs a unique id. We have to find the broker.id property in the configuration file and change the id. I recommend using the same value as we used for ZooKeeper: 1.

broker.id=1

We also have to change the log directory location specified in the log.dirs parameter.

log.dirs=/home/kafka/kafka/kafka-logs

Additionally, we have to update the listeners and advertised.listeners properties with the Kafka node IP address - in my example 192.168.1.120. We can look up the IP address with ip a.

  • listeners: the address / server name and protocol Kafka listens on (internal traffic between Kafka nodes)
  • advertised.listeners: the address / server name and protocol clients can use to connect to the Kafka cluster (external traffic). Only needs to be specified if different from the setting above.

listeners=PLAINTEXT://192.168.1.120:9092
advertised.listeners=PLAINTEXT://192.168.1.120:9092
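
Instead of reading the address off the ip a output by hand, it can be extracted from the one-line format that ip -o -4 addr show prints. The following sketch parses a sample line; first_ipv4 is a hypothetical helper name and the interface name eth0 is made up for the example.

```shell
# Print the first global IPv4 address found in `ip -o -4 addr show` output
# read from standard input. first_ipv4 is a hypothetical helper name.
first_ipv4() {
    awk '/scope global/ { split($4, parts, "/"); print parts[1]; exit }'
}

# Sample line in the one-line format; on the server use instead:
#   node_ip=$(ip -o -4 addr show | first_ipv4)
sample='2: eth0    inet 192.168.1.120/24 brd 192.168.1.255 scope global eth0'
node_ip=$(printf '%s\n' "$sample" | first_ipv4)

# Print the two property lines for server.properties.
echo "listeners=PLAINTEXT://${node_ip}:9092"
echo "advertised.listeners=PLAINTEXT://${node_ip}:9092"
```

On a machine with several network interfaces, the first global address is not necessarily the right one, so the result should be checked before it goes into the configuration.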

In a development environment, I usually add the property delete.topic.enable. Setting this property to true allows us to easily delete topics at runtime. If it is set to false, Kafka only marks topics for deletion. Since Kafka 1.0.0 the default is already true, so setting it explicitly mainly documents the intent.

delete.topic.enable=true
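
The property changes in this section can also be applied by a script instead of in vi. The sketch below uses a hypothetical set_property helper that drops any active line for a key and appends the new value; commented-out lines are left untouched. The demo works on a small stand-in file, not on the real server.properties.

```shell
# Set key=value in a properties file: remove any active "key=" line, then
# append the new value. set_property is a hypothetical helper, not part of
# the Kafka distribution.
set_property() {
    file=$1
    key=$2
    value=$3
    tmp_file=$(mktemp) || return 1
    # Keep every line that does not start with the literal text "key=".
    awk -v prefix="$key=" 'index($0, prefix) != 1' "$file" > "$tmp_file" || return 1
    printf '%s=%s\n' "$key" "$value" >> "$tmp_file"
    # Overwrite in place so the original file keeps its owner and mode.
    cat "$tmp_file" > "$file"
    rm -f "$tmp_file"
}

# Stand-in for config/server.properties with a few illustrative lines.
props=$(mktemp)
printf '%s\n' 'broker.id=0' '#listeners=PLAINTEXT://:9092' 'log.dirs=/tmp/kafka-logs' > "$props"

set_property "$props" broker.id 1
set_property "$props" log.dirs /home/kafka/kafka/kafka-logs
set_property "$props" listeners PLAINTEXT://192.168.1.120:9092
set_property "$props" advertised.listeners PLAINTEXT://192.168.1.120:9092
set_property "$props" delete.topic.enable true
cat "$props"
```

Appending rather than editing in place moves the property to the end of the file, which is harmless for a properties file but changes its layout.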

That's it, we have configured our single node Kafka server!

Start and test the cluster setup

Finally, we can start up ZooKeeper and Kafka and perform a quick test.

Start ZooKeeper

To start ZooKeeper we execute the following commands:

[kafka@kafka1 ~]$ cd kafka_2.11-1.0.0
[kafka@kafka1 ~]$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

Start Apache Kafka

Now we can start Kafka:

[kafka@kafka1 ~]$ nohup bin/kafka-server-start.sh config/server.properties &
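
Both processes start in the background, so it can help to confirm that the ports are open before moving on. One way, sketched here, is to inspect the output of ss -ltn (assuming ss from the iproute package is available). port_is_listening is a hypothetical helper name, and the sample output is illustrative, not captured from a real server.

```shell
# Succeed if the `ss -ltn` output on standard input contains a listening
# socket on the given port. port_is_listening is a hypothetical helper name.
port_is_listening() {
    awk -v port="$1" '
        $1 == "LISTEN" && $4 ~ (":" port "$") { found = 1 }
        END { exit !found }
    '
}

# Illustrative sample of `ss -ltn` output; on the server use instead:
#   ss -ltn | port_is_listening 9092
sample='State      Recv-Q Send-Q Local Address:Port   Peer Address:Port
LISTEN     0      50     *:2181                *:*
LISTEN     0      50     192.168.1.120:9092    *:*'

for port in 2181 9092 9093; do
    if printf '%s\n' "$sample" | port_is_listening "$port"; then
        echo "port $port: listening"
    else
        echo "port $port: not listening"
    fi
done
```

Port 2181 is the ZooKeeper client port used in the commands of this post and 9092 is the Kafka listener configured above.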

Create a new topic

To test the setup we have to create a topic.

[kafka@kafka1 ~]$ bin/kafka-topics.sh --create --zookeeper 192.168.1.120:2181 --replication-factor 1 --partitions 6 --topic topic1 --config cleanup.policy=delete --config delete.retention.ms=60000

The command above creates a new topic named topic1 with 6 partitions.

We can also get a list of all existing topics:

[kafka@kafka1 ~]$ bin/kafka-topics.sh --list --zookeeper 192.168.1.120:2181

And we can get a detailed description of our topic.

[kafka@kafka1 ~]$ bin/kafka-topics.sh --describe --zookeeper 192.168.1.120:2181 --topic topic1

In my case the command above prints out:

Topic:topic1	PartitionCount:6	ReplicationFactor:1	Configs:delete.retention.ms=60000,cleanup.policy=delete
	Topic: topic1	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic1	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic1	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic1	Partition: 3	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic1	Partition: 4	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic1	Partition: 5	Leader: 1	Replicas: 1	Isr: 1

The output shows, for each partition, which broker is the leader (Leader), which brokers hold a replica (Replicas), and which of those replicas are in sync (Isr). In our case, with only one node, broker 1 stores all data.
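
The describe output can also be checked by a script, for example to confirm that every partition has the expected leader. The following sketch counts the partition lines led by a given broker id; count_partitions_led_by is a hypothetical helper name, and the sample text mirrors the output shown above with spaces in place of tabs.

```shell
# Count partition lines in `kafka-topics.sh --describe` output (read from
# standard input) whose leader is the given broker id.
count_partitions_led_by() {
    awk -v broker="$1" '
        $3 == "Partition:" && $5 == "Leader:" && $6 == broker { n++ }
        END { print n + 0 }
    '
}

# Sample mirroring the output above; on the server pipe the real command in.
describe_output='Topic:topic1 PartitionCount:6 ReplicationFactor:1 Configs:delete.retention.ms=60000,cleanup.policy=delete
 Topic: topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
 Topic: topic1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
 Topic: topic1 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
 Topic: topic1 Partition: 4 Leader: 1 Replicas: 1 Isr: 1
 Topic: topic1 Partition: 5 Leader: 1 Replicas: 1 Isr: 1'

led=$(printf '%s\n' "$describe_output" | count_partitions_led_by 1)
echo "broker 1 leads $led of 6 partitions"
```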

Test the cluster

The Kafka package already includes two command line tools to create a producer and a consumer that can be used to check if the node works.

We can start the producer on our Kafka server using the command below. The command opens a prompt and anything we enter here will be sent to the topic.

[kafka@kafka1 ~]$ bin/kafka-console-producer.sh --broker-list 192.168.1.120:9092 --topic topic1

Now we can start a consumer on our Kafka server in a second terminal.

[kafka@kafka1 ~]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.120:9092 --topic topic1

Whenever we enter something in the producer prompt it will be printed out in our consumer terminal.
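
The same round trip could be run without typing into two terminals. The sketch below defines a hypothetical smoke_test function that sends one uniquely tagged message and then reads the topic from the beginning until that message shows up. It relies on the console consumer's --from-beginning and --timeout-ms options; the 10 second timeout is an arbitrary choice.

```shell
# Send one tagged message to a topic and check that it can be read back.
# smoke_test is a hypothetical helper; bin_dir is the Kafka bin directory.
smoke_test() {
    bin_dir=$1
    broker=$2
    topic=$3
    msg="smoke-test-$$-$(date +%s)"

    # Produce a single message from standard input.
    printf '%s\n' "$msg" |
        "$bin_dir/kafka-console-producer.sh" --broker-list "$broker" \
            --topic "$topic" >/dev/null || return 1

    # Read the topic from the start; stop after 10 s without new messages.
    # The function succeeds only if the tagged message appears.
    "$bin_dir/kafka-console-consumer.sh" --bootstrap-server "$broker" \
        --topic "$topic" --from-beginning --timeout-ms 10000 2>/dev/null |
        grep -qxF "$msg"
}

# Usage on the server, from inside the kafka_2.11-1.0.0 directory:
#   smoke_test bin 192.168.1.120:9092 topic1 && echo "round trip ok"
```

Reading from the beginning means the check gets slower as the topic grows, so this is meant for a fresh development topic only.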

This means our test was successful and the Kafka instance has been set up. In the next couple of weeks, I will add more Kafka examples and use cases, including concrete examples in Java.