This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 597ad62 ARROW-9617: [Rust] [DataFusion] Add length of string array
597ad62 is described below
commit 597ad62aeae9c07ec38a83720d436ae8f0322d0d
Author: Jorge C. Leitao <[email protected]>
AuthorDate: Sat Aug 15 09:29:46 2020 -0600
ARROW-9617: [Rust] [DataFusion] Add length of string array
Adds the function to DataFusion.
Closes #7878 from jorgecarleitao/length-df
Authored-by: Jorge C. Leitao <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
---
rust/datafusion/README.md | 3 +-
rust/datafusion/src/execution/context.rs | 6 ++-
.../execution/physical_plan/math_expressions.rs | 46 ++++++++++++----------
rust/datafusion/src/execution/physical_plan/mod.rs | 20 +++++++++-
rust/datafusion/src/logicalplan.rs | 5 +++
rust/datafusion/tests/sql.rs | 22 ++++++++++-
6 files changed, 75 insertions(+), 27 deletions(-)
diff --git a/rust/datafusion/README.md b/rust/datafusion/README.md
index 46160ae..0a19368 100644
--- a/rust/datafusion/README.md
+++ b/rust/datafusion/README.md
@@ -54,7 +54,8 @@ DataFusion includes a simple command-line interactive SQL utility. See the [CLI
- [x] Aggregate
- [x] UDFs
- [x] Common math functions
-- [ ] Common string functions
+- String functions
+ - [x] Length of the string
- [ ] Common date/time functions
- [x] Sorting
- [ ] Nested types
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 56aa810..cb0e608 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -41,11 +41,11 @@ use crate::execution::physical_plan::expressions::{
};
use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
use crate::execution::physical_plan::limit::GlobalLimitExec;
-use crate::execution::physical_plan::math_expressions::register_math_functions;
use crate::execution::physical_plan::memory::MemoryExec;
use crate::execution::physical_plan::merge::MergeExec;
use crate::execution::physical_plan::parquet::ParquetExec;
use crate::execution::physical_plan::projection::ProjectionExec;
+use crate::execution::physical_plan::scalar_functions;
use crate::execution::physical_plan::selection::SelectionExec;
use crate::execution::physical_plan::sort::{SortExec, SortOptions};
use crate::execution::physical_plan::udf::{ScalarFunction, ScalarFunctionExpr};
@@ -117,7 +117,9 @@ impl ExecutionContext {
scalar_functions: HashMap::new(),
config,
};
- register_math_functions(&mut ctx);
+ for udf in scalar_functions() {
+ ctx.register_udf(udf);
+ }
ctx
}
diff --git a/rust/datafusion/src/execution/physical_plan/math_expressions.rs b/rust/datafusion/src/execution/physical_plan/math_expressions.rs
index aa578b6..97098d6 100644
--- a/rust/datafusion/src/execution/physical_plan/math_expressions.rs
+++ b/rust/datafusion/src/execution/physical_plan/math_expressions.rs
@@ -18,7 +18,6 @@
//! Math expressions
use crate::error::ExecutionError;
-use crate::execution::context::ExecutionContext;
use crate::execution::physical_plan::udf::ScalarFunction;
use arrow::array::{Array, ArrayRef, Float64Array, Float64Builder};
@@ -56,32 +55,37 @@ macro_rules! math_unary_function {
};
}
-/// Register math scalar functions with the context
-pub fn register_math_functions(ctx: &mut ExecutionContext) {
- ctx.register_udf(math_unary_function!("sqrt", sqrt));
- ctx.register_udf(math_unary_function!("sin", sin));
- ctx.register_udf(math_unary_function!("cos", cos));
- ctx.register_udf(math_unary_function!("tan", tan));
- ctx.register_udf(math_unary_function!("asin", asin));
- ctx.register_udf(math_unary_function!("acos", acos));
- ctx.register_udf(math_unary_function!("atan", atan));
- ctx.register_udf(math_unary_function!("floor", floor));
- ctx.register_udf(math_unary_function!("ceil", ceil));
- ctx.register_udf(math_unary_function!("round", round));
- ctx.register_udf(math_unary_function!("trunc", trunc));
- ctx.register_udf(math_unary_function!("abs", abs));
- ctx.register_udf(math_unary_function!("signum", signum));
- ctx.register_udf(math_unary_function!("exp", exp));
- ctx.register_udf(math_unary_function!("log", ln));
- ctx.register_udf(math_unary_function!("log2", log2));
- ctx.register_udf(math_unary_function!("log10", log10));
+/// vector of math scalar functions
+pub fn scalar_functions() -> Vec<ScalarFunction> {
+ vec![
+ math_unary_function!("sqrt", sqrt),
+ math_unary_function!("sin", sin),
+ math_unary_function!("cos", cos),
+ math_unary_function!("tan", tan),
+ math_unary_function!("asin", asin),
+ math_unary_function!("acos", acos),
+ math_unary_function!("atan", atan),
+ math_unary_function!("floor", floor),
+ math_unary_function!("ceil", ceil),
+ math_unary_function!("round", round),
+ math_unary_function!("trunc", trunc),
+ math_unary_function!("abs", abs),
+ math_unary_function!("signum", signum),
+ math_unary_function!("exp", exp),
+ math_unary_function!("log", ln),
+ math_unary_function!("log2", log2),
+ math_unary_function!("log10", log10),
+ ]
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Result;
- use crate::logicalplan::{col, sqrt, LogicalPlanBuilder};
+ use crate::{
+ execution::context::ExecutionContext,
+ logicalplan::{col, sqrt, LogicalPlanBuilder},
+ };
use arrow::datatypes::Schema;
#[test]
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index 96fd851..de76916 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -25,8 +25,12 @@ use std::sync::{Arc, Mutex};
use crate::error::Result;
use crate::logicalplan::ScalarValue;
use arrow::array::ArrayRef;
-use arrow::datatypes::{DataType, Schema, SchemaRef};
-use arrow::record_batch::{RecordBatch, RecordBatchReader};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::{
+ compute::kernels::length::length,
+ record_batch::{RecordBatch, RecordBatchReader},
+};
+use udf::ScalarFunction;
/// Partition-aware execution plan for a relation
pub trait ExecutionPlan: Debug {
@@ -77,6 +81,18 @@ pub trait Accumulator: Debug {
fn get_value(&self) -> Result<Option<ScalarValue>>;
}
+/// Vector of scalar functions declared in this module
+pub fn scalar_functions() -> Vec<ScalarFunction> {
+ let mut udfs = vec![ScalarFunction::new(
+ "length",
+ vec![Field::new("n", DataType::Utf8, true)],
+ DataType::UInt32,
+ Arc::new(|args: &[ArrayRef]| Ok(Arc::new(length(args[0].as_ref())?))),
+ )];
+ udfs.append(&mut math_expressions::scalar_functions());
+ udfs
+}
+
pub mod common;
pub mod csv;
pub mod datasource;
diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs
index 63397c0..55c9b14 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -580,6 +580,11 @@ unary_math_expr!("log", ln);
unary_math_expr!("log2", log2);
unary_math_expr!("log10", log10);
+/// returns the length of a string in bytes
+pub fn length(e: Expr) -> Expr {
+ scalar_function("length", vec![e], DataType::UInt32)
+}
+
/// Create an aggregate expression
pub fn aggregate_expr(name: &str, expr: Expr, return_type: DataType) -> Expr {
Expr::AggregateFunction {
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 5b3dc1d..a258d3d 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -25,7 +25,7 @@ use arrow::array::*;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use datafusion::datasource::csv::CsvReadOptions;
+use datafusion::datasource::{csv::CsvReadOptions, MemTable};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::execution::physical_plan::udf::ScalarFunction;
@@ -619,3 +619,23 @@ fn result_str(results: &[RecordBatch]) -> Vec<String> {
}
result
}
+
+#[test]
+fn query_length() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
+
+ let data = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(StringArray::from(vec!["", "a", "aa", "aaa"]))],
+ )?;
+
+ let table = MemTable::new(schema, vec![vec![data]])?;
+
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table("test", Box::new(table));
+ let sql = "SELECT length(c1) FROM test";
+ let actual = execute(&mut ctx, sql).join("\n");
+ let expected = "0\n1\n2\n3".to_string();
+ assert_eq!(expected, actual);
+ Ok(())
+}