
Paper Insights #32 - Napa: Powering Scalable Data Warehousing with Robust Query Performance at Google

Napa represents the next generation of planet-scale data warehousing at Google, following Mesa. Napa is a key system for analytics workloads that stores enormous datasets for various tenants within Google. The extensive authorship of the paper underscores the collaborative effort behind its creation. This paper was presented at VLDB 2021.

Paper Link

Let's begin with some basic concepts.

Data Warehouse

Data warehouses are SQL-compatible databases designed primarily for analytical data storage. Data warehouses, like SQL databases, offer data consistency. However, they typically aren't updated in real-time. In fact, most data warehouses rely on ETL (Extract, Transform, Load) pipelines to feed data into them. 

Consider the following example of a data warehouse that contains two relational tables:

  • HourlyRevenue: This table stores the revenue generated from orders received each hour. ETL pipelines periodically update this table by pulling data from various sources such as logs and OLTP databases. These pipelines tail the sources in a way that avoids applying duplicate updates. Importantly, because updates are periodic, the data warehouse might not reflect the very latest hour's revenue in real time.


  • DailyRevenue: Similar to HourlyRevenue, this table stores daily revenue figures. To maintain consistency, these daily numbers are essentially materialized views derived from the HourlyRevenue table. There are two main approaches to materializing these views:

    • Real-time Materialization: Any update to the HourlyRevenue table immediately updates the DailyRevenue table, ensuring strong consistency between the two. This technique is rare as it can slow down the ingestion pipeline.
    • ETL-based Materialization: A separate ETL pipeline aggregates the hourly revenue data for each day and updates the DailyRevenue table.

Note that data warehouses are typically designed to store numeric data suitable for aggregation, rather than arbitrary data types like structs. For storing diverse data types, other solutions such as relational (SQL) or NoSQL databases are more appropriate.
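As a sketch of the ETL-based materialization described above, the daily rollup can be recomputed by aggregating the hourly table. The table contents and names here are illustrative, not from the paper:

```python
from collections import defaultdict

# Hypothetical hourly table: (date, hour) -> revenue.
hourly_revenue = {
    ("2021-08-01", 0): 120.0,
    ("2021-08-01", 1): 95.5,
    ("2021-08-02", 0): 80.0,
}

def materialize_daily(hourly):
    """ETL-style materialization: aggregate hourly rows into daily totals."""
    daily = defaultdict(float)
    for (date, _hour), revenue in hourly.items():
        daily[date] += revenue
    return dict(daily)

daily_revenue = materialize_daily(hourly_revenue)
```

Because this pipeline runs separately from ingestion, the daily table can lag the hourly table, which is exactly the consistency gap discussed later.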

Data Model

Let's explore how data is represented and stored within a data warehouse. You can find more details in the Mesa paper by Google. Here, I am summarizing a few crucial points.

Data is organized into tables. The tables are not strictly relational; instead, the underlying model leans toward a key-value structure. Both the key and the value can encompass multiple columns. Crucially, value columns must be of an aggregatable type, typically numeric (such as integers or floats).

Data warehouses almost invariably have aggregated views built on top of some base tables. Consider an hourly table with columns <Date-Hour, Revenue>. A common pattern is to have a corresponding <Date, Revenue> table. This daily revenue table effectively drops the "Hour" granularity and aggregates the revenue across all hours within each day. Such an aggregated table is called a rollup table, while the original, more granular table is referred to as the drilldown table.

To facilitate the creation of these rollup tables, the data model must define an aggregator function for each value column. This function specifies how values should be combined when aggregating across different key values.

Formally, an aggregation function F: V × V → V typically exhibits the following properties:
  • Associative: F(v0, F(v1, v2)) = F(F(v0, v1), v2)
  • Commutative: F(v0, v1) = F(v1, v0)
This concept extends to tables with multiple value columns. Consequently, for a table with N value columns, we would have N corresponding aggregate functions.
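A minimal sketch of per-column aggregators driving a rollup, assuming SUM and MAX as the two aggregation functions (both associative and commutative); the table shapes and `rollup` helper are hypothetical:

```python
import operator

def rollup(rows, key_fn, aggregators):
    """Collapse drilldown rows into a rollup table.

    rows: list of (key_tuple, value_tuple) pairs
    key_fn: maps a drilldown key to the coarser rollup key
    aggregators: one associative, commutative function per value column
    """
    out = {}
    for key, values in rows:
        coarse = key_fn(key)
        if coarse not in out:
            out[coarse] = values
        else:
            out[coarse] = tuple(
                f(a, b) for f, a, b in zip(aggregators, out[coarse], values)
            )
    return out

# Drilldown table <Date-Hour> -> <Revenue, MaxOrderValue>.
rows = [
    (("2021-08-01", 0), (100, 40)),
    (("2021-08-01", 1), (50, 75)),
]
# Roll up to <Date> -> <Revenue, MaxOrderValue> using SUM and MAX.
daily = rollup(rows, key_fn=lambda k: k[0], aggregators=[operator.add, max])
```

Associativity and commutativity are what let the system merge values in any order and grouping, which matters once deltas are merged incrementally.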

Differential Storage Scheme

The core principle behind analytical data storage is often to record data as a sequence of changes, or deltas.

For instance, imagine an ETL pipeline runs and captures the revenue for hour X. Let's say the first run records X -> $100. When the ETL pipeline runs again, it detects new orders (not present in the previous run) that contribute to the revenue for hour X. This second run now reports X -> $150.

Instead of simply overwriting the previous value, the data can be stored as a series of deltas: 

X -> +$100, +$50

This method of storage is known as a differential storage scheme.
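The scheme above can be sketched as follows, with reads reconstructing the current value by aggregating a key's changes across deltas (SUM is assumed as the aggregator):

```python
# Differential storage sketch: each ETL run appends a delta rather
# than overwriting. deltas is a list of {key: value_change} maps.
deltas = []

deltas.append({"X": 100})  # first run: hour X contributes $100
deltas.append({"X": 50})   # second run: $50 of newly seen orders

def read(key):
    # A read reconstructs the value by summing the key's changes
    # across all deltas; absent keys contribute nothing.
    return sum(d.get(key, 0) for d in deltas)
```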

Deltas

A delta is essentially a file that contains keys along with the corresponding change in value for each column.

Each time an ETL pipeline runs to bring data into the data warehouse, it generates a new delta. This newly created delta is then applied to both the base table and any associated materialized views.


The entire analytical database is therefore built upon a sequence of these deltas. These deltas are often organized in a structure similar to a Log-Structured Merge (LSM) tree: a hierarchy of delta files, where higher levels contain progressively larger deltas formed by merging smaller deltas from lower levels. The merging process for each key utilizes the aggregator functions defined earlier. The largest, fully merged delta residing at the bottommost level is referred to as the base delta (or base table).

A significant advantage of this delta-based approach is the ability to execute a query against any prefix of the delta sequence. This will yield a consistent snapshot of the data as it existed at that specific point in time.

Each table independently maintains its data deltas. Consequently, within the database, every table has its own LSM tree structure. Collectively, these individual LSM trees constitute what can be described as a log-structured merge forest.
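A small sketch of the prefix-snapshot property described above, again assuming SUM as the aggregator: merging the first `upto` deltas yields the table as of that point in time.

```python
def snapshot(deltas, upto):
    """Merge the first `upto` deltas into a single view (SUM assumed
    as the aggregator). Any prefix of the delta sequence yields a
    consistent snapshot of the table as of that point in time."""
    merged = {}
    for delta in deltas[:upto]:
        for key, change in delta.items():
            merged[key] = merged.get(key, 0) + change
    return merged

deltas = [{"X": 100}, {"X": 50, "Y": 20}, {"Y": 5}]
```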

Physical Format

Delta files are typically organized in a columnar format. Within a delta file, the keys are segmented into blocks. Each of these blocks is then stored column by column, often employing compression techniques to reduce storage space.

Alongside each delta data file, an index file is maintained. This index provides pointers to the different blocks within the corresponding data file. Furthermore, to accelerate query performance, each data file block also stores the actual keys present within it.

The lookup process then involves:

  1. Performing a binary search on the index file to locate the relevant block.
  2. Conducting a binary search within the key list of the identified data file block.
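The two-step lookup can be sketched with Python's `bisect`, assuming a toy layout where the index file stores the first key of each block and each block carries its sorted key list:

```python
import bisect

# Hypothetical layout: `index` holds the first key of each block;
# each block stores (sorted key list, corresponding values).
index = ["apple", "mango", "tango"]
blocks = [
    (["apple", "kiwi"],  [1, 2]),
    (["mango", "peach"], [3, 4]),
    (["tango", "zebra"], [5, 6]),
]

def lookup(key):
    # Step 1: binary search the index file to find the candidate block.
    i = bisect.bisect_right(index, key) - 1
    if i < 0:
        return None
    keys, values = blocks[i]
    # Step 2: binary search the block's own key list.
    j = bisect.bisect_left(keys, key)
    if j < len(keys) and keys[j] == key:
        return values[j]
    return None
```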

Consistency

Applying incremental updates (deltas) to base tables and their associated materialized views can occur asynchronously. Tables might be updated before their corresponding roll-up views. Additionally, in sharded data warehouses replicated across clusters, updates can arrive at different shards independently and out of logical order.

Despite this potential for temporary inconsistencies, it's crucial that queries return consistent results. For instance, the total daily revenue calculated from an hourly table must always align with the revenue recorded in the daily summary table, even though the daily table is updated offline periodically.

A straightforward approach to ensure this consistency is to restrict queries to the point where all relevant deltas have been applied across all shards, tables, and views. This simplification is viable because data warehouses do not need real-time updates, which significantly eases their design compared to distributed SQL databases.

Under this scheme, only consistent points in time are queryable. A point at which, for example, the daily view has not yet received the updates already applied to the hourly table cannot be queried.

Napa

Napa, Google's next-generation, large-scale data warehouse, builds upon the foundation of Mesa. Its architecture divides data processing into distinct stages: ingestion, storage and view maintenance, and query serving.

This decoupling is strategically designed to optimize for a primary objective: achieving low-latency and consistent query performance.

The ingestion phase in Napa prioritizes high throughput, generating data deltas rapidly. However, a large backlog of unmerged deltas is inefficient to query and significantly increases query costs.

Ultimately, Napa aims to strike a balance between three crucial factors:

  1. Data Freshness: How quickly updates become available for querying.
  2. Query Latency: The speed at which queries are executed and results are returned.
  3. Cost: The resources expended to ensure timely data updates.

Achieving all three simultaneously – high freshness, low query latency, and low cost – is inherently challenging. The trade-offs are as follows:

  • High Freshness + Low Cost = High Query Latency: Prioritizing immediate data availability with limited resources leads to slower query execution.
  • Low Cost + Low Query Latency = Low Freshness: Focusing on efficient querying with minimal expenditure necessitates delaying data updates.
  • High Freshness + Low Query Latency = High Cost: Delivering real-time data with fast query responses requires significant resource investment.

Sacrificing data freshness allows for low-latency and cost-effective querying. Traditional data warehouses, including Mesa, exemplify this by batching updates and applying them periodically (e.g., hourly). This consolidation optimizes data for querying; for instance, merging all deltas for a table reduces the need to access multiple delta files during lookups. Consequently, updates are intentionally delayed to enhance query performance and minimize costs.

Conversely, prioritizing high ingestion rates can compromise query performance. When the ETL pipeline operates at a high frequency, queries must process and combine data from numerous deltas, which increases latency.

The paper also suggests that data warehouses might opt to sacrifice consistency. However, this assertion is questionable, particularly considering that many applications query multiple tables concurrently, making data consistency a critical requirement.

Queryable Timestamp

The queryable timestamp (QT) marks the point in time up to which all data deltas for a table have been consistently applied and are ready for querying. In essence, QT signifies the data freshness available for querying on a given table, even if it's distributed across multiple storage replicas.

The freshness of a table can be quantified as the duration between the current time and the queryable timestamp: Now() - QT.

The QT value advances from a previous point (X) to a new point (Y) once the data ingested between X and Y has been optimized enough to meet the desired query performance standards.

Many client queries come with specific latency requirements, which, in turn, restrict the number of data deltas that can be accessed and merged to generate a response within the acceptable timeframe. Considering these constraints, the QT represents the most recent delta such that querying all deltas from the oldest up to this point will adhere to the stipulated latency limits and complete within the expected duration.


It's important to note that a QT is maintained individually for each table. The QT of an entire database (when consistency across all its tables is a prerequisite) is determined by the minimum QT value among all the tables within that database.
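These two definitions can be captured in a minimal sketch (the timestamps are illustrative integers, not real clock values):

```python
def database_qt(table_qts):
    # When queries require consistency across all tables, the
    # database QT is the minimum QT among its tables and views.
    return min(table_qts.values())

def freshness(now, qt):
    # Freshness: how far behind the current time the queryable
    # state is, i.e. Now() - QT.
    return now - qt

# Hypothetical per-table QTs: the daily view lags the hourly table.
qts = {"HourlyRevenue": 1700, "DailyRevenue": 1640}
```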

Architecture

Napa is a distributed data warehouse; indeed, the immense scale required to serve Google necessitates a distributed architecture.

Napa's architecture is logically separated into two distinct planes: the data plane, which handles the storage and retrieval of the actual data, and the control plane, responsible for managing the associated metadata, essentially QT. 

Ingestion servers, residing within the data plane, receive data from ETL pipelines. This incoming data is then transformed into deltas and persisted on Colossus (the successor to the Google File System). 

Once data ingestion is complete, the QT for the affected table is advanced in the control plane. This crucial QT metadata is durably stored in Spanner, Google's globally consistent distributed SQL database. 

When the F1 query engine receives a query, it first consults the control plane to determine the optimal QT. This QT represents the most recent point in time up to which data can be consistently read from the database while adhering to the query's latency requirements. Subsequently, the query engine sends the request to delta servers that utilize the data deltas up to this determined QT to execute the query.


Storage Structure

The underlying storage structure mirrors that of Mesa. Data for each table is organized as a collection of deltas, and these deltas across all tables in a database collectively form an LSM forest.

The primary storage format for these deltas is columnar, where retrieving each column requires a separate I/O operation. For smaller deltas, an optimized format called PAX (Partition Attributes Across) is used, enabling all records to be read with a single I/O operation.

View Maintenance

From this point forward, the paper delves into significant technical detail, particularly concerning Napa's multi-layered optimizations for achieving high performance. Understanding some of these intricate strategies likely requires expertise in database internals.

Upon a table update, downstream views are asynchronously refreshed using the F1 query engine. It's important to note that Napa views can be formed by joining multiple upstream tables. Because a database's QT only advances when the QTs of all its derived tables and views also advance, an efficient pipeline is crucial for maintaining data consistency.

Data Skew

Parent tables are typically sharded based on their key or a key prefix. However, views often operate on a reduced keyspace due to the exclusion of certain key columns and the aggregation of value columns. This reduction can lead to data skew in the views.

Consider a parent table with the key columns <A, B, C, D> and a view with the key <B>. If the parent table's sharding is based on the prefix <A>, and the majority of entries have B = 1 or B = 2, then the view will likely experience hotspotting on the shards corresponding to these frequent B values.
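This hotspotting effect can be illustrated with a toy sharding function (the modulo scheme here is an assumption for illustration, not Napa's actual sharding):

```python
from collections import Counter

def shard_of(key, num_shards):
    # Illustrative sharding: route a row by its first key component.
    return key[0] % num_shards

# Base table keyed <A, B, C, D>, sharded on prefix <A>: with A
# ranging over 8 values, load spreads evenly across 4 shards.
base_keys = [(a, b, 0, 0) for a in range(8) for b in (1, 2)]
base_load = Counter(shard_of(k, 4) for k in base_keys)

# View keyed on <B>: every row has B = 1 or B = 2, so only two
# shards receive any data -- a hotspot.
view_load = Counter(shard_of((k[1],), 4) for k in base_keys)
```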

View Sort Order

The materialization of a view requires different sorting strategies depending on the relationship between the view's key and the base table's key:
  • Prefix Key: If the view's key is a prefix of the base table's key (e.g., base key <A, B, C, D>, view key <A, B>), no sorting is necessary.
  • Partial Prefix Key: If the view's key is a partial prefix (e.g., base key <A, B, C, D>, view key <A, B, D>), the non-prefix components of the view's key (i.e., D) need to be sorted before reduction. However, this scenario might not introduce significant sorting requirements.
  • Non-Prefix Key: If the view's key shares no prefix with the base table's key (e.g., base key <A, B, C, D>, view key <D, C, A>), substantial sorting is required.
To understand the necessity of sorting in the non-prefix case, consider the transformation <A, B, C, D> -> <D, C, A>, where aggregation occurs over B. Consider the following base table keys and their transformations:

<1, 1, 1, 1>  ->  <1, 1, 1>
<1, 1, 1, 2>  ->  <2, 1, 1>
<1, 1, 1, 3>  ->  <3, 1, 1>
...
<1, 2, 1, 1>  ->  <1, 1, 1>  
...

The key <1, 1, 1> appears multiple times after transformation, potentially in different shard locations. Consequently, the view must be sorted on its key <D, C, A> before it can be correctly constructed and aggregated. The paper further details several techniques for efficient sorting in such scenarios, the specifics of which are beyond the scope of my current understanding!
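The non-prefix case can be sketched as re-key, then sort, then aggregate (SUM assumed as the aggregator; `materialize_view` is a hypothetical helper, not Napa's API):

```python
from collections import defaultdict

def materialize_view(base_rows, permute, value_agg=sum):
    """Sketch of non-prefix view materialization.

    base_rows: {(A, B, C, D): value}
    permute: maps a base key to the view key, e.g. (A,B,C,D) -> (D,C,A)
    """
    grouped = defaultdict(list)
    for key, value in base_rows.items():
        grouped[permute(key)].append(value)
    # Sorting by the view key restores a valid key ordering, then
    # colliding keys are collapsed with the aggregator.
    return {k: value_agg(vs) for k, vs in sorted(grouped.items())}

base = {
    (1, 1, 1, 1): 10,
    (1, 1, 1, 2): 20,
    (1, 2, 1, 1): 5,   # collides with (1, 1, 1, 1) under the view key
}
view = materialize_view(base, permute=lambda k: (k[3], k[2], k[0]))
```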

Compaction

The compaction process merges N input deltas through an N-way merge. The merging operation stops as soon as any one of the N deltas is fully processed (i.e., all its keys have been merged). The rationale behind halting the entire merge when a single delta is exhausted is unclear to me, and this is an area where I hoped the paper would offer further explanation.
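An N-way merge can be sketched with `heapq.merge`. Note this sketch performs a complete merge of all inputs rather than the early-stop behavior the paper describes, and it assumes SUM as the aggregator:

```python
import heapq

def compact(deltas):
    """N-way merge of sorted deltas into a single delta, combining
    values for equal keys with SUM (standing in for the per-column
    aggregator)."""
    merged = []
    for key, value in heapq.merge(*deltas):
        if merged and merged[-1][0] == key:
            merged[-1] = (key, merged[-1][1] + value)
        else:
            merged.append((key, value))
    return merged

d1 = [("a", 1), ("c", 3)]
d2 = [("a", 2), ("b", 5)]
```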

Query Serving

Napa employs several key optimizations to accelerate query performance. A comprehensive understanding of each might require database expertise. Let's examine them individually:

View-Based Querying

Napa preferentially uses pre-computed views over base tables to serve query results whenever feasible. This can significantly reduce the computational overhead of query execution.

Filter Pushdown

The F1 query engine pushes query filters down to the Delta servers. This minimizes the volume of data transferred across the network, improving efficiency.

B-tree Indexing for Delta Pruning

Napa maintains a B-tree index on the data stored in deltas. This index allows the system to efficiently identify and skip deltas that are unlikely to contain the requested keys, reducing the amount of data that needs to be scanned.
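A simplified sketch of the pruning idea, using per-delta min/max key ranges in place of a real B-tree index:

```python
def prune_deltas(deltas, key):
    """Keep only deltas whose [min_key, max_key] range could contain
    the requested key; the rest are skipped without any I/O.
    deltas: list of (min_key, max_key, name) tuples (illustrative)."""
    return [d for d in deltas if d[0] <= key <= d[1]]

deltas = [
    ("a", "f", "delta-1"),
    ("g", "m", "delta-2"),
    ("n", "z", "delta-3"),
]
```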

Optimized QT Lookup

Every query requires looking up the current QT in the control plane. Usually, this involves a snapshot read on Spanner for the latest value. Napa optimizes this by periodically fetching the QT and storing it in a distributed metadata cache. This approach introduces a degree of staleness, as the cached QT might not always reflect the absolute latest state but speeds up query processing.

Distributed Read-Through Cache

All data reads pass through a distributed read-through cache. This cache helps share read operations for the same data blocks across different parts of a query. For instance, in self-joins or queries with subqueries, multiple components might access the same index files in deltas, benefiting from the cached data.

Prefetching Cache Layer

Napa utilizes another distributed caching layer focused on prefetching data blocks. This prefetching can be either offline or online.
  • Offline Prefetching: For frequently queried tables, blocks are proactively fetched into the cache as soon as the QT advances.
  • Online Prefetching: If a shadow query execution has already fetched relevant data blocks, the main query executor can leverage this cached data. The introduction of shadow query execution lacks clarity and detail.

Lazy Merging Across Deltas

When a query is executed, a Delta server reads data from multiple relevant deltas and merges the results before returning the response to the client. This process is described as optimized; however, the mechanism of this optimization is unclear, and the explanation is somewhat vague.

From what I understand, they are likely generating subqueries from the main query such that each involved delta server reads only one delta.

Hybrid File Format (Columnar and PAX)

Napa's file format is primarily columnar for efficient analytical processing. However, it also incorporates PAX for faster lookups in small files.

Read Hedging

To mitigate the impact of slow replicas, Napa employs read hedging. If a response from an initial replica takes longer than expected, a parallel request is sent to another replica containing the same data. The fastest response is then used.
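A sketch of read hedging with a thread pool; the hedging timeout and replica callables are illustrative assumptions, not Napa's mechanism:

```python
import concurrent.futures as cf
import time

def hedged_read(replicas, hedge_after=0.05):
    """Issue the read to the primary replica; if it has not answered
    within `hedge_after` seconds, fire the same read at a backup
    replica and return whichever response arrives first."""
    with cf.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(replicas[0])]
        done, _ = cf.wait(futures, timeout=hedge_after)
        if not done:  # primary is slow: hedge to the backup replica
            futures.append(pool.submit(replicas[1]))
        done, _ = cf.wait(futures, return_when=cf.FIRST_COMPLETED)
        return next(iter(done)).result()

slow_replica = lambda: (time.sleep(0.5), "slow")[1]
fast_replica = lambda: "fast"
```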

Production Metrics

The paper dedicates a significant portion to design details, leaving insufficient space for a thorough evaluation of the system. The evaluation section primarily presents a limited set of production metrics:
  • Query latency decreases with an increasing number of materialized views. This is anticipated, as materialized views enable queries to avoid costly operations like joins or sorts on frequently accessed data paths.
  • Query latency increases with a greater number of deltas involved in a query. This is attributed to the increased I/O and merging operations required by the delta servers.
  • When the view maintenance pipeline experienced issues, query latency remained unaffected, although data freshness was compromised. This stability was due to the use of QT for query execution.
  • Napa finds application across various domains, including:
    • Internal experimentation and analysis clients, which prioritize query performance and low cost over strict data freshness.
    • Another application type that prioritizes lower resource costs, accepting higher query latency as a trade-off.
    • External, user-facing applications that prioritize both high data freshness and high query performance, incurring higher costs to achieve this.

Paper Review

The paper presents a wealth of information, and I must admit that certain concepts are likely more accessible to database domain experts. As someone less familiar with the intricacies of this field, some sections were not entirely clear. Additionally, the production metrics, while present, felt somewhat less insightful than I had hoped for.

Despite these points, both this paper and its predecessor on Mesa offer a comprehensive introduction to the inner workings of data warehouses. For those seeking to deepen their understanding, I would also recommend exploring The Snowflake Elastic Data Warehouse. After grasping the concepts presented in these two papers, the Snowflake architecture should be relatively straightforward to comprehend.


    This landmark paper, presented at ODSI '12, has become one of Google's most significant contributions to distributed computing. It didn't solve the long-standing core problem of scalability of 2PC in distributed systems, rather, it introduced  TrueTime  that revolutionized system assumptions. Authored by J.C. Corbett , with contributions from pioneers like Jeff Dean and Sanjay Ghemawat , this paper effectively ended my exploration of distributed SQL databases. It represents the leading edge of the field.