This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d2da3e6 Add ScalarValue::Struct variant (#1091)
d2da3e6 is described below
commit d2da3e657501af29447fac405ad2cc0441707edb
Author: Jon Mease <[email protected]>
AuthorDate: Mon Oct 11 09:41:40 2021 -0400
Add ScalarValue::Struct variant (#1091)
* Add ScalarValue::Struct variant
* cargo fmt / clippy
* Add curly brackets around Display representation
* Use recursive Debug representations of field values in Debug
* Formatting
* for_each + mutation -> map + unzip
* Test recursive structs
* clippy fixes
---
datafusion/src/scalar.rs | 415 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 415 insertions(+)
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index 77d4c82..31c48a6 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -86,6 +86,9 @@ pub enum ScalarValue {
IntervalYearMonth(Option<i32>),
/// Interval with DayTime unit
IntervalDayTime(Option<i64>),
+ /// struct of nested ScalarValue (boxed to reduce size_of(ScalarValue))
+ #[allow(clippy::box_vec)]
+ Struct(Option<Box<Vec<ScalarValue>>>, Box<Vec<Field>>),
}
// manual implementation of `PartialEq` that uses OrderedFloat to
@@ -153,6 +156,8 @@ impl PartialEq for ScalarValue {
(IntervalYearMonth(_), _) => false,
(IntervalDayTime(v1), IntervalDayTime(v2)) => v1.eq(v2),
(IntervalDayTime(_), _) => false,
+ (Struct(v1, t1), Struct(v2, t2)) => v1.eq(v2) && t1.eq(t2),
+ (Struct(_, _), _) => false,
}
}
}
@@ -228,6 +233,14 @@ impl PartialOrd for ScalarValue {
(IntervalYearMonth(_), _) => None,
(IntervalDayTime(v1), IntervalDayTime(v2)) => v1.partial_cmp(v2),
(IntervalDayTime(_), _) => None,
+ (Struct(v1, t1), Struct(v2, t2)) => {
+ if t1.eq(t2) {
+ v1.partial_cmp(v2)
+ } else {
+ None
+ }
+ }
+ (Struct(_, _), _) => None,
}
}
}
@@ -273,6 +286,10 @@ impl std::hash::Hash for ScalarValue {
TimestampNanosecond(v) => v.hash(state),
IntervalYearMonth(v) => v.hash(state),
IntervalDayTime(v) => v.hash(state),
+ Struct(v, t) => {
+ v.hash(state);
+ t.hash(state);
+ }
}
}
}
@@ -477,6 +494,7 @@ impl ScalarValue {
DataType::Interval(IntervalUnit::YearMonth)
}
ScalarValue::IntervalDayTime(_) =>
DataType::Interval(IntervalUnit::DayTime),
+ ScalarValue::Struct(_, fields) =>
DataType::Struct(fields.as_ref().clone()),
}
}
@@ -522,6 +540,7 @@ impl ScalarValue {
| ScalarValue::TimestampMillisecond(None)
| ScalarValue::TimestampMicrosecond(None)
| ScalarValue::TimestampNanosecond(None)
+ | ScalarValue::Struct(None, _)
)
}
@@ -758,6 +777,50 @@ impl ScalarValue {
DataType::List(fields) if fields.data_type() ==
&DataType::LargeUtf8 => {
build_array_list_string!(LargeStringBuilder, LargeUtf8)
}
+ DataType::Struct(fields) => {
+ // Initialize a Vector to store the ScalarValues for each column
+ let mut columns: Vec<Vec<ScalarValue>> =
+ (0..fields.len()).map(|_| Vec::new()).collect();
+
+ // Iterate over scalars to populate the column scalars for each row
+ for scalar in scalars.into_iter() {
+ if let ScalarValue::Struct(values, fields) = scalar {
+ match values {
+ Some(values) => {
+ // Push value for each field
+ for c in 0..columns.len() {
+ let column = columns.get_mut(c).unwrap();
+ column.push(values[c].clone());
+ }
+ }
+ None => {
+ // Push NULL of the appropriate type for each field
+ for c in 0..columns.len() {
+ let dtype = fields[c].data_type();
+ let column = columns.get_mut(c).unwrap();
+ column.push(ScalarValue::try_from(dtype)?);
+ }
+ }
+ };
+ } else {
+ return Err(DataFusionError::Internal(format!(
+ "Expected Struct but found: {}",
+ scalar
+ )));
+ };
+ }
+
+ // Call iter_to_array recursively to convert the scalars for each column into Arrow arrays
+ let field_values = fields
+ .iter()
+ .zip(columns)
+ .map(|(field, column)| -> Result<(Field, ArrayRef)> {
+ Ok((field.clone(), Self::iter_to_array(column)?))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Arc::new(StructArray::from(field_values))
+ }
_ => {
return Err(DataFusionError::Internal(format!(
"Unsupported creation of {:?} array from ScalarValue {:?}",
@@ -905,6 +968,29 @@ impl ScalarValue {
e,
size
),
+ ScalarValue::Struct(values, fields) => match values {
+ Some(values) => {
+ let field_values: Vec<_> = fields
+ .iter()
+ .zip(values.iter())
+ .map(|(field, value)| {
+ (field.clone(), value.to_array_of_size(size))
+ })
+ .collect();
+
+ Arc::new(StructArray::from(field_values))
+ }
+ None => {
+ let field_values: Vec<_> = fields.iter().map(|field| {
+ let none_field =
Self::try_from(field.data_type()).expect(
+ "Failed to construct null ScalarValue from
Struct field type"
+ );
+ (field.clone(), none_field.to_array_of_size(size))
+ }).collect();
+
+ Arc::new(StructArray::from(field_values))
+ }
+ },
}
}
@@ -1002,6 +1088,24 @@ impl ScalarValue {
None => values.data_type().try_into()?,
}
}
+ DataType::Struct(fields) => {
+ let array =
+ array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| {
+ DataFusionError::Internal(
+ "Failed to downcast ArrayRef to
StructArray".to_string(),
+ )
+ })?;
+ let mut field_values: Vec<ScalarValue> = Vec::new();
+ for col_index in 0..array.num_columns() {
+ let col_array = array.column(col_index);
+ let col_scalar = ScalarValue::try_from_array(col_array,
index)?;
+ field_values.push(col_scalar);
+ }
+ Self::Struct(Some(Box::new(field_values)),
Box::new(fields.clone()))
+ }
other => {
return Err(DataFusionError::NotImplemented(format!(
"Can't create a scalar from array of type \"{:?}\"",
@@ -1092,6 +1196,7 @@ impl ScalarValue {
ScalarValue::IntervalDayTime(val) => {
eq_array_primitive!(array, index, IntervalDayTimeArray, val)
}
+ ScalarValue::Struct(_, _) => unimplemented!(),
}
}
@@ -1171,6 +1276,19 @@ impl FromStr for ScalarValue {
}
}
+impl From<Vec<(&str, ScalarValue)>> for ScalarValue { // convenience conversion: (field name, value) pairs -> ScalarValue::Struct
+ fn from(value: Vec<(&str, ScalarValue)>) -> Self {
+ let (fields, scalars): (Vec<_>, Vec<_>) = value
+ .into_iter()
+ .map(|(name, scalar)| {
+ (Field::new(name, scalar.get_datatype(), false), scalar) // field type comes from the value itself; NOTE(review): third argument is presumably arrow's `nullable` flag - confirm
+ })
+ .unzip(); // split the (Field, ScalarValue) pairs into two parallel Vecs, keeping input order
+ // Values are wrapped in Some(..), so the result is never the NULL form Struct(None, _)
+ Self::Struct(Some(Box::new(scalars)), Box::new(fields))
+ }
+}
+
macro_rules! impl_try_from {
($SCALAR:ident, $NATIVE:ident) => {
impl TryFrom<ScalarValue> for $NATIVE {
@@ -1278,6 +1396,9 @@ impl TryFrom<&DataType> for ScalarValue {
DataType::List(ref nested_type) => {
ScalarValue::List(None,
Box::new(nested_type.data_type().clone()))
}
+ DataType::Struct(fields) => {
+ ScalarValue::Struct(None, Box::new(fields.clone()))
+ }
_ => {
return Err(DataFusionError::NotImplemented(format!(
"Can't create a scalar from data_type \"{:?}\"",
@@ -1354,6 +1475,18 @@ impl fmt::Display for ScalarValue {
ScalarValue::Date64(e) => format_option!(f, e)?,
ScalarValue::IntervalDayTime(e) => format_option!(f, e)?,
ScalarValue::IntervalYearMonth(e) => format_option!(f, e)?,
+ ScalarValue::Struct(e, fields) => match e {
+ Some(l) => write!(
+ f,
+ "{{{}}}",
+ l.iter()
+ .zip(fields.iter())
+ .map(|(value, field)| format!("{}:{}", field.name(),
value))
+ .collect::<Vec<_>>()
+ .join(",")
+ )?,
+ None => write!(f, "NULL")?,
+ },
};
Ok(())
}
@@ -1400,6 +1533,21 @@ impl fmt::Debug for ScalarValue {
ScalarValue::IntervalYearMonth(_) => {
write!(f, "IntervalYearMonth(\"{}\")", self)
}
+ ScalarValue::Struct(e, fields) => {
+ // Use Debug representation of field values
+ match e {
+ Some(l) => write!(
+ f,
+ "Struct({{{}}})",
+ l.iter()
+ .zip(fields.iter())
+ .map(|(value, field)| format!("{}:{:?}",
field.name(), value))
+ .collect::<Vec<_>>()
+ .join(",")
+ ),
+ None => write!(f, "Struct(NULL)"),
+ }
+ }
}
}
}
@@ -1901,5 +2049,272 @@ mod tests {
)),
None
);
+
+ assert_eq!(
+ ScalarValue::from(vec![
+ ("A", ScalarValue::from(1.0)),
+ ("B", ScalarValue::from("Z"))
+ ])
+ .partial_cmp(&ScalarValue::from(vec![
+ ("A", ScalarValue::from(2.0)),
+ ("B", ScalarValue::from("A"))
+ ])),
+ Some(Ordering::Less)
+ );
+
+ // For different struct fields, `partial_cmp` returns None.
+ assert_eq!(
+ ScalarValue::from(vec![
+ ("A", ScalarValue::from(1.0)),
+ ("B", ScalarValue::from("Z"))
+ ])
+ .partial_cmp(&ScalarValue::from(vec![
+ ("a", ScalarValue::from(2.0)),
+ ("b", ScalarValue::from("A"))
+ ])),
+ None
+ );
+ }
+
+ #[test]
+ fn test_scalar_struct() { // covers ScalarValue::Struct formatting, scalar <-> array conversion, NULL construction and the From<Vec<..>> helper
+ let field_a = Field::new("A", DataType::Int32, false);
+ let field_b = Field::new("B", DataType::Boolean, false);
+ let field_c = Field::new("C", DataType::Utf8, false);
+ // Fields "e" and "f" form the nested struct type carried by field "D"
+ let field_e = Field::new("e", DataType::Int16, false);
+ let field_f = Field::new("f", DataType::Int64, false);
+ let field_d = Field::new(
+ "D",
+ DataType::Struct(vec![field_e.clone(), field_f.clone()]),
+ false,
+ );
+ // Reference scalar built directly from the enum variant: four values paired with the four fields above
+ let scalar = ScalarValue::Struct(
+ Some(Box::new(vec![
+ ScalarValue::Int32(Some(23)),
+ ScalarValue::Boolean(Some(false)),
+ ScalarValue::Utf8(Some("Hello".to_string())),
+ ScalarValue::from(vec![
+ ("e", ScalarValue::from(2i16)),
+ ("f", ScalarValue::from(3i64)),
+ ]),
+ ])),
+ Box::new(vec![
+ field_a.clone(),
+ field_b.clone(),
+ field_c.clone(),
+ field_d.clone(),
+ ]),
+ );
+
+ // Check Display: fields render as name:value, comma-separated, inside curly braces (the nested struct renders the same way)
+ assert_eq!(
+ format!("{}", scalar),
+ String::from("{A:23,B:false,C:Hello,D:{e:2,f:3}}")
+ );
+
+ // Check Debug: same layout wrapped in Struct(...), with every value shown through its own Debug form
+ assert_eq!(
+ format!("{:?}", scalar),
+ String::from(
+
r#"Struct({A:Int32(23),B:Boolean(false),C:Utf8("Hello"),D:Struct({e:Int16(2),f:Int64(3)})})"#
+ )
+ );
+
+ // Convert to length-2 array: each field value is expanded to an array of the requested size
+ let array = scalar.to_array_of_size(2);
+ // Hand-built expectation: one child array per field, every row repeating the scalar's value
+ let expected = Arc::new(StructArray::from(vec![
+ (
+ field_a.clone(),
+ Arc::new(Int32Array::from(vec![23, 23])) as ArrayRef,
+ ),
+ (
+ field_b.clone(),
+ Arc::new(BooleanArray::from(vec![false, false])) as ArrayRef,
+ ),
+ (
+ field_c.clone(),
+ Arc::new(StringArray::from(vec!["Hello", "Hello"])) as
ArrayRef,
+ ),
+ (
+ field_d.clone(),
+ Arc::new(StructArray::from(vec![
+ (
+ field_e.clone(),
+ Arc::new(Int16Array::from(vec![2, 2])) as ArrayRef,
+ ),
+ (
+ field_f.clone(),
+ Arc::new(Int64Array::from(vec![3, 3])) as ArrayRef,
+ ),
+ ])) as ArrayRef,
+ ),
+ ])) as ArrayRef;
+
+ assert_eq!(&array, &expected);
+
+ // Construct from second element of ArrayRef (row index 1); it must round-trip to the original scalar
+ let constructed = ScalarValue::try_from_array(&expected, 1).unwrap();
+ assert_eq!(constructed, scalar);
+
+ // None version: a scalar created from only the array's data type is the NULL struct
+ let none_scalar = ScalarValue::try_from(array.data_type()).unwrap();
+ assert!(none_scalar.is_null());
+ assert_eq!(format!("{:?}", none_scalar), String::from("Struct(NULL)"));
+
+ // Construct with convenience From<Vec<(&str, ScalarValue)>>; must equal the manually built scalar, fields included
+ let constructed = ScalarValue::from(vec![
+ ("A", ScalarValue::from(23)),
+ ("B", ScalarValue::from(false)),
+ ("C", ScalarValue::from("Hello")),
+ (
+ "D",
+ ScalarValue::from(vec![
+ ("e", ScalarValue::from(2i16)),
+ ("f", ScalarValue::from(3i64)),
+ ]),
+ ),
+ ]);
+ assert_eq!(constructed, scalar);
+
+ // Build Array from Vec of structs: three rows with the same field names and types
+ let scalars = vec![
+ ScalarValue::from(vec![
+ ("A", ScalarValue::from(23)),
+ ("B", ScalarValue::from(false)),
+ ("C", ScalarValue::from("Hello")),
+ (
+ "D",
+ ScalarValue::from(vec![
+ ("e", ScalarValue::from(2i16)),
+ ("f", ScalarValue::from(3i64)),
+ ]),
+ ),
+ ]),
+ ScalarValue::from(vec![
+ ("A", ScalarValue::from(7)),
+ ("B", ScalarValue::from(true)),
+ ("C", ScalarValue::from("World")),
+ (
+ "D",
+ ScalarValue::from(vec![
+ ("e", ScalarValue::from(4i16)),
+ ("f", ScalarValue::from(5i64)),
+ ]),
+ ),
+ ]),
+ ScalarValue::from(vec![
+ ("A", ScalarValue::from(-1000)),
+ ("B", ScalarValue::from(true)),
+ ("C", ScalarValue::from("!!!!!")),
+ (
+ "D",
+ ScalarValue::from(vec![
+ ("e", ScalarValue::from(6i16)),
+ ("f", ScalarValue::from(7i64)),
+ ]),
+ ),
+ ]),
+ ];
+ let array = ScalarValue::iter_to_array(scalars).unwrap();
+ // Expected result is column-oriented: one array per field holding that field's value from each of the three rows
+ let expected = Arc::new(StructArray::from(vec![
+ (
+ field_a,
+ Arc::new(Int32Array::from(vec![23, 7, -1000])) as ArrayRef,
+ ),
+ (
+ field_b,
+ Arc::new(BooleanArray::from(vec![false, true, true])) as
ArrayRef,
+ ),
+ (
+ field_c,
+ Arc::new(StringArray::from(vec!["Hello", "World", "!!!!!"]))
as ArrayRef,
+ ),
+ (
+ field_d,
+ Arc::new(StructArray::from(vec![
+ (
+ field_e,
+ Arc::new(Int16Array::from(vec![2, 4, 6])) as ArrayRef,
+ ),
+ (
+ field_f,
+ Arc::new(Int64Array::from(vec![3, 5, 7])) as ArrayRef,
+ ),
+ ])) as ArrayRef,
+ ),
+ ])) as ArrayRef;
+
+ assert_eq!(&array, &expected);
+ }
+
+ #[test]
+ fn test_scalar_list_in_struct() { // iter_to_array must handle struct scalars that contain a List field
+ let field_a = Field::new("A", DataType::Utf8, false);
+ let field_list = Field::new(
+ "list_field",
+ DataType::List(Box::new(Field::new("item", DataType::Int32,
true))),
+ false,
+ );
+ // Three list scalars of different lengths (3, 2 and 1 Int32 items), one per struct row
+ let l0 = ScalarValue::List(
+ Some(Box::new(vec![
+ ScalarValue::from(1i32),
+ ScalarValue::from(2i32),
+ ScalarValue::from(3i32),
+ ])),
+ Box::new(DataType::Int32),
+ );
+
+ let l1 = ScalarValue::List(
+ Some(Box::new(vec![
+ ScalarValue::from(4i32),
+ ScalarValue::from(5i32),
+ ])),
+ Box::new(DataType::Int32),
+ );
+
+ let l2 = ScalarValue::List(
+ Some(Box::new(vec![ScalarValue::from(6i32)])),
+ Box::new(DataType::Int32),
+ );
+ // Three struct scalars, each pairing a Utf8 value with one of the lists above
+ let s0 = ScalarValue::from(vec![
+ ("A", ScalarValue::Utf8(Some(String::from("First")))),
+ ("list_field", l0),
+ ]);
+
+ let s1 = ScalarValue::from(vec![
+ ("A", ScalarValue::Utf8(Some(String::from("Second")))),
+ ("list_field", l1),
+ ]);
+
+ let s2 = ScalarValue::from(vec![
+ ("A", ScalarValue::Utf8(Some(String::from("Third")))),
+ ("list_field", l2),
+ ]);
+ // Convert the three scalars into one array, then downcast so it can be compared as a StructArray
+ let array = ScalarValue::iter_to_array(vec![s0, s1, s2]).unwrap();
+ let array = array.as_any().downcast_ref::<StructArray>().unwrap();
+ // Expected: a string column plus a list column whose rows hold the same items as l0, l1 and l2
+ let expected = StructArray::from(vec![
+ (
+ field_a,
+ Arc::new(StringArray::from(vec!["First", "Second", "Third"]))
as ArrayRef,
+ ),
+ (
+ field_list,
+ Arc::new(ListArray::from_iter_primitive::<Int32Type, _,
_>(vec![
+ Some(vec![Some(1), Some(2), Some(3)]),
+ Some(vec![Some(4), Some(5)]),
+ Some(vec![Some(6)]),
+ ])),
+ ),
+ ]);
+
+ assert_eq!(array, &expected);
}
}