Hi All,

Avro was finally bumped in https://issues.apache.org/jira/browse/FLINK-18192.

The implementers didn't see https://issues.apache.org/jira/browse/FLINK-12532, but it has now been updated as well.

Best,
Aljoscha

On 21.09.20 08:04, Arvid Heise wrote:
Hi Lian,

we had a similar discussion on [1].

TL;DR: you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until
Hive bumps it [3]. In that thread, I gave some options for avoiding the issue.
The easiest fix is to use Avro 1.8.2 all the way, but you may then run into [4]
if your logical type is nullable (which is usually unnecessary anyway).

Still, I think it's time for us to revisit the decision to wait for Hive to
bump and instead upgrade independently. Avro was stuck on 1.8 for a long
time, but the project has gained traction again in the past two years. Hive,
on the other hand, seems rather slow to respond, and we shouldn't let a
slow-moving component block us from supporting a fast-moving component when
it's so apparent that users want it.
@Aljoscha Krettek <aljos...@apache.org> could you please pick that topic up
and ping the respective maintainers?

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
[2] https://issues.apache.org/jira/browse/FLINK-12532
[3] https://issues.apache.org/jira/browse/HIVE-21737
[4] https://issues.apache.org/jira/browse/AVRO-1891

On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang <jiangok2...@gmail.com> wrote:

Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am
trying ConfluentRegistryAvroDeserializationSchema (if this is what you
mean) but got "java.lang.Long cannot be cast to java.time.Instant". This
may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
Is there any progress on this JIRA? Thanks. Regards!


Stacktrace:
java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)



Code:

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.io.Serializable;

public class SpecificRecordSerDe<T extends SpecificRecord> implements
        KafkaSerializationSchema<T>, KafkaContextAware<T>,
        KafkaDeserializationSchema<T>, Serializable {

    private final Class<T> tClass;
    private String topic; // for serializer
    private String subject; // for serializer
    private final String schemaRegistryUrl;
    private ConfluentRegistryAvroSerializationSchema<T> serializer;
    private ConfluentRegistryAvroDeserializationSchema<T> deserializer;

    private static final Object lock = new Object();

    public static <T extends SpecificRecord> SpecificRecordSerDe<T> forDeserializer(
            final Class<T> tClass, String schemaRegistryUrl) {
        return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl);
    }

    public static <T extends SpecificRecord> SpecificRecordSerDe<T> forSerializer(
            final Class<T> tClass,
            String schemaRegistryUrl,
            final String topic,
            final String subject) {
        return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl, topic, subject);
    }

    private SpecificRecordSerDe(final Class<T> tClass, String schemaRegistryUrl) {
        this.tClass = tClass;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    private SpecificRecordSerDe(final Class<T> tClass,
                                final String schemaRegistryUrl,
                                final String topic,
                                final String subject) {
        this(tClass, schemaRegistryUrl);
        this.topic = topic;
        this.subject = subject;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
        // Lazily create the serialization schema on first use.
        if (this.serializer == null) {
            synchronized (lock) {
                if (this.serializer == null) {
                    this.serializer = ConfluentRegistryAvroSerializationSchema
                            .forSpecific(tClass, this.subject, this.schemaRegistryUrl);
                }
            }
        }

        byte[] bytes = this.serializer.serialize(element);
        return new ProducerRecord<>(this.topic, bytes);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Lazily create the deserialization schema on first use.
        if (deserializer == null) {
            synchronized (lock) {
                if (deserializer == null) {
                    deserializer = ConfluentRegistryAvroDeserializationSchema
                            .forSpecific(tClass, this.schemaRegistryUrl);
                }
            }
        }

        return deserializer.deserialize(record.value());
    }

    @Override
    public String getTargetTopic(T element) {
        return this.topic;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(tClass);
    }
}




On Thu, Sep 17, 2020 at 9:42 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

Thanks for the update.

First of all, why did you decide to build your own DeserializationSchema
instead of using the ConfluentRegistryDeserializationSchema? Your
implementation is quite inefficient: you deserialize, then serialize, then
deserialize again. Serialization/deserialization is usually one of the heaviest
operations in the pipeline.
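
For reference, using it directly looks roughly like this (a sketch; the topic
name, registry URL and consumer properties below are placeholders, not taken
from your job):

import java.util.Properties;

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Build the schema directly from the generated SpecificRecord class; Flink then
// handles the Confluent wire format and the schema-registry lookups internally.
ConfluentRegistryAvroDeserializationSchema<PayloadRecord> schema =
        ConfluentRegistryAvroDeserializationSchema.forSpecific(
                PayloadRecord.class, "http://schema-registry:8081");

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
kafkaProps.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer<PayloadRecord> consumer =
        new FlinkKafkaConsumer<>("input-topic", schema, kafkaProps);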

What do you return in your getProducedType? From the stack trace I guess
you are instantiating the AvroTypeInfo? Could you maybe share a full
runnable example? It would make it much easier to help you.

Moreover, the pattern of registering custom conversions in a
SpecificData will not work with the AvroSerializer. Custom conversions should
be defined in the generated SpecificRecord (in your case PayloadRecord) via
SpecificRecordBase#getConversion().
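
For illustration, a hand-written stand-in for what the Avro compiler generates
for a record with a single timestamp-millis field would look roughly like this
(the PayloadRecord name and the "ts" field are made up; real generated code
also contains builders, encoders and other helpers):

import java.time.Instant;

import org.apache.avro.Conversion;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificRecordBase;

public class PayloadRecord extends SpecificRecordBase {

    public static final Schema SCHEMA$ = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"PayloadRecord\",\"fields\":"
            + "[{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}");

    private Instant ts;

    // One entry per field, in schema order; a null entry means "no conversion for this field".
    private static final Conversion<?>[] conversions = new Conversion<?>[] {
            new TimeConversions.TimestampMillisConversion()
    };

    @Override
    public Conversion<?> getConversion(int field) {
        return conversions[field];
    }

    @Override
    public Schema getSchema() {
        return SCHEMA$;
    }

    @Override
    public Object get(int field) {
        return ts;
    }

    @Override
    public void put(int field, Object value) {
        ts = (Instant) value;
    }
}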

Best,

Dawid


On 17/09/2020 16:34, Lian Jiang wrote:

Piotr/Dawid,

Thanks for the reply. FLINK-18223 seems unrelated to this issue, and I
double-checked that I am using Flink 1.11.0 instead of 1.10.0; that was my
mistake. StreamExecutionEnvironment#getConfig()#enableObjectReuse() solved
the issue.
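
Concretely, the workaround amounts to something like this (a sketch; how the
environment is obtained depends on the job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With object reuse enabled, records are handed to chained operators without the
// AvroSerializer.copy()/deepCopy step that was failing on the logical-type field.
env.getConfig().enableObjectReuse();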

I am not using ConfluentRegistryDeserializationSchema. Instead, I am
creating custom DeserializationSchema:


/*
the deser class
*/
public class SpecificRecordSerDe<T extends SpecificRecord> implements
        KafkaSerializationSchema<T>, KafkaContextAware<T>,
        KafkaDeserializationSchema<T>, Serializable {

    private final Class<T> tClass;
    private final String tSchemaStr;
    private volatile transient Schema tSchema;
    private String topic;
    private String schemaRegistryUrl;
    private KafkaAvroSerializer serializer;
    private KafkaAvroDecoder decoder;

    public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr, String schemaRegistryUrl) {
        this.tClass = tClass;
        this.tSchemaStr = tSchemaStr;
        this.topic = null;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
                schemaRegistryUrl,
                4);
        decoder = new KafkaAvroDecoder(client);
        GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());

        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(
                generic.getSchema(), ManagedSpecificData.getForClass(tClass));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(generic, encoder);
        encoder.flush();

        byte[] avroData = out.toByteArray();
        out.close();

        tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
        SpecificDatumReader<T> reader = new SpecificDatumReader<>(
                generic.getSchema(), tSchema,
                ManagedSpecificData.getForClass(tClass));
        Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
        T res = reader.read(null, anotherDecoder);

        return res;
    }
}


/*
the specificData class
*/
public class ManagedSpecificData extends SpecificData {

    private static ManagedSpecificData getManagedSpecificData() {
        ManagedSpecificData res = new ManagedSpecificData();

        registerAdvancedType(new TimestampMillisType(), res);
        registerAdvancedType(new LocalDateType(), res);

        return res;
    }
}


/*
how we use above deser class
*/
SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
        PayloadRecord.class,
        PayloadRecord.getClassSchema().toString(),
        this.schemaRegistry);

FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
        this.inputTopic,
        deserializer,
        this.sourceSettings);



Thanks

Lian




On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

Hi,

Could you share exactly how you configure Avro & Kafka? Do you use the
Table API or the DataStream API? Do you use the
ConfluentRegistryDeserializationSchema that comes with Flink, or did you
build a custom DeserializationSchema? Could you maybe share the code for
instantiating the source with us? It could help us track down the
problematic spot.

Best,

Dawid

On 16/09/2020 08:09, Lian Jiang wrote:
Hi,

I am using Avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
IntelliJ, I can see that the FlinkKafkaConsumer has already deserialized the
upstream Kafka message. However, I get the error below when this message
is serialized during pushToOperator. Per the stack trace, the reason
is that the AvroSerializer is created by AvroFactory.fromSpecific(), which
creates its private copy of SpecificData. This private SpecificData
does not have the logical type information, which blocks the deserialized
messages from being passed to downstream operators. Any idea how to
make this work? Appreciated very much!


org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)









