[ https://issues.apache.org/jira/browse/FLINK-33110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767106#comment-17767106 ]

Martijn Visser commented on FLINK-33110:
----------------------------------------

[~romanlupiichuk] Can you please verify if this issue still exists with Flink 1.17?
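
If it helps, a minimal sketch of re-checking this on 1.17 (the coordinates below are illustrative only; keep whichever Flink modules the project already uses, in particular the one providing TestValuesTableFactory, and just bump the version):
{code:java}
// Illustrative Gradle (Kotlin DSL) snippet: bump the Flink test dependencies to a
// 1.17.x release and re-run the reporter's test below unchanged.
dependencies {
    testImplementation("org.apache.flink:flink-table-api-java:1.17.1")
    testImplementation("org.apache.flink:flink-table-planner_2.12:1.17.1")
}
{code}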

> Array content gets replaced with last element duplicates
> --------------------------------------------------------
>
>                 Key: FLINK-33110
>                 URL: https://issues.apache.org/jira/browse/FLINK-33110
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.4
>            Reporter: Roman Lupiichuk
>            Priority: Minor
>
> After upgrading from Flink 1.14.0 to 1.15.4, a bunch of our tests started to fail.
> I've stripped down one of the failing tests to the following (it's in Kotlin):
> {code:java}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.table.annotation.DataTypeHint
> import org.apache.flink.table.annotation.FunctionHint
> import org.apache.flink.table.annotation.InputGroup
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.table.planner.factories.TestValuesTableFactory
> import org.apache.flink.types.Row
> import org.junit.jupiter.api.Test
> @FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING NOT NULL>>"))
> object TestArrayFunc : ScalarFunction() {
>     fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> =
>         values
>             .map { data ->
>                 val casted = data as Map<String, String?>
>                 Row.of(casted["fieldName"])
>             }
>             .toTypedArray()
> }
> class ArrayFieldTest {
>     @Test
>     fun test() {
>         val tableEnv = TableEnvironment.create(
>             Configuration().also {
>                 it.setString("table.exec.resource.default-parallelism", "1")
>             },
>         )
>         tableEnv.createTemporarySystemFunction("TO_FIELDS_ARRAY", TestArrayFunc)
>         val dataId = TestValuesTableFactory.registerData(
>             listOf(
>                 TestValuesTableFactory.changelogRow(
>                     "+I",
>                     "123"
>                 )
>             )
>         )
>         tableEnv.executeSql(
>             """
>                 CREATE TABLE events
>                 (
>                     id STRING
>                 ) WITH (
>                     'connector' = 'values',
>                     'data-id' = '$dataId'
>                 )
>             """
>         )
>         tableEnv.executeSql(
>             """
>                 CREATE TABLE results
>                 (
>                     fields ARRAY<ROW<fieldName STRING>>,
>                     event_time TIMESTAMP
>                 ) WITH (
>                     'connector' = 'print'
>                 )
>             """
>         )
>         tableEnv.executeSql(
>             """
>                 INSERT INTO results (fields, event_time)
>                 SELECT
>                     TO_FIELDS_ARRAY(
>                        MAP['fieldName', 'foo'],
>                        MAP['fieldName', 'hello']
>                     ),
>                     NOW()
>                 FROM events
>             """
>         )
>     }
> }
> {code}
> In Flink 1.14.0 it produces
> {code:java}
> +I[[+I[foo], +I[hello]], 2023-09-18T08:18:55.278]
> {code}
> That's the correct and expected output.
> But in Flink 1.15.4 the output is
> {code:java}
> +I[[+I[hello], +I[hello]], 2023-09-18T08:21:12.569]
> {code}
> As one can see, all elements in the array were replaced with duplicates of the last element.
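> For reference, the same values can be inspected without the print sink via collect() (a minimal sketch mirroring the query from the test above; untested against 1.15.4):
> {code:java}
> // Sketch: run the same projection as a query and collect the rows, so the array
> // content can be checked programmatically instead of read off the print sink.
> val result = tableEnv.sqlQuery(
>     """
>         SELECT
>             TO_FIELDS_ARRAY(
>                 MAP['fieldName', 'foo'],
>                 MAP['fieldName', 'hello']
>             ) AS fields
>         FROM events
>     """
> ).execute()
> result.collect().use { rows ->
>     rows.forEach { row ->
>         val fields = row.getField(0) as Array<*>
>         // Expected (1.14.0 behaviour): [+I[foo], +I[hello]]
>         // Observed on 1.15.4:          [+I[hello], +I[hello]]
>         println(fields.contentToString())
>     }
> }
> {code}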
> The issue goes away if I
>  # either remove the NOT NULL constraint from the function hint (a sketch of this variant follows below),
>  # or remove the TIMESTAMP field from the sink table.
> There is also no issue on a regular Flink cluster, only in the MiniCluster used in testing.
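> A minimal sketch of the first variant, for reference (hypothetical object name; otherwise identical to the UDF above, with NOT NULL dropped from the output hint):
> {code:java}
> // Hypothetical variant of the UDF from the test above: the only change is that the
> // element row type in the output hint no longer carries NOT NULL.
> @FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING>>"))
> object TestArrayFuncNullableHint : ScalarFunction() {
>     fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> =
>         values
>             .map { data ->
>                 @Suppress("UNCHECKED_CAST")
>                 val casted = data as Map<String, String?>
>                 Row.of(casted["fieldName"])
>             }
>             .toTypedArray()
> }
> {code}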



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
