Hi, Could it be related to https://issues.apache.org/jira/browse/FLINK-18223 ?
Also maybe as a workaround, is it working if you enable object reuse (`StreamExecutionEnvironment#getConfig()#enableObjectReuse())`)? Best regards Piotrek śr., 16 wrz 2020 o 08:09 Lian Jiang <jiangok2...@gmail.com> napisał(a): > Hi, > > i am using avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In Intellij, I > can see the FlinkKafkaConsumer already deserialized the upstream kafka > message. However, I got below error when this message is serialized during > pushToOperator. Per the stack trace, the reason is that AvroSerializer is > created by AvroFactory.fromSpecific() which creates its private copy of > specificData. This private specificData does not have logical type > information. This blocks the deserialized messages from being passed to > downstream operators. Any idea how to make this work? Appreciated very much! > > > org.apache.avro.AvroRuntimeException: Unknown datum type > java.time.Instant: 2020-09-15T07:00:00Z > at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887) > at > org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420) > at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850) > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280) > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261) > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280) > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261) > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) > at > org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) >