Presented at SIGMOD 2013, this paper from Google details another innovation stemming from Google Ads, a platform known for its planet-scale data processing. Notable authors include Ashish Gupta, a senior engineering leader within Google Ads, and Manpreet Singh, a principal engineer at Google. The year 2013 marked a significant period for stream processing, as Google was concurrently developing MillWheel and Dataflow, foundational technologies that influenced the creation of Apache Flink and Apache Beam.
Must Read:
- Paper Insights - Apache Flink™: Stream and Batch Processing in a Single Engine
- Paper Insights - The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing
Let's begin with what a streaming join is.
Streaming Joins
Query engines enable users to query data from a variety of sources. If the retrieved data is in a relational format, it can be joined based on common keys. This process, where joins are performed on static data already retrieved from the sources, is termed a static join.
An alternative approach is the streaming join, where at least one of the data sources is a continuous event stream rather than static data. Streaming joins can be categorized into two types:
- Stream-Stream Join: This involves joining two independent streams of data.
- Stream-Table Join: Here, a data stream is joined with a static dataset (table).
Ordered Stream
These are data streams where all events are ordered according to the join key. Joining such streams is straightforward, resembling a merge join operation on two tables with sorted keys, as illustrated in the diagram below.
When a join key doesn't find a match, the outcome depends on the join type: an inner join will discard the unmatched event, while an outer join will pair it with NULL values.
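To make this concrete, here is a minimal Python sketch of an inner merge join over two key-ordered streams. The `(key, payload)` tuple shape and the assumption of unique keys per stream are mine, not the paper's:

```python
def merge_join(left, right):
    """Inner-join two iterators of (key, payload) events, each sorted by key.

    Assumes unique keys within each stream. Unmatched events are dropped
    (inner-join semantics); an outer join would emit them paired with None.
    """
    left, right = iter(left), iter(right)
    l, r = next(left, None), next(right, None)
    while l is not None and r is not None:
        if l[0] == r[0]:
            yield (l[0], l[1], r[1])
            l, r = next(left, None), next(right, None)
        elif l[0] < r[0]:
            l = next(left, None)   # no right-side match; discard
        else:
            r = next(right, None)  # no left-side match; discard

queries = [(1, "q1"), (2, "q2"), (3, "q3")]
clicks = [(1, "c1"), (3, "c3"), (4, "c4")]
print(list(merge_join(queries, clicks)))  # [(1, 'q1', 'c1'), (3, 'q3', 'c3')]
```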
Unordered Stream
These are streams where events in at least one of the streams arrive out of order with respect to the join key, relative to the other stream.
Case: Only One Stream Unordered and No Delayed Events
It is slightly easier to handle cases in which only one of the streams is unordered. A watermark is required on the unordered stream to mark that all events up to a specific key have arrived. This watermark signals to the processing engine that no further events below that key will arrive, so the corresponding joins can be finalized.
The join operation remains conceptually similar to a merge join. However, watermarks play a vital role in determining when to finalize and trigger the join windows, as illustrated in the diagram below. Watermarks can be as simple as timeouts.
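As a rough illustration (not Photon's mechanism), the sketch below buffers events from the unordered stream and treats a watermark as permission to consume and join the ordered stream up to that key; the class and method names are invented for this example:

```python
from collections import defaultdict

class WatermarkJoin:
    """Joins a key-ordered stream with an unordered stream that emits
    watermarks guaranteeing all events up to a key have arrived."""
    def __init__(self, ordered):
        self.ordered = iter(ordered)     # (key, payload) sorted by key
        self.buffer = defaultdict(list)  # key -> buffered unordered payloads
        self.pending = None              # ordered event held for later

    def on_event(self, key, payload):
        self.buffer[key].append(payload)  # unordered events are buffered

    def on_watermark(self, watermark):
        """Finalize all joins for keys <= watermark."""
        results = []
        while True:
            ev = self.pending or next(self.ordered, None)
            if ev is None or ev[0] > watermark:
                self.pending = ev         # hold until the next watermark
                break
            self.pending = None
            for other in self.buffer.pop(ev[0], []):
                results.append((ev[0], ev[1], other))
        # Inner join: buffered events below the watermark with no ordered
        # counterpart can now be dropped.
        for key in [k for k in self.buffer if k <= watermark]:
            del self.buffer[key]
        return results
```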
Case: Delayed Events or Both Streams Unordered
Streams can experience delayed event arrival. In the earlier example, an event with a timestamp preceding the watermark of 20 could arrive after the watermark has already been processed. Furthermore, scenarios can involve both input streams being unordered. In both of these situations, stream materialization becomes necessary. A common destination for this materialization is a relational table that stores all events from the streams. As events are added to one table, a lookup is performed on the other to emit any joined events.
However, this approach can lead to unbounded table growth if the number of distinct keys is high and the join operation is sparse. Several optimizations can be employed based on specific user needs:
- Watermark-Based Pruning: Watermarks can be used to selectively remove rows from the materialized table. By maintaining the latest watermarks from both streams, the table can be pruned to retain only events newer than these watermarks. For instance, in the previous example, if a watermark of 20 is received on the right stream, all events in the left table with a key less than 20 can be discarded.
- Single-Join Optimization: If events in one stream are guaranteed to join with exactly one event in the other stream, only the unjoined events from that stream need to be persisted. Continuing the example, if each right-stream event joins with exactly one left-stream event, only the right-stream events that haven't yet found a match need to be stored in the right table; once matched, they can be removed immediately. Both optimizations are sketched below.
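Here is a minimal sketch of both optimizations together, assuming comparable keys and that each right-stream event joins with exactly one left-stream event:

```python
class MaterializedJoin:
    """Materialized stream-stream join with watermark-based pruning of the
    left table and single-join removal on the right table (a sketch)."""
    def __init__(self):
        self.left = {}   # key -> left payload (queries, say)
        self.right = {}  # key -> right payloads still awaiting a match

    def on_left(self, key, payload):
        self.left[key] = payload
        # Single-join optimization: matched right events are removed.
        return [(key, payload, r) for r in self.right.pop(key, [])]

    def on_right(self, key, payload):
        if key in self.left:
            return [(key, self.left[key], payload)]  # joined; never stored
        self.right.setdefault(key, []).append(payload)  # persist unjoined only
        return []

    def on_right_watermark(self, watermark):
        # Watermark-based pruning: no future right event carries a key
        # below the watermark, so those left rows can be discarded.
        for key in [k for k in self.left if k < watermark]:
            del self.left[key]
```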
Online Advertising
Online advertising stands out as a prominent application of streaming joins; much of the literature on streaming joins uses advertising as the running example.
Before delving into Photon, it's helpful to understand the basics of how Google's advertising works. Google's frontend servers deliver ads to users viewing webpages. When a user clicks on an ad, this click event is recorded by a separate server, distinct from the ad-serving server. The paper further elaborates on the architectural reasons behind this separation of concerns, primarily driven by performance considerations.
Given Google Ads' massive scale, it handles millions of queries per minute. Advertisers are typically charged based on the clicks their ads receive.
Fundamentally, this involves two continuous event streams:
- Query Event Stream: Comprising all queries served by the frontend servers.
- Click Event Stream: Encompassing all ad clicks collected by click servers.
Both streams share a common key, query_id, which serves as the basis for the necessary join operation. Each query_id is composed of <timestamp, server IP, process ID>, so it is possible to define a total order on the keys.
Photon
Photon is a streaming join system deployed within Google Ads. Its primary function is to join web search queries with corresponding user click data.
Let's analyze the characteristics of the input streams:
- The query stream is usually ordered but might contain delayed events.
- The click stream is unordered (based on query_id) and can include late-arriving events.
Effectively, both streams exhibit characteristics of unordered streams. However, a key advantage is that each click event is guaranteed to join with exactly one query event. Once a click event has been successfully joined, it can be discarded.
The design of Photon aims to achieve the following:
- At-most-once semantics: Ensuring that each click event is processed no more than once to prevent overcharging.
- Near real-time accuracy: Maximizing the number of click events joined with their corresponding query events with minimal delay.
- Eventual exactly-once semantics: Guaranteeing that all late-arriving click events are eventually processed.
The joined output from Photon is critical for various low-latency applications, including billing, click statistics and analysis, spam detection, and budget control. Consequently, Photon must process joins with latencies in the order of seconds.
Strawman
A straightforward approach to joining these events would involve a single server ingesting all data. All incoming query events would be stored in a table. Upon the arrival of a click event, the system would perform a lookup in the query event table to find a match and output the joined result.
If a click event doesn't immediately find a corresponding query event (potentially due to the query event arriving late), the click event would be temporarily held and the lookup retried when the delayed query event arrives.
While this basic design is conceptually sound and could function for the Google Ads use case, it falls short in providing the necessary fault tolerance and scalability required to handle Google Ads' immense volume of millions of clicks per minute.
Architecture
At a high level, Photon is designed for multi-homing, providing resilience against datacenter-level failures.
All query events are stored in a dedicated query event store. The Photon pipeline operates across multiple datacenters, coordinating through the IdRegistry. This component ensures, in a fault-tolerant manner, that each click event is committed by only one datacenter's pipeline, even though all of them process the stream.
Within each datacenter, a Dispatcher processes incoming click events. The Dispatcher continuously attempts to process each click event until successful, guaranteeing at-least-once processing. It then forwards the click event to the Joiner. The Joiner's role is to look up the corresponding query in the query event store and perform a crucial step: inserting the click's ID into the IdRegistry. This insertion ensures the at-most-once processing of each click. Finally, the joined event is outputted to the downstream sink.
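A sketch of this flow is below; the interfaces (`query_store.get`, `registry.insert_if_absent`) are illustrative stand-ins, not Photon's actual APIs:

```python
import time

def make_joiner(query_store, registry, output):
    def joiner(click):
        query = query_store.get(click["query_id"])
        if query is None:
            return False                  # query event not available yet
        # Inserting the click id into IdRegistry is the at-most-once
        # commit point; only the first successful insert emits output.
        if registry.insert_if_absent(click["id"]):
            output.append((query, click))
        return True                       # duplicate: already joined elsewhere

    return joiner

def dispatcher(clicks, joiner):
    """At-least-once delivery: retry each click until a Joiner confirms it."""
    for click in clicks:
        delay = 1
        while not joiner(click):
            time.sleep(delay)             # the query may arrive late
            delay = min(delay * 2, 60)    # back off between retries
```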
IdRegistry
IdRegistry is a key-value store that is consistently replicated across multiple datacenters. It provides the at-most-once guarantee for click event processing.
Internally, IdRegistry is built upon PaxosDB, a fault-tolerant, distributed key-value store. PaxosDB utilizes the Paxos algorithm to ensure a single master and linearizability for all updates, which are routed through the Paxos leader. For those familiar with distributed systems, this resembles Apache ZooKeeper in its function of providing consistent coordination, albeit with a raw key-value data model instead of a hierarchical file system. A key difference is that while ZooKeeper is typically confined to a datacenter or zone, IdRegistry is globally replicated. Operating Paxos at this global scale presents significant challenges, which Photon addresses through:
Server-Side Batching
Instead of performing one PaxosDB transaction per click, IdRegistry servers batch many client requests into a single transaction, amortizing the cost of each cross-datacenter consensus round.
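A toy sketch of the idea, with a `paxos_commit` callback standing in for a real PaxosDB transaction (all names are illustrative):

```python
class BatchingFrontend:
    """Accumulates insert requests and commits them as one transaction,
    so a single consensus round covers many clicks."""
    def __init__(self, paxos_commit, max_batch=100):
        self.pending = []                 # buffered (click_id, token) pairs
        self.paxos_commit = paxos_commit  # atomically commits a batch
        self.max_batch = max_batch

    def submit(self, click_id, token):
        self.pending.append((click_id, token))
        if len(self.pending) >= self.max_batch:
            self.flush()

    def flush(self):
        if self.pending:
            self.paxos_commit(self.pending)  # one Paxos round for the batch
            self.pending = []
```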
Sharding
The IdRegistry key space is hash-partitioned into shards, each backed by its own PaxosDB replica group, so throughput scales beyond a single Paxos group. The number of shards can be changed dynamically, which is where TrueTime comes in.
TrueTime provides guarantees on clock skew across machines, ensuring that the difference is bounded by a small delta (ϵ). An event intended for time T will certainly have occurred by T + ϵ. Consequently, when a shard change is scheduled at time T, it will be completed by T + ϵ, and all shards will use the new sharding information. This allows for consistency even during sharding updates, using the old modulo before T + ϵ and the new modulo at or after T + ϵ. This approach mirrors the strategy in Spanner for managing Paxos leader transitions.
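A sketch of timestamp-based shard selection during such a change; the configuration values and hash choice are illustrative (a production system would use a hash that is stable across processes):

```python
SHARD_CONFIGS = [
    # (effective_from_timestamp, number_of_shards), sorted by time; the
    # switch time is scheduled at least epsilon in the future so that all
    # servers agree on it despite bounded clock skew.
    (0, 10),
    (1_700_000_000, 25),
]

def shard_for(event_id: str, event_timestamp: int) -> int:
    """Pick the shard using the configuration in force at the event's time."""
    num_shards = SHARD_CONFIGS[0][1]
    for effective_from, shards in SHARD_CONFIGS:
        if event_timestamp >= effective_from:
            num_shards = shards
    return hash(event_id) % num_shards  # illustrative; not a stable hash
```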
Fault Tolerance
Let's consider potential failure scenarios to understand the design considerations for each component.
What if the Dispatcher fails before processing a click event and sending it to the Joiner?
This scenario poses no harm. Click events are not directly streamed for joining but are stored as logs. These logs are further divided into smaller files, and the Dispatcher maintains its processing state by periodically recording the file offset up to which events have been processed.
Regardless of when a Dispatcher crashes, the worst-case outcome is a restart that reprocesses click events from the last recorded offset onward. Since the Dispatcher's responsibility is only to ensure at-least-once processing, this reprocessing is acceptable.
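A minimal sketch of offset-based checkpointing over a newline-delimited log file (the file formats and persistence details are assumptions; Photon keeps such state alongside the logs in GFS):

```python
import json
import os

def process_click_log(log_path, state_path, handle_event):
    """Resume from the last checkpointed byte offset. Events between the
    checkpoint and a crash may be handled twice, which is acceptable
    because the Dispatcher only promises at-least-once delivery."""
    offset = 0
    if os.path.exists(state_path):
        with open(state_path) as f:
            offset = json.load(f)["offset"]
    with open(log_path) as f:
        f.seek(offset)
        while True:
            line = f.readline()
            if not line:
                break                      # caller retries as the log grows
            handle_event(json.loads(line))
            with open(state_path, "w") as s:
                json.dump({"offset": f.tell()}, s)  # checkpoint the offset
```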
What if the Joiner fails after receiving a click from the Dispatcher?
- Before recording in IdRegistry: This is effectively a no-op. The Dispatcher will retry sending the click event to another Joiner instance.
- After recording in IdRegistry but before outputting the joined event: This is where the value stored against a key in IdRegistry becomes crucial. IdRegistry stores, against each click, a token that uniquely identifies the Joiner that committed it. When the Joiner restarts and receives the same click event (due to the Dispatcher's at-least-once delivery), it attempts to record the click in IdRegistry again with the same token, and IdRegistry returns success because the tokens match. This idempotent commit is sketched below.
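A minimal sketch of that idempotent commit, with invented token contents:

```python
class IdRegistry:
    """The stored token identifies the Joiner attempt that committed the
    click: the same Joiner retrying its own commit sees success, while a
    different Joiner racing on the same click sees a duplicate."""
    def __init__(self):
        self.committed = {}                # click_id -> token

    def insert(self, click_id, token):
        existing = self.committed.setdefault(click_id, token)
        return existing == token           # True on first insert and retries

registry = IdRegistry()
token = ("joiner-7", "attempt-1")          # illustrative token contents
assert registry.insert("click-42", token)              # first commit succeeds
assert registry.insert("click-42", token)              # retry: still success
assert not registry.insert("click-42", ("joiner-3", "attempt-9"))  # duplicate
```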
What happens if a query event arrives late?
In this case, the Dispatcher stores the corresponding click events in the local Google File System (GFS) and retries them later. This ensures that these "orphaned" click events will eventually be processed at least once, when the late query events finally arrive and a subsequent Joiner attempt succeeds.
The Exactly-Once Semantics
The Dispatcher's at-least-once delivery, IdRegistry's at-most-once guarantee, and the offline retry of unjoined events together provide Photon's exactly-once processing semantics.
Misc Optimizations
Photon incorporates several key optimizations to enhance its performance.
Pruning Old Keys
As previously mentioned, a click event joins with at most one query event. Therefore, once a click event has been successfully joined, its entry in IdRegistry can be removed. However, before removal, it's crucial to ensure these click events won't reappear from the Click Logs.
To manage the size of IdRegistry, a garbage collection policy is in place, set to retain entries for N days.
Dispatcher IdRegistry Lookup
To avoid unnecessary processing, the Dispatcher performs an initial lookup in IdRegistry before forwarding a click event to a Joiner. If the click is already present, the event is assumed to have been processed already and is skipped.
EventStore
The EventStore implementation features a two-tiered architecture: CacheEventStore and LogsEventStore.
CacheEventStore leverages the temporal locality of events, where recent query events are likely to be associated with recent clicks. This in-memory cache, sharded by the hash of query_id, stores several minutes' worth of query events, significantly reducing disk I/O.
LogsEventStore provides persistent storage for all events in log files. It also maintains a log file map to enable rapid lookup of an event's offset within the log files based on its query_id.
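A simplified sketch of the two-tier lookup (the real CacheEventStore is sharded by hash of query_id across servers; `logs.read` stands in for the log-file-map lookup):

```python
from collections import OrderedDict

class TwoTierEventStore:
    def __init__(self, logs, capacity=1_000_000):
        self.cache = OrderedDict()        # query_id -> recent query event
        self.capacity = capacity          # roughly a few minutes of queries
        self.logs = logs                  # persistent LogsEventStore fallback

    def put(self, query_id, event):
        self.cache[query_id] = event
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict the oldest query event

    def lookup(self, query_id):
        event = self.cache.get(query_id)
        if event is not None:
            return event                  # cache hit: no disk I/O
        # Cache miss: locate the log file and offset via the log file map,
        # then read the event from persistent storage.
        return self.logs.read(query_id)
```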
Production Numbers
- Photon achieves an end-to-end latency (p90) of 7-8 seconds from event log to join completion.
- Server-side batching effectively reduces the number of transactions on PaxosDB.
- The Dispatcher's lookup on IdRegistry saves resources. Joiners have only about 5% overlap in the events they process.
- Less than 0.0001% of joined events are missed by Joiners after registering in IdRegistry.
Paper Review
Fundamentally, from an academic viewpoint, Photon's core concept is quite simple, resembling a basic stream joining system as initially described. The primary value of this paper lies in the extensive engineering work and numerous optimizations required to build and operate such a system at Google's scale. For those already familiar with the principles of stream joins, the paper should be a relatively straightforward read.