gene-bordegaray commented on code in PR #22207:
URL: https://github.com/apache/datafusion/pull/22207#discussion_r3281780208


##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -845,4 +1045,140 @@ mod tests {
 
         Ok(())
     }
+
+    fn int_split_point(values: impl IntoIterator<Item = i64>) -> SplitPoint {
+        SplitPoint::new(
+            values
+                .into_iter()
+                .map(|value| ScalarValue::Int64(Some(value)))
+                .collect(),
+        )
+    }
+
+    #[test]
+    fn test_range_partitioning_metadata() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int64, false)]));
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+
+        let range_partitioning = RangePartitioning::new(
+            [PhysicalSortExpr::new_default(Arc::clone(&col_a))].into(),
+            vec![int_split_point([10]), int_split_point([20])],
+        );
+        assert_eq!(range_partitioning.ordering()[0].to_string(), "a@0 ASC");
+        assert_eq!(
+            range_partitioning.split_points(),
+            &[int_split_point([10]), int_split_point([20])]
+        );
+        let partitioning = Partitioning::Range(range_partitioning);
+
+        assert_eq!(partitioning.partition_count(), 3);
+        assert_eq!(
+            partitioning.to_string(),
+            "Range([a@0 ASC], [(10), (20)], 3)"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_range_partitioning_project_preserves_or_degrades() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+        ]));
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+        let range_partitioning = Partitioning::Range(RangePartitioning::new(
+            [PhysicalSortExpr::new(col_b, SortOptions::new(true, 
false))].into(),
+            vec![int_split_point([10])],
+        ));
+
+        let keep_b_mapping = ProjectionMapping::from_indices(&[1], &schema)?;
+        let projected = range_partitioning.project(&keep_b_mapping, 
&eq_properties);
+        assert_eq!(
+            projected.to_string(),
+            "Range([b@0 DESC NULLS LAST], [(10)], 2)"
+        );
+
+        let drop_b_mapping = ProjectionMapping::from_indices(&[0], &schema)?;
+        let projected = range_partitioning.project(&drop_b_mapping, 
&eq_properties);
+        let Partitioning::UnknownPartitioning(partition_count) = projected 
else {
+            panic!("expected UnknownPartitioning, got {projected:?}");
+        };
+        assert_eq!(partition_count, 2);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_range_partitioning_project_degrades_if_ordering_collapses() -> 
Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+        ]));
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+        let target: Arc<dyn PhysicalExpr> = Arc::new(Column::new("x", 0));
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+        let range_partitioning = Partitioning::Range(RangePartitioning::new(
+            [
+                PhysicalSortExpr::new_default(Arc::clone(&col_a)),
+                PhysicalSortExpr::new_default(Arc::clone(&col_b)),
+            ]
+            .into(),
+            vec![int_split_point([10, 100])],
+        ));
+        let mapping = ProjectionMapping::from_iter([
+            (
+                Arc::clone(&col_a),
+                ProjectionTargets::from(vec![(Arc::clone(&target), 0)]),
+            ),
+            (
+                Arc::clone(&col_b),
+                ProjectionTargets::from(vec![(Arc::clone(&target), 0)]),
+            ),
+        ]);
+
+        let projected = range_partitioning.project(&mapping, &eq_properties);
+        let Partitioning::UnknownPartitioning(partition_count) = projected 
else {
+            panic!("expected UnknownPartitioning, got {projected:?}");
+        };
+        assert_eq!(partition_count, 2);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_multi_partition_range_does_not_satisfy_hash_distribution() -> 
Result<()> {
+        let schema = Arc::new(Schema::new(vec![

Review Comment:
   added a fixture for all partitioning tests, eliminted lots of boiler plate 😄 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to