Hi,

Could it be related to https://issues.apache.org/jira/browse/FLINK-18223 ?

Also, as a possible workaround, does it work if you enable object reuse
(`StreamExecutionEnvironment#getConfig().enableObjectReuse()`)?
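For reference, a minimal sketch of that configuration change (assuming a standard DataStream job; the class name `ObjectReuseExample` is just for illustration). With object reuse enabled, Flink passes records between chained operators without copying them, which should bypass the `AvroSerializer.copy` / `GenericData.deepCopy` path that fails in your stack trace:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Skip the per-record deep copy between chained operators.
        // Note: with object reuse, operators must not cache or mutate
        // input records after emitting them.
        env.getConfig().enableObjectReuse();

        // ... build the rest of the pipeline as before ...
    }
}
```

Keep in mind the usual caveat: object reuse is only safe if your operators do not hold on to input records across invocations.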

Best regards
Piotrek

On Wed, Sep 16, 2020 at 08:09 Lian Jiang <jiangok2...@gmail.com> wrote:

> Hi,
>
> I am using Avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In IntelliJ, I
> can see that the FlinkKafkaConsumer has already deserialized the upstream
> Kafka message. However, I get the error below when this message is copied
> during pushToOperator. Per the stack trace, the reason is that the
> AvroSerializer is created by AvroFactory.fromSpecific(), which creates its
> own private copy of SpecificData. This private SpecificData does not have
> the logical type conversions, which blocks the deserialized messages from
> being passed to downstream operators. Any idea how to make this work?
> Appreciated very much!
>
>
> org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z
> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>
