alamb commented on code in PR #15832:
URL: https://github.com/apache/datafusion/pull/15832#discussion_r2082547957
##########
docs/source/library-user-guide/extending-operators.md:
##########
@@ -19,4 +19,448 @@
 # Extending DataFusion's operators: custom LogicalPlan and Execution Plans
-Coming soon
+This module contains an end-to-end demonstration of creating a user-defined operator in DataFusion. Specifically, it shows how to define a `TopKNode` that implements `ExtensionPlanNode`, add an OptimizerRule to rewrite a `LogicalPlan` to use that node, create an `ExecutionPlan`, and finally produce results.
+
+## TopK Background:
+
+A "Top K" node is a common query optimization which is used for queries such as "find the top 3 customers by revenue". The(simplified) SQL for such a query might be:
+
+```sql
+CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT)
+  STORED AS CSV location 'tests/data/customer.csv';
+SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+```
+
+And a naive plan would be:
+
+```sql
+explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+```
+
+```text
++--------------+----------------------------------------+
+| plan_type    | plan                                   |
++--------------+----------------------------------------+
+| logical_plan | Limit: 3                               |
+|              |   Sort: revenue DESC NULLS FIRST       |
+|              |     Projection: customer_id, revenue   |
+|              |       TableScan: sales                 |
++--------------+----------------------------------------+
+```
+
+While this plan produces the correct answer, the careful reader will note it fully sorts the input before discarding everything other than the top 3 elements.
+The same answer can be produced by simply keeping track of the top N elements, reducing the total amount of required buffer memory.
+
+## Process for Defining Extending Operator
+
+The following example illustrates the implementation of a `TopK` node:
+
+### LogicalPlan Node Definition
+
+- This section defines the custom logical plan node `TopKPlanNode`, which represents the `TopK` operation.
+- It includes trait implementations like `UserDefinedLogicalNodeCore` and Debug.
+  **Code:**
+
+```rust
+use datafusion::common::{
+    cast::{as_int64_array, as_string_array},
+    tree_node::Transformed,
+    types::TypeSignature::Extension,
+    DataFusionError,
+};
+use datafusion::execution::{SessionState, TaskContext};
+use datafusion::logical_expr::{
+    FetchType, LogicalPlan, LogicalPlan::Sort, UserDefinedLogicalNode,
+};
+use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
+use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::{
+    execution_plan::{Boundedness, EmissionType},
+    internal_err, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
+use datafusion::physical_planner::{
+    DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner,
+};
+use arrow::array::{Int64Array, StringArray};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use futures::Stream;
+use std::{
+    any::Any,
+    collections::BTreeMap,
+    fmt::{self, Debug},
+    sync::Arc,
+    task::{Context, Poll},
+};
+use datafusion::physical_planner::planner::QueryPlanner;
+
+
+#[derive(Debug)]
+struct TopKQueryPlanner {}
+
+#[async_trait]
+impl QueryPlanner for TopKQueryPlanner {
+    /// Given a `LogicalPlan` created from above, create an
+    /// `ExecutionPlan` suitable for execution
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Teach the default physical planner how to plan TopK nodes.
+        let physical_planner =
+            DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
+                TopKPlanner {},
+            )]);
+        // Delegate most work of physical planning to the default physical planner
+        physical_planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}
+```
+
+### Optimizer Rule
+
+- Implements the `TopKOptimizerRule` to detect a `Limit` followed by a Sort and replace it with the `TopKPlanNode`.
+- Includes the logic for transforming the logical plan.
+  code:
+
+```rust
+use std::sync::Arc;
+use datafusion::logical_expr::{Expr, Extension};
+use datafusion::logical_expr::Limit;
+use datafusion::logical_expr::Sort as LogicalSort;
+use crate::invariant_mock::InvariantMock;
+#[derive(Default, Debug)]
+struct TopKOptimizerRule {
+    /// A testing-only hashable fixture.
+    invariant_mock: Option<InvariantMock>,

Review Comment:
   we don't need the invariant mock stuff

##########
docs/source/library-user-guide/extending-operators.md:
##########
@@ -19,4 +19,448 @@
+### LogicalPlan Node Definition
+
+- This section defines the custom logical plan node `TopKPlanNode`, which represents the `TopK` operation.
+- It includes trait implementations like `UserDefinedLogicalNodeCore` and Debug.
+  **Code:**
+
+```rust
+use datafusion::common::{

Review Comment:
   this doesn't seem right to me -- it is not implementing a LogicalPlan node, it is trying to implement an extension

   It is like it was created by an LLM that didn't quite have enough context

##########
docs/source/library-user-guide/extending-operators.md:
##########
@@ -19,4 +19,448 @@
+### LogicalPlan Node Definition
+
+- This section defines the custom logical plan node `TopKPlanNode`, which represents the `TopK` operation.
+- It includes trait implementations like `UserDefinedLogicalNodeCore` and Debug.
+  **Code:**
+
+```rust
+use datafusion::common::{

Review Comment:
   Can you please pull `TopKNode` definition from here?
   https://github.com/apache/datafusion/blob/8c8b2454cbd78204dc6426f9898b79c179486a86/datafusion/core/tests/user_defined/user_defined_plan.rs#L542-L548