Every week, Drawbridge holds “Tech Talks,” where team members give presentations on their current areas of focus. This Tech Talk was presented by Heedong and Sanjay.
At Drawbridge, ad request logs, impression logs, click logs, and conversion logs play a critical role. These raw logs are not only the source of our efficient programmatic ad serving, but are also fed into our reporting pipeline to provide accurate and up-to-date reports to our partners. Recently, however, our log aggregation infrastructure faced a challenge as we grew rapidly to serve about 10 billion requests per day. The servers responsible for aggregating log files from the online servers and transferring them to our Hadoop clusters suffered high loads at peak times. Log transmissions were delayed as a result, and jobs further down the pipeline were delayed as well, making programmatic ad serving less effective and holding up reports.
Our first attempt to solve this problem was to add more machines to our existing log aggregation infrastructure. However, we quickly realized that this would not scale, and it would entail cumbersome manual changes every time we added machines. We then explored alternative frameworks such as Kafka and Flume. We decided to try Kafka not only because it is used by companies like LinkedIn, Twitter, and Netflix, but also because it provides a fast, reliable, durable, and scalable framework.
Kafka consists of three components: producers, brokers, and consumers. In our case, the online servers are producers that send log files to Kafka brokers every minute. Consumers sit on HDFS data nodes, pull logs from the brokers, and write them directly to HDFS. Our team wrote the producer and consumer programs to meet our specific requirements, but the integration process was otherwise seamless.

After the initial integration was done, we had to fine-tune some Kafka parameters. For example, we had to run the Kafka broker with 24G of heap space, a different garbage-collection scheme, and a 22G Java new size, because we hit out-of-memory issues when running with a smaller heap: messages are held in memory until they are written to disk, and since our messages are large and arrive from many online servers every minute, the original memory size was not sufficient to hold them. We also added JMX support for better monitoring. In addition, we set the maximum allowed message size to 300MB, since our per-minute log files are around 200MB at peak times. Below are the settings that we changed.
- export KAFKA_HEAP_OPTS="-Xmx24G -Xms24G"
- export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:NewSize=22G -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
- export JMX_PORT=9111
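The 300MB maximum message size mentioned earlier is a broker-side setting rather than a JVM flag, so it is not in the list above. As a rough sketch, in the broker's server.properties it would look something like this (300MB expressed in bytes; the socket request limit likely needs to be raised to at least the same size):

- message.max.bytes=314572800
- socket.request.max.bytes=314572800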
While integrating Kafka into our platform, we had to make several design decisions. First of all, we had to decide where to run the producers, brokers, and consumers. It was intuitive to run producers on the online servers, since those servers generate the logs. We decided to run brokers on dedicated machines with enough memory and disk space, since all messages are stored on the broker machines, and we didn't want resources such as memory, CPU, and disk space to be shared with other programs that might interfere with the brokers' proper functioning. Consumers could have run on any machines, but we decided to run them on Hadoop data nodes, since the logs have to be written to HDFS for consumption and the data nodes are closest to it.
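To give a feel for the producer side, here is a minimal sketch of an online-server producer that ships one minute's log file to a broker. It is written against the stock Kafka Java client; the topic name, broker host, and file handling are illustrative assumptions, and our in-house producer adds batching, retries, and bookkeeping that are omitted here.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MinuteLogProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // hypothetical broker host
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("compression.type", "none");          // skip compression to save CPU on the online servers
        props.put("max.request.size", "314572800");     // allow requests up to ~300MB

        // Read the minute log file passed on the command line and send it as a single message.
        byte[] logBytes = Files.readAllBytes(Paths.get(args[0]));
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // The file name is used as the key so the consumer can reconstruct the HDFS path.
            producer.send(new ProducerRecord<>("adserver-logs", args[0], logBytes)).get();
        }
    }
}

The consumer on the data-node side does the mirror image: it pulls these messages from the brokers and writes them out to the corresponding HDFS files.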
Second, we decided to share ZooKeeper with the Hadoop framework. We could have set up a separate, dedicated ZooKeeper cluster for Kafka, but Kafka supports ZooKeeper's chroot capability, which lets it keep its information under its own ZooKeeper root directory, separate from everything else. Another decision we made was not to compress logs before sending them. Compressed logs are smaller, but compression comes at the expense of CPU cycles; the online servers process a lot of requests per second, and we didn't want to overload their CPUs and interfere with serving capability. Even without compression, we are able to deliver 200MB log files to HDFS within two minutes.
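Concretely, these two choices come down to a couple of configuration lines, sketched here with illustrative host names and chroot path:

- In the broker's server.properties, point Kafka at its own chroot under the shared ensemble: zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka
- In the producer configuration (property name as used by the Java client), leave compression off: compression.type=none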
The last (but not least) decision we made was the replication factor. We set it to 1, which means no replication. The reasoning is that we would rather have producers fail to send messages when a broker goes down. With a replication factor of 2, when a broker goes down and comes back up, it needs to copy the missing logs from its replica; if the replica goes down before the first broker has copied all of the messages, the messages that were not copied are lost. We thought it would be safer to simply fail to send messages. Besides, it is not easy to retrieve a specific message from Kafka's message queue.
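With this choice, creating one of our log topics is simply a matter of passing a replication factor of 1 to the topic tool that ships with Kafka; the topic name and partition count below are illustrative:

- bin/kafka-topics.sh --create --zookeeper zk1:2181/kafka --topic adserver-logs --partitions 8 --replication-factor 1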
Our log transfer time has been greatly reduced with Kafka. It now takes at most two minutes to transfer logs from the online servers to HDFS, which is almost 10x faster than our old approach. It is also easier to scale: if the brokers are overloaded, adding another broker is far less cumbersome than adding machines to our old infrastructure. We are still in the early stages of using Kafka, and we believe there is room for improvement. We will continue to refine our integration with Kafka and share our findings.