Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am now trying ConfluentRegistryAvroDeserializationSchema (if that is what you meant), but I get "java.lang.Long cannot be cast to java.time.Instant". This may be caused by https://issues.apache.org/jira/browse/FLINK-11030. Is there any progress on that JIRA? Thanks.

Regards!
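If I understand your earlier point about SpecificRecordBase#getConversion() correctly, the generated record itself has to carry the timestamp conversion; as far as I can tell, when getConversion() returns null for a field, SpecificDatumReader falls back to readWithoutConversion() and hands the raw long to the generated put() (the MetadataRecord.put at the top of the stack trace below), which then fails the cast to Instant. Below is a hand-written sketch of what I would expect the generated MetadataRecord to contain, not the actual generated file; the field count and order are only illustrative:

import org.apache.avro.Conversion;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificRecordBase;

// Sketch only: the real class is produced by the Avro compiler.
public abstract class MetadataRecordSketch extends SpecificRecordBase {
    // One slot per schema field; only the timestamp-millis field needs a conversion.
    private static final Conversion<?>[] conversions = new Conversion<?>[] {
            null,                                            // e.g. an ordinary string field
            new TimeConversions.TimestampMillisConversion()  // the java.time.Instant field
    };

    @Override
    public Conversion<?> getConversion(int fieldIndex) {
        return conversions[fieldIndex];
    }
}

With a conversion in place the reader should produce java.time.Instant directly instead of the epoch long.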
Stacktrace:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
    at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
    at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
    at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
    at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
    at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
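For completeness, the serde from the "Code:" section below is wired into the job roughly like this. This is a simplified sketch: the topic name, group id, registry URL and Kafka properties are placeholders, not our real configuration.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import com.mycompany.mypayload.MetadataRecord;

public class MetadataJobSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder Kafka settings; the real job reads these from configuration.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "metadata-consumer");

        // Build the Kafka source with the wrapper shown under "Code:" below.
        SpecificRecordSerDe<MetadataRecord> deserializer =
                SpecificRecordSerDe.forDeserializer(MetadataRecord.class, "http://schema-registry:8081");
        FlinkKafkaConsumer<MetadataRecord> consumer =
                new FlinkKafkaConsumer<>("metadata-topic", deserializer, kafkaProps);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(consumer).print();
        env.execute("metadata-job-sketch");
    }
}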
Code:

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.io.Serializable;

public class SpecificRecordSerDe<T extends SpecificRecord> implements
        KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {

    private final Class<T> tClass;
    private String topic;   // for serializer
    private String subject; // for serializer
    private final String schemaRegistryUrl;
    private ConfluentRegistryAvroSerializationSchema<T> serializer;
    private ConfluentRegistryAvroDeserializationSchema<T> deserializer;
    private static final Object lock = new Object();

    public static <T extends SpecificRecord> SpecificRecordSerDe<T> forDeserializer(
            final Class<T> tClass, final String schemaRegistryUrl) {
        return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl);
    }

    public static <T extends SpecificRecord> SpecificRecordSerDe<T> forSerializer(
            final Class<T> tClass, final String schemaRegistryUrl, final String topic, final String subject) {
        return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl, topic, subject);
    }

    private SpecificRecordSerDe(final Class<T> tClass, final String schemaRegistryUrl) {
        this.tClass = tClass;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    private SpecificRecordSerDe(final Class<T> tClass, final String schemaRegistryUrl,
                                final String topic, final String subject) {
        this(tClass, schemaRegistryUrl);
        this.topic = topic;
        this.subject = subject;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
        // Lazily create the Flink serialization schema on first use.
        if (this.serializer == null) {
            synchronized (lock) {
                if (this.serializer == null) {
                    this.serializer = ConfluentRegistryAvroSerializationSchema
                            .forSpecific(tClass, this.subject, this.schemaRegistryUrl);
                }
            }
        }
        byte[] bytes = this.serializer.serialize(element);
        return new ProducerRecord<>(this.topic, bytes);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Lazily create the Flink deserialization schema on first use.
        if (deserializer == null) {
            synchronized (lock) {
                if (deserializer == null) {
                    deserializer = ConfluentRegistryAvroDeserializationSchema
                            .forSpecific(tClass, this.schemaRegistryUrl);
                }
            }
        }
        return deserializer.deserialize(record.value());
    }

    @Override
    public String getTargetTopic(T element) {
        return this.topic;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(tClass);
    }
}

On Thu, Sep 17, 2020 at 9:42 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Thanks for the update.
>
> First of all, why did you decide to build your own DeserializationSchema
> instead of using ConfluentRegistryDeserializationSchema? Your
> implementation is quite inefficient: you do deserialize -> serialize ->
> deserialize. Serialization/deserialization is usually one of the heaviest
> operations in the pipeline.
>
> What do you return in your getProducedType? From the stack trace I guess
> you are instantiating the AvroTypeInfo? Could you maybe share a full
> runnable example? It would make it much easier to help you.
>
> Moreover, the pattern of registering custom conversions in a SpecificData
> will not work with AvroSerializer. Custom conversions should be defined in
> the generated SpecificRecord (in your case PayloadRecord) in
> SpecificRecordBase#getConversion().
>
> Best,
>
> Dawid
>
> On 17/09/2020 16:34, Lian Jiang wrote:
>
> Piotr/Dawid,
>
> Thanks for the reply. FLINK-18223 seems not related to this issue, and I
> double checked that I am using Flink 1.11.0 instead of 1.10.0. My mistake.
> StreamExecutionEnvironment#getConfig()#enableObjectReuse() solved the
> issue.
>
> I am not using ConfluentRegistryDeserializationSchema.
> Instead, I am creating a custom DeserializationSchema:
>
> /*
>  * the deser class
>  */
> public class SpecificRecordSerDe<T extends SpecificRecord> implements
>         KafkaSerializationSchema<T>, KafkaContextAware<T>,
>         KafkaDeserializationSchema<T>, Serializable {
>
>     private final Class<T> tClass;
>     private final String tSchemaStr;
>     private volatile transient Schema tSchema;
>     private String topic;
>     private String schemaRegistryUrl;
>     private KafkaAvroSerializer serializer;
>     private KafkaAvroDecoder decoder;
>
>     public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr, String schemaRegistryUrl) {
>         this.tClass = tClass;
>         this.tSchemaStr = tSchemaStr;
>         this.topic = null;
>         this.schemaRegistryUrl = schemaRegistryUrl;
>     }
>
>     @Override
>     public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>         CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(schemaRegistryUrl, 4);
>         decoder = new KafkaAvroDecoder(client);
>         GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());
>
>         DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(
>                 generic.getSchema(), ManagedSpecificData.getForClass(tClass));
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>         writer.write(generic, encoder);
>         encoder.flush();
>
>         byte[] avroData = out.toByteArray();
>         out.close();
>
>         tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
>         SpecificDatumReader<T> reader = new SpecificDatumReader<>(
>                 generic.getSchema(), tSchema, ManagedSpecificData.getForClass(tClass));
>         Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
>         T res = reader.read(null, anotherDecoder);
>
>         return res;
>     }
> }
>
> /*
>  * the specificData class
>  */
> public class ManagedSpecificData extends SpecificData {
>     private static ManagedSpecificData getManagedSpecificData() {
>         ManagedSpecificData res = new ManagedSpecificData();
>
>         registerAdvancedType(new TimestampMillisType(), res);
>         registerAdvancedType(new LocalDateType(), res);
>
>         return res;
>     }
> }
>
> /*
>  * how we use the above deser class
>  */
> SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
>         PayloadRecord.class,
>         PayloadRecord.getClassSchema().toString(),
>         this.schemaRegistry);
> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
>         this.inputTopic,
>         deserializer,
>         this.sourceSettings);
>
> Thanks
>
> Lian
>
> On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>> Hi,
>>
>> Could you share exactly how you configure avro & kafka? Do you use the
>> Table API or the DataStream API? Do you use the
>> ConfluentRegistryDeserializationSchema that comes with Flink or did you
>> build a custom DeserializationSchema? Could you maybe share the code for
>> instantiating the source with us? It could help us track down the
>> problematic spot.
>>
>> Best,
>>
>> Dawid
>>
>> On 16/09/2020 08:09, Lian Jiang wrote:
>> > Hi,
>> >
>> > I am using avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
>> > IntelliJ, I can see the FlinkKafkaConsumer has already deserialized the
>> > upstream kafka message. However, I get the error below when this message
>> > is serialized during pushToOperator. Per the stack trace, the reason
>> > is that AvroSerializer is created by AvroFactory.fromSpecific(), which
>> > creates its private copy of specificData. This private specificData
>> > does not have the logical type information.
>> > This blocks the deserialized
>> > messages from being passed to downstream operators. Any idea how to
>> > make this work? Appreciated very much!
>> >
>> > org.apache.avro.AvroRuntimeException: Unknown datum type
>> > java.time.Instant: 2020-09-15T07:00:00Z
>> >   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
>> >   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>> >   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
>> >   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
>> >   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>> >   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
>> >   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>> >   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
>> >   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>> >   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
>> >   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>> >   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> >   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> >   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> >   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> >   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> >   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>> >   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>> >   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> >   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> >   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)