Thanks for the update. First of all, why did you decide to build your own DeserializationSchema instead of using the ConfluentRegistryAvroDeserializationSchema? Your implementation is quite inefficient: you deserialize > serialize > deserialize. Serialization/deserialization is usually one of the heaviest operations in the pipeline.
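With the schema that ships with Flink (in the flink-avro-confluent-registry module) the whole source setup boils down to something like this; a rough, untested sketch that just reuses the names from your snippet:

    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    // Deserializes Confluent-framed Avro directly into the generated
    // SpecificRecord class; the writer schema is fetched from the registry.
    ConfluentRegistryAvroDeserializationSchema<PayloadRecord> deserializer =
        ConfluentRegistryAvroDeserializationSchema.forSpecific(
            PayloadRecord.class, this.schemaRegistry);

    FlinkKafkaConsumer<PayloadRecord> consumer = new FlinkKafkaConsumer<>(
        this.inputTopic, deserializer, this.sourceSettings);

That is a single deserialization step, so the extra serialize/deserialize round trip goes away entirely.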
What do you return in your getProducedType? From the stack trace I guess you are instantiating the AvroTypeInfo? Could you maybe share a full runnable example? It would make it much easier to help you.

Moreover, the pattern of registering custom conversions in a SpecificData will not work with the AvroSerializer. Custom conversions should be defined in the generated SpecificRecord (in your case PayloadRecord) via SpecificRecordBase#getConversion().
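Just to illustrate what I mean: a class generated by the avro-maven-plugin with logical type conversions enabled carries them roughly like this (a sketch only; the field names are made up and the exact output depends on your schema and plugin configuration):

    // Inside the generated PayloadRecord extends SpecificRecordBase:

    private static final org.apache.avro.specific.SpecificData MODEL$ =
        new org.apache.avro.specific.SpecificData();
    static {
      MODEL$.addLogicalTypeConversion(
          new org.apache.avro.data.TimeConversions.TimestampMillisConversion());
      MODEL$.addLogicalTypeConversion(
          new org.apache.avro.data.TimeConversions.DateConversion());
    }

    // One entry per schema field; fields without a logical type get null.
    private static final org.apache.avro.Conversion<?>[] conversions =
        new org.apache.avro.Conversion<?>[] {
            new org.apache.avro.data.TimeConversions.TimestampMillisConversion(), // e.g. a timestamp-millis field
            new org.apache.avro.data.TimeConversions.DateConversion(),            // e.g. a date field
            null
        };

    @Override
    public org.apache.avro.Conversion<?> getConversion(int field) {
      return conversions[field];
    }

That is where the conversions should live, rather than in a SpecificData you register yourself.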
Best,

Dawid

On 17/09/2020 16:34, Lian Jiang wrote:
> Piotr/Dawid,
>
> Thanks for the reply. FLINK-18223 seems not to be related to this issue
> and I double checked that I am using Flink 1.11.0 instead of 1.10.0.
> My mistake.
> StreamExecutionEnvironment#getConfig()#enableObjectReuse() solved the issue.
>
> I am not using ConfluentRegistryDeserializationSchema. Instead, I am
> creating a custom DeserializationSchema:
>
> /*
>  the deser class
> */
> public class SpecificRecordSerDe<T extends SpecificRecord> implements
>     KafkaSerializationSchema<T>, KafkaContextAware<T>,
>     KafkaDeserializationSchema<T>, Serializable {
>
>   private final Class<T> tClass;
>   private final String tSchemaStr;
>   private volatile transient Schema tSchema;
>   private String topic;
>   private String schemaRegistryUrl;
>   private KafkaAvroSerializer serializer;
>   private KafkaAvroDecoder decoder;
>
>   public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr,
>       String schemaRegistryUrl) {
>     this.tClass = tClass;
>     this.tSchemaStr = tSchemaStr;
>     this.topic = null;
>     this.schemaRegistryUrl = schemaRegistryUrl;
>   }
>
>   @Override
>   public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>     CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
>         schemaRegistryUrl, 4);
>
>     decoder = new KafkaAvroDecoder(client);
>     GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());
>     DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(
>         generic.getSchema(), ManagedSpecificData.getForClass(tClass));
>     ByteArrayOutputStream out = new ByteArrayOutputStream();
>     Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>     writer.write(generic, encoder);
>     encoder.flush();
>
>     byte[] avroData = out.toByteArray();
>     out.close();
>
>     tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
>     SpecificDatumReader<T> reader = new SpecificDatumReader<>(
>         generic.getSchema(), tSchema, ManagedSpecificData.getForClass(tClass));
>     Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
>     T res = reader.read(null, anotherDecoder);
>
>     return res;
>   }
> }
>
> /*
>  the specificData class
> */
> public class ManagedSpecificData extends SpecificData {
>   private static ManagedSpecificData getManagedSpecificData() {
>     ManagedSpecificData res = new ManagedSpecificData();
>
>     registerAdvancedType(new TimestampMillisType(), res);
>     registerAdvancedType(new LocalDateType(), res);
>
>     return res;
>   }
> }
>
> /*
>  how we use above deser class
> */
> SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
>     PayloadRecord.class,
>     PayloadRecord.getClassSchema().toString(),
>     this.schemaRegistry);
>
> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
>     this.inputTopic,
>     deserializer,
>     this.sourceSettings);
>
> Thanks
> Lian
>
> On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Hi,
>
> Could you share exactly how you configure avro & kafka? Do you use
> Table API or DataStream API? Do you use the
> ConfluentRegistryDeserializationSchema that comes with Flink or did you
> build a custom DeserializationSchema? Could you maybe share the code for
> instantiating the source with us? It could help us track down the
> problematic spot.
>
> Best,
>
> Dawid
>
> On 16/09/2020 08:09, Lian Jiang wrote:
> > Hi,
> >
> > I am using avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
> > IntelliJ, I can see the FlinkKafkaConsumer already deserialized the
> > upstream kafka message. However, I got the below error when this message
> > is serialized during pushToOperator. Per the stack trace, the reason
> > is that AvroSerializer is created by AvroFactory.fromSpecific(), which
> > creates its private copy of specificData. This private specificData
> > does not have the logical type information. This blocks the deserialized
> > messages from being passed to downstream operators. Any idea how to
> > make this work? Appreciated very much!
> >
> > org.apache.avro.AvroRuntimeException: Unknown datum type
> > java.time.Instant: 2020-09-15T07:00:00Z
> >     at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
> >     at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
> >     at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
> >     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> >     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> >     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> >     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> >     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >     at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> >     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> >     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> >     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> >     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> >     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> >     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)