Thanks for the update.

First of all, why did you decide to build your own DeserializationSchema
instead of using ConfluentRegistryDeserializationSchema? Your
implementation is quite inefficient: you deserialize > serialize >
deserialize. Serialization/deserialization is usually one of the
heaviest operations in the pipeline.
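
For comparison, here is a minimal sketch of what I mean, using the
ConfluentRegistryAvroDeserializationSchema from flink-avro-confluent-registry
(the topic name, registry URL and Kafka properties below are placeholders):

    // Decode Confluent-framed Avro straight into the generated SpecificRecord,
    // without the extra GenericRecord round trip.
    ConfluentRegistryAvroDeserializationSchema<PayloadRecord> schema =
            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                    PayloadRecord.class, "http://schema-registry:8081");

    FlinkKafkaConsumer<PayloadRecord> consumer =
            new FlinkKafkaConsumer<>("input-topic", schema, kafkaProperties);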

What do you return in your getProducedType? From the stack trace I guess
you are instantiating an AvroTypeInfo? Could you maybe share a full,
runnable example? It would make it much easier to help you.
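
To be concrete, I am asking whether it is something along these lines (a
hypothetical sketch, not your actual code):

    @Override
    public TypeInformation<PayloadRecord> getProducedType() {
        // An AvroTypeInfo makes Flink pick the AvroSerializer for the produced records.
        return new AvroTypeInfo<>(PayloadRecord.class);
    }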

Moreover, the pattern of registering custom conversions in a
SpecificData will not work with AvroSerializer. Custom conversions
should be defined in the generated SpecificRecord (in your case
PayloadRecord) via SpecificRecordBase#getConversion().
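
Roughly, the generated class should end up with something like this (an
illustrative sketch only; the exact conversions array depends on your schema
and on how the Avro code generation is configured for logical types):

    public class PayloadRecord extends SpecificRecordBase implements SpecificRecord {

        // One slot per schema position; the code generator fills in a conversion
        // for logical-type fields, e.g. timestamp-millis -> java.time.Instant.
        private static final Conversion<?>[] conversions = new Conversion<?>[] {
                new TimeConversions.TimestampMillisConversion(),
                null
        };

        @Override
        public Conversion<?> getConversion(int field) {
            return conversions[field];
        }

        // ... generated fields, getters, setters, etc. ...
    }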

Best,

Dawid


On 17/09/2020 16:34, Lian Jiang wrote:
> Piotr/Dawid,
>
> Thanks for the reply. FLINK-18223 seems not to be related to this issue,
> and I double-checked that I am using Flink 1.11.0 instead of 1.10.0. My
> mistake.
> StreamExecutionEnvironment#getConfig()#enableObjectReuse() solved the
> issue.
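>
> For reference, enabling object reuse is a one-liner on the execution
> environment (the variable name here is just illustrative):
>
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     // Reuse objects between chained operators instead of deep-copying
>     // them in pushToOperator.
>     env.getConfig().enableObjectReuse();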
>
> I am not using ConfluentRegistryDeserializationSchema. Instead, I am
> creating a custom DeserializationSchema:
>
>
> /*
> the deser class
> */
> public class SpecificRecordSerDe<T extends SpecificRecord> implements
>         KafkaSerializationSchema<T>, KafkaContextAware<T>,
>         KafkaDeserializationSchema<T>, Serializable {
>
>     private final Class<T> tClass;
>     private final String tSchemaStr;
>     private volatile transient Schema tSchema;
>     private String topic;
>     private String schemaRegistryUrl;
>     private KafkaAvroSerializer serializer;
>     private KafkaAvroDecoder decoder;
>
>     public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr,
>                                String schemaRegistryUrl) {
>         this.tClass = tClass;
>         this.tSchemaStr = tSchemaStr;
>         this.topic = null;
>         this.schemaRegistryUrl = schemaRegistryUrl;
>     }
>
>     @Override
>     public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>         // 1) decode the Confluent-framed bytes into a GenericRecord
>         CachedSchemaRegistryClient client =
>                 new CachedSchemaRegistryClient(schemaRegistryUrl, 4);
>         decoder = new KafkaAvroDecoder(client);
>         GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());
>
>         // 2) re-encode the GenericRecord into raw Avro bytes
>         DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(
>                 generic.getSchema(), ManagedSpecificData.getForClass(tClass));
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>         writer.write(generic, encoder);
>         encoder.flush();
>
>         byte[] avroData = out.toByteArray();
>         out.close();
>
>         // 3) decode those bytes again into the SpecificRecord type
>         tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
>         SpecificDatumReader<T> reader = new SpecificDatumReader<>(
>                 generic.getSchema(), tSchema,
>                 ManagedSpecificData.getForClass(tClass));
>         Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
>         T res = reader.read(null, anotherDecoder);
>
>         return res;
>     }
> }
>
>
> /*
> the specificData class
> */
> public class ManagedSpecificData extends SpecificData {
>
>     private static ManagedSpecificData getManagedSpecificData() {
>         ManagedSpecificData res = new ManagedSpecificData();
>
>         // register conversions for the logical types we use
>         registerAdvancedType(new TimestampMillisType(), res);
>         registerAdvancedType(new LocalDateType(), res);
>
>         return res;
>     }
> }
>
> /*
> how we use above deser class
> */
> SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
>         PayloadRecord.class,
>         PayloadRecord.getClassSchema().toString(),
>         this.schemaRegistry);
>
> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
>         this.inputTopic,
>         deserializer,
>         this.sourceSettings);
>
>
> Thanks
> Lian
>
>
>
>
>
> On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz
> <dwysakow...@apache.org> wrote:
>
>     Hi,
>
>     Could you share exactly how you configure Avro & Kafka? Do you use
>     the ConfluentRegistryDeserializationSchema that comes with Flink or
>     did you build a custom DeserializationSchema? Could you maybe share
>     the code for instantiating the source with us? It could help us
>     track down the problematic spot.
>
>     Best,
>
>     Dawid
>
>     On 16/09/2020 08:09, Lian Jiang wrote:
>     > Hi,
>     >
>     > I am using Avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
>     > IntelliJ, I can see that the FlinkKafkaConsumer has already
>     > deserialized the upstream Kafka message. However, I get the error
>     > below when this message is serialized during pushToOperator. Per
>     > the stack trace, the reason is that the AvroSerializer is created
>     > by AvroFactory.fromSpecific(), which creates its own private copy
>     > of SpecificData. This private SpecificData does not have the
>     > logical type information. This blocks the deserialized messages
>     > from being passed to downstream operators. Any idea how to make
>     > this work? Appreciated very much!
>     >
>     >
>     > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z
>     >   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
>     >   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>     >   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
>     >   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
>     >   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>     >   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
>     >   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>     >   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
>     >   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>     >   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
>     >   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>     >   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>     >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>     >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     >   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     >   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     >   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     >   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>     >   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>     >   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>     >   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>     >   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>     >   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>
>
>
