Attaching the client code here again, as it got blocked by the firewall previously. In case it gets blocked again, I am also including a condensed sketch of the producer logic inline below.
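The following is a minimal, illustrative Java sketch of what the client does: encode one record as plain Avro binary (no embedded schema, no schema-registry magic byte) and send it to the orders topic. It is not the attached code itself; the class name OrderProducer and the sample field values are placeholders.

import java.io.ByteArrayOutputStream;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderProducer {
    // Writer schema mirroring the Flink table columns:
    // orderId INT, description STRING, price FLOAT, quantity INT
    private static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
        + "{\"name\":\"orderId\",\"type\":\"int\"},"
        + "{\"name\":\"description\",\"type\":\"string\"},"
        + "{\"name\":\"price\",\"type\":\"float\"},"
        + "{\"name\":\"quantity\",\"type\":\"int\"}]}";

    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

        // Sample values only; the real client fills these from its own data.
        GenericRecord order = new GenericData.Record(schema);
        order.put("orderId", 12345);
        order.put("description", "sample order");
        order.put("price", 9.99f);
        order.put("quantity", 2);

        // Plain Avro binary encoding: no magic byte, no schema id,
        // no schema embedded in the message.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(order, encoder);
        encoder.flush();

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.01:9000");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", out.toByteArray())).get();
        }
    }
}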
Thanks, Girish

From: Girish Narayanan <girish.naraya...@ibm.com>
Date: Thursday, 20 July 2023 at 12:06 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Flink plain Avro deserialization issue

Hi Team/Users,

We are trying to send plain Avro messages to a Kafka topic, but they are not getting deserialized properly on the Flink side. I am attaching the client code that sends the Avro messages to a Kafka topic called orders.

So far we have been able to successfully send Avro messages to the Kafka topic using the avsc library, and they are visible on the topic (verified using kafka-console-consumer.sh). However, Flink is not able to deserialize the Avro messages.

The Flink SQL used to create the table is as follows:

CREATE TABLE `source_1` (
  `orderId` INT,
  `description` STRING,
  `price` FLOAT,
  `quantity` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka.01:9000',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro',
  'properties.security.protocol' = 'PLAINTEXT'
);

* We constantly run into the exception below:

Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = orders, partition = 0, leaderEpoch = 0, offset = 40, CreateTime = 1689655077724, serialized key size = -1, serialized value size = 15, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@5eef8c84).
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    ... 14 more
Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
    ... 15 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 12345
    at org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)

* We found a probable solution on Stack Overflow which suggests sending the Avro schema as part of every message. We tried this but still face the same error:
https://stackoverflow.com/questions/66065158/failed-to-deserialize-avro-record-apache-flink-sql-cli

* However, that solution contradicts the Flink docs, which say there is no need to send the schema as part of the Avro messages and explicitly state that this is not supported. Link to the docs:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/avro/#format-options

Excerpt from the docs: "Currently, the Avro schema is always derived from table schema. Explicitly defining an Avro schema is not supported yet."

Thanks,
Girish
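P.S. To double-check the bytes on the topic outside Flink, a decode test along these lines can be used (again only an illustrative sketch: DecodeCheck is a placeholder name, and the reader schema is assumed to be the same schema the producer used):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

public class DecodeCheck {
    // Decodes one Kafka record value as plain Avro binary against the given schema.
    // If this throws for bytes consumed from the orders topic, the payload is not
    // plain Avro binary for that schema (e.g. it carries an extra prefix or was
    // written with a different schema).
    public static GenericRecord decode(Schema schema, byte[] value) throws Exception {
        return new GenericDatumReader<GenericRecord>(schema)
                .read(null, DecoderFactory.get().binaryDecoder(value, null));
    }
}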
<<attachment: client_code.zip>>