akurmustafa commented on code in PR #58: URL: https://github.com/apache/datafusion-site/pull/58#discussion_r1987767954
########## content/blog/2025-03-05-ordering-analysis.md: ########## @@ -0,0 +1,353 @@ +--- +layout: post +title: Analysis of Ordering for Better Plans +date: 2025-03-05 +author: Mustafa Akur, Andrew Lamb +categories: [tutorial] +--- + +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +<!-- see https://github.com/apache/datafusion/issues/11631 for details --> + +## Introduction +In this blog post, we will explore how to determine whether an ordering requirement of an operator is satisfied by its input data. This analysis is essential for order-based optimizations and is often more complex than one might initially think. +<blockquote style="border-left: 4px solid #007bff; padding: 10px; background-color: #f8f9fa;"> + <strong>Ordering Requirement</strong> for an operator refers to the condition that input data must be sorted in a certain way for the operator to function as intended. If this condition is not met, the operator may not perform as expected. It is the job of the planner to make sure that the requirements - such as specific ordering, specific distribution, etc. - of all operators are satisfied during execution. +</blockquote> + +There are various use cases, where this type of analysis can be useful. 
+### Removing Unnecessary Sorts +Imagine a user wants to execute the following query: +```SQL +SELECT hostname, log_line +FROM telemetry ORDER BY time ASC limit 10 +``` +If we don't know anything about the `telemetry` table, we need to sort it by `time ASC` and then retrieve the first 10 rows to get the correct result. However, if the table is already ordered by `time ASC`, simply retrieving the first 10 rows is sufficient. This approach executes much faster and uses less memory compared to the first version. + +The key is that the query optimizer needs to know the data is already sorted. For simple queries that is likely simple, but it gets complicated fast, like for example, what if your data is sorted by `[hostname, time ASC]` and your query is +```sql +SELECT hostname, log_line +FROM telemetry WHERE hostname = 'app.example.com' ORDER BY time ASC; +``` +In this case, the system still doesn't have to do any sorting -- but only if it has enough analysis to be able to reason about the sortedness of the stream when we know `hostname` has a single value. + +### Optimizing Execution Modes Using Ordering Information +As another use case, some operators can utilize the ordering information to change its underlying algorithm to execute more efficiently. Consider the following query: +```SQL +SELECT COUNT(log_line) +FROM telemetry GROUP BY hostname; +``` +when `telemetry` is sorted by `hostname`, aggregation doesn't need to hash the entire data at its input. It can use a much more efficient algorithm for grouping the data according to the `hostname` values. Failure to detect the ordering can result in choosing the sub-optimal algorithm variant for the operator. To see this in practice, check out the [source](https://github.com/apache/datafusion/tree/main/datafusion/physical-plan/src/aggregates/order) for ordered variant of the `Aggregation` in `Datafusion`. 
+ +### Streaming-Friendly Execution + +Stream processing aims to produce results immediately as they become available, ensuring minimal latency for real-time workloads. However, some operators need to consume all input data before producing any output. Consider the `Sort` operation: before it can start generating output, the algorithm must first process all input data. As a result, data flow halts whenever such an operator is encountered until all input is consumed. When a physical query plan contains such an operator (`Sort`, `CrossJoin`, ..), we refer to this as query pipeline breaking, meaning the query cannot be executed in a streaming fashion. + +For a query to be executed in a streaming fashion, we need to satisfy 2 conditions: + +**Logically Streamable** +It should be possible to generate what user wants in streaming fashion. Consider following query: + +```SQL +SELECT SUM(amount) +FROM orders +``` +Here, the user wants to compute the sum of all amounts in the orders table. By nature, this query requires scanning the entire table to generate a result, making it impossible to execute in a streaming fashion. + +**Planner should be smart** +Being logically streamable does not guarantee that a query will execute in a streaming fashion. SQL is a declarative language, meaning it specifies 'WHAT' user wants. It is up to the planner, 'HOW' to generate the result. In most cases, there are many ways to compute the correct result for a given query. The query planner is responsible for choosing "a way" (ideally the best<sup id="optimal1">[*](#optimal)</sup> one) among the all alternatives to generate what user asks for. If a plan contains a pipeline-breaking operator, the execution will not be streaming—even if the query is logically streamable. To generate truly streaming plans from logically streamable queries, the planner must carefully analyze the existing orderings in the source tables to ensure that the final plan does not contain any pipeline-breaking operators. 
+ + +## Analysis +Let's start by creating an example table that we will refer throughout the post. This table models the input data of an operator for the analysis: + +### Example Virtual Table + +<style> + table { + border-collapse: collapse; + width: 80%; + font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; + } + th, td { + padding: 12px 16px; + text-align: left; + border-bottom: 1px solid #e0e0e0; + } + th { + background-color: #f9f9f9; + font-weight: 600; + } + tr:hover { + background-color: #f1f1f1; + } +</style> + +<table> + <tr> + <th>amount</th> <th>price</th> <th>hostname</th><th>currency</th><th>time_bin</th> <th>time</th> <th>price_cloned</th> <th>time_cloned</th> + </tr> + <tr> + <td>12</td> <td>25</td> <td>app.example.com</td> <td>USD</td> <td>08:00:00</td> <td>08:01:30</td> <td>25</td> <td>08:01:30</td> + </tr> + <tr> + <td>12</td> <td>26</td> <td>app.example.com</td> <td>USD</td> <td>08:00:00</td> <td>08:11:30</td> <td>26</td> <td>08:11:30</td> + </tr> + <tr> + <td>15</td> <td>30</td> <td>app.example.com</td> <td>USD</td> <td>08:00:00</td> <td>08:41:30</td> <td>30</td> <td>08:41:30</td> + </tr> + <tr> + <td>15</td> <td>32</td> <td>app.example.com</td> <td>USD</td> <td>08:00:00</td> <td>08:55:15</td> <td>32</td> <td>08:55:15</td> + </tr> + <tr> + <td>15</td> <td>35</td> <td>app.example.com</td> <td>USD</td> <td>09:00:00</td> <td>09:10:23</td> <td>35</td> <td>09:10:23</td> + </tr> + <tr> + <td>20</td> <td>18</td> <td>app.example.com</td> <td>USD</td> <td>09:00:00</td> <td>09:20:33</td> <td>18</td> <td>09:20:33</td> + </tr> + <tr> + <td>20</td> <td>22</td> <td>app.example.com</td> <td>USD</td> <td>09:00:00</td> <td>09:40:15</td> <td>22</td> <td>09:40:15</td> + </tr> +</table> + +<br> + +A naive approach for analysing whether the ordering requirement of an operator is satisfied by its input would be: + + - Store all the valid ordering expressions that the tables satisfies + - Check whether the ordering 
requirement by the operator is among valid orderings. + +This naive algorithm works and is correct. However, listing all valid orderings can be quite lengthy and is not scalable. As an example, for the example table the following orderings are all valid (this is only a small subset of all valid orderings). + +`[amount ASC]` +`[amount ASC, price ASC]` +`[amount ASC, price_cloned ASC]` +`[hostname ASC, amount ASC, price_cloned ASC]` +`[amount ASC, hostname ASC, price_cloned ASC]` +`[amount ASC, price_cloned ASC, hostname ASC]` +. +. +. + +As can be seen from the listing above, storing all of the valid orderings is wasteful and contains a lot of redundancy. Some of the problems in this approach are: + + +- Storing a prefix of another valid ordering is redundant. If the table satisfies the lexicographic ordering<sup id="fn1">[1](#footnote1)</sup> `[amount ASC, price ASC]`, it already satisfies the ordering `[amount ASC]` trivially. Hence, once we store `[amount ASC, price ASC]` we do not need to store `[amount ASC]` separately. + +- Using all columns that are equal to each other in the listings is redundant. If we know that the ordering `[amount ASC, price ASC]` is satisfied by the table, the table also satisfies `[amount ASC, price_cloned ASC]` since `price` and `price_cloned` are copies of each other. It is enough to use just one expression among the expressions that are exact copies of each other. + +- Constant expressions can be inserted into any place inside a valid ordering with an arbitrary direction (e.g. `ASC`, `DESC`). Hence, if the ordering `[amount ASC, price ASC]` is valid, the orderings: <br> + `[hostname ASC, amount ASC, price ASC]`, + `[hostname DESC, amount ASC, price ASC]`, + `[amount ASC, hostname ASC, price ASC]`, + . + . + +are all also valid. This is clearly redundant. For this reason, it is better to not use any constant expression during existing ordering construction.
+ +In summary, + +- We should only use the longest lexicographic ordering as a valid ordering (shouldn't use any prefix of it) +- Using all of the expressions that are exact copies of each other is redundant. +- Ordering expressions shouldn't contain any constant expression. + + +## Key Concepts for Analyzing Orderings +To solve the shortcomings above, we need to keep track of the following properties for the table: + +- Constant Expressions +- Equivalent Expression Groups (will be explained shortly) +- Succinct Valid Orderings (will be explained shortly) + +<blockquote style="border-left: 4px solid #007bff; padding: 10px; background-color: #f8f9fa;"> + <strong>Note:</strong> These properties are implemented in the <code>EquivalenceProperties</code> structure in <code>DataFusion</code>; please see the <a href="https://github.com/apache/datafusion/blob/f47ea73b87eec4af044f9b9923baf042682615b2/datafusion/physical-expr/src/equivalence/properties/mod.rs#L134" target="_blank">source</a> for more details.<br> +</blockquote> + +These properties allow us to analyze whether the ordering requirement is already satisfied by the data. + +### 1. Constant Expressions +Constant expressions are those that evaluate to the same value for every row. Although constant expressions may seem odd in a table, they can arise after operations like `Filter` or `Join`. + +For instance, in the example table: + +- Columns `hostname` and `currency` are constant because every row in the table has the same value ('app.example.com' for 'hostname', and 'USD' for 'currency') for these columns. + +<blockquote style="border-left: 4px solid #007bff; padding: 10px; background-color: #f8f9fa;"> + <strong>Note:</strong> Constant expressions can arise during query execution. In the following query:<br> + <code>SELECT hostname FROM logs</code><br><code>WHERE hostname='app.example.com'</code> <br> + after filtering is done, the 'hostname' column will be constant for subsequent operators.
+</blockquote> + +### 2. Equivalent Expression Groups +Equivalent expression groups are sets of expressions that always have the same value as each other in every row. These expressions can be thought of as clones of each other and may arise from operations like `Filter`, `Join`, or `Projection`. + +In the example table, the expressions `price` and `price_cloned` form one equivalence group, and `time` and `time_cloned` form another equivalence group. + +<blockquote style="border-left: 4px solid #007bff; padding: 10px; background-color: #f8f9fa;"> + <strong>Note:</strong> Equivalent expression groups can arise during query execution. In the following query:<br> + <code>SELECT time, time as time_cloned FROM logs</code> <br> + after the projection is done, 'time' and 'time_cloned' will form an equivalence group for subsequent operators. As another example, in the following query:<br> + <code>SELECT employees.id, employees.name, departments.department_name</code> +<code>FROM employees</code> +<code>JOIN departments ON employees.department_id = departments.id;</code> <br> +after joining, 'employees.department_id' and 'departments.id' will form an equivalence group. +</blockquote> + +### 3. Succinct Valid Orderings +Valid orderings are the orderings that the table already satisfies. However, naively listing them is not scalable, as discussed before. We list all of the valid orderings after the following constraints are applied: + +- Do not use any constant expressions in the valid ordering construction +- Only use one of the entries (by convention, the first entry) in the equivalent expression group. +- Lexicographic ordering shouldn't contain any leading ordering<sup id="fn2">[2](#footnote2)</sup> except the first position <sup id="fn3">[3](#footnote3)</sup>. +- Do not use any prefix of a valid lexicographic ordering<sup id="fn4">[4](#footnote4)</sup>.
+ +After applying the first and second constraint, example table simplifies to + +<style> + table { + border-collapse: collapse; + width: 80%; + font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; + } + th, td { + padding: 12px 16px; + text-align: left; + border-bottom: 1px solid #e0e0e0; + } + th { + background-color: #f9f9f9; + font-weight: 600; + } + tr:hover { + background-color: #f1f1f1; + } +</style> + +<table> + <tr> + <th>amount</th> <th>price</th><th>time_bin</th> <th>time</th> + </tr> + <tr> + <td>12</td> <td>25</td><td>08:00:00</td> <td>08:01:30</td> + </tr> + <tr> + <td>12</td> <td>26</td><td>08:00:00</td> <td>08:11:30</td> + </tr> + <tr> + <td>15</td> <td>30</td><td>08:00:00</td> <td>08:41:30</td> + </tr> + <tr> + <td>15</td> <td>32</td><td>08:00:00</td> <td>08:55:15</td> + </tr> + <tr> + <td>15</td> <td>35</td><td>09:00:00</td> <td>09:10:23</td> + </tr> + <tr> + <td>20</td> <td>18</td><td>09:00:00</td> <td>09:20:33</td> + </tr> + <tr> + <td>20</td> <td>22</td><td>09:00:00</td> <td>09:40:15</td> + </tr> +</table> +<br> +Following third and fourth constraints for the simplified table, succinct valid orderings are:<br> +`[amount ASC, price ASC]`, +`[time_bin ASC]`, +`[time ASC]` + +### Table Properties + +In summary, for the example table, the following properties can be derived: + +- **Constant Expressions** = `hostname, currency` +- **Equivalent Expression Groups** = `[price, price_cloned], [time, time_cloned]` +- **Valid Orderings** = `[amount ASC, price ASC], [time_bin ASC], [time ASC]` + +### Algorithm for Analyzing Ordering Requirements + +After deriving these properties for the data, following algorithm can be used to check whether an ordering requirement is satisfied by the table: + +1. **Prune constant expressions**: Remove any constant expressions from the ordering requirement. +2. 
**Normalize the requirement**: Replace each expression in the ordering requirement with the first entry from its equivalence group. +3. **De-duplicate expressions**: If an expression appears more than once, remove duplicates, keeping only the first occurrence. +4. **Match leading orderings**: Check whether the leading ordering requirement<sup id="fn5">[5](#footnote5)</sup> matches one of the leading valid orderings<sup id="fn6">[6](#footnote6)</sup> of the table. If so: + - Remove the leading ordering requirement from the ordering requirement + - Remove the matching leading valid ordering from the valid orderings of the table. +5. **Iterate through the remaining expressions**: Go back to step 4 until the ordering requirement is empty or the leading ordering requirement is not found among the leading valid orderings of the table. + +If, at the end of the procedure above, the ordering requirement is an empty list, we can conclude that the requirement is satisfied by the table. + +### Example Walkthrough + +Let's check if the ordering requirement `[hostname DESC, amount ASC, time_bin ASC, price_cloned ASC, time ASC, currency ASC, price DESC]` is satisfied by the table with properties: + +- **Constant Expressions** = `hostname, currency` +- **Equivalent Expression Groups** = `[price, price_cloned], [time, time_cloned]` +- **Succinct Valid Orderings** = `[amount ASC, price ASC], [time_bin ASC], [time ASC]` + +### Algorithm Steps + +1. **Prune constant expressions**: + Remove `hostname` and `currency` from the requirement. The requirement becomes: + `[amount ASC, time_bin ASC, price_cloned ASC, time ASC, price DESC]`. + +2. **Normalize using equivalent groups**: + Replace `price_cloned` with `price` and `time_cloned` with `time`. The requirement becomes: + `[amount ASC, time_bin ASC, price ASC, time ASC, price DESC]`. + +3.
**De-duplicate expressions**: + Since `price` appears twice, we simplify the requirement to: + `[amount ASC, time_bin ASC, price ASC, time ASC]` (keeping the first occurrence from the left side). + +4. **Match leading orderings**: + Check if leading ordering requirement `amount ASC` is among the leading valid orderings: `amount ASC, time_bin ASC, time ASC`. Since this is the case, we remove `amount ASC` from both the ordering requirement and the valid orderings of the table. Review Comment: The reason I use a different term is the following: for the ordering `[a ASC, b ASC, c ASC]`, the prefixes would be `[a ASC]` and `[a ASC, b ASC]`. The leading ordering, however, would be just the first ordering, `[a ASC]`. I wanted to distinguish between the two. If this is confusing, I can use `prefix` instead. ########## content/blog/2025-03-05-ordering-analysis.md: ########## @@ -0,0 +1,353 @@ +--- +layout: post +title: Analysis of Ordering for Better Plans +date: 2025-03-05 +author: Mustafa Akur, Andrew Lamb +categories: [tutorial] +--- + +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+{% endcomment %} +--> + +<!-- see https://github.com/apache/datafusion/issues/11631 for details --> + +## Introduction +In this blog post, we will explore how to determine whether an ordering requirement of an operator is satisfied by its input data. This analysis is essential for order-based optimizations and is often more complex than one might initially think. +<blockquote style="border-left: 4px solid #007bff; padding: 10px; background-color: #f8f9fa;"> + <strong>Ordering Requirement</strong> for an operator refers to the condition that input data must be sorted in a certain way for the operator to function as intended. If this condition is not met, the operator may not perform as expected. It is the job of the planner to make sure that the requirements - such as specific ordering, specific distribution, etc. - of all operators are satisfied during execution. +</blockquote> + +There are various use cases, where this type of analysis can be useful. +### Removing Unnecessary Sorts +Imagine a user wants to execute the following query: +```SQL +SELECT hostname, log_line +FROM telemetry ORDER BY time ASC limit 10 +``` +If we don't know anything about the `telemetry` table, we need to sort it by `time ASC` and then retrieve the first 10 rows to get the correct result. However, if the table is already ordered by `time ASC`, simply retrieving the first 10 rows is sufficient. This approach executes much faster and uses less memory compared to the first version. + +The key is that the query optimizer needs to know the data is already sorted. 
For simple queries that is likely simple, but it gets complicated fast, like for example, what if your data is sorted by `[hostname, time ASC]` and your query is +```sql +SELECT hostname, log_line +FROM telemetry WHERE hostname = 'app.example.com' ORDER BY time ASC; +``` +In this case, the system still doesn't have to do any sorting -- but only if it has enough analysis to be able to reason about the sortedness of the stream when we know `hostname` has a single value. + +### Optimizing Execution Modes Using Ordering Information +As another use case, some operators can utilize the ordering information to change its underlying algorithm to execute more efficiently. Consider the following query: +```SQL +SELECT COUNT(log_line) +FROM telemetry GROUP BY hostname; +``` +when `telemetry` is sorted by `hostname`, aggregation doesn't need to hash the entire data at its input. It can use a much more efficient algorithm for grouping the data according to the `hostname` values. Failure to detect the ordering can result in choosing the sub-optimal algorithm variant for the operator. To see this in practice, check out the [source](https://github.com/apache/datafusion/tree/main/datafusion/physical-plan/src/aggregates/order) for ordered variant of the `Aggregation` in `Datafusion`. + +### Streaming-Friendly Execution + +Stream processing aims to produce results immediately as they become available, ensuring minimal latency for real-time workloads. However, some operators need to consume all input data before producing any output. Consider the `Sort` operation: before it can start generating output, the algorithm must first process all input data. As a result, data flow halts whenever such an operator is encountered until all input is consumed. When a physical query plan contains such an operator (`Sort`, `CrossJoin`, ..), we refer to this as query pipeline breaking, meaning the query cannot be executed in a streaming fashion. 
+ +For a query to be executed in a streaming fashion, we need to satisfy 2 conditions: + +**Logically Streamable** +It should be possible to generate what user wants in streaming fashion. Consider following query: + +```SQL +SELECT SUM(amount) +FROM orders +``` +Here, the user wants to compute the sum of all amounts in the orders table. By nature, this query requires scanning the entire table to generate a result, making it impossible to execute in a streaming fashion. + +**Planner should be smart** +Being logically streamable does not guarantee that a query will execute in a streaming fashion. SQL is a declarative language, meaning it specifies 'WHAT' user wants. It is up to the planner, 'HOW' to generate the result. In most cases, there are many ways to compute the correct result for a given query. The query planner is responsible for choosing "a way" (ideally the best<sup id="optimal1">[*](#optimal)</sup> one) among the all alternatives to generate what user asks for. If a plan contains a pipeline-breaking operator, the execution will not be streaming—even if the query is logically streamable. To generate truly streaming plans from logically streamable queries, the planner must carefully analyze the existing orderings in the source tables to ensure that the final plan does not contain any pipeline-breaking operators. + + +## Analysis +Let's start by creating an example table that we will refer throughout the post. 
This table models the input data of an operator for the analysis: + +### Example Virtual Table + +<style> + table { + border-collapse: collapse; + width: 80%; + font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; + } + th, td { + padding: 12px 16px; + text-align: left; + border-bottom: 1px solid #e0e0e0; + } + th { + background-color: #f9f9f9; + font-weight: 600; + } + tr:hover { + background-color: #f1f1f1; + } +</style> + +<table> Review Comment: I have added a section related to this with [commit](116a80dac30f669e4cd7cd1e9ccc3abeceedbe51)
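To make the distinction between a prefix and a leading ordering (first review comment) concrete, here is a minimal Python sketch of the five-step check quoted in the first hunk. It is not DataFusion's implementation, which lives in the Rust `EquivalenceProperties` structure. The function names `prefixes`, `leading` and `satisfies`, and the representation of an ordering as a list of `(expression, direction)` tuples, are assumptions made purely for illustration.

```python
def prefixes(ordering):
    """Proper, non-empty prefixes of a lexicographic ordering (hypothetical helper)."""
    return [ordering[:i] for i in range(1, len(ordering))]


def leading(ordering):
    """The leading ordering is only the first sort expression (hypothetical helper)."""
    return ordering[0]


def satisfies(requirement, constants, eq_groups, valid_orderings):
    """Sketch of the five steps from the post; orderings are lists of (expr, direction)."""
    # Step 1: prune constant expressions from the requirement.
    req = [(expr, d) for expr, d in requirement if expr not in constants]

    # Step 2: normalize each expression to the first entry of its equivalence group.
    canonical = {member: group[0] for group in eq_groups for member in group}
    req = [(canonical.get(expr, expr), d) for expr, d in req]

    # Step 3: de-duplicate by expression, keeping only the first occurrence.
    seen, deduped = set(), []
    for expr, d in req:
        if expr not in seen:
            seen.add(expr)
            deduped.append((expr, d))

    # Steps 4 and 5: repeatedly match the leading requirement against the
    # leading entries of the remaining valid orderings, consuming both sides.
    remaining = [list(o) for o in valid_orderings]  # copy, so inputs stay untouched
    for item in deduped:
        for ordering in remaining:
            if ordering and leading(ordering) == item:
                ordering.pop(0)
                break
        else:
            return False  # leading requirement not found among leading valid orderings
    return True


# Properties of the example table, as listed in the post.
CONSTANTS = {"hostname", "currency"}
EQ_GROUPS = [["price", "price_cloned"], ["time", "time_cloned"]]
VALID = [[("amount", "ASC"), ("price", "ASC")], [("time_bin", "ASC")], [("time", "ASC")]]

# The requirement from the "Example Walkthrough" section.
REQUIREMENT = [
    ("hostname", "DESC"), ("amount", "ASC"), ("time_bin", "ASC"),
    ("price_cloned", "ASC"), ("time", "ASC"), ("currency", "ASC"), ("price", "DESC"),
]

print(satisfies(REQUIREMENT, CONSTANTS, EQ_GROUPS, VALID))
```

Under these assumptions, the sketch reports the walkthrough requirement as satisfied, while a requirement such as `[price ASC]` on its own is rejected because `price ASC` is not a leading valid ordering of the table. For `[a ASC, b ASC, c ASC]`, `prefixes` yields `[a ASC]` and `[a ASC, b ASC]`, whereas `leading` yields only `a ASC`.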