Setting up a Multi-Broker Kafka Cluster with CMAK (Cluster Manager for Apache Kafka) — 2021

Mertseen
Apr 13, 2021 · 4 min read

I faced plenty of problems when I first tried to create a Kafka cluster: every blog post was either out of date or did not work properly. In this post I give a detailed guide to installing a multi-node Kafka cluster on three separate Ubuntu 16.04 servers.

In 2021, many companies prefer the supported Confluent Platform over plain Apache Kafka. Confluent extends Kafka with additional community and commercial features designed to enhance the streaming experience of both operators and developers in production, at massive scale. You can install the Confluent Platform as a whole from → https://docs.confluent.io/5.5.0/installation/installing_cp/zip-tar.html#prod-kafka-cli-install

In this blog, however, we are not going to install the Confluent Platform; instead we will use the original Apache Kafka distribution: https://kafka.apache.org/quickstart

System prerequisites

  • Java & javac
sudo apt-get update
sudo apt-get install openjdk-8-jdk

Clusters

We have 3 different servers; each server will run one ZooKeeper and one Kafka broker instance, for 3 of each in total. You can easily give them aliases by editing /etc/hosts. I will be using internal IPs such as ‘10.xx.xx.xx1 …’

10.xx.xx.xx1 → zoo1

10.xx.xx.xx2 → zoo2

10.xx.xx.xx3 → zoo3

10.xx.xx.xx1 → kafka1

10.xx.xx.xx2 → kafka2

10.xx.xx.xx3 → kafka3
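For reference, the corresponding /etc/hosts entries on every machine might look like the fragment below (the 10.xx.xx.xx* addresses are placeholders for your own internal IPs; each machine carries both a zoo and a kafka alias):

```
10.xx.xx.xx1 zoo1 kafka1
10.xx.xx.xx2 zoo2 kafka2
10.xx.xx.xx3 zoo3 kafka3
```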

Caution!

These instructions need to be applied on each Kafka instance.

1. Install Kafka

- Download the Kafka binary distribution (here 2.7.0, built for Scala 2.13; the -src tarball is source code, not a runnable distribution).

wget https://ftp.itu.edu.tr/Mirror/Apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz

- Create a kafka folder and extract the archive into it. The --strip-components=1 flag drops the archive’s top-level kafka_2.13-2.7.0 directory so the files land directly under /tmp/kafka, matching the paths used below.

mkdir /tmp/kafka

tar xvzf kafka_2.13-2.7.0.tgz -C /tmp/kafka --strip-components=1

Now our files are in /tmp/kafka. Inside this folder there are two important subfolders: bin and config.

- bin

The /tmp/kafka/bin folder holds all executable scripts, such as those for starting ZooKeeper, Kafka, …

- config

The /tmp/kafka/config folder includes all properties and settings files. We focus on zookeeper.properties and server.properties.

2. Create log.dir folders

- In order to store your log data, create the kafka and zookeeper folders (you may need sudo to create directories at the filesystem root):

mkdir /kafka
mkdir /zookeeper

Go to /zookeeper and create myid file for each instance.

For machine 1 (10.xx.xx.xx1)

cd /zookeeper
echo 1 > myid

For machine 2(10.xx.xx.xx2)

cd /zookeeper
echo 2 > myid

For machine 3(10.xx.xx.xx3)

cd /zookeeper
echo 3 > myid
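The myid steps above can be sketched as a tiny script. This is only a sketch: it writes into a temporary scratch directory instead of /zookeeper so it can run anywhere, and NODE_ID is a stand-in for the per-machine value (1, 2, or 3).

```shell
# Minimal sketch of the myid step, using a temporary directory in place
# of /zookeeper. NODE_ID would be 1, 2, or 3 depending on the machine.
ZK_DATA_DIR=$(mktemp -d)   # stands in for /zookeeper
NODE_ID=1

echo "$NODE_ID" > "$ZK_DATA_DIR/myid"

# ZooKeeper reads this single number to know which server.N entry
# in zookeeper.properties refers to itself.
cat "$ZK_DATA_DIR/myid"
```

The number in myid must match the N in the corresponding server.N line of zookeeper.properties, otherwise the ensemble will not form.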

3. zookeeper.properties

Open zookeeper.properties with nano or your preferred text editor.

nano /tmp/kafka/config/zookeeper.properties

Change the following:

# the directory where the snapshot is stored.
dataDir=/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
# admin.enableServer=false
# admin.serverPort=8080
# heartbeat interval in milliseconds
tickTime=2000
# ticks allowed for followers to connect and sync to the leader
initLimit=5
# ticks a follower may lag behind the leader before timing out
syncLimit=5
# quorum members: server.<myid>=<ip>:<peer-port>:<leader-election-port>
server.1=10.xx.xx.xx1:2888:3888
server.2=10.xx.xx.xx2:2888:3888
server.3=10.xx.xx.xx3:2888:3888

Do not forget to apply this on each machine.

4. server.properties

Open server.properties with nano or your best text editor.

nano /tmp/kafka/config/server.properties

Change the following:

For machine 1:

broker.id=1
listeners=PLAINTEXT://xx.xx.xx1:9092
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
log.dirs=/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=xx.xx.xx1:2181,xx.xx.xx2:2181,xx.xx.xx3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

For machine 2:

broker.id=2
listeners=PLAINTEXT://xx.xx.xx2:9092
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
log.dirs=/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=xx.xx.xx1:2181,xx.xx.xx2:2181,xx.xx.xx3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

For machine 3:

broker.id=3
listeners=PLAINTEXT://xx.xx.xx3:9092
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
log.dirs=/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=xx.xx.xx1:2181,xx.xx.xx2:2181,xx.xx.xx3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
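Since the three server.properties files differ only in broker.id and the listener address, you can generate the per-broker lines instead of editing each file by hand. This is just a sketch: BROKER_ID and BROKER_IP are hypothetical variables you would set differently on each machine, and it writes to /tmp rather than the real config path.

```shell
# Sketch: template the settings that differ per broker. BROKER_ID and
# BROKER_IP are placeholders; set them per machine (1/2/3 and the
# machine's internal IP).
BROKER_ID=2
BROKER_IP=10.0.0.2   # placeholder internal IP

OUT=/tmp/server.properties.generated
cat > "$OUT" <<EOF
broker.id=${BROKER_ID}
listeners=PLAINTEXT://${BROKER_IP}:9092
log.dirs=/kafka
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
EOF

# Show the generated per-broker line
grep '^broker.id=' "$OUT"
```

Note that with three brokers you would normally also raise offsets.topic.replication.factor and transaction.state.log.replication.factor to 3, so Kafka's internal topics survive the loss of a single broker.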

5. Start all ZooKeeper instances

Go to the kafka folder again: cd /tmp/kafka

Start ZooKeeper on all three instances (append the -daemon flag if you want them to run in the background):

1-

bin/zookeeper-server-start.sh config/zookeeper.properties

2-

bin/zookeeper-server-start.sh config/zookeeper.properties

3-

bin/zookeeper-server-start.sh config/zookeeper.properties

6. Start the Kafka servers

1-

bin/kafka-server-start.sh config/server.properties

2-

bin/kafka-server-start.sh config/server.properties

3-

bin/kafka-server-start.sh config/server.properties

7. Test the Kafka servers

Let’s check our servers.

Create a test topic first (with three brokers a replication factor of 3 is possible):

bin/kafka-topics.sh --create --topic test --partitions 3 --replication-factor 3 --bootstrap-server xx.xx.xx1:9092

Produce messages

bin/kafka-console-producer.sh --topic test --bootstrap-server xx.xx.xx1:9092

Consume them

bin/kafka-console-consumer.sh --from-beginning --topic test --bootstrap-server xx.xx.xx1:9092

8. CMAK (Cluster Manager for Apache Kafka)

CMAK (previously known as Kafka Manager) is a tool for managing Apache Kafka clusters. See the project’s GitHub page for details about the name change.

Clone the repo:

git clone https://github.com/yahoo/CMAK.git

The minimum configuration is the ZooKeeper hosts which are to be used for CMAK (previously Kafka Manager) state. This can be found in the application.conf file in the conf directory. The same file will be packaged in the distribution zip file; you may modify settings after unzipping the file on the desired server.

In application.conf (conf/application.conf):

# https://github.com/yahoo/CMAK/issues/713
kafkamanager.zkhosts="xx.xx.xx1:2181,xx.xx.xx2:2181,xx.xx.xx3:2181"
kafka-manager.zkhosts=${?ZK_HOSTS}
cmak.zkhosts="xx.xx.xx1:2181,xx.xx.xx2:2181,xx.xx.xx3:2181"
cmak.zkhosts=${?ZK_HOSTS}

CMAK is an sbt project: build the distribution with ./sbt clean dist, unzip the package it produces, and then run CMAK on port 9000:

bin/cmak -Dconfig.file=conf/application.conf -Dhttp.port=9000

Then open http://xx.xx.xx1:9000 in your browser.

Conclusion

You now have a three-node ZooKeeper ensemble, three Kafka brokers, and CMAK to manage them. From here you can create topics with higher replication factors and wire up your own producers and consumers.