Set up a Kafka cluster with 3 nodes on CentOS 7
Apache Kafka is an open source distributed stream processing platform. From a high-level perspective, Kafka is a distributed messaging system that allows producers to send messages to a topic and consumers to read messages from a topic. Kafka is massively scalable and offers high throughput and low latency when operated in a cluster. This post explains how to set up a Kafka cluster consisting of 3 nodes for a development environment.
As we are going to set up a 3-node Kafka cluster, we need 3 CentOS 7 Linux servers with the latest updates and JDK 1.8. In this example, I use 3 virtual machines. You can find the process to set up such a VirtualBox image in this post.
The diagram below shows the network design we will use to set up the cluster. The hostnames of the virtual machines are kafka1, kafka2, and kafka3. We also assign a fixed IP address for each of the servers (see next section). Additionally, we will create firewall rules to ensure that the cluster nodes can communicate with each other. The required ports are 2888, 3888 and 2181 for ZooKeeper and 9092 for Kafka.
Assign static IP Addresses
To set up the cluster we should assign fixed IP addresses to our cluster nodes. This simplifies the ZooKeeper and Kafka configurations described in subsequent steps.
The easiest way to assign a fixed IP address for each server is to add a DHCP reservation rule in our router. We only need the MAC address of the virtual machine network interface. We can get the MAC address by executing ip a on each of the servers.
[user@kafka1 ~]$ ip a
1: lo: mtu 65536 qdisc noqueue state UNKNOWN qlen 1
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
2: enp0s3: mtu 1500 qdisc pfifo_fast state UP qlen 1000
    link/ether 08:00:27:74:3a:f4 brd ff:ff:ff:ff:ff:ff
    inet 192.168.1.101/24 brd 192.168.1.255 scope global dynamic enp0s3
       valid_lft 86389sec preferred_lft 86389sec
    inet6 2001:569:70df:8b00:8728:ced9:1c2a:387f/64 scope global noprefixroute dynamic
       valid_lft 14696sec preferred_lft 14396sec
    inet6 fe80::ae37:4c20:e5bd:8014/64 scope link
       valid_lft forever preferred_lft forever
The example above shows that the virtual machine uses MAC address 08:00:27:74:3a:f4. Now we can use the MAC address to assign a DHCP rule in the router. The picture below shows the rules for all server nodes.
In case you don't have access to the router and cannot set a DHCP rule you can configure a static IP address as described here.
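Reading the MAC address out of the full ip a listing gets tedious when repeated on three machines. As a minimal sketch, the helper below filters the one-line-per-interface output of ip -o link and prints each interface name followed by its MAC address. The function name mac_addresses is hypothetical and not part of any tool.

```shell
# Hypothetical helper: read `ip -o link` output on stdin and print
# "<interface>: <MAC>" for every Ethernet interface.
mac_addresses() {
  awk '{
    for (i = 1; i < NF; i++)
      if ($i == "link/ether") print $2, $(i + 1)
  }'
}

# Usage on a cluster node (not executed here):
#   ip -o link show | mac_addresses
```

The loopback interface is skipped automatically because its link type is link/loopback rather than link/ether.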
Adjust /etc/hosts file
I usually prefer to use hostnames instead of fixed IP addresses within the configuration files. To achieve this without publicly available domain names we have to add the server names to the /etc/hosts file on each server.
[user@kafka1 ~]$ sudo vi /etc/hosts
Let's add the following entries:
192.168.1.101 kafka1
192.168.1.102 kafka2
192.168.1.103 kafka3
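Since the same three entries have to be added on every server, it can help to script the step. The following is only a sketch under the assumption that it runs with root privileges. The helper name add_host_entry is hypothetical. It appends an entry only if the hostname is not already present, so running it twice does no harm.

```shell
# Hypothetical helper: append "IP NAME" to a hosts file unless an
# entry for NAME already exists in it.
add_host_entry() {
  hosts_file=$1
  ip=$2
  name=$3
  if ! grep -qE "[[:space:]]${name}([[:space:]]|\$)" "$hosts_file" 2>/dev/null; then
    printf '%s %s\n' "$ip" "$name" >> "$hosts_file"
  fi
}

# Usage as root on each node (not executed here):
#   add_host_entry /etc/hosts 192.168.1.101 kafka1
#   add_host_entry /etc/hosts 192.168.1.102 kafka2
#   add_host_entry /etc/hosts 192.168.1.103 kafka3
```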
Create firewall rules
As mentioned above we have to open several ports to allow clients to connect to our Kafka cluster and to allow the nodes to communicate with each other. In CentOS 7 we have to add the corresponding firewall rules.
ZooKeeper firewall rule
Let's start with the firewall rule for ZooKeeper. We have to open ports 2888, 3888 and 2181.
[user@kafka1 ~]$ sudo vi /etc/firewalld/services/zooKeeper.xml
We have to add the following content to the zooKeeper.xml file.
<?xml version="1.0" encoding="utf-8"?>
<service>
  <short>ZooKeeper</short>
  <description>Firewall rule for ZooKeeper ports</description>
  <port protocol="tcp" port="2888"/>
  <port protocol="tcp" port="3888"/>
  <port protocol="tcp" port="2181"/>
</service>
Kafka firewall rule
For Kafka, we have to open port 9092.
[user@kafka1 ~]$ sudo vi /etc/firewalld/services/kafka.xml
We have to add the following content to the kafka.xml file.
<?xml version="1.0" encoding="utf-8"?>
<service>
  <short>Kafka</short>
  <description>Firewall rule for Kafka port</description>
  <port protocol="tcp" port="9092"/>
</service>
Activate the new rules
Now we can activate the new firewall rules. Let's first restart the firewalld service to ensure that all existing service specifications are reloaded.
[user@kafka1 ~]$ sudo service firewalld restart
We can now permanently add the firewall rules for ZooKeeper and Kafka.
[user@kafka1 ~]$ sudo firewall-cmd --permanent --add-service=zooKeeper
[user@kafka1 ~]$ sudo firewall-cmd --permanent --add-service=kafka
After activating the rules we have to restart firewalld.
[user@kafka1 ~]$ sudo service firewalld restart
To ensure that everything works as expected we can check if the new services have been activated.
[user@kafka1 ~]$ sudo firewall-cmd --list-services
ssh dhcpv6-client ntp zooKeeper kafka
The output shows that both services are active.
Create Kafka user
I recommend creating a separate kafka user for operating the Kafka node.
[user@kafka1 ~]$ sudo adduser kafka
[user@kafka1 ~]$ sudo passwd kafka
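After creating the user we can switch to it and perform the remaining configuration. The - option starts a login shell, so we end up in the home directory of the kafka user, where the following commands are executed.
[user@kafka1 ~]$ su - kafka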
Install ZooKeeper and Kafka
Now we can download Apache Kafka:
[kafka@kafka1 ~]$ wget http://apache.forsale.plus/kafka/1.0.0/kafka_2.11-1.0.0.tgz
Let's decompress the downloaded archive:
[kafka@kafka1 ~]$ tar -xzf kafka_2.11-1.0.0.tgz
That's it! As the Kafka archive already includes ZooKeeper we already have a fully working Kafka installation on our system. To operate a cluster we have to adjust the configuration.
We have to create data and log directories for ZooKeeper and Kafka. To simplify this process we can add the directories within the user home directory. In a production environment, we would use different locations, e.g. separate mount points or physical disks for data and log directories.
[kafka@kafka1 ~]$ mkdir -p /home/kafka/zookeeper/data
[kafka@kafka1 ~]$ mkdir -p /home/kafka/kafka/kafka-logs
The configuration file of the embedded ZooKeeper instance is located at kafka_2.11-1.0.0/config/zookeeper.properties.
[kafka@kafka1 ~]$ vi kafka_2.11-1.0.0/config/zookeeper.properties
Within this file, we have to locate the dataDir property and set the value to point to the new ZooKeeper directory we created above.
At the end of this file, we have to add all available ZooKeeper servers. We also add the initLimit and syncLimit properties. Additional information about these properties can be found here.
server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888
initLimit=5
syncLimit=2
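Editing the file by hand on three servers invites typos, so the changes to zookeeper.properties can also be scripted. The sketch below assumes GNU sed, as shipped with CentOS 7, and values that contain neither | nor &. The helper name set_property is hypothetical. It replaces an existing key=value line or appends a new one if the key is missing.

```shell
# Hypothetical helper: set KEY=VALUE in a .properties file.
# An existing "KEY=" line is replaced, otherwise the pair is appended.
set_property() {
  file=$1
  key=$2
  value=$3
  if grep -q "^${key}=" "$file"; then
    sed -i "s|^${key}=.*|${key}=${value}|" "$file"
  else
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}

# Usage from the kafka home directory (not executed here):
#   cfg=kafka_2.11-1.0.0/config/zookeeper.properties
#   set_property "$cfg" dataDir /home/kafka/zookeeper/data
#   set_property "$cfg" server.1 kafka1:2888:3888
```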
Each of our cluster nodes needs a unique server id. ZooKeeper reads this id from the following file: /home/kafka/zookeeper/data/myid. We have to execute a command like this on each server, using a different value for each instance. The value has to match the number in the corresponding server.N entry above, so on server kafka1 we use the value "1", on kafka2 "2" and on kafka3 "3".
[kafka@kafka1 ~]$ echo "1" > /home/kafka/zookeeper/data/myid
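Because the hostnames in this setup already carry the node number, the id can be derived instead of typed by hand. This is only a sketch that assumes the kafkaN naming scheme used in this post. The helper name node_id is hypothetical.

```shell
# Hypothetical helper: derive the ZooKeeper id from a hostname of the
# form kafkaN (kafka1 -> 1, kafka2 -> 2, ...). Fails for other names.
node_id() {
  case $1 in
    kafka[1-9]*) printf '%s\n' "${1#kafka}" ;;
    *) echo "unexpected hostname: $1" >&2; return 1 ;;
  esac
}

# Usage on each node (not executed here):
#   node_id "$(hostname -s)" > /home/kafka/zookeeper/data/myid
```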
Apache Kafka configuration
Now we can adjust the Kafka configuration file stored here: kafka_2.11-1.0.0/config/server.properties.
[kafka@kafka1 ~]$ vi kafka_2.11-1.0.0/config/server.properties
Similar to the ZooKeeper configuration, each Kafka cluster node needs a unique id. We have to find the broker.id property in the configuration file and change the id for each server. I recommend using 1, 2 and 3.
We also have to change the log directory location specified in the log.dirs parameter.
Additionally, we have to update the listeners and advertised.listeners properties with the current Kafka node hostname.
- listeners: the address/server name and protocol the Kafka broker binds to and listens on
- advertised.listeners: the address/server name and protocol the broker publishes so that clients (and other brokers) can connect to it. Only needs to be specified if different from the listeners setting.
The next step is to tell Kafka which ZooKeeper nodes it can connect to, using the zookeeper.connect property. Especially when operating a big cluster with hundreds of nodes, not all available server nodes have to be added here. It is sufficient to add a couple of seed nodes. Kafka will identify all available nodes and update the list when nodes join or leave the cluster.
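In a development environment, I usually add the property delete.topic.enable. Setting this property to true allows us to easily delete topics at runtime. If it is false, Kafka will only mark topics for deletion without removing them. Note that Kafka 1.0.0 and later default this property to true, so the explicit setting mainly matters on older versions.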
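To make the settings discussed in this section concrete, the sketch below prints the lines that would differ from the shipped server.properties for a given node. The PLAINTEXT protocol and the function name broker_settings are my assumptions. The hostnames, ports and directories are the ones used throughout this post.

```shell
# Hypothetical helper: print the server.properties overrides for
# broker N (1, 2 or 3), assuming unencrypted PLAINTEXT listeners.
broker_settings() {
  id=$1
  cat <<EOF
broker.id=${id}
log.dirs=/home/kafka/kafka/kafka-logs
listeners=PLAINTEXT://kafka${id}:9092
advertised.listeners=PLAINTEXT://kafka${id}:9092
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181
delete.topic.enable=true
EOF
}

# Show the values for the first node.
broker_settings 1
```

Only broker.id and the two listener entries change from node to node. The remaining values are identical on all three servers.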
That's it, we have configured our 3-node Kafka cluster!
Start and test the cluster setup
Finally, we can start up ZooKeeper and Kafka and perform a quick test.
To start ZooKeeper execute the following command on each node:
[kafka@kafka1 ~]$ cd kafka_2.11-1.0.0
[kafka@kafka1 ~]$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
Start Apache Kafka
Now we can start Kafka:
[kafka@kafka1 ~]$ nohup bin/kafka-server-start.sh config/server.properties &
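Once both services are running on all nodes, a quick reachability check helps to catch firewall or hostname mistakes before creating topics. The sketch below relies on bash's /dev/tcp feature and the coreutils timeout command. The function names port_open and check_cluster are hypothetical. It only probes the client ports 2181 and 9092, because port 2888 is opened only by the current ZooKeeper leader.

```shell
# Hypothetical helper: return 0 if a TCP connection to HOST:PORT
# succeeds within 2 seconds, non-zero otherwise.
port_open() {
  timeout 2 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# Hypothetical helper: report the ZooKeeper and Kafka client ports
# of every node in the cluster.
check_cluster() {
  for host in kafka1 kafka2 kafka3; do
    for port in 2181 9092; do
      if port_open "$host" "$port"; then
        echo "$host:$port open"
      else
        echo "$host:$port CLOSED"
      fi
    done
  done
}

# Usage from any cluster node (not executed here):
#   check_cluster
```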
Create a new topic
To test the setup we have to create a topic.
[kafka@kafka1 ~]$ bin/kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 1 --partitions 6 --topic topic1 --config cleanup.policy=delete --config delete.retention.ms=60000
The command above creates a new topic named topic1 with 6 partitions. With 3 brokers in the cluster, each broker will be responsible for 2 partitions. The replication-factor has been set to 1, which means data is not being replicated and the data for a particular partition will only be stored on one server.
We can also get a list of all existing topics:
[kafka@kafka1 ~]$ bin/kafka-topics.sh --list --zookeeper kafka1:2181
And we can get a detailed description of our topic.
[kafka@kafka1 ~]$ bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic1
In my case the command above prints out:
Topic:topic1 PartitionCount:6 ReplicationFactor:1 Configs:delete.retention.ms=60000,cleanup.policy=delete
    Topic: topic1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
    Topic: topic1 Partition: 1 Leader: 3 Replicas: 3 Isr: 3
    Topic: topic1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
    Topic: topic1 Partition: 3 Leader: 2 Replicas: 2 Isr: 2
    Topic: topic1 Partition: 4 Leader: 3 Replicas: 3 Isr: 3
    Topic: topic1 Partition: 5 Leader: 1 Replicas: 1 Isr: 1
The command shows which server is responsible for which partition and which server replicates the data.
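To confirm that the partitions are spread evenly, the describe output can be summarized per broker. The following is a small sketch. The function name partitions_per_leader is hypothetical, and it assumes the output format shown above, where each partition line contains a "Leader: N" pair.

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on
# stdin and print "<broker id> <number of partitions it leads>".
partitions_per_leader() {
  awk '{
    for (i = 1; i < NF; i++)
      if ($i == "Leader:") count[$(i + 1)]++
  }
  END { for (b in count) print b, count[b] }' | sort
}

# Usage (not executed here):
#   bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic topic1 | partitions_per_leader
```

For the output above this yields one line per broker with a count of 2, matching 6 partitions divided across 3 brokers.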
Test the cluster
The Kafka package already includes two command line tools to create a producer and a consumer that can be used to check if the cluster works.
We can start the producer on one of our servers. The command opens a prompt and anything we enter here will be sent to the topic.
[kafka@kafka1 ~]$ bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic topic1
Now we can start a consumer on one of our servers.
[kafka@kafka1 ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic1
Whenever we enter something in the producer prompt it will be printed out in our consumer terminal.
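The same round trip can be run without two interactive terminals, which is handy for a repeatable check. This is a sketch, not a definitive test: KAFKA_HOME is a hypothetical variable that I assume points at the unpacked kafka_2.11-1.0.0 directory, and smoke_test is a hypothetical name. On a topic that already contains messages, the consumer may return an older message than the one just sent.

```shell
# Hypothetical helper: send one message to a topic, then read one
# message back from the beginning of the topic and print it.
smoke_test() {
  broker=$1
  topic=$2
  message=$3
  # The console producer reads messages line by line from stdin.
  printf '%s\n' "$message" |
    "$KAFKA_HOME/bin/kafka-console-producer.sh" \
      --broker-list "$broker" --topic "$topic" >/dev/null
  # Stop after one message, or give up after 10 seconds of silence.
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
    --bootstrap-server "$broker" --topic "$topic" \
    --from-beginning --max-messages 1 --timeout-ms 10000
}

# Usage (not executed here):
#   KAFKA_HOME=/home/kafka/kafka_2.11-1.0.0
#   smoke_test kafka1:9092 topic1 "hello cluster"
```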
This means our test was successful and the Kafka cluster has been set up. In the next couple of weeks, I will add more Kafka examples and use cases, including concrete examples in Java.