Piotr/Dawid,

Thanks for the reply. FLINK-18223 does not seem to be related to this issue. I also double-checked and I am using Flink 1.11.0, not 1.10.0; my mistake. Calling StreamExecutionEnvironment#getConfig()#enableObjectReuse() solved the issue.
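For anyone who hits the same error, here is a minimal sketch of that setting. The class name ObjectReuseExample and the helper createEnvironment() are made up for illustration; only getExecutionEnvironment(), getConfig(), enableObjectReuse() and isObjectReuseEnabled() are Flink API calls. It assumes the Flink streaming Java dependency is on the classpath.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {

    // Hypothetical helper: builds an execution environment with object reuse switched on.
    static StreamExecutionEnvironment createEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With object reuse enabled, chained operators receive the same record
        // instance instead of a deep copy, so the AvroSerializer.copy() call
        // from the stack trace is not made on the chained path.
        env.getConfig().enableObjectReuse();
        return env;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = createEnvironment();
        System.out.println("object reuse enabled: " + env.getConfig().isObjectReuseEnabled());
    }
}
```

Note that this works around the failing copy rather than teaching the serializer about the logical types, and with object reuse on, user functions should not modify or hold on to the records they receive.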
I am not using ConfluentRegistryDeserializationSchema. Instead, I am creating a custom DeserializationSchema:

/* the deser class */
public class SpecificRecordSerDe<T extends SpecificRecord> implements
        KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {

    private final Class<T> tClass;
    private final String tSchemaStr;
    private volatile transient Schema tSchema;
    private String topic;
    private String schemaRegistryUrl;
    private KafkaAvroSerializer serializer;
    private KafkaAvroDecoder decoder;

    public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr, String schemaRegistryUrl) {
        this.tClass = tClass;
        this.tSchemaStr = tSchemaStr;
        this.topic = null;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
                schemaRegistryUrl,
                4);
        decoder = new KafkaAvroDecoder(client);
        GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());
        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(generic.getSchema(),
                ManagedSpecificData.getForClass(tClass));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(generic, encoder);
        encoder.flush();

        byte[] avroData = out.toByteArray();
        out.close();
        tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
        SpecificDatumReader<T> reader = new SpecificDatumReader<>(
                generic.getSchema(), tSchema, ManagedSpecificData.getForClass(tClass));
        Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
        T res = reader.read(null, anotherDecoder);

        return res;
    }
}

/* the specificData class */
public class ManagedSpecificData extends SpecificData {
    private static ManagedSpecificData getManagedSpecificData() {
        ManagedSpecificData res = new ManagedSpecificData();
        registerAdvancedType(new TimestampMillisType(), res);
        registerAdvancedType(new LocalDateType(), res);
        return res;
    }
}

/* how we use above deser class */
SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
        PayloadRecord.class,
        PayloadRecord.getClassSchema().toString(),
        this.schemaRegistry);

FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
        this.inputTopic,
        deserializer,
        this.sourceSettings);

Thanks
Lian

On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi,
>
> Could you share exactly how do you configure avro & kafka? Do you use
> Table API or DataStream API? Do you use the
> ConfluentRegistryDeserializationSchema that comes with Flink or did you
> built custom DeserializationSchema? Could you maybe share the code for
> instantiating the source with us? It could help us track down the
> problematic spot.
>
> Best,
>
> Dawid
>
> On 16/09/2020 08:09, Lian Jiang wrote:
> > Hi,
> >
> > i am using avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
> > Intellij, I can see the FlinkKafkaConsumer already deserialized the
> > upstream kafka message. However, I got below error when this message
> > is serialized during pushToOperator. Per the stack trace, the reason
> > is that AvroSerializer is created by AvroFactory.fromSpecific() which
> > creates its private copy of specificData. This private specificData
> > does not have logical type information. This blocks the deserialized
> > messages from being passed to downstream operators. Any idea how to
> > make this work? Appreciated very much!
> >
> >
> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z
> >     at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
> >     at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
> >     at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
> >     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> >     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> >     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> >     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> >     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >     at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> >     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> >     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> >     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> >     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> >     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> >     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)