Hi, i am using avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In Intellij, I can see the FlinkKafkaConsumer already deserialized the upstream kafka message. However, I got below error when this message is serialized during pushToOperator. Per the stack trace, the reason is that AvroSerializer is created by AvroFactory.fromSpecific() which creates its private copy of specificData. This private specificData does not have logical type information. This blocks the deserialized messages from being passed to downstream operators. Any idea how to make this work? Appreciated very much!
org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887) at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420) at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)