Thanks Dawid for suggesting ConfluentRegistryDeserializationSchema. I am
now trying ConfluentRegistryAvroDeserializationSchema (if that is what you
meant), but I get "java.lang.Long cannot be cast to java.time.Instant".
This may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
Is there any progress on this JIRA? Thanks and regards!
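
Regarding your note below about SpecificRecordBase#getConversion(): if
FLINK-11030 is indeed the cause, the deserializer ends up with a
SpecificData that lacks the timestamp-millis conversion, so the raw Long
reaches the generated put() (hence the ClassCastException in the stack
trace below). For comparison, here is a sketch of what Avro 1.9-generated
code carries when logical-type conversions are wired in; the field layout
is hypothetical, not my actual schema:

// Sketch of Avro 1.9-style generated code (hypothetical field layout):
// one Conversion slot per schema field; null for fields without logical types.
private static final org.apache.avro.Conversion<?>[] conversions =
    new org.apache.avro.Conversion<?>[] {
        null, // e.g. an ordinary string field
        new org.apache.avro.data.TimeConversions.TimestampMillisConversion(), // timestamp-millis -> java.time.Instant
        null  // the trailing null the Avro compiler emits for the record itself
    };

@Override
public org.apache.avro.Conversion<?> getConversion(int field) {
    return conversions[field];
}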


Stacktrace:
java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
    at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
    at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
    at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
    at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
    at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)



Code:

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.io.Serializable;

public class SpecificRecordSerDe<T extends SpecificRecord> implements
        KafkaSerializationSchema<T>, KafkaContextAware<T>,
        KafkaDeserializationSchema<T>, Serializable {

    private final Class<T> tClass;
    private String topic; // for serializer
    private String subject; // for serializer
    private final String schemaRegistryUrl;
    // volatile so the double-checked lazy initialization below is safe
    private volatile ConfluentRegistryAvroSerializationSchema<T> serializer;
    private volatile ConfluentRegistryAvroDeserializationSchema<T> deserializer;

    private static final Object lock = new Object();

    public static <T extends SpecificRecord> SpecificRecordSerDe<T> forDeserializer(
            final Class<T> tClass, final String schemaRegistryUrl) {
        return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl);
    }

    public static <T extends SpecificRecord> SpecificRecordSerDe<T> forSerializer(
            final Class<T> tClass,
            final String schemaRegistryUrl,
            final String topic,
            final String subject) {
        return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl, topic, subject);
    }

    private SpecificRecordSerDe(final Class<T> tClass, final String schemaRegistryUrl) {
        this.tClass = tClass;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    private SpecificRecordSerDe(final Class<T> tClass,
                                final String schemaRegistryUrl,
                                final String topic,
                                final String subject) {
        this(tClass, schemaRegistryUrl);
        this.topic = topic;
        this.subject = subject;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
        if (this.serializer == null) {
            synchronized (lock) {
                if (this.serializer == null) {
                    this.serializer = ConfluentRegistryAvroSerializationSchema
                            .forSpecific(tClass, this.subject, this.schemaRegistryUrl);
                }
            }
        }

        byte[] bytes = this.serializer.serialize(element);
        return new ProducerRecord<>(this.topic, bytes);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (deserializer == null) {
            synchronized (lock) {
                if (deserializer == null) {
                    deserializer = ConfluentRegistryAvroDeserializationSchema
                            .forSpecific(tClass, this.schemaRegistryUrl);
                }
            }
        }

        return deserializer.deserialize(record.value());
    }

    @Override
    public String getTargetTopic(T element) {
        return this.topic;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        // for generated SpecificRecordBase subclasses Flink resolves this to
        // AvroTypeInfo (requires flink-avro on the classpath)
        return TypeInformation.of(tClass);
    }
}
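
For completeness, this is roughly how the SerDe is wired into the job; the
topic, group id, bootstrap servers, and registry URL below are placeholders,
and MetadataRecord stands in for my actual generated record class:

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties sourceSettings = new Properties();
sourceSettings.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
sourceSettings.setProperty("group.id", "my-consumer-group");   // placeholder

SpecificRecordSerDe<MetadataRecord> deserializer =
        SpecificRecordSerDe.forDeserializer(MetadataRecord.class,
                "http://schema-registry:8081");                // placeholder

FlinkKafkaConsumer<MetadataRecord> consumer =
        new FlinkKafkaConsumer<>("my-topic", deserializer, sourceSettings);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(consumer).print();
env.execute("serde-repro");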




On Thu, Sep 17, 2020 at 9:42 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Thanks for the update.
>
> First of all, why did you decide to build your own DeserializationSchema
> instead of using ConfluentRegistryDeserializationSchema? Your
> implementation is quite inefficient: you do deserialize > serialize >
> deserialize. Serialization/deserialization is usually one of the heaviest
> operations in the pipeline.
>
> What do you return in your getProducedType? From the stack trace I guess
> you are instantiating the AvroTypeInfo? Could you maybe share a full
> runnable example? It would make it much easier to help you.
>
> Moreover, the pattern of registering custom conversions in a SpecificData
> will not work with the AvroSerializer. Custom conversions should be defined
> in the generated SpecificRecord (in your case PayloadRecord) via
> SpecificRecordBase#getConversion().
>
> Best,
>
> Dawid
>
>
> On 17/09/2020 16:34, Lian Jiang wrote:
>
> Piotr/Dawid,
>
> Thanks for the reply. FLINK-18223 seems not to be related to this issue, and
> I double-checked that I am using Flink 1.11.0 instead of 1.10.0. My mistake.
> StreamExecutionEnvironment#getConfig()#enableObjectReuse() solved the
> issue.
>
> I am not using ConfluentRegistryDeserializationSchema. Instead, I am
> creating a custom DeserializationSchema:
>
>
> /*
> the deser class
> */
> public class SpecificRecordSerDe<T extends SpecificRecord> implements
>         KafkaSerializationSchema<T>, KafkaContextAware<T>,
>         KafkaDeserializationSchema<T>, Serializable {
>
>     private final Class<T> tClass;
>     private final String tSchemaStr;
>     private volatile transient Schema tSchema;
>     private String topic;
>     private String schemaRegistryUrl;
>     private KafkaAvroSerializer serializer;
>     private KafkaAvroDecoder decoder;
>
>     public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr,
>                                String schemaRegistryUrl) {
>         this.tClass = tClass;
>         this.tSchemaStr = tSchemaStr;
>         this.topic = null;
>         this.schemaRegistryUrl = schemaRegistryUrl;
>     }
>
>     @Override
>     public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>         CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
>                 schemaRegistryUrl, 4);
>         decoder = new KafkaAvroDecoder(client);
>         GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());
>
>         // re-encode the GenericRecord to bytes...
>         DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(
>                 generic.getSchema(), ManagedSpecificData.getForClass(tClass));
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>         writer.write(generic, encoder);
>         encoder.flush();
>
>         byte[] avroData = out.toByteArray();
>         out.close();
>
>         // ...then decode them again as the specific record type
>         tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
>         SpecificDatumReader<T> reader = new SpecificDatumReader<>(
>                 generic.getSchema(), tSchema,
>                 ManagedSpecificData.getForClass(tClass));
>         Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
>         T res = reader.read(null, anotherDecoder);
>
>         return res;
>     }
> }
>
>
> /*
> the specificData class
> */
> public class ManagedSpecificData extends SpecificData {
>
>     private static ManagedSpecificData getManagedSpecificData() {
>         ManagedSpecificData res = new ManagedSpecificData();
>
>         registerAdvancedType(new TimestampMillisType(), res);
>         registerAdvancedType(new LocalDateType(), res);
>
>         return res;
>     }
> }
>
>
> /*
> how we use the deser class above
> */
>
> SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
>         PayloadRecord.class,
>         PayloadRecord.getClassSchema().toString(),
>         this.schemaRegistry);
> FlinkKafkaConsumer<PayloadRecord> consumer = new FlinkKafkaConsumer<>(
>         this.inputTopic,
>         deserializer,
>         this.sourceSettings);
>
>
>
> Thanks
>
> Lian
>
>
>
>
> On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi,
>>
>> Could you share exactly how you configure Avro & Kafka? Do you use the
>> Table API or the DataStream API? Do you use the
>> ConfluentRegistryDeserializationSchema that comes with Flink, or did you
>> build a custom DeserializationSchema? Could you maybe share the code for
>> instantiating the source with us? It could help us track down the
>> problematic spot.
>>
>> Best,
>>
>> Dawid
>>
>> On 16/09/2020 08:09, Lian Jiang wrote:
>> > Hi,
>> >
>> > I am using Avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
>> > IntelliJ, I can see that the FlinkKafkaConsumer has already deserialized
>> > the upstream Kafka message. However, I get the error below when this
>> > message is serialized during pushToOperator. Per the stack trace, the
>> > reason is that the AvroSerializer is created by AvroFactory.fromSpecific(),
>> > which creates its own private copy of SpecificData. This private
>> > SpecificData does not have the logical type information. This blocks the
>> > deserialized messages from being passed to downstream operators. Any idea
>> > how to make this work? Appreciated very much!
>> >
>> >
>> > org.apache.avro.AvroRuntimeException: Unknown datum type
>> > java.time.Instant: 2020-09-15T07:00:00Z
>> > at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
>> > at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>> > at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
>> > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
>> > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>> > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
>> > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>> > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
>> > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>> > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
>> > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>> > at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> > at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> > at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> > at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>> > at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>> > at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>
>
