Hi Agnelo,

How is the writer schema encoded if you are not using a schema registry? Put differently: how does Flink know which schema the data was written with, so that it can map it to the new schema?
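
To make the question concrete, here is a minimal sketch in plain Python of how Avro's binary encoding lays out a record of strings. It follows the Avro specification: each string is a zig-zag varint length followed by UTF-8 bytes, and a record is just its field values concatenated in declaration order. It is not Flink or Avro library code. The helper names (`encode_record`, `decode_record`, `resolve`) are hypothetical, and `resolve` only covers the "drop fields the reader does not know" part of schema resolution. The point is that the bytes carry no field names, so a reader that only has its own schema cannot tell which writer schema produced them.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an int the way Avro encodes `long`: zig-zag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def read_long(buf: bytes, pos: int):
    """Decode one zig-zag varint starting at pos; return (value, new_pos)."""
    shift = 0
    z = 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not (b & 0x80):
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos


def encode_record(values):
    """A record is its field values concatenated in schema order, with no names."""
    out = bytearray()
    for v in values:
        raw = v.encode("utf-8")
        out += zigzag_varint(len(raw)) + raw
    return bytes(out)


def decode_record(buf: bytes, field_names):
    """Decode assuming the given field order; return (row, unread_byte_count)."""
    pos = 0
    row = {}
    for name in field_names:
        length, pos = read_long(buf, pos)
        row[name] = buf[pos:pos + length].decode("utf-8")
        pos += length
    return row, len(buf) - pos


def resolve(buf: bytes, writer_fields, reader_fields):
    """Simplified schema resolution: decode with the writer's field order,
    then keep only the fields the reader declares."""
    row, _ = decode_record(buf, writer_fields)
    return {name: row[name] for name in reader_fields}


READER_FIELDS = ["field1", "field2"]  # what the table in this thread declares

# Producer appends field3: the reader stops early and leaves bytes unread.
appended = encode_record(["a", "b", "c"])
print(decode_record(appended, READER_FIELDS))

# Hypothetical producer that puts field3 first: values land in the wrong columns.
reordered = encode_record(["c", "a", "b"])
print(decode_record(reordered, READER_FIELDS))

# With the writer schema known, the reader can map the data correctly.
print(resolve(reordered, ["field3", "field1", "field2"], READER_FIELDS))
```

In both producer variants the payload itself gives the reader nothing to detect the change with. This is why the writer schema normally has to come from somewhere outside the message body, such as a schema registry or an Avro container file header.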

On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta <[email protected]> wrote:

> Hi, we are using Flink SQL 1.12 and have a couple of tables created from
> kafka topics. Format is avro (not confluent avro) and no schema registry as
> such.
>
> In flink 1.11 we used to specify the schema, however in 1.12 the schema is
> derived from the message itself.
>
> Is it possible for the producers to start sending new fields without
> changes in the flink app?
>
> For example :
>
> {
>   "name": "topic1",
>   "type": "record",
>   "fields": [
>     {
>       "name": "field1",
>       "type": "string"
>     },
>     {
>       "name": "field2",
>       "type": "string"
>     },
>     {
>       *"name": "field3",*
>       *"type": "string"*
>     },
>   ]
> }
>
> Flink table has:
>
> CREATE TABLE topic1(\n"
> + " field1 string not null \n"
> + " ,field2 string not null \n"
> "'connector' = 'kafka' \n"
> + ",'topic' = 'topic1' \n"
> + ",'scan.startup.mode' = 'latest-offset' \n"
> + ",'properties.group.id' = 'topic1' \n"
> + ",'properties.bootstrap.servers' = 'localhost:8082' \n"
> + ",'properties.enable.auto.commit' = 'true' \n"
> + ",'format' = 'avro' \n";
>
> With above settings I get a deserialization error:
>
> java.io.IOException: Failed to deserialize Avro record.
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104) ~[flink-sql-avro-1.12.0.jar:1.12.0]
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[flink-sql-avro-1.12.0.jar:1.12.0]
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
