Setup a Kafka cluster with 3 nodes on CentOS 7

Published by Alexander Braun on 17 Feb 2018 - tagged with Linux, Java, Apache Kafka

Apache Kafka is an open source distributed stream processing platform. From a high-level perspective, Kafka is a distributed messaging system that allows producers to send messages to a topic and consumers to read messages from a topic. Kafka is massively scalable and offers high throughput and low latency when operated in a cluster. This post explains how to set up a Kafka cluster consisting of 3 nodes for a development environment.

Prerequisites

As we are going to set up a 3 nodes Kafka cluster we need 3 CentOS 7 Linux servers with the latest updates and JDK 1.8. In this example, I use 3 virtual machines. You can find the process to set up such a Virtual Box image in this post.

Network Design

The diagram below shows the network design we will use to set up the cluster. The hostnames of the virtual machines are kafka1, kafka2, and kafka3. We also assign a fixed IP address for each of the servers (see next section). Additionally, we will create firewall rules to ensure that the cluster nodes can communicate with each other. The required ports are 2888, 3888 and 2181 for ZooKeeper and 9092 for Kafka.

Kafka Cluster Network Design

Assign static IP Addresses

To set up the cluster we should assign fixed IP addresses to our cluster nodes. This simplifies the ZooKeeper and Kafka configurations described in subsequent steps.

The easiest way to assign a fixed IP address for each server is to add a DHCP reservation rule in our router. We only need the MAC address of the virtual machine network interface. We can get the MAC address by executing ip a on each of the servers.

[user@kafka1 ~]$ ip a
1: lo:  mtu 65536 qdisc noqueue state UNKNOWN qlen 1
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
2: enp0s3:  mtu 1500 qdisc pfifo_fast state UP qlen 1000
    link/ether 08:00:27:74:3a:f4 brd ff:ff:ff:ff:ff:ff
    inet 192.168.1.101/24 brd 192.168.1.255 scope global dynamic enp0s3
       valid_lft 86389sec preferred_lft 86389sec
    inet6 2001:569:70df:8b00:8728:ced9:1c2a:387f/64 scope global noprefixroute dynamic
       valid_lft 14696sec preferred_lft 14396sec
    inet6 fe80::ae37:4c20:e5bd:8014/64 scope link
       valid_lft forever preferred_lft forever

The example above shows that the virtual machine uses MAC address 08:00:27:74:3a:f4. Now we can use the MAC address to assign a DHCP rule in the router. The picture below shows the rules for all server nodes.

Static IP Addresses DHCP Rule

In case you don't have access to the router and cannot set a DHCP rule you can configure a static IP address as described here.

Adjust /etc/hosts file

I usually prefer to use hostnames instead of fixed IP addresses within the configuration files. To achieve this without publicly available domain names we have to add the server names to the /etc/hosts file on each server.

[user@kafka1 ~]$ sudo vi /etc/hosts

Let's add the following entries:

192.168.1.101   kafka1
192.168.1.102   kafka2
192.168.1.103   kafka3

Create firewall rules

As mentioned above we have to open several ports to allow clients to connect to our Kafka cluster and to allow the nodes to communicate with each other. In CentOS 7 we have to add the corresponding firewall rules.

ZooKeeper firewall rule

Let's start with the firewall rule for ZooKeeper. We have to open ports 2888, 3888 and 2181.

[user@kafka1 ~]$ sudo vi /etc/firewalld/services/zooKeeper.xml

We have to add the following content to the zooKeeper.xml file.

<?xml version="1.0" encoding="utf-8"?>
<service>
  <short>ZooKeeper</short>
  <description>Firewall rule for ZooKeeper ports</description>
  <port protocol="tcp" port="2888"/>
  <port protocol="tcp" port="3888"/>
  <port protocol="tcp" port="2181"/>
</service>

Kafka firewall rule

For Kafka, we have to open port 9092.

[user@kafka1 ~]$ sudo vi /etc/firewalld/services/kafka.xml

We have to add the following content to the kafka.xml file.

<?xml version="1.0" encoding="utf-8"?>
<service>
  <short>Kafka</short>
  <description>Firewall rule for Kafka port</description>
  <port protocol="tcp" port="9092"/>
</service>

Activate the new rules

Now we can activate the new firewall rules. Let's first restart the firewalld service to enforce that all existing service specifications are reloaded.

[user@kafka1 ~]$ sudo service firewalld restart

We can now permanently add the firewall rules for ZooKeeper and Kafka.

[user@kafka1 ~]$ sudo firewall-cmd --permanent --add-service=zooKeeper
[user@kafka1 ~]$ sudo firewall-cmd --permanent --add-service=kafka

After activating the rules we have to restart firewalld.

[user@kafka1 ~]$ sudo service firewalld restart

To ensure that everything works as expected we can check if the new services have been activated.

[user@kafka1 ~]$ sudo firewall-cmd --list-services
ssh dhcpv6-client ntp zooKeeper kafka

The output shows that both services are active.

Create Kafka user

I recommend to create a separate kafka user for operating the Kafka node.

[user@kafka1 ~]$ sudo adduser kafka
[user@kafka1 ~]$ sudo passwd kafka

After creating the user we can switch to the new user and can perform the remaining configuration.

[user@kafka1 ~]$ su kafka

Install ZooKeeper and Kafka

Now we can download Apache Kafka

[kafka@kafka1 ~]$ wget http://apache.forsale.plus/kafka/1.0.0/kafka_2.11-1.0.0.tgz

Let's decompress the tar.gz file

[kafka@kafka1 ~]$ tar -xzf kafka_2.11-1.0.0.tgz

That's it! As the Kafka archive already includes ZooKeeper we already have a fully working Kafka installation on our system. To operate a cluster we have to adjust the configuration.

Create directories

We have to create data and log directories for ZooKeeper and Kafka. To simplify this process we can add the directories within the user home directory. In a production environment, we would use different locations, e.g. separate mount points or physical disks for data and log directories.

[kafka@kafka1 ~]$ mkdir -p /home/kafka/zookeeper/data
[kafka@kafka1 ~]$ mkdir -p /home/kafka/kafka/kafka-logs

ZooKeeper configuration

The configuration file of the embedded ZooKeeper instance is located at kafka_2.11-1.0.0/config/zookeeper.properties.

[kafka@kafka1 ~]$ vi kafka_2.11-1.0.0/config/zookeeper.properties

Within this file, we have to locate the dataDir property and set the value to point to the new ZooKeeper directory we created above.

dataDir=/home/kafka/zookeeper/data

At the end of this file, we have to add all available ZooKeeper servers. We also add the initLimit and syncLimit properties. Additional information about these properties can be found here.

server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888

initLimit=5
syncLimit=2

Each of our cluster nodes needs a unique server id. ZooKeeper looks up this information from the following file: /home/kafka/zookeeper/data/myid. We have to execute a command like this on each server - using a different value for each instance. For instance 1 on server kafka1 we use the value "1".

[kafka@kafka1 ~]$ echo "1" > /home/kafka/zookeeper/data/myid

Apache Kafka configuration

Now we can adjust the Kafka configuration files stored here: kafka_2.11-1.0.0/config/server.properties.

[kafka@kafka1 ~]$ vi kafka_2.11-1.0.0/config/server.properties

Similar to the ZooKeeper configuration, each Kafka cluster node needs a unique id. We have to find the broker.id property in the configuration file and change the id for each server. I recommend to use 1, 2 and 3.

broker.id=1

We also have to change the log directory location specified in the log.dirs parameter.

log.dirs=/home/kafka/kafka/kafka-logs

Additionally, we have to update the listeners and advertised.listeners properties with the current Kafka node hostname.

  • listeners: the address/server name and protocol kafka is listening to (internal traffic between Kafka nodes)
  • advertised.listener: the address/server name and protocol clients can use to connect to the Kafka cluster (external traffic). Only need to be specified if different from above setting.
listeners=PLAINTEXT://kafka1:9092
advertised.listeners=PLAINTEXT://kafka1:9092

The next step is to tell Kafka which ZooKeeper nodes can be used to connect to. Especially when operating a big cluster with hundreds of nodes, not all available server nodes have to be added here. It is sufficient to add a couple of seed nodes. Kafka will identify all available nodes and updates the available nodes if new nodes join or leave the cluster.

zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181

In a development environment, I usually add the property delete.topic.enable. Setting this property to true allows us to easily delete topics at runtime. If this property is not being set, Kafka will only mark topics as deleted.

delete.topic.enable=true

That's it, we have configured our 3 node Kafka cluster!

Start and test the cluster setup

We finally can startup ZooKeeper and Kafka and perform a quick test.

Start ZooKeeper

To start ZooKeeper execute the following command on each node:

[kafka@kafka1 ~]$ cd kafka_2.11-1.0.0
[kafka@kafka1 ~]$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

Start Apache Kafka

Now we can start Kafka:

[kafka@kafka1 ~]$ nohup bin/kafka-server-start.sh config/server.properties &

Create a new topic

To test the setup we have to create a topic.

[kafka@kafka1 ~]$ bin/kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 1 --partitions 6 --topic topic1 --config cleanup.policy=delete --config delete.retention.ms=60000

The command above creates a new topic named topic1 with 6 partitions. Each cluster will be responsible for 2 partitions. The replication-factor has been set to 1, which means data is not being replicated and the data for a particular partition will only be stored on one server.

We can also get a list of all existing topics

[kafka@kafka1 ~]$ bin/kafka-topics.sh --list --zookeeper kafka1:2181

And we can get a detailed description of our topic.

[kafka@kafka1 ~]$ bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic1

In my case the command above prints out:

Topic:topic1	PartitionCount:6	ReplicationFactor:1	Configs:delete.retention.ms=60000,cleanup.policy=delete
	Topic: topic1	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: topic1	Partition: 1	Leader: 3	Replicas: 3	Isr: 3
	Topic: topic1	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic1	Partition: 3	Leader: 2	Replicas: 2	Isr: 2
	Topic: topic1	Partition: 4	Leader: 3	Replicas: 3	Isr: 3
	Topic: topic1	Partition: 5	Leader: 1	Replicas: 1	Isr: 1

The command shows which server is responsible for which partition and which server replicates the data.

Test the cluster

The Kafka package already includes two command line tools to create a producer and a consumer that can be used to check if the cluster works.

We can start the producer on one of our servers. The command opens a prompt and anything we enter here will be sent to the topic.

[kafka@kafka1 ~]$ bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic topic1

Now we can start a consumer on one of our servers.

[kafka@kafka1 ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic1

Whenever we enter something in the producer prompt it will be printed out in our consumer terminal.

This means our test was successful, the Kafka cluster has been set up. In the next couple of weeks, I will add more Kafka examples and use cases, including concrete examples in Java.