Query Engine
SQL (Structured Query Language) is the standard formal language for extracting and processing data. A query engine, then, is the system that executes these SQL queries.
Query Engines vs. SQL Databases
SQL databases manage the storage of data in a relational format and handle transactions on that data. They inherently provide atomicity, concurrency control, durability, and transaction isolation. In contrast, a query engine's role is to extract a view of the data from a SQL database based on the user's formal query.
In traditional, standalone databases, the query engine is integrated directly within the SQL database itself. For instance, in PostgreSQL, the query parser, planner, and executor are all components of the core database system.
However, distributed SQL databases often separate the query engine into a distinct layer. In this architecture, query engines operate as independent units that can interface with various underlying SQL databases. These databases can be OLTP (like Spanner or Aurora) or OLAP (such as Napa/Mesa or Snowflake). Query engines can also be used with NoSQL databases.
Examples of Query Engines
Several query engines exist, many offered as managed cloud services:
- Dremel: A pioneering query engine developed by Google, which later formed the foundation for Google Cloud's BigQuery.
- F1 Query: The successor to Dremel, currently in use within Google.
- Apache Hive: Originally developed by Facebook for querying and managing large datasets residing in distributed storage.
- Amazon Athena: Interactive query service provided by Amazon Web Services that enables querying data in Amazon S3 using standard SQL.
- Apache Drill: An open-source, schema-free SQL query engine for Big Data exploration.
Core Components of Query Engines
- Executor (Driver): Parses and analyzes the incoming query, optimizes it, breaks it into executable tasks (often for a parallel processing framework like MapReduce), and assigns those tasks to available workers.
- Workers: These are the processes responsible for carrying out the tasks required by a query.
- Metadata Store: This component holds crucial information such as the location of data, access methods, and user-defined functions (UDFs).
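To make the division of labor concrete, here is a toy sketch (all names and data invented) of a driver splitting a scan into per-partition tasks, fanning them out to workers, and merging the results:

```python
from concurrent.futures import ThreadPoolExecutor

# Invented toy data: partition id -> rows of (name, value).
DATA = {0: [("alice", 3)], 1: [("bob", 5)], 2: [("carol", 2)]}

def worker_task(partition_id):
    """A worker scans one partition and returns the rows matching a filter."""
    return [row for row in DATA[partition_id] if row[1] > 2]

def driver(partitions):
    """The driver 'plans' the query as one task per partition,
    assigns the tasks to workers, and merges their outputs."""
    with ThreadPoolExecutor(max_workers=3) as pool:
        results = pool.map(worker_task, partitions)
    return [row for part in results for row in part]

print(driver([0, 1, 2]))  # [('alice', 3), ('bob', 5)]
```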
Read Consistency
Regardless of whether you're using a SQL or NoSQL database, query engines themselves cannot guarantee a consistent read without support from the underlying data store.
Consider our previous example to illustrate this potential issue. When a query is executed across multiple workers, each worker might begin processing at different times. This can lead to inconsistencies. For instance, one mapper might read its data instance before certain transactions are committed, while another mapper reads its instance after those same transactions.
Let's say the first mapper reads the first instance for the Computer Science department. Subsequently, a transaction adds a new row to the first instance for the Computer Science department with a professor whose name begins with 's'. Simultaneously, another transaction adds a row to the second instance for the Mechanical department, also with a professor whose name begins with 's'. Finally, the mapper reading from the second instance performs its read.
The problem here is that the first mapper's read doesn't reflect either of the new transactions, while the second mapper's read includes both of them. This results in an inconsistent view of the data.
To achieve consistent reads, the underlying data store needs to be versioned and multi-row transactional. Google Spanner is an example of such a database: it globally orders all transactions in real time. Consequently, when a query is executed in Spanner, all reads occur at a specific point in time, providing a consistent snapshot of the data (a.k.a. snapshot isolation). Even with concurrent modifications, the data returned by the query will represent the state of the database at the chosen query time.
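A minimal sketch of the idea, assuming a toy versioned store rather than Spanner's actual machinery: every committed version carries a timestamp, and all of a query's reads are served "as of" one timestamp, so workers that start at different wall-clock times still see the same snapshot:

```python
class VersionedStore:
    """Toy multi-version store: each key maps to (commit_ts, value) versions."""

    def __init__(self):
        self.versions = {}
        self.clock = 0

    def commit(self, writes):
        """Commit a multi-row transaction atomically at a single timestamp."""
        self.clock += 1
        for key, value in writes.items():
            self.versions.setdefault(key, []).append((self.clock, value))

    def read(self, key, snapshot_ts):
        """Return the latest value with commit_ts <= snapshot_ts."""
        candidates = [(ts, v) for ts, v in self.versions.get(key, []) if ts <= snapshot_ts]
        return max(candidates)[1] if candidates else None

store = VersionedStore()
store.commit({"cs": ["prof_a"], "mech": ["prof_b"]})
snapshot = store.clock                       # every worker reads at this timestamp
store.commit({"cs": ["prof_a", "prof_s"]})   # a later commit, invisible to the snapshot
print(store.read("cs", snapshot))            # ['prof_a'] -- consistent despite the write
```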
SQL Joins
A frequent operation in data processing involves combining two relations based on a related attribute, followed by further processing of the resulting dataset. Joins are therefore a fundamental operation in query engines.
Query engines employ several strategies to perform join operations, each optimized for different data characteristics.
Lookup Joins
These are particularly effective when joining a small table with a significantly larger one. The query engine reads the entirety of the smaller table and then performs targeted lookups in the larger table for matching rows based on the join condition.
Hash Joins
Hash joins are a common and versatile join strategy that proceeds in the following steps:
- The query engine reads both tables and identifies the smaller one.
- An in-memory hash table is constructed using the join key(s) from the smaller table. If the hash table exceeds available memory, portions of it may be spilled to disk.
- The larger table is then scanned row by row. For each row in the larger table, the join key(s) are used to probe the hash table of the smaller table to find matching rows.
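A minimal in-memory sketch of the build-and-probe flow (spilling to disk and other real-engine concerns are omitted):

```python
from collections import defaultdict

def hash_join(small, large, key):
    """Build a hash table on the smaller input, then probe it with the larger one."""
    # Build phase: hash table keyed by the join key of the smaller table.
    table = defaultdict(list)
    for row in small:
        table[row[key]].append(row)
    # Probe phase: stream the larger table row by row and look up matches.
    for row in large:
        for match in table.get(row[key], []):
            yield {**match, **row}

depts = [{"dept_id": 1, "dept": "CS"}, {"dept_id": 2, "dept": "Mech"}]
profs = [{"dept_id": 1, "prof": "smith"}, {"dept_id": 2, "prof": "stone"}]
print(list(hash_join(depts, profs, "dept_id")))
```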
Merge Joins
Merge joins are efficient when both tables involved in the join are already sorted (or have indexes that enforce an ordering) on the columns participating in the equality join condition. The algorithm operates as follows (analogous to the merge step in merge sort):
- The query engine verifies the existence of suitable indexes on the equality columns of both tables.
- The rows of both tables are then merged based on the sorted order of the join columns.
- For each match found in the join columns, the corresponding rows are combined in the result. If there are multiple matching rows in either table for a given join key, a Cartesian product of those matching rows is generated.
- If the join column values do not match, the row with the smaller value is discarded, and the next row from that table is considered. This process continues until all rows from both tables have been processed.
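A sketch of the merge walk over two pre-sorted in-memory inputs, including the Cartesian-product step for duplicate keys (indexes and disk access omitted):

```python
def merge_join(left, right, key):
    """Merge two inputs already sorted on `key`, pairing up equal-key groups."""
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1          # discard the smaller value and advance that side
        elif lk > rk:
            j += 1
        else:
            # Find the full run of equal keys on each side...
            i_end = i
            while i_end < len(left) and left[i_end][key] == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][key] == rk:
                j_end += 1
            # ...and emit the Cartesian product of the two runs.
            for l in left[i:i_end]:
                for r in right[j:j_end]:
                    yield {**l, **r}
            i, j = i_end, j_end

left = [{"k": 1, "a": "x"}, {"k": 2, "a": "y"}]
right = [{"k": 2, "b": "p"}, {"k": 2, "b": "q"}, {"k": 3, "b": "r"}]
print(list(merge_join(left, right, "k")))  # two combined rows for k=2
```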
View Materialization
Once a query is executed, the resulting data is typically sent back to the client. However, query engines also often have the capability to materialize these results into a persistent table stored within the same database as the original data.
These materialized tables, commonly known as materialized views, offer significant performance benefits for subsequent queries that can leverage the pre-computed data.
A prime use case is the creation of roll-up tables in analytical databases. By materializing aggregated data into these roll-up tables, future analytical queries against this summarized information execute much faster.
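As a toy stand-in for a SQL statement along the lines of CREATE MATERIALIZED VIEW ... GROUP BY, here is the roll-up idea in miniature: aggregate once up front, then answer later queries from the precomputed table instead of rescanning the raw rows:

```python
from collections import Counter

# Invented raw fact data: one row per ad click.
clicks = [
    {"campaign": "c1", "day": "2024-01-01"},
    {"campaign": "c1", "day": "2024-01-01"},
    {"campaign": "c2", "day": "2024-01-01"},
]

# "Materialize" the aggregate once, keyed by (campaign, day).
rollup = Counter((r["campaign"], r["day"]) for r in clicks)

# Later analytical queries read the roll-up directly.
print(rollup[("c1", "2024-01-01")])  # 2
```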
Query Optimization
Query engines apply a variety of rewrite rules, including:
- Filter Pushdown
- Constant Folding
- Outer Join Narrowing
- Sort Elimination
- Common Plan Deduplication
- Materialized View Rewrites
- Constraint Propagation
- Attribute Pruning
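To give a feel for two of these rules, here is a toy rewriter (plan representation and names invented for the sketch) that folds the constant expression 2 * 50 into 100 and pushes a filter below a join when the filtered column comes from exactly one join input:

```python
# Toy logical plan: ("scan", table), ("filter", pred, child), ("join", left, right).
plan = ("filter", ("age", ">", ("*", 2, 50)),
        ("join", ("scan", "users"), ("scan", "orders")))

def fold_constants(expr):
    """Constant folding: evaluate ('*', 2, 50) to 100 at plan time."""
    if isinstance(expr, tuple) and expr[0] == "*":
        return expr[1] * expr[2]
    return expr

def push_down(node, columns_of):
    """Filter pushdown: move a filter below a join when its column
    belongs to only one side (`columns_of` maps table -> columns)."""
    if node[0] == "filter" and node[2][0] == "join":
        col, op, val = node[1]
        pred = (col, op, fold_constants(val))
        _, left, right = node[2]
        if col in columns_of[left[1]]:
            return ("join", ("filter", pred, left), right)
        return ("join", left, ("filter", pred, right))
    return node

print(push_down(plan, {"users": {"age"}, "orders": {"total"}}))
# ('join', ('filter', ('age', '>', 100), ('scan', 'users')), ('scan', 'orders'))
```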
F1 Query
Working with Structured Data
Extensible
F1 Query can be extended with:
- User-Defined Functions (UDFs)
- User-Defined Aggregate Functions (UDAs)
- Table-Valued Functions (TVFs)
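As a rough sketch of the three extension shapes, with plain Python callables standing in for F1's actual UDF machinery: a UDF maps scalar inputs to a scalar, a UDA folds many rows into one value, and a TVF maps a table to another table:

```python
def strip_domain(email):            # UDF: scalar in, scalar out
    return email.split("@")[0]

def count_distinct(values):         # UDA: many values in, one value out
    return len(set(values))

def top_n(rows, key, n):            # TVF: table in, table out
    return sorted(rows, key=lambda r: r[key], reverse=True)[:n]

rows = [{"user": "a@x.com", "clicks": 3}, {"user": "b@y.com", "clicks": 7}]
print(strip_domain(rows[0]["user"]))               # 'a'
print(count_distinct(r["user"] for r in rows))     # 2
print(top_n(rows, "clicks", 1))                    # the highest-click row
```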
Architecture
The F1 workers are strategically distributed across multiple data centers. To optimize data access, if a worker initially receives a query for a dataset that isn't locally available, the client is transparently redirected to another worker that is geographically closer to the data's source and destination.
The F1 workers make use of UDF servers, which store all the custom functions. UDF servers process inputs in bulk to amortize the per-call cost.
Query Execution Phases
- Optimization: The raw query from the user is optimized using the techniques discussed previously.
- Physical Plan: The optimized query is converted into a physical execution plan, for example, MapReduce tasks.
- Execution: The tasks are executed according to the selected mode - interactive or batch.
Operators
The F1 workers can be viewed conceptually as MapReduce counterparts, with their functionality aligning with standard SQL operators:
- SCAN: This operator is responsible for reading data directly from the underlying data source. It functions similarly to a Map in the MapReduce paradigm.
- AGGREGATION: This operator performs data aggregation, potentially based on specific keys, consolidating data received from various SCAN operators (as in a join). This mirrors the role of a Reduce in MapReduce.
- SORT: This operator handles the sorting of data streams originating from upstream operators. In the MapReduce analogy, this corresponds to a Shuffle.
The complete execution pipeline for a given query is constructed as a tree of these F1 operators, with the corresponding F1 workers distributed as needed to perform each stage of the processing.
Example: The paper walks through an example query, showing how it translates into these operators.
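Since the query itself isn't reproduced here, the following hypothetical stand-in (not the paper's example) shows how a simple aggregation decomposes into SCAN, shuffle, and AGGREGATION steps:

```python
from collections import defaultdict

# Hypothetical query:
#   SELECT campaign, COUNT(*) FROM clicks WHERE cost > 0 GROUP BY campaign
clicks = [{"campaign": "c1", "cost": 5}, {"campaign": "c1", "cost": 0},
          {"campaign": "c2", "cost": 3}]

scanned = (r for r in clicks if r["cost"] > 0)   # SCAN: read rows, apply the filter

shuffled = defaultdict(list)                     # SORT/shuffle: route rows by group key
for r in scanned:
    shuffled[r["campaign"]].append(r)

counts = {k: len(v) for k, v in shuffled.items()}  # AGGREGATION: reduce each key's rows
print(counts)  # {'c1': 1, 'c2': 1}
```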
Execution Mode
Two execution modes exist: interactive and batch. Interactive execution is well-suited for applications that issue individual, short-lived queries. Conversely, batch execution is intended for applications that execute longer, recurring queries.
Interactive Execution
This mode prioritizes speed by keeping all data in memory and transmitting results through RPC streams. If a failure occurs, the client library automatically retries the entire query. Interactive execution can be implemented in two ways:
- Single-threaded: A single process handles the complete query execution, with all operators running within a single thread.
- Distributed: The query execution plan is broken down into fragments and executed by a cluster of F1 worker processes. Multiple F1 workers can be involved at each stage of the query execution.
When a query is executed across multiple F1 workers, each operator's input data must be divided among them. This partitioning is handled differently depending on the operator:
- SCAN Operator Partitioning: The input for the SCAN operator is divided into N partitions based on the table's primary key. For example, a range-based query might have each partition correspond to a specific segment of the row space. These partitions are then distributed to the available F1 workers.
- AGGREGATION Operator Partitioning: The input for the AGGREGATION operator is partitioned based on the key(s) being aggregated, which may or may not be the same as the table's primary key. The SCAN operator is aware of the aggregation key and routes its output to the correct AGGREGATION worker instances. Furthermore, the SCAN operator may perform some initial aggregation steps before sending the data.
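A toy sketch of the routing idea (worker count and data invented): hash each scanned row on the aggregation key so that every row for a given key lands on the same AGGREGATION worker:

```python
N_WORKERS = 3

def route(row, agg_key):
    """Pick the AGGREGATION worker for a row by hashing its aggregation key."""
    return hash(row[agg_key]) % N_WORKERS

rows = [{"dept": "cs", "n": 1}, {"dept": "mech", "n": 2}, {"dept": "cs", "n": 3}]
buckets = {w: [] for w in range(N_WORKERS)}
for row in rows:
    buckets[route(row, "dept")].append(row)

# All 'cs' rows share a bucket, so one worker can fully aggregate that key.
print(buckets)
```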
Batch Execution
Many ETL pipelines leverage F1 Query in batch execution mode for continuous data extraction. In this mode, queries are executed reliably using the MapReduce framework. Given that these queries are expected to be long-running, potentially taking several hours to complete, it is essential to checkpoint the progress to prevent data loss in the event of worker failures. This feature is provided by the MapReduce framework.
Another key difference between interactive and batch modes lies in worker activation. In interactive mode, all F1 workers are active at once, streaming results between operators. In batch mode, by contrast, workers become active only once the operators producing their input have completed.
Long-running queries need to be stored in a registry, known as the query registry, which is built on top of Spanner. A query scheduler then schedules these queries on a query executor that manages the MapReduce worker pool.
Join Processing
Array Join
Join Type Selection
The join examples use an ads-style schema:
- Creative (<CustomerId, CreativeId>): Stores all creative assets (text, images, etc.) associated with a customer.
- AdgroupCreative (<CustomerId, CampaignId, AdgroupId, CreativeId>): Represents an ad, linking a creative to a specific ad group within a campaign for a customer. An ad is uniquely identified by <AdgroupId, CreativeId> within a <CustomerId, CampaignId> shard.
- AdClick (<AdgroupId, CreativeId, ClickId>): Records ad clicks, with each ad potentially having numerous clicks.
Example Execution
Performance Considerations
F1 Query Optimizer
The F1 query optimizer is a notably intricate part of the F1 query system – personally, I've always found advanced compilers to be complex as well. Section 5.1 is still unclear to me.
In essence, the query optimizer takes the initial query and builds an abstract syntax tree (AST) using the language's grammar. The AST is then converted into a relational algebra plan, which is optimized through a series of rewriting steps. As mentioned earlier, F1 Query applies several rewrite rules, such as filter pushdown, constant folding, attribute pruning, constraint propagation, outer join narrowing, sort elimination, common subplan deduplication, and materialized view rewrites.
The resulting optimized relational algebra plan is converted into a physical plan tree organized into modules called strategies. Each strategy is responsible for converting logical operators from the relational algebra into concrete physical operators, where each physical operator is represented by a class that manages its data.
Ultimately, the physical operators are assigned to a set of fragments ready for execution.
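A rough sketch of that pipeline shape, with all names invented: run rewrite passes over a logical plan, then map each logical operator to a physical operator class via a strategy table:

```python
def rewrite_passes(plan, passes):
    """Apply each logical rewrite pass in order (e.g., filter pushdown)."""
    for p in passes:
        plan = p(plan)
    return plan

def noop_pass(plan):
    return plan  # stand-in for a real rewrite rule

STRATEGIES = {            # logical operator -> physical operator class name
    "scan": "IndexScanExec",
    "join": "HashJoinExec",
}

def to_physical(node):
    """Walk the logical tree and substitute physical operators."""
    op, *children = node
    return (STRATEGIES[op], *[to_physical(c) for c in children])

logical = ("join", ("scan",), ("scan",))
print(to_physical(rewrite_passes(logical, [noop_pass])))
# ('HashJoinExec', ('IndexScanExec',), ('IndexScanExec',))
```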
Production Numbers
- Interactive queries executed in centralized mode demonstrate impressive performance, with 99% completing in under one second. The system handles a substantial load of 500k interactive queries per second, totaling 40 billion queries daily. Interactive queries running in distributed mode typically experience longer processing times.
- Batch processing handles approximately 55,000 queries each day. However, the sheer volume of data processed by these batch jobs is immense, scaling to tens of petabytes daily. While most batch queries finish within minutes, some can take several hours to complete due to the data intensity.
- Despite a rapid doubling of the query volume every quarter, the execution latency for interactive queries has remained stable. This consistent performance underscores the remarkable scalability of the F1 Query system.