
Paper Insights #32 - Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams

Presented at SIGMOD 2013, this paper from Google details another innovation stemming from Google Ads, a platform known for its planet-scale data processing. Notable authors include Ashish Gupta, a senior engineering leader within Google Ads, and Manpreet Singh, a principal engineer at Google. The year 2013 marked a significant period for stream processing, as Google was concurrently developing MillWheel and Dataflow, foundational technologies that influenced the creation of Apache Flink and Apache Beam.

Paper Link

Must Read

Let's begin with what a streaming join is.

Streaming Joins

Query engines enable users to query data from a variety of sources. If the retrieved data is in a relational format, it can be joined based on common keys. This process, where joins are performed on static data already retrieved from the sources, is termed a static join.

An alternative approach is the streaming join, where at least one of the data sources is a continuous event stream rather than static data. Streaming joins can be categorized into two types:

  • Stream-Stream Join: This involves joining two independent streams of data.
  • Stream-Table Join: Here, a data stream is joined with a static dataset (table).

Ordered Stream

These are data streams where all events are ordered according to the join key. Joining such streams is straightforward, resembling a merge join operation on two tables with sorted keys, as illustrated in the diagram below.
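
To make this concrete, here is a minimal sketch of such a merge join over two key-sorted streams, assuming at most one event per key on each stream; the function and variable names are illustrative, not from the paper.

```python
def merge_join(left, right):
    """Inner-join two event streams that are already sorted by their join key.

    `left` and `right` are iterators of (key, value) pairs in ascending key
    order, with at most one event per key on each stream.
    """
    left_event, right_event = next(left, None), next(right, None)
    while left_event is not None and right_event is not None:
        (lkey, lval), (rkey, rval) = left_event, right_event
        if lkey == rkey:
            yield lkey, lval, rval
            left_event, right_event = next(left, None), next(right, None)
        elif lkey < rkey:
            # Unmatched left event: an inner join drops it; an outer join
            # would instead emit (lkey, lval, None).
            left_event = next(left, None)
        else:
            right_event = next(right, None)

queries = iter([(1, "q1"), (2, "q2"), (4, "q4")])
clicks = iter([(2, "c2"), (3, "c3"), (4, "c4")])
print(list(merge_join(queries, clicks)))  # [(2, 'q2', 'c2'), (4, 'q4', 'c4')]
```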


When a join key doesn't find a match, the outcome depends on the join type: an inner join will discard the unmatched event, while an outer join will pair it with NULL values.

Unordered Stream

These are streams where at least one of them has events arriving out of order with respect to the join key, relative to the other stream.

Case: Only One Stream Unordered and No Delayed Events

It is slightly easier to handle cases in which only one of the streams is unordered. A watermark on the unordered stream marks the end of events up to a specific key, signaling to the processing engine that no further events up to that key will arrive and that they can now be matched against the corresponding events from the other stream.

The join operation remains conceptually similar to a merge join. However, watermarks play a vital role in determining when to finalize and trigger the join windows, as illustrated in the diagram below. Watermarks can be as simple as timeouts.


Case: Delayed Events or Both Streams Unordered

Streams can experience delayed event arrival. In the earlier example, an event with a timestamp preceding the watermark of 20 could arrive after the watermark has already been processed. Furthermore, scenarios can involve both input streams being unordered. In both of these situations, stream materialization becomes necessary. A common destination for this materialization is a relational table, which stores all events from the streams. As events are added to one table, a lookup is performed on the other to fire all joined events.



However, this approach can lead to unbounded table growth if the number of distinct keys is high and the join operation is sparse. Several optimizations can be employed based on specific user needs (a sketch combining them follows the list):

  • Watermark-Based Pruning: Watermarks can be used to selectively remove rows from the materialized table. By maintaining the latest watermarks from both streams, the table can be pruned to retain only events newer than these watermarks. For instance, in the previous example, if a watermark of 20 is received on the right stream, all events in the left table with a key less than 20 can be discarded.
  • Single-Join Optimization: If events in one stream are guaranteed to join with exactly one event in the other stream, only the unjoined events from the former stream need to be persisted. Continuing the example, if each event in the right stream joins with exactly one event in the left stream, only the right-stream events that haven't found a match in real-time need to be stored in the right table. Later, when a join occurs, those matched events can be removed immediately.
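
Here is a rough sketch of the materialized join described above, combining watermark-based pruning with the single-join optimization. The class, callback, and table names are assumptions made for illustration, not anything from the paper.

```python
from collections import defaultdict

class MaterializedStreamJoin:
    """Joins two unordered streams by materializing unmatched events.

    Assumes keys from both streams are comparable (so watermarks make sense)
    and that each right event joins with exactly one left event (the
    single-join optimization described above).
    """

    def __init__(self, emit):
        self.left_table = {}                  # key -> left value
        self.right_table = defaultdict(list)  # key -> unjoined right values
        self.emit = emit                      # callback receiving joined events

    def on_left(self, key, value):
        self.left_table[key] = value
        # Single-join optimization: matched right events are removed
        # immediately instead of lingering in the right table.
        for rvalue in self.right_table.pop(key, []):
            self.emit(key, value, rvalue)

    def on_right(self, key, value):
        left = self.left_table.get(key)
        if left is not None:
            self.emit(key, left, value)          # joined in real time
        else:
            self.right_table[key].append(value)  # wait for the left event

    def on_right_watermark(self, watermark):
        # Watermark-based pruning: once the right stream promises no more
        # events with keys below `watermark`, left rows below it can never
        # join again and may be discarded.
        self.left_table = {k: v for k, v in self.left_table.items() if k >= watermark}
```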

Online Advertising

Online advertising stands out as a prominent application of streaming joins; much of the literature on the internet uses advertising as the canonical streaming join example.

Before delving into Photon, it's helpful to understand the basics of how Google's advertising works. Google's frontend servers deliver ads to users viewing webpages. When a user clicks on an ad, this click event is recorded by a separate server, distinct from the ad-serving server. The paper further elaborates on the architectural reasons behind this separation of concerns, primarily driven by performance considerations.

Given Google Ads' massive scale, it handles millions of queries per minute. Advertisers are typically charged based on the clicks their ads receive.

Fundamentally, this involves two continuous event streams:

  • Query Event Stream: Comprising all queries served by the frontend servers.
  • Click Event Stream: Encompassing all ad clicks collected by click servers.

Both streams share a common key, query_id, which serves as the basis for the necessary join operation. A query_id is composed of <timestamp, server IP, process ID>, so it is possible to define a total order on the keys.
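
Because tuples compare lexicographically, a total order on such keys is straightforward; a tiny illustrative sketch (the field names are assumptions):

```python
from collections import namedtuple

# Lexicographic comparison: timestamp first, then server IP, then process ID.
QueryId = namedtuple("QueryId", ["timestamp", "server_ip", "process_id"])

assert QueryId(100, "10.1.2.3", 7) < QueryId(101, "10.1.2.3", 3)
assert QueryId(100, "10.1.2.3", 3) < QueryId(100, "10.1.2.3", 7)
```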

Photon

Photon is a streaming join system deployed within Google Ads. Its primary function is to join web search queries with corresponding user click data.

Let's analyze the characteristics of the input streams:

  • The query stream is usually ordered but might contain delayed events.
  • The click stream is unordered (based on query_id) and can include late-arriving events.

Effectively, both streams exhibit characteristics of unordered streams. However, a key advantage is that each click event is guaranteed to join with exactly one query event. Once a click event has been successfully joined, it can be discarded.

The design of Photon aims to achieve the following:

  • At-most-once semantics: Ensuring that each click event is processed no more than once to prevent overcharging.
  • Near real-time accuracy: Maximizing the number of click events joined with their corresponding query events with minimal delay.
  • Eventual exactly-once semantics: Guaranteeing that all late-arriving click events are eventually processed.

The joined output from Photon is critical for various low-latency applications, including billing, click statistics and analysis, spam detection, and budget control. Consequently, Photon must process joins with latencies in the order of seconds.

Strawman

A straightforward approach to joining these events would involve a single server ingesting all data. All incoming query events would be stored in a table. Upon the arrival of a click event, the system would perform a lookup in the query event table to find a match and output the joined result.

If a click event doesn't immediately find a corresponding query event (potentially due to the query event arriving late), the click event would be temporarily held and the lookup retried when the delayed query event arrives.
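
This strawman is essentially the materialized join sketched earlier, specialized to queries and clicks. A compact sketch with illustrative names only:

```python
class StrawmanJoiner:
    """Single-server strawman: materialize queries, hold unmatched clicks."""

    def __init__(self, emit):
        self.queries = {}          # query_id -> query event
        self.pending_clicks = {}   # query_id -> clicks waiting for a query
        self.emit = emit

    def on_query(self, query_id, query):
        self.queries[query_id] = query
        # A delayed query may unblock clicks that arrived before it.
        for click in self.pending_clicks.pop(query_id, []):
            self.emit(query_id, query, click)

    def on_click(self, query_id, click):
        query = self.queries.get(query_id)
        if query is not None:
            self.emit(query_id, query, click)
        else:
            # Query not seen yet (it may be delayed): hold the click and
            # retry the lookup when the query eventually arrives.
            self.pending_clicks.setdefault(query_id, []).append(click)
```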



While this basic design is conceptually sound and could function for the Google Ads use case, it falls short in providing the necessary fault tolerance and scalability required to handle Google Ads' immense volume of millions of clicks per minute.

Architecture

At a high level, Photon is designed for multi-homing, providing resilience against datacenter-level failures.


All query events are stored in a dedicated query event store. The Photon pipeline operates across multiple data centers, coordinating through IdRegistry. This component ensures, in a fault-tolerant manner, that each event is processed exclusively within a single datacenter.

Within each datacenter, a Dispatcher processes incoming click events. The Dispatcher continuously attempts to process each click event until successful, guaranteeing at-least-once processing. It then forwards the click event to the Joiner. The Joiner's role is to look up the corresponding query in the query event store and perform a crucial step: inserting the click's ID into the IdRegistry. This insertion ensures the at-most-once processing of each click. Finally, the joined event is written to the downstream sink.
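
The per-click flow inside one datacenter can be sketched as follows. The interfaces shown here (insert_if_absent, lookup, write) are assumptions for illustration, not Photon's actual API.

```python
def process_click(click, event_store, id_registry, joiner_token, sink):
    """One Joiner attempt at a click; the Dispatcher retries until it succeeds."""
    query = event_store.lookup(click.query_id)
    if query is None:
        return False  # query event not found (possibly delayed); Dispatcher will retry

    # At-most-once step: record the click ID globally before emitting anything.
    # insert_if_absent is assumed to succeed when the ID is new, or when it is
    # already present with the same token (an earlier attempt by this Joiner).
    if not id_registry.insert_if_absent(click.click_id, joiner_token):
        return True   # a Joiner elsewhere already owns this click; drop it here

    sink.write({"click_id": click.click_id, "query": query, "click": click})
    return True
```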

IdRegistry

IdRegistry is a key-value store that is consistently replicated across multiple datacenters. It provides the at-most-once guarantee for click event processing.

Internally, IdRegistry is built upon PaxosDB, a fault-tolerant, distributed key-value store. PaxosDB utilizes the Paxos algorithm to ensure a single master and linearizability for all updates, which are routed through the Paxos leader. For those familiar with distributed systems, this resembles Apache ZooKeeper in its function of providing consistent coordination, albeit with a raw key-value data model instead of a hierarchical file system. A key difference is that while ZooKeeper is typically confined to a datacenter or zone, IdRegistry is globally replicated. Operating Paxos at this global scale presents significant challenges, which Photon addresses through:

Server-Side Batching

Committing larger groups of click events together, leveraging the system's tolerance for slight delays to improve efficiency.
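
The paper does not spell out the batching code, but the idea can be sketched as draining a request queue and committing one Paxos round per batch. The queue contents and the paxos_commit callback below are assumptions made for this sketch.

```python
import queue
import time

def batching_loop(requests, paxos_commit, max_batch=1000, max_wait_s=0.1):
    """Drain pending IdRegistry insert requests and commit them together.

    `requests` is a queue.Queue of (click_id, token, reply_fn) tuples and
    `paxos_commit` applies a whole batch in a single consensus round.
    """
    while True:
        batch = [requests.get()]                  # block until the first request arrives
        deadline = time.monotonic() + max_wait_s
        while len(batch) < max_batch:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                batch.append(requests.get(timeout=remaining))
            except queue.Empty:
                break
        # One Paxos round covers the whole batch instead of one round per click.
        results = paxos_commit([(cid, token) for cid, token, _ in batch])
        for (cid, token, reply_fn), ok in zip(batch, results):
            reply_fn(ok)                          # answer each waiting client
```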

Sharding

Partitioning the IdRegistry into multiple independent PaxosDB instances, each responsible for a specific key range. However, changing the sharding configuration can introduce consistency issues. To mitigate this, IdRegistry employs TrueTime.

TrueTime provides guarantees on clock skew across machines, ensuring that the difference is bounded by a small delta (ϵ). An event intended for time T will certainly have occurred by T + ϵ. Consequently, when a shard change is scheduled at time T, it will be completed by T + ϵ, and all shards will use the new sharding information. This allows for consistency even during sharding updates, using the old modulo before T + ϵ and the new modulo at or after T + ϵ. This approach mirrors the strategy in Spanner for managing Paxos leader transitions.
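
A simplified sketch of time-based shard selection; the config table, its values, and the epsilon handling are assumptions made for illustration.

```python
# Sharding configs take effect at a scheduled future time. Because TrueTime
# bounds clock uncertainty by epsilon, every server has adopted a config by
# effective_time + epsilon, so they all apply the same modulo to an event.
SHARD_CONFIGS = [
    # (effective_timestamp, number_of_shards) -- illustrative values
    (0,          64),
    (1700000000, 128),
]

def shard_for(click_id: str, event_timestamp: float) -> int:
    # Old modulo for events before the change takes effect, new modulo after.
    _, num_shards = max((t, n) for t, n in SHARD_CONFIGS if t <= event_timestamp)
    return hash(click_id) % num_shards
```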

Fault Tolerance

Let's consider potential failure scenarios to understand the design considerations for each component.

What if the Dispatcher fails before processing a click event and sending it to the Joiner?

This scenario poses no harm. Click events are not directly streamed for joining but are stored as logs. These logs are further divided into smaller files, and the Dispatcher maintains its processing state by periodically recording the file offset up to which events have been processed.

Regardless of when a Dispatcher crashes, the worst-case outcome is a restart that reprocesses click events from the last recorded offset onward. Since the Dispatcher's responsibility is only to ensure at-least-once processing, this reprocessing is acceptable.

What if the Joiner fails after receiving a click from the Dispatcher?

  • Before recording in IdRegistry: This is effectively a no-op. The Dispatcher will retry sending the click event to another Joiner instance.
  • After recording in IdRegistry but before outputting the joined event: This is where the value stored against a key in IdRegistry becomes crucial. IdRegistry stores a token for each click that uniquely identifies the Joiner that recorded it. When that Joiner restarts and receives the same click event again (due to the Dispatcher's at-least-once delivery), it attempts to record the click in IdRegistry with the same token, and IdRegistry returns success because the token matches.
Despite this mechanism, it is still possible for joined events to be missed in the output. This can occur if a subsequent retry from the Dispatcher is handled by a different Joiner, whose attempt to add the click to IdRegistry is rejected (the ID is already present and the token does not match). To guarantee eventual exactly-once semantics, a separate offline process compares the IdRegistry against the output sink, identifies any missing joined events, and adds them to the output.
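
The commit-or-match behavior can be sketched as a conditional write keyed by the click ID; the structure below is illustrative and ignores replication.

```python
class IdRegistryShard:
    """One shard of the registry: click_id -> token of the committing Joiner."""

    def __init__(self):
        self.committed = {}  # click_id -> token

    def insert(self, click_id, token):
        existing = self.committed.get(click_id)
        if existing is None:
            self.committed[click_id] = token
            return True           # first commit of this click
        # A retry by the *same* Joiner (same token) is treated as success, so
        # that Joiner can safely re-emit the joined event after a crash.
        # A *different* Joiner's retry is rejected, which is why a separate
        # offline process backfills joined events missing from the sink.
        return existing == token
```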

Another potential approach would have been to ensure atomicity between writing to IdRegistry and writing the joined event to the output sink, possibly through a two-phase commit protocol. However, the paper notes that the join process involves adapters and several user-defined annotations. These computations are likely expensive, making it costly to re-execute them if a two-phase commit transaction were to fail.

What happens if a query event arrives late?

In this case, the Dispatcher stores the corresponding click events in the local Google File System and retries them later. This ensures that these "orphaned" click events are eventually processed at least once when the late query events finally arrive and can be matched by a subsequent Joiner.

The Exactly-Once Semantics

The Dispatcher's at-least-once delivery, IdRegistry's at-most-once guarantee, and the offline verification process collectively provide Photon's exactly-once processing semantics.

Misc Optimizations

Photon incorporates several key optimizations to enhance its performance.

Pruning Old Keys

As previously mentioned, a click event joins with at most one query event. Therefore, once a click event has been successfully joined, its entry in IdRegistry can be removed. However, before removal, it's crucial to ensure these click events won't reappear from the Click Logs.

To manage the size of IdRegistry, a garbage collection policy is in place, set to retain entries for N days.

Dispatcher IdRegistry Lookup

To avoid unnecessary processing, the Dispatcher performs an initial lookup in IdRegistry before forwarding a click event to a Joiner. If the click is already present, it is assumed the event has been processed.

EventStore

The EventStore implementation features a two-tiered architecture: CacheEventStore and LogsEventStore.

CacheEventStore leverages the temporal locality of events, where recent query events are likely to be associated with recent clicks. This in-memory cache, sharded by the hash of query_id, stores several minutes' worth of query events, significantly reducing disk I/O.

LogsEventStore provides persistent storage for all events in log files. It also maintains a log file map to enable rapid lookup of an event's offset within the log files based on its query_id.
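
The two-tier lookup can be sketched as a simple fallthrough; the class and method names here are assumptions for illustration.

```python
class EventStore:
    """Look up a query event: try the in-memory cache first, then the logs."""

    def __init__(self, cache_event_store, logs_event_store):
        self.cache = cache_event_store   # recent events, sharded by hash(query_id)
        self.logs = logs_event_store     # log files plus a query_id -> offset map

    def lookup(self, query_id):
        event = self.cache.get(query_id)
        if event is not None:
            return event                 # hit: temporal locality saves a disk read
        # Miss: consult the log-file map for the file and offset, then read it.
        return self.logs.read(query_id)
```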

Production Numbers

  • Photon achieves an end-to-end latency (p90) of 7-8 seconds from event log to join completion.
  • Server-side batching effectively reduces the number of transactions on PaxosDB.
  • The Dispatcher's lookup on IdRegistry saves resources. Joiners have only about 5% overlap in the events they process.
  • Less than 0.0001% of joined events are missed by Joiners after registering in IdRegistry.

Paper Review

Fundamentally, from an academic viewpoint, Photon's core concept is quite simple, resembling a basic stream joining system as initially described. The primary value of this paper lies in the extensive engineering work and numerous optimizations required to build and operate such a system at Google's scale. For those already familiar with the principles of stream joins, the paper should be a relatively straightforward read.
