Monday, February 9, 2015

KafkaMirror Fine Tuning Performance - Increasing Throughput

Recently my colleague Debbie and I were tasked with increasing the data transfer rate between our data centers. Traffic was growing every day, so with every passing day we had more data to move. Time was of the essence: we were getting very close to hitting our retention bytes and would soon have started to lose data. A quick search suggested some buffer size changes, but on their own they did not help; they turned out to be part of the solution, not all of it. After some arduous effort we figured out a set of changes that increased the transfer rate (really fast!!!). I thought this might be useful for anyone looking to do the same. Below are the changes we made:

I am not suggesting that the settings below are the exact configuration you should use, but they might be a good starting point. You might have to fine-tune them based on resource availability, bandwidth between data centers...

TCP Changes (Source Kafka Brokers, Kafka Mirror Box, Destination Kafka Brokers)
      % sudo sysctl -w net.core.rmem_max=67108864
      % sudo sysctl -w net.core.wmem_max=67108864
      % sudo sysctl -w net.ipv4.tcp_rmem='4096 87380 33554432'  
      % sudo sysctl -w net.ipv4.tcp_wmem='4096 65536 33554432'
      % sudo sysctl -w net.core.netdev_max_backlog=30000
      % sudo sysctl -w net.ipv4.tcp_max_syn_backlog=4096
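      % sudo sysctl -p (reloads /etc/sysctl.conf; the -w changes above already take effect immediately, so add the same values to that file first if they should survive a reboot)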
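
One common way to sanity-check socket buffer maximums like the ones above is the bandwidth-delay product (link bandwidth times round-trip time), which is roughly the amount of data that has to be in flight to keep a link busy. A minimal sketch in Python; the link speed and round-trip time are hypothetical, so substitute measured values for your own data centers:

```python
def bdp_bytes(bandwidth_bits_per_sec: float, rtt_ms: float) -> int:
    """Bandwidth-delay product: bytes in flight needed to keep the link full."""
    return int(bandwidth_bits_per_sec / 8 * rtt_ms / 1000)


# Hypothetical link between data centers: 1 Gbit/s with an 80 ms round trip.
needed = bdp_bytes(1_000_000_000, 80)

# Maximum from the tcp_rmem / tcp_wmem settings listed above.
tcp_buffer_max = 33554432

print(f"bandwidth-delay product: {needed} bytes")
print(f"fits within the TCP buffer maximum: {needed <= tcp_buffer_max}")
```

If the product comes out larger than the configured maximum, a single connection may not be able to fill the link, which is one reason the default buffer sizes can hold back transfers over long distances.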

Consumer Properties (Kafka Mirror Box)
      zookeeper.connect=<IP>:<PORT>,<IP>:<PORT>,<IP>:<PORT>
      zookeeper.connection.timeout.ms=60000
      group.id=KafkaMirror
      auto.offset.reset=smallest (if you want to start from the beginning)
      fetch.message.max.bytes=10000000
      rebalance.backoff.ms=5000
      zookeeper.session.timeout.ms=5000
      socket.receive.buffer.bytes=33554432

Producer Properties (Kafka Mirror Box)
      metadata.broker.list=<IP>:<PORT>,<IP>:<PORT>,<IP>:<PORT>
      partitioner.class=<partitioner class>
      producer.type=async
      compression.codec=<codec> (choose based on your requirement)
      serializer.class=kafka.serializer.DefaultEncoder
      request.required.acks=<1, 0 or -1> (choose based on your requirement)
      message.send.max.retries=3
      queue.buffering.max.ms=1000
      queue.buffering.max.messages=200000
      batch.num.messages=100000

With remote profiling we found that setting batch.num.messages to half of queue.buffering.max.messages, along with the other configuration values that suited our setup, kept the fetcher and producer threads active most of the time.
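
The halving relationship described above can be written down as a small helper, which is handy when experimenting with different queue sizes. This is only an illustrative sketch: the function name is made up for this post, and the ratio is the one that worked for our setup, not a general rule.

```python
def producer_batch_settings(queue_max_messages: int) -> dict:
    """Derive batch.num.messages as half of queue.buffering.max.messages."""
    return {
        "queue.buffering.max.messages": queue_max_messages,
        "batch.num.messages": queue_max_messages // 2,
    }


# Render the two producer properties for the queue size used in this post.
for key, value in producer_batch_settings(200000).items():
    print(f"{key}={value}")
```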

Server Properties (Source Cluster Brokers, Destination Cluster Brokers)
      socket.send.buffer.bytes=33554432
      socket.receive.buffer.bytes=33554432
      socket.request.max.bytes=104857600

Brokers need to be restarted for these changes to take effect.

JVM Options
      -Xmx8G
      -Xms2G

Change the above parameters based on message size and the configured queue size.
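
As a rough way to relate the heap to message size and queue size, you can estimate how much payload a completely full producer queue holds. The sketch below assumes a hypothetical average message size of 2 KB; it counts message payload only and ignores object overhead and the consumer-side fetch buffers, so treat the result as a lower bound rather than a sizing formula.

```python
def queue_footprint_bytes(queue_max_messages: int, avg_message_bytes: int) -> int:
    """Payload bytes held when the async producer queue is completely full."""
    return queue_max_messages * avg_message_bytes


queue_max_messages = 200000   # queue.buffering.max.messages from the producer properties
avg_message_bytes = 2048      # hypothetical average; measure your own traffic
heap_bytes = 8 * 1024**3      # the 8G maximum heap from the JVM options

footprint = queue_footprint_bytes(queue_max_messages, avg_message_bytes)
print(f"a full queue holds about {footprint / 1024**2:.0f} MiB of payload "
      f"({footprint / heap_bytes:.1%} of the maximum heap)")
```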

Debugging

Remote profiling is also helpful for monitoring the threads, memory…
     -Dcom.sun.management.jmxremote.port=<PORT_NUMBER>
     -Dcom.sun.management.jmxremote.authenticate=<BASED ON YOUR SETUP>   
     -Dcom.sun.management.jmxremote.ssl=<BASED ON YOUR SETUP>

Switching the logging to trace mode lets you see the amount of data being read, the queue size over time…
     Set the log level in $KAFKA_HOME/config/tools-log4j.properties to TRACE

With settings of this kind (we use different settings on our production servers), we have seen a 50x-100x increase in data transfer rate.

Thanks to Debbie for her help and contribution in achieving these results.

Enjoy mirroring data!!!
