devanbenz commented on code in PR #12122:
URL: https://github.com/apache/datafusion/pull/12122#discussion_r1729658937
##########
docs/source/user-guide/explain-usage.md:
##########
@@ -0,0 +1,423 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Reading Explain Plans
+
+## Introduction
+
+This section describes how to read a DataFusion query plan. While fully
+comprehending all details of these plans requires significant expertise in the
+DataFusion engine, this guide will help you get started with the basics.
+
+DataFusion executes queries using a `query plan`. To see the plan without
+running the query, add the keyword `EXPLAIN` to your SQL query or call the
+[DataFrame::explain] method.
+
+[DataFrame::explain]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.explain
+
+## Example: Select and filter
+
+In this section, we run example queries against the `hits.parquet` file. See
+[below](#data-in-this-example) for information on how to get this file.
+
+Let's see how DataFusion runs a query that selects the 5 smallest `WatchID` values (with their
+`ClientIP`) among hits whose URL starts with `http://domcheloveplanet.ru/`:
+
+```sql
+EXPLAIN SELECT "WatchID" AS wid, "hits.parquet"."ClientIP" AS ip
+FROM 'hits.parquet'
+WHERE starts_with("URL", 'http://domcheloveplanet.ru/')
+ORDER BY wid ASC, ip DESC
+LIMIT 5;
+```
+
+The output will look like this:
+
+```
++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                              |
++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Sort: wid ASC NULLS LAST, ip DESC NULLS FIRST, fetch=5                                                                                                                            |
+|               |   Projection: hits.parquet.WatchID AS wid, hits.parquet.ClientIP AS ip                                                                                                            |
+|               |     Filter: starts_with(hits.parquet.URL, Utf8("http://domcheloveplanet.ru/"))                                                                                                    |
+|               |       TableScan: hits.parquet projection=[WatchID, ClientIP, URL], partial_filters=[starts_with(hits.parquet.URL, Utf8("http://domcheloveplanet.ru/"))]                             |
+| physical_plan | SortPreservingMergeExec: [wid@0 ASC NULLS LAST,ip@1 DESC], fetch=5                                                                                                                |
+|               |   SortExec: TopK(fetch=5), expr=[wid@0 ASC NULLS LAST,ip@1 DESC], preserve_partitioning=[true]                                                                                    |
+|               |     ProjectionExec: expr=[WatchID@0 as wid, ClientIP@1 as ip]                                                                                                                     |
+|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                 |
+|               |         FilterExec: starts_with(URL@2, http://domcheloveplanet.ru/)                                                                                                               |
+|               |           ParquetExec: file_groups={16 groups: [[hits.parquet:0..923748528], ...]}, projection=[WatchID, ClientIP, URL], predicate=starts_with(URL@13, http://domcheloveplanet.ru/) |
++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+2 row(s) fetched.
+Elapsed 0.060 seconds.
+```
+
+There are two sections: logical plan and physical plan.
+
+- **Logical Plan:** a plan generated for a specific SQL query, DataFrame, or other language, without
+  knowledge of the underlying data organization.
+- **Physical Plan:** a plan generated from a logical plan that also takes into account the hardware
+  configuration (e.g. number of CPUs) and the underlying data organization (e.g. number of files).
+  This physical plan is specific to your hardware configuration and your data. If you load the same
+  data onto different hardware with a different configuration, the same query may generate a different query plan.
+
+Understanding a query plan can help you understand its performance. For example, when the plan shows your query reads
+many files, it signals you to either add more filters to the query to read less data or to change your file
+layout to use fewer but larger files. This document focuses on how to read a query plan. How to make a
+query run faster depends on the reason it is slow and is beyond the scope of this document.
+
+## Query plans are trees
+
+A query plan is an upside down tree, and we always read it from the bottom up.
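To make "read it from the bottom up" concrete, here is a minimal Rust sketch. `PlanNode`, `render_indented`, `bottom_up`, and `example_plan` are hypothetical names invented for illustration; DataFusion's real plan nodes implement the `ExecutionPlan` trait and carry far more information. The sketch builds the physical plan from the `EXPLAIN` output above as a chain of nodes, prints it top-down with two spaces of indentation per level (parents above their inputs, as in the `EXPLAIN` output), and then lists the operators children-first, which is the direction data flows.

```rust
/// A hypothetical, simplified plan node: an operator name plus its inputs.
/// (Not DataFusion's API; real nodes implement the `ExecutionPlan` trait.)
struct PlanNode {
    name: &'static str,
    children: Vec<PlanNode>,
}

impl PlanNode {
    fn new(name: &'static str, children: Vec<PlanNode>) -> Self {
        PlanNode { name, children }
    }
}

/// Top-down rendering: each node is printed above its inputs,
/// indented two spaces per level of depth.
fn render_indented(node: &PlanNode, depth: usize, out: &mut Vec<String>) {
    out.push(format!("{}{}", "  ".repeat(depth), node.name));
    for child in &node.children {
        render_indented(child, depth + 1, out);
    }
}

/// Bottom-up (post-order) traversal: every input is visited before the
/// operator that consumes it, matching the direction data flows.
fn bottom_up(node: &PlanNode, out: &mut Vec<&'static str>) {
    for child in &node.children {
        bottom_up(child, out);
    }
    out.push(node.name);
}

/// The physical plan of the example query, built from the leaf upward.
fn example_plan() -> PlanNode {
    let scan = PlanNode::new("ParquetExec", vec![]);
    let filter = PlanNode::new("FilterExec", vec![scan]);
    let coalesce = PlanNode::new("CoalesceBatchesExec", vec![filter]);
    let projection = PlanNode::new("ProjectionExec", vec![coalesce]);
    let sort = PlanNode::new("SortExec", vec![projection]);
    PlanNode::new("SortPreservingMergeExec", vec![sort])
}

fn main() {
    let plan = example_plan();

    // Top-down, the way the plan is displayed.
    let mut lines = Vec::new();
    render_indented(&plan, 0, &mut lines);
    for line in &lines {
        println!("{line}");
    }

    // Bottom-up, the way the plan is read and the way data moves.
    let mut order = Vec::new();
    bottom_up(&plan, &mut order);
    println!("{}", order.join(" -> "));
}
```

Running it prints the six operators from `SortPreservingMergeExec` down to `ParquetExec`, followed by the bottom-up order `ParquetExec -> FilterExec -> CoalesceBatchesExec -> ProjectionExec -> SortExec -> SortPreservingMergeExec`. A post-order traversal is used because an operator can only produce output after its inputs have produced data.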
+The physical plan from the example above, drawn in tree format, looks like this:
+
+```
+                         ▲
+                         │
+                         │
+┌─────────────────────────────────────────────────┐
+│ SortPreservingMergeExec                         │
+│ [wid@0 ASC NULLS LAST,ip@1 DESC]                │
+│ fetch=5                                         │
+└─────────────────────────────────────────────────┘
+                         ▲
+                         │
+┌─────────────────────────────────────────────────┐
+│ SortExec TopK(fetch=5),                         │
+│ expr=[wid@0 ASC NULLS LAST,ip@1 DESC],          │
+│ preserve_partitioning=[true]                    │
+└─────────────────────────────────────────────────┘
+                         ▲
+                         │
+┌─────────────────────────────────────────────────┐
+│ ProjectionExec                                  │
+│ expr=[WatchID@0 as wid, ClientIP@1 as ip]       │
+└─────────────────────────────────────────────────┘
+                         ▲
+                         │
+┌─────────────────────────────────────────────────┐
+│ CoalesceBatchesExec                             │
+└─────────────────────────────────────────────────┘
+                         ▲
+                         │
+┌─────────────────────────────────────────────────┐
+│ FilterExec                                      │
+│ starts_with(URL@2, http://domcheloveplanet.ru/) │
+└─────────────────────────────────────────────────┘
+                         ▲
+                         │
+┌─────────────────────────────────────────────────┐
+│ ParquetExec                                     │
+│ hits.parquet (filter = ...)                     │
+└─────────────────────────────────────────────────┘
+```
+
+Each node in the tree/plan ends with `Exec` and is sometimes also called an `operator` or `ExecutionPlan`; it is where data is
+processed, transformed, and sent up.
+
+1. First, `ParquetExec` reads the data in the `hits.parquet` file in parallel using 16 cores in 16 "partitions" (more on this later), applying a first pass of filtering during the scan.
+2. Next, `FilterExec` filters the output so that only rows where `starts_with(URL, 'http://domcheloveplanet.ru/')` evaluates to true are passed on.
+3. The `CoalesceBatchesExec` then groups the data into larger batches for processing.
+4. The `ProjectionExec` then projects the data, renaming the `WatchID` and `ClientIP` columns to `wid` and `ip` respectively.
+5. The `SortExec` then sorts the data by `wid ASC, ip DESC`.
+   The `TopK(fetch=5)` indicates that a special implementation is used that only tracks and emits the top 5 rows in each partition.
+6. Finally, the `SortPreservingMergeExec` merges the sorted data from all partitions and returns the top 5 rows overall.
+
+## Understanding large query plans
+
+A large query plan may look intimidating, but you can quickly understand what it does by following these steps:
+
+1. As always, read from bottom up, one operator at a time.
+2. Understand the job of this operator by reading
+   the [Physical Plan documentation](https://docs.rs/datafusion/latest/datafusion/physical_plan/index.html).
+3. Understand the input data of the operator and how large/small it may be.
+4. Understand how much data that operator produces and what it would look like.
+
+If you can answer those questions, you will be able to estimate how much work
+that plan has to do and thus how long it will take. However, `EXPLAIN` only
+shows you the plan without executing it.
+
+If you want to know more about how much work each operator in the query plan does,
+you can use `EXPLAIN ANALYZE` to get the plan annotated with runtime metrics (see
+the `EXPLAIN ANALYZE` section below).
+
+## More Debugging Information: `EXPLAIN VERBOSE`
+
+If the plan has to read too many files, not all of them will be shown in the
+`EXPLAIN` output. To see them, use `EXPLAIN VERBOSE`. Like `EXPLAIN`, `EXPLAIN VERBOSE`
+does not run the query. Instead, it shows the full explain plan, with information
+that is omitted from the default explain, as well as all intermediate physical
+plans DataFusion generates before returning. This mode can be very helpful for
+debugging to see why and when DataFusion added and removed operators from a plan.
+
+## Execution Counters: `EXPLAIN ANALYZE`
+
+During execution, DataFusion operators collect detailed metrics. You can access
+them programmatically via [`ExecutionPlan::metrics`] as well as with the
+`EXPLAIN ANALYZE` command.
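Operators run once per partition, and `EXPLAIN ANALYZE` reports each metric summed across all cores. The Rust sketch below illustrates only that summation; it is not DataFusion's metrics code. `MetricValue`, `PartitionMetrics`, and `aggregate_by_name` are hypothetical names, each partition is assumed (as a simplification) to record its own named counters and CPU times, and the per-partition numbers in `main` are invented. Summing CPU times in this way is how the example's `ParquetExec` can report 8.35 seconds of scanning time for a query that finished in 0.72 seconds of wall-clock time.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// A hypothetical metric value: either a counter (e.g. `output_rows`)
/// or a CPU time (e.g. `elapsed_compute`).
#[derive(Clone, Copy, Debug, PartialEq)]
enum MetricValue {
    Count(u64),
    Time(Duration),
}

/// Metrics recorded by one partition, as (name, value) pairs.
type PartitionMetrics = Vec<(&'static str, MetricValue)>;

/// Sum every named metric across all partitions: counters add up,
/// and so do CPU times, regardless of how much the partitions overlapped.
fn aggregate_by_name(partitions: &[PartitionMetrics]) -> BTreeMap<&'static str, MetricValue> {
    let mut totals: BTreeMap<&'static str, MetricValue> = BTreeMap::new();
    for partition in partitions {
        for &(name, value) in partition {
            let merged = match (totals.get(name), value) {
                (None, v) => v,
                (Some(MetricValue::Count(a)), MetricValue::Count(b)) => MetricValue::Count(a + b),
                (Some(MetricValue::Time(a)), MetricValue::Time(b)) => MetricValue::Time(*a + b),
                (Some(_), _) => panic!("metric {name} mixes counters and times"),
            };
            totals.insert(name, merged);
        }
    }
    totals
}

fn main() {
    // Four partitions with invented numbers; each spends 100ms of CPU time.
    let partitions: Vec<PartitionMetrics> = (1..=4u64)
        .map(|i| {
            vec![
                ("output_rows", MetricValue::Count(10 * i)),
                ("elapsed_compute", MetricValue::Time(Duration::from_millis(100))),
            ]
        })
        .collect();

    // Prints elapsed_compute=400ms and output_rows=100, even though four
    // partitions running in parallel could finish in roughly 100ms of wall-clock time.
    for (name, value) in aggregate_by_name(&partitions) {
        match value {
            MetricValue::Count(n) => println!("{name}={n}"),
            MetricValue::Time(d) => println!("{name}={d:?}"),
        }
    }
}
```

A `BTreeMap` is used only so the printed metrics come out in a stable, sorted order.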
For example here is the same query query as +above but with `EXPLAIN ANALYZE` (note the output is edited for clarity) + +[`ExecutionPlan::metrics`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.metrics + +```sql +> EXPLAIN ANALYZE SELECT "WatchID" AS wid, "hits.parquet"."ClientIP" AS ip +FROM 'hits.parquet' +WHERE starts_with("URL", 'http://domcheloveplanet.ru/') +ORDER BY wid ASC, ip DESC +LIMIT 5; ++-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Plan with Metrics | SortPreservingMergeExec: [wid@0 ASC NULLS LAST,ip@1 DESC], fetch=5, metrics=[output_rows=5, elapsed_compute=2.375µs] | +| | SortExec: TopK(fetch=5), expr=[wid@0 ASC NULLS LAST,ip@1 DESC], preserve_partitioning=[true], metrics=[output_rows=75, elapsed_compute=7.243038ms, row_replacements=482] | +| | ProjectionExec: expr=[WatchID@0 as wid, ClientIP@1 as ip], metrics=[output_rows=811821, elapsed_compute=66.25µs] | +| | FilterExec: starts_with(URL@2, http://domcheloveplanet.ru/), metrics=[output_rows=811821, elapsed_compute=1.36923816s] | +| | ParquetExec: file_groups={16 groups: [[hits.parquet:0..923748528], ...]}, projection=[WatchID, ClientIP, URL], predicate=starts_with(URL@13, http://domcheloveplanet.ru/), metrics=[output_rows=99997497, 
elapsed_compute=16ns, ... bytes_scanned=3703192723, ... time_elapsed_opening=308.203002ms, time_elapsed_scanning_total=8.350342183s, ...] | ++-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +1 row(s) fetched. +Elapsed 0.720 seconds. +``` + +In this case, DataFusion actually ran the query, but discarded any results, and +instead returned an annotated plan with a new field, `metrics=[...]` + +Most operators have the common metrics `output_rows` and `elapsed_compute` and +some have operator specific metrics such as `ParquetExec` which has +`bytes_scanned=3703192723`. Note that times and counters are reported across all +cores, so if you have 16 cores, the time reported is the sum of the time taken +by all 16 cores. + +Again, reading from bottom up: +- `ParquetExec` + - `output_rows=99997497`: A total 99.9M rows were produced + - `bytes_scanned=3703192723`: Of the 14GB file, 3.7GB were actually read (due to projection pushdown) + - `time_elapsed_opening=308.203002ms`: It took 300ms to open the file and prepare to read it + - `time_elapsed_scanning_total=8.350342183s`: It took 8.3 seconds of CPU time (across 16 cores) to actually decode the parquet data +- `FilterExec` + - `output_rows=811821`: Of the 99.9M rows at its input, only 811K rows passed the filter and were produced at the output + - `elapsed_compute=1.36923816s`: In total, 1.36s of CPU time (across 16 cores) was spend evaluating the filter +- `CoalesceBatchesExec` + - `output_rows=811821`, `elapsed_compute=12.873379ms`: Produced 811K rows in 13ms +- `ProjectionExec` + - `output_rows=811821, elapsed_compute=66.25µs`: Produced 811K rows in 66µs (microseconds). 
This projection is almost instantaneous as it does not manipulate any data +- `SortExec` + - `output_rows=75`: Produced 75 rows in total. Each of 16 cores could produce up to 5 rows, but in this case not all cores did. + - `elapsed_compute=7.243038ms`: 7ms was used to determine the top 5 rows + - `row_replacements=482`: Internally, the TopK operator updated its top list 482 times +- `SortPreservingMergeExec` + - `output_rows=5`, `elapsed_compute=2.375µs`: Produced the final 5 rows in 2.375µs (microseconds) + +## Example of an Aggregate Query + +Let us delve into an example click bench query that aggregates data from the `hits.parquet` file. + +For example, this query from ClickBench finds the top 10 users by the number of hits: + +```sql +SELECT "UserID", COUNT(*) FROM 'hits.parquet' GROUP BY "UserID" ORDER BY COUNT(*) DESC LIMIT 10; +``` + +##### Query and query plan + +TODO: devanbenz can you update this section, trying to follow the model of the simpler one above? Review Comment: That sounds good to me! I'll have some time over the weekend to tackle it :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
