java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.io.EOFException
    at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:851)
    at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:373)
    at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:290)
    at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
... 9 more


As shown above. For example, the previous schema was

{
a,
b
}

and it was later changed to

{
a,
b,
c
}

After the job was upgraded, the error above is thrown because Kafka contains a mix of old and new records at the same time.
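
This failure mode matches how Flink's built-in avro format works: it decodes every record against the single schema derived from the table definition and has no way to discover the per-record writer schema. A record written with the old two-field schema is therefore read as if it had three fields, and the decoder runs off the end of the buffer, which is the EOFException above. Avro can only bridge such a change when the reader knows the writer schema and the added field has a default value. Below is a minimal sketch of that schema resolution using the plain Avro API; the record name "Event" and the string field types are invented for illustration:

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroSchemaEvolutionDemo {

    public static void main(String[] args) throws Exception {
        // Old (writer) schema: fields a and b only.
        Schema oldSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"a\",\"type\":\"string\"},"
                + "{\"name\":\"b\",\"type\":\"string\"}]}");

        // New (reader) schema: field c added WITH a default value --
        // the default is what makes old records readable.
        Schema newSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"a\",\"type\":\"string\"},"
                + "{\"name\":\"b\",\"type\":\"string\"},"
                + "{\"name\":\"c\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Serialize one record with the OLD schema (simulating old Kafka data).
        GenericRecord oldRecord = new GenericData.Record(oldSchema);
        oldRecord.put("a", "foo");
        oldRecord.put("b", "bar");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(oldSchema).write(oldRecord, encoder);
        encoder.flush();

        // Deserialize with writer = old schema, reader = new schema.
        // Avro's schema resolution fills field c with its default instead
        // of reading past the end of the buffer.
        BinaryDecoder decoder =
                DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericDatumReader<GenericRecord> reader =
                new GenericDatumReader<>(oldSchema, newSchema);
        GenericRecord result = reader.read(null, decoder);
        System.out.println(result); // {"a": "foo", "b": "bar", "c": null}
    }
}

Note that both pieces are required: a default on c ("default": null with a nullable union) and a reader that is told which writer schema produced the bytes. The plain avro format has neither, which is why mixed data on one topic crashes the job.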

Shammon FY <zjur...@gmail.com> wrote on Fri, Feb 24, 2023 at 18:56:

> Hi
>
> Could you paste the error so we can see the specific cause?
>
> Best,
> Shammon
>
> On Fri, Feb 24, 2023 at 6:10 PM Peihui He <peihu...@gmail.com> wrote:
>
> > Hi, all
> >
> > Has anyone run into this situation: a Flink job uses Avro to consume
> > data from Kafka, and new fields are later added to the schema. During a gradual rollout, old and new data get mixed together, and the Flink consumer job crashes.
> >
> > How do people usually handle this?
> >
> > Best Wishes.
> >
>
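
On the question quoted above of how this is usually handled: the common approach is to make the writer schema retrievable per record, typically with a schema registry, so the consumer can resolve each record's actual writer schema against its current reader schema no matter which producer version wrote it. A hedged DataStream sketch using Flink's Confluent registry integration; it assumes the producers write in the Confluent wire format, and the topic name, broker address, and registry URL below are placeholders:

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RegistryConsumerSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema: the NEW schema, with the added field defaulted.
        Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"a\",\"type\":\"string\"},"
                + "{\"name\":\"b\",\"type\":\"string\"},"
                + "{\"name\":\"c\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "demo");                // placeholder

        // Each record carries a schema id (Confluent wire format); the
        // deserializer fetches the matching writer schema from the registry
        // and resolves it against readerSchema, so old and new records can
        // be mixed on the same topic without crashing the job.
        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                "my-topic", // placeholder
                ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        readerSchema, "http://schema-registry:8081"), // placeholder URL
                props);

        env.addSource(consumer).print();
        env.execute("avro-registry-demo");
    }
}

In Table/SQL jobs the equivalent is declaring the format as avro-confluent instead of avro.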
