LiaCastaneda commented on code in PR #22689:
URL: https://github.com/apache/datafusion/pull/22689#discussion_r3371791774


##########
datafusion/functions-nested/src/map_transform.rs:
##########
@@ -0,0 +1,462 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`datafusion_expr::HigherOrderUDF`] definitions for map_transform function.
+
+use arrow::{
+    array::{Array, AsArray, MapArray, StructArray},
+    buffer::OffsetBuffer,
+    compute::take_arrays,
+    datatypes::{DataType, Field, FieldRef, Fields},
+};
+use datafusion_common::{
+    Result, ScalarValue, exec_err, plan_err,
+    utils::{list_values_row_number, take_function_args},
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, HigherOrderFunctionArgs, 
HigherOrderReturnFieldArgs,
+    HigherOrderSignature, HigherOrderUDFImpl, LambdaParametersProgress, 
ValueOrLambda,
+    Volatility,
+};
+use datafusion_macros::user_doc;
+use std::sync::Arc;
+
+use crate::utils::get_map_entry_field;
+
+make_higher_order_function_expr_and_func!(
+    MapTransform,
+    map_transform,
+    map lambda,
+    "transforms the values of a map",
+    map_transform_higher_order_function
+);
+
+#[user_doc(
+    doc_section(label = "Map Functions"),
+    description = "Returns a map that applies the lambda to each entry of the 
map and \
+    transforms the values. The keys are preserved unchanged.",
+    syntax_example = "map_transform(map, (k, v) -> expr)",
+    sql_example = r#"```sql
+> select map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10);
++--------------------------------------------------------------+
+| map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10) |
++--------------------------------------------------------------+
+| {a: 10, b: 20, c: 30}                                        |
++--------------------------------------------------------------+
+```"#,
+    argument(
+        name = "map",
+        description = "Map expression. Can be a constant, column, or function, 
and any combination of map operators."
+    ),
+    argument(
+        name = "lambda",
+        description = "Lambda accepting two parameters `(key, value)`. The 
return value is used as the new value for the entry."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct MapTransform {
+    signature: HigherOrderSignature,
+    aliases: Vec<String>,
+}
+
+impl Default for MapTransform {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MapTransform {
+    pub fn new() -> Self {
+        Self {
+            signature: HigherOrderSignature::exact(
+                vec![ValueOrLambda::Value(()), ValueOrLambda::Lambda(())],
+                Volatility::Immutable,
+            ),
+            aliases: vec![],
+        }
+    }
+}
+
+impl HigherOrderUDFImpl for MapTransform {
+    fn name(&self) -> &str {
+        "map_transform"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &HigherOrderSignature {
+        &self.signature
+    }
+
+    fn coerce_value_types(&self, arg_types: &[DataType]) -> 
Result<Vec<DataType>> {
+        let [map_type] = take_function_args(self.name(), arg_types)?;
+        match map_type {
+            DataType::Map(_, _) => Ok(vec![map_type.clone()]),
+            other => plan_err!(
+                "{} expected a map as first argument, got {other}",
+                self.name()
+            ),
+        }
+    }
+
+    fn lambda_parameters(
+        &self,
+        _step: usize,
+        fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
+    ) -> Result<LambdaParametersProgress> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(_lambda)] =
+            take_function_args(self.name(), fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let entry_fields = get_map_entry_field(map.data_type())?;
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+        let value_field = Arc::clone(entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))
+        })?);
+
+        Ok(LambdaParametersProgress::Complete(vec![vec![
+            key_field,
+            value_field,
+        ]]))
+    }
+
+    fn return_field_from_args(
+        &self,
+        args: HigherOrderReturnFieldArgs,
+    ) -> Result<Arc<Field>> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)] =
+            take_function_args(self.name(), args.arg_fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let (entries_field, ordered_keys) = match map.data_type() {
+            DataType::Map(field, ordered) => (Arc::clone(field), *ordered),
+            other => return plan_err!("expected map, got {other}"),
+        };
+
+        let entry_fields = match entries_field.data_type() {
+            DataType::Struct(fields) => fields.clone(),
+            other => {
+                return plan_err!("expected map entries to be a struct, got 
{other}");
+            }
+        };

Review Comment:
   ```suggestion
           let entry_fields = get_map_entry_field(map.data_type())?;
   ```



##########
datafusion/functions-nested/src/map_transform.rs:
##########
@@ -0,0 +1,462 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`datafusion_expr::HigherOrderUDF`] definitions for map_transform function.
+
+use arrow::{
+    array::{Array, AsArray, MapArray, StructArray},
+    buffer::OffsetBuffer,
+    compute::take_arrays,
+    datatypes::{DataType, Field, FieldRef, Fields},
+};
+use datafusion_common::{
+    Result, ScalarValue, exec_err, plan_err,
+    utils::{list_values_row_number, take_function_args},
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, HigherOrderFunctionArgs, 
HigherOrderReturnFieldArgs,
+    HigherOrderSignature, HigherOrderUDFImpl, LambdaParametersProgress, 
ValueOrLambda,
+    Volatility,
+};
+use datafusion_macros::user_doc;
+use std::sync::Arc;
+
+use crate::utils::get_map_entry_field;
+
+make_higher_order_function_expr_and_func!(
+    MapTransform,
+    map_transform,
+    map lambda,
+    "transforms the values of a map",
+    map_transform_higher_order_function
+);
+
+#[user_doc(
+    doc_section(label = "Map Functions"),
+    description = "Returns a map that applies the lambda to each entry of the 
map and \
+    transforms the values. The keys are preserved unchanged.",
+    syntax_example = "map_transform(map, (k, v) -> expr)",
+    sql_example = r#"```sql
+> select map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10);
++--------------------------------------------------------------+
+| map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10) |
++--------------------------------------------------------------+
+| {a: 10, b: 20, c: 30}                                        |
++--------------------------------------------------------------+
+```"#,
+    argument(
+        name = "map",
+        description = "Map expression. Can be a constant, column, or function, 
and any combination of map operators."
+    ),
+    argument(
+        name = "lambda",
+        description = "Lambda accepting two parameters `(key, value)`. The 
return value is used as the new value for the entry."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct MapTransform {
+    signature: HigherOrderSignature,
+    aliases: Vec<String>,
+}
+
+impl Default for MapTransform {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MapTransform {
+    pub fn new() -> Self {
+        Self {
+            signature: HigherOrderSignature::exact(
+                vec![ValueOrLambda::Value(()), ValueOrLambda::Lambda(())],
+                Volatility::Immutable,
+            ),
+            aliases: vec![],
+        }
+    }
+}
+
+impl HigherOrderUDFImpl for MapTransform {
+    fn name(&self) -> &str {
+        "map_transform"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &HigherOrderSignature {
+        &self.signature
+    }
+
+    fn coerce_value_types(&self, arg_types: &[DataType]) -> 
Result<Vec<DataType>> {
+        let [map_type] = take_function_args(self.name(), arg_types)?;
+        match map_type {
+            DataType::Map(_, _) => Ok(vec![map_type.clone()]),
+            other => plan_err!(
+                "{} expected a map as first argument, got {other}",
+                self.name()
+            ),
+        }
+    }
+
+    fn lambda_parameters(
+        &self,
+        _step: usize,
+        fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
+    ) -> Result<LambdaParametersProgress> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(_lambda)] =
+            take_function_args(self.name(), fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let entry_fields = get_map_entry_field(map.data_type())?;
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+        let value_field = Arc::clone(entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))

Review Comment:
   nit:
   ```suggestion
             datafusion_common::internal_datafusion_err!(
                   "{} expected map entries to contain a value field",
                   self.name()
               )
   ```



##########
datafusion/functions-nested/src/map_transform.rs:
##########
@@ -0,0 +1,462 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`datafusion_expr::HigherOrderUDF`] definitions for map_transform function.
+
+use arrow::{
+    array::{Array, AsArray, MapArray, StructArray},
+    buffer::OffsetBuffer,
+    compute::take_arrays,
+    datatypes::{DataType, Field, FieldRef, Fields},
+};
+use datafusion_common::{
+    Result, ScalarValue, exec_err, plan_err,
+    utils::{list_values_row_number, take_function_args},
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, HigherOrderFunctionArgs, 
HigherOrderReturnFieldArgs,
+    HigherOrderSignature, HigherOrderUDFImpl, LambdaParametersProgress, 
ValueOrLambda,
+    Volatility,
+};
+use datafusion_macros::user_doc;
+use std::sync::Arc;
+
+use crate::utils::get_map_entry_field;
+
+make_higher_order_function_expr_and_func!(
+    MapTransform,
+    map_transform,
+    map lambda,
+    "transforms the values of a map",
+    map_transform_higher_order_function
+);
+
+#[user_doc(
+    doc_section(label = "Map Functions"),
+    description = "Returns a map that applies the lambda to each entry of the 
map and \
+    transforms the values. The keys are preserved unchanged.",
+    syntax_example = "map_transform(map, (k, v) -> expr)",
+    sql_example = r#"```sql
+> select map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10);
++--------------------------------------------------------------+
+| map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10) |
++--------------------------------------------------------------+
+| {a: 10, b: 20, c: 30}                                        |
++--------------------------------------------------------------+
+```"#,
+    argument(
+        name = "map",
+        description = "Map expression. Can be a constant, column, or function, 
and any combination of map operators."
+    ),
+    argument(
+        name = "lambda",
+        description = "Lambda accepting two parameters `(key, value)`. The 
return value is used as the new value for the entry."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct MapTransform {
+    signature: HigherOrderSignature,
+    aliases: Vec<String>,
+}
+
+impl Default for MapTransform {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MapTransform {
+    pub fn new() -> Self {
+        Self {
+            signature: HigherOrderSignature::exact(
+                vec![ValueOrLambda::Value(()), ValueOrLambda::Lambda(())],
+                Volatility::Immutable,
+            ),
+            aliases: vec![],
+        }
+    }
+}
+
+impl HigherOrderUDFImpl for MapTransform {
+    fn name(&self) -> &str {
+        "map_transform"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &HigherOrderSignature {
+        &self.signature
+    }
+
+    fn coerce_value_types(&self, arg_types: &[DataType]) -> 
Result<Vec<DataType>> {
+        let [map_type] = take_function_args(self.name(), arg_types)?;
+        match map_type {
+            DataType::Map(_, _) => Ok(vec![map_type.clone()]),
+            other => plan_err!(
+                "{} expected a map as first argument, got {other}",
+                self.name()
+            ),
+        }
+    }
+
+    fn lambda_parameters(
+        &self,
+        _step: usize,
+        fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
+    ) -> Result<LambdaParametersProgress> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(_lambda)] =
+            take_function_args(self.name(), fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let entry_fields = get_map_entry_field(map.data_type())?;
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+        let value_field = Arc::clone(entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))
+        })?);
+
+        Ok(LambdaParametersProgress::Complete(vec![vec![
+            key_field,
+            value_field,
+        ]]))
+    }
+
+    fn return_field_from_args(
+        &self,
+        args: HigherOrderReturnFieldArgs,
+    ) -> Result<Arc<Field>> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)] =
+            take_function_args(self.name(), args.arg_fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let (entries_field, ordered_keys) = match map.data_type() {
+            DataType::Map(field, ordered) => (Arc::clone(field), *ordered),
+            other => return plan_err!("expected map, got {other}"),
+        };
+
+        let entry_fields = match entries_field.data_type() {
+            DataType::Struct(fields) => fields.clone(),
+            other => {
+                return plan_err!("expected map entries to be a struct, got 
{other}");
+            }
+        };
+
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))

Review Comment:
   ```suggestion
               datafusion_common::internal_datafusion_err!(
                   "{} expected map entries to contain a key field",
                   self.name()
               )
   ```



##########
datafusion/functions-nested/src/map_transform.rs:
##########
@@ -0,0 +1,462 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`datafusion_expr::HigherOrderUDF`] definitions for map_transform function.
+
+use arrow::{
+    array::{Array, AsArray, MapArray, StructArray},
+    buffer::OffsetBuffer,
+    compute::take_arrays,
+    datatypes::{DataType, Field, FieldRef, Fields},
+};
+use datafusion_common::{
+    Result, ScalarValue, exec_err, plan_err,
+    utils::{list_values_row_number, take_function_args},
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, HigherOrderFunctionArgs, 
HigherOrderReturnFieldArgs,
+    HigherOrderSignature, HigherOrderUDFImpl, LambdaParametersProgress, 
ValueOrLambda,
+    Volatility,
+};
+use datafusion_macros::user_doc;
+use std::sync::Arc;
+
+use crate::utils::get_map_entry_field;
+
+make_higher_order_function_expr_and_func!(
+    MapTransform,
+    map_transform,
+    map lambda,
+    "transforms the values of a map",
+    map_transform_higher_order_function
+);
+
+#[user_doc(
+    doc_section(label = "Map Functions"),
+    description = "Returns a map that applies the lambda to each entry of the 
map and \
+    transforms the values. The keys are preserved unchanged.",
+    syntax_example = "map_transform(map, (k, v) -> expr)",
+    sql_example = r#"```sql
+> select map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10);
++--------------------------------------------------------------+
+| map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10) |
++--------------------------------------------------------------+
+| {a: 10, b: 20, c: 30}                                        |
++--------------------------------------------------------------+
+```"#,
+    argument(
+        name = "map",
+        description = "Map expression. Can be a constant, column, or function, 
and any combination of map operators."
+    ),
+    argument(
+        name = "lambda",
+        description = "Lambda accepting two parameters `(key, value)`. The 
return value is used as the new value for the entry."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct MapTransform {
+    signature: HigherOrderSignature,
+    aliases: Vec<String>,
+}
+
+impl Default for MapTransform {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MapTransform {
+    pub fn new() -> Self {
+        Self {
+            signature: HigherOrderSignature::exact(
+                vec![ValueOrLambda::Value(()), ValueOrLambda::Lambda(())],
+                Volatility::Immutable,
+            ),
+            aliases: vec![],
+        }
+    }
+}
+
+impl HigherOrderUDFImpl for MapTransform {
+    fn name(&self) -> &str {
+        "map_transform"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &HigherOrderSignature {
+        &self.signature
+    }
+
+    fn coerce_value_types(&self, arg_types: &[DataType]) -> 
Result<Vec<DataType>> {
+        let [map_type] = take_function_args(self.name(), arg_types)?;
+        match map_type {
+            DataType::Map(_, _) => Ok(vec![map_type.clone()]),
+            other => plan_err!(
+                "{} expected a map as first argument, got {other}",
+                self.name()
+            ),
+        }
+    }
+
+    fn lambda_parameters(
+        &self,
+        _step: usize,
+        fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
+    ) -> Result<LambdaParametersProgress> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(_lambda)] =
+            take_function_args(self.name(), fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let entry_fields = get_map_entry_field(map.data_type())?;
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+        let value_field = Arc::clone(entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))
+        })?);
+
+        Ok(LambdaParametersProgress::Complete(vec![vec![
+            key_field,
+            value_field,
+        ]]))
+    }
+
+    fn return_field_from_args(
+        &self,
+        args: HigherOrderReturnFieldArgs,
+    ) -> Result<Arc<Field>> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)] =
+            take_function_args(self.name(), args.arg_fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let (entries_field, ordered_keys) = match map.data_type() {
+            DataType::Map(field, ordered) => (Arc::clone(field), *ordered),
+            other => return plan_err!("expected map, got {other}"),
+        };
+
+        let entry_fields = match entries_field.data_type() {
+            DataType::Struct(fields) => fields.clone(),
+            other => {
+                return plan_err!("expected map entries to be a struct, got 
{other}");
+            }
+        };
+
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+
+        let original_value_field = entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))
+        })?;
+
+        let new_value_field = Arc::new(Field::new(
+            original_value_field.name(),
+            lambda.data_type().clone(),
+            lambda.is_nullable(),
+        ));
+
+        let new_entries_struct =
+            DataType::Struct(Fields::from(vec![key_field, new_value_field]));
+        let new_entries_field = Arc::new(Field::new(
+            entries_field.name(),
+            new_entries_struct,
+            entries_field.is_nullable(),
+        ));
+
+        Ok(Arc::new(Field::new(
+            "",
+            DataType::Map(new_entries_field, ordered_keys),
+            map.is_nullable(),
+        )))
+    }
+
+    fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> 
Result<ColumnarValue> {
+        let [map, lambda] = take_function_args(self.name(), &args.args)?;
+        let (ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)) = (map, 
lambda)
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let map_array_dyn = map.to_array(args.number_rows)?;
+        let map_array = match map_array_dyn.data_type() {
+            DataType::Map(_, _) => map_array_dyn.as_map(),
+            other => return exec_err!("{} expected a map, got {other}", 
self.name()),
+        };
+
+        // Fast path: every row is null.
+        if map_array.null_count() == map_array.len() {
+            return Ok(ColumnarValue::Scalar(ScalarValue::try_new_null(
+                args.return_type(),
+            )?));
+        }
+
+        let offsets = map_array.offsets();
+        let first = offsets.first().copied().unwrap_or(0) as usize;
+        let last = offsets.last().copied().unwrap_or(0) as usize;
+        let len = last - first;
+
+        let new_entries_field = match args.return_field.data_type() {
+            DataType::Map(field, _) => Arc::clone(field),
+            other => {
+                return exec_err!(
+                    "{} expected return_field to be a map, got {other}",
+                    self.name()
+                );
+            }
+        };
+        let (new_key_field, new_value_field) = match 
new_entries_field.data_type() {
+            DataType::Struct(fields) if fields.len() == 2 => {
+                (Arc::clone(&fields[0]), Arc::clone(&fields[1]))
+            }
+            other => {
+                return exec_err!(
+                    "{} expected map entries struct with two fields, got 
{other}",
+                    self.name()
+                );
+            }
+        };

Review Comment:
   Looks like the "get the map fields" pattern repeats a lot in this file,
   each time with a different approach. Maybe it's worth standardizing the
   `get_map_entry_field` + `first()` + `last()` logic in a helper function
   under `datafusion/functions-nested/src/utils.rs`.



##########
datafusion/sqllogictest/test_files/map/map_transform.slt:
##########


Review Comment:
   Can we add a sqllogictest for the expected behavior of 
[this](https://github.com/apache/datafusion/pull/22689/changes#r3371940346)?



##########
datafusion/functions-nested/src/map_transform.rs:
##########
@@ -0,0 +1,462 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`datafusion_expr::HigherOrderUDF`] definitions for map_transform function.
+
+use arrow::{
+    array::{Array, AsArray, MapArray, StructArray},
+    buffer::OffsetBuffer,
+    compute::take_arrays,
+    datatypes::{DataType, Field, FieldRef, Fields},
+};
+use datafusion_common::{
+    Result, ScalarValue, exec_err, plan_err,
+    utils::{list_values_row_number, take_function_args},
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, HigherOrderFunctionArgs, 
HigherOrderReturnFieldArgs,
+    HigherOrderSignature, HigherOrderUDFImpl, LambdaParametersProgress, 
ValueOrLambda,
+    Volatility,
+};
+use datafusion_macros::user_doc;
+use std::sync::Arc;
+
+use crate::utils::get_map_entry_field;
+
+make_higher_order_function_expr_and_func!(
+    MapTransform,
+    map_transform,
+    map lambda,
+    "transforms the values of a map",
+    map_transform_higher_order_function
+);
+
+#[user_doc(
+    doc_section(label = "Map Functions"),
+    description = "Returns a map that applies the lambda to each entry of the 
map and \
+    transforms the values. The keys are preserved unchanged.",
+    syntax_example = "map_transform(map, (k, v) -> expr)",
+    sql_example = r#"```sql
+> select map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10);
++--------------------------------------------------------------+
+| map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10) |
++--------------------------------------------------------------+
+| {a: 10, b: 20, c: 30}                                        |
++--------------------------------------------------------------+
+```"#,
+    argument(
+        name = "map",
+        description = "Map expression. Can be a constant, column, or function, 
and any combination of map operators."
+    ),
+    argument(
+        name = "lambda",
+        description = "Lambda accepting two parameters `(key, value)`. The 
return value is used as the new value for the entry."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct MapTransform {
+    signature: HigherOrderSignature,
+    aliases: Vec<String>,
+}
+
+impl Default for MapTransform {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MapTransform {
+    pub fn new() -> Self {
+        Self {
+            signature: HigherOrderSignature::exact(
+                vec![ValueOrLambda::Value(()), ValueOrLambda::Lambda(())],
+                Volatility::Immutable,
+            ),
+            aliases: vec![],
+        }
+    }
+}
+
+impl HigherOrderUDFImpl for MapTransform {
+    fn name(&self) -> &str {
+        "map_transform"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &HigherOrderSignature {
+        &self.signature
+    }
+
+    fn coerce_value_types(&self, arg_types: &[DataType]) -> 
Result<Vec<DataType>> {
+        let [map_type] = take_function_args(self.name(), arg_types)?;
+        match map_type {
+            DataType::Map(_, _) => Ok(vec![map_type.clone()]),
+            other => plan_err!(
+                "{} expected a map as first argument, got {other}",
+                self.name()
+            ),
+        }
+    }
+
+    fn lambda_parameters(
+        &self,
+        _step: usize,
+        fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
+    ) -> Result<LambdaParametersProgress> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(_lambda)] =
+            take_function_args(self.name(), fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let entry_fields = get_map_entry_field(map.data_type())?;
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+        let value_field = Arc::clone(entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))
+        })?);
+
+        Ok(LambdaParametersProgress::Complete(vec![vec![
+            key_field,
+            value_field,
+        ]]))
+    }
+
+    fn return_field_from_args(
+        &self,
+        args: HigherOrderReturnFieldArgs,
+    ) -> Result<Arc<Field>> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)] =
+            take_function_args(self.name(), args.arg_fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let (entries_field, ordered_keys) = match map.data_type() {
+            DataType::Map(field, ordered) => (Arc::clone(field), *ordered),
+            other => return plan_err!("expected map, got {other}"),
+        };
+
+        let entry_fields = match entries_field.data_type() {
+            DataType::Struct(fields) => fields.clone(),
+            other => {
+                return plan_err!("expected map entries to be a struct, got 
{other}");
+            }
+        };
+
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+
+        let original_value_field = entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))
+        })?;
+
+        let new_value_field = Arc::new(Field::new(
+            original_value_field.name(),
+            lambda.data_type().clone(),
+            lambda.is_nullable(),
+        ));
+
+        let new_entries_struct =
+            DataType::Struct(Fields::from(vec![key_field, new_value_field]));
+        let new_entries_field = Arc::new(Field::new(
+            entries_field.name(),
+            new_entries_struct,
+            entries_field.is_nullable(),
+        ));
+
+        Ok(Arc::new(Field::new(
+            "",
+            DataType::Map(new_entries_field, ordered_keys),
+            map.is_nullable(),
+        )))
+    }
+
+    fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> 
Result<ColumnarValue> {
+        let [map, lambda] = take_function_args(self.name(), &args.args)?;
+        let (ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)) = (map, 
lambda)
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let map_array_dyn = map.to_array(args.number_rows)?;
+        let map_array = match map_array_dyn.data_type() {
+            DataType::Map(_, _) => map_array_dyn.as_map(),
+            other => return exec_err!("{} expected a map, got {other}", 
self.name()),
+        };
+
+        // Fast path: every row is null.
+        if map_array.null_count() == map_array.len() {
+            return Ok(ColumnarValue::Scalar(ScalarValue::try_new_null(
+                args.return_type(),
+            )?));

Review Comment:
   shouldn't we return an array of nulls instead of a null scalar?



##########
datafusion/functions-nested/src/map_transform.rs:
##########
@@ -0,0 +1,462 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`datafusion_expr::HigherOrderUDF`] definitions for map_transform function.
+
+use arrow::{
+    array::{Array, AsArray, MapArray, StructArray},
+    buffer::OffsetBuffer,
+    compute::take_arrays,
+    datatypes::{DataType, Field, FieldRef, Fields},
+};
+use datafusion_common::{
+    Result, ScalarValue, exec_err, plan_err,
+    utils::{list_values_row_number, take_function_args},
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, HigherOrderFunctionArgs, 
HigherOrderReturnFieldArgs,
+    HigherOrderSignature, HigherOrderUDFImpl, LambdaParametersProgress, 
ValueOrLambda,
+    Volatility,
+};
+use datafusion_macros::user_doc;
+use std::sync::Arc;
+
+use crate::utils::get_map_entry_field;
+
+make_higher_order_function_expr_and_func!(
+    MapTransform,
+    map_transform,
+    map lambda,
+    "transforms the values of a map",
+    map_transform_higher_order_function
+);
+
+#[user_doc(
+    doc_section(label = "Map Functions"),
+    description = "Returns a map that applies the lambda to each entry of the 
map and \
+    transforms the values. The keys are preserved unchanged.",
+    syntax_example = "map_transform(map, (k, v) -> expr)",
+    sql_example = r#"```sql
+> select map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10);
++--------------------------------------------------------------+
+| map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10) |
++--------------------------------------------------------------+
+| {a: 10, b: 20, c: 30}                                        |
++--------------------------------------------------------------+
+```"#,
+    argument(
+        name = "map",
+        description = "Map expression. Can be a constant, column, or function, 
and any combination of map operators."
+    ),
+    argument(
+        name = "lambda",
+        description = "Lambda accepting two parameters `(key, value)`. The 
return value is used as the new value for the entry."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct MapTransform {
+    signature: HigherOrderSignature,
+    aliases: Vec<String>,
+}
+
+impl Default for MapTransform {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MapTransform {
+    pub fn new() -> Self {
+        Self {
+            signature: HigherOrderSignature::exact(
+                vec![ValueOrLambda::Value(()), ValueOrLambda::Lambda(())],
+                Volatility::Immutable,
+            ),
+            aliases: vec![],
+        }
+    }
+}
+
+impl HigherOrderUDFImpl for MapTransform {
+    fn name(&self) -> &str {
+        "map_transform"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &HigherOrderSignature {
+        &self.signature
+    }
+
+    fn coerce_value_types(&self, arg_types: &[DataType]) -> 
Result<Vec<DataType>> {
+        let [map_type] = take_function_args(self.name(), arg_types)?;
+        match map_type {
+            DataType::Map(_, _) => Ok(vec![map_type.clone()]),
+            other => plan_err!(
+                "{} expected a map as first argument, got {other}",
+                self.name()
+            ),
+        }
+    }
+
+    fn lambda_parameters(
+        &self,
+        _step: usize,
+        fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
+    ) -> Result<LambdaParametersProgress> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(_lambda)] =
+            take_function_args(self.name(), fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let entry_fields = get_map_entry_field(map.data_type())?;
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+        let value_field = Arc::clone(entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))
+        })?);
+
+        Ok(LambdaParametersProgress::Complete(vec![vec![
+            key_field,
+            value_field,
+        ]]))
+    }
+
+    fn return_field_from_args(
+        &self,
+        args: HigherOrderReturnFieldArgs,
+    ) -> Result<Arc<Field>> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)] =
+            take_function_args(self.name(), args.arg_fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let (entries_field, ordered_keys) = match map.data_type() {
+            DataType::Map(field, ordered) => (Arc::clone(field), *ordered),
+            other => return plan_err!("expected map, got {other}"),
+        };
+
+        let entry_fields = match entries_field.data_type() {
+            DataType::Struct(fields) => fields.clone(),
+            other => {
+                return plan_err!("expected map entries to be a struct, got 
{other}");
+            }
+        };
+
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+
+        let original_value_field = entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))
+        })?;
+
+        let new_value_field = Arc::new(Field::new(
+            original_value_field.name(),
+            lambda.data_type().clone(),
+            lambda.is_nullable(),
+        ));
+
+        let new_entries_struct =
+            DataType::Struct(Fields::from(vec![key_field, new_value_field]));
+        let new_entries_field = Arc::new(Field::new(
+            entries_field.name(),
+            new_entries_struct,
+            entries_field.is_nullable(),
+        ));
+
+        Ok(Arc::new(Field::new(
+            "",
+            DataType::Map(new_entries_field, ordered_keys),
+            map.is_nullable(),
+        )))
+    }
+
+    fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> 
Result<ColumnarValue> {
+        let [map, lambda] = take_function_args(self.name(), &args.args)?;
+        let (ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)) = (map, 
lambda)
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };

Review Comment:
   ```suggestion
           let (map, lambda) = value_lambda_pair(self.name(), &args.args)?;
   ```



##########
datafusion/functions-nested/src/map_transform.rs:
##########
@@ -0,0 +1,462 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`datafusion_expr::HigherOrderUDF`] definitions for map_transform function.
+
+use arrow::{
+    array::{Array, AsArray, MapArray, StructArray},
+    buffer::OffsetBuffer,
+    compute::take_arrays,
+    datatypes::{DataType, Field, FieldRef, Fields},
+};
+use datafusion_common::{
+    Result, ScalarValue, exec_err, plan_err,
+    utils::{list_values_row_number, take_function_args},
+};
+use datafusion_expr::{
+    ColumnarValue, Documentation, HigherOrderFunctionArgs, 
HigherOrderReturnFieldArgs,
+    HigherOrderSignature, HigherOrderUDFImpl, LambdaParametersProgress, 
ValueOrLambda,
+    Volatility,
+};
+use datafusion_macros::user_doc;
+use std::sync::Arc;
+
+use crate::utils::get_map_entry_field;
+
+make_higher_order_function_expr_and_func!(
+    MapTransform,
+    map_transform,
+    map lambda,
+    "transforms the values of a map",
+    map_transform_higher_order_function
+);
+
+#[user_doc(
+    doc_section(label = "Map Functions"),
+    description = "Returns a map that applies the lambda to each entry of the 
map and \
+    transforms the values. The keys are preserved unchanged.",
+    syntax_example = "map_transform(map, (k, v) -> expr)",
+    sql_example = r#"```sql
+> select map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10);
++--------------------------------------------------------------+
+| map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10) |
++--------------------------------------------------------------+
+| {a: 10, b: 20, c: 30}                                        |
++--------------------------------------------------------------+
+```"#,
+    argument(
+        name = "map",
+        description = "Map expression. Can be a constant, column, or function, 
and any combination of map operators."
+    ),
+    argument(
+        name = "lambda",
+        description = "Lambda accepting two parameters `(key, value)`. The 
return value is used as the new value for the entry."
+    )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct MapTransform {
+    signature: HigherOrderSignature,
+    aliases: Vec<String>,
+}
+
+impl Default for MapTransform {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MapTransform {
+    pub fn new() -> Self {
+        Self {
+            signature: HigherOrderSignature::exact(
+                vec![ValueOrLambda::Value(()), ValueOrLambda::Lambda(())],
+                Volatility::Immutable,
+            ),
+            aliases: vec![],
+        }
+    }
+}
+
+impl HigherOrderUDFImpl for MapTransform {
+    fn name(&self) -> &str {
+        "map_transform"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &HigherOrderSignature {
+        &self.signature
+    }
+
+    fn coerce_value_types(&self, arg_types: &[DataType]) -> 
Result<Vec<DataType>> {
+        let [map_type] = take_function_args(self.name(), arg_types)?;
+        match map_type {
+            DataType::Map(_, _) => Ok(vec![map_type.clone()]),
+            other => plan_err!(
+                "{} expected a map as first argument, got {other}",
+                self.name()
+            ),
+        }
+    }
+
+    fn lambda_parameters(
+        &self,
+        _step: usize,
+        fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
+    ) -> Result<LambdaParametersProgress> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(_lambda)] =
+            take_function_args(self.name(), fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let entry_fields = get_map_entry_field(map.data_type())?;
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+        let value_field = Arc::clone(entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))
+        })?);
+
+        Ok(LambdaParametersProgress::Complete(vec![vec![
+            key_field,
+            value_field,
+        ]]))
+    }
+
+    fn return_field_from_args(
+        &self,
+        args: HigherOrderReturnFieldArgs,
+    ) -> Result<Arc<Field>> {
+        let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)] =
+            take_function_args(self.name(), args.arg_fields)?
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let (entries_field, ordered_keys) = match map.data_type() {
+            DataType::Map(field, ordered) => (Arc::clone(field), *ordered),
+            other => return plan_err!("expected map, got {other}"),
+        };
+
+        let entry_fields = match entries_field.data_type() {
+            DataType::Struct(fields) => fields.clone(),
+            other => {
+                return plan_err!("expected map entries to be a struct, got 
{other}");
+            }
+        };
+
+        let key_field = Arc::clone(entry_fields.first().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a key field",
+                self.name()
+            ))
+        })?);
+
+        let original_value_field = entry_fields.last().ok_or_else(|| {
+            datafusion_common::DataFusionError::Internal(format!(
+                "{} expected map entries to contain a value field",
+                self.name()
+            ))
+        })?;
+
+        let new_value_field = Arc::new(Field::new(
+            original_value_field.name(),
+            lambda.data_type().clone(),
+            lambda.is_nullable(),
+        ));
+
+        let new_entries_struct =
+            DataType::Struct(Fields::from(vec![key_field, new_value_field]));
+        let new_entries_field = Arc::new(Field::new(
+            entries_field.name(),
+            new_entries_struct,
+            entries_field.is_nullable(),
+        ));
+
+        Ok(Arc::new(Field::new(
+            "",
+            DataType::Map(new_entries_field, ordered_keys),
+            map.is_nullable(),
+        )))
+    }
+
+    fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> 
Result<ColumnarValue> {
+        let [map, lambda] = take_function_args(self.name(), &args.args)?;
+        let (ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)) = (map, 
lambda)
+        else {
+            return plan_err!("{} expects a map followed by a lambda", 
self.name());
+        };
+
+        let map_array_dyn = map.to_array(args.number_rows)?;
+        let map_array = match map_array_dyn.data_type() {
+            DataType::Map(_, _) => map_array_dyn.as_map(),
+            other => return exec_err!("{} expected a map, got {other}", 
self.name()),
+        };
+
+        // Fast path: every row is null.
+        if map_array.null_count() == map_array.len() {
+            return Ok(ColumnarValue::Scalar(ScalarValue::try_new_null(
+                args.return_type(),
+            )?));
+        }
+
+        let offsets = map_array.offsets();
+        let first = offsets.first().copied().unwrap_or(0) as usize;
+        let last = offsets.last().copied().unwrap_or(0) as usize;
+        let len = last - first;
+
+        let new_entries_field = match args.return_field.data_type() {
+            DataType::Map(field, _) => Arc::clone(field),
+            other => {
+                return exec_err!(
+                    "{} expected return_field to be a map, got {other}",
+                    self.name()
+                );
+            }
+        };
+        let (new_key_field, new_value_field) = match 
new_entries_field.data_type() {
+            DataType::Struct(fields) if fields.len() == 2 => {
+                (Arc::clone(&fields[0]), Arc::clone(&fields[1]))
+            }
+            other => {
+                return exec_err!(
+                    "{} expected map entries struct with two fields, got 
{other}",
+                    self.name()
+                );
+            }
+        };
+
+        let flat_keys = if first == 0 && last == map_array.keys().len() {
+            Arc::clone(map_array.keys())
+        } else {
+            map_array.keys().slice(first, len)
+        };
+        let flat_values = if first == 0 && last == map_array.values().len() {
+            Arc::clone(map_array.values())
+        } else {
+            map_array.values().slice(first, len)
+        };
+
+        // Fast path: no entries at all and the map has no nulls — return an 
empty map
+        // mirroring the input row count.
+        if len == 0 && map_array.null_count() == 0 {

Review Comment:
   nit: should we move this check right after the null fast path check?



##########
datafusion/functions-nested/src/map_transform.rs:
##########


Review Comment:
   Since we will probably want the equivalent function for keys at some point, 
it might be worth renaming this function to `transform_values`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to