This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new eb3e135  add percent rank (#1077)
eb3e135 is described below

commit eb3e13598971e97f2fae4b823de40c247202b693
Author: Jiayu Liu <[email protected]>
AuthorDate: Sat Oct 9 14:59:34 2021 +0800

    add percent rank (#1077)
---
 datafusion/src/physical_plan/expressions/mod.rs    |   2 +-
 datafusion/src/physical_plan/expressions/rank.rs   | 135 ++++++++++++++++++---
 datafusion/src/physical_plan/window_functions.rs   |   9 ++
 datafusion/src/physical_plan/windows/mod.rs        |   5 +-
 .../simple_window_ranked_built_in_functions.sql    |   3 +-
 5 files changed, 130 insertions(+), 24 deletions(-)

diff --git a/datafusion/src/physical_plan/expressions/mod.rs 
b/datafusion/src/physical_plan/expressions/mod.rs
index 9c05112..9f7a6cc 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -75,7 +75,7 @@ pub use negative::{negative, NegativeExpr};
 pub use not::{not, NotExpr};
 pub use nth_value::NthValue;
 pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
-pub use rank::{dense_rank, rank};
+pub use rank::{dense_rank, percent_rank, rank};
 pub use row_number::RowNumber;
 pub use sum::{sum_return_type, Sum};
 pub use try_cast::{try_cast, TryCastExpr};
diff --git a/datafusion/src/physical_plan/expressions/rank.rs 
b/datafusion/src/physical_plan/expressions/rank.rs
index b82cfa4..b82e900 100644
--- a/datafusion/src/physical_plan/expressions/rank.rs
+++ b/datafusion/src/physical_plan/expressions/rank.rs
@@ -15,14 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines physical expression for `rank` and `dense_rank` that can evaluated
+//! Defines physical expression for `rank`, `dense_rank`, and `percent_rank` 
that can be evaluated
 //! at runtime during query execution
 
 use crate::error::Result;
 use crate::physical_plan::window_functions::PartitionEvaluator;
 use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, 
PhysicalExpr};
 use arrow::array::ArrayRef;
-use arrow::array::UInt64Array;
+use arrow::array::{Float64Array, UInt64Array};
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
 use std::any::Any;
@@ -34,17 +34,38 @@ use std::sync::Arc;
 #[derive(Debug)]
 pub struct Rank {
     name: String,
-    dense: bool,
+    rank_type: RankType,
+}
+
+#[derive(Debug, Copy, Clone)]
+pub(crate) enum RankType {
+    Rank,
+    DenseRank,
+    PercentRank,
 }
 
 /// Create a rank window function
 pub fn rank(name: String) -> Rank {
-    Rank { name, dense: false }
+    Rank {
+        name,
+        rank_type: RankType::Rank,
+    }
 }
 
 /// Create a dense rank window function
 pub fn dense_rank(name: String) -> Rank {
-    Rank { name, dense: true }
+    Rank {
+        name,
+        rank_type: RankType::DenseRank,
+    }
+}
+
+/// Create a percent rank window function
+pub fn percent_rank(name: String) -> Rank {
+    Rank {
+        name,
+        rank_type: RankType::PercentRank,
+    }
 }
 
 impl BuiltInWindowFunctionExpr for Rank {
@@ -55,7 +76,10 @@ impl BuiltInWindowFunctionExpr for Rank {
 
     fn field(&self) -> Result<Field> {
         let nullable = false;
-        let data_type = DataType::UInt64;
+        let data_type = match self.rank_type {
+            RankType::Rank | RankType::DenseRank => DataType::UInt64,
+            RankType::PercentRank => DataType::Float64,
+        };
         Ok(Field::new(self.name(), data_type, nullable))
     }
 
@@ -71,12 +95,14 @@ impl BuiltInWindowFunctionExpr for Rank {
         &self,
         _batch: &RecordBatch,
     ) -> Result<Box<dyn PartitionEvaluator>> {
-        Ok(Box::new(RankEvaluator { dense: self.dense }))
+        Ok(Box::new(RankEvaluator {
+            rank_type: self.rank_type,
+        }))
     }
 }
 
 pub(crate) struct RankEvaluator {
-    dense: bool,
+    rank_type: RankType,
 }
 
 impl PartitionEvaluator for RankEvaluator {
@@ -90,18 +116,37 @@ impl PartitionEvaluator for RankEvaluator {
 
     fn evaluate_partition_with_rank(
         &self,
-        _partition: Range<usize>,
+        partition: Range<usize>,
         ranks_in_partition: &[Range<usize>],
     ) -> Result<ArrayRef> {
-        let result = if self.dense {
-            
UInt64Array::from_iter_values(ranks_in_partition.iter().zip(1u64..).flat_map(
-                |(range, rank)| {
-                    let len = range.end - range.start;
-                    iter::repeat(rank).take(len)
-                },
-            ))
-        } else {
-            UInt64Array::from_iter_values(
+        // see https://www.postgresql.org/docs/current/functions-window.html
+        let result: ArrayRef = match self.rank_type {
+            RankType::DenseRank => Arc::new(UInt64Array::from_iter_values(
+                ranks_in_partition
+                    .iter()
+                    .zip(1u64..)
+                    .flat_map(|(range, rank)| {
+                        let len = range.end - range.start;
+                        iter::repeat(rank).take(len)
+                    }),
+            )),
+            RankType::PercentRank => {
+                // Returns the relative rank of the current row, that is (rank 
- 1) / (total partition rows - 1). The value thus ranges from 0 to 1 inclusive.
+                let denominator = (partition.end - partition.start) as f64;
+                Arc::new(Float64Array::from_iter_values(
+                    ranks_in_partition
+                        .iter()
+                        .scan(0_u64, |acc, range| {
+                            let len = range.end - range.start;
+                            let value = (*acc as f64) / (denominator - 
1.0).max(1.0);
+                            let result = iter::repeat(value).take(len);
+                            *acc += len as u64;
+                            Some(result)
+                        })
+                        .flatten(),
+                ))
+            }
+            RankType::Rank => Arc::new(UInt64Array::from_iter_values(
                 ranks_in_partition
                     .iter()
                     .scan(1_u64, |acc, range| {
@@ -111,9 +156,9 @@ impl PartitionEvaluator for RankEvaluator {
                         Some(result)
                     })
                     .flatten(),
-            )
+            )),
         };
-        Ok(Arc::new(result))
+        Ok(result)
     }
 }
 
@@ -135,6 +180,27 @@ mod tests {
         test_i32_result(expr, vec![-2, -2, 1, 3, 3, 3, 7, 8], vec![0..8], 
expected)
     }
 
+    fn test_f64_result(
+        expr: &Rank,
+        data: Vec<i32>,
+        range: Range<usize>,
+        ranks: Vec<Range<usize>>,
+        expected: Vec<f64>,
+    ) -> Result<()> {
+        let arr: ArrayRef = Arc::new(Int32Array::from(data));
+        let values = vec![arr];
+        let schema = Schema::new(vec![Field::new("arr", DataType::Int32, 
false)]);
+        let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
+        let result = expr
+            .create_evaluator(&batch)?
+            .evaluate_with_rank(vec![range], ranks)?;
+        assert_eq!(1, result.len());
+        let result = 
result[0].as_any().downcast_ref::<Float64Array>().unwrap();
+        let result = result.values();
+        assert_eq!(expected, result);
+        Ok(())
+    }
+
     fn test_i32_result(
         expr: &Rank,
         data: Vec<i32>,
@@ -170,4 +236,33 @@ mod tests {
         test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
         Ok(())
     }
+
+    #[test]
+    fn test_percent_rank() -> Result<()> {
+        let r = percent_rank("arr".into());
+
+        // empty case
+        let expected = vec![0.0; 0];
+        test_f64_result(&r, vec![0; 0], 0..0, vec![0..0; 0], expected)?;
+
+        // singleton case
+        let expected = vec![0.0];
+        test_f64_result(&r, vec![13], 0..1, vec![0..1], expected)?;
+
+        // uniform case
+        let expected = vec![0.0; 7];
+        test_f64_result(&r, vec![4; 7], 0..7, vec![0..7], expected)?;
+
+        // non-trivial case
+        let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
+        test_f64_result(
+            &r,
+            vec![1, 1, 1, 2, 2, 2, 2],
+            0..7,
+            vec![0..3, 3..7],
+            expected,
+        )?;
+
+        Ok(())
+    }
 }
diff --git a/datafusion/src/physical_plan/window_functions.rs 
b/datafusion/src/physical_plan/window_functions.rs
index 2ea131e..9070ca8 100644
--- a/datafusion/src/physical_plan/window_functions.rs
+++ b/datafusion/src/physical_plan/window_functions.rs
@@ -438,6 +438,15 @@ mod tests {
     }
 
     #[test]
+    fn test_percent_rank_return_type() -> Result<()> {
+        let fun = WindowFunction::from_str("percent_rank")?;
+        let observed = return_type(&fun, &[])?;
+        assert_eq!(DataType::Float64, observed);
+
+        Ok(())
+    }
+
+    #[test]
     fn test_cume_dist_return_type() -> Result<()> {
         let fun = WindowFunction::from_str("cume_dist")?;
         let observed = return_type(&fun, &[])?;
diff --git a/datafusion/src/physical_plan/windows/mod.rs 
b/datafusion/src/physical_plan/windows/mod.rs
index f34649f..0f6d910 100644
--- a/datafusion/src/physical_plan/windows/mod.rs
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -22,8 +22,8 @@ use crate::logical_plan::window_frames::WindowFrame;
 use crate::physical_plan::{
     aggregates,
     expressions::{
-        cume_dist, dense_rank, lag, lead, rank, Literal, NthValue, 
PhysicalSortExpr,
-        RowNumber,
+        cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal, 
NthValue,
+        PhysicalSortExpr, RowNumber,
     },
     type_coercion::coerce,
     window_functions::{
@@ -96,6 +96,7 @@ fn create_built_in_window_expr(
         BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)),
         BuiltInWindowFunction::Rank => Arc::new(rank(name)),
         BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)),
+        BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name)),
         BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name)),
         BuiltInWindowFunction::Lag => {
             let coerced_args = coerce(args, input_schema, 
&signature_for_built_in(fun))?;
diff --git a/integration-tests/sqls/simple_window_ranked_built_in_functions.sql 
b/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
index adffcbf..868d483 100644
--- a/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
+++ b/integration-tests/sqls/simple_window_ranked_built_in_functions.sql
@@ -18,6 +18,7 @@ select
   c9,
   cume_dist() OVER (PARTITION BY c2 ORDER BY c3) cume_dist_by_c3,
   rank() OVER (PARTITION BY c2 ORDER BY c3) rank_by_c3,
-  dense_rank() OVER (PARTITION BY c2 ORDER BY c3) dense_rank_by_c3
+  dense_rank() OVER (PARTITION BY c2 ORDER BY c3) dense_rank_by_c3,
+  percent_rank() OVER (PARTITION BY c2 ORDER BY c3) percent_rank_by_c3
 FROM test
 ORDER BY c9;

Reply via email to