
Paper Insights #19 - Kafka: A Distributed Messaging System for Log Processing

This seminal paper, authored by Jay Kreps, Neha Narkhede, and Jun Rao and presented at the NetDB '11 workshop, laid the foundation for Apache Kafka, a highly influential open-source project in the realm of distributed systems.

Paper Link

While the paper initially focused on a specific use case – log processing – Kafka has since evolved into a versatile and robust platform for general message delivery. Both Jay Kreps and Neha Narkhede went on to co-found Confluent Inc., a company commercializing Kafka.

Although workshop papers typically carry less weight than conference papers, this particular work garnered significant attention and has had a profound impact on the field. The paper's relatively weak evaluation section may have contributed to its non-selection for the main conference track. However, this in no way diminishes its significance and the lasting influence of Apache Kafka.

Messaging Systems

Messaging systems facilitate the exchange of messages between different components within a distributed system. The sender of a message is known as the producer and the receiver is known as the consumer.

Two prominent types of messaging systems are:

  • Message Queues: These systems employ a point-to-point delivery model, where each message is consumed by only one consumer. This is analogous to a one-to-one communication channel.

  • Publish-Subscribe (Pub/Sub): In Pub/Sub systems, a message published by a producer can be received by multiple consumers. This enables one-to-many communication.



Note that a "consumer" may also be a group consisting of multiple processes. These processes are typically replicas of the same executable and function identically. In the case of message queues, a message is delivered to exactly one process within the group, whereas in the case of pub/sub, a message is delivered to exactly one process of each such group.

Scenarios

Message Queue: Consider an e-commerce scenario where customer orders are placed. These orders are enqueued for processing. A dedicated worker process retrieves each order from the queue and executes the necessary actions, such as deducting funds from the customer's account and forwarding the order to the supplier.

Pub/Sub: In an online shopping application, maintaining accurate inventory information is crucial. When an item's availability changes, a message is published by, say, a change detector. All micro-services that need inventory information (e.g., product pages, shopping cart) receive the message and update their states, ensuring consistent data across the system.

Delivery Guarantees

Messaging systems offer varying levels of delivery guarantees:

At-least-once Delivery (Recommended)

This guarantee ensures that each message is delivered to consumers at least once, potentially multiple times. It is simpler to implement because duplicate deliveries are allowed. To handle duplicates, consumer actions must be idempotent: performing the same action multiple times has the same effect as performing it once. At-least-once delivery is widely supported (including by Kafka) and is the most common delivery guarantee.

For example, if a consumer receives a new value for an in-memory key-value state, applying that value updates the state; reapplying the same value has no further effect. 
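
As a concrete illustration, here is a minimal sketch of such an idempotent handler in Java. The class and method names are hypothetical and not part of any Kafka API.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Minimal sketch of an idempotent consumer: applying the same update twice
    // leaves the state unchanged, so at-least-once redelivery is harmless.
    // All names here are hypothetical, not part of any Kafka API.
    public class IdempotentConsumer {
        private final Map<String, String> state = new ConcurrentHashMap<>();

        // Handler invoked for every delivered message, possibly more than once.
        public void onMessage(String key, String value) {
            // put() is idempotent: re-applying the same (key, value) pair
            // has no further effect on the state.
            state.put(key, value);
        }
    }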

At-most-once Delivery

This guarantee ensures that each message is delivered to consumers at most once, meaning it might not be delivered at all. This is a weaker guarantee than at-least-once and can be implemented simply by sending a remote procedure call (RPC) from producer to consumer. In a messaging system context, this might involve a "fire-and-forget" approach where the system attempts delivery once and then discards the message. These simple approaches, however, may not handle failure scenarios or retry mechanisms effectively.

A more sophisticated approach to at-most-once delivery was proposed by Liskov in Practical Uses of Synchronized Clocks in Distributed Systems. This approach assumes synchronized clocks across systems with a maximum difference of ε. The receiver tracks the timestamp of the most recent message and discards older messages, accounting for the potential time difference.
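
Here is a rough sketch of that timestamp-based filtering, assuming a clock-skew bound ε and omitting the per-connection bookkeeping of the full protocol; all names are hypothetical.

    // Rough sketch of timestamp-based duplicate suppression, assuming clocks
    // are synchronized within a bound EPSILON_MS. Names are hypothetical and
    // the per-connection state of the full protocol is omitted.
    public class AtMostOnceReceiver {
        private static final long EPSILON_MS = 100;   // assumed clock-skew bound
        private long latestAcceptedTs = Long.MIN_VALUE;

        // Returns true if the message should be processed, false if discarded.
        public synchronized boolean accept(long messageTimestampMs) {
            long now = System.currentTimeMillis();
            // Discard messages older than the latest accepted one, and messages
            // too stale to be distinguished from retransmissions, accounting
            // for the potential clock difference.
            if (messageTimestampMs <= latestAcceptedTs
                    || messageTimestampMs < now - EPSILON_MS) {
                return false;
            }
            latestAcceptedTs = messageTimestampMs;
            return true;
        }
    }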

Exactly-once Delivery (Transactional Delivery)

This guarantee ensures that each message is delivered exactly once to the consumer. The consumer must complete the action associated with the message before sending an acknowledgment (ack). Until the ack is received, the message is locked to prevent delivery to other consumers. This relies on timeouts, which, while not perfect, are a generally accepted compromise in distributed systems as a reasonable indicator of failure after a significant duration.

The action itself must be transactional. If the ack fails, the effects of the action must be reversed, as the message will be redelivered. Conversely, the message cannot be acknowledged before the action is completed to prevent data loss.

Implementing exactly-once delivery is complex, requiring two-phase commit protocols and database support with locking mechanisms. This approach can be slow and may not handle high message volumes effectively. It's often recommended to use at-least-once delivery with idempotent operations or manual duplicate handling to achieve the desired effect without the complexity of true exactly-once delivery.
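
A common way to approximate exactly-once semantics without that machinery is at-least-once delivery plus consumer-side duplicate detection. A minimal sketch, assuming each message carries a unique ID (all names are hypothetical):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of "effectively once" processing on top of at-least-once delivery:
    // the consumer remembers which message IDs it has already processed and
    // skips duplicates. A real system would bound or persist the seen-ID set
    // alongside the processing results.
    public class DeduplicatingConsumer {
        private final Set<String> processed = ConcurrentHashMap.newKeySet();

        public void onMessage(String messageId, String payload) {
            // add() returns false if the ID was already present, i.e. a duplicate.
            if (!processed.add(messageId)) {
                return;  // duplicate delivery, ignore
            }
            handle(payload);
        }

        private void handle(String payload) {
            // application-specific, non-idempotent work goes here
        }
    }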

Ordering Guarantees

Messaging systems provide varying levels of message ordering guarantees:

  • Total Order: Messages are delivered to consumers in the exact same order they were produced. This ensures strict sequential processing.

  • Partial Order: Messages are delivered in a partially defined order. This order is typically determined for messages produced within a specific context, such as those originating from the same producer or within the same batch. Messages across different contexts may be delivered in any order relative to each other.

  • Out-of-Order: Messages can be delivered to consumers in any order, regardless of their production sequence.

A key trade-off exists between ordering and performance. Out-of-order delivery can achieve lower latency because it avoids the need to wait for prior messages. In contrast, maintaining total or partial order can introduce head-of-line blocking, where subsequent messages are delayed while waiting for earlier messages to be processed.

Push vs Pull Model

Consumers generally have the option to either actively "pull" messages from a queue or passively receive them via a "push" mechanism, often implemented using a Remote Procedure Call (RPC) protocol. While push models offer lower latency, they can have lower throughput due to the overhead associated with processing each individual RPC, which consumes valuable computational resources. Conversely, pull-based models may introduce higher latency due to the need for periodic polling. However, they can improve throughput by enabling batch processing of messages, optimizing resource utilization.

Interestingly, many systems that internally support asynchronous push-based calls often implement their consumer libraries using a pull-based model under the hood. This design choice can significantly enhance overall throughput.
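
For illustration, here is what a batched pull loop looks like with the modern Kafka Java client, an API that postdates the paper; the broker address and topic name are placeholders.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Pull-based consumption with the modern Kafka Java client: each poll()
    // returns a batch of records, amortizing per-message overhead.
    public class PullLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("inventory-updates"));
                while (true) {
                    // One pull fetches a whole batch of messages.
                    ConsumerRecords<String, String> batch =
                        consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : batch) {
                        System.out.printf("partition=%d offset=%d value=%s%n",
                                          record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    }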

Pub/Sub Topics

In a pub/sub system, topics act as channels for routing messages to consumers. Producers (publishers) categorize each message by assigning it to a specific topic. Consumers (subscribers) then subscribe to the topics they're interested in and receive only the messages published to those topics.

To further enhance scalability and throughput, topics are often sharded into sub-topics (also called partitions). Producers write messages to one of these sub-topics, while consumers subscribing to the main topic receive messages from all of its sub-topics. This sharding allows different parts of the system (e.g., different servers or processes) to handle different sub-topics concurrently, thus distributing the workload and improving performance. Each topic shard (sub-topic) is typically managed by a distinct set of resources (e.g., jobs or processes) within the system.
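
A minimal sketch of how a producer might choose a sub-topic for a message: hash the message key so that all messages with the same key land on the same partition, preserving per-key ordering. This is a hypothetical helper, not a specific Kafka API.

    // Deterministic partition selection by key hashing.
    public final class Partitioner {
        public static int partitionFor(String key, int numPartitions) {
            // Mask the sign bit so the result is always non-negative
            // (Math.abs overflows for Integer.MIN_VALUE).
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }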

Kafka Architecture

Kafka primarily functions as a publish-subscribe system. 

Architecture

Kafka utilizes a topic-based architecture. Each topic is further divided into multiple partitions. Each partition is handled by a server known as a broker, and a single broker may handle multiple partitions.

Guarantees

It offers the following key guarantees:

  • At-least-once delivery: Ensures that each message is delivered to the consumer at least once. In most scenarios, Kafka achieves exactly-once delivery. However, consumer restarts can potentially lead to duplicate message deliveries.
  • Partial ordering: Guarantees that messages within the same partition are delivered to the consumer in the order they were produced.

Consumer Behavior

Consumers can subscribe to specific topics. A consumer can create multiple "sub-streams" for a single topic. Messages destined for the topic are then multiplexed and delivered across these sub-streams.

Note: Kafka can also operate as a traditional message queue when there is only a single consumer for a given topic.

The Broker Layer

Brokers store messages in log files. Each log is divided into segment files of approximately 1GB. Messages are appended sequentially to the current segment file. When a segment file reaches its maximum size, it "rolls over" and a new segment file is created.

To optimize performance, message flushes to the segment file are performed periodically by a background process, amortizing the cost of disk I/O. A message is considered committed for delivery only after it has been successfully flushed to the segment file.
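
To make the rollover and periodic-flush behavior concrete, here is a simplified Java sketch of a partition log. It is not Kafka's actual broker code; the segment size, file naming, and method names are assumptions.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    // Simplified sketch of a broker-style partition log: messages are appended
    // to the current segment file, a new segment is started once the current
    // one reaches its size limit, and flushes happen periodically rather than
    // per message.
    public class PartitionLog {
        private static final long SEGMENT_BYTES = 1L << 30;  // ~1 GB per segment

        private final Path dir;
        private FileChannel currentSegment;
        private long nextOffset = 0;       // byte offset of the next append

        public PartitionLog(Path dir) throws IOException {
            this.dir = dir;
            roll();
        }

        // Append a message; returns the byte offset that identifies it.
        public synchronized long append(byte[] message) throws IOException {
            if (currentSegment.size() + message.length > SEGMENT_BYTES) {
                roll();  // segment full: start a new file named by its base offset
            }
            long offset = nextOffset;
            currentSegment.write(ByteBuffer.wrap(message));
            nextOffset += message.length;
            return offset;
        }

        // Called periodically by a background thread; a message is only
        // considered committed (visible to consumers) after this flush.
        public synchronized void flush() throws IOException {
            currentSegment.force(true);
        }

        private void roll() throws IOException {
            if (currentSegment != null) {
                currentSegment.force(true);
                currentSegment.close();
            }
            currentSegment = FileChannel.open(
                dir.resolve(nextOffset + ".log"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.APPEND);
        }
    }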

Message Addressing and Ordering

Each message is uniquely identified by its "offset", a logical position within the log stream. This offset serves as the message ID. Consumers consume messages from a topic partition sequentially, ensuring total ordering within the partition and partial ordering within a topic. If a topic has only one partition, it guarantees total order for all messages within that topic.

Broker Statelessness

When a consumer receives a message, it can independently calculate the offset of the next message by adding the message's length to its offset. This client-side calculation eliminates the need for broker involvement in message tracking, allowing the broker to remain stateless.
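
A minimal sketch of this client-side bookkeeping, following the paper's byte-offset scheme; the class and method names are hypothetical.

    // The consumer advances its own cursor by the size of each message it
    // receives, so the broker never has to track consumer state.
    public class PartitionCursor {
        private long offset;   // byte offset of the next message to fetch

        public PartitionCursor(long startOffset) {
            this.offset = startOffset;
        }

        // Called for every message returned by a fetch request.
        public void advance(int messageSizeBytes) {
            // next offset = current offset + on-disk size of the message
            offset += messageSizeBytes;
        }

        public long nextFetchOffset() {
            return offset;
        }
    }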

Message Deletion

To manage storage space, brokers employ a simple time-based policy for message deletion. Typically, messages are retained for a period of 7 days before being automatically deleted.
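
A minimal sketch of such a retention sweep, assuming one file per segment; this is a hypothetical helper, not broker code.

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.time.Duration;
    import java.time.Instant;

    // Time-based retention: segment files whose last modification is older
    // than the retention window (7 days here) are deleted wholesale.
    public class RetentionSweeper {
        private static final Duration RETENTION = Duration.ofDays(7);

        public static void sweep(Path logDir) throws IOException {
            Instant cutoff = Instant.now().minus(RETENTION);
            try (DirectoryStream<Path> segments =
                     Files.newDirectoryStream(logDir, "*.log")) {
                for (Path segment : segments) {
                    if (Files.getLastModifiedTime(segment).toInstant().isBefore(cutoff)) {
                        Files.delete(segment);  // the entire old segment is dropped at once
                    }
                }
            }
        }
    }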

Consumer Groups

Kafka employs the concept of "consumer groups", where a group consists of multiple consumers that collectively subscribe to a single topic. While each message within a topic is supposed to be delivered to exactly one consumer within the group, this does not guarantee exactly-once delivery across the entire system, as will be discussed later.

Kafka itself does not actively manage message delivery to specific consumers within a group. This responsibility lies with the consumers themselves, who must coordinate their actions. To facilitate this coordination, consumers leverage ZooKeeper, a distributed coordination service. ZooKeeper enables consumers to acquire and maintain exclusive ownership of specific partitions within a topic. 

As a side note, however, ZooKeeper's ownership mechanism relies on timing assumptions and is not entirely foolproof.

Partition Assignment and Coordination

Consumers coordinate to determine which partitions each member of the group will consume. This involves:

  1. Maintaining Registries: ZooKeeper hosts three key registries:

    • Consumer Registry: Tracks all consumers belonging to a particular group.
    • Ownership Registry: Records the "owner" (consumer) for each partition within the topic.
    • Offset Registry: Stores the offset of the last message consumed from each partition.
  2. Partition Assignment: When the consumer registry changes (e.g., a consumer joins or leaves the group), all consumers are notified by ZooKeeper. They then execute a deterministic algorithm to re-assign partitions among themselves. Because every consumer computes over the same inputs (the sorted lists of partitions and consumers read from ZooKeeper), they all arrive at the same assignment regardless of the order in which they run the algorithm. The algorithm range-partitions the topic's partitions into N roughly equal chunks (where N is the number of consumers) and assigns one chunk to each consumer in sorted ID order, as sketched below.
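
Here is a minimal Java sketch of that deterministic computation, which every consumer could run independently over the same sorted inputs. The class and method names are hypothetical and the chunking only loosely follows the paper's range-partitioning.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Deterministic partition assignment: same inputs, same output, no
    // coordination between consumers required beyond reading ZooKeeper.
    public class PartitionAssignment {
        // Returns partitionId -> consumerId for one topic.
        public static Map<Integer, String> assign(List<Integer> partitions,
                                                   List<String> consumers) {
            List<Integer> sortedPartitions = new ArrayList<>(partitions);
            List<String> sortedConsumers = new ArrayList<>(consumers);
            Collections.sort(sortedPartitions);
            Collections.sort(sortedConsumers);

            Map<Integer, String> owners = new HashMap<>();
            int numConsumers = sortedConsumers.size();
            // Ceiling division: each consumer owns a contiguous chunk of partitions.
            int chunk = (sortedPartitions.size() + numConsumers - 1) / numConsumers;
            for (int i = 0; i < sortedPartitions.size(); i++) {
                owners.put(sortedPartitions.get(i), sortedConsumers.get(i / chunk));
            }
            return owners;
        }
    }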

Offset Commits and Message Re-delivery

Consumers are responsible for committing their offsets to the offset registry. If a consumer fails before committing an offset, the message associated with that offset may be re-delivered to a different consumer within the group.

For example, in Algorithm 1, the critical step involves assigning ownership of partition p to consumer Ci. This is the lock acquisition phase, mediated by ZooKeeper. ZooKeeper ensures that only the designated owner, Ci, can subsequently update the offset for partition p in the registry. However, existing owners might continue processing messages before they become aware of the ownership transfer (ZooKeeper cannot guarantee that). Therefore, it is crucial that message handling logic within each consumer is idempotent.

Achieving true exactly-once delivery is challenging and often impractical. The Kafka authors recommend relying on the at-least-once delivery guarantee.

Transfer Efficiency

Traditionally, sending data from local files to remote sockets involves a series of costly data copies:

  1. File to OS Memory: Data is read from the file system into an OS memory page.
  2. OS Memory to Application Memory: The data is copied from the OS memory page into the application's memory space.
  3. Application Memory to Kernel Buffer: The data is copied again, this time from the application's memory to a kernel buffer.
  4. Kernel Buffer to Socket: Finally, the kernel sends the data from the buffer to the network socket.

To circumvent these redundant copy operations, Kafka leverages the sendfile system call. sendfile transfers data directly from the file system to the network socket, bypassing the intermediate copies and significantly improving performance.
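
In the JVM, this capability is exposed through FileChannel.transferTo, which on Linux is typically backed by sendfile. A minimal sketch with hypothetical class and method names:

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    // Zero-copy transfer of a log segment region to a consumer's socket.
    // The loop is needed because transferTo may send fewer bytes than requested.
    public class ZeroCopySend {
        public static void sendSegment(Path segment, SocketChannel socket,
                                       long position, long count) throws IOException {
            try (FileChannel file = FileChannel.open(segment, StandardOpenOption.READ)) {
                long remaining = count;
                long pos = position;
                while (remaining > 0) {
                    long sent = file.transferTo(pos, remaining, socket);
                    pos += sent;
                    remaining -= sent;
                }
            }
        }
    }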

Evaluation

The paper's evaluation primarily focuses on a specific set of use cases. The authors acknowledge that their system achieved significant performance gains due in part to a reduced feature set compared to other systems (e.g., ActiveMQ, RabbitMQ).
  • Producer Throughput: Exceeded 400k messages per second, reaching 800 Mbps, significantly higher than other systems.
  • Consumer Throughput: Surpassed 20k messages per second.
Kafka lacks producer-side acknowledgments: producers send batches of messages without receiving individual acknowledgments from the broker. This omission can lead to data loss if the producer fails before its messages reach the broker.

Kafka utilizes the Avro serialization protocol (similar to Thrift and Protocol Buffers). Each Kafka message carries an overhead of only 9 bytes, compared to 144 bytes per message in ActiveMQ.

Related Systems

Message Queues

Publish-Subscribe Systems

  • Google Cloud Pub/Sub: A cloud-based pub/sub service supporting both push and pull delivery models.
  • Apache Pulsar: A popular streaming platform optimized for geo-replication, enabling efficient data distribution across geographically dispersed services. Kafka can support the same through some extensions.

Legacy Systems

The paper mentions other systems, such as Facebook Scribe and Yahoo's Data Highway, which are no longer actively maintained.

Paper Review

The paper is easy to read and provides a good introduction to some fundamental concepts of distributed systems. I would highly recommend it.
