Yunhong Zheng created FLINK-32897:
-------------------------------------

             Summary: Kafka with nested array row type will get wrong result
                 Key: FLINK-32897
                 URL: https://issues.apache.org/jira/browse/FLINK-32897
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.18.0
            Reporter: Yunhong Zheng
             Fix For: 1.19.0
         Attachments: image-2023-08-21-08-48-25-116.png

{code:java}
// code placeholder
CREATE TABLE kafkaTest (
  a STRING NOT NULL,
  config ARRAY<ROW<notificationTypeOptInReference STRING NOT NULL,cso STRING    
NOT NULL,autocreate BOOLEAN NOT NULL> NOT NULL> NOT NULL,
  ingestionTime TIMESTAMP(3) METADATA FROM 'timestamp',
  PRIMARY KEY (businessEvent) NOT ENFORCED) 
WITH (
     'connector' = 'kafka',
      'topic' = 'test',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers' = '', 
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.mechanism' = 'PLAIN',
      'properties.sasl.jaas.config' = ';',
      'value.format' = 'json',
      'sink.partitioner' = 'fixed'
); {code}
If we with the following INSERT, it will see that the last item in the array is 
placed in the topic 3 times and the first two are igniored.
{code:java}
// code placeholder
INSERT INTO kafkaTest VALUES ('Transaction', ARRAY[ROW('G', 'IT', 
true),ROW('H', 'FR', true), ROW('I', 'IT', false)], TIMESTAMP '2023-08-30 
14:01:00'); {code}
The result:

!image-2023-08-21-08-48-25-116.png|width=296,height=175!

If I use the 'print' sink, I can get the right result. So I think this is a bug 
of 'kafka' connector.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to