
Paper Insights #30 - Napa: Powering Scalable Data Warehousing with Robust Query Performance at Google

Napa represents the next generation of planet-scale data warehousing at Google, following Mesa. A key system for analytics workloads, Napa stores enormous datasets for various tenants within Google. The extensive authorship of the paper underscores the collaborative effort behind its creation. This paper was presented at VLDB 2021.

Paper Link

Let's begin with some basic concepts.

Data Warehouse

Data warehouses are SQL-compatible databases designed primarily for analytical data storage. Data warehouses, like SQL databases, offer data consistency. However, they typically aren't updated in real-time. In fact, most data warehouses rely on ETL (Extract, Transform, Load) pipelines to feed data into them. 

Consider this example: a data warehouse contains two relational tables:

  • HourlyRevenue: This table stores the revenue generated from orders received each hour. ETL pipelines periodically update this table by pulling data from various sources like logs and OLTP systems. These pipelines ensure that duplicate updates are avoided. Importantly, because updates are periodic, the data warehouse might not reflect the very latest hour's revenue in real-time.


  • DailyRevenue: Similar to HourlyRevenue, this table stores daily revenue figures. To maintain consistency, these daily numbers are essentially materialized views derived from the HourlyRevenue table. There are two main approaches to materializing these views:

    • Real-time Materialization: Any update to the HourlyRevenue table immediately updates the DailyRevenue table, ensuring strong consistency between the two.
    • ETL-based Materialization: A separate ETL pipeline aggregates the hourly revenue data for each day and updates the DailyRevenue table.
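As a minimal sketch of the ETL-based approach (the table names come from the example above; the data and code are illustrative, not from the paper):

```python
from collections import defaultdict

# Sketch of ETL-based materialization: derive DailyRevenue from HourlyRevenue.
# Schemas and data are illustrative.
hourly_revenue = {
    ("2024-01-01", 10): 100.0,
    ("2024-01-01", 11): 250.0,
    ("2024-01-02", 9):  80.0,
}

daily_revenue = defaultdict(float)
for (date, _hour), revenue in hourly_revenue.items():
    daily_revenue[date] += revenue  # aggregate hours into days

print(dict(daily_revenue))  # {'2024-01-01': 350.0, '2024-01-02': 80.0}
```

Because the pipeline runs periodically, DailyRevenue lags HourlyRevenue between runs; consistency between the two is restored each time the pipeline completes.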

It's important to note that data warehouses are not designed for transactional workloads. Furthermore, they typically store numeric data suitable for aggregation, rather than arbitrary data types like structs. For storing diverse data types, other database solutions like SQL or NoSQL databases are more appropriate.

Data Model

Let's explore how data is represented and stored within a data warehouse. You can find more details in the Mesa paper by Google.

Data is organized into tables. The tables are not strictly relational; instead, the underlying model leans toward a key-value structure. The key and value can each encompass multiple columns. Crucially, value columns must be of an aggregatable type, typically numeric (such as integers or floats).

It's worth noting that data warehouses almost invariably have aggregated views built on top of their base data. Consider an hourly table with columns <Date-Hour, Revenue>. A common pattern is to have a corresponding <Date, Revenue> table. This "DateRevenue" table effectively drops the "Hour" granularity and aggregates the revenue across all hours within each day.

Such an aggregated table is called a rollup table, while the original, more granular table is referred to as the drilldown table.

To facilitate the creation of these rollup tables, the data model must define an aggregator function for each value column. This function specifies how values should be combined when aggregating across different key values.

Formally, an aggregation function F: V × V → V typically exhibits the following properties:
  • Associative: F(v0, F(v1,v2)) = F(F(v0, v1), v2)
  • Commutative: F(v0, v1) = F(v1, v0)
This concept extends to tables with multiple value columns. Consequently, for a table with N value columns, we would have N corresponding aggregate functions.
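The per-column aggregators can be sketched as follows (a minimal illustration assuming SUM and MAX aggregators; all names are mine, not the paper's):

```python
# Sketch: one aggregator per value column of a table (illustrative names).

def sum_agg(v0, v1):
    """SUM is associative and commutative, so deltas can merge in any order."""
    return v0 + v1

def max_agg(v0, v1):
    return v0 if v0 > v1 else v1

# For a table with N value columns, N aggregate functions.
aggregators = [sum_agg, max_agg]

def merge_rows(row_a, row_b):
    """Combine two rows sharing the same key, column by column."""
    return tuple(f(a, b) for f, a, b in zip(aggregators, row_a, row_b))

# Associativity: F(v0, F(v1, v2)) == F(F(v0, v1), v2)
assert merge_rows((100, 5), merge_rows((50, 9), (25, 3))) == \
       merge_rows(merge_rows((100, 5), (50, 9)), (25, 3))  # (175, 9)
```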

Differential Storage Scheme

The core principle behind analytical data storage is often to record data as a sequence of changes, or deltas.

For instance, imagine an ETL pipeline runs and captures the revenue for hour X. Let's say the first run records X -> $100. When the ETL pipeline runs again, it detects new orders (not present in the previous run) that contribute to the revenue for hour X. This second run now reports X -> $150.

Instead of simply overwriting the previous value, the data can be stored as a series of deltas: 

X -> +$100, +$50

This method of storage is known as a differential storage scheme.
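Reading under this scheme can be sketched in a few lines (illustrative, not Napa's actual format):

```python
# Sketch: differential storage for hour X.
# Each ETL run appends a delta instead of overwriting the stored value.
hour_x_deltas = [100, 50]  # +$100 from the first run, +$50 from the second

# The current value is never stored directly; it is the aggregate of deltas.
revenue_hour_x = sum(hour_x_deltas)
print(revenue_hour_x)  # 150
```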

Deltas

A delta is essentially a file that contains keys along with the corresponding change in value for each column.

Each time an ETL pipeline runs to bring data into the data warehouse, it generates a new delta. This newly created delta is then applied to both the base table and any associated materialized views.


The entire analytical database is therefore built upon a sequence of these deltas. These deltas are often organized in a structure similar to a Log-Structured Merge (LSM) tree. This involves a hierarchy of delta files, where higher levels contain progressively larger deltas formed by merging smaller deltas from lower levels. The merging process for each key utilizes the aggregator functions defined earlier. The largest, merged deltas residing at the bottommost level are referred to as base deltas.

A significant advantage of this delta-based approach is the ability to execute a query against any prefix of the delta sequence. This will yield a consistent snapshot of the data as it existed at that specific point in time.

Each table independently maintains its data deltas. Consequently, within the database, every table has its own LSM tree structure. Collectively, these individual LSM trees constitute what can be described as a log-structured merge forest.
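The prefix-snapshot property can be sketched as follows (a toy model with a SUM aggregator; data and structure are illustrative):

```python
from collections import defaultdict

# Sketch: a table as an ordered sequence of deltas, each mapping key -> change.
deltas = [
    {"2024-01-01T10": 100},                       # delta 1
    {"2024-01-01T10": 50, "2024-01-01T11": 80},   # delta 2
    {"2024-01-01T11": 20},                        # delta 3
]

def snapshot(upto):
    """Querying any prefix of the delta sequence yields a consistent
    point-in-time view of the table."""
    out = defaultdict(int)
    for delta in deltas[:upto]:
        for key, change in delta.items():
            out[key] += change
    return dict(out)

print(snapshot(2))  # {'2024-01-01T10': 150, '2024-01-01T11': 80}
```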

Physical Format

Delta files are typically organized in a columnar format. Within a delta file, the keys are segmented into blocks. Each of these blocks is then stored column by column, often employing compression techniques to reduce storage space.

Alongside each delta data file, an index file is maintained. This index provides pointers to the different blocks within the corresponding data file. Furthermore, to accelerate query performance, each data file block also stores the actual keys present within it.


The lookup process then involves:

  1. Performing a binary search on the index file to locate the relevant block.
  2. Conducting a binary search within the key list of the identified data file block.
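The two-step lookup can be sketched like this (the index and block structures below are illustrative stand-ins, not Napa's on-disk format):

```python
import bisect

# Sketch: the index file holds the last key of each block; each block stores
# its sorted keys alongside the (columnar) values.
index = ["c", "f", "z"]  # last key per block
blocks = [
    (["a", "b", "c"], [1, 2, 3]),
    (["d", "e", "f"], [4, 5, 6]),
    (["g", "m", "z"], [7, 8, 9]),
]

def lookup(key):
    # 1. Binary search the index file to locate the candidate block.
    b = bisect.bisect_left(index, key)
    if b == len(blocks):
        return None
    keys, values = blocks[b]
    # 2. Binary search within the block's key list.
    i = bisect.bisect_left(keys, key)
    if i < len(keys) and keys[i] == key:
        return values[i]
    return None

print(lookup("e"))  # 5
```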

Consistency

Applying incremental updates (deltas) to base tables and their associated materialized views can occur asynchronously. Tables might be updated before their corresponding roll-up views, and in sharded data warehouses replicated across clusters, updates can arrive at different shards independently and out of logical order.

Despite this potential for temporary inconsistencies, it's crucial that queries return consistent results. For instance, the total daily revenue calculated from an hourly table must always align with the revenue recorded in the daily summary table, even though the daily table is updated offline periodically.

A straightforward approach to ensure this consistency is to globally order all delta applications across the database. By restricting queries to the point where all relevant deltas have been applied across all shards, tables, and views, we can achieve consistency. This simplification is viable because data warehouses prioritize eventual consistency over real-time updates, which significantly eases their design compared to distributed SQL databases.

In the example above, the consistent data points are queryable. Conversely, the inconsistent point, lacking the necessary updates for the corresponding hourly table view (the daily rollup table), cannot be queried.

Napa

Napa, Google's next-generation, large-scale data warehouse, builds upon the foundation of Mesa. Its architecture divides data processing into distinct stages: ingestion, storage and view maintenance (including compaction), and query serving.



This decoupling is strategically designed to optimize for a primary objective: achieving low-latency and consistent query performance.

The ingestion phase in Napa prioritizes high throughput, generating data deltas rapidly. However, if deltas remain unprocessed for too long, they accumulate and significantly increase query costs, since queries must consult and merge more of them.

Ultimately, Napa aims to strike a balance between three crucial factors:

  1. Data Freshness: How quickly updates become available for querying.
  2. Query Latency: The speed at which queries are executed and results are returned.
  3. Cost: The resources expended to ensure timely data updates.

Achieving all three simultaneously – high freshness, low query latency, and low cost – is inherently challenging. The trade-offs are as follows:

  • High Freshness + Low Cost = High Query Latency: Prioritizing immediate data availability with limited resources leads to slower query execution.
  • Low Cost + Low Query Latency = Low Freshness: Focusing on efficient querying with minimal expenditure necessitates delaying data updates.
  • High Freshness + Low Query Latency = High Cost: Delivering real-time data with fast query responses requires significant resource investment.

Sacrificing data freshness allows for low-latency and cost-effective querying. Traditional data warehouses, including Mesa, exemplify this by batching updates and applying them periodically (e.g., hourly). This consolidation optimizes data for querying; for instance, merging all deltas for a table reduces the need to access multiple delta files during lookups. Consequently, updates are intentionally delayed to enhance query performance and minimize costs.

Conversely, prioritizing high ingestion rates can compromise query performance. An ETL pipeline operating at high frequency produces many small deltas, so queries need to process and combine data from numerous deltas, which increases latency.

The paper also suggests that data warehouses might opt to sacrifice consistency. However, this assertion is questionable, particularly considering that many applications query multiple tables concurrently, making data consistency a critical requirement.

Queryable Timestamp

The queryable timestamp (QT) marks the point in time up to which all data deltas for a table have been consistently applied and are ready for querying. In essence, QT signifies the data freshness available for querying on a given table, even if it's distributed across multiple storage replicas.

The freshness of a table can be quantified as the duration between the current time and the queryable timestamp: Now() - QT.

The QT value advances from a previous point (X) to a new point (Y) when the data ingested within the time interval (Y - X) has undergone optimization to meet the desired query performance standards.

Many client queries come with specific latency requirements, which, in turn, restrict the number of data deltas that can be accessed and merged to generate a response within the acceptable timeframe. Considering these constraints, the QT represents the most recent delta such that querying all deltas from the oldest up to this point will adhere to the stipulated latency limits and complete within the expected duration.



It's important to note that a QT is maintained individually for each table. The QT of an entire database (when consistency across all its tables is a prerequisite) is determined by the minimum QT value among all the tables within that database.
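Both relationships can be sketched in a few lines (timestamps are illustrative epoch seconds; the structure is mine, not Napa's):

```python
# Sketch: per-table queryable timestamps (epoch seconds; illustrative).
table_qt = {
    "HourlyRevenue": 1_700_000_600,
    "DailyRevenue":  1_700_000_000,  # the rollup lags behind its base table
}

def database_qt():
    """When cross-table consistency is required, the database QT is the
    minimum QT across all tables."""
    return min(table_qt.values())

def freshness(now):
    """Freshness is the lag between the current time and the QT."""
    return now - database_qt()

print(database_qt())             # 1700000000
print(freshness(1_700_000_900))  # 900 seconds behind
```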

Architecture

Napa is a distributed data warehouse; indeed, the immense scale required to serve Google necessitates a distributed architecture.

Napa's architecture is logically separated into two distinct planes: the data plane, which handles the storage of the actual data, and the control plane, responsible for managing the associated metadata. 

Ingestion servers, residing within the data plane, receive data from ETL pipelines. This incoming data is then transformed into deltas and persisted on Colossus (the successor to the Google File System). 

Once data ingestion is complete, the queryable timestamp (QT) for the affected table is advanced in the control plane. This crucial QT metadata is durably stored in Spanner, Google's globally consistent distributed SQL database. 

When a query engine receives a query, it first consults the control plane to determine the optimal QT. This QT represents the most recent point in time up to which data can be consistently read from the database while adhering to the query's latency requirements. Subsequently, the query engine utilizes the data deltas up to this determined QT to execute the query.


Storage Structure

The underlying storage structure mirrors that of Mesa. Data for each table is organized as a collection of deltas, and these deltas across all tables in a database collectively form an LSM forest.

The primary storage format for these deltas is columnar, where retrieving each column necessitates a separate I/O operation. For smaller deltas, an optimized format called PAX (Partition Attributes Across) is utilized, enabling the entire record to be read with a single I/O operation.

View Maintenance

From here on, the paper becomes very detailed. Many of the details relate to the various levels of optimization Napa performs to achieve its performance goals, and some of them would take a database expert to fully appreciate.

When a table is updated, all downstream views are updated asynchronously via the F1 query engine. Note that Napa views may also be joins of multiple upstream tables. Because the QT of the database only advances when the QTs of all derived tables and views advance, it is essential to have an efficient pipeline for view maintenance.

Data Skew - Note that the parent table is usually sharded by the key or a key prefix of the table. Views, however, usually have a reduced keyspace (formed by dropping certain key columns and then aggregating across the value columns). This reduced keyspace for views may be skewed.

For instance, suppose (A, B, C, D) is the key of the parent table and (B) is the key of a view on top of it, with the base table sharded by the key prefix <A>. Suppose most entries in the parent table have B = 1 or B = 2, while all other values of B are infrequent. The view will then face hotspotting on the shards where B = 1 or B = 2.
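The skew can be made concrete with a small sketch (the data below is illustrative; most base rows carry B = 1 or B = 2):

```python
from collections import Counter

# Sketch: skew after projecting base key (A, B, C, D) down to view key (B).
base_keys = [(a, b, c, d)
             for a in range(8)
             for b in [1, 1, 1, 2, 2, 2, 3, 4]  # B heavily skewed to 1 and 2
             for c in range(2) for d in range(2)]

rows_per_view_key = Counter(b for (_a, b, _c, _d) in base_keys)
print(rows_per_view_key)  # shards owning B=1 and B=2 receive 3x the load
```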

View Sort Order

When a view is materialized, its rows must be sorted by the view's key. The amount of sorting required depends on how the view's key relates to the base table's key.

There are three cases:

1. The view's key is a prefix of the base table's key. Say, (A, B) and (A, B, C, D). No sorting is required in this case. 
2. The view's key is a partial prefix of the base table's key. Say, (A, B, D) and (A, B, C, D). Sorting on (A, B) is not required; however, D needs to be sorted within each (A, B) group before rows can be reduced. This is still a relatively small amount of sorting.
3. The view's key doesn't share any prefix. Say, (D, C, A) and (A, B, C, D). A large amount of sorting is required.

Let's try to understand why sorting is required in case 3 with an example.

Say (A, B, C, D) -> (D, C, A), i.e., aggregating out column B.

Let's say the data is - 

1, 1, 1, 1 -> (1,1,1)
1, 1, 1, 2 -> (2,1,1)
1, 1, 1, 3 -> (3,1,1)
....
1, 2, 1, 1 -> (1,1,1) <-- This key was seen before.
....

As a result, the projected rows need to be sorted before the view can be constructed.
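A minimal sketch of this project-sort-reduce step (illustrative data, with SUM as the aggregator):

```python
from itertools import groupby

# Sketch: materializing a view keyed (D, C, A) from a base table keyed
# (A, B, C, D), aggregating out B with SUM.
base = [
    ((1, 1, 1, 1), 10),
    ((1, 1, 1, 2), 20),
    ((1, 1, 1, 3), 30),
    ((1, 2, 1, 1), 5),   # projects to view key (1, 1, 1) -- seen before
]

# Project each base key to the view key, then sort: rows that collide on the
# view key are scattered in base-table order, so a sort is required first.
projected = sorted((((d, c, a), v) for ((a, b, c, d), v) in base))

view = [(k, sum(v for _, v in grp))
        for k, grp in groupby(projected, key=lambda row: row[0])]
print(view)  # [((1, 1, 1), 15), ((2, 1, 1), 20), ((3, 1, 1), 30)]
```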

For fast sorting, the paper mentions several techniques which are beyond the scope of my current comprehension.

Compaction


The compaction process takes N deltas as input and merges them together. Essentially, it runs an N-way merge on those deltas. As soon as one of the N deltas is exhausted, i.e., all its keys have been merged out, the merge process halts. 
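The core of an N-way merge with per-key aggregation can be sketched as follows (a simplified illustration that merges all inputs to completion; SUM is assumed as the aggregator):

```python
import heapq
from itertools import groupby

# Sketch of compaction as an N-way merge. Each input delta is already
# sorted by key; equal keys are combined with the aggregator (SUM here).
def compact(deltas):
    merged = heapq.merge(*deltas)  # N-way merge by key
    return [(key, sum(v for _, v in grp))
            for key, grp in groupby(merged, key=lambda kv: kv[0])]

d1 = [("a", 1), ("c", 2)]
d2 = [("a", 4), ("b", 3)]
d3 = [("c", 5), ("d", 7)]
print(compact([d1, d2, d3]))  # [('a', 5), ('b', 3), ('c', 7), ('d', 7)]
```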

Query Serving

There are several optimizations that Napa performs for fast query serving. Let's go through them one by one, although it would really take a database expert to understand all parts of them.

1. Napa uses views instead of base tables to answer queries whenever possible. 
2. The F1 query engine pushes all query filters down to the delta servers to minimize the number of bytes transferred over the network.
3. Napa maintains a B-tree index on the data. This can be used to prune out deltas where the keys are not expected to be found.
4. Note that every query essentially needs to look up the QT in the control plane. This lookup could have been implemented as a snapshot transaction on Spanner to read the latest value. To speed things up, however, the QT value is only periodically fetched and stored in a distributed metadata cache. This introduces some delay, since the very latest QT won't be known immediately.
5. All data reads go through a distributed read-through cache. This helps share reads within the same query. For example, different subqueries of a query (say, when a table joins on itself) may read the same index files for deltas.
6. There is yet another distributed caching layer. The previous one saves repeated reads of the same file blocks; this one is used to prefetch blocks into the cache, either offline or online. Offline prefetching fetches blocks as soon as the QT is advanced, and is applied to frequently queried tables only. Online prefetching helps when another query execution may have already fetched some data for the main query executor to use.
7. When a query runs, a delta server reads through multiple deltas and merges the results from all of them before responding. 
8. The file format is mostly columnar, with PAX used to speed up lookups on small files.
9. Misc - Read hedging. This is a popular technique when multiple replicas hold the same data. One sends a request to one replica, and then hedges it with a second request to a different replica upon observing a longer-than-usual response time from the first.
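Read hedging (the last item above) can be sketched as follows; this is a generic illustration of the technique, not Napa's implementation, and all names are mine:

```python
import concurrent.futures
import random
import time

# Sketch: simulated replica read with variable latency.
def read_replica(replica_id):
    time.sleep(random.uniform(0.01, 0.2))
    return f"data-from-replica-{replica_id}"

def hedged_read(replicas, hedge_after=0.05):
    """Send the read to one replica; if it is slower than the hedge
    threshold, send a backup request to a second replica and return
    whichever answer arrives first."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(read_replica, replicas[0])]
        done, _ = concurrent.futures.wait(futures, timeout=hedge_after)
        if not done:  # primary is slow: hedge with a second replica
            futures.append(pool.submit(read_replica, replicas[1]))
        done, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        return next(iter(done)).result()

print(hedged_read(["r1", "r2"]))
```

A production implementation would also cancel the losing request; this sketch simply takes the first completed result.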

Production Metrics

Paper Review

The paper presents a wealth of information, and I must admit that certain concepts are likely more accessible to database domain experts. As someone less familiar with the intricacies of this field, some sections were not entirely clear. Additionally, the production metrics, while present, felt somewhat less insightful than I had hoped for.

Despite these points, both this paper and its predecessor on Mesa offer a comprehensive introduction to the inner workings of data warehouses. For those seeking to deepen their understanding, I would also recommend exploring The Snowflake Elastic Data Warehouse. After grasping the concepts presented in these two papers, the Snowflake architecture should be relatively straightforward to comprehend.
