I have created the example table to read data that follows the schema
defined in example.avro.


   CREATE TABLE example (
     fieldA STRING,
     fieldB INT,
     arrayField ARRAY<INT>
   ) WITH (
       'connector' = 'kafka',
       'topic' = 'test_flink',
       'properties.bootstrap.servers' = '*****',
       'properties.group.id' = '*****',
       'properties.auto.offset.reset' = 'earliest',
       'format' = 'avro',
       'scan.watermark.idle-timeout' = '1000',
       *****
   );


   {
     "namespace": "example.smkt",
     "type": "record",
     "name": "example",
     "fields": [
       {
         "name": "fieldA",
         "type": "string"
       },
       {
         "name": "fieldB",
         "type": "int"
       },
       {
         "name": "arrayField",
         "type": {
           "type": "array",
           "items": "int"
         }
       }
     ]
   }



   1. If I insert data from sql-client and then read it back from
   sql-client, *everything works correctly*.
   2. If I generate data from an external client and try to read it with
   SELECT * FROM example, I get the error:

   Caused by: java.lang.ArrayIndexOutOfBoundsException: 350

   This happens even though I can read the same data from an external
   client without issues and confirm that it is correct.

What I have verified:

   - *I create the data with INSERT and read it from Flink.* OK.
   - *I create data with INSERT and read it with an external Java program.*
   Example:

   Flink SQL> INSERT INTO example (fieldA, fieldB, arrayField) VALUES (
       'SITEB',
       1,
       ARRAY[13, 5]
   );

   - *Reading from external client:*

      Bytes: [B@7a5ceedd
      Key: null, Value: {"fieldA": "SITEB", "fieldB": 1, "arrayField": [1, 13, 5]}

      - Why is there always a 1 at the beginning of the array?

   Another case:

   Flink SQL> INSERT INTO example (fieldA, fieldB, arrayField) VALUES (
       'SITEB',
       1,
       ARRAY[2, 2, 2]
   );

   - *Reading from external client:*

      Bytes: [B@6cb107fd
      Key: null, Value: {"fieldA": "SITEB", "fieldB": 1, "arrayField": [1, 2, 1, 1, 2]}

      - Why are there additional values like 1, 1 in the output?

   An additional case, when the array has just one element:

   INSERT INTO example (fieldA, fieldB, arrayField) VALUES (
       'A',
       1,
       ARRAY[99]
   );

   - *Result:*

      Bytes: [B@2ab4bc72
      java.io.EOFException
        at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:514)
        at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:155)
        ...



The table definition and INSERT seem correct, but I haven't found specific
examples in Flink's documentation.

   - How can I verify the schema generated by the CREATE TABLE, that is,
   the actual schema that Flink uses internally for
   serialization/deserialization?
   - Why do those additional 1 values appear in the arrays when reading the
   data?
   - Why can't I deserialize the data correctly with an external program
   when the array has only one element?
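
One explanation that would be consistent with all three outputs above,
offered as an unverified assumption and not a confirmed diagnosis: the
writer may be encoding each array item as the Avro union ["null", "int"]
(the elements of ARRAY<INT> are nullable in SQL), while example.avro
declares the items as plain "int". In Avro's binary encoding a union value
is preceded by its branch index, so a reader expecting plain "int" would
see an extra 1 in front of every element and then misread the following
bytes as block counts. The sketch below reproduces this for the array part
only, with a hand-written encoder and decoder. The class and method names
are made up for illustration, and nothing here calls Flink or the Avro
library.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class AvroArrayMismatchDemo {

    // Avro writes an int as a zigzag-encoded, variable-length integer.
    static void writeInt(ByteArrayOutputStream out, int n) {
        int z = (n << 1) ^ (n >> 31);
        while ((z & ~0x7F) != 0) {
            out.write((z & 0x7F) | 0x80);
            z >>>= 7;
        }
        out.write(z);
    }

    // Reads one zigzag varint starting at pos[0] and advances pos[0].
    static int readInt(byte[] data, int[] pos) {
        int z = 0, shift = 0, b;
        do {
            if (pos[0] >= data.length) {
                throw new UncheckedIOException(new EOFException("ran out of bytes"));
            }
            b = data[pos[0]++] & 0xFF;
            z |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    // ASSUMED writer side: items are the union ["null","int"], so each item
    // is written as branch index 1 followed by the int value.
    static byte[] encodeNullableIntArray(int... values) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (values.length > 0) {
            writeInt(out, values.length);   // block count
            for (int v : values) {
                writeInt(out, 1);           // union branch index ("int")
                writeInt(out, v);
            }
        }
        writeInt(out, 0);                   // a zero count ends the array
        return out.toByteArray();
    }

    // Reader side as in example.avro: items are plain "int".
    // Negative block counts (blocks with a byte size) are not handled here.
    static List<Integer> decodePlainIntArray(byte[] data) {
        int[] pos = {0};
        List<Integer> items = new ArrayList<>();
        for (int count = readInt(data, pos); count != 0; count = readInt(data, pos)) {
            for (int i = 0; i < count; i++) {
                items.add(readInt(data, pos));
            }
        }
        return items;
    }

    public static void main(String[] args) {
        System.out.println(decodePlainIntArray(encodeNullableIntArray(13, 5)));   // [1, 13, 5]
        System.out.println(decodePlainIntArray(encodeNullableIntArray(2, 2, 2))); // [1, 2, 1, 1, 2]
        try {
            decodePlainIntArray(encodeNullableIntArray(99));
        } catch (UncheckedIOException e) {
            // The 99 is misread as a block count of 99 items, so the reader runs out of bytes.
            System.out.println("EOF: " + e.getCause().getMessage());
        }
    }
}
```

If this is what is happening, one way to test it would be to change the
items in the reader schema to ["null", "int"] and check whether the
external program then decodes the arrays as [13, 5], [2, 2, 2] and [99].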


The Java program I created to investigate this issue is not very complex
and simply reads messages from Kafka and attempts to deserialize them using
the example.avro schema.

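   // Imports used by the fragments below.
   import java.io.File;
   import java.io.IOException;
   import java.nio.file.Files;
   import java.time.Duration;
   import java.util.Collections;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.DatumReader;
   import org.apache.avro.io.Decoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;

       // Fragment of the consumer method; props, topicName and schema are
       // set up earlier in the program.
       Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
       consumer.subscribe(Collections.singletonList(topicName));
       System.out.println("Subs to " + topicName + "props:" + props);
       try {
           while (true) {
               System.out.println("Reading...");
               ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
               records.forEach(record -> {
                   try {
                       // Concatenating a byte[] prints its identity
                       // (e.g. [B@7a5ceedd), not its contents.
                       System.out.println("Bytes: " + record.value());
                       if (record.value() != null) {
                           GenericRecord avroRecord = deserializeAvro(record.value(), schema);
                           System.out.println("Key: " + record.key() + ", Value: " + avroRecord);
                       } else {
                           System.out.println("Key: " + record.key() + ", Value: " + null);
                       }
                   } catch (IOException e) {
                       e.printStackTrace();
                   }
               });
           }
       } finally {
           consumer.close();
       }
   }

   // Parses the Avro schema (example.avro) from a file.
   private static Schema loadSchema(String schemaFilePath) {
       try {
           String schemaContent = new String(Files.readAllBytes(new File(schemaFilePath).toPath()));
           System.out.println("Schema read " + schemaContent);
           return new Schema.Parser().parse(schemaContent);
       } catch (IOException e) {
           throw new RuntimeException("Error reading schema file: " + schemaFilePath, e);
       }
   }

   // Decodes one raw Avro binary message using the given schema as both
   // writer and reader schema.
   private static GenericRecord deserializeAvro(byte[] data, Schema schema) throws IOException {
       DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
       Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
       return reader.read(null, decoder);
   }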
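
The "Bytes: [B@..." lines in the outputs above are the default string form
of a byte[] (its type and identity hash), so they do not show what Flink
actually wrote to the topic. A small helper like the one below, whose class
and method names are hypothetical, could be used in place of
record.value() in that println to see the raw message bytes and compare
them with what the schema in example.avro would produce.

```java
final class HexDump {

    // Formats each byte as two lowercase hex digits, separated by spaces.
    static String toHex(byte[] data) {
        StringBuilder sb = new StringBuilder();
        for (byte b : data) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Illustrative bytes only, not taken from a real Kafka message.
        byte[] sample = {0x04, 0x02, 0x1a};
        System.out.println("Bytes: " + toHex(sample)); // prints "Bytes: 04 02 1a"
    }
}
```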
