Apache Kafka is a distributed, publish-subscribe messaging infrastructure commonly used for data integration. It was originally developed at LinkedIn, mainly for Hadoop data ingestion, and has been available as an open source project since v0.6 in 2011. Kafka is fast, scalable, durable and reliable, and is well suited for general-purpose messaging use cases (especially those currently served by JMS) where high throughput, fault tolerance and horizontal scalability are important.

Abstractions

Kafka can be seen as a distributed commit log. Messages published by producers/publishers are appended sequentially to this log. A logical representation of the log and its clients is shown below.

Logical Representation

Messages are identified by an increasing offset within the log, and consumers/subscribers can retrieve them individually or in batches (called message-sets in Kafka) using this offset. Besides messages, producers and consumers, the other actors constituting the system are:

  • Topics – A Kafka topic is the category name that identifies a message-stream. It is similar to JMS topics in publish-subscribe semantics.
  • Partitions – A partition, or more specifically topic partition, is the unit of parallelism in Kafka. It is similar to the concept of database sharding. Throughput of a topic essentially depends on the number of partitions it has; the topic-creation sketch after this list shows how the partition count and replication factor are specified.
  • Consumer Groups – A consumer group identifies the set of consumer processes that are subscribed to a topic. Each consumer in the group is assigned a set of partitions to consume from. This logical grouping of consumers as a single subscriber provides scalability and fault-tolerance for downstream message processing.
  • Replicas – In Kafka, replication is implemented at the partition level. The redundant unit of a topic partition is called a replica. Each partition has one or more replicas, and each replica maintains its own local log on disk. From the replica set of a partition, one replica is elected as the leader while the other replicas act as followers. The leader replica handles all read and write requests for the partition and the followers replicate the state of the leader. If the leader fails, one of the followers automatically becomes the new leader.
  • Brokers – A typical Kafka installation (or cluster) consists of one or more servers, hosting partitions belonging to different topics. These server instances are called brokers and they are responsible for handling intra-cluster communication, request processing, log replication and persistence.
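
To make these abstractions concrete, here is a minimal sketch that creates a topic with a chosen partition count and replication factor using the Java AdminClient (available in later Kafka releases). The topic name "page-views", the broker address and the counts are illustrative assumptions.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreatePageViewsTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Illustrative broker address; any reachable broker in the cluster works.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // A topic named "page-views" with 6 partitions (the unit of parallelism)
            // and a replication factor of 3 (each partition gets 3 replicas).
            NewTopic topic = new NewTopic("page-views", 6, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```

The same operation can also be performed with the kafka-topics command-line tool that ships with Kafka.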

Broker Design

Brokers are the real workhorses of Kafka. One of the reasons for Kafka’s high performance, in addition to its replication design, is the simplicity of broker responsibilities. In JMS, message brokers keep track of the messages consumed by each topic subscriber, which limits scaling out as the subscriber count increases. Not so with Kafka. The broker does not store the state of message consumption for each subscriber; instead, it expects subscribers to keep track of their own log offsets. It retains published messages for a set amount of time and purges or compacts them based on a configurable cleanup policy. This stateless broker design allows Kafka to serve a large number of consumers and manage log sizes with minimal overhead.
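
Because the broker keeps no per-subscriber state, the consumer itself controls and commits its position in the log. Below is a minimal sketch using the Java consumer client (from releases newer than those this article describes); the topic name, group id and broker address are assumptions.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OffsetTrackingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "page-view-processors");     // consumer group (see above)
        props.put("enable.auto.commit", "false");          // the consumer manages offsets itself
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("page-views"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // The offset identifies the message's position within its partition's log.
                    System.out.printf("partition=%d offset=%d value=%s%n",
                                      record.partition(), record.offset(), record.value());
                }
                // The consumer, not the broker, records how far it has read.
                consumer.commitSync();
            }
        }
    }
}
```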

Another important aspect of the broker design is its dependency on Zookeeper. Zookeeper serves as the metadata store for the Kafka cluster and as the point of coordination between brokers and clients. Details about brokers, consumers, topics and their partitions are registered with Zookeeper. Kafka relies on Zookeeper-broker sessions to determine broker liveness and to trigger broker failover in case of failures.
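
As a small illustration of this metadata, the sketch below uses the ZooKeeper client API to list the znodes under which brokers and topics register themselves (/brokers/ids and /brokers/topics); the connection string and timeout are assumptions.

```java
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class ListKafkaMetadata {
    public static void main(String[] args) throws Exception {
        // Assumed ZooKeeper ensemble address and session timeout.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10_000, event -> { });
        try {
            // Each live broker registers an ephemeral znode under /brokers/ids.
            List<String> brokerIds = zk.getChildren("/brokers/ids", false);
            System.out.println("Live brokers: " + brokerIds);

            // Topics and their partition assignments are registered under /brokers/topics.
            List<String> topics = zk.getChildren("/brokers/topics", false);
            System.out.println("Topics: " + topics);
        } finally {
            zk.close();
        }
    }
}
```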

The following physical representation gives a better picture of how the various Kafka actors interact.

Physical Representation


Replication Design

Intra-cluster replication was introduced in v0.8 to improve the availability and durability of a Kafka cluster. To implement replication, the replicas of a partition are distributed evenly across live brokers. For a given offset, the same message (once committed to the log) can be found in all replicas of the partition. For a topic with replication factor ‘f’, Kafka tolerates up to ‘f-1’ broker failures without losing any messages committed to the log. This primary-backup approach suits Kafka better than the commonly applied quorum-based approach, which tolerates only ‘f’ broker failures in a cluster comprising ‘2f+1’ replicas (i.e., at least three replicas are needed to survive even a single failure). For example, to tolerate two broker failures, primary-backup requires only three replicas, whereas a quorum requires five.

When a producer publishes a message to a partition in a topic, it is forwarded to the partition’s leader replica. The leader replica appends the message to its commit log and increments its last message offset pointer, called the log-end-offset. It then waits for this message to be replicated by its follower replicas, and once enough replicas acknowledge having received the message, the leader commits it. The producer can choose whether or not to wait for this acknowledgement, but Kafka only exposes a message to consumers after it has been committed. When a broker fails, partitions whose leader is on that broker become temporarily unavailable. In such a scenario, broker failover is triggered via Zookeeper listeners, and the leaders of the unavailable partitions are moved to other live replicas so that client requests can continue to be served. This failover is carried out by one of the brokers, designated as the controller.
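
The producer’s choice of whether to wait for acknowledgement is expressed through the acks setting of the Java producer: acks=0 does not wait, acks=1 waits for the leader alone, and acks=all waits for the full in-sync replica set. A minimal sketch follows; the topic name, key/value and broker address are assumptions.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AckAwareProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("acks", "all");  // wait until the message is committed by the leader
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("page-views", "user-42", "/index.html");
            // send() is asynchronous; get() blocks until the broker acknowledges.
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("Written to partition %d at offset %d%n",
                              metadata.partition(), metadata.offset());
        }
    }
}
```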

A message is said to be committed when the leader and its follower replicas have persisted it to their local logs. One issue with this approach is that the leader cannot always wait for writes to complete on all follower replicas: any follower replica can fail in the meantime, and the leader (and, in turn, the producer) would end up waiting indefinitely. To address this, for each partition of a topic, Kafka maintains a smaller subset of followers called the in-sync replica set (ISR). This is the set of replicas that are alive and fully caught up with the leader. When a new partition is created, all replicas are in the ISR. When a new message is published, the leader waits until it reaches all replicas in the ISR before committing it. If a follower replica fails, it is dropped out of the ISR and the leader continues to commit new messages with the fewer replicas remaining in the ISR.

The leader also maintains a high-watermark, which is the offset of the last committed message in a partition. The high-watermark is continuously communicated to the followers and periodically checkpointed to disk on each broker for recovery. When a failed replica is restarted, it first recovers the latest high-watermark that was checkpointed to disk and truncates its log and log-end-offset pointer to the high-watermark. This is necessary since messages after the high-watermark are not guaranteed to be committed and may need to be thrown away. The replica then becomes a follower and starts fetching messages after the high-watermark from the leader. Once it has fully caught up, it is added back to the ISR.

It is to be noted that committed messages are always preserved during a leadership transition, while some uncommitted data may be lost. Since a Kafka cluster is usually deployed within a single datacentre, where network partitions are rare, this design provides high availability along with strong consistency for committed messages, which are desirable properties for a distributed system.
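
The leader, replica assignment and current ISR of each partition can be inspected from a client, for instance with the Java AdminClient as sketched below (the topic name and broker address are again assumptions).

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class ShowPartitionState {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin
                    .describeTopics(Collections.singleton("page-views"))
                    .all().get()
                    .get("page-views");

            for (TopicPartitionInfo partition : description.partitions()) {
                // The leader serves reads and writes; the ISR is the set of replicas
                // that are fully caught up with the leader's log.
                System.out.printf("partition=%d leader=%s replicas=%s isr=%s%n",
                                  partition.partition(),
                                  partition.leader(),
                                  partition.replicas(),
                                  partition.isr());
            }
        }
    }
}
```

In newer releases, the min.insync.replicas topic setting additionally bounds how small the ISR may shrink before writes requesting full acknowledgement are rejected.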