milenkovicm commented on code in PR #1382: URL: https://github.com/apache/datafusion-ballista/pull/1382#discussion_r2698610802
########## docs/source/user-guide/extensions-example.md: ##########
+[src/codec/extension.rs](src/codec/extension.rs)

Review Comment: this link will be dead, we don't really need it, can we remove it?

########## docs/source/user-guide/extensions-example.md: ##########
+The [custom planner](src/planner/custom_planner.rs) is registered in the session state as follows:

Review Comment: this link will be dead, we don't really need it, can we remove it?

########## docs/source/user-guide/extensions-example.md: ##########
@@ -0,0 +1,491 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Extensions Example
+
+This project demonstrates possible extension mechanisms for the [Ballista distributed compute platform](https://github.com/apache/arrow-ballista).
+
+The goal of this small project is to enhance Ballista's capabilities by providing new logical and physical operators, utilities, and integration tools to support additional data processing workflows.
+
+> [!NOTE]
+>
+> This project is part of the "Extending DataFusion Ballista" showcase series:
+>
+> - [DataFusion Ballista Python UDF Support](https://github.com/milenkovicm/ballista_python)
+> - [DataFusion Ballista Read Support For Delta Table](https://github.com/milenkovicm/ballista_delta)
+> - [Extending DataFusion Ballista](https://github.com/milenkovicm/ballista_extensions)
+
+This example implements a [`sample()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sample.html) operator, which returns a sampled subset of the original `DataFrame`:
+
+```rust
+let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+let df = ctx.read_parquet("data/", Default::default()).await?;
+
+// The `sample` operator, defined in this project,
+// samples 30% of the data.
+let df = df.sample(0.30, None)?;
+```
+
+Implementing this functionality requires a new logical plan extension, a new physical operator, and a `DataFrame` extension that exposes the new operator.
+
+> [!WARNING]
+> Please do not use the implemented sampling operator in production; statisticians would probably not approve of it.
+
+This demo will provide:
+
+- Custom DataFusion (logical and physical) nodes.
+- Logical and physical extension codecs.
+- Custom protocol buffer definitions.
+- An extension query planner.
+
+## Logical Plan Extension
+
+The first step is to implement a custom logical plan extension:
+
+```rust
+//! This module defines the implementation of the `UserDefinedLogicalNodeCore` trait for the `Sample` logical plan node.
+//!
+//! The `Sample` node represents a custom logical plan extension for sampling data within a query plan.
+
+use std::{hash::Hash, vec};
+
+use datafusion::{
+    error::DataFusionError,
+    logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore},
+};
+
+#[derive(Debug, Clone, PartialEq, PartialOrd)]
+pub struct Sample {
+    pub fraction: f32,
+    pub seed: Option<i64>,
+    pub input: LogicalPlan,
+}
+
+impl Hash for Sample {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.seed.hash(state);
+        self.input.hash(state);
+    }
+}
+
+impl Eq for Sample {}
+
+impl Sample {
+    pub fn new(fraction: f32, seed: Option<i64>, input: LogicalPlan) -> Self {
+        Self {
+            fraction,
+            seed,
+            input,
+        }
+    }
+}
+
+impl UserDefinedLogicalNodeCore for Sample {
+    fn name(&self) -> &str {
+        "Sample"
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.input]
+    }
+
+    fn schema(&self) -> &datafusion::common::DFSchemaRef {
+        // sampling does not change the schema, so the input schema is reused
+        self.input.schema()
+    }
+
+    fn expressions(&self) -> Vec<datafusion::prelude::Expr> {
+        vec![]
+    }
+
+    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "Sample: fraction: {}, seed: {:?}",
+            self.fraction, self.seed
+        )
+    }
+
+    fn with_exprs_and_inputs(
+        &self,
+        _exprs: Vec<datafusion::prelude::Expr>,
+        inputs: Vec<LogicalPlan>,
+    ) -> datafusion::error::Result<Self> {
+        Ok(Self {
+            seed: self.seed,
+            fraction: self.fraction,
+            input: inputs
+                .first()
+                .ok_or(DataFusionError::Plan("expected single input".to_string()))?
+                .clone(),
+        })
+    }
+}
+```
+
+Note that `fraction` is deliberately left out of the `Hash` implementation: `f32` does not implement `Hash`, so only the seed and the input plan are hashed.
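+
+With this node in place, `fmt_for_explain` controls how the operator is rendered in `EXPLAIN` output, roughly like this (illustrative output, not captured from a real run):
+
+```text
+Sample: fraction: 0.3, seed: None
+  TableScan: data
+```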
+
+## DataFrame Extension
+
+To expose this functionality to end users, a `DataFrame` extension is implemented. This extension creates a `LogicalPlan::Extension(extension)` node:
+
+```rust
+use std::sync::Arc;
+
+use datafusion::{
+    error::DataFusionError,
+    logical_expr::{Extension, LogicalPlan},
+    prelude::DataFrame,
+};
+
+use crate::logical::sample_extension::Sample;
+
+pub trait DataFrameExt {
+    /// Returns a new `DataFrame` containing a random sample of rows from the original `DataFrame`.
+    ///
+    /// # Arguments
+    ///
+    /// * `fraction` - The fraction of rows to sample; must be in the range (0.0, 1.0].
+    /// * `seed` - An optional seed for the random number generator to ensure reproducibility.
+    ///
+    /// # Errors
+    ///
+    /// Returns a `DataFusionError::Configuration` if `fraction` is not within the valid range.
+    fn sample(self, fraction: f32, seed: Option<i64>) -> datafusion::error::Result<DataFrame>;
+}
+
+impl DataFrameExt for DataFrame {
+    fn sample(self, fraction: f32, seed: Option<i64>) -> datafusion::error::Result<DataFrame> {
+        if !(fraction > 0.0 && fraction <= 1.0) {
+            Err(DataFusionError::Configuration(
+                "fraction should be in the (0.0, 1.0] range".to_string(),
+            ))?
+        }
+
+        if seed.unwrap_or(0) < 0 {
+            Err(DataFusionError::Configuration(
+                "seed should be a non-negative number".to_string(),
+            ))?
+        }
+
+        let (state, input) = self.into_parts();
+
+        let node = Arc::new(Sample {
+            fraction,
+            seed,
+            input,
+        });
+        let extension = Extension { node };
+        let plan = LogicalPlan::Extension(extension);
+
+        Ok(DataFrame::new(state, plan))
+    }
+}
+```
+
+This approach enables the addition of new methods to the DataFusion `DataFrame` implementation:
+
+```rust
+let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
+let df = ctx.read_parquet("data/", Default::default()).await?;
+
+// The `DataFrame` extension provides the `sample` method
+let df = df.sample(0.30, None)?;
+```
+
+## Logical Extension Codec
+
+With the extension in place, a custom logical extension codec is required to transmit the client's logical plan to the scheduler.
+
+The logical extension codec typically consists of two components. The first is a set of Google Protocol Buffer definitions:
+
+```proto
+message LMessage {
+  oneof Extension {
+    LSample sample = 1;
+  }
+}
+
+message LSample {
+  float fraction = 1;
+  optional int64 seed = 2;
+}
+```
+
+See [proto/extension.proto](proto/extension.proto).
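+
+These definitions have to be compiled into Rust types as part of the build. Below is a minimal `build.rs` sketch, assuming `prost-build` is used for code generation (the example project may wire this up differently); the generated `LMessage`/`LSample` types would then be `include!`d into a module such as the `messages` module referenced below:
+
+```rust
+// build.rs — generate Rust types for the extension messages.
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Rebuild when the proto definitions change.
+    println!("cargo:rerun-if-changed=proto/extension.proto");
+    prost_build::compile_protos(&["proto/extension.proto"], &["proto/"])?;
+    Ok(())
+}
+```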
+
+The second component is a `LogicalExtensionCodec` implementation, which extends `BallistaLogicalExtensionCodec` and handles the newly defined operator messages:
+
+```rust
+#[derive(Debug, Default)]
+pub struct ExtendedBallistaLogicalCodec {
+    inner: BallistaLogicalExtensionCodec,
+}
+
+impl LogicalExtensionCodec for ExtendedBallistaLogicalCodec {
+    fn try_decode(
+        &self,
+        buf: &[u8],
+        inputs: &[datafusion::logical_expr::LogicalPlan],
+        _ctx: &datafusion::prelude::SessionContext,
+    ) -> datafusion::error::Result<datafusion::logical_expr::Extension> {
+        let message =
+            LMessage::decode(buf).map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+        match message.extension {
+            Some(Extension::Sample(sample)) => {
+                let node = Arc::new(Sample {
+                    input: inputs
+                        .first()
+                        .ok_or(DataFusionError::Plan("expected input".to_string()))?
+                        .clone(),
+                    seed: sample.seed,
+                    fraction: sample.fraction,
+                });
+
+                Ok(datafusion::logical_expr::Extension { node })
+            }
+            None => plan_err!("can't decode logical extension"),
+        }
+    }
+
+    fn try_encode(
+        &self,
+        node: &datafusion::logical_expr::Extension,
+        buf: &mut Vec<u8>,
+    ) -> datafusion::error::Result<()> {
+        if let Some(Sample { seed, fraction, .. }) = node.node.as_any().downcast_ref::<Sample>() {
+            let sample = LSample {
+                seed: *seed,
+                fraction: *fraction,
+            };
+            let message = LMessage {
+                extension: Some(super::messages::l_message::Extension::Sample(sample)),
+            };
+
+            message
+                .encode(buf)
+                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+            Ok(())
+        } else {
+            // not our node, delegate to the standard Ballista codec
+            self.inner.try_encode(node, buf)
+        }
+    }
+    // Additional implementation omitted for brevity
+}
+```
+
+[src/codec/extension.rs](src/codec/extension.rs)
+
+In short, this is an implementation of the `LogicalExtensionCodec` trait, which handles the conversion between the Rust structures and their protocol buffer definitions.
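+
+The codec can be exercised with a simple round trip — encode a `Sample` extension node and decode it back. This is an illustrative, test-style sketch (module paths and imports are assumptions):
+
+```rust
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::logical::sample_extension::Sample;
+    use datafusion::logical_expr::UserDefinedLogicalNode;
+
+    #[test]
+    fn logical_codec_round_trip() -> datafusion::error::Result<()> {
+        let ctx = datafusion::prelude::SessionContext::new();
+        let input = ctx.read_empty()?.into_unoptimized_plan();
+
+        let node = datafusion::logical_expr::Extension {
+            node: std::sync::Arc::new(Sample::new(0.5, Some(42), input.clone())),
+        };
+
+        let codec = ExtendedBallistaLogicalCodec::default();
+
+        // encode the node into protobuf bytes ...
+        let mut buf = vec![];
+        codec.try_encode(&node, &mut buf)?;
+
+        // ... and decode it back into an extension node
+        let decoded = codec.try_decode(&buf, &[input], &ctx)?;
+        assert_eq!("Sample", decoded.node.name());
+
+        Ok(())
+    }
+}
+```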
+
+## Logical to Physical Plan Translation
+
+Once the logical plan extension is provided, a translation from the logical node to a physical node is required. The transformation is performed by implementing the `ExtensionPlanner` trait:
+
+```rust
+#[derive(Debug, Clone, Default)]
+pub struct CustomPlannerExtension {}
+
+#[async_trait]
+impl ExtensionPlanner for CustomPlannerExtension {
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        _logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> datafusion::error::Result<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(Sample { fraction, seed, .. }) = node.as_any().downcast_ref::<Sample>() {
+            let input = physical_inputs
+                .first()
+                .ok_or(DataFusionError::Plan("expected single input".to_string()))?
+                .clone();
+            let node = SampleExec::new(*fraction, *seed, input);
+            let node = Arc::new(node);
+
+            Ok(Some(node))
+        } else {
+            // not a `Sample` node, let other planners handle it
+            Ok(None)
+        }
+    }
+}
+```
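+
+The `SampleExec` physical operator itself is not reproduced here. Its per-batch logic could look roughly like the following sketch — a hypothetical `sample_batch` helper doing Bernoulli row sampling with the `rand` crate, not the project's actual implementation:
+
+```rust
+use datafusion::arrow::array::BooleanArray;
+use datafusion::arrow::compute::filter_record_batch;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::DataFusionError;
+use rand::{rngs::StdRng, Rng};
+
+/// Keeps each row of `batch` with probability `fraction`.
+fn sample_batch(
+    batch: &RecordBatch,
+    fraction: f32,
+    rng: &mut StdRng,
+) -> Result<RecordBatch, DataFusionError> {
+    // Build a boolean mask: `true` keeps the row.
+    let mask: BooleanArray = (0..batch.num_rows())
+        .map(|_| Some(rng.gen::<f32>() < fraction))
+        .collect();
+
+    // Apply the mask to all columns at once.
+    filter_record_batch(batch, &mask).map_err(|e| DataFusionError::Internal(e.to_string()))
+}
+```
+
+Seeding the generator (for example with `StdRng::seed_from_u64`) is what makes the optional `seed` parameter produce reproducible samples.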
+
+The [custom planner](src/planner/custom_planner.rs) is registered in the session state as follows:
+
+```rust
+let query_planner = Arc::new(QueryPlannerWithExtensions::default());
+
+let state = SessionStateBuilder::new()
+    .with_query_planner(query_planner)
+    .with_default_features()
+    .build();
+```
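+
+`QueryPlannerWithExtensions` is not shown in this document. Below is a minimal sketch, following DataFusion's usual pattern of delegating to a `DefaultPhysicalPlanner` configured with extension planners (module paths and details may differ from the example project):
+
+```rust
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::execution::context::{QueryPlanner, SessionState};
+use datafusion::logical_expr::LogicalPlan;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
+
+use crate::planner::custom_planner::CustomPlannerExtension;
+
+#[derive(Debug, Default)]
+pub struct QueryPlannerWithExtensions {}
+
+#[async_trait]
+impl QueryPlanner for QueryPlannerWithExtensions {
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        // The default planner, extended with the sampling extension planner.
+        let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
+            CustomPlannerExtension::default(),
+        )]);
+        planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}
+```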
+
+Finally, the generated physical plan is serialized using the [physical plan extension codec](src/codec/extension.rs) and transmitted to the executor(s). The implementation is an extension of `BallistaPhysicalExtensionCodec`:
+
+```rust
+#[derive(Debug, Default)]
+pub struct ExtendedBallistaPhysicalCodec {
+    inner: BallistaPhysicalExtensionCodec,
+}
+
+impl PhysicalExtensionCodec for ExtendedBallistaPhysicalCodec {
+    fn try_decode(
+        &self,
+        buf: &[u8],
+        inputs: &[std::sync::Arc<dyn datafusion::physical_plan::ExecutionPlan>],
+        registry: &dyn datafusion::execution::FunctionRegistry,
+    ) -> datafusion::error::Result<std::sync::Arc<dyn datafusion::physical_plan::ExecutionPlan>>
+    {
+        let message =
+            PMessage::decode(buf).map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+        match message.extension {
+            Some(super::messages::p_message::Extension::Sample(PSample {
+                fraction, seed, ..
+            })) => {
+                let input = inputs
+                    .first()
+                    .ok_or(DataFusionError::Plan("expected input".to_string()))?
+                    .clone();
+
+                let node = Arc::new(SampleExec::new(fraction, seed, input));
+
+                Ok(node)
+            }
+            Some(super::messages::p_message::Extension::Opaque(opaque)) => {
+                self.inner.try_decode(&opaque, inputs, registry)
+            }
+            None => plan_err!("can't decode physical extension"),
+        }
+    }
+
+    fn try_encode(
+        &self,
+        node: std::sync::Arc<dyn datafusion::physical_plan::ExecutionPlan>,
+        buf: &mut Vec<u8>,
+    ) -> datafusion::error::Result<()> {
+        if let Some(SampleExec { fraction, seed, .. }) = node.as_any().downcast_ref::<SampleExec>()
+        {
+            let message = PMessage {
+                extension: Some(super::messages::p_message::Extension::Sample(PSample {
+                    fraction: *fraction,
+                    seed: *seed,
+                })),
+            };
+
+            message
+                .encode(buf)
+                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+            Ok(())
+        } else {
+            let mut opaque = vec![];
+            self.inner
+                .try_encode(node, &mut opaque)
+                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+            let message = PMessage {
+                extension: Some(super::messages::p_message::Extension::Opaque(opaque)),
+            };
+
+            message
+                .encode(buf)
+                .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+            Ok(())
+        }
+    }
+}
+```
+
+Note that plans this codec does not recognize are delegated to the inner Ballista codec, and the resulting bytes are wrapped in the `Opaque` variant so that `try_decode` can route them back to the inner codec.
+
+These are all the moving parts necessary to extend Ballista's functionality. The last step is to configure the [scheduler](examples/ballista_scheduler.rs) and the [executor](examples/ballista_executor.rs) to use the new features.
+
+`SchedulerConfig` should be configured to override the logical codec, the physical codec, and the session builder function:
+
+```rust
+let config: SchedulerConfig = SchedulerConfig {
+    override_logical_codec: Some(Arc::new(ExtendedBallistaLogicalCodec::default())),
+    override_physical_codec: Some(Arc::new(ExtendedBallistaPhysicalCodec::default())),
+    override_session_builder: Some(Arc::new(extended_state_producer)),
+    ..Default::default()
+};
+
+let address = format!("{}:{}", config.bind_host, config.bind_port);
+let address = address
+    .parse()
+    .map_err(|e: AddrParseError| BallistaError::Configuration(e.to_string()))?;
+
+let cluster = BallistaCluster::new_from_config(&config).await?;
+
+start_server(cluster, address, Arc::new(config)).await?;
+```
+
+```rust
+pub fn extended_state_producer(config: SessionConfig) -> datafusion::error::Result<SessionState> {
+    // we need the custom query planner to convert the logical operator to the physical one
+    let query_planner = Arc::new(QueryPlannerWithExtensions::default());
+
+    let state = SessionStateBuilder::new()
+        .with_config(config)
+        .with_query_planner(query_planner)
+        .with_default_features()
+        .build();
+
+    Ok(state)
+}
+```
+
+Similarly for `ExecutorProcessConfig`:
+
+```rust
+let config: ExecutorProcessConfig = ExecutorProcessConfig {
+    override_logical_codec: Some(Arc::new(ExtendedBallistaLogicalCodec::default())),
+    override_physical_codec: Some(Arc::new(ExtendedBallistaPhysicalCodec::default())),
+    ..Default::default()
+};
+
+start_executor_process(Arc::new(config)).await
+```
+
+## Conclusion
+
+This project demonstrates how to extend Ballista with custom logical and physical operators, codecs, and planner logic. By following the outlined steps, you can introduce new DataFrame operations and ensure they are supported throughout the distributed query lifecycle.
+
+For more details, refer to the source code and the linked example files. Contributions and feedback are welcome!
+
+---
+
+**Related links:**
+
+- [Ballista Extensions Source Code](https://github.com/milenkovicm/ballista_extensions)
+- [Apache Arrow Ballista](https://datafusion.apache.org/ballista/)

Review Comment: we do not need these two links either

########## docs/source/user-guide/extensions-example.md: ##########
+Finally, the generated physical plan is serialized using the [physical plan extension codec](src/codec/extension.rs) and transmitted to the executor(s). The implementation is an extension of `BallistaPhysicalExtensionCodec`:

Review Comment: this link will be dead, we don't really need it, can we remove it?

########## docs/source/user-guide/extensions-example.md: ##########
+> [!NOTE]
+>
+> This project is part of the "Extending DataFusion Ballista" showcase series:
+>
+> - [DataFusion Ballista Python UDF Support](https://github.com/milenkovicm/ballista_python)
+> - [DataFusion Ballista Read Support For Delta Table](https://github.com/milenkovicm/ballista_delta)
+> - [Extending DataFusion Ballista](https://github.com/milenkovicm/ballista_extensions)

Review Comment: I would suggest not having these references; I believe this doc is standalone

########## docs/source/user-guide/extensions-example.md: ##########
+These are all the moving parts necessary to extend Ballista's functionality. The last step is to configure the [scheduler](examples/ballista_scheduler.rs) and the [executor](examples/ballista_executor.rs) to use the new features.

Review Comment: this link will be dead, we don't really need it, can we remove it?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
