timsaucer commented on code in PR #74: URL: https://github.com/apache/datafusion-site/pull/74#discussion_r2135540165
########## content/blog/2025-06-15-optimizing-sql-dataframes-part-one.md: ########## @@ -0,0 +1,249 @@ +--- +layout: post +title: Optimizing SQL (and DataFrames) in DataFusion, Part 1: Query Optimization Overview +date: 2025-06-15 +author: alamb, akurmustafa +categories: [core] +--- + +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + + + +*Note: this blog was originally published [on the InfluxData blog](https://www.influxdata.com/blog/optimizing-sql-dataframes-part-one/)* + + +## Introduction + +Sometimes Query Optimizers are seen as a sort of black magic, [“the most +challenging problem in computer +science,”](https://15799.courses.cs.cmu.edu/spring2025/) according to Father +Pavlo, or some behind-the-scenes player. We believe this perception is because: + + +1. One must implement the rest of a database system (data storage, transactions, + SQL parser, expression evaluation, plan execution, etc.) **before** the + optimizer becomes critical[^5]. + +2. Some parts of the optimizer are tightly tied to the rest of the system (e.g., + storage or indexes), so many classic optimizers are described with + system-specific terminology. + +3. 
Some optimizer tasks, such as access path selection and join order are known + challenges and not yet solved (practically)—maybe they really do require + black magic 🤔. + +However, Query Optimizers are no more complicated in theory or practice than other parts of a database system, as we will argue in a series of posts: + +**Part 1: (this post)**: + +* Review what a Query Optimizer is, what it does, and why you need one for SQL and DataFrames. +* Describe how industrial Query Optimizers are structured and standard optimization classes. + +**Part 2:** + +* Describe the optimization categories with examples and pointers to implementations. +* Describe [Apache DataFusion](https://datafusion.apache.org/)’s rationale and approach to query optimization, specifically for access path and join ordering. + +After reading these blogs, we hope people will use DataFusion to: + +1. Build their own system specific optimizers. +2. Perform practical academic research on optimization (especially researchers + working on new optimizations / join ordering—looking at you [CMU + 15-799](https://15799.courses.cs.cmu.edu/spring2025/), next year). + + +## Query Optimizer Background + +The key pitch for querying databases, and likely the key to the longevity of SQL +(despite people’s love/hate relationship—see [SQL or Death? Seminar Series – +Spring 2025](https://db.cs.cmu.edu/seminar2025/)), is that it disconnects the +`WHAT` you want to compute from the `HOW` to do it. SQL is a *declarative* +language—it describes what answers are desired rather than an *imperative* +language such as Python, where you describe how to do the computation as shown +in Figure 1. + +<img src="/blog/images/optimizing-sql-dataframes/query-execution.png" width="80%" class="img-responsive" alt="Fig 1: Query Execution."/> + +**Figure 1**: Query Execution: Users describe the answer they want using either +a DataFrame or SQL. 
The query planner or DataFrame API translates that +description into an *Initial Plan*, which is correct but slow. The Query +Optimizer then rewrites the initial plan to an *Optimized Plan*, which computes +the same results but faster and more efficiently. Finally, the Execution Engine +executes the optimized plan, producing results. + +## SQL, DataFrames, LogicalPlan Equivalence + +Given their name, it is not surprising that Query Optimizers can improve the +performance of SQL queries. However, it is under-appreciated that this also +applies to DataFrame-style APIs. + +Classic DataFrame systems such as [pandas] and [Polars] (by default) execute +eagerly and thus have limited opportunities for optimization. However, more +modern APIs such as [Polars' lazy API], [Apache Spark's DataFrame], and +[DataFusion's DataFrame] are much faster as they use the design shown in Figure +1 and apply many query optimization techniques. + +[pandas]: https://pandas.pydata.org/ +[Polars]: https://pola.rs/ +[Polars' lazy API]: https://docs.pola.rs/user-guide/lazy/using/ +[Apache Spark's DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes +[DataFusion's DataFrame]: https://datafusion.apache.org/user-guide/dataframe.html + +## Example of Query Optimizer + +This section motivates the value of a Query Optimizer with an example. Let’s say +you have some observations of animal behavior, as illustrated in Table 1. + +<img src="/blog/images/optimizing-sql-dataframes/table1.png" width="75%" class="img-responsive" alt="Table 1: Observational Data."/> + +**Table 1**: Example observational data.
+ +If the user wants to know the average population for some species in the last +month, they can write a SQL query or a DataFrame such as the following: + +SQL: + +```sql +SELECT location, AVG(population) +FROM observations +WHERE species = 'contrarian spider' AND + observation_time >= now() - interval '1 month' +GROUP BY location +``` + +DataFrame: + +```rust +df.scan("observations") + .filter(col("species").eq("contrarian spider")) + .filter(col("observation_time").ge(now().sub(interval("1 month")))) + .agg(vec![col("location")], vec![avg(col("population"))]) +``` + +Within DataFusion, both the SQL and DataFrame are translated into the same +[`LogicalPlan`], a “tree of relational operators.” This is a fancy way of +saying data flow graphs where the edges represent tabular data (rows + columns) +and the nodes represent a transformation (see [this DataFusion overview video] +for more details). The initial `LogicalPlan` for the queries above is shown in +Figure 2. + +[`LogicalPlan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html +[this DataFusion overview video]: https://youtu.be/EzZTLiSJnhY + +<img src="/blog/images/optimizing-sql-dataframes/initial-logical-plan.png" width="72%" class="img-responsive" alt="Fig 2: Initial Logical Plan."/> + +**Figure 2**: Example initial `LogicalPlan` for SQL and DataFrame query. The +plan is read from bottom to top, computing the results in each step. + +The optimizer's job is to take this query plan and rewrite it into an alternate +plan that computes the same results but faster, such as the one shown in Figure +3. + +<img src="/blog/images/optimizing-sql-dataframes/optimized-logical-plan.png" width="80%" class="img-responsive" alt="Fig 3: Optimized Logical Plan."/> + +**Figure 3**: An example optimized plan that computes the same result as the +plan in Figure 2 more efficiently.
The diagram highlights where the optimizer +has applied *Projection Pushdown*, *Filter Pushdown*, and *Constant Evaluation*. +Note that this is a simplified example for explanatory purposes, and actual +optimizers such as the one in DataFusion perform additional tasks such as +choosing specific aggregation algorithms. + + +## Query Optimizer Implementation + +Industrial optimizers, such as +DataFusion’s ([source](https://github.com/apache/datafusion/tree/334d6ec50f36659403c96e1bffef4228be7c458e/datafusion/optimizer/src)), +ClickHouse ([source](https://github.com/ClickHouse/ClickHouse/tree/master/src/Analyzer/Passes), [source](https://github.com/ClickHouse/ClickHouse/tree/master/src/Processors/QueryPlan/Optimizations)), +DuckDB ([source](https://github.com/duckdb/duckdb/tree/4afa85c6a4dacc39524d1649fd8eb8c19c28ad14/src/optimizer)), +and Apache Spark ([source](https://github.com/apache/spark/tree/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer)), +are implemented as a series of passes or rules that rewrite a query plan. The +overall optimizer is composed of a sequence of these rules,[^6] as shown in Review Comment: Link didn't work ########## content/blog/2025-06-15-optimizing-sql-dataframes-part-two.md: ########## @@ -0,0 +1,533 @@ +--- +layout: post +title: Optimizing SQL (and DataFrames) in DataFusion, Part 2: Optimizers in Apache DataFusion +date: 2025-06-15 +author: alamb, akurmustafa +categories: [core] +--- + +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License.
You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +*Note, this blog was originally published [on the InfluxData blog].* + +[on the InfluxData blog]: https://www.influxdata.com/blog/optimizing-sql-dataframes-part-two/ + +In the [first part of this post], we discussed what a Query Optimizer is, what +role it plays, and described how industrial optimizers are organized. In this +second post, we describe various optimizations that are found in [Apache +DataFusion](https://datafusion.apache.org/) and other industrial systems in more +detail. + + +DataFusion contains high quality, full-featured implementations for *Always +Optimizations* and *Engine Specific Optimizations* (defined in Part 1). +Optimizers are implemented as rewrites of `LogicalPlan` in the [logical +optimizer](https://github.com/apache/datafusion/tree/main/datafusion/optimizer) +or rewrites of `ExecutionPlan` in the [physical +optimizer](https://github.com/apache/datafusion/tree/main/datafusion/physical-optimizer). +This design means the same optimizer passes are applied for SQL queries, +DataFrame queries, as well as plans for other query language frontends such as +[InfluxQL](https://github.com/influxdata/influxdb3_core/tree/26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f/iox_query_influxql) +in InfluxDB 3.0, +[PromQL](https://github.com/GreptimeTeam/greptimedb/blob/0bd322a078cae4f128b791475ec91149499de33a/src/query/src/promql/planner.rs#L1) +in [Greptime](https://greptime.com/), and +[vega](https://github.com/vega/vegafusion/tree/dc15c1b9fc7d297f12bea919795d58cda1c88fcf/vegafusion-core/src/planning) +in [VegaFusion](https://vegafusion.io/). 
+ + +[first part of this post]: https://datafusion.apache.org/blog/2025/06/15/optimizing-sql-dataframes-part-one + +## Always Optimizations + +Some optimizations are so important they are found in almost all query engines +and are typically the first implemented as they provide the largest benefit / +cost ratio (and performance is terrible without them). + + +### Predicate/Filter Pushdown + +**Why**: Avoid carrying unneeded *rows* as soon as possible + +**What**: Moves filters “down” in the plan so they run earlier during execution, as shown in Figure 1. + +**Example Implementations**: [DataFusion], [DuckDB], [ClickHouse] + +[DataFusion]: https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/push_down_filter.rs +[DuckDB]: https://github.com/duckdb/duckdb/blob/main/src/optimizer/filter_pushdown.cpp +[ClickHouse]: https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp + +The earlier data is filtered out in the plan, the less work the rest of the plan +has to do. Most mature databases aggressively use filter pushdown / early +filtering combined with techniques such as partition and storage pruning (e.g. +[Parquet Row Group pruning]) for performance. + +[Parquet Row Group pruning]: https://blog.xiangpeng.systems/posts/parquet-to-arrow/ + +An extreme, and somewhat contrived, example is the query + +```sql +SELECT city, COUNT(*) FROM population GROUP BY city HAVING city = 'BOSTON'; +``` + +Semantically, `HAVING` is [evaluated after] `GROUP BY` in SQL. However, computing +the population of all cities and discarding everything except Boston is much +slower than only computing the population for Boston, and so most Query +Optimizers will evaluate the filter before the aggregation.
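Concretely, most optimizers will execute the query above as if the filter had been written in a `WHERE` clause, which is valid here because the `HAVING` predicate references only a grouping column:

```sql
-- Rewritten form: the filter runs before the aggregation,
-- so only the rows for Boston are ever grouped and counted
SELECT city, COUNT(*) FROM population WHERE city = 'BOSTON' GROUP BY city;
```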
+ +[evaluated after]: https://www.datacamp.com/tutorial/sql-order-of-execution + +<img src="/blog/images/optimizing-sql-dataframes/filter-pushdown.png" width="80%" class="img-responsive" alt="Fig 1: Filter Pushdown."/> + +**Figure 1**: Filter Pushdown. In (**A**) without filter pushdown, the operator +processes more rows, reducing efficiency. In (**B**) with filter pushdown, the +operator receives fewer rows, resulting in less overall work and leading to a +faster and more efficient query. + + +### Projection Pushdown + +**Why**: Avoid carrying unneeded *columns* as soon as possible + +**What**: Pushes “projection” (keeping only certain columns) earlier in the plan, as shown in Figure 2. + +**Example Implementations**: [DataFusion](https://github.com/apache/datafusion/blob/main/datafusion/physical-optimizer/src/projection_pushdown.rs), [DuckDB](https://github.com/duckdb/duckdb/blob/a8a6a080c8809d5d4b3c955e9f113574f6f0bfe0/src/optimizer/pushdown/pushdown_projection.cpp), [ClickHouse](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp) + +Similarly to the motivation for *Filter Pushdown*, the earlier the plan stops +doing something, the less work it does overall and thus the faster it runs. For +Projection Pushdown, if columns are not needed later in a plan, copying the data +to the output of other operators is unnecessary and the costs of copying can add +up. For example, in Figure 3 of Part 1, the `species` column is only needed to +evaluate the Filter within the scan and `notes` is never used, so it is +unnecessary to copy them through the rest of the plan. + +Projection Pushdown is especially effective and important for column-store +databases, where the storage format itself (such as [Apache Parquet]) supports +efficiently reading only a subset of required columns, and is [especially +powerful in combination with filter pushdown].
Projection Pushdown is still +important, but less effective for row-oriented formats such as JSON or CSV, where +each column in each row must be parsed even if it is not used in the plan. + +[Apache Parquet]: https://parquet.apache.org/ +[especially powerful in combination with filter pushdown]: https://blog.xiangpeng.systems/posts/parquet-pushdown/ + +<img src="/blog/images/optimizing-sql-dataframes/projection-pushdown.png" width="80%" class="img-responsive" alt="Fig 2: Projection Pushdown."/> + +**Figure 2:** In (**A**) without projection pushdown, the operator receives more +columns, reducing efficiency. In (**B**) with projection pushdown, the operator +receives fewer columns, leading to optimized execution. + +### Limit Pushdown + +**Why**: The earlier the plan stops generating data, the less overall work it +does, and some operators have more efficient limited implementations. + +**What**: Pushes limits (maximum row counts) down in a plan as early as possible. + +**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/push_down_limit.rs), [DuckDB](https://github.com/duckdb/duckdb/blob/main/src/optimizer/limit_pushdown.cpp), [ClickHouse](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp), Spark ([Window](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala) and [Projection](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala)) + +Often queries have a `LIMIT` or other clause that allows them to stop generating +results early, so the sooner they can stop execution, the more efficiently they +will execute.
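As an illustration of the second point (operators with cheaper limit-aware implementations), the following Rust sketch, which is not DataFusion's actual operator, keeps only the smallest `k` values with a bounded heap instead of sorting the entire input:

```rust
use std::collections::BinaryHeap;

/// Illustrative sketch only. Keeps the k smallest values in a bounded
/// max-heap, doing O(n log k) work instead of the O(n log n) required
/// to fully sort the input and then discard everything past the limit.
fn top_k_smallest(values: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    let mut heap: BinaryHeap<i64> = BinaryHeap::with_capacity(k + 1);
    for v in values {
        if heap.len() < k {
            heap.push(v);
        } else if let Some(&largest) = heap.peek() {
            // Replace the largest retained value if we found a smaller one
            if v < largest {
                heap.pop();
                heap.push(v);
            }
        }
    }
    heap.into_sorted_vec() // ascending, like ORDER BY x LIMIT k
}

fn main() {
    // Behaves like `SELECT x FROM t ORDER BY x LIMIT 3`
    let rows = vec![42, 7, 19, 3, 25, 11];
    assert_eq!(top_k_smallest(rows, 3), vec![3, 7, 11]);
}
```

The bounded heap never holds more than `k` values, so memory use is also independent of input size, which is part of why limit-aware operators are attractive.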
+ +In addition, DataFusion and other systems have more efficient implementations of +some operators that can be used if there is a limit. The classic example is +replacing a full sort + limit with a [TopK] operator that only tracks the top +values using a heap. Similarly, DataFusion’s Parquet reader stops fetching and +opening additional files once the limit has been hit. + +[TopK]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html + +<img src="/blog/images/optimizing-sql-dataframes/limit-pushdown.png" width="80%" class="img-responsive" alt="Fig 3: Limit Pushdown."/> + +**Figure 3**: In (**A**), without limit pushdown, all data is sorted and +everything except the first few rows is discarded. In (**B**), with limit +pushdown, the Sort is replaced with a TopK operator, which does much less work. + + +### Expression Simplification / Constant Folding + +**Why**: Evaluating the same expression for each row when the value doesn’t change is wasteful. + +**What**: Partially evaluates and/or algebraically simplifies expressions. + +**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/tree/main/datafusion/optimizer/src/simplify_expressions), DuckDB (has several [rules](https://github.com/duckdb/duckdb/tree/7b18f0f3691c1b6367cf68ed2598d7034e14f41b/src/optimizer/rule) such as [constant folding](https://github.com/duckdb/duckdb/blob/7b18f0f3691c1b6367cf68ed2598d7034e14f41b/src/optimizer/rule/constant_folding.cpp) and [comparison simplification](https://github.com/duckdb/duckdb/blob/7b18f0f3691c1b6367cf68ed2598d7034e14f41b/src/optimizer/rule/comparison_simplification.cpp)), [Spark](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala) + +If an expression doesn’t change from row to row, it is better to evaluate the +expression **once** during planning.
This is a classic compiler technique and is +also used in database systems. + +For example, given a query that finds all values from the current year: + +```sql +SELECT … WHERE extract(year from time_column) = extract(year from now()) +``` + +Evaluating `extract(year from now())` on every row is much more expensive than +evaluating it once during planning time, so that the query becomes a comparison to +a constant: + +```sql +SELECT … WHERE extract(year from time_column) = 2025 +``` + +Furthermore, it is often possible to push such predicates **into** scans. + +### Rewriting `OUTER JOIN` → `INNER JOIN` + +**Why:** `INNER JOIN` implementations are almost always faster (as they are +simpler) than `OUTER JOIN` implementations, and `INNER JOIN`s impose fewer +restrictions on other optimizer passes (such as join reordering and additional +filter pushdown). + +**What**: In cases where it is known that NULL rows introduced by an `OUTER +JOIN` will not appear in the results, it can be rewritten to an `INNER JOIN`. + +**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/blob/6028474969f0bfead96eb7f413791470afb6bf82/datafusion/optimizer/src/eliminate_outer_join.rs), [Spark](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala#L124-L158), [ClickHouse](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/convertOuterJoinToInnerJoin.cpp). + +For example, given a query such as the following + +```sql +SELECT … +FROM orders LEFT OUTER JOIN customer ON (orders.cid = customer.id) +WHERE customer.last_name = 'Lamb' +``` + +The `LEFT OUTER JOIN` keeps all rows in `orders` that don’t have a matching +customer, but fills in the fields with `null`. All such rows will be filtered +out by `customer.last_name = 'Lamb'`, and thus an `INNER JOIN` produces the same +answer. This is illustrated in Figure 4.
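The optimizer can therefore execute the plan as if the user had written:

```sql
SELECT …
FROM orders INNER JOIN customer ON (orders.cid = customer.id)
WHERE customer.last_name = 'Lamb'
```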
+ +<img src="/blog/images/optimizing-sql-dataframes/join-rewrite.png" width="80%" class="img-responsive" alt="Fig 4: Join Rewrite."/> + +**Figure 4**: Rewriting `OUTER JOIN` to `INNER JOIN`. In (A) the original query +contains an `OUTER JOIN` but also a filter on `customer.last_name`, which +filters out all rows that might be introduced by the `OUTER JOIN`. In (B) the +`OUTER JOIN` is converted to an `INNER JOIN`, so a more efficient implementation can be +used. + + +## Engine Specific Optimizations + +As discussed in Part 1 of this blog, optimizers also contain a set of passes +that are still always good to do, but are closely tied to the specifics of the +query engine. This section describes some common types. + +### Subquery Rewrites + +**Why**: Actually implementing subqueries by running a query for each row of the outer query is very expensive. + +**What**: It is possible to rewrite subqueries as joins, which often perform much better. + +**Example Implementations:** DataFusion ([one](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/decorrelate.rs), [two](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/decorrelate_predicate_subquery.rs), [three](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/scalar_subquery_to_join.rs)), [Spark](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala) + +Evaluating subqueries a row at a time is so expensive that execution engines in +high-performance analytic systems such as DataFusion and [Vertica] may not even +support row-at-a-time evaluation given how terrible the performance would be. +Instead, analytic systems rewrite such queries into joins, which can perform 100s +or 1000s of times faster for large datasets.
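As an illustration (reusing the `orders` and `customer` tables from the earlier join example, and the explicit `SEMI JOIN` syntax that some engines such as DuckDB accept), a subquery like the first form below is typically rewritten into the second:

```sql
-- Conceptually re-runs the subquery for every row of orders
SELECT id FROM orders
WHERE cid IN (SELECT id FROM customer WHERE last_name = 'Lamb');

-- Rewritten: a single semi join keeps each order with at least one match
SELECT orders.id FROM orders
SEMI JOIN customer ON (orders.cid = customer.id AND customer.last_name = 'Lamb');
```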
However, transforming subqueries to +joins requires “exotic” join semantics such as `SEMI JOIN`, `ANTI JOIN` and +variations on how to treat equality with null[^7]. Review Comment: Link didn't work ########## content/blog/2025-06-15-optimizing-sql-dataframes-part-one.md: ########## @@ -0,0 +1,249 @@ +--- +layout: post +title: Optimizing SQL (and DataFrames) in DataFusion, Part 1: Query Optimization Overview +date: 2025-06-15 +author: alamb, akurmustafa +categories: [core] +--- + +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + + + +*Note: this blog was originally published [on the InfluxData blog](https://www.influxdata.com/blog/optimizing-sql-dataframes-part-one/)* + + +## Introduction + +Sometimes Query Optimizers are seen as a sort of black magic, [“the most +challenging problem in computer +science,”](https://15799.courses.cs.cmu.edu/spring2025/) according to Father +Pavlo, or some behind-the-scenes player. We believe this perception is because: + + +1. One must implement the rest of a database system (data storage, transactions, + SQL parser, expression evaluation, plan execution, etc.) **before** the + optimizer becomes critical[^5]. + +2. 
Some parts of the optimizer are tightly tied to the rest of the system (e.g., + storage or indexes), so many classic optimizers are described with + system-specific terminology. + +3. Some optimizer tasks, such as access path selection and join order are known + challenges and not yet solved (practically)—maybe they really do require + black magic 🤔. + +However, Query Optimizers are no more complicated in theory or practice than other parts of a database system, as we will argue in a series of posts: + +**Part 1: (this post)**: + +* Review what a Query Optimizer is, what it does, and why you need one for SQL and DataFrames. +* Describe how industrial Query Optimizers are structured and standard optimization classes. + +**Part 2:** + +* Describe the optimization categories with examples and pointers to implementations. +* Describe [Apache DataFusion](https://datafusion.apache.org/)’s rationale and approach to query optimization, specifically for access path and join ordering. + +After reading these blogs, we hope people will use DataFusion to: + +1. Build their own system specific optimizers. +2. Perform practical academic research on optimization (especially researchers + working on new optimizations / join ordering—looking at you [CMU + 15-799](https://15799.courses.cs.cmu.edu/spring2025/), next year). + + +## Query Optimizer Background + +The key pitch for querying databases, and likely the key to the longevity of SQL +(despite people’s love/hate relationship—see [SQL or Death? Seminar Series – +Spring 2025](https://db.cs.cmu.edu/seminar2025/)), is that it disconnects the +`WHAT` you want to compute from the `HOW` to do it. SQL is a *declarative* +language—it describes what answers are desired rather than an *imperative* +language such as Python, where you describe how to do the computation as shown +in Figure 1. 
+ +<img src="/blog/images/optimizing-sql-dataframes/query-execution.png" width="80%" class="img-responsive" alt="Fig 1: Query Execution."/> Review Comment: The text blocks saying "DataFrame API" and "Query Planner" makes it seem like when you use the DataFrame API that you don't go through the query planner. From my understanding of the code base, both go through the query planner. Rather it's just a difference in how we interface - parsing SQL or using DataFrame - to get the initial plan. ########## content/blog/2025-06-15-optimizing-sql-dataframes-part-one.md: ########## @@ -0,0 +1,249 @@ +--- +layout: post +title: Optimizing SQL (and DataFrames) in DataFusion, Part 1: Query Optimization Overview +date: 2025-06-15 +author: alamb, akurmustafa +categories: [core] +--- + +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + + + +*Note: this blog was originally published [on the InfluxData blog](https://www.influxdata.com/blog/optimizing-sql-dataframes-part-one/)* + + +## Introduction + +Sometimes Query Optimizers are seen as a sort of black magic, [“the most +challenging problem in computer +science,”](https://15799.courses.cs.cmu.edu/spring2025/) according to Father +Pavlo, or some behind-the-scenes player. 
We believe this perception is because: + + +1. One must implement the rest of a database system (data storage, transactions, + SQL parser, expression evaluation, plan execution, etc.) **before** the + optimizer becomes critical[^5]. + +2. Some parts of the optimizer are tightly tied to the rest of the system (e.g., + storage or indexes), so many classic optimizers are described with + system-specific terminology. + +3. Some optimizer tasks, such as access path selection and join order are known + challenges and not yet solved (practically)—maybe they really do require + black magic 🤔. + +However, Query Optimizers are no more complicated in theory or practice than other parts of a database system, as we will argue in a series of posts: + +**Part 1: (this post)**: + +* Review what a Query Optimizer is, what it does, and why you need one for SQL and DataFrames. +* Describe how industrial Query Optimizers are structured and standard optimization classes. + +**Part 2:** + +* Describe the optimization categories with examples and pointers to implementations. +* Describe [Apache DataFusion](https://datafusion.apache.org/)’s rationale and approach to query optimization, specifically for access path and join ordering. + +After reading these blogs, we hope people will use DataFusion to: + +1. Build their own system specific optimizers. +2. Perform practical academic research on optimization (especially researchers + working on new optimizations / join ordering—looking at you [CMU + 15-799](https://15799.courses.cs.cmu.edu/spring2025/), next year). + + +## Query Optimizer Background + +The key pitch for querying databases, and likely the key to the longevity of SQL +(despite people’s love/hate relationship—see [SQL or Death? Seminar Series – +Spring 2025](https://db.cs.cmu.edu/seminar2025/)), is that it disconnects the +`WHAT` you want to compute from the `HOW` to do it. 
SQL is a *declarative* +language—it describes what answers are desired rather than an *imperative* +language such as Python, where you describe how to do the computation as shown +in Figure 1. + +<img src="/blog/images/optimizing-sql-dataframes/query-execution.png" width="80%" class="img-responsive" alt="Fig 1: Query Execution."/> + +**Figure 1**: Query Execution: Users describe the answer they want using either +a DataFrame or SQL. The query planner or DataFrame API translates that +description into an *Initial Plan*, which is correct but slow. The Query +Optimizer then rewrites the initial plan to an *Optimized Plan*, which computes +the same results but faster and more efficiently. Finally, the Execution Engine +executes the optimized plan producing results. + +## SQL, DataFrames, LogicalPlan Equivalence + +Given their name, it is not surprising that Query Optimizers can improve the +performance of SQL queries. However, it is under-appreciated that this also +applies to DataFrame style APIs. + +Classic DataFrame systems such as [pandas] and [Polars] (by default) execute +eagerly and thus have limited opportunities for optimization. However, more +modern APIs such as [Polar's lazy API], [Apache Spark's DataFrame]. and +[DataFusion's DataFrame] are much faster as they use the design shown in Figure +1 and apply many query optimization techniques. + +[pandas]: https://pandas.pydata.org/ +[Polars]: https://pola.rs/) +[Polar'’'s lazy API]: https://docs.pola.rs/user-guide/lazy/using/ +[Apache Spark's DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes), +[DataFusion's DataFrame]: https://datafusion.apache.org/user-guide/dataframe.html + +## Example of Query Optimizer + +This section motivates the value of a Query Optimizer with an example. Let’s say +you have some observations of animal behavior, as illustrated in Table 1. 
+
+<img src="/blog/images/optimizing-sql-dataframes/table1.png" width="75%" class="img-responsive" alt="Table 1: Observational Data."/>
+
+**Table 1**: Example observational data.
+
+If the user wants to know the average population for some species in the last
+month, a user can write a SQL query or a DataFrame such as the following:
+
+SQL:
+
+```sql
+SELECT location, AVG(population)
+FROM observations
+WHERE species = 'contrarian spider' AND
+      observation_time >= now() - interval '1 month'
+GROUP BY location
+```
+
+DataFrame:
+
+```rust
+df.scan("observations")
+  .filter(col("species").eq("contrarian spider"))
+  .filter(col("observation_time").ge(now().sub(interval("1 month"))))
+  .agg(vec![col("location")], vec![avg(col("population"))])
+```
+
+Within DataFusion, both the SQL and DataFrame are translated into the same
+[`LogicalPlan`], a “tree of relational operators.” This is a fancy way of
+saying a data flow graph where the edges represent tabular data (rows + columns)
+and the nodes represent a transformation (see [this DataFusion overview video]
+for more details). The initial `LogicalPlan` for the queries above is shown in
+Figure 2.
+
+[`LogicalPlan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html
+[this DataFusion overview video]: https://youtu.be/EzZTLiSJnhY
+
+<img src="/blog/images/optimizing-sql-dataframes/initial-logical-plan.png" width="72%" class="img-responsive" alt="Fig 2: Initial Logical Plan."/>
+
+**Figure 2**: Example initial `LogicalPlan` for SQL and DataFrame query. The
+plan is read from bottom to top, computing the results in each step.
+
+The optimizer's job is to take this query plan and rewrite it into an alternate
+plan that computes the same results but faster, such as the one shown in Figure
+3.
+
+<img src="/blog/images/optimizing-sql-dataframes/optimized-logical-plan.png" width="80%" class="img-responsive" alt="Fig 3: Optimized Logical Plan."/>
+
+**Figure 3**: An example optimized plan that computes the same result as the
+plan in Figure 2 more efficiently. The diagram highlights where the optimizer
+has applied *Projection Pushdown*, *Filter Pushdown*, and *Constant Evaluation*.
+Note that this is a simplified example for explanatory purposes, and actual
+optimizers such as the one in DataFusion perform additional tasks such as
+choosing specific aggregation algorithms.
+
+
+## Query Optimizer Implementation
+
+Industrial optimizers, such as
+DataFusion’s ([source](https://github.com/apache/datafusion/tree/334d6ec50f36659403c96e1bffef4228be7c458e/datafusion/optimizer/src)),
+ClickHouse ([source](https://github.com/ClickHouse/ClickHouse/tree/master/src/Analyzer/Passes), [source](https://github.com/ClickHouse/ClickHouse/tree/master/src/Processors/QueryPlan/Optimizations)),

Review Comment:
   nit: add space between two sources in ClickHouse



##########
content/blog/2025-06-15-optimizing-sql-dataframes-part-one.md:
##########
@@ -0,0 +1,249 @@
+---
+layout: post
+title: "Optimizing SQL (and DataFrames) in DataFusion, Part 1: Query Optimization Overview"
+date: 2025-06-15
+author: alamb, akurmustafa
+categories: [core]
+---
+
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.
You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + + + +*Note: this blog was originally published [on the InfluxData blog](https://www.influxdata.com/blog/optimizing-sql-dataframes-part-one/)* + + +## Introduction + +Sometimes Query Optimizers are seen as a sort of black magic, [“the most +challenging problem in computer +science,”](https://15799.courses.cs.cmu.edu/spring2025/) according to Father +Pavlo, or some behind-the-scenes player. We believe this perception is because: + + +1. One must implement the rest of a database system (data storage, transactions, + SQL parser, expression evaluation, plan execution, etc.) **before** the + optimizer becomes critical[^5]. Review Comment: Link didn't work ########## content/blog/2025-06-15-optimizing-sql-dataframes-part-two.md: ########## @@ -0,0 +1,533 @@ +--- +layout: post +title: Optimizing SQL (and DataFrames) in DataFusion, Part 2: Optimizers in Apache DataFusion +date: 2025-06-15 +author: alamb, akurmustafa +categories: [core] +--- + +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. 
You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +*Note, this blog was originally published [on the InfluxData blog].* + +[on the InfluxData blog]: https://www.influxdata.com/blog/optimizing-sql-dataframes-part-two/ + +In the [first part of this post], we discussed what a Query Optimizer is, what +role it plays, and described how industrial optimizers are organized. In this +second post, we describe various optimizations that are found in [Apache +DataFusion](https://datafusion.apache.org/) and other industrial systems in more +detail. + + +DataFusion contains high quality, full-featured implementations for *Always +Optimizations* and *Engine Specific Optimizations* (defined in Part 1). +Optimizers are implemented as rewrites of `LogicalPlan` in the [logical +optimizer](https://github.com/apache/datafusion/tree/main/datafusion/optimizer) +or rewrites of `ExecutionPlan` in the [physical +optimizer](https://github.com/apache/datafusion/tree/main/datafusion/physical-optimizer). +This design means the same optimizer passes are applied for SQL queries, +DataFrame queries, as well as plans for other query language frontends such as +[InfluxQL](https://github.com/influxdata/influxdb3_core/tree/26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f/iox_query_influxql) +in InfluxDB 3.0, +[PromQL](https://github.com/GreptimeTeam/greptimedb/blob/0bd322a078cae4f128b791475ec91149499de33a/src/query/src/promql/planner.rs#L1) +in [Greptime](https://greptime.com/), and +[vega](https://github.com/vega/vegafusion/tree/dc15c1b9fc7d297f12bea919795d58cda1c88fcf/vegafusion-core/src/planning) +in [VegaFusion](https://vegafusion.io/). 
+
+
+[first part of this post]: https://datafusion.apache.org/blog/2025/06/15/optimizing-sql-dataframes-part-one
+
+## Always Optimizations
+
+Some optimizations are so important that they are found in almost all query engines
+and are typically the first to be implemented, as they provide the largest cost /
+benefit ratio (and performance is terrible without them).
+
+
+### Predicate/Filter Pushdown
+
+**Why**: Avoid carrying unneeded *rows* as soon as possible.
+
+**What**: Moves filters “down” in the plan so they run earlier during execution, as shown in Figure 1.
+
+**Example Implementations**: [DataFusion], [DuckDB], [ClickHouse]
+
+[DataFusion]: https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/push_down_filter.rs
+[DuckDB]: https://github.com/duckdb/duckdb/blob/main/src/optimizer/filter_pushdown.cpp
+[ClickHouse]: https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp
+
+The earlier data is filtered out in the plan, the less work the rest of the plan
+has to do. Most mature databases aggressively use filter pushdown / early
+filtering, combined with techniques such as partition and storage pruning (e.g.
+[Parquet Row Group pruning]), for performance.
+
+[Parquet Row Group pruning]: https://blog.xiangpeng.systems/posts/parquet-to-arrow/
+
+An extreme, and somewhat contrived, example is the query
+
+```sql
+SELECT city, COUNT(*) FROM population GROUP BY city HAVING city = 'BOSTON';
+```
+
+Semantically, `HAVING` is [evaluated after] `GROUP BY` in SQL. However, computing
+the population of all cities and discarding everything except Boston is much
+slower than only computing the population for Boston, and so most Query
+Optimizers will evaluate the filter before the aggregation.
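To make the equivalence concrete, here is a toy sketch in plain Rust (illustrative only, not DataFusion code, and the helper names are made up) showing that filtering before aggregating returns the same answer as aggregating every group and then filtering:

```rust
use std::collections::HashMap;

// Plan (A): aggregate every city, then keep only one (HAVING after GROUP BY).
fn agg_then_filter(rows: &[(&str, u64)], city: &str) -> u64 {
    let mut sums: HashMap<&str, u64> = HashMap::new();
    for &(c, pop) in rows {
        *sums.entry(c).or_insert(0) += pop;
    }
    sums.get(city).copied().unwrap_or(0)
}

// Plan (B): push the filter below the aggregation; only matching rows are aggregated.
fn filter_then_agg(rows: &[(&str, u64)], city: &str) -> u64 {
    rows.iter().filter(|&&(c, _)| c == city).map(|&(_, pop)| pop).sum()
}

fn main() {
    let rows = [("BOSTON", 10), ("NYC", 20), ("BOSTON", 5)];
    // Both plans compute the same answer; (B) touches fewer rows in the aggregate.
    assert_eq!(agg_then_filter(&rows, "BOSTON"), filter_then_agg(&rows, "BOSTON"));
}
```

The pushdown is safe here because the filter refers only to the group key, so dropping non-matching rows before grouping cannot change any surviving group.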
+
+[evaluated after]: https://www.datacamp.com/tutorial/sql-order-of-execution
+
+<img src="/blog/images/optimizing-sql-dataframes/filter-pushdown.png" width="80%" class="img-responsive" alt="Fig 1: Filter Pushdown."/>
+
+**Figure 1**: Filter Pushdown. In (**A**) without filter pushdown, the operator
+processes more rows, reducing efficiency. In (**B**) with filter pushdown, the
+operator receives fewer rows, resulting in less overall work and leading to a
+faster and more efficient query.
+
+
+### Projection Pushdown
+
+**Why**: Avoid carrying unneeded *columns* as soon as possible.
+
+**What**: Pushes “projection” (keeping only certain columns) earlier in the plan, as shown in Figure 2.
+
+**Example Implementations**: [DataFusion](https://github.com/apache/datafusion/blob/main/datafusion/physical-optimizer/src/projection_pushdown.rs), [DuckDB](https://github.com/duckdb/duckdb/blob/a8a6a080c8809d5d4b3c955e9f113574f6f0bfe0/src/optimizer/pushdown/pushdown_projection.cpp), [ClickHouse](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp)
+
+Similarly to the motivation for *Filter Pushdown*, the earlier the plan stops
+doing something, the less work it does overall and thus the faster it runs. For
+Projection Pushdown, if columns are not needed later in a plan, copying the data
+to the output of other operators is unnecessary, and the costs of copying can add
+up. For example, in Figure 3 of Part 1, the `species` column is only needed to
+evaluate the Filter within the scan and `notes` is never used, so it is
+unnecessary to copy them through the rest of the plan.
+
+Projection Pushdown is especially effective and important for column-store
+databases, where the storage format itself (such as [Apache Parquet]) supports
+efficiently reading only a subset of required columns, and is [especially
+powerful in combination with filter pushdown].
Projection Pushdown is still
+important, but less effective, for row-oriented formats such as JSON or CSV, where
+each column in each row must be parsed even if it is not used in the plan.
+
+[Apache Parquet]: https://parquet.apache.org/
+[especially powerful in combination with filter pushdown]: https://blog.xiangpeng.systems/posts/parquet-pushdown/
+
+<img src="/blog/images/optimizing-sql-dataframes/projection-pushdown.png" width="80%" class="img-responsive" alt="Fig 2: Projection Pushdown."/>
+
+**Figure 2:** In (**A**) without projection pushdown, the operator receives more
+columns, reducing efficiency. In (**B**) with projection pushdown, the operator
+receives fewer columns, leading to optimized execution.
+
+### Limit Pushdown
+
+**Why**: The earlier the plan stops generating data, the less overall work it
+does, and some operators have more efficient limited implementations.
+
+**What**: Pushes limits (maximum row counts) down in a plan as early as possible.
+
+**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/push_down_limit.rs), [DuckDB](https://github.com/duckdb/duckdb/blob/main/src/optimizer/limit_pushdown.cpp), [ClickHouse](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp), Spark ([Window](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala) and [Projection](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala))
+
+Often queries have a `LIMIT` or other clause that allows them to stop generating
+results early, so the sooner they can stop execution, the more efficiently they
+will execute.
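The payoff of pushing a limit into an operator can be sketched in a few lines of plain Rust (a toy model, not DataFusion's implementation): to answer `ORDER BY v LIMIT k`, a bounded heap keeps only the best k values seen so far instead of fully sorting the input:

```rust
use std::collections::BinaryHeap;

// Plan (A): full sort, then truncate to k rows.
fn sort_then_limit(mut v: Vec<i64>, k: usize) -> Vec<i64> {
    v.sort();
    v.truncate(k);
    v
}

// Plan (B): a heap-based operator that never holds more than k values,
// so memory stays O(k) and no full sort is performed.
fn top_k(v: &[i64], k: usize) -> Vec<i64> {
    let mut heap: BinaryHeap<i64> = BinaryHeap::new(); // max-heap of candidates
    for &x in v {
        heap.push(x);
        if heap.len() > k {
            heap.pop(); // evict the largest; we only want the k smallest
        }
    }
    let mut out = heap.into_vec();
    out.sort(); // final ordering of just k values
    out
}

fn main() {
    let data = vec![9, 1, 7, 3, 8, 2];
    assert_eq!(sort_then_limit(data.clone(), 3), top_k(&data, 3));
}
```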
+
+In addition, DataFusion and other systems have more efficient implementations of
+some operators that can be used if there is a limit. The classic example is
+replacing a full sort + limit with a [TopK] operator that only tracks the top
+values using a heap. Similarly, DataFusion’s Parquet reader stops fetching and
+opening additional files once the limit has been hit.
+
+[TopK]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html
+
+<img src="/blog/images/optimizing-sql-dataframes/limit-pushdown.png" width="80%" class="img-responsive" alt="Fig 3: Limit Pushdown."/>
+
+**Figure 3**: In (**A**), without limit pushdown, all data is sorted and
+everything except the first few rows is discarded. In (**B**), with limit
+pushdown, the Sort is replaced with a TopK operator, which does much less work.
+
+
+### Expression Simplification / Constant Folding
+
+**Why**: Evaluating the same expression for each row when the value doesn’t change is wasteful.
+
+**What**: Partially evaluates and/or algebraically simplifies expressions.
+
+**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/tree/main/datafusion/optimizer/src/simplify_expressions), DuckDB (has several [rules](https://github.com/duckdb/duckdb/tree/7b18f0f3691c1b6367cf68ed2598d7034e14f41b/src/optimizer/rule) such as [constant folding](https://github.com/duckdb/duckdb/blob/7b18f0f3691c1b6367cf68ed2598d7034e14f41b/src/optimizer/rule/constant_folding.cpp), and [comparison simplification](https://github.com/duckdb/duckdb/blob/7b18f0f3691c1b6367cf68ed2598d7034e14f41b/src/optimizer/rule/comparison_simplification.cpp)), [Spark](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala)
+
+If an expression doesn’t change from row to row, it is better to evaluate the
+expression **once** during planning.
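A minimal sketch of constant folding over a toy expression tree (illustrative only; DataFusion's real `Expr` type and simplifier are far richer) shows the core recursion: fold the children, and if both sides of an operation are now literals, evaluate it at plan time:

```rust
// Toy expression AST with a single constant-folding rule for addition.
#[derive(Debug, PartialEq)]
enum Expr {
    Literal(i64),
    Column(String),
    Add(Box<Expr>, Box<Expr>),
}

fn fold(expr: Expr) -> Expr {
    match expr {
        Expr::Add(l, r) => match (fold(*l), fold(*r)) {
            // Both operands are constants: evaluate once, during planning.
            (Expr::Literal(a), Expr::Literal(b)) => Expr::Literal(a + b),
            (l, r) => Expr::Add(Box::new(l), Box::new(r)),
        },
        other => other,
    }
}

fn main() {
    // time_column + (2000 + 25)  ==>  time_column + 2025
    let e = Expr::Add(
        Box::new(Expr::Column("time_column".to_string())),
        Box::new(Expr::Add(
            Box::new(Expr::Literal(2000)),
            Box::new(Expr::Literal(25)),
        )),
    );
    assert_eq!(
        fold(e),
        Expr::Add(
            Box::new(Expr::Column("time_column".to_string())),
            Box::new(Expr::Literal(2025)),
        )
    );
}
```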
This is a classic compiler technique and is
+also used in database systems.
+
+For example, given a query that finds all values from the current year:
+
+```sql
+SELECT … WHERE extract(year from time_column) = extract(year from now())
+```
+
+Evaluating `extract(year from now())` on every row is much more expensive than
+evaluating it once during planning time, so the query becomes a comparison to
+a constant:
+
+```sql
+SELECT … WHERE extract(year from time_column) = 2025
+```
+
+Furthermore, it is often possible to push such predicates **into** scans.
+
+### Rewriting `OUTER JOIN` → `INNER JOIN`
+
+**Why:** `INNER JOIN` implementations are almost always faster (as they are
+simpler) than `OUTER JOIN` implementations, and `INNER JOIN`s impose fewer
+restrictions on other optimizer passes (such as join reordering and additional
+filter pushdown).
+
+**What**: In cases where it is known that the NULL rows introduced by an `OUTER
+JOIN` will not appear in the results, it can be rewritten to an `INNER
+JOIN`.
+
+**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/blob/6028474969f0bfead96eb7f413791470afb6bf82/datafusion/optimizer/src/eliminate_outer_join.rs), [Spark](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala#L124-L158), [ClickHouse](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/convertOuterJoinToInnerJoin.cpp).
+
+For example, given a query such as the following:
+
+```SQL
+SELECT …
+FROM orders LEFT OUTER JOIN customer ON (orders.cid = customer.id)
+WHERE customer.last_name = 'Lamb'
+```
+
+The `LEFT OUTER JOIN` keeps all rows in `orders` that don’t have a matching
+customer, but fills in the fields with `null`. All such rows will be filtered
+out by `customer.last_name = 'Lamb'`, and thus an `INNER JOIN` produces the same
+answer. This is illustrated in Figure 4.
+
+<img src="/blog/images/optimizing-sql-dataframes/join-rewrite.png" width="80%" class="img-responsive" alt="Fig 4: Join Rewrite."/>
+
+**Figure 4**: Rewriting `OUTER JOIN` to `INNER JOIN`. In (A) the original query
+contains an `OUTER JOIN` but also a filter on `customer.last_name`, which
+filters out all rows that might be introduced by the `OUTER JOIN`. In (B) the
+`OUTER JOIN` is converted to an `INNER JOIN`, so a more efficient implementation
+can be used.
+
+
+## Engine Specific Optimizations
+
+As discussed in Part 1 of this blog, optimizers also contain a set of passes
+that are still always good to do, but are closely tied to the specifics of the
+query engine. This section describes some common types.
+
+### Subquery Rewrites
+
+**Why**: Actually implementing subqueries by running a query for each row of the outer query is very expensive.
+
+**What**: It is possible to rewrite subqueries as joins, which often perform much better.
+
+**Example Implementations:** DataFusion ([one](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/decorrelate.rs), [two](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/decorrelate_predicate_subquery.rs), [three](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/scalar_subquery_to_join.rs)), [Spark](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala)
+
+Evaluating subqueries a row at a time is so expensive that execution engines in
+high-performance analytic systems such as DataFusion and [Vertica] may not even
+support row-at-a-time evaluation, given how terrible the performance would be.
+Instead, analytic systems rewrite such queries into joins, which can perform 100s
+or 1000s of times faster for large datasets.
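The following toy sketch in plain Rust (not engine code; the table shapes are made up for illustration) shows why the rewrite is attractive: the correlated plan re-scans `orders` once per customer, while the decorrelated plan aggregates `orders` once and then probes a hash table, as a hash join would:

```rust
use std::collections::HashMap;

// customers: (id, name); orders: (cid, value)

// Correlated plan: re-run the subquery for every outer row, O(customers × orders).
fn per_row_subquery(customers: &[(u32, &str)], orders: &[(u32, i64)]) -> Vec<String> {
    customers
        .iter()
        .filter(|&&(id, _)| {
            orders.iter().filter(|&&(cid, _)| cid == id).map(|&(_, v)| v).sum::<i64>() > 10
        })
        .map(|&(_, name)| name.to_string())
        .collect()
}

// Decorrelated plan: aggregate orders once, then a single hash-table probe per customer.
fn join_rewrite(customers: &[(u32, &str)], orders: &[(u32, i64)]) -> Vec<String> {
    let mut sums: HashMap<u32, i64> = HashMap::new();
    for &(cid, v) in orders {
        *sums.entry(cid).or_insert(0) += v;
    }
    customers
        .iter()
        .filter(|&&(id, _)| sums.get(&id).copied().unwrap_or(0) > 10)
        .map(|&(_, name)| name.to_string())
        .collect()
}

fn main() {
    let customers = [(1, "ada"), (2, "bob")];
    let orders = [(1, 7), (1, 8), (2, 3)];
    // Same answer, one pass over orders instead of one pass per customer.
    assert_eq!(per_row_subquery(&customers, &orders), join_rewrite(&customers, &orders));
}
```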
However, transforming subqueries to
+joins requires “exotic” join semantics, such as `SEMI JOIN`, `ANTI JOIN`, and
+variations on how to treat equality with null[^7].
+
+[Vertica]: https://vertica.com/
+
+For a simple example, consider a query like this:
+
+```sql
+SELECT customer.name
+FROM customer
+WHERE (SELECT sum(value)
+       FROM orders WHERE
+       orders.cid = customer.id) > 10;
+```
+
+It can be rewritten like this:
+
+```sql
+SELECT customer.name
+FROM customer
+JOIN (
+  SELECT orders.cid as cid_inner, sum(value) s
+    FROM orders
+    GROUP BY orders.cid
+  ) ON (customer.id = cid_inner AND s > 10);
+```
+
+We don’t have space to detail this transformation or why it is so much faster to
+run, but using this and many other transformations allows efficient subquery
+evaluation.
+
+### Optimized Expression Evaluation
+
+**Why**: The capabilities of expression evaluation vary from system to system.
+
+**What**: Optimize expression evaluation for the particular execution environment.
+
+**Example Implementations**: There are many examples of this type of
+optimization, including DataFusion’s [Common Subexpression
+Elimination](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/common_subexpr_eliminate.rs),
+[unwrap_cast](https://github.com/apache/datafusion/blob/8f3f70877febaa79be3349875e979d3a6e65c30e/datafusion/optimizer/src/simplify_expressions/unwrap_cast.rs#L70),
+and [identifying equality join
+predicates](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/extract_equijoin_predicate.rs).
+DuckDB [rewrites IN
+clauses](https://github.com/duckdb/duckdb/blob/main/src/optimizer/in_clause_rewriter.cpp),
+and [SUM
+expressions](https://github.com/duckdb/duckdb/blob/main/src/optimizer/sum_rewriter.cpp).
+Spark also [unwraps casts in binary
+comparisons](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala),
+and [adds special runtime
+filters](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala).
+
+To give a specific example of what DataFusion’s common subexpression elimination
+does, consider this query that refers to a complex expression multiple times:
+
+```sql
+SELECT date_bin('1 hour', time, '1970-01-01')
+FROM table
+WHERE date_bin('1 hour', time, '1970-01-01') >= '2025-01-01 00:00:00'
+ORDER BY date_bin('1 hour', time, '1970-01-01')
+```
+
+Evaluating `date_bin('1 hour', time, '1970-01-01')` each time it is encountered
+is inefficient compared to calculating its result once and reusing that result
+when it is encountered again (similar to caching). This reuse is called
+*Common Subexpression Elimination*.
+
+Some execution engines implement this optimization internally to their
+expression evaluation engine, but DataFusion represents it explicitly using a
+separate Projection plan node, as illustrated in Figure 5. Effectively, the
+query above is rewritten to the following:
+
+```sql
+SELECT time_chunk
+FROM (SELECT date_bin('1 hour', time, '1970-01-01') as time_chunk
+      FROM table)
+WHERE time_chunk >= '2025-01-01 00:00:00'
+ORDER BY time_chunk
+```
+
+
+<img src="/blog/images/optimizing-sql-dataframes/common-subexpression-elimination.png" width="80%" class="img-responsive" alt="Fig 5: Common Subquery Elimination."/>
+
+**Figure 5:** Adding a Projection to evaluate a common complex subexpression
+decreases complexity for later stages.
+
+
+### Algorithm Selection
+
+**Why**: Different engines have different specialized operators for certain
+operations.
+
+**What:** Selects specific implementations from the available operators, based
+on properties of the query.
+
+**Example Implementations:** DataFusion’s [EnforceSorting](https://github.com/apache/datafusion/blob/8f3f70877febaa79be3349875e979d3a6e65c30e/datafusion/physical-optimizer/src/enforce_sorting/mod.rs) pass uses sort-optimized implementations, Spark’s [rewrite to use a special operator for ASOF joins](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteAsOfJoin.scala), and ClickHouse’s [join algorithm selection](https://github.com/ClickHouse/ClickHouse/blob/7d15deda4b33282f356bb3e40a190d005acf72f2/src/Interpreters/ExpressionAnalyzer.cpp#L1066-L1080), such as [when to use MergeJoin](https://github.com/ClickHouse/ClickHouse/blob/7d15deda4b33282f356bb3e40a190d005acf72f2/src/Interpreters/ExpressionAnalyzer.cpp#L1022).
+
+For example, DataFusion uses a `TopK` ([source]) operator rather than a full
+`Sort` if there is also a limit on the query. Similarly, it may choose to use the
+more efficient `PartialOrdered` grouping operation when the data is sorted on
+group keys, or a `MergeJoin` when the join inputs are already sorted on the join keys.
+
+[source]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html
+
+<img src="/blog/images/optimizing-sql-dataframes/specialized-grouping.png" width="80%" class="img-responsive" alt="Fig 6: Specialized Grouping."/>
+
+**Figure 6:** An example of a specialized operation for grouping. In (**A**), input data has no specified ordering and DataFusion uses a hashing-based grouping operator ([source](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/aggregates/row_hash.rs)) to determine distinct groups. 
In (**B**), when the input data is ordered by the group keys, DataFusion uses a specialized grouping operator ([source](https://github.com/apache/datafusion/tree/main/datafusion/physical-plan/src/aggregates/order)) to find the boundaries that separate groups.
+
+
+### Using Statistics Directly
+
+**Why**: Using pre-computed statistics from a table, without actually reading or
+opening files, is much faster than processing data.
+
+**What**: Replace calculations on data with the value from statistics.
+
+**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/blob/8f3f70877febaa79be3349875e979d3a6e65c30e/datafusion/physical-optimizer/src/aggregate_statistics.rs), [DuckDB](https://github.com/duckdb/duckdb/blob/main/src/optimizer/statistics_propagator.cpp)
+
+Some queries, such as the classic `COUNT(*) FROM my_table` used for data
+exploration, can be answered using only statistics. Optimizers often have access
+to statistics for other reasons (such as Access Path and Join Order Selection),
+and statistics are commonly stored in analytic file formats. For example, the
+[Metadata] of Apache Parquet files stores `MIN`, `MAX`, and `COUNT` information.
+
+[Metadata]: https://docs.rs/parquet/latest/parquet/file/metadata/index.html
+
+<img src="/blog/images/optimizing-sql-dataframes/using-statistics.png" width="80%" class="img-responsive" alt="Fig 7: Using Statistics."/>
+
+**Figure 7:** When the aggregation result is already stored in the statistics,
+the query can be evaluated using the values from statistics without looking at
+any compressed data. The optimizer replaces the Aggregation operation with
+values from statistics.
+
+## Access Path and Join Order Selection
+
+
+### Overview
+
+Last, but certainly not least, are optimizations that choose between plans with
+potentially (very) different performance. The major options in this category are:
+
+1. **Join Order:** In what order to combine tables using JOINs?
+2. 
**Access Paths:** Which copy of the data or index should be read to find matching tuples?
+3. **[Materialized View]**: Can the query be rewritten to use a materialized view (partially computed query results)? This topic deserves its own blog (or book), and we don’t discuss it further here.
+
+[Materialized View]: https://en.wikipedia.org/wiki/Materialized_view
+
+<img src="/blog/images/optimizing-sql-dataframes/access-path-and-join-order.png" width="80%" class="img-responsive" alt="Fig 8: Access Path and Join Order."/>
+
+**Figure 8:** Access Path and Join Order Selection in Query Optimizers. Optimizers use heuristics to enumerate some subset of potential join orders (shape) and access paths (color). The plan with the smallest estimated cost according to some cost model is chosen. In this case, Plan 2 with a cost of 180,000 is chosen for execution as it has the lowest estimated cost.
+
+This class of optimizations is a hard problem for at least the following reasons:
+
+1. **Exponential Search Space**: the number of potential plans increases
+   exponentially as the number of joins and indexes increases.
+
+2. **Performance Sensitivity**: Often different plans that are very similar in
+   structure perform very differently. For example, swapping the input order to
+   a hash join can result in 1000x or more (yes, a thousand-fold!) runtime
+   differences.
+
+3. **Cardinality Estimation Errors**: Determining the optimal plan relies on
+   cardinality estimates (e.g., how many rows will come out of each join). It is a
+   [known hard problem] to estimate this cardinality, and in practice queries with
+   as few as 3 joins often have large cardinality estimation errors.
+
+[known hard problem]: https://www.vldb.org/pvldb/vol9/p204-leis.pdf
+
+### Heuristics and Cost-Based Optimization
+
+Industrial optimizers handle these problems using a combination of:
+
+1. **Heuristics:** to prune the search space and avoid considering plans that
+   are (almost) never good.
Examples include considering only left-deep trees, or
+   using `Foreign Key` / `Primary Key` relationships to pick the build side of a
+   hash join.
+
+2. **Cost Model**: Given the smaller set of candidate plans, the Optimizer then
+   estimates their cost and picks the one with the lowest cost.
+
+For some examples, you can read about [Spark’s cost-based optimizer] or look at

Review Comment:
   Spark link didn't work



##########
content/blog/2025-06-15-optimizing-sql-dataframes-part-one.md:
##########
@@ -0,0 +1,249 @@
+---
+layout: post
+title: "Optimizing SQL (and DataFrames) in DataFusion, Part 1: Query Optimization Overview"
+date: 2025-06-15
+author: alamb, akurmustafa
+categories: [core]
+---
+
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+
+
+*Note: this blog was originally published [on the InfluxData blog](https://www.influxdata.com/blog/optimizing-sql-dataframes-part-one/)*
+
+
+## Introduction
+
+Sometimes Query Optimizers are seen as a sort of black magic, [“the most
+challenging problem in computer
+science,”](https://15799.courses.cs.cmu.edu/spring2025/) according to Father
+Pavlo, or some behind-the-scenes player. We believe this perception is because:
+
+
+1. 
One must implement the rest of a database system (data storage, transactions, + SQL parser, expression evaluation, plan execution, etc.) **before** the + optimizer becomes critical[^5]. + +2. Some parts of the optimizer are tightly tied to the rest of the system (e.g., + storage or indexes), so many classic optimizers are described with + system-specific terminology. + +3. Some optimizer tasks, such as access path selection and join order are known + challenges and not yet solved (practically)—maybe they really do require + black magic 🤔. + +However, Query Optimizers are no more complicated in theory or practice than other parts of a database system, as we will argue in a series of posts: + +**Part 1: (this post)**: + +* Review what a Query Optimizer is, what it does, and why you need one for SQL and DataFrames. +* Describe how industrial Query Optimizers are structured and standard optimization classes. + +**Part 2:** + +* Describe the optimization categories with examples and pointers to implementations. +* Describe [Apache DataFusion](https://datafusion.apache.org/)’s rationale and approach to query optimization, specifically for access path and join ordering. + +After reading these blogs, we hope people will use DataFusion to: + +1. Build their own system specific optimizers. +2. Perform practical academic research on optimization (especially researchers + working on new optimizations / join ordering—looking at you [CMU + 15-799](https://15799.courses.cs.cmu.edu/spring2025/), next year). + + +## Query Optimizer Background + +The key pitch for querying databases, and likely the key to the longevity of SQL +(despite people’s love/hate relationship—see [SQL or Death? Seminar Series – +Spring 2025](https://db.cs.cmu.edu/seminar2025/)), is that it disconnects the +`WHAT` you want to compute from the `HOW` to do it. 
SQL is a *declarative*
+language—it describes what answers are desired rather than an *imperative*
+language such as Python, where you describe how to do the computation, as shown
+in Figure 1.
+
+<img src="/blog/images/optimizing-sql-dataframes/query-execution.png" width="80%" class="img-responsive" alt="Fig 1: Query Execution."/>
+
+**Figure 1**: Query Execution: Users describe the answer they want using either
+a DataFrame or SQL. The query planner or DataFrame API translates that
+description into an *Initial Plan*, which is correct but slow. The Query
+Optimizer then rewrites the initial plan to an *Optimized Plan*, which computes
+the same results but faster and more efficiently. Finally, the Execution Engine
+executes the optimized plan, producing results.
+
+## SQL, DataFrames, LogicalPlan Equivalence
+
+Given their name, it is not surprising that Query Optimizers can improve the
+performance of SQL queries. However, it is under-appreciated that this also
+applies to DataFrame-style APIs.
+
+Classic DataFrame systems such as [pandas] and [Polars] (by default) execute
+eagerly and thus have limited opportunities for optimization. However, more
+modern APIs such as [Polars' lazy API], [Apache Spark's DataFrame], and
+[DataFusion's DataFrame] are much faster as they use the design shown in Figure
+1 and apply many query optimization techniques.
+
+[pandas]: https://pandas.pydata.org/
+[Polars]: https://pola.rs/
+[Polars' lazy API]: https://docs.pola.rs/user-guide/lazy/using/
+[Apache Spark's DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes
+[DataFusion's DataFrame]: https://datafusion.apache.org/user-guide/dataframe.html
+
+## Example of Query Optimizer
+
+This section motivates the value of a Query Optimizer with an example. Let’s say
+you have some observations of animal behavior, as illustrated in Table 1.
<img src="/blog/images/optimizing-sql-dataframes/table1.png" width="75%" class="img-responsive" alt="Table 1: Observational Data."/>

**Table 1**: Example observational data.

If the user wants to know the average population for some species in the last
month, a user can write a SQL query or a DataFrame such as the following:

SQL:

```sql
SELECT location, AVG(population)
FROM observations
WHERE species = 'contrarian spider' AND
      observation_time >= now() - interval '1 month'
GROUP BY location
```

DataFrame:

```rust
df.scan("observations")
  .filter(col("species").eq("contrarian spider"))
  .filter(col("observation_time").ge(now().sub(interval("1 month"))))
  .agg(vec![col("location")], vec![avg(col("population"))])
```

Within DataFusion, both the SQL and DataFrame are translated into the same
[`LogicalPlan`], a “tree of relational operators.” This is a fancy way of
saying a data flow graph where the edges represent tabular data (rows + columns)
and the nodes represent a transformation (see [this DataFusion overview video]
for more details). The initial `LogicalPlan` for the queries above is shown in
Figure 2.

[`LogicalPlan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html
[this DataFusion overview video]: https://youtu.be/EzZTLiSJnhY

<img src="/blog/images/optimizing-sql-dataframes/initial-logical-plan.png" width="72%" class="img-responsive" alt="Fig 2: Initial Logical Plan."/>

**Figure 2**: Example initial `LogicalPlan` for the SQL and DataFrame query. The
plan is read from bottom to top, computing the results in each step.

The optimizer's job is to take this query plan and rewrite it into an alternate
plan that computes the same results but faster, such as the one shown in Figure
3.
+ +<img src="/blog/images/optimizing-sql-dataframes/optimized-logical-plan.png" width="80%" class="img-responsive" alt="Fig 3: Optimized Logical Plan."/> + +**Figure 3**: An example optimized plan that computes the same result as the +plan in Figure 2 more efficiently. The diagram highlights where the optimizer +has applied *Projection Pushdown*, *Filter Pushdown*, and *Constant Evaluation*. +Note that this is a simplified example for explanatory purposes, and actual +optimizers such as the one in DataFusion perform additional tasks such as +choosing specific aggregation algorithms. + + +## Query Optimizer Implementation + +Industrial optimizers, such as +DataFusion’s ([source](https://github.com/apache/datafusion/tree/334d6ec50f36659403c96e1bffef4228be7c458e/datafusion/optimizer/src)), +ClickHouse ([source](https://github.com/ClickHouse/ClickHouse/tree/master/src/Analyzer/Passes),[source](https://github.com/ClickHouse/ClickHouse/tree/master/src/Processors/QueryPlan/Optimizations)), +DuckDB ([source](https://github.com/duckdb/duckdb/tree/4afa85c6a4dacc39524d1649fd8eb8c19c28ad14/src/optimizer)), +and Apache Spark ([source](https://github.com/apache/spark/tree/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer)), +are implemented as a series of passes or rules that rewrite a query plan. The +overall optimizer is composed of a sequence of these rules,[^6] as shown in +Figure 4. The specific order of the rules also often matters, but we will not +discuss this detail in this post. + +A multi-pass design is standard because it helps: + +1. Understand, implement, and test each pass in isolation +2. Easily extend the optimizer by adding new passes + +<img src="/blog/images/optimizing-sql-dataframes/optimizer-passes.png" width="80%" class="img-responsive" alt="Fig 4: Query Optimizer Passes."/> + +**Figure 4**: Query Optimizers are implemented as a series of rules that each +rewrite the query plan. 
Each rule’s algorithm is expressed as a transformation
of the previous plan.

There are three major classes of optimizations in industrial optimizers:

1. **Always Optimizations**: These are always good to do and thus are always
   applied. This class of optimization includes expression simplification,
   predicate pushdown, and limit pushdown. These optimizations are typically
   simple in theory, though they require nontrivial amounts of code and tests to
   implement in practice.

2. **Engine Specific Optimizations**: These optimizations take advantage of
   specific engine features, such as how expressions are evaluated or what
   particular hash or join implementations are available.

3. **Access Path and Join Order Selection**: These passes choose one access
   method per table and a join order for execution, typically using heuristics
   and a cost model to make tradeoffs between the options. Databases often have
   multiple ways to access the data (e.g., index scan or full-table scan), as
   well as many potential orders to combine (join) multiple tables. These
   methods compute the same result but can vary drastically in performance.

This brings us to the end of Part 1. In Part 2, we will explain these classes of
optimizations in more detail and provide examples of how they are implemented in
DataFusion and other systems.

# About the Authors

[Andrew Lamb](https://www.linkedin.com/in/andrewalamb/) is a Staff Engineer at
[InfluxData](https://www.influxdata.com/) and an [Apache
DataFusion](https://datafusion.apache.org/) PMC member. A Database Optimizer
connoisseur, he worked on the [Vertica Analytic
Database](https://vldb.org/pvldb/vol5/p1790_andrewlamb_vldb2012.pdf) Query
Optimizer for six years, has several granted US patents related to query
optimization[^1], co-authored several papers[^2] about the topic (including in
VLDB 2024[^3]), and spent several weeks[^4] deeply geeking out about this topic
+ +[Mustafa Akur](https://www.linkedin.com/in/akurmustafa/) is a PhD Student at +[OHSU](https://www.ohsu.edu/) Knight Cancer Institute and an [Apache +DataFusion](https://datafusion.apache.org/) PMC member. He was previously a +Software Developer at [Synnada](https://www.synnada.ai/) where he contributed +significant features to the DataFusion optimizer, including many [sort-based +optimizations](https://datafusion.apache.org/blog/2025/03/11/ordering-analysis/). + + +## Notes + +[^1]: *Modular Query Optimizer, US 8,312,027 · Issued Nov 13, 2012*, Query Optimizer with schema conversion US 8,086,598 · Issued Dec 27, 2011 + +[^2]: [The Vertica Query Optimizer: The case for specialized Query Optimizers](https://www.researchgate.net/publication/269306314_The_Vertica_Query_Optimizer_The_case_for_specialized_query_optimizers) + +[^3]: [https://www.vldb.org/pvldb/vol17/p1350-justen.pdf](https://www.vldb.org/pvldb/vol17/p1350-justen.pdf) + +[^4]: [https://www.dagstuhl.de/en/seminars/seminar-calendar/seminar-details/24101](https://www.dagstuhl.de/en/seminars/seminar-calendar/seminar-details/24101) , [https://www.dagstuhl.de/en/seminars/seminar-calendar/seminar-details/22111](https://www.dagstuhl.de/en/seminars/seminar-calendar/seminar-details/22111) [https://www.dagstuhl.de/en/seminars/seminar-calendar/seminar-details/12321](https://www.dagstuhl.de/en/seminars/seminar-calendar/seminar-details/12321) Review Comment: This style of specifying the links failed to generate these two as foot notes. ########## content/blog/2025-06-15-optimizing-sql-dataframes-part-one.md: ########## @@ -0,0 +1,249 @@ +--- +layout: post +title: Optimizing SQL (and DataFrames) in DataFusion, Part 1: Query Optimization Overview +date: 2025-06-15 +author: alamb, akurmustafa +categories: [core] +--- + +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. 
See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + + + +*Note: this blog was originally published [on the InfluxData blog](https://www.influxdata.com/blog/optimizing-sql-dataframes-part-one/)* + + +## Introduction + +Sometimes Query Optimizers are seen as a sort of black magic, [“the most +challenging problem in computer +science,”](https://15799.courses.cs.cmu.edu/spring2025/) according to Father +Pavlo, or some behind-the-scenes player. We believe this perception is because: + + +1. One must implement the rest of a database system (data storage, transactions, + SQL parser, expression evaluation, plan execution, etc.) **before** the + optimizer becomes critical[^5]. + +2. Some parts of the optimizer are tightly tied to the rest of the system (e.g., + storage or indexes), so many classic optimizers are described with + system-specific terminology. + +3. Some optimizer tasks, such as access path selection and join order are known + challenges and not yet solved (practically)—maybe they really do require + black magic 🤔. + +However, Query Optimizers are no more complicated in theory or practice than other parts of a database system, as we will argue in a series of posts: + +**Part 1: (this post)**: + +* Review what a Query Optimizer is, what it does, and why you need one for SQL and DataFrames. 
+* Describe how industrial Query Optimizers are structured and standard optimization classes. + +**Part 2:** + +* Describe the optimization categories with examples and pointers to implementations. +* Describe [Apache DataFusion](https://datafusion.apache.org/)’s rationale and approach to query optimization, specifically for access path and join ordering. + +After reading these blogs, we hope people will use DataFusion to: + +1. Build their own system specific optimizers. +2. Perform practical academic research on optimization (especially researchers + working on new optimizations / join ordering—looking at you [CMU + 15-799](https://15799.courses.cs.cmu.edu/spring2025/), next year). + + +## Query Optimizer Background + +The key pitch for querying databases, and likely the key to the longevity of SQL +(despite people’s love/hate relationship—see [SQL or Death? Seminar Series – +Spring 2025](https://db.cs.cmu.edu/seminar2025/)), is that it disconnects the +`WHAT` you want to compute from the `HOW` to do it. SQL is a *declarative* +language—it describes what answers are desired rather than an *imperative* +language such as Python, where you describe how to do the computation as shown +in Figure 1. + +<img src="/blog/images/optimizing-sql-dataframes/query-execution.png" width="80%" class="img-responsive" alt="Fig 1: Query Execution."/> + +**Figure 1**: Query Execution: Users describe the answer they want using either +a DataFrame or SQL. The query planner or DataFrame API translates that +description into an *Initial Plan*, which is correct but slow. The Query +Optimizer then rewrites the initial plan to an *Optimized Plan*, which computes +the same results but faster and more efficiently. Finally, the Execution Engine +executes the optimized plan producing results. + +## SQL, DataFrames, LogicalPlan Equivalence + +Given their name, it is not surprising that Query Optimizers can improve the +performance of SQL queries. 
However, it is under-appreciated that this also +applies to DataFrame style APIs. + +Classic DataFrame systems such as [pandas] and [Polars] (by default) execute +eagerly and thus have limited opportunities for optimization. However, more +modern APIs such as [Polar's lazy API], [Apache Spark's DataFrame]. and +[DataFusion's DataFrame] are much faster as they use the design shown in Figure +1 and apply many query optimization techniques. + +[pandas]: https://pandas.pydata.org/ +[Polars]: https://pola.rs/) +[Polar'’'s lazy API]: https://docs.pola.rs/user-guide/lazy/using/ +[Apache Spark's DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes), +[DataFusion's DataFrame]: https://datafusion.apache.org/user-guide/dataframe.html + +## Example of Query Optimizer + +This section motivates the value of a Query Optimizer with an example. Let’s say +you have some observations of animal behavior, as illustrated in Table 1. + +<img src="/blog/images/optimizing-sql-dataframes/table1.png" width="75%" class="img-responsive" alt="Table 1: Observational Data."/> + +**Table 1**: Example observational data. 
+ +If the user wants to know the average population for some species in the last +month, a user can write a SQL query or a DataFrame such as the following: + +SQL: + +```sql +SELECT location, AVG(population) +FROM observations +WHERE species = ‘contrarian spider’ AND + observation_time >= now() - interval '1 month' +GROUP BY location +``` + +DataFrame: + +```rust +df.scan("observations") + .filter(col("species").eq("contrarian spider")) + .filter(col("observation_time").ge(now()).sub(interval('1 month'))) + .agg(vec![col(location)], vec![avg(col("population")]) +``` + +Within DataFusion, both the SQL and DataFrame are translated into the same +[`LogicalPlan`] , a “tree of relational operators.” This is a fancy way of +saying data flow graphs where the edges represent tabular data (rows + columns) +and the nodes represent a transformation (see [this DataFusion overview video] +for more details). The initial `LogicalPlan` for the queries above is shown in +Figure 2. + +[`LogicalPlan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html +[this DataFusion overview video]: https://youtu.be/EzZTLiSJnhY + +<img src="/blog/images/optimizing-sql-dataframes/initial-logical-plan.png" width="72%" class="img-responsive" alt="Fig 2: Initial Logical Plan."/> + +**Figure 2**: Example initial `LogicalPlan` for SQL and DataFrame query. The +plan is read from bottom to top, computing the results in each step. + +The optimizer's job is to take this query plan and rewrite it into an alternate +plan that computes the same results but faster, such as the one shown in Figure +3. + +<img src="/blog/images/optimizing-sql-dataframes/optimized-logical-plan.png" width="80%" class="img-responsive" alt="Fig 3: Optimized Logical Plan."/> + +**Figure 3**: An example optimized plan that computes the same result as the +plan in Figure 2 more efficiently. 
The diagram highlights where the optimizer +has applied *Projection Pushdown*, *Filter Pushdown*, and *Constant Evaluation*. +Note that this is a simplified example for explanatory purposes, and actual +optimizers such as the one in DataFusion perform additional tasks such as +choosing specific aggregation algorithms. + + +## Query Optimizer Implementation + +Industrial optimizers, such as +DataFusion’s ([source](https://github.com/apache/datafusion/tree/334d6ec50f36659403c96e1bffef4228be7c458e/datafusion/optimizer/src)), +ClickHouse ([source](https://github.com/ClickHouse/ClickHouse/tree/master/src/Analyzer/Passes),[source](https://github.com/ClickHouse/ClickHouse/tree/master/src/Processors/QueryPlan/Optimizations)), +DuckDB ([source](https://github.com/duckdb/duckdb/tree/4afa85c6a4dacc39524d1649fd8eb8c19c28ad14/src/optimizer)), +and Apache Spark ([source](https://github.com/apache/spark/tree/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer)), +are implemented as a series of passes or rules that rewrite a query plan. The +overall optimizer is composed of a sequence of these rules,[^6] as shown in +Figure 4. The specific order of the rules also often matters, but we will not +discuss this detail in this post. + +A multi-pass design is standard because it helps: + +1. Understand, implement, and test each pass in isolation +2. Easily extend the optimizer by adding new passes + +<img src="/blog/images/optimizing-sql-dataframes/optimizer-passes.png" width="80%" class="img-responsive" alt="Fig 4: Query Optimizer Passes."/> + +**Figure 4**: Query Optimizers are implemented as a series of rules that each +rewrite the query plan. Each rule’s algorithm is expressed as a transformation +of a previous plan. + +There are three major classes of optimizations in industrial optimizers: + +1. **Always Optimizations**: These are always good to do and thus are always + applied. 
This class of optimization includes expression simplification, + predicate pushdown, and limit pushdown. These optimizations are typically + simple in theory, though they require nontrivial amounts of code and tests to + implement in practice. + +2. **Engine Specific Optimizations: **These optimizations take advantage of + specific engine features, such as how expressions are evaluated or what + particular hash or join implementations are available. + +3. **Access Path and Join Order Selection**: These passes choose one access + method per table and a join order for execution, typically using heuristics + and a cost model to make tradeoffs between the options. Databases often have + multiple ways to access the data (e.g., index scan or full-table scan), as + well as many potential orders to combine (join) multiple tables. These + methods compute the same result but can vary drastically in performance. + +This brings us to the end of Part 1. In Part 2, we will explain these classes of +optimizations in more detail and provide examples of how they are implemented in +DataFusion and other systems. + +# About the Authors + +[Andrew Lamb](https://www.linkedin.com/in/andrewalamb/) is a Staff Engineer at +[InfluxData](https://www.influxdata.com/) and an [Apache +DataFusion](https://datafusion.apache.org/) PMC member. A Database Optimizer +connoisseur, he worked on the [Vertica Analytic +Database](https://vldb.org/pvldb/vol5/p1790_andrewlamb_vldb2012.pdf) Query +Optimizer for six years, has several granted US patents related to query +optimization[^1], co-authored several papers[^2] about the topic (including in +VLDB 2024[^3]), and spent several weeks[^4] deeply geeking out about this topic Review Comment: None of the links jump to the footnotes. 
I'm guessing our generator is different than the one used in the other blog ########## content/blog/2025-06-15-optimizing-sql-dataframes-part-two.md: ########## @@ -0,0 +1,533 @@ +--- +layout: post +title: Optimizing SQL (and DataFrames) in DataFusion, Part 2: Optimizers in Apache DataFusion +date: 2025-06-15 +author: alamb, akurmustafa +categories: [core] +--- + +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +*Note, this blog was originally published [on the InfluxData blog].* + +[on the InfluxData blog]: https://www.influxdata.com/blog/optimizing-sql-dataframes-part-two/ + +In the [first part of this post], we discussed what a Query Optimizer is, what +role it plays, and described how industrial optimizers are organized. In this +second post, we describe various optimizations that are found in [Apache +DataFusion](https://datafusion.apache.org/) and other industrial systems in more +detail. + + +DataFusion contains high quality, full-featured implementations for *Always +Optimizations* and *Engine Specific Optimizations* (defined in Part 1). 
Optimizers are implemented as rewrites of `LogicalPlan` in the [logical
optimizer](https://github.com/apache/datafusion/tree/main/datafusion/optimizer)
or rewrites of `ExecutionPlan` in the [physical
optimizer](https://github.com/apache/datafusion/tree/main/datafusion/physical-optimizer).
This design means the same optimizer passes are applied to SQL queries and
DataFrame queries, as well as to plans for other query language frontends such as
[InfluxQL](https://github.com/influxdata/influxdb3_core/tree/26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f/iox_query_influxql)
in InfluxDB 3.0,
[PromQL](https://github.com/GreptimeTeam/greptimedb/blob/0bd322a078cae4f128b791475ec91149499de33a/src/query/src/promql/planner.rs#L1)
in [Greptime](https://greptime.com/), and
[vega](https://github.com/vega/vegafusion/tree/dc15c1b9fc7d297f12bea919795d58cda1c88fcf/vegafusion-core/src/planning)
in [VegaFusion](https://vegafusion.io/).

[first part of this post]: https://datafusion.apache.org/blog/2025/06/15/optimizing-sql-dataframes-part-one

## Always Optimizations

Some optimizations are so important they are found in almost all query engines
and are typically the first implemented, as they provide the largest
cost/benefit ratio (and performance is terrible without them).


### Predicate/Filter Pushdown

**Why**: Avoid carrying unneeded *rows* as soon as possible.

**What**: Moves filters “down” in the plan so they run earlier during execution, as shown in Figure 1.

**Example Implementations**: [DataFusion], [DuckDB], [ClickHouse]

[DataFusion]: https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/push_down_filter.rs
[DuckDB]: https://github.com/duckdb/duckdb/blob/main/src/optimizer/filter_pushdown.cpp
[ClickHouse]: https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp

The earlier data is filtered out in the plan, the less work the rest of the plan
has to do.
Most mature databases aggressively use filter pushdown / early
filtering combined with techniques such as partition and storage pruning (e.g.,
[Parquet Row Group pruning]) for performance.

[Parquet Row Group pruning]: https://blog.xiangpeng.systems/posts/parquet-to-arrow/

An extreme, and somewhat contrived, example is the query

```sql
SELECT city, COUNT(*) FROM population GROUP BY city HAVING city = 'BOSTON';
```

Semantically, `HAVING` is [evaluated after] `GROUP BY` in SQL. However, computing
the population of all cities and discarding everything except Boston is much
slower than only computing the population for Boston, and so most Query
Optimizers will evaluate the filter before the aggregation.

[evaluated after]: https://www.datacamp.com/tutorial/sql-order-of-execution

<img src="/blog/images/optimizing-sql-dataframes/filter-pushdown.png" width="80%" class="img-responsive" alt="Fig 1: Filter Pushdown."/>

**Figure 1**: Filter Pushdown. In (**A**) without filter pushdown, the operator
processes more rows, reducing efficiency. In (**B**) with filter pushdown, the
operator receives fewer rows, resulting in less overall work and leading to a
faster and more efficient query.


### Projection Pushdown

**Why**: Avoid carrying unneeded *columns* as soon as possible.

**What**: Pushes “projection” (keeping only certain columns) earlier in the plan, as shown in Figure 2.
**Example Implementations**: [DataFusion](https://github.com/apache/datafusion/blob/main/datafusion/physical-optimizer/src/projection_pushdown.rs), [DuckDB](https://github.com/duckdb/duckdb/blob/a8a6a080c8809d5d4b3c955e9f113574f6f0bfe0/src/optimizer/pushdown/pushdown_projection.cpp), [ClickHouse](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp)

Similarly to the motivation for *Filter Pushdown*, the earlier the plan stops
doing something, the less work it does overall and thus the faster it runs. For
Projection Pushdown, if columns are not needed later in a plan, copying the data
to the output of other operators is unnecessary and the costs of copying can add
up. For example, in Figure 3 of Part 1, the `species` column is only needed to
evaluate the Filter within the scan and the `notes` column is never used, so it
is unnecessary to copy them through the rest of the plan.

Projection Pushdown is especially effective and important for column store
databases, where the storage format itself (such as [Apache Parquet]) supports
efficiently reading only a subset of required columns, and is [especially
powerful in combination with filter pushdown]. Projection Pushdown is still
important, but less effective, for row-oriented formats such as JSON or CSV where
each column in each row must be parsed even if it is not used in the plan.

[Apache Parquet]: https://parquet.apache.org/
[especially powerful in combination with filter pushdown]: https://blog.xiangpeng.systems/posts/parquet-pushdown/

<img src="/blog/images/optimizing-sql-dataframes/projection-pushdown.png" width="80%" class="img-responsive" alt="Fig 2: Projection Pushdown."/>

**Figure 2:** In (**A**) without projection pushdown, the operator receives more
columns, reducing efficiency. In (**B**) with projection pushdown, the operator
receives fewer columns, leading to optimized execution.
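To make the idea concrete, here is a rough sketch of projection pushdown over a hypothetical two-node plan (a toy `Plan` enum invented for illustration, not DataFusion's actual `LogicalPlan` API): the rule merges a `Projection` into the `Scan` beneath it, so the scan reads only the columns the rest of the plan needs.

```rust
// A toy plan with only two node types; real optimizers handle many more.
#[derive(Debug, PartialEq)]
enum Plan {
    Scan { table: String, columns: Vec<String> },
    Projection { columns: Vec<String>, input: Box<Plan> },
}

/// Merge a `Projection` into the `Scan` beneath it, so the scan only
/// reads (and copies) the columns the rest of the plan actually uses.
fn push_down_projection(plan: Plan) -> Plan {
    match plan {
        Plan::Projection { columns, input } => match *input {
            // Projection directly over a Scan: fold it into the scan.
            Plan::Scan { table, .. } => Plan::Scan { table, columns },
            // Otherwise keep the projection and recurse into the input.
            other => Plan::Projection {
                columns,
                input: Box::new(push_down_projection(other)),
            },
        },
        other => other,
    }
}

fn main() {
    let plan = Plan::Projection {
        columns: vec!["location".into(), "population".into()],
        input: Box::new(Plan::Scan {
            table: "observations".into(),
            columns: vec![
                "location".into(),
                "population".into(),
                "species".into(),
                "notes".into(),
            ],
        }),
    };
    let optimized = push_down_projection(plan);
    // The scan now reads only the two columns the query needs.
    assert_eq!(
        optimized,
        Plan::Scan {
            table: "observations".into(),
            columns: vec!["location".into(), "population".into()],
        }
    );
    println!("{:?}", optimized);
}
```

A production rule must also track which columns each operator (join, aggregate, filter) consumes, but the overall shape, a recursive rewrite of the plan tree, is the same.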
### Limit Pushdown

**Why**: The earlier the plan stops generating data, the less overall work it
does, and some operators have more efficient limited implementations.

**What**: Pushes limits (maximum row counts) down in a plan as early as possible.

**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/push_down_limit.rs), [DuckDB](https://github.com/duckdb/duckdb/blob/main/src/optimizer/limit_pushdown.cpp), [ClickHouse](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp), Spark ([Window](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala) and [Projection](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala))

Queries often have a `LIMIT` or other clause that allows them to stop generating
results early, so the sooner they can stop execution, the more efficiently they
will execute.

In addition, DataFusion and other systems have more efficient implementations of
some operators that can be used if there is a limit. The classic example is
replacing a full sort + limit with a [TopK] operator that only tracks the top
values using a heap. Similarly, DataFusion’s Parquet reader stops fetching and
opening additional files once the limit has been hit.

[TopK]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html

<img src="/blog/images/optimizing-sql-dataframes/limit-pushdown.png" width="80%" class="img-responsive" alt="Fig 3: Limit Pushdown."/>

**Figure 3**: In (**A**), without limit pushdown, all data is sorted and
everything except the first few rows is discarded.
In (**B**), with limit
pushdown, the Sort is replaced with a TopK operator, which does much less work.


### Expression Simplification / Constant Folding

**Why**: Evaluating the same expression for each row when the value doesn’t change is wasteful.

**What**: Partially evaluates and/or algebraically simplifies expressions.

**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/tree/main/datafusion/optimizer/src/simplify_expressions), DuckDB (has several [rules](https://github.com/duckdb/duckdb/tree/7b18f0f3691c1b6367cf68ed2598d7034e14f41b/src/optimizer/rule) such as [constant folding](https://github.com/duckdb/duckdb/blob/7b18f0f3691c1b6367cf68ed2598d7034e14f41b/src/optimizer/rule/constant_folding.cpp) and [comparison simplification](https://github.com/duckdb/duckdb/blob/7b18f0f3691c1b6367cf68ed2598d7034e14f41b/src/optimizer/rule/comparison_simplification.cpp)), [Spark](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala)

If an expression doesn’t change from row to row, it is better to evaluate the
expression **once** during planning. This is a classic compiler technique and is
also used in database systems.

For example, given a query that finds all values from the current year

```sql
SELECT … WHERE extract(year from time_column) = extract(year from now())
```

Evaluating `extract(year from now())` on every row is much more expensive than
evaluating it once during planning time, so that the query becomes a comparison
to a constant:

```sql
SELECT … WHERE extract(year from time_column) = 2025
```

Furthermore, it is often possible to push such predicates **into** scans.
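Constant folding itself can be sketched as a small recursive rewrite over an expression tree. The toy `Expr` enum and `fold` function below are illustrative only (DataFusion's real `Expr` and simplifier are much richer): any subtree whose operands are all literals is replaced by its computed value.

```rust
// A toy expression tree; real engines have many more node types.
#[derive(Debug, PartialEq)]
enum Expr {
    Literal(i64),
    Column(String),
    Add(Box<Expr>, Box<Expr>),
    Eq(Box<Expr>, Box<Expr>),
}

/// Recursively replace subtrees whose operands are all literals
/// with the precomputed value.
fn fold(expr: Expr) -> Expr {
    match expr {
        Expr::Add(l, r) => match (fold(*l), fold(*r)) {
            // Both sides are constants: evaluate once, at planning time.
            (Expr::Literal(a), Expr::Literal(b)) => Expr::Literal(a + b),
            (l, r) => Expr::Add(Box::new(l), Box::new(r)),
        },
        // Comparisons can't be folded here (one side is a column),
        // but their children might be.
        Expr::Eq(l, r) => Expr::Eq(Box::new(fold(*l)), Box::new(fold(*r))),
        other => other,
    }
}

fn main() {
    // year_col = 2020 + 5  ==>  year_col = 2025
    let expr = Expr::Eq(
        Box::new(Expr::Column("year_col".into())),
        Box::new(Expr::Add(
            Box::new(Expr::Literal(2020)),
            Box::new(Expr::Literal(5)),
        )),
    );
    let folded = fold(expr);
    assert_eq!(
        folded,
        Expr::Eq(
            Box::new(Expr::Column("year_col".into())),
            Box::new(Expr::Literal(2025)),
        )
    );
    println!("{:?}", folded);
}
```

Real simplifiers must also be careful about expressions that are constant only per query (such as `now()`) and about operations that can error or overflow when evaluated early.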
### Rewriting `OUTER JOIN` → `INNER JOIN`

**Why:** `INNER JOIN` implementations are almost always faster (as they are
simpler) than `OUTER JOIN` implementations, and `INNER JOIN`s impose fewer
restrictions on other optimizer passes (such as join reordering and additional
filter pushdown).

**What**: In cases where it is known that NULL rows introduced by an `OUTER
JOIN` will not appear in the results, it can be rewritten to an `INNER JOIN`.

**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/blob/6028474969f0bfead96eb7f413791470afb6bf82/datafusion/optimizer/src/eliminate_outer_join.rs), [Spark](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala#L124-L158), [ClickHouse](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/QueryPlan/Optimizations/convertOuterJoinToInnerJoin.cpp).

For example, given a query such as the following

```SQL
SELECT …
FROM orders LEFT OUTER JOIN customer ON (orders.cid = customer.id)
WHERE customer.last_name = 'Lamb'
```

The `LEFT OUTER JOIN` keeps all rows in `orders` that don’t have a matching
customer, but fills in the fields with `null`. All such rows will be filtered
out by `customer.last_name = 'Lamb'`, and thus an `INNER JOIN` produces the same
answer. This is illustrated in Figure 4.

<img src="/blog/images/optimizing-sql-dataframes/join-rewrite.png" width="80%" class="img-responsive" alt="Fig 4: Join Rewrite."/>

**Figure 4**: Rewriting `OUTER JOIN` to `INNER JOIN`. In (A) the original query
contains an `OUTER JOIN` but also a filter on `customer.last_name`, which
filters out all rows that might be introduced by the `OUTER JOIN`. In (B) the
`OUTER JOIN` is converted to an `INNER JOIN`, so a more efficient implementation
can be used.
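The equivalence can be demonstrated with a tiny in-memory example (hypothetical rows, not a real engine): a null-rejecting filter discards exactly the rows that the `LEFT OUTER JOIN` padded with `null`, so both plans return the same answer.

```rust
/// Join `orders` to `customers` both ways, then filter on `last_name = name`.
/// Returns (outer-join-then-filter, inner-join-then-filter) order ids.
fn join_then_filter(
    orders: &[(u32, u32)],     // (order_id, cid)
    customers: &[(u32, &str)], // (id, last_name)
    name: &str,
) -> (Vec<u32>, Vec<u32>) {
    // LEFT OUTER JOIN: unmatched orders keep None (i.e., NULL) on the
    // customer side.
    let outer: Vec<(u32, Option<&str>)> = orders
        .iter()
        .map(|&(oid, cid)| {
            (oid, customers.iter().find(|&&(id, _)| id == cid).map(|&(_, n)| n))
        })
        .collect();
    // The filter rejects None, so the NULL-padded rows disappear...
    let outer_then_filter = outer
        .iter()
        .filter(|&&(_, n)| n == Some(name))
        .map(|&(oid, _)| oid)
        .collect();
    // ...which is exactly what INNER JOIN + filter produces.
    let inner_then_filter = orders
        .iter()
        .filter_map(|&(oid, cid)| {
            customers.iter().find(|&&(id, _)| id == cid).map(|&(_, n)| (oid, n))
        })
        .filter(|&(_, n)| n == name)
        .map(|(oid, _)| oid)
        .collect();
    (outer_then_filter, inner_then_filter)
}

fn main() {
    // Order 3 references a missing customer; only order 1 belongs to 'Lamb'.
    let orders = [(1, 100), (2, 101), (3, 999)];
    let customers = [(100, "Lamb"), (101, "Smith")];
    let (outer, inner) = join_then_filter(&orders, &customers, "Lamb");
    // The null-rejecting filter makes the two plans equivalent.
    assert_eq!(outer, inner);
    println!("{:?}", outer);
}
```

An optimizer performs this reasoning symbolically: it checks that the filter predicate cannot evaluate to true on a NULL-padded row before downgrading the join.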
## Engine Specific Optimizations

As discussed in Part 1 of this blog, optimizers also contain a set of passes
that are still always good to do, but are closely tied to the specifics of the
query engine. This section describes some common types.

### Subquery Rewrites

**Why**: Actually implementing subqueries by running a query for each row of the outer query is very expensive.

**What**: It is possible to rewrite subqueries as joins, which often perform much better.

**Example Implementations:** DataFusion ([one](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/decorrelate.rs), [two](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/decorrelate_predicate_subquery.rs), [three](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/scalar_subquery_to_join.rs)), [Spark](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala)

Evaluating subqueries a row at a time is so expensive that execution engines in
high performance analytic systems such as DataFusion and [Vertica] may not even
support row-at-a-time evaluation given how terrible the performance would be.
Instead, analytic systems rewrite such queries into joins, which can perform 100s
or 1000s of times faster for large datasets. However, transforming subqueries to
joins requires “exotic” join semantics such as `SEMI JOIN`, `ANTI JOIN` and
variations on how to treat equality with null[^7].
+
+[Vertica]: https://vertica.com/
+
+For a simple example, consider that a query like this:
+
+```sql
+SELECT customer.name
+FROM customer
+WHERE (SELECT sum(value)
+       FROM orders WHERE
+       orders.cid = customer.id) > 10;
+```
+
+can be rewritten like this:
+
+```sql
+SELECT customer.name
+FROM customer
+JOIN (
+  SELECT orders.cid as cid_inner, sum(value) s
+  FROM orders
+  GROUP BY orders.cid
+  ) ON (customer.id = cid_inner AND s > 10);
+```
+
+We don’t have space to detail this transformation or why it is so much faster
+to run, but this and many other transformations allow efficient subquery
+evaluation.
+
+### Optimized Expression Evaluation
+
+**Why**: The capabilities of expression evaluation vary from system to system.
+
+**What**: Optimize expression evaluation for the particular execution environment.
+
+**Example Implementations**: There are many examples of this type of
+optimization, including DataFusion’s [Common Subexpression
+Elimination](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/common_subexpr_eliminate.rs),
+[unwrap_cast](https://github.com/apache/datafusion/blob/8f3f70877febaa79be3349875e979d3a6e65c30e/datafusion/optimizer/src/simplify_expressions/unwrap_cast.rs#L70),
+and [identifying equality join
+predicates](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/extract_equijoin_predicate.rs).
+DuckDB [rewrites IN
+clauses](https://github.com/duckdb/duckdb/blob/main/src/optimizer/in_clause_rewriter.cpp)
+and [SUM
+expressions](https://github.com/duckdb/duckdb/blob/main/src/optimizer/sum_rewriter.cpp).
+Spark also [unwraps casts in binary
+comparisons](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala)
+and [adds special runtime
+filters](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala).
+
+To give a specific example of what DataFusion’s common subexpression elimination
+does, consider this query that refers to a complex expression multiple times:
+
+```sql
+SELECT date_bin('1 hour', time, '1970-01-01')
+FROM table
+WHERE date_bin('1 hour', time, '1970-01-01') >= '2025-01-01 00:00:00'
+ORDER BY date_bin('1 hour', time, '1970-01-01')
+```
+
+Evaluating `date_bin('1 hour', time, '1970-01-01')` each time it is encountered
+is inefficient compared to calculating its result once and reusing that result
+when it is encountered again (similar to caching). This reuse is called
+*Common Subexpression Elimination*.
+
+Some execution engines implement this optimization internally in their
+expression evaluation engines, but DataFusion represents it explicitly using a
+separate Projection plan node, as illustrated in Figure 5. Effectively, the
+query above is rewritten to the following:
+
+```sql
+SELECT time_chunk
+FROM (SELECT date_bin('1 hour', time, '1970-01-01') as time_chunk
+      FROM table)
+WHERE time_chunk >= '2025-01-01 00:00:00'
+ORDER BY time_chunk
+```
+
+
+<img src="/blog/images/optimizing-sql-dataframes/common-subexpression-elimination.png" width="80%" class="img-responsive" alt="Fig 5: Common Subexpression Elimination."/>
+
+**Figure 5:** Adding a Projection to evaluate a common complex subexpression
+decreases the work required in later stages.
+
+
+### Algorithm Selection
+
+**Why**: Different engines have different specialized operators for certain
+operations.
+
+**What:** Select specific implementations from the available operators, based
+on properties of the query.
+
+**Example Implementations:** DataFusion’s [EnforceSorting](https://github.com/apache/datafusion/blob/8f3f70877febaa79be3349875e979d3a6e65c30e/datafusion/physical-optimizer/src/enforce_sorting/mod.rs) pass uses sort-optimized implementations, Spark’s [rewrite to use a special operator for ASOF joins](https://github.com/apache/spark/blob/7bc8e99cde424c59b98fe915e3fdaaa30beadb76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteAsOfJoin.scala), and ClickHouse’s [join algorithm selection](https://github.com/ClickHouse/ClickHouse/blob/7d15deda4b33282f356bb3e40a190d005acf72f2/src/Interpreters/ExpressionAnalyzer.cpp#L1066-L1080), such as [when to use MergeJoin](https://github.com/ClickHouse/ClickHouse/blob/7d15deda4b33282f356bb3e40a190d005acf72f2/src/Interpreters/ExpressionAnalyzer.cpp#L1022).
+
+For example, DataFusion uses a `TopK` ([source]) operator rather than a full
+`Sort` if there is also a limit on the query. Similarly, it may choose the more
+efficient `PartialOrdered` grouping operation when the data is sorted on the
+group keys, or a `MergeJoin` when both join inputs are sorted on the join keys.
+
+[source]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html
+
+<img src="/blog/images/optimizing-sql-dataframes/specialized-grouping.png" width="80%" class="img-responsive" alt="Fig 6: Specialized Grouping."/>
+
+**Figure 6:** An example of a specialized operation for grouping. In (**A**), the input data has no specified ordering and DataFusion uses a hashing-based grouping operator ([source](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/aggregates/row_hash.rs)) to determine distinct groups.
In (**B**), when the input data is ordered by the group keys, DataFusion uses a specialized grouping operator ([source](https://github.com/apache/datafusion/tree/main/datafusion/physical-plan/src/aggregates/order)) to find the boundaries that separate groups.
+
+
+### Using Statistics Directly
+
+**Why**: Using pre-computed statistics from a table, without actually reading or
+opening files, is much faster than processing data.
+
+**What**: Replace calculations on data with the value from statistics.
+
+**Example Implementations:** [DataFusion](https://github.com/apache/datafusion/blob/8f3f70877febaa79be3349875e979d3a6e65c30e/datafusion/physical-optimizer/src/aggregate_statistics.rs), [DuckDB](https://github.com/duckdb/duckdb/blob/main/src/optimizer/statistics_propagator.cpp).
+
+Some queries, such as the classic `COUNT(*) FROM my_table` used for data
+exploration, can be answered using only statistics. Optimizers often have access
+to statistics for other reasons (such as Access Path and Join Order Selection),
+and statistics are commonly stored in analytic file formats. For example, the
+[Metadata] of Apache Parquet files stores `MIN`, `MAX`, and `COUNT` information.
+
+[Metadata]: https://docs.rs/parquet/latest/parquet/file/metadata/index.html
+
+<img src="/blog/images/optimizing-sql-dataframes/using-statistics.png" width="80%" class="img-responsive" alt="Fig 7: Using Statistics."/>
+
+**Figure 7:** When the aggregation result is already stored in the statistics,
+the query can be evaluated using the values from statistics without reading any
+of the underlying data. The optimizer replaces the Aggregation operation with
+values from statistics.
+
+## Access Path and Join Order Selection
+
+
+### Overview
+
+Last, but certainly not least, are optimizations that choose between plans with
+potentially (very) different performance. The major options in this category are:
+
+1. **Join Order:** In what order to combine tables using JOINs?
+2. 
**Access Paths:** Which copy of the data or index should be read to find matching tuples?
+3. **[Materialized View]**: Can the query be rewritten to use a materialized view (partially computed query results)? This topic deserves its own blog (or book), so we don’t discuss it further here.
+
+[Materialized View]: https://en.wikipedia.org/wiki/Materialized_view
+
+<img src="/blog/images/optimizing-sql-dataframes/access-path-and-join-order.png" width="80%" class="img-responsive" alt="Fig 8: Access Path and Join Order."/>
+
+**Figure 8:** Access Path and Join Order Selection in Query Optimizers. Optimizers use heuristics to enumerate some subset of the potential join orders (shape) and access paths (color). The plan with the smallest estimated cost according to some cost model is chosen. In this case, Plan 2 is chosen for execution because it has the lowest estimated cost (180,000).
+
+This class of optimizations is a hard problem for at least the following reasons:
+
+1. **Exponential Search Space**: The number of potential plans increases
+   exponentially as the number of joins and indexes increases.
+
+2. **Performance Sensitivity**: Often plans that are very similar in structure
+   perform very differently. For example, swapping the input order of a hash
+   join can result in 1000x or more (yes, a thousand-fold!) runtime
+   differences.
+
+3. **Cardinality Estimation Errors**: Determining the optimal plan relies on
+   cardinality estimates (e.g., how many rows will come out of each join). It is a
+   [known hard problem] to estimate this cardinality, and in practice queries with
+   as few as 3 joins often have large cardinality estimation errors.
+
+[known hard problem]: https://www.vldb.org/pvldb/vol9/p204-leis.pdf
+
+### Heuristics and Cost-Based Optimization
+
+Industrial optimizers handle these problems using a combination of:
+
+1. **Heuristics:** to prune the search space and avoid considering plans that
+   are (almost) never good. 
Examples include considering only left-deep trees, or
+   using `Foreign Key` / `Primary Key` relationships to pick the build side of a
+   hash join.
+
+2. **Cost Model**: Given the smaller set of candidate plans, the Optimizer then
+   estimates their cost and picks the one with the lowest cost.
+
+For some examples, you can read about [Spark’s cost-based optimizer] or look at
+the code for [DataFusion’s join selection], [DuckDB’s cost model], and [join
+order enumeration].
+
+[Spark’s cost-based optimizer]: https://docs.databricks.com/aws/en/optimizations/cbo
+[DataFusion’s join selection]: https://github.com/apache/datafusion/blob/main/datafusion/physical-optimizer/src/join_selection.rs
+[DuckDB’s cost model]: https://github.com/duckdb/duckdb/blob/main/src/optimizer/join_order/cost_model.cpp
+[join order enumeration]: https://github.com/duckdb/duckdb/blob/84c87b12fa9554a8775dc243b4d0afd5b407321a/src/optimizer/join_order/plan_enumerator.cpp#L469-L472
+
+However, the use of heuristics and (imprecise) cost models means optimizers must:
+
+1. **Make deep assumptions about the execution environment:** For example, the
+   heuristics often include assumptions that joins implement [sideways information
+   passing (RuntimeFilters)], or that Join operators always preserve a particular
+   input's order.
+
+2. **Use one particular objective function:** There are almost always trade-offs
+   between desirable plan properties, such as execution speed, memory use, and
+   robustness in the face of cardinality estimation errors. Industrial optimizers
+   typically have one cost function that attempts to balance these properties, or
+   a series of hard-to-use indirect tuning knobs to control the behavior.
+
+3. 
**Require statistics**: Typically cost models require up-to-date statistics,
+   which can be expensive to compute, must be kept up to date as new data
+   arrives, and often have trouble capturing the non-uniformity of real-world
+   datasets.
+
+[sideways information passing (RuntimeFilters)]: https://www.alibabacloud.com/blog/alibaba-cloud-analyticdb-for-mysql-create-ultimate-runtimefilter-capability_600228
+
+
+### Join Ordering in DataFusion
+
+DataFusion purposely does not include a sophisticated cost-based optimizer.
+Instead, in keeping with its [design goals], it provides a reasonable default
+implementation along with extension points to customize behavior.
+
+[design goals]: https://docs.rs/datafusion/latest/datafusion/#design-goals
+
+Specifically, DataFusion includes:
+
+1. A “syntactic optimizer” (it joins tables in the order they are listed in the query[^8]), with basic join re-ordering ([source](https://github.com/apache/datafusion/blob/main/datafusion/physical-optimizer/src/join_selection.rs)) to prevent join disasters.

Review Comment:
   Link ^8 didn't work

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org