dawidwys commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1509113710
########## docs/data/sql_functions.yml: ########## @@ -676,7 +676,10 @@ collection: - sql: MAP_FROM_ARRAYS(array_of_keys, array_of_values) table: mapFromArrays(array_of_keys, array_of_values) description: Returns a map created from an arrays of keys and values. Note that the lengths of two arrays should be the same. - + - sql: ARRAY_EXCEPT(array1, array2) + table: arrayOne.arrayExcept(arrayTwo) + description: Returns an ARRAY that contains the elements from array1 that are not in array2. If no elements remain after excluding the elements in array2 from array1, the function returns an empty ARRAY. If one or both arguments are NULL, the function returns NULL. The order of the values within the returned array is specified. Review Comment: should we rather say: ``` the order of the elements from array1 is kept ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java: ########## @@ -1516,4 +1517,141 @@ private Stream<TestSetSpec> arraySortTestCases() { }, DataTypes.ARRAY(DataTypes.DATE()))); } + + private Stream<TestSetSpec> arrayExceptTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_EXCEPT) + .onFieldsWithData( + new Integer[] {1, 2, 2}, + null, + new Row[] { + Row.of(true, LocalDate.of(2022, 4, 20)), + Row.of(true, LocalDate.of(1990, 10, 14)), + null + }, + new Integer[] {null, null, 1}, + new Integer[][] { + new Integer[] {1, null, 3}, new Integer[] {0}, new Integer[] {1} + }, + new Map[] { + CollectionUtil.map(entry(1, "a"), entry(2, "b")), + CollectionUtil.map(entry(3, "c"), entry(4, "d")), + null + }) + .andDataTypes( + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY( + DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE())), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())), + DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))) + // ARRAY<INT> + .testResult( + $("f0").arrayExcept(new Integer[] {1, null, 4}), + "ARRAY_EXCEPT(f0, ARRAY[1, NULL, 4])", + new Integer[] {2, 2}, + DataTypes.ARRAY(DataTypes.INT()).nullable()) + .testResult( + $("f0").arrayExcept(new Integer[] {1}), + "ARRAY_EXCEPT(f0, ARRAY[1])", + new Integer[] {2, 2}, + DataTypes.ARRAY(DataTypes.INT()).nullable()) + .testResult( + $("f0").arrayExcept(new Integer[] {42}), + "ARRAY_EXCEPT(f0, ARRAY[42])", + new Integer[] {1, 2, 2}, + DataTypes.ARRAY(DataTypes.INT()).nullable()) + // arrayTwo is NULL + .testResult( + $("f0").arrayExcept( + lit(null, DataTypes.ARRAY(DataTypes.INT())) + .cast(DataTypes.ARRAY(DataTypes.INT()))), + "ARRAY_EXCEPT(f0, CAST(NULL AS ARRAY<INT>))", + null, + DataTypes.ARRAY(DataTypes.INT()).nullable()) + // arrayTwo contains null elements + .testResult( + $("f0").arrayExcept(new Integer[] {null, 2}), + "ARRAY_EXCEPT(f0, ARRAY[null, 2])", + new Integer[] {1, 2}, + DataTypes.ARRAY(DataTypes.INT()).nullable()) + // arrayOne is NULL + .testResult( + $("f1").arrayExcept(new Integer[] {1, 2, 3}), + "ARRAY_EXCEPT(f1, ARRAY[1,2,3])", + null, + DataTypes.ARRAY(DataTypes.INT()).nullable()) + // arrayOne contains null elements + .testResult( + $("f3").arrayExcept(new Integer[] {null, 42}), + "ARRAY_EXCEPT(f3, ARRAY[null, 42])", + new Integer[] {null, 1}, + DataTypes.ARRAY(DataTypes.INT()).nullable()) + // ARRAY<ROW<BOOLEAN, DATE>> + .testResult( + $("f2").arrayExcept( + new Row[] { + Row.of(true, LocalDate.of(1990, 10, 14)) + }), + "ARRAY_EXCEPT(f2, ARRAY[(TRUE, DATE '1990-10-14')])", + new Row[] 
{Row.of(true, LocalDate.of(2022, 4, 20)), null}, + DataTypes.ARRAY( + DataTypes.ROW( + DataTypes.BOOLEAN(), DataTypes.DATE())) + .nullable()) + .testResult( + $("f2").arrayExcept( + lit( + null, + DataTypes.ARRAY( + DataTypes.ROW( + DataTypes.BOOLEAN(), + DataTypes.DATE()))) + .cast( + DataTypes.ARRAY( + DataTypes.ROW( + DataTypes.BOOLEAN(), + DataTypes + .DATE())))), Review Comment: Why do you need the `cast`? You cast to the same type as the literal type. ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider; +import org.apache.flink.table.runtime.util.ObjectContainer; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. 
*/ +@Internal +public class ArrayExceptFunction extends BuiltInScalarFunction { + private final ArrayData.ElementGetter elementGetter; + private final EqualityAndHashcodeProvider equalityAndHashcodeProvider; + + public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context); + final DataType dataType = + ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) + .getElementDataType() + .toInternal(); + elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); + this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType); + } + + @Override + public void open(FunctionContext context) throws Exception { + equalityAndHashcodeProvider.open(context); + } + + public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { + try { + if (arrayOne == null || arrayTwo == null) { + return null; + } + + List<Object> list = new ArrayList<>(); + Map<ObjectContainer, Integer> map = new HashMap<>(); + for (int pos = 0; pos < arrayOne.size(); pos++) { + final Object element = elementGetter.getElementOrNull(arrayOne, pos); + if (element == null) { + map.put(null, map.getOrDefault(null, 0) + 1); + } else { + ObjectContainer objectContainer = new ObjectContainer(element); + map.put(objectContainer, map.getOrDefault(objectContainer, 0) + 1); + } + } + for (int pos = 0; pos < arrayTwo.size(); pos++) { + final Object element = elementGetter.getElementOrNull(arrayTwo, pos); + if (element == null) { + if (map.containsKey(null)) { + int count = map.get(null); + count--; + if (count == 0) { + map.remove(null); + } else { + map.put(null, count); + } + } + } else { + ObjectContainer objectContainer = new ObjectContainer(element); + if (map.containsKey(objectContainer)) { + int count = map.get(objectContainer); + count--; + if (count == 0) { + map.remove(objectContainer); + } else { + map.put(objectContainer, count); + } + } + } Review Comment: The two branches are identical. You can do sth like: ``` final ObjectContainer objectContainer = element == null ? null : new ObjectContainer(element); if (map.containsKey(objectContainer)) { int count = map.get(objectContainer); count--; if (count == 0) { map.remove(objectContainer); } else { map.put(objectContainer, count); } } ``` ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider; +import org.apache.flink.table.runtime.util.ObjectContainer; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */ +@Internal +public class ArrayExceptFunction extends BuiltInScalarFunction { + private final ArrayData.ElementGetter elementGetter; + private final EqualityAndHashcodeProvider equalityAndHashcodeProvider; + + public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context); + final DataType dataType = + ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) + .getElementDataType() + .toInternal(); + elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); + this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType); + } + + @Override + public void open(FunctionContext context) throws Exception { + equalityAndHashcodeProvider.open(context); + } + + public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { + try { + if (arrayOne == null || arrayTwo == null) { + return null; + } + + List<Object> list = new ArrayList<>(); + Map<ObjectContainer, Integer> map = new HashMap<>(); + for (int pos = 0; pos < arrayOne.size(); pos++) { + final Object element = elementGetter.getElementOrNull(arrayOne, pos); + if (element == null) { + map.put(null, map.getOrDefault(null, 0) + 1); + } else { + ObjectContainer objectContainer = new ObjectContainer(element); + map.put(objectContainer, map.getOrDefault(objectContainer, 0) + 1); + } + } + for (int pos = 0; pos < arrayTwo.size(); pos++) { + final Object element = elementGetter.getElementOrNull(arrayTwo, pos); + if (element == null) { + if (map.containsKey(null)) { + int count = map.get(null); + count--; + if (count == 0) { + map.remove(null); + } else { + map.put(null, count); + } + } + } else { + ObjectContainer objectContainer = new ObjectContainer(element); + if (map.containsKey(objectContainer)) { + int count = map.get(objectContainer); + count--; + if (count == 0) { + map.remove(objectContainer); + } else { + map.put(objectContainer, count); + } + } + } + } + for (int pos = 0; pos < arrayOne.size(); pos++) { Review Comment: Maybe I am tired while reviewing the PR, but why do you go over `arrayOne` twice? Shouldn't you build the cardinality map from `arrayTwo` and, while going through `arrayOne`, check whether each element is present in that map? If the element is present, don't add it to the list and decrease its count in the map; if it is no longer in the cardinality map, add it to the list.
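For illustration, a minimal sketch of that single-lookup variant (count the removable occurrences from `arrayTwo` first, then filter `arrayOne` in one pass). It reuses the `elementGetter` and `ObjectContainer` helpers already shown in this class and omits the surrounding try/catch; this is only an untested outline of the suggestion, not the implementation in this PR:

```
// Count how many occurrences of each arrayTwo element may still be removed.
Map<ObjectContainer, Integer> counts = new HashMap<>();
for (int pos = 0; pos < arrayTwo.size(); pos++) {
    final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
    final ObjectContainer key = element == null ? null : new ObjectContainer(element);
    counts.merge(key, 1, Integer::sum);
}

// Single pass over arrayOne: keep an element only once its removal budget is used up.
List<Object> list = new ArrayList<>();
for (int pos = 0; pos < arrayOne.size(); pos++) {
    final Object element = elementGetter.getElementOrNull(arrayOne, pos);
    final ObjectContainer key = element == null ? null : new ObjectContainer(element);
    final Integer remaining = counts.get(key);
    if (remaining == null) {
        // No matching element left in arrayTwo, so this occurrence stays in the result.
        list.add(element);
    } else if (remaining == 1) {
        counts.remove(key);
    } else {
        counts.put(key, remaining - 1);
    }
}
return new GenericArrayData(list.toArray());
```

For example, this yields `[1, 3, 4, 2]` for `ARRAY_EXCEPT([1, 2, 3, 2, 4, 2], [2, 2])`, since each occurrence in `arrayTwo` removes at most one matching occurrence from `arrayOne`.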
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider; +import org.apache.flink.table.runtime.util.ObjectContainer; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */ +@Internal +public class ArrayExceptFunction extends BuiltInScalarFunction { + private final ArrayData.ElementGetter elementGetter; + private final EqualityAndHashcodeProvider equalityAndHashcodeProvider; + + public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context); + final DataType dataType = + ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) + .getElementDataType() + .toInternal(); + elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); + this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType); + } + + @Override + public void open(FunctionContext context) throws Exception { + equalityAndHashcodeProvider.open(context); + } + + public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { + try { + if (arrayOne == null || arrayTwo == null) { + return null; + } + + List<Object> list = new ArrayList<>(); + Map<ObjectContainer, Integer> map = new HashMap<>(); + for (int pos = 0; pos < arrayOne.size(); pos++) { + final Object element = elementGetter.getElementOrNull(arrayOne, pos); + if (element == null) { + map.put(null, map.getOrDefault(null, 0) + 1); Review Comment: There is `Map#merge` for that purpose: ``` map.merge(null, 1, Integer::sum); ``` ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider; +import org.apache.flink.table.runtime.util.ObjectContainer; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */ +@Internal +public class ArrayExceptFunction extends BuiltInScalarFunction { + private final ArrayData.ElementGetter elementGetter; + private final EqualityAndHashcodeProvider equalityAndHashcodeProvider; + + public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context); + final DataType dataType = + ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) + .getElementDataType() + .toInternal(); + elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); + this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType); + } + + @Override + public void open(FunctionContext context) throws Exception { + equalityAndHashcodeProvider.open(context); + } + + public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { + try { + if (arrayOne == null || arrayTwo == null) { + return null; + } + + List<Object> list = new ArrayList<>(); + Map<ObjectContainer, Integer> map = new HashMap<>(); + for (int pos = 0; pos < arrayOne.size(); pos++) { + final Object element = elementGetter.getElementOrNull(arrayOne, pos); + if (element == null) { + map.put(null, map.getOrDefault(null, 0) + 1); + } else { + ObjectContainer objectContainer = new ObjectContainer(element); + map.put(objectContainer, map.getOrDefault(objectContainer, 0) + 1); + } + } + for (int pos = 0; pos < arrayTwo.size(); pos++) { + final Object element = elementGetter.getElementOrNull(arrayTwo, pos); + if (element == null) { + if (map.containsKey(null)) { + int count = map.get(null); + count--; + if (count == 0) { + map.remove(null); + } else { + map.put(null, count); + } + } + } else { + ObjectContainer objectContainer = new ObjectContainer(element); + if (map.containsKey(objectContainer)) { + int count = map.get(objectContainer); + count--; + if (count == 0) { + map.remove(objectContainer); + } else { + 
map.put(objectContainer, count); + } + } + } Review Comment: You could also use `computeIfPresent` here: ``` map.computeIfPresent(objectContainer, (k, v) -> v - 1 <= 0 ? null : v -1) ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java: ########## @@ -1516,4 +1517,141 @@ private Stream<TestSetSpec> arraySortTestCases() { }, DataTypes.ARRAY(DataTypes.DATE()))); } + + private Stream<TestSetSpec> arrayExceptTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_EXCEPT) + .onFieldsWithData( + new Integer[] {1, 2, 2}, + null, + new Row[] { + Row.of(true, LocalDate.of(2022, 4, 20)), + Row.of(true, LocalDate.of(1990, 10, 14)), + null + }, + new Integer[] {null, null, 1}, + new Integer[][] { + new Integer[] {1, null, 3}, new Integer[] {0}, new Integer[] {1} + }, + new Map[] { + CollectionUtil.map(entry(1, "a"), entry(2, "b")), + CollectionUtil.map(entry(3, "c"), entry(4, "d")), + null + }) + .andDataTypes( + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY( + DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE())), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())), + DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))) + // ARRAY<INT> + .testResult( Review Comment: can you also check removing the same element multiple times? e.g. `[1,2, 3, 2, 4 ,2] EXCEPT [2,2] = [1,3,4,2]` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org