Monday, June 22, 2015

Apache Kafka - First Contact

I finally caught some time to dig into Kafka, a high-throughput distributed messaging system, as its authors define it.

Essentially, Kafka is a publish-subscribe messaging system. It can be used in several scenarios, but in this post we will focus on the traditional message broker use case.
Kafka stores every message on disk. Yet, it is very fast. If you are interested in how this is achieved, read about it later :)
It is distributed by design - it runs as a cluster of one or more servers, each of which is called a broker.

Important concepts

- Kafka stores messages in categories called topics.
- A topic consists of one or more partitions.
- Each partition is an ordered, immutable sequence of messages that is continually appended to.
- Processes that publish messages to Kafka are called producers.
- Processes that subscribe to topics and fetch messages are called consumers.
- Messages are not deleted after delivery, but after a configured retention period.

Partitions are replicated across the Kafka cluster. Partition copies are distributed among multiple brokers, but only one broker is the "partition leader", meaning that all writes and reads for that partition must be handled through that broker; the other brokers are used only for redundancy. If the partition leader crashes, Kafka will automatically elect a new leader among the brokers holding a copy of that partition.

Let the games begin!

Now that we know the basic stuff, let's play.
In this "how to" I am using Kafka 0.8.2.1, so the first step is to download it from this link.

After the download, unzip it, open a terminal and go to the Kafka directory you just extracted from the archive.

First, we must run Apache ZooKeeper, since Kafka uses it for managing cluster configuration information. But no worries, it is included in the Kafka distribution and you only need to start it:
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper should start and produce some output, with the last line looking something like this:
INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

Next step: open another terminal (unless you started ZooKeeper in the background), and let's run Kafka.
One node will be enough for this test:
bin/kafka-server-start.sh config/server.properties
Hopefully everything went fine, and we have a Kafka server running.

Finally, open a third terminal and create a topic for testing purposes:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic my-test-topic

We set the replication-factor argument to 1, which means we don't have replication (we currently only have one broker).
Also, we set the number of partitions to 2, which will be important later, when we talk about consumers.
As the last option, we specify the topic name: "my-test-topic".
After we run this command, the following output should appear:
Created topic "my-test-topic".

Now we have Kafka up and running! It is just one node (broker), but running more nodes is not complicated (step 6 in the Kafka quickstart) and it's not so important for the purpose of this post, which is showing how to produce and consume messages from a Java application.

Java time

We need the following Maven dependency to set up our test application:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>
You can get the complete example code at my GitHub repository.

Producer


When publishing a message to the Kafka cluster, the producer must send it directly to the partition leader, so it must know its address. But have no fear, the Producer API handles that part for you. All you have to do is provide the addresses of one or more brokers (at least one, but more is better for redundancy) in the producer configuration. Any broker can be queried for metadata containing information about topics and partitions, so the API will use the list you provide to fetch that metadata before actually sending the message.
Let's define the producer properties:

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new kafka.producer.ProducerConfig(props);

The metadata.broker.list property contains addresses of one or more brokers used for fetching metadata; we have only one broker, localhost:9092. For providing more broker addresses, the format is: host1:port1,host2:port2
The serializer.class property defines the message serializer. We use the built-in StringEncoder class.
The request.required.acks property set to 1 means that the producer gets an acknowledgement after the leader replica has received the data, without waiting for other brokers to replicate it. If we had more replicas, we could ask for more than one acknowledgement, ensuring better durability guarantees.
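
If we had a replicated topic, we could trade a bit of latency for stronger durability by requiring acknowledgements from all in-sync replicas. A minimal sketch (the second broker address is hypothetical; in the 0.8 producer config the value -1 means "wait for all in-sync replicas"):
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092,localhost:9093"); // second broker is hypothetical
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "-1"); // leader waits for all in-sync replicas before acknowledging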

Configuration is ready, let's create the producer:
Producer<String, String> p = new Producer<>(config);
and we can start sending messages:
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic, key, message);
p.send(keyedMessage);
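
The Producer API also lets us send several messages at once by passing a list instead of a single KeyedMessage. A small sketch (message contents are just examples):
List<KeyedMessage<String, String>> batch = new ArrayList<>(); // java.util.List and java.util.ArrayList
batch.add(new KeyedMessage<>(topic, "0", "first message of batch"));
batch.add(new KeyedMessage<>(topic, "1", "second message of batch"));
p.send(batch);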

OK, that will work, but... what is this argument named key?
Remember, when we created our topic in the terminal console, we specified that we want 2 partitions for it. Now it is time to explain why.

Our goal is to have more than one consumer consuming messages from this topic. In Kafka, only one consumer (within a consumer group) can read messages from a specific partition, so the number of active consumers per topic is limited by the number of partitions in that topic. Having two partitions allows us to have two simultaneously active consumers.

So far so good, we have two partitions, and now we want the producer to distribute messages to both of them, right?
In order to do that, we must provide a message key (a String) which will be used to determine the target partition for each message. This is called semantic partitioning.
The default partition resolving strategy is implemented as hash(key) % numberOfPartitions, meaning the same key will always resolve to the same partition number (partitions inside a topic are numbered 0, 1 ... P).
KeyedMessage can also be instantiated without a key: new KeyedMessage<>(topic, message), in which case Kafka should choose a random partition (according to the documentation), but in my tests all messages went to the first partition.
In our producer example we will also generate keys using modulo by numberOfPartitions in order to achieve "round-robin" message distribution.
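
If the default strategy doesn't suit you, the 0.8 producer API also lets you plug in your own partitioner via the partitioner.class property. Here is a minimal sketch of such a class (the name RoundRobinPartitioner and its placement in our example package are just illustrations; the constructor taking VerifiableProperties is needed because the producer instantiates the class reflectively):
package hr.ib.kafka.test;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class RoundRobinPartitioner implements Partitioner {

    public RoundRobinPartitioner(VerifiableProperties props) {
        // no configuration needed for this sketch
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // our keys are "0", "1", ... so we can map them directly to a partition number
        return Integer.parseInt(key.toString()) % numPartitions;
    }
}
It would be enabled with props.put("partitioner.class", "hr.ib.kafka.test.RoundRobinPartitioner"); in the producer configuration.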

Complete producer example:
package hr.ib.kafka.test;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class ProducerTest {


    public ProducerTest(String topic, int partitionCount, int msgProduceDelay) {

        Properties props = new Properties();

        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        ProducerConfig config = new kafka.producer.ProducerConfig(props);
        Producer<String, String> p = new Producer<>(config);

        run(topic, partitionCount, msgProduceDelay, p);
    }


    private void run(String topic, int partitionCount, int msgProduceDelay, Producer<String, String> p) {

        int i = 0;
        while(true) {
            String message = "Message #" + (i + 1);
            // key cycles through 0 .. partitionCount-1 so messages are spread across both partitions
            String key = "" + i % partitionCount;
            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic, key, message);
            p.send(keyedMessage);
            System.out.println("Sent message :" + message);
            sleep(msgProduceDelay);
            i++;
        }
    }


    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        new ProducerTest("my-test-topic", 2, 500);
    }

}

Consumer


Before we dive into the consumer code, we must know a few things:

1. In Kafka, brokers do not push messages to consumers; consumers pull them.
2. Kafka has a "Simple API" (low level) and a "High Level API" for consumers.
3. A consumer group defines a set of consumers handling a specific business task. We can think of a consumer group as a logical subscriber to a specific topic.
4. For each partition, Kafka tracks a "consumer offset" for each consumer group - the position of the last message in that partition consumed by the group.

The High Level API handles offset tracking automatically, but if you want to use some advanced features, like reprocessing messages from the beginning on consumer restart, you must use the Simple API and handle offset tracking yourself.
The High Level API is enough for most use cases, and that is what we will use in this example.

As we already mentioned, one partition can be consumed by only one consumer at a time.
Kafka implements this behavior using KafkaStream - only one stream per partition will be active at the same time. If we try to open more streams than there are partitions in a topic, the "extra streams" will be opened but no messages will be fed to them.

First we have to create consumer connector:
String zooKeeper = "localhost:2181";
String groupId = "myApp";
String topic = "my-test-topic"; 

Properties props = new Properties();
props.put("zookeeper.connect", zooKeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

ConsumerConfig consumerConfig = new ConsumerConfig(props);

consumerConnector =  Consumer.createJavaConsumerConnector(consumerConfig);

The consumer config is provided with the ZooKeeper location, so it can get all the Kafka cluster information it needs. And maybe the most important parameter here is group.id - it tells Kafka that this consumer is part of the "myApp" consumer group.
We have established a consumer connection; now we can get streams and consume them. We could fetch multiple streams and handle them in separate threads, but we would rather have each stream consumed in a separate JVM process, so we will request only one stream via this consumer connection and start multiple consumer instances in order to handle more streams.
This is how we fetch consumer streams from the consumer connection:
 

Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

for (KafkaStream<byte[], byte[]> stream : streams) {
    StreamConsumer streamConsumer = new StreamConsumer(stream);
    new Thread(streamConsumer).start();
}


topicCountMap is used to define which streams we want. We can ask for streams from multiple topics and specify how many streams we want for each topic. Here we just want one stream from our topic.
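
For example, if our application also consumed a second topic (the topic name another-topic below is purely hypothetical), the map could look like this:
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put("my-test-topic", 2);   // two streams for this topic
topicCountMap.put("another-topic", 1);   // hypothetical second topic, one stream
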
Stream consuming is pretty simple.
 
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    System.out.println("Consumed message: " + new String(it.next().message()));
}
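
The iterator actually returns a MessageAndMetadata object, so besides the payload we can also print, for example, the partition and offset a message came from - handy for watching the rebalancing later. A small sketch (needs an extra import of kafka.message.MessageAndMetadata):
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    MessageAndMetadata<byte[], byte[]> mm = it.next();
    System.out.println("Consumed message: " + new String(mm.message())
            + " (partition " + mm.partition() + ", offset " + mm.offset() + ")");
}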

And here is the complete consumer example:
 
package hr.ib.kafka.test;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ConsumerTest {

    String zooKeeper = "localhost:2181";
    String groupId = "myApp";
    String topic = "my-test-topic";

    ConsumerConnector consumerConnector;

    public ConsumerTest() {

        Properties props = new Properties();
        props.put("zookeeper.connect", zooKeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);

        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        // request one stream for our topic
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // consume each stream in its own thread
        for (KafkaStream<byte[], byte[]> stream : streams) {
            StreamConsumer streamConsumer = new StreamConsumer(stream);
            new Thread(streamConsumer).start();
        }
    }


    class StreamConsumer implements Runnable {

        private KafkaStream<byte[], byte[]> stream;

        public StreamConsumer(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = this.stream.iterator();
            while (it.hasNext()) {
                System.out.println("Consumed message: " + new String(it.next().message()));
            }
        }
    }


    public static void main(String[] args) {
        new ConsumerTest();
    }

}

Now we can run one producer and several consumers to see all this in action.

I suggest the following steps:

  1. Run ProducerTest and leave it running.
  2. Run one ConsumerTest and observe the output. You will notice it consumes all messages.
  3. Run a second ConsumerTest and give it a few seconds. Kafka will rebalance the streams, the second consumer will start receiving messages (in my case the even-numbered messages), and the first consumer will continue to consume only the odd-numbered messages.
  4. Start a third ConsumerTest and notice that it doesn't process any messages. This is because we only have two partitions, so the third Kafka stream will be empty. Leave it running anyway.
  5. Kill the second consumer and observe the third one; it will soon start to process some messages. This is because Kafka rebalanced again after the second consumer disconnected and started feeding the third consumer's stream with messages. (Instead of killing the process outright, you can also shut a consumer down gracefully, as sketched below.)
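
A cleaner way to stop a consumer than killing the process is to call shutdown() on its consumer connector, for example from a JVM shutdown hook. A minimal sketch (assuming it is registered at the end of the ConsumerTest constructor):
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        // closes the streams and lets Kafka rebalance the consumer group cleanly
        consumerConnector.shutdown();
    }
});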


And that's it.

I will close with a note that Kafka 0.9 will introduce a new consumer API, so stay tuned for another "how to" soon :)