
Paper Insights #31 - F1 Query: Declarative Querying at Scale

We shift our focus from databases to a query engine. Google presented this paper at VLDB, the premier global database conference, in 2018. Notably, this paper has a number of authors and is incredibly dense. With so many parts, the paper only provides a high-level idea of its different components.



Let's begin with some basic concepts.

Query Engine

SQL (Structured Query Language) stands as the standard formal language for data extraction and processing. A query engine, then, is the system that executes these SQL queries.

Query Engines vs. SQL Databases

SQL databases manage the storage of data in a relational format and handle transactions on that data. They inherently provide atomicity, concurrency control, durability, and transaction isolation. In contrast, a query engine's role is to extract a view of the data from a SQL database based on the user's formal query.

In traditional, standalone databases, the query engine is integrated directly within the SQL database itself. For instance, in PostgreSQL, the query parser, planner, and executor are all components of the core database system.

However, distributed SQL databases often separate the query engine into a distinct layer. In this architecture, query engines operate as independent units that can interface with various underlying SQL databases. These databases can be OLTP (like Spanner or Aurora) or OLAP (such as Napa/Mesa or Snowflake). Query engines can also be used with NoSQL databases.

Examples of Query Engines

Several query engines exist, many of them offered as cloud services:

  • Dremel: A pioneering query engine developed by Google, which later formed the foundation for Google Cloud's BigQuery.
  • F1 Query: The successor to Dremel, currently in use within Google.
  • Apache Hive: Originally developed by Facebook for querying and managing large datasets residing in distributed storage.
  • Amazon Athena: Interactive query service provided by Amazon Web Services that enables querying data in Amazon S3 using standard SQL.
  • Apache Drill: An open-source, schema-free SQL query engine for Big Data exploration.

Core Components of Query Engines

While the specific architectures of query engines vary based on their intended use, they generally share the following high-level components:
  • Executor (Driver): It parses and analyzes the incoming query, optimizes it, breaks it down into executable tasks (often for a parallel processing framework like MapReduce), and assigns these tasks to available workers.
  • Workers: These are the processes responsible for carrying out the tasks required by a query.
  • Metadata Store: This component holds crucial information such as the location of data, access methods, and user-defined functions (UDFs).
Let's illustrate the components with an example query:

SELECT Department, COUNT(*)
FROM Professors
WHERE STARTS_WITH(Name, "s")
GROUP BY Department;

This query aims to count the number of professors whose names begin with "s", grouped by their department. Here STARTS_WITH is a UDF.

Consider a scenario where the underlying Professors table is sharded across multiple database instances, with each instance potentially holding data for different departments. The metadata store provides information about the shard locations of the Professors table. The metadata store also provides the executor with the definition of the UDF.

Upon receiving this query, the executor optimizes and translates the SQL query into a MapReduce pipeline. The executor would then generate map tasks that operate on the shards. It's even possible to have multiple map tasks for the same department if its corresponding data slice is large.

The map function for this query might look conceptually like this:

map(Professor p):
  if STARTS_WITH(p.Name, "s"):
    emit (p.Department, 1)

Finally, a reducer would aggregate the results from all the map tasks, performing a count for each department.
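To make the pipeline concrete, here is a minimal single-process sketch of the map and reduce steps for the query above. The shard contents and the helper names (SHARDS, map_shard, reduce_counts, run_query) are made up for illustration; a real engine would run the map tasks on separate workers.

```python
from collections import defaultdict

# Hypothetical shards of the Professors table: (Name, Department) rows.
SHARDS = [
    [("sam", "CS"), ("alice", "CS"), ("sara", "Math")],
    [("steve", "CS"), ("bob", "Math"), ("sunil", "Physics")],
]

def map_shard(rows):
    """Map task: emit (Department, 1) for each name that starts with 's'."""
    for name, department in rows:
        if name.startswith("s"):
            yield department, 1

def reduce_counts(pairs):
    """Reduce task: sum the emitted 1s per department."""
    counts = defaultdict(int)
    for department, one in pairs:
        counts[department] += one
    return dict(counts)

def run_query(shards):
    # In a real system each shard would be mapped by a different worker.
    emitted = [pair for shard in shards for pair in map_shard(shard)]
    return reduce_counts(emitted)

result = run_query(SHARDS)
print(result)
```

Note that the reducer sums the emitted ones rather than counting rows directly, which is what allows several map tasks to contribute to the same department.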


Read Consistency

Regardless of whether you're using a SQL or NoSQL database, query engines themselves cannot guarantee a consistent read without support from the underlying data store.

Consider our previous example to illustrate this potential issue. When a query is executed across multiple workers, each worker might begin processing at different times. This can lead to inconsistencies. For instance, one mapper might read its data instance before certain transactions are committed, while another mapper reads its instance after those same transactions.

Let's say the first mapper reads the first instance for the Computer Science department. Subsequently, a transaction adds a new row to the first instance for the Computer Science department with a professor whose name begins with 's'. Simultaneously, another transaction adds a row to the second instance for the Mechanical department, also with a professor whose name begins with 's'. Finally, the mapper reading from the second instance performs its read.

The problem here is that the first mapper's read reflects neither of the new transactions, while the second mapper's read happens after both have committed. The combined result therefore does not correspond to the state of the database at any single point in time.

To achieve consistent reads, the underlying data store needs to be versioned and multi-row transactional. Google Spanner is an example of such a database. It globally orders all transactions in real-time. Consequently, when a query is executed in Spanner, all reads occur at a specific point in time, providing a consistent snapshot of the data (a.k.a snapshot isolation). Even with concurrent modifications, the data returned by the query will represent the state of the database at the chosen query time.
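The idea of reading at a fixed timestamp can be sketched with a toy multi-version store. This is not how Spanner is implemented; VersionedShard and its methods are hypothetical names, and timestamps are plain integers. It only illustrates why a shared snapshot timestamp hides concurrent commits from every mapper.

```python
class VersionedShard:
    """Toy multi-version shard: every write carries a commit timestamp."""

    def __init__(self):
        self._rows = []  # list of (commit_ts, row)

    def write(self, commit_ts, row):
        self._rows.append((commit_ts, row))

    def read_at(self, snapshot_ts):
        """Return only rows committed at or before snapshot_ts."""
        return [row for ts, row in self._rows if ts <= snapshot_ts]

cs, mech = VersionedShard(), VersionedShard()
cs.write(1, "alice")
mech.write(2, "bob")

snapshot_ts = 5                      # chosen once, when the query starts
first = cs.read_at(snapshot_ts)      # mapper 1 reads early
cs.write(6, "sam")                   # concurrent commits after the snapshot
mech.write(7, "sue")
second = mech.read_at(snapshot_ts)   # mapper 2 reads late, same snapshot

print(first, second)
```

Both mappers see the database as of timestamp 5, even though the second one physically reads after the new rows were written.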

SQL Joins

A frequent operation in data processing involves combining two relations based on a related attribute, followed by further processing of the resulting dataset. Joins are therefore a fundamental operation in query engines.

Query engines employ several strategies to perform join operations, each optimized for different data characteristics.

Lookup Joins

These are particularly effective when joining a small table with a significantly larger one. The query engine reads the entirety of the smaller table and then performs targeted lookups in the larger table for matching rows based on the join condition.


Note: For efficient lookup joins, the larger table must be indexed on the columns specified in the join condition. This allows for rapid retrieval of matching rows.
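A lookup join can be sketched as follows, with a Python dict standing in for the index on the larger table. The table contents and the lookup_join helper are assumptions for illustration only.

```python
def lookup_join(small_rows, large_index, key):
    """Read every row of the small table and probe the large table's index."""
    out = []
    for row in small_rows:
        for match in large_index.get(row[key], []):
            out.append({**row, **match})
    return out

# Hypothetical data; the dict plays the role of an index keyed on dept_id.
departments = [{"dept_id": 1, "dept": "CS"}, {"dept_id": 3, "dept": "Math"}]
professors_by_dept = {
    1: [{"dept_id": 1, "name": "sam"}, {"dept_id": 1, "name": "alice"}],
    2: [{"dept_id": 2, "name": "bob"}],
}

joined = lookup_join(departments, professors_by_dept, "dept_id")
print(joined)
```

Only the index entries for keys 1 and 3 are touched; the rows for department 2 in the larger table are never read.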

Hash Joins

Hash joins are a common and versatile join strategy that proceeds in the following steps:

  • The query engine reads both tables and identifies the smaller one.
  • An in-memory hash table is constructed using the join key(s) from the smaller table. If the hash table exceeds available memory, portions of it may be spilled to disk.
  • The larger table is then scanned row by row. For each row in the larger table, the join key(s) are used to probe the hash table of the smaller table to find matching rows.
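The build and probe phases above can be sketched in a few lines. This version keeps the whole hash table in memory and does not model spilling to disk; hash_join and the sample rows are illustrative names, not an engine API.

```python
from collections import defaultdict

def hash_join(left, right, key):
    """Inner equi-join: build on the smaller input, probe with the larger."""
    build, probe = (left, right) if len(left) <= len(right) else (right, left)
    table = defaultdict(list)
    for row in build:                      # build phase
        table[row[key]].append(row)
    return [
        {**b, **p}
        for p in probe                     # probe phase
        for b in table.get(p[key], [])
    ]

small = [{"id": 1, "dept": "CS"}, {"id": 2, "dept": "Math"}]
large = [{"id": 1, "name": "sam"}, {"id": 1, "name": "alice"},
         {"id": 3, "name": "bob"}]
rows = hash_join(small, large, "id")
print(rows)
```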

Merge Joins

Merge joins are efficient when both tables involved in the join are already sorted (or have indexes that enforce an ordering) on the columns participating in the equality join condition. The algorithm operates as follows (analogous to the merge step in merge sort):

  • The query engine verifies the existence of suitable indexes on the equality columns of both tables.
  • The rows of both tables are then merged based on the sorted order of the join columns.
  • For each match found in the join columns, the corresponding rows are combined in the result. If there are multiple matching rows in either table for a given join key, a cartesian product of those matching rows is generated.
  • If the join column values do not match, the row with the smaller value is discarded, and the next row from that table is considered. This process continues until all rows from both tables have been processed.
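The merge steps above can be sketched as a two-pointer loop over inputs that are assumed to be sorted on the join key already. The merge_join helper and sample rows are illustrative only.

```python
def merge_join(left, right, key):
    """Inner equi-join of two lists of dicts already sorted by `key`."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1                      # skip the row with the smaller key
        elif lk > rk:
            j += 1
        else:
            # Find the run of rows sharing this key on each side.
            i_end = i
            while i_end < len(left) and left[i_end][key] == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            # Emit the cartesian product of the two runs.
            for l_row in left[i:i_end]:
                for r_row in right[j:j_end]:
                    out.append({**l_row, **r_row})
            i, j = i_end, j_end
    return out

left = [{"k": 1, "a": "x"}, {"k": 2, "a": "y"}, {"k": 2, "a": "z"}]
right = [{"k": 2, "b": "p"}, {"k": 2, "b": "q"}, {"k": 3, "b": "r"}]
merged = merge_join(left, right, "k")
print(merged)
```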


Note: The examples provided above illustrate the concept of an inner join. The underlying principles remain consistent across other join types such as left outer, right outer, and full outer joins. The key difference lies in how non-matching rows are handled. In contrast to inner joins, these other join types preserve non-matching rows from one or both of the tables involved in the join.

View Materialization

Once a query is executed, the resulting data is typically sent back to the client. However, query engines also often have the capability to materialize these results into a persistent table stored within the same database as the original data.

These materialized tables, commonly known as materialized views, offer significant performance benefits for subsequent queries that can leverage the pre-computed data. 

A prime use case is the creation of roll-up tables in analytical databases. By materializing aggregated data into these roll-up tables, future analytical queries against this summarized information execute much faster.




Another application arises when dealing with complex queries involving joins across multiple tables, which can be computationally expensive to run repeatedly. In such scenarios, materializing the result of the join into a physical table can reduce the execution time for subsequent identical or similar queries.

A key drawback is the need to refresh the materialized table whenever the underlying source data is updated. This requires re-executing the defining query to bring the materialized view in sync with the latest changes. Consequently, materialization is most beneficial for data that is relatively static (like those in data warehouses).
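The staleness problem is easy to reproduce with SQLite from Python's standard library. SQLite has no built-in materialized views, so this sketch emulates one with a plain roll-up table that is rebuilt by re-running its defining query; the Sales and SalesByRegion tables are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Sales (Region TEXT, Amount INTEGER);
    INSERT INTO Sales VALUES ('east', 10), ('east', 5), ('west', 7);
""")

def refresh_rollup(conn):
    """Rebuild the roll-up table by re-running its defining query."""
    conn.executescript("""
        DROP TABLE IF EXISTS SalesByRegion;
        CREATE TABLE SalesByRegion AS
            SELECT Region, SUM(Amount) AS Total FROM Sales GROUP BY Region;
    """)

def read_rollup(conn):
    return dict(conn.execute("SELECT Region, Total FROM SalesByRegion"))

refresh_rollup(conn)
fresh = read_rollup(conn)
conn.execute("INSERT INTO Sales VALUES ('west', 3)")  # source data changes
stale = read_rollup(conn)                             # roll-up is now stale
refresh_rollup(conn)
refreshed = read_rollup(conn)
print(fresh, stale, refreshed)
```

Reads from the roll-up table are cheap, but they only reflect the new sale after the refresh.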

Query Optimization

Query optimizers employ several techniques to enhance query execution efficiency. While the intricate details of these optimizations are beyond our current scope, let's explore some key strategies:

Filter Pushdown

This optimization involves relocating filter conditions earlier in the query execution plan, ideally to the data source itself.

Example: Consider the query:

SELECT * FROM Customers WHERE City = "New York";

Without filter pushdown, the query engine would retrieve all customer records and then filter them at the client to identify those in New York. This approach can be inefficient, especially in distributed environments. Filter pushdown optimizes this by sending the WHERE City = "New York" condition directly to the data source. Consequently, when a worker reads from a database instance, it only receives the rows matching the filter.
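The effect can be illustrated with a toy data source that counts how many rows it ships to the engine. The scan function and its predicate parameter are a hypothetical interface, not a real connector API.

```python
CUSTOMERS = [
    {"Name": "Ann", "City": "New York"},
    {"Name": "Raj", "City": "Boston"},
    {"Name": "Li", "City": "New York"},
    {"Name": "Eve", "City": "Austin"},
]

def scan(source, predicate=None):
    """Hypothetical source scan; applies the predicate at the source if given.

    Returns the rows plus the number of rows shipped to the query engine.
    """
    rows = [r for r in source if predicate is None or predicate(r)]
    return rows, len(rows)

def in_new_york(row):
    return row["City"] == "New York"

# Without pushdown: ship everything, then filter in the engine.
all_rows, shipped_without = scan(CUSTOMERS)
result_without = [r for r in all_rows if in_new_york(r)]

# With pushdown: the source evaluates the predicate.
result_with, shipped_with = scan(CUSTOMERS, in_new_york)

print(shipped_without, shipped_with)
```

Both plans return the same rows; only the amount of data that crosses the boundary between the source and the engine differs.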

Constant Folding

This technique involves evaluating constant expressions during the compilation phase rather than at runtime.

Example: The query:

SELECT 2 * 3 + 6;

is optimized to:

SELECT 12;
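As a rough illustration of the rewrite, the sketch below folds constants in an arithmetic expression using Python's own ast module as a stand-in for a SQL expression tree. It handles only +, - and *, requires Python 3.9 or later for ast.unparse, and is not how F1 Query implements the rule.

```python
import ast
import operator

OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul}

class ConstantFolder(ast.NodeTransformer):
    """Replace binary operations on two constants with their value."""

    def visit_BinOp(self, node):
        self.generic_visit(node)  # fold the children first
        op = OPS.get(type(node.op))
        if (op is not None
                and isinstance(node.left, ast.Constant)
                and isinstance(node.right, ast.Constant)):
            value = op(node.left.value, node.right.value)
            return ast.copy_location(ast.Constant(value=value), node)
        return node

def fold(expr):
    tree = ConstantFolder().visit(ast.parse(expr, mode="eval"))
    return ast.unparse(tree)

print(fold("2 * 3 + 6"))
print(fold("x + 2 * 3"))
```

The second call shows that folding is applied to sub-expressions even when the whole expression is not constant.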

Outer Join Narrowing

This optimization converts an outer join into an inner join when a later filter would discard the NULL-padded rows anyway. This reduces the number of rows processed and lets the filter be pushed below the join.

Example: Consider the query:

SELECT * FROM (Customers c 
LEFT JOIN Orders o 
ON c.CustomerId = o.CustomerId)
WHERE o.OrderDate > '2023-01-01';

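Although a left outer join is used to retrieve all Customers, the WHERE o.OrderDate > '2023-01-01' condition rejects every row in which o.OrderDate is NULL, which includes all customers without a matching order. The join can therefore be narrowed to an inner join, and the condition can then be pushed down to the Orders table before the join operation. This limits the number of order records that need to be considered in the join.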
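The equivalence behind this rewrite can be checked on a small dataset with SQLite. The tables and rows below are made up; the second query is the narrowed form, with an inner join and the date filter applied to Orders first.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Customers (CustomerId INTEGER, Name TEXT);
    CREATE TABLE Orders (CustomerId INTEGER, OrderDate TEXT);
    INSERT INTO Customers VALUES (1, 'Ann'), (2, 'Raj'), (3, 'Li');
    INSERT INTO Orders VALUES
        (1, '2023-03-01'), (1, '2022-12-01'), (2, '2022-06-15');
""")

LEFT_JOIN = """
    SELECT c.CustomerId, o.OrderDate
    FROM Customers c LEFT JOIN Orders o ON c.CustomerId = o.CustomerId
    WHERE o.OrderDate > '2023-01-01'
    ORDER BY 1, 2
"""
NARROWED = """
    SELECT c.CustomerId, o.OrderDate
    FROM Customers c
    JOIN (SELECT * FROM Orders WHERE OrderDate > '2023-01-01') o
      ON c.CustomerId = o.CustomerId
    ORDER BY 1, 2
"""

left_rows = conn.execute(LEFT_JOIN).fetchall()
narrowed_rows = conn.execute(NARROWED).fetchall()
print(left_rows, narrowed_rows)
```

Customer 3 has no orders, so the left join produces a NULL-padded row for it, but the WHERE clause discards that row. That is why the two queries agree.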

Sort Elimination

Sorting is often necessary for SQL operators like ORDER BY, GROUP BY, UNION, and INTERSECT. Sort elimination strategies avoid unnecessary sorting by leveraging existing indexes or restructuring the query execution plan. For instance, if data is already sorted based on an index, an explicit ORDER BY clause on the same column might be redundant.

Common Plan Deduplication

A subplan represents a smaller, reusable component of a query that can be executed independently. By identifying and eliminating redundant subplans, the query optimizer can save processing time.

Example: Consider the following query:

SELECT *
FROM Products p
JOIN OrderDetails od ON p.ProductId = od.ProductId
JOIN Orders o ON od.OrderId = o.OrderId
WHERE o.OrderDate BETWEEN '2023-01-01' AND '2023-12-31' AND p.Price > 100;

This can be optimized by extracting the common filtering condition on Products:

WITH FilteredProducts AS (
  SELECT *
  FROM Products
  WHERE Price > 100
)
SELECT *
FROM FilteredProducts fp
JOIN OrderDetails od ON fp.ProductId = od.ProductId
JOIN Orders o ON od.OrderId = o.OrderId
WHERE o.OrderDate BETWEEN '2023-01-01' AND '2023-12-31';

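In this version, the filtering of products with a price greater than 100 is performed only once in the FilteredProducts common table expression, which is then used in the subsequent joins. The saving is largest when the same subplan appears more than once in a query, because the optimizer can compute it once and reuse the result.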

Materialized View Rewrites

If a materialized view contains the data needed to answer a query, the optimizer can rewrite the original query to directly use the materialized view, significantly reducing execution time.

Constraint Propagation

Query optimizers utilize declarative constraints defined on the database schema (such as primary keys, foreign keys, unique constraints, and non-null constraints) to infer properties about the data and reduce the search space for a query. For example, if a JOIN is performed on a foreign key column that is also defined as NOT NULL, the optimizer might be able to make assumptions about the presence of matching rows.

Attribute Pruning

This optimization eliminates the retrieval of unnecessary columns from the query execution plan.

Example: For the query:

SELECT p.ProductId, p.ProductName, o.OrderDate
FROM Products p
JOIN Orders o ON p.ProductId = o.ProductId;

The query optimizer will ensure that only ProductId and ProductName from the Products table, and OrderDate (plus the join key ProductId) from the Orders table, are fetched during execution, avoiding the overhead of retrieving other columns that are not required in the final result.

F1 Query

Before diving into the paper's content, I want to be upfront: my expertise in query engines is limited, and there are certain sections where my understanding is incomplete. Specifically, sections 5.1 and 7.1 contain details that I don't fully grasp enough to provide insightful commentary on them.

Google developed F1 Query as a powerful SQL query engine, initially for its internal F1 database. Its origins lie in the demanding data landscape of Google Ads. The sheer scale of Ads data necessitated highly scalable infrastructure, and F1 Query emerged as a solution. F1 Query is the successor to Dremel, another pioneering data processing technology developed at Google.

This versatile engine can query data across a diverse range of Google's storage systems. It provides a unified SQL interface over OLTP databases like Spanner and F1, OLAP systems such as Napa/Mesa, NoSQL databases like Bigtable, and even simple tabular data in formats like RecordIO, Capacitor, CSV files, and Google Sheets. This cross-functional capability even allows joins between data residing in different systems like Spanner and Bigtable, although this might come with consistency trade-offs. Importantly, F1 Query presents all data as relational tables with standard data types, abstracting away the underlying storage format, even for non-relational data (e.g., representing graph data as tables). F1 Query's architecture is also extensible. Clients have the flexibility to define their own data source and sink adapters, tailoring it to specific needs.

Query results are typically returned directly to the client. However, F1 Query also supports materialization, allowing clients to persist query outputs as RecordIO or Capacitor files on Colossus. Furthermore, results can be temporarily sessioned for efficient reuse in subsequent operations.

Working with Structured Data

Beyond standard SQL, F1 Query offers a rich set of functions for handling structured data – columns containing multiple primitive data types, conceptually similar to C++ structs. These primitive types include common ones like int32, int64, string, bool, and float. Structured data can then be composed of these, for instance:

struct OrderData {
  string name;
  string address;
  float total_micros;
};

Accessing specific fields within structured data is straightforward using the familiar dot (.) operator. For example:

SELECT OrderId, OrderData.name FROM Orders;

F1 Query also supports arrays of primitive types (e.g., Array<int32>) and complex nesting, such as arrays of structs or structs containing arrays. Handling arrays requires a bit more finesse. F1 Query provides the UNNEST operator for this purpose, which we will explore in more detail later.

Finally, F1 Query can also work with popular serializable formats like protocol buffers as structured data types. XML and JSON are supported as well, but protocol buffers are far more efficient than either.

Extensible

F1 Query SQL offers a rich feature set, but its flexibility extends further, allowing for custom enhancements through user-defined functions (UDFs), user-defined aggregate functions (UDAs), and table-valued functions (TVFs).

These extensions can be implemented directly in SQL or in other languages, such as the scripting language Lua.

User-Defined Functions (UDFs)

UDFs are scalar functions that operate on individual projected values. The term "scalar" implies that these functions are stateless, meaning their output depends solely on the input value for each row. Consider this example:

SELECT DATE(timestamp) FROM Orders;

Here, DATE is a UDF that transforms the timestamp into a more readable date format. 

User-Defined Aggregate Functions (UDAs)

In contrast to stateless UDFs, UDAs are stateful functions, typically implemented with Initialize, Accumulate, and Finalize steps. Notably, UDAs can be constructed using UDFs that share a persistent state across rows.

SELECT COUNT_DISTINCT_DATE(*) FROM Orders;

Here, COUNT_DISTINCT_DATE represents a UDA designed to count the number of distinct dates within the Orders table.
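The Initialize, Accumulate, and Finalize steps can be sketched as a small class. This is an illustration of the pattern rather than F1 Query's actual UDA interface; the class, the run_uda driver, and the assumption that timestamps are ISO-8601 strings are all mine.

```python
class CountDistinctDate:
    """Toy UDA that counts distinct calendar dates among timestamps."""

    def initialize(self):
        self.seen = set()          # state shared across rows

    def accumulate(self, timestamp):
        # Assumes ISO-8601 strings such as '2023-01-01T09:30:00'.
        self.seen.add(timestamp[:10])

    def finalize(self):
        return len(self.seen)

def run_uda(uda, values):
    """Drive the three phases the way an aggregation operator might."""
    uda.initialize()
    for value in values:
        uda.accumulate(value)
    return uda.finalize()

timestamps = [
    "2023-01-01T09:30:00",
    "2023-01-01T17:45:00",
    "2023-01-02T08:00:00",
]
distinct_dates = run_uda(CountDistinctDate(), timestamps)
print(distinct_dates)
```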

Table-Valued Functions (TVFs)

TVFs differ significantly by accepting an entire relation (a table or the result of a subquery) as input and producing another relation as output. Consequently, TVFs are typically used within the FROM clause of a query.

SELECT * FROM EXTRACT_LAST_DAY(Orders);

In this case, EXTRACT_LAST_DAY is a TVF that returns a table containing orders placed on the last day. Note that even TVFs ultimately expand into standard SQL queries. This underlying SQL representation allows the entire query, including the expanded TVF logic, to be subject to F1 Query's powerful optimization engine. For instance, the call above might expand to something like:

SELECT * FROM (SELECT * FROM Orders WHERE DATE(timestamp) = CURRENT_DATE());

Architecture

Users interact with F1 Query via a client library, which sends requests to a pool of dedicated servers known as F1 servers. A central F1 master orchestrates all F1 servers.

Each F1 server takes on the responsibility of processing incoming queries, which includes optimization. The optimized query is then broken down into a series of tasks that are distributed to F1 workers for execution. Notably, both F1 servers and F1 workers are designed to be stateless.

F1 servers and workers are distributed across multiple data centers. To optimize data access, if an F1 server receives a query whose data sources or sinks are not available in its local data center, the client is redirected to an F1 server in a data center that is closer to the data's source and destination.

The F1 workers make use of UDF servers, which host the custom functions. UDF servers process inputs in bulk to amortize the cost of each call.

Query Execution Phases

The execution of a query in F1 Query proceeds through distinct phases which can be broken down as:
  • Optimization: The raw query from the user is optimized using the techniques discussed previously.
  • Physical Plan: The optimized query is converted to a physical execution plan, for example, MapReduce tasks.
  • Execution: The tasks are executed according to the selected mode: interactive or batch.

Operators

The F1 workers can be conceptually viewed as MapReduce counterparts, with their functionalities aligning with standard SQL operators:

  • SCAN: This operator is responsible for reading data directly from the underlying data source. It functions similarly to a Map in the MapReduce paradigm.
  • AGGREGATION: This operator performs data aggregation, potentially based on specific keys, consolidating data received from various SCAN operators. This mirrors the role of a Reduce in MapReduce.
  • SORT: This operator handles the sorting of data streams originating from upstream operators. In the MapReduce analogy, this corresponds to a Shuffle.

The complete execution pipeline for a given query is constructed as a tree of these F1 operators, with the corresponding F1 workers distributed as needed to perform each stage of the processing.

Example: The paper provides an example query and shows how it translates into these operators.

Execution Mode

Two execution modes exist: interactive and batch. Interactive execution is well-suited for applications that issue individual, short-lived queries. Conversely, batch execution is intended for applications that execute longer, recurring queries.

Interactive Execution

This mode prioritizes speed by keeping all data in memory and transmitting results through RPC streams. If a failure occurs, the client library automatically retries the entire query. Interactive execution can be implemented in two ways:

  • Centralized: A single F1 server process handles the complete query execution, with all operators running within a single thread.
  • Distributed: The query execution plan is broken down into fragments and executed by a cluster of F1 worker processes. Multiple F1 workers can be involved at each stage of the query execution.

When a query is executed across multiple F1 workers, for each operator, the input data must be divided among them. This partitioning is handled differently depending on the operator:

  • SCAN Operator Partitioning: The input for the SCAN operator is divided into N partitions based on the table's primary key. For example, a range-based query might have each partition correspond to a specific segment of the row space. These partitions are then distributed to the available F1 workers.
  • AGGREGATION Operator Partitioning: The input for the AGGREGATION operator is partitioned based on the key(s) being aggregated, which may or may not be the same as the table's primary key. The SCAN operator is aware of the aggregation key and routes its output to the correct AGGREGATION worker instances. Furthermore, the SCAN operator may perform some initial aggregation steps before sending the data.
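The routing between SCAN and AGGREGATION workers can be sketched as below. The worker count, the crc32-based route function, and the data are assumptions for illustration; the point is that each scan pre-aggregates locally and that all partial results for one key reach the same aggregation worker.

```python
from collections import Counter
import zlib

NUM_AGG_WORKERS = 2

def route(key):
    """Pick the AGGREGATION worker for a key (crc32 keeps it deterministic)."""
    return zlib.crc32(key.encode()) % NUM_AGG_WORKERS

def scan_partition(rows):
    """SCAN worker: pre-aggregate locally, then bucket by destination worker."""
    partial = Counter(rows)
    outbox = [dict() for _ in range(NUM_AGG_WORKERS)]
    for key, count in partial.items():
        outbox[route(key)][key] = count
    return outbox

def aggregate(inboxes):
    """AGGREGATION worker: merge the partial counts it received."""
    total = Counter()
    for partial in inboxes:
        total.update(partial)
    return dict(total)

# Two scan partitions, each holding the grouping key of its rows.
partitions = [["CS", "Math", "CS"], ["Math", "CS", "Physics"]]
outboxes = [scan_partition(p) for p in partitions]
per_worker = [
    aggregate(outbox[w] for outbox in outboxes)
    for w in range(NUM_AGG_WORKERS)
]
final = {key: count for worker in per_worker for key, count in worker.items()}
print(final)
```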

Batch Execution

Many ETL pipelines leverage F1 Query in batch execution mode for continuous data extraction. In this mode, queries are executed reliably using the MapReduce framework. Given that these queries are expected to be long-running, potentially taking several hours to complete, it is essential to checkpoint the progress to prevent data loss in the event of worker failures. This feature is provided by the MapReduce framework.

Another key difference between interactive and batch modes lies in worker activation. In interactive mode, all F1 workers are immediately active to process results from upstream operators. Conversely, in batch mode, workers become active only upon the completion of the upstream operators.

Long-running queries need to be stored in a registry, known as the query registry, which is built on top of Spanner. A query scheduler then schedules these queries on a query executor that manages the MapReduce worker pool.



Join Processing

Joins are fundamental to any query engine. F1 Query implements several types of joins: lookup, hash, and merge joins, and also supports array joins for structured data.

Array Join

Array joins are common in query engines that support structured data. When a structured column contains an array of data, it often needs to be unfolded. A simplified example illustrates this:
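A rough sketch of the unfolding: each parent row is repeated once per array element. The unnest helper and the Orders rows are hypothetical, and dropping rows with empty arrays is my assumption about inner-join-style behavior, not something the paper specifies.

```python
def unnest(rows, array_column, element_name):
    """Yield one output row per array element, repeating the parent columns."""
    for row in rows:
        parent = {k: v for k, v in row.items() if k != array_column}
        for element in row[array_column]:
            yield {**parent, element_name: element}

orders = [
    {"OrderId": 1, "Items": ["pen", "ink"]},
    {"OrderId": 2, "Items": []},          # empty array: no output rows
    {"OrderId": 3, "Items": ["pad"]},
]
flat = list(unnest(orders, "Items", "Item"))
print(flat)
```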



Join Type Selection

F1 Query infers the optimal join strategy based on the tables involved. The original F1 database paper illustrates F1 Query's join type selection process.

Again, note that F1 was built to handle Google Ads' immense data demands, driven by the planet-scale traffic it serves (virtually every web page visit triggers multiple queries to Ads servers).

Consider the following schema:
  • Creative (<CustomerId, CreativeId>): Stores all creative assets (text, images, etc.) associated with a customer.
  • AdgroupCreative (<CustomerId, CampaignId, AdgroupId, CreativeId>): Represents an ad, linking a creative to a specific ad group within a campaign for a customer. An ad is uniquely identified by <AdgroupId, CreativeId> within a <CustomerId, CampaignId> shard.
  • AdClick (<AdgroupId, CreativeId, ClickId>): Records ad clicks, with each ad potentially having numerous clicks.
Let's examine the join operation: <AdClick> ⊕ <AdgroupCreative> ⊕ <Creative>

Step 1: <AdClick> ⊕ <AdgroupCreative>: F1 Query employs a lookup join here. Given the typically massive size of the AdClick table (containing click data), the system efficiently looks up corresponding entries in the AdgroupCreative table.

Step 2: <Step 1> ⊕ <Creative>: A hash join is utilized in this step. The Creative table is assumed to be readily accessible in memory, making a hash join an efficient choice for this final stage.

Example Execution

The paper offers an example of how a join is processed by F1 Query, specifically joining Ads and Clicks tables.




The Ads table is stored in an OLTP F1 database, while Clicks are stored in an OLAP Mesa database. Separate F1 workers are designated to extract data from these sources. A dedicated F1 worker, resembling a reducer, receives the streamed results. These workers are sharded to aggregate values based on a specific key.

In this case, F1 Query decides to use a hash join. A hash table built from the Ads data is kept in memory. When the clicks are streamed back from F1 workers on Mesa, they are joined in-memory with the Ads data and then aggregated. Finally, the aggregated results are streamed back to the F1 server.

Performance Considerations

In hash joins, partitioning the hash space across different workers can lead to skew: the distribution of the data may cause some hash partitions to receive significantly more rows than others, making some workers run hotter. To mitigate this, F1 Query supports broadcast hash joins, which copy the entire hash table to all workers instead of partitioning it.
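The skew argument can be illustrated by counting how many probe rows each worker receives under the two strategies. The key distribution, worker count, and round-robin assignment are assumptions chosen to make the effect visible, not measurements from the paper.

```python
from collections import Counter
import zlib

NUM_WORKERS = 4

# Hypothetical probe side: one hot join key dominates the input.
probe_keys = ["ad-1"] * 90 + ["ad-%d" % i for i in range(2, 12)]

def partitioned_load(keys):
    """Hash-partitioned join: all rows with the same key land on one worker."""
    return Counter(zlib.crc32(k.encode()) % NUM_WORKERS for k in keys)

def broadcast_load(keys):
    """Broadcast join: every worker holds the full hash table, so probe rows
    can be spread evenly (round-robin in this sketch)."""
    return Counter(i % NUM_WORKERS for i, _ in enumerate(keys))

partitioned = partitioned_load(probe_keys)
broadcast = broadcast_load(probe_keys)
print(max(partitioned.values()), max(broadcast.values()))
```

The trade-off is memory: broadcasting is only practical when the build side is small enough to be replicated on every worker.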

In lookup joins, the lookup happens directly within the same fragment. For instance, a SCAN fragment will perform the lookup on the other (larger) table. Since each worker will look up the larger table, it's efficient to perform a bulk lookup of all keys derived from the smaller table. However, skew can still occur if some partitions from the smaller table join with too many values in the larger table. The query optimizer can dynamically repartition the input on the fly. Dynamic range repartitioning is beneficial and often outperforms static partitioning strategies.

F1 Query Optimizer

The F1 query optimizer is a notably intricate part of the F1 query system – personally, I've always found advanced compilers to be complex as well. Section 5.1 is still unclear to me.

In essence, the query optimizer takes the initial query and builds an abstract syntax tree (AST). It likely then uses context-free grammar rules to convert this AST into a relational algebra plan. This plan is then optimized through a series of rewriting steps. As mentioned earlier, F1 query utilizes several rewrite rules, such as filter pushdown, constant folding, attribute pruning, constraint propagation, outer join narrowing, sort elimination, common subplan deduplication, and materialized view rewrites.

The resulting optimized relational algebra plan is converted to a physical plan by modules called strategies. Each strategy is responsible for converting the logical operators from the relational algebra into concrete physical operators, where each physical operator is represented by a class that manages its data.

Ultimately, the physical operators are assigned to a set of fragments ready for execution.

Production Numbers

Section 8 offers a glimpse into the real-world application of F1 Query within Google:
  • Interactive queries executed in centralized mode demonstrate impressive performance, with 99% completing in under one second. The system handles a substantial load of 500k interactive queries per second, totaling 40 billion queries daily. Interactive queries running in distributed mode typically experience longer processing times.
  • Batch processing handles approximately 55,000 queries each day. However, the sheer volume of data processed by these batch jobs is immense, scaling to tens of petabytes daily. While most batch queries finish within minutes, some can take several hours to complete due to the data intensity.
  • Despite a rapid doubling of the query volume every quarter, the execution latency for interactive queries has remained stable. This consistent performance underscores the remarkable scalability of the F1 Query system.

Paper Review

I found this paper to be a significant challenge, needing multiple readings to fully grasp its intricate details. Perhaps my unfamiliarity with query engine literature contributed to this initial difficulty. The paper is undeniably dense and may present a steep learning curve for newcomers to the database domain.

However, the depth of knowledge gained from persevering through it is immense. I highly recommend this paper to anyone interested in the engineering behind Google Ads – one of the most massive software systems globally – and how it tackles Big Data challenges at planetary scale.
