paleolimbot commented on code in PR #21984:
URL: https://github.com/apache/datafusion/pull/21984#discussion_r3253991816
##########
datafusion/functions-nested/src/make_array.rs:
##########
@@ -129,7 +161,30 @@ impl ScalarUDFImpl for MakeArray {
/// `make_array_inner` is the implementation of the `make_array` function.
/// Constructs an array using the input `data` as `ArrayRef`.
/// Returns a reference-counted `Array` instance result.
+///
+/// This entry point synthesizes a bare inner field with no metadata. Prefer
+/// [`make_array_inner_with_field`] from contexts where a planning-time
+/// `FieldRef` is available (e.g. `MakeArray::invoke_with_args`) so that
+/// per-field metadata (notably Arrow extension types) flows through.
Review Comment:
I feel like I may be missing something about how these get called: any other
call site of `make_array_inner` feels like it may be a bug, or an opportunity
to propagate metadata that this PR doesn't capture. If fixing that causes CI
mayhem, it could be tracked as a follow-up ticket.
##########
datafusion/functions-nested/src/map.rs:
##########
@@ -63,8 +63,46 @@ fn can_evaluate_to_const(args: &[ColumnarValue]) -> bool {
.all(|arg| matches!(arg, ColumnarValue::Scalar(_)))
}
+#[cfg(test)]
fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
Review Comment:
Should this move to `mod test`?
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -1204,6 +1246,31 @@ mod tests {
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::Column;
+ fn int32_field() -> FieldRef {
+ Arc::new(Field::new("c", DataType::Int32, true))
+ }
+
+ /// Regression test for #21982: `array_agg` must propagate the input
+ /// field's metadata onto the resulting list's inner field.
+ #[test]
+ fn array_agg_preserves_inner_field_metadata() -> Result<()> {
+ let metadata = std::collections::HashMap::from([(
+ "ARROW:extension:name".to_string(),
+ "arrow.uuid".to_string(),
+ )]);
+ let input: FieldRef =
+ Arc::new(Field::new("c", DataType::Int64, true).with_metadata(metadata));
Review Comment:
This can use arbitrary key/value metadata rather than an `ARROW:extension:name` entry.
##########
datafusion/functions/src/core/struct.rs:
##########
Review Comment:
This can return `internal_err!` now (I think I missed some others).
##########
datafusion/functions/src/core/struct.rs:
##########
@@ -137,3 +159,42 @@ impl ScalarUDFImpl for StructFunc {
self.doc()
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::collections::HashMap;
+
+ /// Regression test for #21982: `struct(...)` must propagate each input
+ /// field's metadata onto the corresponding member of the output struct.
+ #[test]
+ fn struct_preserves_member_metadata() -> Result<()> {
+ let with_meta = |k: &str, v: &str, dt: DataType| -> FieldRef {
+ let metadata = HashMap::from([(k.to_string(), v.to_string())]);
+ Arc::new(Field::new("c", dt, true).with_metadata(metadata))
+ };
+ let a = with_meta("ARROW:extension:name", "arrow.uuid", DataType::Int64);
+ let b = with_meta("ARROW:extension:name", "arrow.json", DataType::Utf8);
Review Comment:
These can probably just use arbitrary key/value metadata rather than extension
types whose storage may not be valid (e.g. `arrow.uuid` on `Int64`).
##########
datafusion/functions-nested/src/map.rs:
##########
@@ -398,8 +456,44 @@ impl ScalarUDFImpl for MapFunc {
))
}
+ fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
+ let [keys_arg, values_arg] = take_function_args(self.name(), args.arg_fields)?;
+
+ // Element fields preserve the input lists' element-field metadata
+ // (e.g. Arrow extension types). Override only the name and nullable
+ // flag to match the canonical map entries schema (key non-null,
+ // value nullable).
+ let key_field = get_element_field(keys_arg)?
+ .as_ref()
+ .clone()
+ .with_name("key")
+ .with_nullable(false);
+ let value_field = get_element_field(values_arg)?
+ .as_ref()
+ .clone()
+ .with_name("value")
+ .with_nullable(true);
+
+ let mut builder = SchemaBuilder::new();
+ builder.push(key_field);
+ builder.push(value_field);
+ let fields = builder.finish().fields;
+ let entries = Arc::new(Field::new("entries", DataType::Struct(fields), false));
Review Comment:
Does this work/is this any cleaner?
```suggestion
let entries = Arc::new(Field::new(
    "entries",
    DataType::Struct(vec![Arc::new(key_field), Arc::new(value_field)].into()),
    false,
));
```
##########
datafusion/functions-nested/src/repeat.rs:
##########
Review Comment:
I think this can return `internal_err!()` now.
##########
datafusion/functions-nested/src/arrays_zip.rs:
##########
@@ -223,12 +266,21 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
.map(|v| v.as_ref().map(|view| view.values.to_data()))
.collect();
- let struct_fields: Fields = element_types
- .iter()
- .enumerate()
- .map(|(i, dt)| Field::new(format!("{}", i + 1), dt.clone(), true))
- .collect::<Vec<_>>()
- .into();
+ // Use the planning-time struct fields (which preserve input metadata).
+ // Fall back to building bare fields from element types only if the
+ // planning-time inner field doesn't carry a struct — that shouldn't
+ // happen for a normal `arrays_zip` invocation (`return_field_from_args`
+ // always returns `List<Struct<...>>`), but it is reachable if a caller
+ // constructs `ScalarFunctionArgs` manually with a custom `return_field`.
Review Comment:
Should this case return `internal_err!()` instead? (Or is there already, or
should there be, a test that asserts this behaviour?)
##########
datafusion/functions-nested/src/map.rs:
##########
@@ -280,14 +329,23 @@ fn make_map_batch_internal(
let entry_struct: Vec<(Arc<Field>, ArrayRef)> = entry_struct_buffer.into();
let entry_struct = StructArray::from(entry_struct);
- let map_data_type = DataType::Map(
+ // The planning-time `entries` field may declare a struct shape that
+ // doesn't match the runtime `entry_struct` (e.g. when the key/value
+ // arguments are not list-shaped). In that case, fall back to building
+ // a fresh entries field from the runtime struct so the resulting
+ // MapArray is well-formed; otherwise use the planning-time field.
+ let entries_field = if let DataType::Struct(_) = entries.data_type()
Review Comment:
The comments in this file give a few different reasons why this might happen,
but `return_field_from_args()` seems to accept only list-like arguments.
##########
datafusion/functions-nested/src/repeat.rs:
##########
@@ -129,8 +130,26 @@ impl ScalarUDFImpl for ArrayRepeat {
}
}
+ fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
+ let element = &args.arg_fields[0];
+ let inner = nullable_list_item_field_from(element);
+ let data_type = match element.data_type() {
+ LargeList(_) => LargeList(inner),
+ _ => List(inner),
+ };
+ Ok(Arc::new(Field::new(self.name(), data_type, true)))
+ }
+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
- make_scalar_function(array_repeat_inner)(&args.args)
+ let inner_field = match args.return_field.data_type() {
+ List(field) | LargeList(field) | DataType::FixedSizeList(field, _) => {
+ Some(Arc::clone(field))
+ }
+ _ => None,
Review Comment:
This seems like it should be an `internal_err!` (and `FixedSizeList` should be
omitted from the match unless `return_field_from_args` can actually return it).
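A minimal, std-only sketch of this suggestion, assuming simplified stand-in types: the `Field`, `FieldRef`, `DataType`, and `internal_err` below are hypothetical and are not the arrow-rs or DataFusion items. It accepts only the list shapes that `return_field_from_args` produces (`List` and `LargeList`) and reports anything else as an internal error instead of silently falling back.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical, simplified stand-ins for arrow's `Field`/`DataType`.
// Only enough structure to illustrate the pattern; not the arrow-rs API.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    List(FieldRef),
    LargeList(FieldRef),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
    metadata: HashMap<String, String>,
}

type FieldRef = Arc<Field>;

// Hypothetical stand-in for DataFusion's `internal_err!` macro: an error that
// signals a broken planning/execution invariant rather than bad user input.
fn internal_err<T>(msg: String) -> Result<T, String> {
    Err(format!("Internal error: {msg}"))
}

/// Extract the planning-time element field from the declared return type.
/// Only the shapes the planner can produce are accepted; anything else is an
/// invariant violation, so callers never need an `Option<FieldRef>` fallback.
fn list_inner_field(return_type: &DataType) -> Result<FieldRef, String> {
    match return_type {
        DataType::List(field) | DataType::LargeList(field) => Ok(Arc::clone(field)),
        other => internal_err(format!(
            "array_repeat expected a List or LargeList return type, got {other:?}"
        )),
    }
}
```

Failing here surfaces a mismatch between `return_field_from_args` and `invoke_with_args` immediately, rather than producing an array whose inner field has quietly lost its metadata.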
##########
datafusion/functions-nested/src/arrays_zip.rs:
##########
@@ -315,15 +367,66 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
let null_buffer = null_builder.finish();
- let result = ListArray::try_new(
+ // Reuse the planning-time inner field when its data type matches the
+ // assembled struct; otherwise (see fallback above) rebuild it.
+ let list_inner = if inner_field.data_type() == struct_array.data_type() {
+ inner_field
+ } else {
Arc::new(Field::new_list_field(
struct_array.data_type().clone(),
true,
- )),
+ ))
+ };
+ let result = ListArray::try_new(
+ list_inner,
OffsetBuffer::new(offsets.into()),
Arc::new(struct_array),
null_buffer,
)?;
Ok(Arc::new(result))
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::collections::HashMap;
+
+ /// Regression test for #21982: `arrays_zip` must propagate each input
+ /// list's element-field metadata onto the corresponding struct member.
+ ///
+ /// Uses arbitrary key/value metadata (not the official `ARROW:extension:*`
+ /// keys) since the bare data types here would not be valid extension-type
+ /// storage anyway; what we're asserting is that metadata flows through,
+ /// not extension-type semantics.
Review Comment:
Thanks for humouring me on this one
##########
datafusion/functions/src/core/struct.rs:
##########
@@ -114,6 +117,25 @@ impl ScalarUDFImpl for StructFunc {
Ok(DataType::Struct(fields))
}
+ fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
+ if args.arg_fields.is_empty() {
+ return exec_err!("struct requires at least one argument, got 0 instead");
+ }
+ // Preserve each input field's metadata on the corresponding struct
+ // member field so Arrow extension types survive `struct(...)` calls.
+ let fields: Fields = args
+ .arg_fields
+ .iter()
+ .enumerate()
+ .map(|(pos, f)| nullable_inner_field_from(f, &format!("c{pos}")))
Review Comment:
This should probably inherit the nullability of the input field (although that
may break tests that expect otherwise).
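A std-only sketch of inheriting nullability, using a hypothetical stand-in `Field` type (not arrow-rs's): each member is renamed to `c{pos}` as in the hunk, while its data type, nullability, and metadata are all copied from the corresponding input field.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical, simplified stand-in for arrow's `Field`; not the arrow-rs API.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Utf8,
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
    metadata: HashMap<String, String>,
}

type FieldRef = Arc<Field>;

/// Build the member fields of the `struct(...)` return type. Each member is
/// renamed to `c{pos}`, while data type, nullability, and metadata are all
/// inherited from the corresponding input field.
fn struct_member_fields(arg_fields: &[FieldRef]) -> Result<Vec<FieldRef>, String> {
    if arg_fields.is_empty() {
        return Err("struct requires at least one argument, got 0 instead".to_string());
    }
    Ok(arg_fields
        .iter()
        .enumerate()
        .map(|(pos, f)| {
            Arc::new(Field {
                name: format!("c{pos}"),
                // Struct update syntax: copy every other attribute from the input.
                ..f.as_ref().clone()
            })
        })
        .collect())
}
```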
##########
datafusion/functions-nested/src/repeat.rs:
##########
@@ -142,20 +161,23 @@ impl ScalarUDFImpl for ArrayRepeat {
}
}
-fn array_repeat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+fn array_repeat_inner_with_field(
+ args: &[ArrayRef],
+ inner_field: Option<FieldRef>,
Review Comment:
If you return `internal_err!` above, I think this (and the other
`Option<FieldRef>` parameters) can just be `FieldRef`.
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -1996,7 +2063,10 @@ mod tests {
#[test]
fn retract_basic_sliding_window() -> Result<()> {
- let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+ let mut acc = ArrayAggAccumulator::try_new(
+ &Arc::new(Field::new("c", DataType::Utf8, true)),
+ false,
+ )?;
Review Comment:
`&DataType::Utf8.into_nullable_field()` is also available for this
##########
datafusion/spark/src/function/aggregate/collect.rs:
##########
@@ -143,7 +143,7 @@ impl AggregateUDFImpl for SparkCollectSet {
let data_type = field.data_type().clone();
let ignore_nulls = true;
Ok(Box::new(NullToEmptyListAccumulator::new(
- DistinctArrayAggAccumulator::try_new(&data_type, None, ignore_nulls)?,
+ DistinctArrayAggAccumulator::try_new(field, None, ignore_nulls)?,
data_type,
)))
Review Comment:
Is there a test for this one? Maybe something like
`VALUES ('a', NULL), ('a', NULL), ('b', <something with metadata>), ('b', <something with metadata>)`
with a `GROUP BY` + `collect_set`?
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
Review Comment:
This can now just be `internal_err!`.
--
This is an automated message from the Apache Git Service.