This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d2da3e6  Add ScalarValue::Struct variant (#1091)
d2da3e6 is described below

commit d2da3e657501af29447fac405ad2cc0441707edb
Author: Jon Mease <[email protected]>
AuthorDate: Mon Oct 11 09:41:40 2021 -0400

    Add ScalarValue::Struct variant (#1091)
    
    * Add ScalarValue::Struct variant
    
    * cargo fmt / clippy
    
    * Add curly brackets around Display representation
    
    * Use recursive Debug representations of field values in Debug
    
    * Formatting
    
    * for_each + mutation -> map + unzip
    
    * Test recursive structs
    
    * clippy fixes
---
 datafusion/src/scalar.rs | 415 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 415 insertions(+)

diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index 77d4c82..31c48a6 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -86,6 +86,9 @@ pub enum ScalarValue {
     IntervalYearMonth(Option<i32>),
     /// Interval with DayTime unit
     IntervalDayTime(Option<i64>),
+    /// struct of nested ScalarValue (boxed to reduce size_of(ScalarValue))
+    #[allow(clippy::box_vec)]
+    Struct(Option<Box<Vec<ScalarValue>>>, Box<Vec<Field>>),
 }
 
 // manual implementation of `PartialEq` that uses OrderedFloat to
@@ -153,6 +156,8 @@ impl PartialEq for ScalarValue {
             (IntervalYearMonth(_), _) => false,
             (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.eq(v2),
             (IntervalDayTime(_), _) => false,
+            (Struct(v1, t1), Struct(v2, t2)) => v1.eq(v2) && t1.eq(t2),
+            (Struct(_, _), _) => false,
         }
     }
 }
@@ -228,6 +233,14 @@ impl PartialOrd for ScalarValue {
             (IntervalYearMonth(_), _) => None,
             (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.partial_cmp(v2),
             (IntervalDayTime(_), _) => None,
+            (Struct(v1, t1), Struct(v2, t2)) => {
+                if t1.eq(t2) {
+                    v1.partial_cmp(v2)
+                } else {
+                    None
+                }
+            }
+            (Struct(_, _), _) => None,
         }
     }
 }
@@ -273,6 +286,10 @@ impl std::hash::Hash for ScalarValue {
             TimestampNanosecond(v) => v.hash(state),
             IntervalYearMonth(v) => v.hash(state),
             IntervalDayTime(v) => v.hash(state),
+            Struct(v, t) => {
+                v.hash(state);
+                t.hash(state);
+            }
         }
     }
 }
@@ -477,6 +494,7 @@ impl ScalarValue {
                 DataType::Interval(IntervalUnit::YearMonth)
             }
             ScalarValue::IntervalDayTime(_) => DataType::Interval(IntervalUnit::DayTime),
+            ScalarValue::Struct(_, fields) => DataType::Struct(fields.as_ref().clone()),
         }
     }
 
@@ -522,6 +540,7 @@ impl ScalarValue {
                 | ScalarValue::TimestampMillisecond(None)
                 | ScalarValue::TimestampMicrosecond(None)
                 | ScalarValue::TimestampNanosecond(None)
+                | ScalarValue::Struct(None, _)
         )
     }
 
@@ -758,6 +777,50 @@ impl ScalarValue {
             DataType::List(fields) if fields.data_type() == &DataType::LargeUtf8 => {
                 build_array_list_string!(LargeStringBuilder, LargeUtf8)
             }
+            DataType::Struct(fields) => {
+                // Initialize a Vector to store the ScalarValues for each column
+                let mut columns: Vec<Vec<ScalarValue>> =
+                    (0..fields.len()).map(|_| Vec::new()).collect();
+
+                // Iterate over scalars to populate the column scalars for each row
+                for scalar in scalars.into_iter() {
+                    if let ScalarValue::Struct(values, fields) = scalar {
+                        match values {
+                            Some(values) => {
+                                // Push value for each field
+                                for c in 0..columns.len() {
+                                    let column = columns.get_mut(c).unwrap();
+                                    column.push(values[c].clone());
+                                }
+                            }
+                            None => {
+                                // Push NULL of the appropriate type for each field
+                                for c in 0..columns.len() {
+                                    let dtype = fields[c].data_type();
+                                    let column = columns.get_mut(c).unwrap();
+                                    column.push(ScalarValue::try_from(dtype)?);
+                                }
+                            }
+                        };
+                    } else {
+                        return Err(DataFusionError::Internal(format!(
+                            "Expected Struct but found: {}",
+                            scalar
+                        )));
+                    };
+                }
+
+                // Call iter_to_array recursively to convert the scalars for each column into Arrow arrays
+                let field_values = fields
+                    .iter()
+                    .zip(columns)
+                    .map(|(field, column)| -> Result<(Field, ArrayRef)> {
+                        Ok((field.clone(), Self::iter_to_array(column)?))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                Arc::new(StructArray::from(field_values))
+            }
             _ => {
                 return Err(DataFusionError::Internal(format!(
                     "Unsupported creation of {:?} array from ScalarValue {:?}",
@@ -905,6 +968,29 @@ impl ScalarValue {
                 e,
                 size
             ),
+            ScalarValue::Struct(values, fields) => match values {
+                Some(values) => {
+                    let field_values: Vec<_> = fields
+                        .iter()
+                        .zip(values.iter())
+                        .map(|(field, value)| {
+                            (field.clone(), value.to_array_of_size(size))
+                        })
+                        .collect();
+
+                    Arc::new(StructArray::from(field_values))
+                }
+                None => {
+                    let field_values: Vec<_> = fields.iter().map(|field| {
+                            let none_field = Self::try_from(field.data_type()).expect(
+                                "Failed to construct null ScalarValue from Struct field type"
+                            );
+                            (field.clone(), none_field.to_array_of_size(size))
+                        }).collect();
+
+                    Arc::new(StructArray::from(field_values))
+                }
+            },
         }
     }
 
@@ -1002,6 +1088,24 @@ impl ScalarValue {
                     None => values.data_type().try_into()?,
                 }
             }
+            DataType::Struct(fields) => {
+                let array =
+                    array
+                        .as_any()
+                        .downcast_ref::<StructArray>()
+                        .ok_or_else(|| {
+                            DataFusionError::Internal(
+                                "Failed to downcast ArrayRef to StructArray".to_string(),
+                            )
+                        })?;
+                let mut field_values: Vec<ScalarValue> = Vec::new();
+                for col_index in 0..array.num_columns() {
+                    let col_array = array.column(col_index);
+                    let col_scalar = ScalarValue::try_from_array(col_array, index)?;
+                    field_values.push(col_scalar);
+                }
+                Self::Struct(Some(Box::new(field_values)), Box::new(fields.clone()))
+            }
             other => {
                 return Err(DataFusionError::NotImplemented(format!(
                     "Can't create a scalar from array of type \"{:?}\"",
@@ -1092,6 +1196,7 @@ impl ScalarValue {
             ScalarValue::IntervalDayTime(val) => {
                 eq_array_primitive!(array, index, IntervalDayTimeArray, val)
             }
+            ScalarValue::Struct(_, _) => unimplemented!(),
         }
     }
 
@@ -1171,6 +1276,19 @@ impl FromStr for ScalarValue {
     }
 }
 
+impl From<Vec<(&str, ScalarValue)>> for ScalarValue {
+    fn from(value: Vec<(&str, ScalarValue)>) -> Self {
+        let (fields, scalars): (Vec<_>, Vec<_>) = value
+            .into_iter()
+            .map(|(name, scalar)| {
+                (Field::new(name, scalar.get_datatype(), false), scalar)
+            })
+            .unzip();
+
+        Self::Struct(Some(Box::new(scalars)), Box::new(fields))
+    }
+}
+
 macro_rules! impl_try_from {
     ($SCALAR:ident, $NATIVE:ident) => {
         impl TryFrom<ScalarValue> for $NATIVE {
@@ -1278,6 +1396,9 @@ impl TryFrom<&DataType> for ScalarValue {
             DataType::List(ref nested_type) => {
                 ScalarValue::List(None, Box::new(nested_type.data_type().clone()))
             }
+            DataType::Struct(fields) => {
+                ScalarValue::Struct(None, Box::new(fields.clone()))
+            }
             _ => {
                 return Err(DataFusionError::NotImplemented(format!(
                     "Can't create a scalar from data_type \"{:?}\"",
@@ -1354,6 +1475,18 @@ impl fmt::Display for ScalarValue {
             ScalarValue::Date64(e) => format_option!(f, e)?,
             ScalarValue::IntervalDayTime(e) => format_option!(f, e)?,
             ScalarValue::IntervalYearMonth(e) => format_option!(f, e)?,
+            ScalarValue::Struct(e, fields) => match e {
+                Some(l) => write!(
+                    f,
+                    "{{{}}}",
+                    l.iter()
+                        .zip(fields.iter())
+                        .map(|(value, field)| format!("{}:{}", field.name(), value))
+                        .collect::<Vec<_>>()
+                        .join(",")
+                )?,
+                None => write!(f, "NULL")?,
+            },
         };
         Ok(())
     }
@@ -1400,6 +1533,21 @@ impl fmt::Debug for ScalarValue {
             ScalarValue::IntervalYearMonth(_) => {
                 write!(f, "IntervalYearMonth(\"{}\")", self)
             }
+            ScalarValue::Struct(e, fields) => {
+                // Use Debug representation of field values
+                match e {
+                    Some(l) => write!(
+                        f,
+                        "Struct({{{}}})",
+                        l.iter()
+                            .zip(fields.iter())
+                            .map(|(value, field)| format!("{}:{:?}", field.name(), value))
+                            .collect::<Vec<_>>()
+                            .join(",")
+                    ),
+                    None => write!(f, "Struct(NULL)"),
+                }
+            }
         }
     }
 }
@@ -1901,5 +2049,272 @@ mod tests {
             )),
             None
         );
+
+        assert_eq!(
+            ScalarValue::from(vec![
+                ("A", ScalarValue::from(1.0)),
+                ("B", ScalarValue::from("Z"))
+            ])
+            .partial_cmp(&ScalarValue::from(vec![
+                ("A", ScalarValue::from(2.0)),
+                ("B", ScalarValue::from("A"))
+            ])),
+            Some(Ordering::Less)
+        );
+
+        // For different struct fields, `partial_cmp` returns None.
+        assert_eq!(
+            ScalarValue::from(vec![
+                ("A", ScalarValue::from(1.0)),
+                ("B", ScalarValue::from("Z"))
+            ])
+            .partial_cmp(&ScalarValue::from(vec![
+                ("a", ScalarValue::from(2.0)),
+                ("b", ScalarValue::from("A"))
+            ])),
+            None
+        );
+    }
+
+    #[test]
+    fn test_scalar_struct() {
+        let field_a = Field::new("A", DataType::Int32, false);
+        let field_b = Field::new("B", DataType::Boolean, false);
+        let field_c = Field::new("C", DataType::Utf8, false);
+
+        let field_e = Field::new("e", DataType::Int16, false);
+        let field_f = Field::new("f", DataType::Int64, false);
+        let field_d = Field::new(
+            "D",
+            DataType::Struct(vec![field_e.clone(), field_f.clone()]),
+            false,
+        );
+
+        let scalar = ScalarValue::Struct(
+            Some(Box::new(vec![
+                ScalarValue::Int32(Some(23)),
+                ScalarValue::Boolean(Some(false)),
+                ScalarValue::Utf8(Some("Hello".to_string())),
+                ScalarValue::from(vec![
+                    ("e", ScalarValue::from(2i16)),
+                    ("f", ScalarValue::from(3i64)),
+                ]),
+            ])),
+            Box::new(vec![
+                field_a.clone(),
+                field_b.clone(),
+                field_c.clone(),
+                field_d.clone(),
+            ]),
+        );
+
+        // Check Display
+        assert_eq!(
+            format!("{}", scalar),
+            String::from("{A:23,B:false,C:Hello,D:{e:2,f:3}}")
+        );
+
+        // Check Debug
+        assert_eq!(
+            format!("{:?}", scalar),
+            String::from(
+                r#"Struct({A:Int32(23),B:Boolean(false),C:Utf8("Hello"),D:Struct({e:Int16(2),f:Int64(3)})})"#
+            )
+        );
+
+        // Convert to length-2 array
+        let array = scalar.to_array_of_size(2);
+
+        let expected = Arc::new(StructArray::from(vec![
+            (
+                field_a.clone(),
+                Arc::new(Int32Array::from(vec![23, 23])) as ArrayRef,
+            ),
+            (
+                field_b.clone(),
+                Arc::new(BooleanArray::from(vec![false, false])) as ArrayRef,
+            ),
+            (
+                field_c.clone(),
+                Arc::new(StringArray::from(vec!["Hello", "Hello"])) as ArrayRef,
+            ),
+            (
+                field_d.clone(),
+                Arc::new(StructArray::from(vec![
+                    (
+                        field_e.clone(),
+                        Arc::new(Int16Array::from(vec![2, 2])) as ArrayRef,
+                    ),
+                    (
+                        field_f.clone(),
+                        Arc::new(Int64Array::from(vec![3, 3])) as ArrayRef,
+                    ),
+                ])) as ArrayRef,
+            ),
+        ])) as ArrayRef;
+
+        assert_eq!(&array, &expected);
+
+        // Construct from second element of ArrayRef
+        let constructed = ScalarValue::try_from_array(&expected, 1).unwrap();
+        assert_eq!(constructed, scalar);
+
+        // None version
+        let none_scalar = ScalarValue::try_from(array.data_type()).unwrap();
+        assert!(none_scalar.is_null());
+        assert_eq!(format!("{:?}", none_scalar), String::from("Struct(NULL)"));
+
+        // Construct with convenience From<Vec<(&str, ScalarValue)>>
+        let constructed = ScalarValue::from(vec![
+            ("A", ScalarValue::from(23)),
+            ("B", ScalarValue::from(false)),
+            ("C", ScalarValue::from("Hello")),
+            (
+                "D",
+                ScalarValue::from(vec![
+                    ("e", ScalarValue::from(2i16)),
+                    ("f", ScalarValue::from(3i64)),
+                ]),
+            ),
+        ]);
+        assert_eq!(constructed, scalar);
+
+        // Build Array from Vec of structs
+        let scalars = vec![
+            ScalarValue::from(vec![
+                ("A", ScalarValue::from(23)),
+                ("B", ScalarValue::from(false)),
+                ("C", ScalarValue::from("Hello")),
+                (
+                    "D",
+                    ScalarValue::from(vec![
+                        ("e", ScalarValue::from(2i16)),
+                        ("f", ScalarValue::from(3i64)),
+                    ]),
+                ),
+            ]),
+            ScalarValue::from(vec![
+                ("A", ScalarValue::from(7)),
+                ("B", ScalarValue::from(true)),
+                ("C", ScalarValue::from("World")),
+                (
+                    "D",
+                    ScalarValue::from(vec![
+                        ("e", ScalarValue::from(4i16)),
+                        ("f", ScalarValue::from(5i64)),
+                    ]),
+                ),
+            ]),
+            ScalarValue::from(vec![
+                ("A", ScalarValue::from(-1000)),
+                ("B", ScalarValue::from(true)),
+                ("C", ScalarValue::from("!!!!!")),
+                (
+                    "D",
+                    ScalarValue::from(vec![
+                        ("e", ScalarValue::from(6i16)),
+                        ("f", ScalarValue::from(7i64)),
+                    ]),
+                ),
+            ]),
+        ];
+        let array = ScalarValue::iter_to_array(scalars).unwrap();
+
+        let expected = Arc::new(StructArray::from(vec![
+            (
+                field_a,
+                Arc::new(Int32Array::from(vec![23, 7, -1000])) as ArrayRef,
+            ),
+            (
+                field_b,
+                Arc::new(BooleanArray::from(vec![false, true, true])) as ArrayRef,
+            ),
+            (
+                field_c,
+                Arc::new(StringArray::from(vec!["Hello", "World", "!!!!!"])) as ArrayRef,
+            ),
+            (
+                field_d,
+                Arc::new(StructArray::from(vec![
+                    (
+                        field_e,
+                        Arc::new(Int16Array::from(vec![2, 4, 6])) as ArrayRef,
+                    ),
+                    (
+                        field_f,
+                        Arc::new(Int64Array::from(vec![3, 5, 7])) as ArrayRef,
+                    ),
+                ])) as ArrayRef,
+            ),
+        ])) as ArrayRef;
+
+        assert_eq!(&array, &expected);
+    }
+
+    #[test]
+    fn test_scalar_list_in_struct() {
+        let field_a = Field::new("A", DataType::Utf8, false);
+        let field_list = Field::new(
+            "list_field",
+            DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
+            false,
+        );
+
+        let l0 = ScalarValue::List(
+            Some(Box::new(vec![
+                ScalarValue::from(1i32),
+                ScalarValue::from(2i32),
+                ScalarValue::from(3i32),
+            ])),
+            Box::new(DataType::Int32),
+        );
+
+        let l1 = ScalarValue::List(
+            Some(Box::new(vec![
+                ScalarValue::from(4i32),
+                ScalarValue::from(5i32),
+            ])),
+            Box::new(DataType::Int32),
+        );
+
+        let l2 = ScalarValue::List(
+            Some(Box::new(vec![ScalarValue::from(6i32)])),
+            Box::new(DataType::Int32),
+        );
+
+        let s0 = ScalarValue::from(vec![
+            ("A", ScalarValue::Utf8(Some(String::from("First")))),
+            ("list_field", l0),
+        ]);
+
+        let s1 = ScalarValue::from(vec![
+            ("A", ScalarValue::Utf8(Some(String::from("Second")))),
+            ("list_field", l1),
+        ]);
+
+        let s2 = ScalarValue::from(vec![
+            ("A", ScalarValue::Utf8(Some(String::from("Third")))),
+            ("list_field", l2),
+        ]);
+
+        let array = ScalarValue::iter_to_array(vec![s0, s1, s2]).unwrap();
+        let array = array.as_any().downcast_ref::<StructArray>().unwrap();
+
+        let expected = StructArray::from(vec![
+            (
+                field_a,
+                Arc::new(StringArray::from(vec!["First", "Second", "Third"])) as ArrayRef,
+            ),
+            (
+                field_list,
+                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+                    Some(vec![Some(1), Some(2), Some(3)]),
+                    Some(vec![Some(4), Some(5)]),
+                    Some(vec![Some(6)]),
+                ])),
+            ),
+        ]);
+
+        assert_eq!(array, &expected);
     }
 }

Reply via email to