Recently my colleague(Debbie) and I were tasked to increase the data transfer rate between our data centers. We were seeing an increased amount of traffic everyday and with every passing day we had to move more data. Time is of essence here as we were getting very close to hitting our retention bytes and would have soon started to lose data. A quick search suggested some buffer size changes, but it did not help either. Although it was part of the solution, it was not complete. After some arduous effort we figured out a right set of changes to increase the transfer rate(really fast!!!). I thought this might be useful for someone who is looking to do the same. Below are the changes we had to do to achieve it:
I am not suggesting that the below settings are the exact configuration you should use, but this might be a good starting point. You might have to fine tune it based on resource availability, bandwidth between data centers...
TCP Changes (Source Kafka Brokers, Kafka Mirror Box, Destination Kafka Brokers)
% sudo sysctl -w net.core.rmem_max=67108864
% sudo sysctl -w net.core.wmem_max=67108864
% sudo sysctl -w net.ipv4.tcp_rmem='4096 87380 33554432'
% sudo sysctl -w net.ipv4.tcp_wmem='4096 65536 33554432'
% sudo sysctl -w net.core.netdev_max_backlog=30000
% sudo sysctl -w net.ipv4.tcp_max_syn_backlog=4096
% sudo sysctl -p (this to make sure these changes take effect)
Consumer Properties (Kafka Mirror Box)
zookeeper.connect=<IP>:<PORT>,<IP>:<PORT>,<IP>:<PORT>
zookeeper.connection.timeout.ms=60000
group.id=KafkaMirror
auto.offset.reset=smallest (if you want to start from beginning)
fetch.message.max.bytes=10000000
rebalance.backoff.ms=5000
zookeeper.session.timeout.ms=5000
socket.receive.buffer.bytes=33554432
Producer Properties (Kafka Mirror Box)
metadata.broker.list=<IP>:<PORT>,<IP>:<PORT>,<IP>:<PORT>
partitioner.class=<partitioner class>
producer.type=async
compression.codec=(Use it based on your requirement)
serializer.class=kafka.serializer.DefaultEncoder
request.required.acks=1,0,-1 (Use it based on your requirement)
message.send.max.retries=3
queue.buffering.max.ms=1000
queue.buffering.max.messages=200000
batch.num.messages=100000
With remote profiling we found if we set batch.num.messages as half the size of queue.buffering.max.messages along with the configurations that suited our setup, it made fetcher and producer threads active for most of the time.
Server Properties (Source Cluster Brokers, Destination Cluster Brokers)
socket.send.buffer.bytes=33554432
socket.receive.buffer.bytes=33554432
socket.request.max.bytes=104857600
Brokers need to be restarted for these changes to take effect.
JVM Options
-Xmx 8G
-Xms 2G
change the above parameters based on message size and the configured queue size.
Debugging
Remote Profiling is also helpful to monitor the threads, memory…
-Dcom.sun.management.jmxremote.port=<PORT_NUMBER>
-Dcom.sun.management.jmxremote.authenticate=<BASED ON YOUR SETUP>
-Dcom.sun.management.jmxremote.ssl=<BASED ON YOUR SETUP>
Changing to trace mode you can see the amount of data being read, queue size over the period of time…
$KAFKA_HOME/config/tools-log4j.properties to TRACE
With these kind of settings(we use different settings on our production server), we have seen 50x-100x increase in data transfer rate.
Thanks to Debbie for her help and contribution in achieving these results.
Enjoy mirroring data!!!