This paper was authored by Jay Kreps, Neha Narkhede, and Jun Rao. This seminal paper, presented at the NetDB '11 workshop, laid the foundation for Apache Kafka, a highly influential open-source project in the realm of distributed systems.
While the paper initially focused on a specific use case – log processing – Kafka has since evolved into a versatile and robust platform for general message delivery. Both Jay Kreps and Neha Narkhede went on to co-found Confluent Inc., a company commercializing Kafka.
Although workshop papers typically carry less weight than conference papers, this particular work garnered significant attention and has had a profound impact on the field. The paper's relatively weak evaluation section may have contributed to its non-selection for the main conference track. However, this in no way diminishes its significance and the lasting influence of Apache Kafka.
Messaging Systems
Messaging systems facilitate the exchange of messages between different components within a distributed system. The sender of a message is known as the producer and the receiver is known as the consumer.
Two prominent types of messaging systems are:
- Message Queues: These systems employ a point-to-point delivery model, where each message is consumed by exactly one consumer. This is analogous to a one-to-one communication channel.
- Publish-Subscribe (Pub/Sub): In pub/sub systems, a message published by a producer can be received by multiple consumers, enabling one-to-many communication.
Scenarios
- Message Queue: Consider an e-commerce scenario where customer orders are placed. Orders are enqueued for processing, and a dedicated worker process retrieves each order from the queue and executes the necessary actions, such as deducting funds from the customer's account and forwarding the order to the supplier.
- Pub/Sub: In an online shopping application, maintaining accurate inventory information is crucial. When an item's availability changes, a message is published by, say, a change detector. All microservices that need inventory information (e.g., product pages, shopping cart) receive the message and update their state, ensuring consistent data across the system.
Delivery Guarantees
Messaging systems offer varying levels of delivery guarantees:
- At-least-once Delivery (Recommended): Every message is eventually delivered, but duplicates are possible, so consumers should handle messages idempotently (the first two guarantees are contrasted in the sketch after this list).
- At-most-once Delivery: Each message is delivered no more than once, but it may be lost entirely.
- Exactly-once Delivery (Transactional Delivery): Every message is delivered exactly once; this is the strongest guarantee but is difficult and expensive to implement in practice.
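The difference between the first two guarantees usually comes down to when a consumer commits its position relative to processing. A minimal sketch using the modern Kafka Java consumer API (which postdates the paper); `process` is a hypothetical application handler:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GuaranteeSketch {
    // At-most-once: commit the batch position before processing, so a crash
    // mid-batch loses the unprocessed remainder instead of redelivering it.
    static void atMostOnce(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
        consumer.commitSync();
        for (ConsumerRecord<String, String> r : batch) {
            process(r);
        }
    }

    // At-least-once: process first, commit last, so a crash before the commit
    // redelivers the batch and process() may run twice for some messages.
    static void atLeastOnce(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> r : batch) {
            process(r);
        }
        consumer.commitSync();
    }

    static void process(ConsumerRecord<String, String> r) {
        /* hypothetical application handler */
    }
}
```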
Ordering Guarantees
Messaging systems provide varying levels of message ordering guarantees:
- Total Order: Messages are delivered to consumers in the exact order they were produced, ensuring strictly sequential processing.
- Partial Order: Messages are delivered in a partially defined order, typically guaranteed only for messages produced within a specific context, such as those originating from the same producer or within the same batch. Messages across different contexts may be delivered in any order relative to each other.
- Out-of-Order: Messages can be delivered to consumers in any order, regardless of their production sequence.
A key trade-off exists between ordering and performance. Out-of-order delivery can achieve lower latency because it avoids the need to wait for prior messages. In contrast, maintaining total or partial order can introduce head-of-line blocking, where subsequent messages are delayed while waiting for earlier messages to be processed.
Push vs Pull Model
Consumers generally have the option to either actively "pull" messages from a queue or passively receive them via a "push" mechanism, often implemented over a Remote Procedure Call (RPC) protocol. Push models offer lower latency, but they can suffer lower throughput because each individual RPC carries per-call processing overhead. Conversely, pull-based models may introduce higher latency due to periodic polling, but they can improve throughput by fetching and processing messages in batches.
Interestingly, many systems that internally support asynchronous push-based calls often implement their consumer libraries using a pull-based model under the hood. This design choice can significantly enhance overall throughput.
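As a concrete illustration of the pull model, here is a minimal sketch of a poll loop using the modern Kafka Java consumer client (which postdates the paper); the broker address, group ID, and topic name are placeholders. Each poll() returns a batch of records, which is exactly where the throughput advantage comes from:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PullLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "inventory-readers");       // hypothetical group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("inventory")); // hypothetical topic
            while (true) {
                // Each poll pulls a *batch* of messages, amortizing per-call overhead.
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : batch) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
```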
Pub/Sub Topics
In a pub/sub system, topics act as channels for routing messages to consumers. Producers (publishers) categorize each message by assigning it to a specific topic. Consumers (subscribers) then subscribe to the topics they're interested in and receive only the messages published to those topics.
To further enhance scalability and throughput, topics are often sharded into sub-topics (also called partitions). Producers write each message to one of these sub-topics, while consumers subscribing to the main topic receive messages from all of its sub-topics. Because each sub-topic is typically managed by a distinct set of resources (e.g., different servers or processes), sharding distributes the workload and improves performance.
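Routing a message to a sub-topic is typically a pure function of the message, e.g., hashing a key modulo the partition count. A minimal sketch of this common scheme (not any particular system's implementation):

```java
public class Router {
    // All messages with the same key map to the same sub-topic (partition),
    // enabling per-key ordering while spreading load across partitions.
    static int selectPartition(String key, int numPartitions) {
        // Mask the sign bit rather than Math.abs (which fails on Integer.MIN_VALUE).
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```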
Kafka Architecture
Kafka primarily functions as a publish-subscribe system.
Architecture
Kafka utilizes a topic-based architecture. Each topic is divided into multiple partitions, and each partition is handled by a dedicated server known as a broker. A single broker may handle many such partitions.
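For concreteness, a minimal producer sketch using the modern Kafka Java client (which postdates the paper); the broker address, topic, and key are placeholders. Messages that share a key land in the same partition, which underpins the per-partition ordering guarantee described next:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProduceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key hash to the same partition, so all
            // updates for "item-42" retain their relative order.
            producer.send(new ProducerRecord<>("inventory", "item-42", "stock=17"));
        }
    }
}
```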
Guarantees
Kafka offers the following key guarantees:
- At-least-once delivery: Ensures that each message is delivered to the consumer at least once. In most scenarios, Kafka achieves exactly-once delivery. However, consumer restarts can potentially lead to duplicate message deliveries.
- Partial ordering: Guarantees that messages within the same partition are delivered to the consumer in the order they were produced.
Consumer Behavior
Consumers can subscribe to specific topics. A consumer can create multiple "sub-streams" for a single topic. Messages destined for the topic are then multiplexed and delivered across these sub-streams.
Note: Kafka can also operate as a traditional message queue when there is only a single consumer for a given topic.
The Broker Layer
Brokers store messages in log files. Each partition's log is split into segment files of approximately 1 GB each; messages are appended sequentially to the current segment file, and when it reaches its maximum size, the log "rolls over" and a new segment file is created.
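A minimal sketch of such a segmented, append-only log, assuming the paper's roughly 1 GB segment limit and its convention that a message's ID is its byte offset in the log; the file layout and names are illustrative:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of a segmented append-only log. Segment files are named by the
// offset of their first message, mirroring how Kafka lays out partition
// directories on disk.
public class SegmentedLog {
    private static final long SEGMENT_LIMIT = 1L << 30; // ~1 GB per segment
    private final Path dir;
    private FileChannel current;
    private long nextOffset; // byte offset of the next message in the log

    public SegmentedLog(Path dir) throws IOException {
        this.dir = dir;
        this.nextOffset = 0;
        roll();
    }

    public long append(byte[] message) throws IOException {
        if (current.size() + message.length > SEGMENT_LIMIT) {
            roll(); // segment is full: close it and start a new one
        }
        long offset = nextOffset;
        current.write(ByteBuffer.wrap(message));
        nextOffset += message.length;
        return offset; // the offset doubles as the message ID
    }

    private void roll() throws IOException {
        if (current != null) current.close();
        current = FileChannel.open(dir.resolve(nextOffset + ".log"),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```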
Message Addressing and Ordering
Each message is uniquely identified by its "offset", a logical position within the log stream. This offset serves as the message ID. Consumers consume messages from a topic partition sequentially, ensuring total ordering within the partition and partial ordering within a topic. If a topic has only one partition, all messages within that topic are totally ordered.
Broker Statelessness
When a consumer receives a message, it can independently calculate the offset of the next message. This client-side calculation eliminates the need for broker involvement in message tracking, allowing the broker to maintain a stateless architecture.
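A sketch of that client-side arithmetic: in the paper's design a message ID is its byte offset in the partition log, so the consumer can derive the next fetch position itself (modern Kafka uses logical, consecutive offsets instead, but the stateless-broker idea is the same):

```java
// The broker keeps no per-consumer state: after consuming a message, the
// client computes where the next one starts without a broker round-trip.
public class OffsetMath {
    static long nextFetchOffset(long currentOffset, int messageSizeBytes) {
        return currentOffset + messageSizeBytes;
    }
}
```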
Message Deletion
To manage storage space, brokers employ a simple time-based retention policy: messages are typically retained for 7 days before being automatically deleted.
Consumer Groups
Kafka employs the concept of "consumer groups", where a group consists of multiple consumers that collectively subscribe to a single topic. While each message within a topic is supposed to be delivered to exactly one consumer within the group, this does not guarantee exactly-once delivery across the entire system, as will be discussed later.
Kafka itself does not actively manage message delivery to specific consumers within a group. This responsibility lies with the consumers themselves, who must coordinate their actions. To facilitate this coordination, consumers leverage ZooKeeper, a distributed coordination service. ZooKeeper enables consumers to acquire and maintain exclusive ownership of specific partitions within a topic.
As a side note, ZooKeeper's ownership mechanism relies on timing assumptions and is not entirely foolproof.
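A minimal sketch of claiming partition ownership with a ZooKeeper ephemeral znode (the path layout mirrors Kafka's /consumers/[group]/owners/... convention, but the group name and parent-node setup are assumed); because the node is ephemeral, ownership is released automatically when the owner's session dies:

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class OwnershipClaim {
    // Assumes the parent znodes (/consumers/group1/owners/<topic>) already exist.
    static boolean tryClaim(ZooKeeper zk, String topic, int partition, String consumerId)
            throws KeeperException, InterruptedException {
        String path = "/consumers/group1/owners/" + topic + "/" + partition;
        try {
            // Ephemeral node: deleted by ZooKeeper if our session expires,
            // so a crashed consumer's partitions become claimable again.
            zk.create(path, consumerId.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;  // we own the partition until our session ends
        } catch (KeeperException.NodeExistsException e) {
            return false; // another consumer already owns it
        }
    }
}
```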
Partition Assignment and Coordination
Consumers coordinate to determine which partitions each member of the group will consume. This involves:
- Maintaining Registries: ZooKeeper hosts three key registries:
  - Consumer Registry: Tracks all consumers belonging to a particular group.
  - Ownership Registry: Records the "owner" (consumer) of each partition within the topic.
  - Offset Registry: Stores the offset of the last message consumed from each partition.
- Partition Assignment: When the consumer registry changes (e.g., a consumer joins or leaves the group), ZooKeeper notifies all consumers. Each consumer then executes the same deterministic algorithm to re-assign partitions, so all consumers arrive at the same assignment regardless of the order in which they run it. The algorithm divides the partitions into N groups (where N is the number of consumers) and assigns partitions to consumers in a round-robin fashion based on their IDs (see the sketch after this list).
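A minimal sketch of such a deterministic assignment, assuming consumers and partitions are identified by sortable IDs; since every group member sorts the same inputs and runs the same computation, each independently arrives at an identical mapping:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssign {
    // Every consumer sorts the same inputs, so each group member independently
    // computes an identical partition-to-owner mapping.
    static Map<String, List<Integer>> assign(List<String> consumers, List<Integer> partitions) {
        List<String> cs = new ArrayList<>(consumers);
        List<Integer> ps = new ArrayList<>(partitions);
        Collections.sort(cs);
        Collections.sort(ps);

        Map<String, List<Integer>> owners = new TreeMap<>();
        for (int i = 0; i < ps.size(); i++) {
            // Round-robin over the sorted consumer IDs, as the text describes.
            String owner = cs.get(i % cs.size());
            owners.computeIfAbsent(owner, c -> new ArrayList<>()).add(ps.get(i));
        }
        return owners;
    }

    public static void main(String[] args) {
        // Same result no matter which consumer runs this or in what input order:
        System.out.println(assign(List.of("c2", "c1"), List.of(0, 1, 2, 3, 4)));
        // -> {c1=[0, 2, 4], c2=[1, 3]}
    }
}
```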
Offset Commits and Message Re-delivery
Consumers are responsible for committing their offsets to the offset registry. If a consumer fails before committing an offset, the message associated with that offset may be re-delivered to a different consumer within the group.
For example, in Algorithm 1, the critical step is assigning ownership of partition p to consumer Ci. This is the lock acquisition phase, mediated by ZooKeeper, which ensures that only the designated owner, Ci, can subsequently update the offset for partition p in the registry. However, a previous owner might continue processing messages before it becomes aware of the ownership transfer; ZooKeeper cannot prevent this. It is therefore crucial that the message-handling logic within each consumer is idempotent.
Achieving true exactly-once delivery is challenging and often impractical. The Kafka authors recommend relying on the at-least-once delivery guarantee.
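A minimal sketch of an idempotent handler, assuming each message carries a unique ID and using an in-memory set as a stand-in for a durable deduplication store:

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentHandler {
    // Stand-in for a durable store of processed message IDs; a real system
    // would persist this (e.g., in a database) atomically with the side effect.
    private final Set<String> processed = new HashSet<>();

    public void handle(String messageId, Runnable sideEffect) {
        if (!processed.add(messageId)) {
            return; // duplicate delivery: already handled, safely skip
        }
        sideEffect.run();
    }
}
```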
Transfer Efficiency
Traditionally, sending data from local files to remote sockets involves a series of costly data copies:
- File to OS Memory: Data is read from the file system into an OS memory page.
- OS Memory to Application Memory: The data is copied from the OS memory page into the application's memory space.
- Application Memory to Kernel Buffer: The data is copied again, this time from the application's memory to a kernel buffer.
- Kernel Buffer to Socket: Finally, the kernel sends the data from the buffer to the network socket.
To circumvent these redundant copy operations, Kafka leverages the sendfile system call, which transfers data directly from the file to the network socket, bypassing the intermediate copies and significantly improving performance.
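On the JVM this zero-copy path is exposed as FileChannel.transferTo, which delegates to sendfile(2) on Linux. A minimal sketch, with a hypothetical segment file name and destination address:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        // Hypothetical segment file and destination; adjust for a real setup.
        try (FileChannel segment = FileChannel.open(Path.of("00000000.log"),
                     StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(
                     new InetSocketAddress("localhost", 9000))) {
            long position = 0;
            long remaining = segment.size();
            while (remaining > 0) {
                // transferTo delegates to sendfile(2) on Linux: the kernel moves
                // bytes from the page cache straight to the socket buffer,
                // skipping the two user-space copies described above.
                long sent = segment.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}
```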
Evaluation
- Producer Throughput: Exceeded 400k messages per second, reaching 800 Mbps, significantly higher than other systems.
- Consumer Throughput: Surpassed 20k messages per second.
Related Systems
Message Queues
- RabbitMQ: A widely-used open-source message broker.
- Apache ActiveMQ: Implements the Java Message Service (JMS) specification.
Publish-Subscribe Systems
- Google Cloud Pub/Sub: A cloud-based pub/sub service supporting both push and pull delivery models.
- Apache Pulsar: A popular streaming platform optimized for geo-replication, enabling efficient data distribution across geographically dispersed services. Kafka can support geo-replication as well through extensions.