Hi, judging from the error, this is caused by a mismatch between the schema and the data. It looks like the plain Avro format currently cannot handle old and new records together after this kind of schema change.
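To see why this surfaces as an EOFException: in Avro's raw binary encoding a record is just its field values back to back, with no field names or markers. A minimal pure-Python sketch (strings are encoded as a zigzag-varint length prefix plus UTF-8 bytes, matching the Avro spec; the field names a/b/c follow the example later in this thread) reproduces the failure mode:

```python
import io

def write_long(buf, n):
    """Write a long using Avro's zigzag + variable-length encoding."""
    n = (n << 1) ^ (n >> 63)  # zigzag-encode
    while (n & ~0x7F) != 0:
        buf.write(bytes([(n & 0x7F) | 0x80]))
        n >>= 7
    buf.write(bytes([n & 0x7F]))

def write_string(buf, s):
    """Avro string: length prefix followed by UTF-8 bytes."""
    data = s.encode("utf-8")
    write_long(buf, len(data))
    buf.write(data)

def read_long(buf):
    shift, acc = 0, 0
    while True:
        b = buf.read(1)
        if not b:
            raise EOFError("stream ended inside a long")
        acc |= (b[0] & 0x7F) << shift
        if not (b[0] & 0x80):
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1)  # zigzag-decode

def read_string(buf):
    n = read_long(buf)
    data = buf.read(n)
    if len(data) < n:
        raise EOFError("stream ended inside a string")
    return data.decode("utf-8")

# A writer using the old schema {a, b} emits exactly two strings, nothing more:
out = io.BytesIO()
write_string(out, "value-a")
write_string(out, "value-b")
old_record = out.getvalue()

# A reader configured with the new schema {a, b, c} consumes a and b fine,
# then hits end-of-stream while decoding field c -- the same shape of failure
# as the java.io.EOFException in BinaryDecoder below.
inp = io.BytesIO(old_record)
a = read_string(inp)
b = read_string(inp)
try:
    read_string(inp)  # field c: no bytes left in the old record
except EOFError as e:
    print("EOF while reading field c:", e)
```

Because there is no schema id or length marker in the payload, the reader has no way to detect that the record was written with the shorter schema; it simply runs off the end of the buffer.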
Best,
Shammon FY

On Fri, Mar 10, 2023 at 2:29 PM Peihui He <[email protected]> wrote:

> java.io.IOException: Failed to deserialize Avro record.
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: java.io.EOFException
>     at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:851)
>     at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:373)
>     at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:290)
>     at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>     ... 9 more
>
> As above. For example, the previous schema was
>
> {
>   a,
>   b
> }
>
> and it was later changed to
>
> {
>   a,
>   b,
>   c
> }
>
> After the job was upgraded, Kafka contained both old and new records at the same time, so the error occurred.
>
> Shammon FY <[email protected]> wrote on Fri, Feb 24, 2023 at 18:56:
>
> > Hi
> >
> > Could you paste the error so we can look at the exact cause?
> >
> > Best,
> > Shammon
> >
> > On Fri, Feb 24, 2023 at 6:10 PM Peihui He <[email protected]> wrote:
> >
> > > Hi, all
> > >
> > > Has anyone run into this situation: a Flink job uses Avro to consume data from Kafka, and a new field is later added to the schema. During a gradual rollout, old and new records are mixed together, and the Flink consumer job crashes.
> > >
> > > How do people usually handle this?
> > >
> > > Best Wishes.
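One common way to handle mixed old/new records is to switch from the plain `avro` format to `avro-confluent`, which stores a schema id in every Kafka message; the reader then fetches the actual writer schema from the schema registry and resolves it against the table schema, so {a, b} and {a, b, c} records can coexist (give the new field `c` a `"default"` in the registered Avro schema so old records resolve cleanly). A sketch of the DDL, with hypothetical table/topic/registry names, and noting that the registry URL option key has changed across Flink versions (`avro-confluent.url` in recent releases, `avro-confluent.schema-registry.url` in older ones):

```sql
-- Hypothetical names; the key point is the format. avro-confluent looks up
-- each record's writer schema by the id embedded in the message, so records
-- written with the old {a, b} schema and the new {a, b, c} schema can both
-- be deserialized against this table schema.
CREATE TABLE events (
  a STRING,
  b STRING,
  c STRING  -- new field; declare a "default" for it in the registered schema
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081'
);
```

With the plain `avro` format there is no per-record schema id, so Flink has to assume every record was written with the table schema; that assumption is exactly what breaks during a gradual rollout.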
