I have created the example table to read data that follows the schema defined in example.avro.
```sql
CREATE TABLE example (
  fieldA STRING,
  fieldB INT,
  arrayField ARRAY<INT>
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_flink',
  'properties.bootstrap.servers' = '*****',
  'properties.group.id' = '*****',
  'properties.auto.offset.reset' = 'earliest',
  'format' = 'avro',
  'scan.watermark.idle-timeout' = '1000',
  *****
);
```

The schema in example.avro:

```json
{
  "namespace": "example.smkt",
  "type": "record",
  "name": "example",
  "fields": [
    { "name": "fieldA", "type": "string" },
    { "name": "fieldB", "type": "int" },
    {
      "name": "arrayField",
      "type": { "type": "array", "items": "int" }
    }
  ]
}
```

1. On one hand, if I insert data from the sql-client and then read it back from there, *everything works correctly*.
2. On the other hand, if I produce data from an external client and try to read it with SELECT * FROM example, I get the error `Caused by: java.lang.ArrayIndexOutOfBoundsException: 350`, even though I can read that same data from an external client without issues and confirm that it is correct.

What I have verified:

- *I create the data with INSERT and read it from Flink:* OK.
- *I create data with INSERT and read it with an external Java program:* see the examples below.

Example:

```sql
Flink SQL> INSERT INTO example (fieldA, fieldB, arrayField)
VALUES ('SITEB', 1, ARRAY[13, 5]);
```

Reading from the external client:

```
Bytes: [B@7a5ceedd
Key: null, Value: {"fieldA": "SITEB", "fieldB": 1, "arrayField": [1, 13, 5]}
```

Why is there always a 1 at the beginning of the array?

Another case:

```sql
Flink SQL> INSERT INTO example (fieldA, fieldB, arrayField)
VALUES ('SITEB', 1, ARRAY[2, 2, 2]);
```

Reading from the external client:

```
Bytes: [B@6cb107fd
Key: null, Value: {"fieldA": "SITEB", "fieldB": 1, "arrayField": [1, 2, 1, 1, 2]}
```

Why are there additional values like 1, 1 in the output?

Additional case, when the array has just one element:

```sql
INSERT INTO example (fieldA, fieldB, arrayField)
VALUES ('A', 1, ARRAY[99]);
```

Result:

```
Bytes: [B@2ab4bc72
java.io.EOFException
  at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:514)
  at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:155)
  ...
```

The table definition and the INSERTs seem correct, but I haven't found specific examples of this setup in Flink's documentation.

My questions:

- How can I verify the schema generated by the CREATE TABLE, i.e. the actual schema that Flink uses internally for serialization/deserialization? (One idea is sketched after the consumer code below.)
- Why do those additional 1 values appear in the arrays when reading the data?
- Why can't I deserialize the record correctly with an external program when the array has just one element?

The Java program I wrote to investigate this is not very complex: it simply reads messages from Kafka and attempts to deserialize them using the example.avro schema.
```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExampleAvroConsumer { // class and method names here are illustrative

    // Polls raw byte[] records from Kafka and decodes each one with the schema
    // loaded from example.avro. props holds the usual consumer settings
    // (bootstrap servers, group id, String key / byte[] value deserializers);
    // the connection details are redacted.
    static void consume(Properties props, String topicName, Schema schema) {
        Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topicName));
        System.out.println("Subs to " + topicName + " props: " + props);
        try {
            while (true) {
                System.out.println("Reading...");
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> {
                    try {
                        // Prints the array reference ([B@...), not the byte contents.
                        System.out.println("Bytes: " + record.value());
                        if (record.value() != null) {
                            GenericRecord avroRecord = deserializeAvro(record.value(), schema);
                            System.out.println("Key: " + record.key() + ", Value: " + avroRecord);
                        } else {
                            System.out.println("Key: " + record.key() + ", Value: " + null);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            consumer.close();
        }
    }

    // Reads and parses the Avro schema file (example.avro).
    private static Schema loadSchema(String schemaFilePath) {
        try {
            String schemaContent = new String(Files.readAllBytes(new File(schemaFilePath).toPath()));
            System.out.println("Schema read " + schemaContent);
            return new Schema.Parser().parse(schemaContent);
        } catch (IOException e) {
            throw new RuntimeException("Error reading schema file: " + schemaFilePath, e);
        }
    }

    // Decodes a single Avro binary payload with the given reader schema.
    private static GenericRecord deserializeAvro(byte[] data, Schema schema) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        return reader.read(null, decoder);
    }
}
```
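Regarding the first question, one idea I have not yet verified is to print the Avro schema that the table's row type maps to, using the AvroSchemaConverter utility that ships with flink-avro, and diff it against example.avro. A minimal sketch, assuming that utility is the right one for this purpose; the class name is illustrative and the row type simply mirrors the columns of the CREATE TABLE:

```java
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class PrintDerivedAvroSchema {
    public static void main(String[] args) {
        // Row type mirroring the columns of the example table above.
        DataType rowType = DataTypes.ROW(
                DataTypes.FIELD("fieldA", DataTypes.STRING()),
                DataTypes.FIELD("fieldB", DataTypes.INT()),
                DataTypes.FIELD("arrayField", DataTypes.ARRAY(DataTypes.INT())));

        // Schema that (I assume) the 'avro' format would derive from this row type,
        // so it can be compared against example.avro.
        Schema derived = AvroSchemaConverter.convertToSchema(rowType.getLogicalType());
        System.out.println(derived.toString(true));
    }
}
```

If the printed schema differs from example.avro, for instance in the nullability of the fields or of the array elements, that would be the first thing I would compare against the raw bytes shown above.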