[ https://issues.apache.org/jira/browse/FLINK-33110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767106#comment-17767106 ]
Martijn Visser commented on FLINK-33110:
----------------------------------------

[~romanlupiichuk] Can you please verify if this issue still exists with Flink 1.17?

> Array content gets replaced with last element duplicates
> --------------------------------------------------------
>
>                 Key: FLINK-33110
>                 URL: https://issues.apache.org/jira/browse/FLINK-33110
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.4
>            Reporter: Roman Lupiichuk
>            Priority: Minor
>
> After upgrading from Flink 1.14.0 to 1.15.4, a bunch of our tests started to fail.
> I've stripped one of the failing tests down to the following (it's in Kotlin):
> {code:java}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.table.annotation.DataTypeHint
> import org.apache.flink.table.annotation.FunctionHint
> import org.apache.flink.table.annotation.InputGroup
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.table.planner.factories.TestValuesTableFactory
> import org.apache.flink.types.Row
> import org.junit.jupiter.api.Test
>
> @FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING NOT NULL>>"))
> object TestArrayFunc : ScalarFunction() {
>     fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> =
>         values
>             .map { data ->
>                 val casted = data as Map<String, String?>
>                 Row.of(casted["fieldName"])
>             }
>             .toTypedArray()
> }
>
> class ArrayFieldTest {
>     @Test
>     fun test() {
>         val tableEnv = TableEnvironment.create(
>             Configuration().also {
>                 it.setString("table.exec.resource.default-parallelism", "1")
>             },
>         )
>         tableEnv.createTemporarySystemFunction("TO_FIELDS_ARRAY", TestArrayFunc)
>         val dataId = TestValuesTableFactory.registerData(
>             listOf(
>                 TestValuesTableFactory.changelogRow("+I", "123")
>             )
>         )
>         tableEnv.executeSql(
>             """
>             CREATE TABLE events
>             (
>                 id STRING
>             ) WITH (
>                 'connector' = 'values',
>                 'data-id' = '$dataId'
>             )
>             """
>         )
>         tableEnv.executeSql(
>             """
>             CREATE TABLE results
>             (
>                 fields ARRAY<ROW<fieldName STRING>>,
>                 event_time TIMESTAMP
>             ) WITH (
>                 'connector' = 'print'
>             )
>             """
>         )
>         tableEnv.executeSql(
>             """
>             INSERT INTO results (fields, event_time)
>             SELECT
>                 TO_FIELDS_ARRAY(
>                     MAP['fieldName', 'foo'],
>                     MAP['fieldName', 'hello']
>                 ),
>                 NOW()
>             FROM events
>             """
>         )
>     }
> }
> {code}
> In Flink 1.14.0 it produces
> {code:java}
> +I[[+I[foo], +I[hello]], 2023-09-18T08:18:55.278]{code}
> That's the correct and expected output.
> But in Flink 1.15.4 the output is
> {code:java}
> +I[[+I[hello], +I[hello]], 2023-09-18T08:21:12.569]{code}
> As one can see, all elements in the array were replaced with duplicates of the last element.
> The issue goes away if I
> # either remove the NOT NULL constraint from the function hint
> # or remove the TIMESTAMP field from the sink table
>
> There is also no issue on a regular Flink cluster, only in the MiniCluster which is used in testing.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
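The "every slot holds the last element" symptom is characteristic of a mutable-object-reuse bug: if a runtime reuses one mutable record instance per element and the array stores references instead of copies, all slots end up aliasing the same object. Whether that is the actual mechanism inside Flink 1.15's serializers is not confirmed by this report; the following framework-free Java sketch (the `Holder` class is hypothetical, standing in for an internal row representation) only illustrates the pitfall, plus the defensive-copy fix:

```java
// Hypothetical sketch of the suspected failure mode (NOT Flink API):
// one mutable object reused across elements, with the array storing
// references rather than copies.
public class ReuseDemo {
    public static class Holder {
        public String value;
    }

    // Buggy variant: every slot receives the SAME reused instance,
    // so after the loop all slots show the last value written.
    public static Holder[] fillWithReuse(String[] inputs) {
        Holder reused = new Holder();          // single instance, mutated per element
        Holder[] out = new Holder[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            reused.value = inputs[i];          // mutate in place
            out[i] = reused;                   // store the reference, not a copy
        }
        return out;
    }

    // Fixed variant: copy per element, so each slot keeps its own value.
    public static Holder[] fillWithCopies(String[] inputs) {
        Holder[] out = new Holder[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            Holder copy = new Holder();        // fresh instance per element
            copy.value = inputs[i];
            out[i] = copy;
        }
        return out;
    }

    public static void main(String[] args) {
        String[] inputs = {"foo", "hello"};
        Holder[] buggy = fillWithReuse(inputs);
        Holder[] fixed = fillWithCopies(inputs);
        // Buggy: both slots alias one object and print the last value.
        System.out.println(buggy[0].value + ", " + buggy[1].value); // prints "hello, hello"
        System.out.println(fixed[0].value + ", " + fixed[1].value); // prints "foo, hello"
    }
}
```

This mirrors the observed outputs: the buggy path yields the `[+I[hello], +I[hello]]` shape from 1.15.4, while per-element copying yields the expected `[+I[foo], +I[hello]]`.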