This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new eb3e135 add percent rank (#1077)
eb3e135 is described below
commit eb3e13598971e97f2fae4b823de40c247202b693
Author: Jiayu Liu <[email protected]>
AuthorDate: Sat Oct 9 14:59:34 2021 +0800
add percent rank (#1077)
---
datafusion/src/physical_plan/expressions/mod.rs | 2 +-
datafusion/src/physical_plan/expressions/rank.rs | 135 ++++++++++++++++++---
datafusion/src/physical_plan/window_functions.rs | 9 ++
datafusion/src/physical_plan/windows/mod.rs | 5 +-
.../simple_window_ranked_built_in_functions.sql | 3 +-
5 files changed, 130 insertions(+), 24 deletions(-)
diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs
index 9c05112..9f7a6cc 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -75,7 +75,7 @@ pub use negative::{negative, NegativeExpr};
pub use not::{not, NotExpr};
pub use nth_value::NthValue;
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
-pub use rank::{dense_rank, rank};
+pub use rank::{dense_rank, percent_rank, rank};
pub use row_number::RowNumber;
pub use sum::{sum_return_type, Sum};
pub use try_cast::{try_cast, TryCastExpr};
diff --git a/datafusion/src/physical_plan/expressions/rank.rs b/datafusion/src/physical_plan/expressions/rank.rs
index b82cfa4..b82e900 100644
--- a/datafusion/src/physical_plan/expressions/rank.rs
+++ b/datafusion/src/physical_plan/expressions/rank.rs
@@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines physical expression for `rank` and `dense_rank` that can evaluated
+//! Defines physical expression for `rank`, `dense_rank`, and `percent_rank` that can be evaluated
//! at runtime during query execution
use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr,
PhysicalExpr};
use arrow::array::ArrayRef;
-use arrow::array::UInt64Array;
+use arrow::array::{Float64Array, UInt64Array};
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use std::any::Any;
@@ -34,17 +34,38 @@ use std::sync::Arc;
#[derive(Debug)]
pub struct Rank {
name: String,
- dense: bool,
+ rank_type: RankType,
+}
+
+#[derive(Debug, Copy, Clone)]
+pub(crate) enum RankType {
+ Rank,
+ DenseRank,
+ PercentRank,
}
/// Create a rank window function
pub fn rank(name: String) -> Rank {
- Rank { name, dense: false }
+ Rank {
+ name,
+ rank_type: RankType::Rank,
+ }
}
/// Create a dense rank window function
pub fn dense_rank(name: String) -> Rank {
- Rank { name, dense: true }
+ Rank {
+ name,
+ rank_type: RankType::DenseRank,
+ }
+}
+
+/// Create a percent rank window function
+pub fn percent_rank(name: String) -> Rank {
+ Rank {
+ name,
+ rank_type: RankType::PercentRank,
+ }
}
impl BuiltInWindowFunctionExpr for Rank {
@@ -55,7 +76,10 @@ impl BuiltInWindowFunctionExpr for Rank {
fn field(&self) -> Result<Field> {
let nullable = false;
- let data_type = DataType::UInt64;
+ let data_type = match self.rank_type {
+ RankType::Rank | RankType::DenseRank => DataType::UInt64,
+ RankType::PercentRank => DataType::Float64,
+ };
Ok(Field::new(self.name(), data_type, nullable))
}
@@ -71,12 +95,14 @@ impl BuiltInWindowFunctionExpr for Rank {
&self,
_batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
- Ok(Box::new(RankEvaluator { dense: self.dense }))
+ Ok(Box::new(RankEvaluator {
+ rank_type: self.rank_type,
+ }))
}
}
pub(crate) struct RankEvaluator {
- dense: bool,
+ rank_type: RankType,
}
impl PartitionEvaluator for RankEvaluator {
@@ -90,18 +116,37 @@ impl PartitionEvaluator for RankEvaluator {
fn evaluate_partition_with_rank(
&self,
- _partition: Range<usize>,
+ partition: Range<usize>,
ranks_in_partition: &[Range<usize>],
) -> Result<ArrayRef> {
- let result = if self.dense {
-
UInt64Array::from_iter_values(ranks_in_partition.iter().zip(1u64..).flat_map(
- |(range, rank)| {
- let len = range.end - range.start;
- iter::repeat(rank).take(len)
- },
- ))
- } else {
- UInt64Array::from_iter_values(
+ // see https://www.postgresql.org/docs/current/functions-window.html
+ let result: ArrayRef = match self.rank_type {
+ RankType::DenseRank => Arc::new(UInt64Array::from_iter_values(
+ ranks_in_partition
+ .iter()
+ .zip(1u64..)
+ .flat_map(|(range, rank)| {
+ let len = range.end - range.start;
+ iter::repeat(rank).take(len)
+ }),
+ )),
+ RankType::PercentRank => {
+ // Returns the relative rank of the current row, that is (rank - 1) / (total partition rows - 1). The value thus ranges from 0 to 1 inclusive.
+ let denominator = (partition.end - partition.start) as f64;
+ Arc::new(Float64Array::from_iter_values(
+ ranks_in_partition
+ .iter()
+ .scan(0_u64, |acc, range| {
+ let len = range.end - range.start;
+ let value = (*acc as f64) / (denominator -
1.0).max(1.0);
+ let result = iter::repeat(value).take(len);
+ *acc += len as u64;
+ Some(result)
+ })
+ .flatten(),
+ ))
+ }
+ RankType::Rank => Arc::new(UInt64Array::from_iter_values(
ranks_in_partition
.iter()
.scan(1_u64, |acc, range| {
@@ -111,9 +156,9 @@ impl PartitionEvaluator for RankEvaluator {
Some(result)
})
.flatten(),
- )
+ )),
};
- Ok(Arc::new(result))
+ Ok(result)
}
}
@@ -135,6 +180,27 @@ mod tests {
test_i32_result(expr, vec![-2, -2, 1, 3, 3, 3, 7, 8], vec![0..8],
expected)
}
+ fn test_f64_result(
+ expr: &Rank,
+ data: Vec<i32>,
+ range: Range<usize>,
+ ranks: Vec<Range<usize>>,
+ expected: Vec<f64>,
+ ) -> Result<()> {
+ let arr: ArrayRef = Arc::new(Int32Array::from(data));
+ let values = vec![arr];
+ let schema = Schema::new(vec![Field::new("arr", DataType::Int32,
false)]);
+ let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
+ let result = expr
+ .create_evaluator(&batch)?
+ .evaluate_with_rank(vec![range], ranks)?;
+ assert_eq!(1, result.len());
+ let result =
result[0].as_any().downcast_ref::<Float64Array>().unwrap();
+ let result = result.values();
+ assert_eq!(expected, result);
+ Ok(())
+ }
+
fn test_i32_result(
expr: &Rank,
data: Vec<i32>,
@@ -170,4 +236,33 @@ mod tests {
test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
Ok(())
}
+
+ #[test]
+ fn test_percent_rank() -> Result<()> {
+ let r = percent_rank("arr".into());
+
+ // empty case
+ let expected = vec![0.0; 0];
+ test_f64_result(&r, vec![0; 0], 0..0, vec![0..0; 0], expected)?;
+
+ // singleton case
+ let expected = vec![0.0];
+ test_f64_result(&r, vec![13], 0..1, vec![0..1], expected)?;
+
+ // uniform case
+ let expected = vec![0.0; 7];
+ test_f64_result(&r, vec![4; 7], 0..7, vec![0..7], expected)?;
+
+ // non-trivial case
+ let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
+ test_f64_result(
+ &r,
+ vec![1, 1, 1, 2, 2, 2, 2],
+ 0..7,
+ vec![0..3, 3..7],
+ expected,
+ )?;
+
+ Ok(())
+ }
}
diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs
index 2ea131e..9070ca8 100644
--- a/datafusion/src/physical_plan/window_functions.rs
+++ b/datafusion/src/physical_plan/window_functions.rs
@@ -438,6 +438,15 @@ mod tests {
}
#[test]
+ fn test_percent_rank_return_type() -> Result<()> {
+ let fun = WindowFunction::from_str("percent_rank")?;
+ let observed = return_type(&fun, &[])?;
+ assert_eq!(DataType::Float64, observed);
+
+ Ok(())
+ }
+
+ #[test]
fn test_cume_dist_return_type() -> Result<()> {
let fun = WindowFunction::from_str("cume_dist")?;
let observed = return_type(&fun, &[])?;
diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs
index f34649f..0f6d910 100644
--- a/datafusion/src/physical_plan/windows/mod.rs
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -22,8 +22,8 @@ use crate::logical_plan::window_frames::WindowFrame;
use crate::physical_plan::{
aggregates,
expressions::{
- cume_dist, dense_rank, lag, lead, rank, Literal, NthValue,
PhysicalSortExpr,
- RowNumber,
+ cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal,
NthValue,
+ PhysicalSortExpr, RowNumber,
},
type_coercion::coerce,
window_functions::{
@@ -96,6 +96,7 @@ fn create_built_in_window_expr(
BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)),
BuiltInWindowFunction::Rank => Arc::new(rank(name)),
BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)),
+ BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name)),
BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name)),
BuiltInWindowFunction::Lag => {
let coerced_args = coerce(args, input_schema,
&signature_for_built_in(fun))?;
diff --git a/integration-tests/sqls/simple_window_ranked_built_in_functions.sql b/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
index adffcbf..868d483 100644
--- a/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
+++ b/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
@@ -18,6 +18,7 @@ select
c9,
cume_dist() OVER (PARTITION BY c2 ORDER BY c3) cume_dist_by_c3,
rank() OVER (PARTITION BY c2 ORDER BY c3) rank_by_c3,
- dense_rank() OVER (PARTITION BY c2 ORDER BY c3) dense_rank_by_c3
+ dense_rank() OVER (PARTITION BY c2 ORDER BY c3) dense_rank_by_c3,
+ percent_rank() OVER (PARTITION BY c2 ORDER BY c3) percent_rank_by_c3
FROM test
ORDER BY c9;