Piotr/Dawid,

Thanks for the reply. FLINK-18223 does not seem to be related to this issue,
and I double-checked that I am actually using Flink 1.11.0, not 1.10.0 (my
mistake in the earlier email). Calling
StreamExecutionEnvironment#getConfig()#enableObjectReuse() solved the issue.
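
For reference, a minimal sketch of how object reuse is enabled (a standard
DataStream setup; nothing specific to our job):

// enable object reuse so chained operators hand records downstream by
// reference instead of deep-copying them through the AvroSerializer
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();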

I am not using the ConfluentRegistryDeserializationSchema. Instead, I am
using a custom DeserializationSchema:


/*
 * The deser class (imports and the serializer-side methods omitted).
 */
public class SpecificRecordSerDe<T extends SpecificRecord> implements
        KafkaSerializationSchema<T>, KafkaContextAware<T>,
        KafkaDeserializationSchema<T>, Serializable {

    private final Class<T> tClass;
    private final String tSchemaStr;
    private volatile transient Schema tSchema;
    private String topic;
    private String schemaRegistryUrl;
    private KafkaAvroSerializer serializer;
    private KafkaAvroDecoder decoder;

    public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr,
                               String schemaRegistryUrl) {
        this.tClass = tClass;
        this.tSchemaStr = tSchemaStr;
        this.topic = null;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // decode the Confluent-framed payload into a GenericRecord
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
                schemaRegistryUrl,
                4);
        decoder = new KafkaAvroDecoder(client);
        GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());

        // re-serialize the GenericRecord using our SpecificData so the
        // registered logical-type conversions are applied
        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(
                generic.getSchema(),
                ManagedSpecificData.getForClass(tClass));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(generic, encoder);
        encoder.flush();

        byte[] avroData = out.toByteArray();
        out.close();

        // read the bytes back as the specific record type, resolving the
        // writer schema against the reader schema of T
        tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
        SpecificDatumReader<T> reader = new SpecificDatumReader<>(
                generic.getSchema(), tSchema,
                ManagedSpecificData.getForClass(tClass));
        Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
        return reader.read(null, anotherDecoder);
    }
}
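
For completeness, the remaining KafkaDeserializationSchema methods are
boilerplate, roughly like this (a sketch; the serializer-side methods are
still omitted):

    @Override
    public boolean isEndOfStream(T nextElement) {
        // the Kafka stream is unbounded; never stop consuming
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(tClass);
    }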


/*
 * The SpecificData class.
 */
public class ManagedSpecificData extends SpecificData {

    private static ManagedSpecificData getManagedSpecificData() {
        ManagedSpecificData res = new ManagedSpecificData();

        registerAdvancedType(new TimestampMillisType(), res);
        registerAdvancedType(new LocalDateType(), res);

        return res;
    }

    // getForClass(Class<?>) and the registerAdvancedType helper are
    // omitted here for brevity.
}
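
registerAdvancedType is our own helper (omitted above); in essence it
registers an Avro logical-type conversion on the SpecificData instance. A
minimal sketch of what that amounts to for timestamp-millis, assuming Avro
1.9's built-in TimeConversions (the method name below is hypothetical):

import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificData;

// hypothetical equivalent of registerAdvancedType for timestamp-millis:
// once registered, this SpecificData maps the timestamp-millis logical
// type to java.time.Instant instead of treating it as an unknown datum
static void registerTimestampMillis(SpecificData specificData) {
    specificData.addLogicalTypeConversion(
            new TimeConversions.TimestampMillisConversion());
}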

/*
 * How we use the above deser class.
 */
SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
        PayloadRecord.class,
        PayloadRecord.getClassSchema().toString(),
        this.schemaRegistry);

FlinkKafkaConsumer<PayloadRecord> consumer = new FlinkKafkaConsumer<>(
        this.inputTopic,
        deserializer,
        this.sourceSettings);
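
The consumer is then attached to the job in the usual way (sketch; env is
the StreamExecutionEnvironment from above):

// the source emits already-deserialized PayloadRecord instances
DataStream<PayloadRecord> stream = env.addSource(consumer);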


Thanks

Lian

On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi,
>
> Could you share exactly how you configure Avro & Kafka? Do you use the
> Table API or the DataStream API? Do you use the
> ConfluentRegistryDeserializationSchema that comes with Flink, or did you
> build a custom DeserializationSchema? Could you maybe share the code for
> instantiating the source with us? It could help us track down the
> problematic spot.
>
> Best,
>
> Dawid
>
> On 16/09/2020 08:09, Lian Jiang wrote:
> > Hi,
> >
> > I am using Avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
> > IntelliJ, I can see that the FlinkKafkaConsumer has already deserialized
> > the upstream Kafka message. However, I get the error below when this
> > message is serialized during pushToOperator. Per the stack trace, the
> > reason is that the AvroSerializer is created by AvroFactory.fromSpecific(),
> > which creates its own private copy of SpecificData. This private
> > SpecificData does not have the logical type information. This blocks the
> > deserialized messages from being passed to downstream operators. Any idea
> > how to make this work? Appreciated very much!
> >
> >
> > org.apache.avro.AvroRuntimeException: Unknown datum type
> > java.time.Instant: 2020-09-15T07:00:00Z
> > at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
> > at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
> > at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
> > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> > at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> > at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> > at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> > at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> > at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> > at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>
>
