Roman Lupiichuk created FLINK-33110:
---------------------------------------

             Summary: Array content gets replaced with last element duplicates
                 Key: FLINK-33110
                 URL: https://issues.apache.org/jira/browse/FLINK-33110
             Project: Flink
          Issue Type: Bug
   Affects Versions: 1.15.4
            Reporter: Roman Lupiichuk


After upgrading from Flink 1.14.0 to 1.15.4, a number of our tests started to fail.

I've stripped down one of the failing tests to the following (it's in Kotlin):
{code:java}
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.InputGroup
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.types.Row
import org.junit.jupiter.api.Test

// Turns each MAP argument into a single-field ROW and returns them as an array.
@FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING NOT NULL>>"))
object TestArrayFunc : ScalarFunction() {
    fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> =
        values
            .map { data ->
                val casted = data as Map<String, String?>
                Row.of(casted["fieldName"])
            }
            .toTypedArray()
}

class ArrayFieldTest {
    @Test
    fun test() {
        val tableEnv = TableEnvironment.create(
            Configuration().also {
                it.setString("table.exec.resource.default-parallelism", "1")
            },
        )
        tableEnv.createTemporarySystemFunction("TO_FIELDS_ARRAY", TestArrayFunc)

        // Source table backed by a single in-memory row.
        val dataId = TestValuesTableFactory.registerData(
            listOf(
                TestValuesTableFactory.changelogRow(
                    "+I",
                    "123"
                )
            )
        )
        tableEnv.executeSql(
            """
                CREATE TABLE events (
                    id STRING
                ) WITH (
                    'connector' = 'values',
                    'data-id' = '$dataId'
                )
            """
        )

        // Sink table that prints every row it receives.
        tableEnv.executeSql(
            """
                CREATE TABLE results (
                    fields ARRAY<ROW<fieldName STRING>>,
                    event_time TIMESTAMP
                ) WITH (
                    'connector' = 'print'
                )
            """
        )
        tableEnv.executeSql(
            """
                INSERT INTO results (fields, event_time)
                SELECT
                    TO_FIELDS_ARRAY(
                        MAP['fieldName', 'foo'],
                        MAP['fieldName', 'hello']
                    ),
                    NOW()
                FROM events
            """
        )
    }
}
{code}
In Flink 1.14.0 it produces
{code:java}
+I[[+I[foo], +I[hello]], 2023-09-18T08:18:55.278]
{code}
That's the correct and expected output. But in Flink 1.15.4 the output is
{code:java}
+I[[+I[hello], +I[hello]], 2023-09-18T08:21:12.569]
{code}
As one can see, all elements in the array were replaced with the last element.

The issue goes away if I
# either remove the NOT NULL constraint from the function hint
# or remove the TIMESTAMP field from the sink table

The issue also does not occur on a regular Flink cluster, only in the MiniCluster that is used in testing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
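
For reference, the first workaround listed above (dropping NOT NULL from the function hint) might look like the sketch below. It is the reported TestArrayFunc with only the output type string changed. The name TestArrayFuncNullable is hypothetical, and the change only sidesteps the symptom seen in the test; it says nothing about the underlying cause.

```kotlin
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.InputGroup
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// Workaround sketch: same logic as TestArrayFunc, but the nested field
// is declared as nullable STRING (no NOT NULL) in the output hint.
// The object name is hypothetical and not part of the original report.
@FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING>>"))
object TestArrayFuncNullable : ScalarFunction() {
    fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> =
        values
            .map { data ->
                // Each argument is expected to be a MAP<STRING, STRING> value.
                @Suppress("UNCHECKED_CAST")
                val casted = data as Map<String, String?>
                Row.of(casted["fieldName"])
            }
            .toTypedArray()
}
```

Registering it in place of the original, for example with tableEnv.createTemporarySystemFunction("TO_FIELDS_ARRAY", TestArrayFuncNullable), leaves the rest of the test unchanged. According to the report, the issue no longer appears with this variant.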