Hi,

I am using Avro 1.9.1, Flink 1.10.1, and Confluent Kafka 5.5. In IntelliJ, I
can see that FlinkKafkaConsumer has already deserialized the upstream Kafka
message. However, I get the error below when the message is copied by
AvroSerializer during pushToOperator. Per the stack trace, the cause appears
to be that the AvroSerializer is created by AvroFactory.fromSpecific(), which
creates its own private SpecificData instance. This private instance does not
have the logical type information, so the deserialized messages cannot be
passed to downstream operators. Any idea how to make this work? Much
appreciated!


org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z
    at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
    at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
    at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
    at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
    at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
    at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
    at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
    at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
    at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
    at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
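
To make the suspected cause concrete, here is a small library-free toy model of
what I think is happening. It is only a sketch: ToyData, addConversion, and
copySucceeds are hypothetical names I made up for illustration, and none of this
is Avro's or Flink's actual code. The idea is that a data model can copy a
java.time.Instant only if a conversion for that class was registered on that
particular instance, so a freshly created private instance fails even though
another instance elsewhere was configured correctly.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy model only: NOT Avro's or Flink's real implementation.
class PrivateRegistryDemo {

    // Hypothetical stand-in for a SpecificData-like object that keeps
    // its own per-instance table of conversions keyed by datum class.
    static final class ToyData {
        private final Map<Class<?>, UnaryOperator<Object>> conversions = new HashMap<>();

        void addConversion(Class<?> type, UnaryOperator<Object> copier) {
            conversions.put(type, copier);
        }

        Object deepCopy(Object datum) {
            // Values the toy model understands without any conversion.
            if (datum == null || datum instanceof Long || datum instanceof String) {
                return datum;
            }
            UnaryOperator<Object> copier = conversions.get(datum.getClass());
            if (copier == null) {
                // Mirrors the shape of the error message in the trace above.
                throw new IllegalStateException(
                        "Unknown datum type " + datum.getClass().getName() + ": " + datum);
            }
            return copier.apply(datum);
        }
    }

    // Returns true if the given instance can copy the datum, false if it throws.
    static boolean copySucceeds(ToyData data, Object datum) {
        try {
            data.deepCopy(datum);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2020-09-15T07:00:00Z");

        // Instance that had the Instant conversion registered on it.
        ToyData configured = new ToyData();
        configured.addConversion(Instant.class, UnaryOperator.identity()); // Instant is immutable

        // Freshly created private instance with no conversions registered.
        ToyData privateCopy = new ToyData();

        System.out.println("configured instance copies Instant: " + copySucceeds(configured, ts));
        System.out.println("private instance copies Instant:    " + copySucceeds(privateCopy, ts));
    }
}
```

In this toy model the configured instance copies the Instant and the private
one does not. If the real situation is analogous, the fix would need the
conversion to be registered on the instance that AvroSerializer actually uses.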
