Paper Insights #11 - Apache Flink™: Stream and Batch Processing in a Single Engine

This paper was presented at the IEEE International Conference on Cloud Engineering Workshop in 2015. Workshops, generally considered less prestigious than the main conference itself, provide a forum for more focused discussions and specialized research. The authors of the paper acknowledge that the work presented is not solely their own, demonstrating commendable modesty in recognizing the contributions of others.

Batch and Stream Processing

Data processing involves performing operations on collections of data, often represented as records. These records can reside in various locations, such as database tables or files. Two primary approaches to data processing are:

Batch Processing

Processes data in discrete batches or groups. Batch processing is built on a few primitive operations:

  • Map: Applies a function to each individual record within a batch, producing an output for each input.
  • Reduce: Aggregates the results of the map operation, often by combining values associated with the same key.
  • Shuffle: (Optional) Redistributes data between processing stages, typically between the map and reduce phases, to improve data locality and efficiency.

Example

Counting word frequencies across multiple documents:
  • Map: Count occurrences of each word within each document.
  • Shuffle: Group all occurrences of the same word from different documents.
  • Reduce: Sum the counts for each word across all documents.
Batch processing is suitable for tasks that can be processed periodically, such as batch updates, nightly reports, and data warehousing.
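
To make the three steps concrete, here is a minimal single-process sketch in plain Java (not a distributed framework). The documents are hypothetical placeholders, and the shuffle step is implicit in grouping counts under a shared map key:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchWordCount {
    public static void main(String[] args) {
        // Hypothetical input batch: each string stands in for a document.
        List<String> documents = Arrays.asList(
                "the quick brown fox",
                "the lazy dog");

        Map<String, Integer> counts = new HashMap<>();
        for (String doc : documents) {              // Map: process each document...
            for (String word : doc.split("\\s+")) { // ...emitting (word, 1) pairs.
                // Shuffle + Reduce: pairs for the same word are grouped under
                // one key and their counts are summed.
                counts.merge(word, 1, Integer::sum);
            }
        }
        counts.forEach((word, count) -> System.out.println(word + " -> " + count));
    }
}
```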

Stream Processing

Processes data continuously as it arrives, one record at a time. Its key characteristics are:

  • Low latency: Processes data with minimal delay.
  • Real-time analysis: Enables immediate insights and responses to events.
Stream processing is ideal for real-time applications like fraud detection, stock market analysis, and IoT data processing.

A Bit of History

Early systems, such as FlumeJava (2010), which built upon Google's MapReduce framework, focused primarily on batch processing. Apache Spark, another prominent early player, also initially centered on batch processing capabilities.

Around 2013, the landscape shifted towards stream processing. Spark Streaming emerged, addressing stream processing by dividing the incoming data stream into short intervals and processing each interval as a mini-batch. While effective, this approach retains some characteristics of batch processing. 

Google's MillWheel, on the other hand, pioneered true stream processing by enabling continuous, real-time processing of data with features like stateful operations (e.g., incrementing a counter for each incoming record).

The terms "batch-native" and "stream-native" are often used to categorize systems. Batch-native systems, like early versions of Spark, were initially designed for batch processing and later extended to support stream processing. Conversely, stream-native systems, such as Apache Flink, were inherently designed for stream processing and subsequently adapted to handle batch processing as a special case.

Apache Flink, drawing significant inspiration from MillWheel, exemplifies a stream-native approach. Its core architecture is fundamentally designed for stream processing, with batch processing effectively treated as a specialized form of stream processing.

Data Flow Graph

A data flow graph is a directed acyclic graph (DAG) consisting of stateful operators.

The following example illustrates data flow in a stream processing system by extending the word-frequency count scenario to real-time processing:

  • Counter: This is a stateless operator. It receives a document as input and produces a map of words to their frequencies within that document.

  • Shuffle: Another stateless operator, it routes entries from the word-frequency maps to the corresponding "Adder" operators based on the word's alphabetical range, ensuring that all counts for a given word reach the same Adder.

  • Adder: A stateful operator responsible for maintaining the current count of each word across all processed documents.

    • It receives word-frequency maps from the "Shuffle" operator.
    • Updates its internal state by adding the frequencies of incoming words to the existing counts.
    • Can be sharded (e.g., one Adder for words A-M, another for N-Z) to improve performance and scalability.
    • The current state of the Adders (word counts) can be visualized on an analytics dashboard to provide real-time insights.

This example demonstrates a basic data flow in a stream processing system. However, real-world systems incorporate a much wider range of operators, enabling complex data transformations and analyses.
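
As a sketch of how this Counter -> Shuffle -> Adder pipeline might look in Flink's DataStream API: the socket source, host, and port are illustrative assumptions, and each input line stands in for a document.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical source: each line of text stands in for a document.
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        lines
                // Counter: stateless; emits a (word, 1) pair per occurrence.
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.toLowerCase().split("\\W+")) {
                            if (!word.isEmpty()) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                    }
                })
                // Shuffle: hash-partitions events so all pairs for a given
                // word reach the same Adder instance.
                .keyBy(t -> t.f0)
                // Adder: stateful; maintains a running count per word.
                .sum(1)
                .print();

        env.execute("streaming-word-count");
    }
}
```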

Pipelined Operators

In stream-native processing systems, operators must be designed to support continuous data flow, enabling efficient pipelining of events. Key examples of pipelined operators include:

  • Window:

    • Collects a stream of events within a defined time window (e.g., sliding window).
    • Pipelined because the collection of subsequent windows can proceed concurrently, without waiting for the processing of previous windows to complete.
    • In systems like Flink, batch processing can be emulated by applying a window operator to collect all events within a specific batch interval.
  • Map:

    • Applies a function to each incoming event independently.
    • Highly pipelined as each event can be processed concurrently without relying on other events.
  • Group By (Shuffle):

    • Redistributes events based on a key.
    • Pipelined because events are continuously forwarded to the appropriate downstream operator (e.g., reducer) responsible for the corresponding key.
  • Reduce:

    • Aggregates incoming events.
    • Supports "rolling reduce" for continuous aggregation, requiring an associative function (e.g., sum, count); see the sketch after this list.
    • The output represents the ongoing reduction of all events seen thus far.
  • Interval Join:

    • The only type of join that can be effectively pipelined.
    • Joins events within a specific time interval.
    • Essentially equivalent to applying a window operator followed by a join on the accumulated events within that window.
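
As a concrete fragment of the Group By and rolling-reduce behavior above, the following keys a stream of (word, count) pairs and emits a running sum per key; the input stream `counts` is an assumed DataStream:

```java
// Assumes counts is a DataStream<Tuple2<String, Integer>> of (word, count) events.
DataStream<Tuple2<String, Integer>> runningTotals = counts
        .keyBy(t -> t.f0)                                // Group By: pipelined shuffle by word
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)); // Rolling reduce: running sum per word

// Every incoming event produces an updated running total downstream.
runningTotals.print();
```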

Non-pipelined Operators

Certain operators are inherently non-pipelined, meaning they cannot process data continuously and require access to the entire dataset before producing results.

Example: A sort-merge join that requires sorting the entire dataset before performing the join operation is a classic example of a non-pipelined operation.

Data Exchange in Flink

Operators in Flink communicate by exchanging data.

  • Pipelined Streams: Characterize continuous, uninterrupted data flow from non-window operators (e.g., Counter, Shuffle), ideal for stream processing.
  • Blocking Streams: Generated by window operators, requiring the entire window to be collected before processing, typically used in batch processing.
Operators exchange data through serialization and buffering (accumulating data before transmission). Buffers are periodically flushed to the consumer, improving throughput but potentially introducing latency.
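
Flink exposes this throughput/latency trade-off directly through a buffer timeout. A minimal sketch, assuming the default execution environment:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Flush partially filled network buffers at most every 10 ms.
// A timeout of 0 flushes after every record (lowest latency, lower throughput);
// -1 flushes only when a buffer is full (highest throughput, higher latency).
env.setBufferTimeout(10);
```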

Control Events in Flink

In addition to data records, control messages can flow through the data flow graph, enabling the triggering of specific conditions or actions within the processing pipeline.

Watermarks

Stream processing involves two notions of time:

  • Processing Time: The time at which an event is processed by the system.
  • Event Time: The time at which the event occurred in the real world.

Windowing operations rely heavily on these time concepts.

  • Processing Time Windows: These windows are defined by the system's processing time. Events arriving within a specific processing time window are grouped together. This approach can be problematic when event arrival times are unpredictable, as processing time may not accurately reflect the true event times.

  • Event Time Windows: These windows are defined by the event times themselves. Events with event times falling within a specific window are grouped together. This approach provides a more accurate representation of real-world event occurrences.

For example, consider a stream of application logs from mobile devices. Due to network latency, these logs may experience delays before reaching the processing system. This discrepancy between the event's actual occurrence time (event time) and its processing time can lead to inaccurate results when using processing time windows.

Watermarks address this challenge. A watermark signals that no further events with an event time earlier than the watermark's timestamp are expected to arrive. This allows the system to proceed with window processing even if some late events might still be in transit.

In other words, processing time advances monotonically with the system clock. In contrast, event time requires explicit mechanisms like watermarks to progress, ensuring timely window closure and subsequent processing.
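
In Flink's DataStream API, watermarks are typically attached via a WatermarkStrategy. A minimal sketch, assuming a hypothetical LogEvent type with an eventTimeMillis field and an existing stream named `logs`:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Tolerate events arriving up to 5 seconds out of order: the watermark
// trails the maximum event time seen so far by 5 seconds.
WatermarkStrategy<LogEvent> strategy = WatermarkStrategy
        .<LogEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, recordTimestamp) -> event.eventTimeMillis);

DataStream<LogEvent> timestamped = logs.assignTimestampsAndWatermarks(strategy);
```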

Limitations of Watermarks

While watermarks effectively advance event time processing, they can be less effective in handling significantly late events (e.g., events arriving long after the watermark has passed).

In our example, what happens if a user disconnects from the internet? Logs generated during the offline period may be delayed significantly, and by the time they arrive, the watermark may have already advanced past their event times. This is where Apache Beam is useful: it provides explicit mechanisms for handling such late events.

Recommended Read: The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing

Examples

  • Spark Streaming: Only works with processing time windows.
  • Flink: Supports both processing time and event time windows, providing greater flexibility.
  • Apache Beam: Offers robust support for handling late events, providing more sophisticated mechanisms for dealing with data arriving out of order.

Checkpoint Barriers

Flink guarantees exactly-once processing semantics, ensuring that each event in the stream affects the system state exactly once. To achieve this, Flink leverages Asynchronous Barrier Snapshotting (ABS).

  • ABS Mechanism: Checkpoint barriers are propagated through the operator pipeline. Upon encountering a barrier, each operator records its current state (e.g., the current count in a counter operator) to persistent storage. This creates a consistent snapshot of the system's state across all operators.

  • Restart and Recovery: In case of a failure, operators can recover from the last successful checkpoint. They resume processing from the point where the checkpoint was taken, replaying events and applying updates to their state accordingly.
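
Enabling ABS in a Flink job is a small configuration step. A sketch with an illustrative checkpoint interval and a placeholder storage path (the storage API shown is from recent Flink versions):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Inject a checkpoint barrier at the sources every 10 seconds and
// snapshot operator state with exactly-once semantics.
env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);

// Where operator snapshots are persisted (placeholder path).
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
```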

Limitations of ABS

ABS ensures exactly-once processing within the Flink system itself. However, it cannot prevent duplicate external side effects triggered by the processing logic. For instance, if an operator initiates an external transaction (e.g., booking a flight ticket) as part of its processing, recovering from a checkpoint will cause the transaction to be executed again, producing duplicates.

Extending Exactly-Once Semantics Beyond Flink

Since 2017, Flink has extended its exactly-once guarantees to external systems. This is achieved through a two-phase commit protocol:

  1. Prepare Phase: When the last operator in the pipeline receives a checkpoint barrier, it initiates a two-phase commit with all downstream sinks. Before committing any data, the operator buffers all output records in memory.

  2. Commit Phase: Once all operators have successfully recorded their checkpoints, the last operator commits all buffered output records to the sinks.

Example: Kafka Sink

For a Kafka sink, the last operator starts a Kafka transaction when a checkpoint barrier arrives. Subsequent barriers trigger the completion of the previous transaction, ensuring that each event is written to Kafka exactly once.
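
In recent Flink versions this appears as a delivery-guarantee setting on the Kafka sink; the broker address, topic, and transactional-id prefix below are placeholders:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("word-counts")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // Records are written inside Kafka transactions that are committed
        // only when the corresponding Flink checkpoint completes.
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("word-count-") // required for EXACTLY_ONCE
        .build();

stream.sinkTo(sink); // `stream` is an assumed DataStream<String>
```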

Windowing Algorithms in Flink

Flink offers several windowing algorithms for grouping and processing data streams:

Tumbling Windows

  • Divide the data stream into fixed-size, non-overlapping intervals.
  • Suitable for applications requiring regular, discrete time intervals for analysis, such as hourly or daily aggregations.

Sliding Windows

  • Divide the data stream into fixed-size windows that overlap.
  • Enable continuous analysis with overlapping intervals, useful for rolling averages or trend detection.

Session Windows

  • Dynamically sized windows defined by periods of inactivity in the data stream.
  • Ideal for analyzing user sessions or other scenarios where activity patterns are irregular.

Global Windows

  • Unbounded windows that encompass the entire data stream.
  • Typically used for global aggregations or analyses requiring consideration of all available data.
  • Require an external trigger (e.g., a manual command or a scheduled event) to close the window and trigger processing.
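
The four window types map directly onto Flink's window assigners. Sketches on a keyed stream follow, where the durations and the trigger are illustrative and `stream` is an assumed DataStream of (key, value) tuples:

```java
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

// Tumbling: fixed-size, non-overlapping one-hour windows.
stream.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.hours(1)));

// Sliding: one-hour windows that advance every 10 minutes (overlapping).
stream.keyBy(t -> t.f0).window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(10)));

// Session: a window closes after 30 minutes of inactivity for a key.
stream.keyBy(t -> t.f0).window(EventTimeSessionWindows.withGap(Time.minutes(30)));

// Global: unbounded; fires only via an explicit trigger.
stream.keyBy(t -> t.f0).window(GlobalWindows.create()).trigger(CountTrigger.of(1000));
```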

Bulk Synchronous Parallel vs. Stale Synchronous Parallel

BSP requires all workers to synchronize at the end of each iteration, leading to potential bottlenecks due to barrier synchronization.

SSP allows workers to proceed independently, improving speed but potentially compromising consistency.

SSP is suitable for algorithms like stochastic gradient descent, where some degree of staleness is acceptable.

Differences Between Apache Flink, Apache Spark, and Apache Storm

Apache Flink: Designed primarily for stream processing, with strong support for stateful computations and event-time semantics. Ensures data is processed exactly once, even in the case of failures.

Apache Spark: Excels at batch processing tasks, with stream processing capabilities added later. Known for its fast and general-purpose nature, supporting a wide range of data processing workloads.

Apache Storm: Focused solely on real-time stream processing with high throughput and low latency. Guarantees that each message will be processed at least once, but it may be processed multiple times in case of failures.
