I have created the example table to read data that follows the schema
defined in example.avro.

> *CREATE TABLE example (*

*  fieldA STRING,*

*  fieldB INT,*

*  arrayField ARRAY<INT>*

*) WITH (*

*    'connector' = 'kafka',*

*    'topic' = 'test_flink',*

*    'properties.bootstrap.servers' = '*****',*

*    'properties.group.id <http://properties.group.id>' = '*****',*

*    'properties.auto.offset.reset' = 'earliest',*

*    'format' = 'avro',*

*    'scan.watermark.idle-timeout' = '1000',*

*    *****



*  "namespace": "example.smkt",*

*  "type": "record",*

*  "name": "example",*

*  "fields": [*

*    {*

*      "name": "fieldA",*

*      "type": "string"*

*    },*

*    {*

*      "name": "fieldB",*

*      "type": "int"*

*    },*

*    {*

*      "name": "arrayField",*

*      "type": {*

*        "type": "array",*

*        "items": "int"*

*      }*

*    }*

*  ]*


   1. On one hand, if I insert data from sql-client and then read it from
   there, *everything works correctly*.
   2. On the other hand, if I generate data from an external client and try
   to read it with SELECT * FROM example, I get the error:

   Caused by: java.lang.ArrayIndexOutOfBoundsException: 350

   Even though I can read the data from an external client without issues
   and confirm that the data is correct.

What I have verified:


   *I create the data with INSERT and read it from Flink.* OK.

   *I create data with INSERT and read it with an external Java program.*

   Flink SQL> INSERT INTO example (fieldA, fieldB, arrayField) VALUES (
       ARRAY[13, 5]

   - *Reading from external client:*

      Bytes: [B@7a5ceeddKey: null, Value: {"fieldA": "SITEB",
"fieldB": 1, "arrayField": [1, 13, 5]}

      - Why is there always a 1 at the beginning of the array?

   Another case:

   Flink SQL> INSERT INTO example (fieldA, fieldB, arrayField) VALUES (
       ARRAY[2, 2, 2]

   - *Reading from external client:*

      Bytes: [B@6cb107fdKey: null, Value: {"fieldA": "SITEB",
"fieldB": 1, "arrayField": [1, 2, 1, 1, 2]}

      -  Why are there additional values like 1, 1 in the output?

   Additional case, when array has just one element:

   INSERT INTO example (fieldA, fieldB, arrayField) VALUES (

   - *Result:*

      Bytes: [B@2ab4bc72
        at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:514)
        at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:155)

The table definition and INSERT seem correct, but I haven't found specific
examples in Flink's documentation.
How can I verify the schema generated by the CREATE TABLE? In other words,
the actual schema that Flink uses internally for
Why are those additional 1s values appearing in the arrays when reading the
Why, when there is only one record, can't I deserialize it correctly with
an external program?

The Java program I created to investigate this issue is not very complex
and simply reads messages from Kafka and attempts to deserialize them using
the example.avro schema.

Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
>        consumer.subscribe(Collections.singletonList(topicName));
>        System.out.println("Subs to " + topicName + "props:" + props) ;
>        try {
>            while (true) {
>                System.out.println("Reading...") ;
>                ConsumerRecords<String, byte[]> records =
> consumer.poll(Duration.ofMillis(1000));
>                records.forEach(record -> {
>                    try {
>                        System.out.println("Bytes: " + record.value());
>                        if (record.value() != null) {
>                            GenericRecord avroRecord =
> deserializeAvro(record.value(), schema);
>                            System.out.println("Key: " + record.key() + ",
> Value: " + avroRecord);
>                        }else{
>                            System.out.println("Key: " + record.key() + ",
> Value: " + null);
>                        }
>                    } catch (IOException e) {
>                        e.printStackTrace();
>                    }
>                });
>            }
>        } finally {
>            consumer.close();
>        }
>    }

   private static Schema loadSchema(String schemaFilePath) {
>        try {
>            String schemaContent = new String(Files.readAllBytes(new
> File(schemaFilePath).toPath()));
>            System.out.println("Schema read " + schemaContent) ;
>            return new Schema.Parser().parse(schemaContent);
>        } catch (IOException e) {
>            throw new RuntimeException("Error reading schema file: " +
> schemaFilePath, e);
>        }
>    }
>    private static GenericRecord deserializeAvro(byte[] data, Schema
> schema) throws IOException {
>        DatumReader<GenericRecord> reader = new
> GenericDatumReader<>(schema);
>        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
>        return reader.read(null, decoder);
>    }

