Overview

Apache Kafka is an open-source data streaming platform designed for high throughput and low latency. In this post I will show you how to run a single-instance Kafka server, how to create a topic, and how to write data records to and read them from that topic.

Source Code

You can grab the source code for this article from GitHub.

Step 1: Download and extract Kafka

Go to kafka.apache.org/downloads and choose the version of Kafka you want to download. Here I download version 0.10.2.1, built for Scala 2.11.

$ wget http://apache.melbourneitmirror.net/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
$ tar xvf kafka_2.11-0.10.2.1.tgz -C ~/Programs
$ mv ~/Programs/kafka_2.11-0.10.2.1 ~/Programs/kafka-0.10.2.1
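
To confirm the archive was extracted correctly, list the installation directory. You should see something like the following, with the startup scripts under bin and the default configuration files under config:

$ ls ~/Programs/kafka-0.10.2.1
LICENSE  NOTICE  bin  config  libs  site-docs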

Step 2: Start ZooKeeper and Kafka

Kafka uses ZooKeeper for cluster management and configuration storage. When new nodes join a Kafka cluster, or when existing nodes become unhealthy and die, Kafka relies on ZooKeeper to detect the change and keep the cluster available and working.

$ cd ~/Programs/kafka-0.10.2.1

# Start ZooKeeper on port 2181
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# Start Kafka on port 9092
$ bin/kafka-server-start.sh -daemon config/server.properties
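
Because both servers are started as daemons, it is worth verifying that they are actually up before moving on. One quick way, assuming the JDK's jps tool and netcat are available on your machine:

# List running JVMs; you should see kafka.Kafka and ZooKeeper's QuorumPeerMain
$ jps -l

# Check that both ports accept connections
$ nc -z localhost 2181 && echo "ZooKeeper is up"
$ nc -z localhost 9092 && echo "Kafka is up"

If either check fails, inspect the log files under the logs directory of your Kafka installation.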

Step 3: Create a Kafka topic

In Kafka, producers write data to topics and consumers read data from those topics. Topics are usually used to separate different categories of data. For example, if you were to use Kafka for a Twitter data streaming application, you could create one topic for tweets and another for users.

A Kafka topic can be configured to have one or more partitions. The partitions of a given topic can be distributed among multiple servers, which allows a topic to grow beyond the storage available to a single server. Furthermore, it allows multiple producers and consumers to write to and read from a topic in parallel, increasing that topic's throughput.

When you run a Kafka cluster with multiple servers, you can also replicate data across more than one server for durability. With such a setup, you can avoid data loss when one of the servers (also known as brokers) in your cluster dies.

$ bin/kafka-topics.sh --create --topic very_simple_topic \
                      --zookeeper localhost:2181 \
                      --replication-factor 1 --partitions 3
Created topic "very_simple_topic".
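
You can inspect the topic with the same tool: the --describe flag lists each partition along with its leader and replicas. With a single broker and a replication factor of 1, all three partitions will live on that one broker:

$ bin/kafka-topics.sh --describe --topic very_simple_topic \
                      --zookeeper localhost:2181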

Step 4: Write a very simple producer

package org.behrang.examples.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * A very simple Kafka producer.
 */
public class VerySimpleProducer {

    // The topic we are going to write records to
    private static final String KAFKA_TOPIC_NAME = "very_simple_topic";

    public static void main(String[] args) {
        // Set producer configuration properties
        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Create a new producer
        try (final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            // Write 10 records into the topic
            for (int i = 0; i < 10; i++) {
                final String key = "key-" + i;
                final String value = "value-" + i;
                producer.send(new ProducerRecord<>(KAFKA_TOPIC_NAME, key, value));
            }
        }
    }
}
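
Note that send() is asynchronous: it puts the record into an in-memory buffer and returns immediately, while a background thread batches and transmits the records (closing the producer flushes anything still buffered). If you want to know whether each record actually reached the broker, you can pass a callback as the second argument to send(). A minimal variation of the loop above:

// Inside the try-with-resources block of VerySimpleProducer
for (int i = 0; i < 10; i++) {
    final String key = "key-" + i;
    final String value = "value-" + i;
    producer.send(new ProducerRecord<>(KAFKA_TOPIC_NAME, key, value),
            (metadata, exception) -> {
                if (exception != null) {
                    // The send failed
                    exception.printStackTrace();
                } else {
                    System.out.printf("Wrote to %s-%d at offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
}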

Step 5: Write a very simple consumer

package org.behrang.examples.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

/**
 * A very simple Kafka consumer.
 */
public class VerySimpleConsumer {

    // The topic we are going to read records from
    private static final String KAFKA_TOPIC_NAME = "very_simple_topic";

    public static void main(String[] args) {
        // Set consumer configuration properties
        final Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "very-simple-consumer");

        // Create a new consumer
        try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            // Subscribe to the topic
            consumer.subscribe(Collections.singleton(KAFKA_TOPIC_NAME));

            // Continuously read records from the topic
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received: " + record);
                }
            }
        }
    }

}
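
One caveat: because the poll loop never exits, pressing Ctrl-C kills the process without ever closing the consumer, so it leaves the consumer group abruptly. KafkaConsumer provides wakeup() for exactly this case; it is the one method that is safe to call from another thread, and it makes a blocked poll() throw a WakeupException (from org.apache.kafka.common.errors). A sketch of a graceful shutdown, replacing the body of the try block above:

// Register a shutdown hook before entering the poll loop
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();     // causes poll() to throw WakeupException
    try {
        mainThread.join(); // wait for the poll loop to finish
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

consumer.subscribe(Collections.singleton(KAFKA_TOPIC_NAME));

try {
    while (true) {
        final ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received: " + record);
        }
    }
} catch (WakeupException e) {
    // Expected on shutdown; fall through so try-with-resources closes the consumer
}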

Step 6: Run the consumer and the producer

First build your project:

$ mvn clean install

Then run the consumer in one terminal window:

$ mvn exec:java -Dexec.mainClass=org.behrang.examples.kafka.VerySimpleConsumer

Then open another terminal window and run the producer:

$ mvn exec:java -Dexec.mainClass=org.behrang.examples.kafka.VerySimpleProducer

If you go back to the consumer terminal, you should see the received messages logged in the output:

Received: ConsumerRecord(topic = very_simple_topic, partition = 1, offset = 0, CreateTime = 1513475276740, checksum = 1161559315, serialized key size = 5, serialized value size = 7, key = key-0, value = value-0)
Received: ConsumerRecord(topic = very_simple_topic, partition = 1, offset = 1, CreateTime = 1513475276747, checksum = 3822194193, serialized key size = 5, serialized value size = 7, key = key-7, value = value-7)
Received: ConsumerRecord(topic = very_simple_topic, partition = 1, offset = 2, CreateTime = 1513475276747, checksum = 33131761, serialized key size = 5, serialized value size = 7, key = key-8, value = value-8)
Received: ConsumerRecord(topic = very_simple_topic, partition = 2, offset = 0, CreateTime = 1513475276747, checksum = 186355086, serialized key size = 5, serialized value size = 7, key = key-2, value = value-2)
Received: ConsumerRecord(topic = very_simple_topic, partition = 2, offset = 1, CreateTime = 1513475276747, checksum = 3887722871, serialized key size = 5, serialized value size = 7, key = key-3, value = value-3)
Received: ConsumerRecord(topic = very_simple_topic, partition = 2, offset = 2, CreateTime = 1513475276747, checksum = 3790074274, serialized key size = 5, serialized value size = 7, key = key-5, value = value-5)
Received: ConsumerRecord(topic = very_simple_topic, partition = 2, offset = 3, CreateTime = 1513475276747, checksum = 259027688, serialized key size = 5, serialized value size = 7, key = key-6, value = value-6)
Received: ConsumerRecord(topic = very_simple_topic, partition = 0, offset = 0, CreateTime = 1513475276746, checksum = 2052492634, serialized key size = 5, serialized value size = 7, key = key-1, value = value-1)
Received: ConsumerRecord(topic = very_simple_topic, partition = 0, offset = 1, CreateTime = 1513475276747, checksum = 222661979, serialized key size = 5, serialized value size = 7, key = key-4, value = value-4)
Received: ConsumerRecord(topic = very_simple_topic, partition = 0, offset = 2, CreateTime = 1513475276747, checksum = 3982225416, serialized key size = 5, serialized value size = 7, key = key-9, value = value-9)

As you can see, the consumer has received all 10 messages sent by the producer. Some of these messages were written to partition 1, some to partition 2, and some to partition 0. Note that ordering is only guaranteed within a partition, which is why the keys do not appear in the order they were sent.
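
Kafka also ships with a console consumer you can use to inspect a topic without writing any code. To replay everything the producer wrote, run:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
                                --topic very_simple_topic --from-beginning

By default this prints only the record values, i.e. value-0 through value-9.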

Conclusion

Writing basic Kafka clients (producers and consumers) is quite simple. However, writing efficient, high-throughput Kafka clients is more challenging.

For example, an efficient consuming application should ideally run as many consumer threads as the number of partitions it is reading from. If it reads from T topics and each topic has P partitions, then, as a rule of thumb, it should run T × P threads and consume from all the partitions in parallel, as sketched below.
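
KafkaConsumer is not thread-safe, so the usual way to achieve this is to run one consumer instance per thread, all with the same group ID, and let Kafka divide the topic's partitions among them. A minimal sketch, assuming the same constants, configuration, and imports as VerySimpleConsumer (plus java.util.concurrent.Executors); the thread count of 3 matches the partition count of very_simple_topic and is purely illustrative:

// One KafkaConsumer per thread, all in the same consumer group;
// Kafka assigns each partition to exactly one of them. Threads in
// excess of the partition count would sit idle.
final int numThreads = 3;
final ExecutorService pool = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
    pool.submit(() -> {
        try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singleton(KAFKA_TOPIC_NAME));
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("[%s] Received: %s%n",
                            Thread.currentThread().getName(), record);
                }
            }
        }
    });
}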