tobixdev opened a new issue, #14828: URL: https://github.com/apache/datafusion/issues/14828
### Is your feature request related to a problem or challenge?

In our system we work heavily with tagged unions. Basically every column in our results is a `DataType::Union`. However, comparing unions is not natively supported by DF/arrow-rs. This makes sense in a general setting; however, we only have a single union type, and we know how to compare it.

This issue should address the following two problems in this context:

1. Sorting by a column
2. Returning only distinct results

I think 2. is a consequence of 1., as sorting is required somewhere during the execution of the query. We have built a workaround for 1. by projecting to a "sortable" value that DF supports natively. While we may be able to use a similar workaround with `Distinct::On` for 2., we hope to find a better solution to our problem.

An extension of this issue could also allow users to override the default sorting behavior for certain types.

### Describe the solution you'd like

I have read [this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/) on sorting in `arrow-rs`. I think it would be nice if we could extend this mechanism in DF/`arrow-rs`. Maybe something like providing a byte encoding for a particular `DataType`? However, I am not really experienced in this area. Looking forward to your opinions!

### Describe alternatives you've considered

I think one good way to achieve this would be to integrate this work with logical and extension types (#12622, #12644). However, we would love to be able to use these capabilities before getting full support for logical/extension types.

Another option is to keep using the workarounds (basically, we could materialize the comparison byte arrays in a UDF). However, this projection technique is cumbersome, as every internal call to sort (see the problems with distinct) can cause crashes because the actual column is not sortable.

### Additional context

Maybe we also need a "sister issue" in arrow-rs to track this.
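To make the "byte encoding for a particular `DataType`" idea more concrete, here is a minimal, standalone sketch of an order-preserving encoding for a union of `Int32` and `Float64`. It does not use any DF/`arrow-rs` API: the `Value` enum, `encode`, and `sort_distinct` are hypothetical names for illustration only. The integer and float encodings follow the scheme described in the blog post (flip the sign bit and write big-endian; for negative floats flip all bits). Ordering the variants by type id first is purely an assumption about what a user might want.

```rust
/// Hypothetical stand-in for one slot of the `A: Int32` / `B: Float64` union.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Value {
    A(i32),
    B(f64),
}

/// Encode a value into a fixed-width key whose lexicographic (memcmp) order
/// is the desired sort order. Assumption: all `A` values sort before all `B`
/// values, so the type id is written as the first byte.
fn encode(value: Value) -> [u8; 9] {
    let mut out = [0u8; 9];
    match value {
        Value::A(v) => {
            out[0] = 0; // type id of variant A
            // Flipping the sign bit makes negative numbers sort before
            // positive ones when compared as unsigned big-endian bytes.
            let bits = (v as u32) ^ 0x8000_0000;
            out[1..5].copy_from_slice(&bits.to_be_bytes());
            // The remaining 4 bytes stay zero as padding.
        }
        Value::B(v) => {
            out[0] = 1; // type id of variant B
            let bits = v.to_bits();
            // Negative floats: flip all bits. Non-negative floats: flip only
            // the sign bit. This yields a total order over the bit patterns
            // (so -0.0 and 0.0 are distinct keys).
            let mask = if bits >> 63 == 1 { u64::MAX } else { 1u64 << 63 };
            out[1..9].copy_from_slice(&(bits ^ mask).to_be_bytes());
        }
    }
    out
}

/// Sort and de-duplicate values using only their encoded keys, which is what
/// both "sort by a column" and "distinct" would need.
fn sort_distinct(values: &[Value]) -> Vec<Value> {
    let mut keyed: Vec<([u8; 9], Value)> =
        values.iter().map(|v| (encode(*v), *v)).collect();
    keyed.sort_by(|l, r| l.0.cmp(&r.0));
    keyed.dedup_by(|l, r| l.0 == r.0);
    keyed.into_iter().map(|(_, v)| v).collect()
}
```

With such keys, both problems reduce to comparing byte strings, which is why a user-provided encoding hooked into the row format could cover sorting and distinct at once.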
I'd also be interested in helping to implement this feature. However, I'd need some input from more experienced contributors before tackling it.

Here are some crude tests that raise the following error.

```
Error: ArrowError(NotYetImplemented("Row format support not yet implemented for: [SortField { options: SortOptions { descending: false, nulls_first: true }, data_type: Union([(0, Field { name: \"A\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), (1, Field { name: \"B\", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} })], Dense) }]"), None)
```

# Test Case (Distinct)

Here is a test case that reproduces this problem:

```rust
use std::sync::Arc;

use datafusion::arrow::array::UnionBuilder;
use datafusion::arrow::datatypes::{
    DataType, Field, Float64Type, Int32Type, Schema, UnionMode,
};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::test]
async fn test_distinct_on_union() -> Result<()> {
    // A dense union with two variants: A (Int32) and B (Float64).
    let fields = [
        (0, Arc::new(Field::new("A", DataType::Int32, false))),
        (1, Arc::new(Field::new("B", DataType::Float64, false))),
    ]
    .into_iter()
    .collect();
    let schema = Schema::new(vec![Field::new(
        "my_union",
        DataType::Union(fields, UnionMode::Dense),
        false,
    )]);

    // Four rows, but only two different values: A(1) and B(3.0).
    let mut builder = UnionBuilder::new_dense();
    builder.append::<Int32Type>("A", 1)?;
    builder.append::<Float64Type>("B", 3.0)?;
    builder.append::<Int32Type>("A", 1)?;
    builder.append::<Float64Type>("B", 3.0)?;
    let union = builder.build()?;

    let ctx = SessionContext::new();
    ctx.register_table(
        "test_table",
        Arc::new(MemTable::try_new(
            Arc::new(schema.clone()),
            vec![vec![RecordBatch::try_new(
                Arc::new(schema),
                vec![Arc::new(union)],
            )?]],
        )?),
    )?;

    // Currently fails with the error above before reaching the assertion.
    let result = ctx.table("test_table").await?.distinct()?.count().await?;
    // Two distinct values are expected: A(1) and B(3.0).
    assert_eq!(result, 2);
    Ok(())
}
```

# Test Case for Sort

This test fails with the same error. Note that I think this test should continue to fail by default in the future. However, by extending the row converter mechanism, we hope to get this test running.
```rust
use std::sync::Arc;

use datafusion::arrow::array::UnionBuilder;
use datafusion::arrow::datatypes::{
    DataType, Field, Float64Type, Int32Type, Schema, UnionMode,
};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::{Expr, SessionContext};
use futures::StreamExt; // provides `.next()` on the record batch stream

#[tokio::test]
async fn test_sort_on_union() -> Result<()> {
    // A dense union with two variants: A (Int32) and B (Float64).
    let fields = [
        (0, Arc::new(Field::new("A", DataType::Int32, false))),
        (1, Arc::new(Field::new("B", DataType::Float64, false))),
    ]
    .into_iter()
    .collect();
    let schema = Schema::new(vec![Field::new(
        "my_union",
        DataType::Union(fields, UnionMode::Dense),
        false,
    )]);

    let mut builder = UnionBuilder::new_dense();
    builder.append::<Int32Type>("A", 1)?;
    builder.append::<Float64Type>("B", 3.0)?;
    builder.append::<Int32Type>("A", 1)?;
    builder.append::<Float64Type>("B", 3.0)?;
    let union = builder.build()?;

    let ctx = SessionContext::new();
    ctx.register_table(
        "test_table",
        Arc::new(MemTable::try_new(
            Arc::new(schema.clone()),
            vec![vec![RecordBatch::try_new(
                Arc::new(schema),
                vec![Arc::new(union)],
            )?]],
        )?),
    )?;

    // Sorting by the union column currently fails with the error above
    // when the first batch is pulled from the stream.
    ctx.table("test_table")
        .await?
        .sort_by(vec![Expr::from(datafusion::common::Column::from(
            "my_union",
        ))])?
        .execute_stream()
        .await?
        .next()
        .await
        .unwrap()?;
    Ok(())
}
```